# Import data from SQL Database

In [0]:
import os

# JDBC connection settings for the Azure SQL Server instance.
# Host and driver class are fixed; everything else comes from the environment.
jdbcHostname = "datalakeprojectsqlserver.database.windows.net"
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# os.environ.get returns None for an unset variable, in which case the literal
# text "None" ends up in the URL built below.
jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword = (
    os.environ.get(name)
    for name in ("jdbcPort", "jdbcDatabase", "jdbcUsername", "jdbcPassword")
)

# NOTE(review): the password is embedded in the URL string itself, so avoid
# printing or logging jdbcUrl.
jdbcUrl = (
    f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort}"
    f";databaseName={jdbcDatabase}"
    f";user={jdbcUsername}"
    f";password={jdbcPassword}"
)

In [0]:
# Load the whole "stocks" table over JDBC into a Spark DataFrame.
# The driver class configured in jdbcDriver is passed explicitly instead of
# being left unused and inferred from the URL.
df1 = (
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("driver", jdbcDriver)
    .option("dbtable", "stocks")
    .load()
)
display(df1)

Date,High,Low,Open,Close,Volume,Adj Close,company_name
2017-01-03,758.760009765625,747.7000122070312,757.9199829101562,753.6699829101562,3521100.0,753.6699829101562,AMAZON
2017-01-04,759.6799926757812,754.2000122070312,758.3900146484375,757.1799926757812,2510500.0,757.1799926757812,AMAZON
2017-01-05,782.4000244140625,760.260009765625,761.5499877929688,780.4500122070312,5830100.0,780.4500122070312,AMAZON
2017-01-06,799.4400024414062,778.47998046875,782.3599853515625,795.989990234375,5986200.0,795.989990234375,AMAZON
2017-01-09,801.77001953125,791.77001953125,798.0,796.9199829101562,3446100.0,796.9199829101562,AMAZON
2017-01-10,798.0,789.5399780273438,796.5999755859375,795.9000244140625,2558400.0,795.9000244140625,AMAZON
2017-01-11,799.5,789.510009765625,793.6599731445312,799.02001953125,2992800.0,799.02001953125,AMAZON
2017-01-12,814.1300048828125,799.5,800.3099975585938,813.6400146484375,4873900.0,813.6400146484375,AMAZON
2017-01-13,821.6500244140625,811.4000244140625,814.3200073242188,817.1400146484375,3791900.0,817.1400146484375,AMAZON
2017-01-17,816.0,803.4400024414062,815.7000122070312,809.719970703125,3670500.0,809.719970703125,AMAZON


# Data Transformations

In [0]:
def daily_return_rate(df, stock_name, start_date, end_date):
    """Return one company's rows in a date range with an open-to-close return column.

    Adds a string column ``daily_return_rate`` holding
    ``(Close - Open) * 100 / Open`` rounded to two decimals and suffixed
    with ``%`` (for example ``-0.56%``).

    Args:
        df: Spark DataFrame providing the ``company_name``, ``Date``, ``Open``
            and ``Close`` columns.
        stock_name: Value of ``company_name`` to keep.
        start_date: Inclusive lower bound compared against ``Date``.
        end_date: Inclusive upper bound compared against ``Date``.

    Returns:
        The filtered DataFrame with the extra ``daily_return_rate`` column.
    """
    # Import the functions module under an alias: a bare ``import round`` would
    # shadow Python's built-in ``round`` inside this function.
    from pyspark.sql import functions as F

    pct_change = (F.col("Close") - F.col("Open")) * 100 / F.col("Open")
    with_rate = df.filter(df.company_name == stock_name).withColumn(
        "daily_return_rate", F.concat(F.round(pct_change, 2), F.lit("%"))
    )
    return with_rate.filter(
        (with_rate.Date >= F.lit(start_date)) & (with_rate.Date <= F.lit(end_date))
    )

In [0]:
# Open-to-close return for AMAZON between 2017-01-03 and 2017-01-17 (inclusive).
# NOTE(review): the result is bound to the same name as the function it calls,
# so after this cell runs the name refers to a DataFrame and the function is no
# longer reachable; re-running this cell alone will fail until the definition
# cell is run again.
daily_return_rate = daily_return_rate(
    df=df1,
    stock_name="AMAZON",
    start_date="2017-01-03",
    end_date="2017-01-17",
)
display(daily_return_rate)

