### Connecting to the database and loading the data

In [0]:
import os

# Fully-qualified class name of the Microsoft SQL Server JDBC driver.
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

database_host = "stocksdbserver.database.windows.net"
database_port = "1433" # update if you use a non-default port
database_name = "stocksdb"
table = "stocks"

# Credentials can be supplied through the STOCKSDB_USER / STOCKSDB_PASSWORD
# environment variables so that real secrets do not have to live in the
# notebook source. The literals are kept only as backward-compatible fallbacks.
# NOTE(review): a plain-text password in source control is a security risk;
# rotate it and move it to a secret store, then drop the fallback.
user = os.environ.get("STOCKSDB_USER", "qangelot")
password = os.environ.get("STOCKSDB_PASSWORD", "mypassword1234")

# JDBC connection string assembled from the settings above.
url = f"jdbc:sqlserver://{database_host}:{database_port};database={database_name}"



In [0]:
# Read the configured table over JDBC into a Spark DataFrame, handing all
# connection settings to the reader in a single options() call.
jdbc_reader = spark.read.format("jdbc").options(
    driver=driver,
    url=url,
    dbtable=table,
    user=user,
    password=password,
)
df = jdbc_reader.load()


### Exploratory data analysis

In [0]:
# fixing date_ type
from pyspark.sql.functions import to_date

# Replace the date_ column with its to_date() conversion, then preview 5 rows.
parsed_date = to_date("date_")
df = df.withColumn("date_", parsed_date)
df.show(5)


#### 1. Get return rate

In [0]:
from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lit


def get_return_rate(df, stock, start_date, end_date):
    '''Return the average daily return rate, in percent, of a stock.

    The rate is the relative change of the closing price between
    ``start_date`` and ``end_date`` (in percent), divided by the number of
    calendar days in the interval.

    Args:
        df: Spark DataFrame with ``stock_name``, ``date_`` and ``close_`` columns.
        stock: Name of the stock to select (compared as a string).
        start_date: First day of the interval, formatted ``YYYY-MM-DD``.
        end_date: Last day of the interval, formatted ``YYYY-MM-DD``.

    Returns:
        The percentage return per day as a float.

    Raises:
        ValueError: If a date is malformed, if ``end_date`` is not strictly
            after ``start_date``, or if no closing price is available for the
            stock on one of the two dates (e.g. a non-trading day).
    '''
    # Validate the interval before launching any Spark job: a zero-day
    # interval would otherwise end in a division by zero, and a reversed one
    # would silently flip the sign of the result.
    d1 = datetime.strptime(start_date, "%Y-%m-%d")
    d2 = datetime.strptime(end_date, "%Y-%m-%d")
    num_days = (d2 - d1).days
    if num_days <= 0:
        raise ValueError(
            f"end_date ({end_date}) must be after start_date ({start_date})"
        )

    temp = df[df["stock_name"] == str(stock)]

    init = _get_close_price(temp, stock, start_date)
    end = _get_close_price(temp, stock, end_date)

    # calculated by subtracting the initial value of the investment
    # from its current value, and then dividing it by the initial value.
    return ((end - init) / init * 100.00) / num_days


def _get_close_price(stock_df, stock, day):
    '''Return the closing price found in ``stock_df`` on ``day`` as a float.'''
    row = (stock_df
           .filter(stock_df.date_ == lit(day).cast('date'))
           .select('close_')
           .first())
    # first() yields None when the filter matches nothing, which lets us
    # report the missing data point instead of failing with a bare IndexError.
    if row is None or row['close_'] is None:
        raise ValueError(f"no closing price for stock {stock!r} on {day}")
    # Convert to float so Decimal columns can be combined with float constants.
    return float(row['close_'])


# Example: return rate of AMAZON between 2017-01-03 and 2017-02-03.
res = get_return_rate(
    df,
    stock='AMAZON',
    start_date='2017-01-03',
    end_date='2017-02-03',
)
res

#### 2. Get moving average

In [0]:
from pyspark.sql.functions import avg
from pyspark.sql.window import Window
import pyspark.sql.functions as f


def get_moving_average(df, stock, start_date, end_date, num_points=5):
    '''Add a moving average of the previous opening prices of a stock.

    For every row of the selected stock, the ``open_`` values of the
    ``num_points`` preceding rows (ordered by ``date_``) are added as
    ``prev_open__1`` ... ``prev_open__<num_points>`` columns, and their mean is
    stored in a ``moving_average_<num_points>`` column. Rows with fewer than
    ``num_points`` predecessors get a null average.

    Args:
        df: Spark DataFrame with ``stock_name``, ``date_`` and ``open_`` columns.
        stock: Name of the stock to select (compared as a string).
        start_date: First day to keep in the result (``YYYY-MM-DD``), or
            ``None`` for no lower bound.
        end_date: Last day to keep in the result (``YYYY-MM-DD``), or ``None``
            for no upper bound.
        num_points: Number of previous rows to average; must be at least 1.

    Returns:
        A new DataFrame restricted to the stock and the date interval, with
        the lag columns and the moving-average column appended.

    Raises:
        ValueError: If ``num_points`` is smaller than 1.
    '''
    if num_points < 1:
        raise ValueError("num_points must be at least 1")

    temp = df[df["stock_name"] == str(stock)]

    # create a lag feature
    window = Window.partitionBy("stock_name").orderBy(f.col("date_").asc())

    column = 'open_'
    # Build the lag column names up front so they can be referenced directly
    # instead of being rediscovered through a regex on the schema.
    lag_names = [f'prev_{column}_{i}' for i in range(1, num_points + 1)]
    for offset, name in enumerate(lag_names, start=1):
        temp = temp.withColumn(name, f.lag(temp[column], offset=offset).over(window))

    # avg over the prev_price_ columns
    average = sum(f.col(name) for name in lag_names) / num_points

    # create a moving_average_ column to store the result
    result = temp.withColumn(f'moving_average_{num_points}', average)

    # Restrict to the requested interval only after the lags are computed, so
    # the first rows of the interval still see the prices that precede it.
    if start_date is not None:
        result = result.filter(f.col("date_") >= f.lit(start_date).cast("date"))
    if end_date is not None:
        result = result.filter(f.col("date_") <= f.lit(end_date).cast("date"))

    return result

# Example: 5-point moving average for AMAZON, displayed with show().
res = get_moving_average(
    df,
    stock='AMAZON',
    start_date='2017-01-03',
    end_date='2017-02-03',
    num_points=5,
)
res.show()


In [0]:
# Persist the moving-average DataFrame to the "MovingAvgDf" table over JDBC.
# No save mode is set here, so the writer's default mode applies.
(
    res.write.format("jdbc")
    .options(url=url, dbtable="MovingAvgDf", user=user, password=password)
    .save()
)