In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import length, col
from pyspark import SparkContext
sc = SparkContext()
sql = SQLContext(sc)

In [2]:
import pandas as pd
import yfinance as yf
from functools import reduce
import pandas_datareader as pdr
from pyspark.sql.window import Window
from pyspark.sql.functions import lit, DataFrame, udf, lead,lag, when, avg, abs, max, min

##  Data import and Feature Generation

In [3]:
class STOCK():
    def __init__(self,*args, start_date, end_date):
        # Combine individual stock dataframes in one Spark dataframe
        self.df_complete = reduce(DataFrame.union, self.complete_df(args,start_date, end_date))
    
    # Import Stocks data as spark dataframe from Yahoo finance
    def complete_df(self, args, start_date, end_date):
        dfs = []
        for symbol in args:
            stock = sql.createDataFrame(pdr.get_data_yahoo(symbol, start=start_date, end=end_date).reset_index()) \
                       .withColumn("Symbol", lit(symbol)).withColumnRenamed('Adj Close', 'Adj_close')
            dfs.append(stock)
        return dfs 
    
    #Generate Features:
    def add_features(self, symbol):
        """ T_trend_udf: returns user defined function to create Today's Trend feature by subtracting the 
                         open price from the close price of the trading day.
            F_trend: return a window function that accesses the next day close's price to substract today's close 
                     price from it and create Future Trend feature
        """
        
        symbol_df = self.df_complete.filter(self.df_complete["Symbol"] == symbol)
        T_trend_udf = udf(lambda Close_price, Open_price: "Uptrend" if Close_price - Open_price >= 0 else "Downtrend")
        F_trend = lead(symbol_df["Close"],1).over(Window().orderBy("Date")) 
        
        # Add all created features to the dataframe:
        symbol_df = symbol_df.withColumn("today_trend", T_trend_udf(symbol_df["Close"], symbol_df["Open"])) \
                             .withColumn("tomorrow_trend", when((F_trend - symbol_df["Close"]) >= 0, "Uptrend") \
                                        .otherwise("Downtrend")) \
                             .withColumn("RSI", self.RSI(symbol_df)) \
                             .withColumn("SMA", self.SMA(symbol_df)) \
                             .withColumn("K", self.K(symbol_df))
        
        return symbol_df.na.drop().filter(symbol_df["RSI"] > 0)
    
    
    
    def RSI(self, symbol_df):
        """ A function returns the 14-days period RSI feature to be added later to the dataframe.
        RSI = 100 - 100/[1 + (Average of 14 days gains) / (Average of 14 days losses)]
            
        """
        
        # Creating gain_loss column for each day, close_price_today - close_price_yesterday:
        gain_loss = symbol_df['close'] - lag(symbol_df["Close"],1).over(Window().orderBy("Date"))
    
        # Calculating the Average of 14 days gains 
        gain = when(gain_loss > 0, gain_loss).otherwise(0) 
        Avg_gain = avg(gain).over(Window.orderBy("Date").rowsBetween(-14, -1))

        # Calculating the Average of 14 days loss
        loss = when(gain_loss < 0, gain_loss).otherwise(0) 
        Avg_loss = avg(abs(loss)).over(Window.orderBy("Date").rowsBetween(-14, -1))

        # Calculating RS:
        RS = Avg_gain / Avg_loss

        # Calculating 14-days RSI:
        RSI = 100 - (100/(1+RS))

        return RSI
    
    def SMA(self, symbol_df):
        """ A function returns the 14-days period SMA feature to be added later to the dataframe.
        SMA = sum(14_close_prices) / 14 
    
        """
        acc = lag(symbol_df["Close"],1).over(Window().orderBy("Date"))
        SMA = avg(acc).over(Window().rowsBetween(-14,-1))
        return SMA
    
    def K(self, symbol_df):
        """ A function returns Stochastic Oscillator indicator to be added later to the dataframe.
        %K = 100 * [(Current Close_price - Lowest Low over the past 14 days) 
                 /(Highest High over the past 14 days - Lowest Low over the past 14 days)]
        """
        HH_14 = max('High').over(Window().orderBy("Date").rowsBetween(-14,0))
        LL_14 = min("Low").over(Window().orderBy("Date").rowsBetween(-14,0))
        K = 100*((symbol_df["Close"] - LL_14) / (HH_14 - LL_14))
        return K
    
    
    #Save dataframe as CSV file:
    def save_df(self, df, file_name):
        df.coalesce(1).write.csv(file_name, header=True)
        
    

In [4]:
#Example: Create dataframe of two stocks for one year time period
Example = STOCK('AAPL', 'NFLX', start_date = "2019-05-01", end_date = "2020-05-01").df_complete
Example.show(2)

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+------+
|               Date|              High|               Low|              Open|             Close|   Volume|         Adj_close|Symbol|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+------+
|2019-05-01 00:00:00|215.30999755859375|209.22999572753906| 209.8800048828125|210.52000427246094|6.48273E7|207.23912048339844|  AAPL|
|2019-05-02 00:00:00|212.64999389648438| 208.1300048828125|209.83999633789062|209.14999389648438|3.19963E7| 205.8904571533203|  AAPL|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+------+
only showing top 2 rows



In [5]:
# Stocks spark datafame:
stock = STOCK('AAPL', 'AMZN', 'NFLX', start_date = "2016-01-01", end_date = "2020-04-01")
stock_df = stock.df_complete

#create features for each stock dataframe
AAPL_df = stock.add_features("AAPL") 
AMZN_df = stock.add_features("AMZN") 
NFLX_df = stock.add_features("NFLX")

In [6]:
print('Number of records:{}'.format(AAPL_df.count()))
AAPL_df.printSchema()
AAPL_df.show()

Number of records:1064
root
 |-- Date: timestamp (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Adj_close: double (nullable = true)
 |-- Symbol: string (nullable = false)
 |-- today_trend: string (nullable = true)
 |-- tomorrow_trend: string (nullable = false)
 |-- RSI: double (nullable = true)
 |-- SMA: double (nullable = true)
 |-- K: double (nullable = true)

+-------------------+------------------+-----------------+------------------+------------------+----------+-----------------+------+-----------+--------------+------------------+------------------+------------------+
|               Date|              High|              Low|              Open|             Close|    Volume|        Adj_close|Symbol|today_trend|tomorrow_trend|               RSI|               SMA|                 K|
+-------------------+------------------+---

In [7]:
# #Save data
# stock.save_df(AAPL_df,"AAPL_stock")
# stock.save_df(AMZN_df,"AMZN_stock")
# stock.save_df(NFLX_df,"NFLX_stock")