Date,High,Low,Open,Close,Volume,Adj Close,company_name,daily_return_rate
2017-01-03,758.760009765625,747.7000122070312,757.9199829101562,753.6699829101562,3521100.0,753.6699829101562,AMAZON,-0.56%
2017-01-04,759.6799926757812,754.2000122070312,758.3900146484375,757.1799926757812,2510500.0,757.1799926757812,AMAZON,-0.16%
2017-01-05,782.4000244140625,760.260009765625,761.5499877929688,780.4500122070312,5830100.0,780.4500122070312,AMAZON,2.48%
2017-01-06,799.4400024414062,778.47998046875,782.3599853515625,795.989990234375,5986200.0,795.989990234375,AMAZON,1.74%
2017-01-09,801.77001953125,791.77001953125,798.0,796.9199829101562,3446100.0,796.9199829101562,AMAZON,-0.14%
2017-01-10,798.0,789.5399780273438,796.5999755859375,795.9000244140625,2558400.0,795.9000244140625,AMAZON,-0.09%
2017-01-11,799.5,789.510009765625,793.6599731445312,799.02001953125,2992800.0,799.02001953125,AMAZON,0.68%
2017-01-12,814.1300048828125,799.5,800.3099975585938,813.6400146484375,4873900.0,813.6400146484375,AMAZON,1.67%
2017-01-13,821.6500244140625,811.4000244140625,814.3200073242188,817.1400146484375,3791900.0,817.1400146484375,AMAZON,0.35%
2017-01-17,816.0,803.4400024414062,815.7000122070312,809.719970703125,3670500.0,809.719970703125,AMAZON,-0.73%


