<a href="https://colab.research.google.com/github/mochamadness/Predicting-The-Stock-Trend-Using-News-Sentiment-Analysis-and-Technical-Indicators-in-Spark/blob/Dung/Financial_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=beac632ea5c4545fe8a4d8445e334f090e8535d5c0ff156360fb845caf044b07
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import length, col
from pyspark import SparkContext
sc = SparkContext()
sql = SQLContext(sc)



In [3]:
import pandas as pd
import yfinance as yf
from functools import reduce
import pandas_datareader as pdr
from pyspark.sql.window import Window
from pyspark.sql.functions import lit, DataFrame, udf, lead,lag, when, avg, abs, max, min

  _empty_series = pd.Series()


##  Data import and Feature Generation

In [16]:
class STOCK():
    def __init__(self,*args, start_date, end_date):
        # Combine individual stock dataframes in one Spark dataframe
        self.df_complete = reduce(DataFrame.union, self.complete_df(args,start_date, end_date))

    # Import Stocks data as spark dataframe from Yahoo finance
    def complete_df(self, args, start_date, end_date):
        dfs = []
        for symbol in args:
            stock = sql.createDataFrame(yf.download(symbol, start=start_date, end=end_date).reset_index()) \
                       .withColumn("Symbol", lit(symbol)).withColumnRenamed('Adj Close', 'Adj_close')
            dfs.append(stock)
        return dfs

    #Generate Features:
    def add_features(self, symbol):
        """ T_trend_udf: returns user defined function to create Today's Trend feature by subtracting the
                         open price from the close price of the trading day.
            F_trend: return a window function that accesses the next day close's price to substract today's close
                     price from it and create Future Trend feature
        """

        symbol_df = self.df_complete.filter(self.df_complete["Symbol"] == symbol)
        T_trend_udf = udf(lambda Close_price, Open_price: "Uptrend" if Close_price - Open_price >= 0 else "Downtrend")
        F_trend = lead(symbol_df["Close"],1).over(Window().orderBy("Date"))

        # Add all created features to the dataframe:
        symbol_df = symbol_df.withColumn("today_trend", T_trend_udf(symbol_df["Close"], symbol_df["Open"])) \
                             .withColumn("tomorrow_trend", when((F_trend - symbol_df["Close"]) >= 0, "Uptrend") \
                                        .otherwise("Downtrend")) \
                             .withColumn("RSI", self.RSI(symbol_df)) \
                             .withColumn("SMA", self.SMA(symbol_df)) \
                             .withColumn("K", self.K(symbol_df))

        return symbol_df.na.drop().filter(symbol_df["RSI"] > 0)




    def RSI(self, symbol_df):
        """ A function returns the 14-days period RSI feature to be added later to the dataframe.
        RSI = 100 - 100/[1 + (Average of 14 days gains) / (Average of 14 days losses)]

        """

        # Creating gain_loss column for each day, close_price_today - close_price_yesterday:
        gain_loss = symbol_df['close'] - lag(symbol_df["Close"],1).over(Window().orderBy("Date"))

        # Calculating the Average of 14 days gains
        gain = when(gain_loss > 0, gain_loss).otherwise(0)
        Avg_gain = avg(gain).over(Window.orderBy("Date").rowsBetween(-14, -1))

        # Calculating the Average of 14 days loss
        loss = when(gain_loss < 0, gain_loss).otherwise(0)
        Avg_loss = avg(abs(loss)).over(Window.orderBy("Date").rowsBetween(-14, -1))

        # Calculating RS:
        RS = Avg_gain / Avg_loss

        # Calculating 14-days RSI:
        RSI = 100 - (100/(1+RS))

        return RSI

    def SMA(self, symbol_df):
        """ A function returns the 14-days period SMA feature to be added later to the dataframe.
        SMA = sum(14_close_prices) / 14

        """
        acc = lag(symbol_df["Close"],1).over(Window().orderBy("Date"))
        SMA = avg(acc).over(Window().rowsBetween(-14,-1))
        return SMA

    def K(self, symbol_df):
        """ A function returns Stochastic Oscillator indicator to be added later to the dataframe.
        %K = 100 * [(Current Close_price - Lowest Low over the past 14 days)
                 /(Highest High over the past 14 days - Lowest Low over the past 14 days)]
        """
        HH_14 = max('High').over(Window().orderBy("Date").rowsBetween(-14,0))
        LL_14 = min("Low").over(Window().orderBy("Date").rowsBetween(-14,0))
        K = 100*((symbol_df["Close"] - LL_14) / (HH_14 - LL_14))
        return K


    #Save dataframe as CSV file:
    def save_df(self, df, file_name):
        df.coalesce(1).write.csv(file_name, header=True)



In [17]:
#Example: Create dataframe of two stocks for one year time period
Example = STOCK('AAPL', 'NFLX', start_date = "2019-05-01", end_date = "2020-05-01").df_complete
Example.show(2)

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


+-------------------+------------------+------------------+------------------+------------------+------------------+---------+------+
|               Date|              Open|              High|               Low|             Close|         Adj_close|   Volume|Symbol|
+-------------------+------------------+------------------+------------------+------------------+------------------+---------+------+
|2019-05-01 00:00:00|52.470001220703125| 53.82749938964844|52.307498931884766|52.630001068115234| 50.73149490356445|259309200|  AAPL|
|2019-05-02 00:00:00|52.459999084472656|53.162498474121094|52.032501220703125|52.287498474121094|50.401344299316406|127985200|  AAPL|
+-------------------+------------------+------------------+------------------+------------------+------------------+---------+------+
only showing top 2 rows



In [18]:
# Stocks spark datafame:
stock = STOCK('AAPL', 'AMZN', 'NFLX', start_date = "2016-01-01", end_date = "2020-04-01")
stock_df = stock.df_complete

#create features for each stock dataframe
AAPL_df = stock.add_features("AAPL")
AMZN_df = stock.add_features("AMZN")
NFLX_df = stock.add_features("NFLX")

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [19]:
print('Number of records:{}'.format(AAPL_df.count()))
AAPL_df.printSchema()
AAPL_df.show()

Number of records:1063
root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj_close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Symbol: string (nullable = false)
 |-- today_trend: string (nullable = true)
 |-- tomorrow_trend: string (nullable = false)
 |-- RSI: double (nullable = true)
 |-- SMA: double (nullable = true)
 |-- K: double (nullable = true)

+-------------------+------------------+------------------+------------------+------------------+------------------+---------+------+-----------+--------------+------------------+------------------+------------------+
|               Date|              Open|              High|               Low|             Close|         Adj_close|   Volume|Symbol|today_trend|tomorrow_trend|               RSI|               SMA|                 K|
+-------------------+------------------+---

In [None]:
# #Save data
# stock.save_df(AAPL_df,"AAPL_stock")
# stock.save_df(AMZN_df,"AMZN_stock")
# stock.save_df(NFLX_df,"NFLX_stock")