In [0]:
def movingAverage(df, stock_name, nb_moving_points, start_date, end_date):
    """Return one company's rows in a date range with a moving average of ``Open``.

    The average is computed over all of the company's rows *before* the date
    range is applied, so rows near ``start_date`` still average over earlier
    dates that are not part of the returned result.

    Args:
        df: Spark DataFrame providing the ``company_name``, ``Date`` and
            ``Open`` columns.
        stock_name: Value of ``company_name`` to keep.
        nb_moving_points: Number of rows in each averaging window (the row
            itself plus the ``nb_moving_points - 1`` rows that follow it when
            sorted by ``Date`` descending). Must be at least 1.
        start_date: Inclusive lower bound compared against ``Date``.
        end_date: Inclusive upper bound compared against ``Date``.

    Returns:
        The filtered DataFrame with the extra ``movingAverage`` column.

    Raises:
        ValueError: If ``nb_moving_points`` is smaller than 1.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    if nb_moving_points < 1:
        raise ValueError(
            f"nb_moving_points must be at least 1, got {nb_moving_points!r}"
        )

    # Partition by company instead of using an empty partitionBy(): the frame
    # was already restricted to one company, so the values are the same, but
    # the window is no longer an unkeyed, all-rows-in-one-partition window.
    # Rows are sorted newest-first, so "current row .. n-1 following" covers
    # the row's own date and the dates before it.
    window = (
        Window.partitionBy("company_name")
        .orderBy(F.desc("Date"))
        .rowsBetween(Window.currentRow, nb_moving_points - 1)
    )

    with_average = df.filter(df.company_name == stock_name).withColumn(
        "movingAverage", F.mean(F.col("Open")).over(window)
    )
    return with_average.filter(
        (with_average.Date >= F.lit(start_date))
        & (with_average.Date <= F.lit(end_date))
    )

In [0]:
# 5-row moving average of Open for AMAZON between 2017-01-03 and 2017-01-17.
# NOTE(review): the result is bound to the same name as the function it calls,
# so after this cell runs the name refers to a DataFrame and the function is no
# longer reachable; re-running this cell alone will fail until the definition
# cell is run again.
movingAverage = movingAverage(
    df=df1,
    stock_name="AMAZON",
    nb_moving_points=5,
    start_date="2017-01-03",
    end_date="2017-01-17",
)
display(movingAverage)

Date,High,Low,Open,Close,Volume,Adj Close,company_name,movingAverage
2017-01-17,816.0,803.4400024414062,815.7000122070312,809.719970703125,3670500.0,809.719970703125,AMAZON,812.0700073242188
2017-01-17,816.0,803.4400024414062,815.7000122070312,809.719970703125,3670500.0,809.719970703125,AMAZON,808.9920043945312
2017-01-13,821.6500244140625,811.4000244140625,814.3200073242188,817.1400146484375,3791900.0,817.1400146484375,AMAZON,804.5839965820312
2017-01-13,821.6500244140625,811.4000244140625,814.3200073242188,817.1400146484375,3791900.0,817.1400146484375,AMAZON,800.4519897460938
2017-01-12,814.1300048828125,799.5,800.3099975585938,813.6400146484375,4873900.0,813.6400146484375,AMAZON,796.9079833984375
2017-01-12,814.1300048828125,799.5,800.3099975585938,813.6400146484375,4873900.0,813.6400146484375,AMAZON,796.1659790039063
2017-01-11,799.5,789.510009765625,793.6599731445312,799.02001953125,2992800.0,799.02001953125,AMAZON,795.7039794921875
2017-01-11,799.5,789.510009765625,793.6599731445312,799.02001953125,2992800.0,799.02001953125,AMAZON,796.5719848632813
2017-01-10,798.0,789.5399780273438,796.5999755859375,795.9000244140625,2558400.0,795.9000244140625,AMAZON,794.3119873046875
2017-01-10,798.0,789.5399780273438,796.5999755859375,795.9000244140625,2558400.0,795.9000244140625,AMAZON,791.4639892578125


# Save into Azure Blob Storage

In [0]:
# Azure Blob Storage settings, all read from environment variables
# (os.environ.get returns None for any variable that is unset).
containerName, storageAccountName, account_key = (
    os.environ.get(name)
    for name in ("containerName", "storageAccountName", "account_key")
)

# Hand the storage account key to the Spark session configuration.
spark.conf.set(
    "fs.azure.account.key.{}.blob.core.windows.net".format(storageAccountName),
    account_key,
)

# Root of the output container. The value ends with "/", and the cells that
# use it append "/<name>" to it.
output_container_path = "wasbs://{}@{}.blob.core.windows.net/".format(
    containerName, storageAccountName
)

In [0]:
def _write_single_csv(df, folder_name):
    """Write ``df`` as CSV with a header row under ``folder_name`` in the output container.

    The DataFrame is coalesced to one partition before writing, and any
    existing output at the target path is overwritten.

    Args:
        df: Spark DataFrame to write.
        folder_name: Name of the output location, relative to
            ``output_container_path``.
    """
    (df
     .coalesce(1)
     .write
     .option("header", "true")
     .mode("overwrite")
     .csv(f"{output_container_path}/{folder_name}"))


# Write the moving average DataFrame to the output container.
_write_single_csv(movingAverage, "movingAverage_.csv")

# Write the daily return rate DataFrame to the output container.
_write_single_csv(daily_return_rate, "dailyReturnRate_.csv")

In [0]:
def _promote_part_file(staging_name, final_name):
    """Move the ``part-`` file out of a written CSV folder, then delete the folder.

    Lists ``staging_name`` under ``output_container_path``, takes the first
    entry whose name starts with ``part-``, moves it to ``final_name`` at the
    root of the container (renaming it in the same step), and finally removes
    the staging folder recursively.

    Args:
        staging_name: Folder the CSV was written to, relative to
            ``output_container_path``.
        final_name: File name to give the CSV at the container root.

    Raises:
        FileNotFoundError: If the staging folder holds no ``part-`` file.
    """
    staging_dir = f"{output_container_path}/{staging_name}"
    part_files = [
        entry for entry in dbutils.fs.ls(staging_dir)
        if entry.name.startswith("part-")
    ]
    # Fail with a descriptive message instead of an IndexError on part_files[0].
    if not part_files:
        raise FileNotFoundError(f"No 'part-' file found in {staging_dir}")

    dbutils.fs.mv(part_files[0].path, f"{output_container_path}/{final_name}")

    # Remove the now-unneeded staging folder and everything left in it.
    dbutils.fs.rm(staging_dir, True)


# Turn each written CSV folder into a single CSV file at the container root.
_promote_part_file("movingAverage_.csv", "movingAverage.csv")
_promote_part_file("dailyReturnRate_.csv", "dailyReturnRate.csv")