<a href="https://colab.research.google.com/github/stevegbrooks/big-portfolio-learner/blob/data_integration/GLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CIS 545 Final Project

## Big Portfolio Learner: Integrate Other Data

### Team members: Steven Brooks & Chenlia Xu

# Section 1: Setting Up Environment

In [2]:
import random
import numpy as np 
import json
import matplotlib
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm
from datetime import datetime
import glob
import seaborn as sns
import re
import os

In [3]:
%%capture
## If boto3 not already installed uncomment the following:
!pip3 install boto3

In [4]:
import boto3
from botocore import UNSIGNED

from botocore.config import Config

s3 = boto3.resource('s3', config=Config(signature_version=UNSIGNED))
s3.Bucket('cis545project').download_file('data/stock_data.zip', 'stock_data.zip')
s3.Bucket('cis545project').download_file('data/technical_data.zip', 'technical_data.zip')

In [5]:
%%capture

stock_dir = "stock_data"
if not os.path.exists(stock_dir):
  os.makedirs(stock_dir)
!unzip /content/stock_data.zip -d /content/$stock_dir
!rm -f $stock_dir/.gitempty

# tech_dir = "technical_data"
# if not os.path.exists(tech_dir):
#   os.makedirs(tech_dir)
# !unzip /content/technical_data.zip -d /content/$tech_dir
# !rm -f $tech_dir/.gitempty

# Section 2: Setup Spark Session

In [6]:
%%capture

!apt install libkrb5-dev
!pip install findspark
!pip install sparkmagic
!pip install pyspark
!pip install pyspark --user
!pip install seaborn --user
!pip install imageio --user
!pip install folium --user

!apt update
!apt install gcc python-dev libkrb5-dev

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

import os

spark = SparkSession.builder.getOrCreate()

In [8]:
%load_ext sparkmagic.magics

In [9]:
#graph section
import networkx as nx
# SQLite RDBMS
import sqlite3
# Parallel processing
# import swifter
import pandas as pd
# NoSQL DB
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError, OperationFailure

import os
os.environ['SPARK_HOME'] = '/content/spark-2.4.5-bin-hadoop2.7'
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
from pyspark.sql import SQLContext

try:
    if(spark == None):
        spark = SparkSession.builder.appName('Initial').getOrCreate()
        sqlContext=SQLContext(spark)
except NameError:
    spark = SparkSession.builder.appName('Initial').getOrCreate()
    sqlContext=SQLContext(spark)

# Section 3: Load stock and technical data

In [10]:
stock_data_sdf = spark.read.load(
    'stock_data/*.csv', 
    format = 'csv', 
    header = 'true', 
    inferSchema = 'true', 
    sep = ','
)

First we'll set up the spark dataframe for stock prices using the work in `step1a`.

In [11]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import year, month, date_format


stock_data_sdf = stock_data_sdf.withColumn("timestamp_as_dt", to_timestamp(stock_data_sdf.timestamp, 'yyyy-MM-dd'))
stock_data_sdf = stock_data_sdf.withColumn("year", year('timestamp_as_dt'))
stock_data_sdf = stock_data_sdf.filter("year >= 2002 AND year <= 2019")

count_by_symbol_year_sdf = stock_data_sdf.groupBy(["symbol", "year"]).count()
count_years_by_symbol_sdf = count_by_symbol_year_sdf.groupBy(["symbol"]).count()

### Just grab stocks that have data in each of the 18 years from 2002 to 2019
### AND remove the three outliers
stocks_with_all_analysis_yrs_sdf = count_years_by_symbol_sdf.filter("count == 18") #18 years of data from 2002 and 2019
stocks_to_remove = ['DCTH', 'BRK-A', 'AIKI']
stocks_with_all_analysis_yrs_sdf = stocks_with_all_analysis_yrs_sdf.filter(stocks_with_all_analysis_yrs_sdf.symbol.isin(stocks_to_remove) == False)

stock_data_sdf.createOrReplaceTempView("stock_data")
stocks_with_all_analysis_yrs_sdf.createOrReplaceTempView("stocks_with_all_analysis_yrs")

stock_data_2002_2019_sdf = spark.sql(
    """
    SELECT *
    FROM stock_data
    WHERE symbol IN (SELECT symbol FROM stocks_with_all_analysis_yrs)
    """
)

Second, we set up the spark dataframe for technical indicators.  

In [12]:
# technical_data_sdf = spark.read.load(
#     'technical_data/*.csv', 
#     format = 'csv', 
#     header = 'true', 
#     inferSchema = 'true', 
#     sep = ','
# )

Then we will reduce the technical data set to just those stocks that match up with the first dataset above.

In [13]:
# technical_data_sdf = technical_data_sdf.withColumn("timestamp_as_dt", to_timestamp(technical_data_sdf.timestamp, 'yyyy-MM-dd'))
# technical_data_sdf = technical_data_sdf.withColumn("year", year('timestamp_as_dt'))
# technical_data_sdf = technical_data_sdf.filter("year >= 2002 AND year <= 2019")

# technical_data_sdf.createOrReplaceTempView("technical_data")

# technical_data_2002_2019_sdf = spark.sql(
#     """
#     SELECT *
#     FROM technical_data
#     WHERE symbol IN (SELECT symbol FROM stocks_with_all_analysis_yrs)
#     """
# )

Next step is to clean up technical data (especially fixing the schema so its numeric instead of string) and then setting it up for modeling.

# Section 4: Load Other Data.  

### Step 1: Download other data from google drive.  

In [16]:
# from google.colab import drive
# drive.mount('/content/gdrive')

os.mkdir('/content/other_data')
!cp '/content/gdrive/My Drive/other_data/forex.csv' other_data/forex.csv
!cp '/content/gdrive/My Drive/other_data/gdp.csv' other_data/gdp.csv
!cp '/content/gdrive/My Drive/other_data/stockindex.csv' other_data/stockindex.csv
!cp '/content/gdrive/My Drive/other_data/tbond.csv' other_data/tbond.csv
!cp '/content/gdrive/My Drive/other_data/unemployment.csv' other_data/unemployment.csv


### Step 2: Acquire overnight stock index performance of other major stock markets, e.g. FTSE, DAX, CAC, Nikkei, HKSE, SHSE.

In [17]:
stockindex_data = pd.read_csv('/content/other_data/stockindex.csv')

stockindex_data = stockindex_data[['date', 'FTSE100', 'DAX', 'CAC', 'Nikkei225', 'Hang_Seng', 'SHSZ300']]
stockindex_data['date'] = pd.to_datetime(stockindex_data.date)
stockindex_data = stockindex_data[stockindex_data['date'] < pd.Timestamp(2020,1,1)]

# filling missing data
stockindex_data = stockindex_data.fillna(method='ffill')
stockindex_data.iloc[0, stockindex_data.columns.get_loc('Nikkei225')] = 10871.49
stockindex_data.iloc[1, stockindex_data.columns.get_loc('Nikkei225')] = 10871.49
stockindex_data.iloc[0, stockindex_data.columns.get_loc('SHSZ300')] = 1316.46
stockindex_data.iloc[1, stockindex_data.columns.get_loc('SHSZ300')] = 1316.46

# add quarter / month column
stockindex_data['quarter'] = stockindex_data['date'].dt.to_period('Q')
stockindex_data['month'] = stockindex_data['date'].dt.to_period('M')

# calculating day-over-day % price change
stockindex_data['FTSE100_DoD'] = stockindex_data['FTSE100'].pct_change(1)
stockindex_data['DAX_DoD'] = stockindex_data['DAX'].pct_change(1)
stockindex_data['CAC_DoD'] = stockindex_data['CAC'].pct_change(1)
stockindex_data['Nikkei225_DoD'] = stockindex_data['Nikkei225'].pct_change(1)
stockindex_data['Hang_Seng_DoD'] = stockindex_data['Hang_Seng'].pct_change(1)
stockindex_data['SHSZ300_DoD'] = stockindex_data['SHSZ300'].pct_change(1)
stockindex_data = stockindex_data[['date', 'quarter', 'month', 'FTSE100_DoD', 'DAX_DoD', 'CAC_DoD', 'Nikkei225_DoD', 'Hang_Seng_DoD', 'SHSZ300_DoD']]
                        
stockindex_data.head()

Unnamed: 0,date,quarter,month,FTSE100_DoD,DAX_DoD,CAC_DoD,Nikkei225_DoD,Hang_Seng_DoD,SHSZ300_DoD
0,2002-01-02,2002Q1,2002-01,,,,,,
1,2002-01-03,2002Q1,2002-01,0.019259,0.019817,0.021998,0.0,0.006402,0.0
2,2002-01-04,2002Q1,2002-01,0.00094,0.009191,0.00034,0.0,0.024391,0.0
3,2002-01-07,2002Q1,2002-01,-0.005673,-0.016265,-0.014361,0.006519,0.016278,-0.010923
4,2002-01-08,2002Q1,2002-01,-0.008161,0.000793,-0.01028,-0.022551,-0.015045,-0.007196


### Step 3: Acquire tresaury bond (1-yr, 3yr, 10-yr) market daily performance

In [18]:
tbond_data = pd.read_csv('/content/other_data/tbond.csv')

tbond_data = tbond_data[['date', '3M', '1Y', '10Y']]
tbond_data['date'] = pd.to_datetime(tbond_data.date)
tbond_data = tbond_data[tbond_data['date'] < pd.Timestamp(2020,1,1)]

tbond_data['3M_DoD'] = tbond_data['3M'].pct_change(1)
tbond_data['1Y_DoD'] = tbond_data['1Y'].pct_change(1)
tbond_data['10Y_DoD'] = tbond_data['10Y'].pct_change(1)
tbond_data = tbond_data[['date', '3M_DoD', '1Y_DoD','10Y_DoD']]
                        
tbond_data.head()

Unnamed: 0,date,3M_DoD,1Y_DoD,10Y_DoD
0,2002-01-02,,,
1,2002-01-03,-0.005747,-0.017544,-0.007692
2,2002-01-04,-0.00578,0.004464,0.003876
3,2002-01-07,-0.023256,-0.026667,-0.017375
4,2002-01-08,0.0,0.0,0.001965


### Step 4: Acquire forex market (USD/EUR, USD/JPY, USD/AUD, etc.) daily performance.  

In [19]:
forex_data = pd.read_csv('/content/other_data/forex.csv')

forex_data = forex_data[['date', 'USDGBP', 'USDEUR', 'USDJPY', 'USDHKD', 'USDAUD', 'USDCAD']]

# filling missing data
forex_data = forex_data.fillna(method='ffill')

# convert data format
forex_data['date'] = pd.to_datetime(forex_data.date)
forex_data = forex_data[forex_data['date'] < pd.Timestamp(2020,1,1)]

# calculating day-over-day % price change
forex_data['USDGBP_DoD'] = forex_data['USDGBP'].pct_change(1)
forex_data['USDEUR_DoD'] = forex_data['USDEUR'].pct_change(1)
forex_data['USDJPY_DoD'] = forex_data['USDJPY'].pct_change(1)
forex_data['USDHKD_DoD'] = forex_data['USDHKD'].pct_change(1)
forex_data['USDAUD_DoD'] = forex_data['USDAUD'].pct_change(1)
forex_data['USDCAD_DoD'] = forex_data['USDCAD'].pct_change(1)
forex_data = forex_data[['date', 'USDGBP_DoD', 'USDEUR_DoD', 'USDJPY_DoD', 'USDHKD_DoD', 'USDAUD_DoD', 'USDCAD_DoD']]
                        
forex_data.head()

Unnamed: 0,date,USDGBP_DoD,USDEUR_DoD,USDJPY_DoD,USDHKD_DoD,USDAUD_DoD,USDCAD_DoD
0,2002-01-02,,,,,,
1,2002-01-03,0.005496,-0.003692,0.000304,-2.6e-05,-0.000412,0.001317
2,2002-01-04,-0.006185,0.009219,-0.004331,3.8e-05,-0.011121,-0.000626
3,2002-01-07,0.004921,0.002418,-0.001908,7.7e-05,0.004634,-0.000689
4,2002-01-08,0.00072,0.003484,0.014221,1.3e-05,-0.010158,0.004704


### Step 5: Acquire economic data, including monthly unemployment rate, quarterly yoy GDP growth.  

In [20]:
gdp_data = pd.read_csv('/content/other_data/gdp.csv')

gdp_data = gdp_data[['date', 'real_gdp']]

# filling missing data
gdp_data = gdp_data.fillna(method='ffill')

# convert data format
gdp_data['date'] = pd.to_datetime(gdp_data.date)
gdp_data = gdp_data[gdp_data['date'] < pd.Timestamp(2020,1,1)]
gdp_data['real_gdp_QoQ'] = gdp_data['real_gdp'].pct_change(1)
gdp_data = gdp_data.drop(columns=['real_gdp'])

# add quarter column
gdp_data['quarter'] = gdp_data['date'].dt.to_period('Q')

gdp_data.head()

Unnamed: 0,date,real_gdp_QoQ,quarter
0,2002-03-31,,2002Q1
1,2002-06-30,0.006159,2002Q2
2,2002-09-30,0.004029,2002Q3
3,2002-12-31,0.001308,2002Q4
4,2003-03-31,0.005166,2003Q1


### Step 5: Acquire monthly unemployment data.

In [21]:
unemployment_data = pd.read_csv('/content/other_data/unemployment.csv')

unemployment_data = unemployment_data[['date', 'unemployment']]

# filling missing data
unemployment_data = unemployment_data.fillna(method='ffill')

# convert data format
unemployment_data['date'] = pd.to_datetime(unemployment_data.date)
unemployment_data = unemployment_data[unemployment_data['date'] < pd.Timestamp(2020,1,1)]
unemployment_data['unemployment_MoM'] = unemployment_data['unemployment'].pct_change(1)
unemployment_data = unemployment_data.drop(columns=['unemployment'])

# add month column
unemployment_data['month'] = unemployment_data['date'].dt.to_period('M')

unemployment_data.head()

Unnamed: 0,date,unemployment_MoM,month
0,2002-01-31,,2002-01
1,2002-02-28,0.0,2002-02
2,2002-03-31,0.0,2002-03
3,2002-04-30,0.035088,2002-04
4,2002-05-31,-0.016949,2002-05


### Step 6: Merge all other data into a single dataframe.  

In [22]:
all_other_data = stockindex_data.merge(tbond_data, left_on='date', right_on='date', how='left')
all_other_data = all_other_data.merge(forex_data, left_on='date', right_on='date', how='left')
all_other_data = all_other_data.merge(gdp_data, left_on='quarter', right_on='quarter', how='left')
all_other_data = all_other_data.merge(unemployment_data, left_on='month', right_on='month', how='left')

all_other_data = all_other_data.drop(columns=['date_x', 'date_y'])
all_other_data = all_other_data.dropna()

all_other_data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 4438 entries, 63 to 4682
Data columns (total 20 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   quarter           4438 non-null   period[Q-DEC] 
 1   month             4438 non-null   period[M]     
 2   FTSE100_DoD       4438 non-null   float64       
 3   DAX_DoD           4438 non-null   float64       
 4   CAC_DoD           4438 non-null   float64       
 5   Nikkei225_DoD     4438 non-null   float64       
 6   Hang_Seng_DoD     4438 non-null   float64       
 7   SHSZ300_DoD       4438 non-null   float64       
 8   3M_DoD            4438 non-null   float64       
 9   1Y_DoD            4438 non-null   float64       
 10  10Y_DoD           4438 non-null   float64       
 11  USDGBP_DoD        4438 non-null   float64       
 12  USDEUR_DoD        4438 non-null   float64       
 13  USDJPY_DoD        4438 non-null   float64       
 14  USDHKD_DoD        4438 

# Section 5: Combine stock price with all other data to create mega_data.  

### Step 1: Prepare stock data for integration.  

In [23]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

price_window = Window.partitionBy("symbol").orderBy("symbol")
stock_data_2002_2019_sdf = stock_data_2002_2019_sdf.withColumn("prev_price", F.lead(stock_data_2002_2019_sdf.adjusted_close).over(price_window))
stock_data_2002_2019_sdf = stock_data_2002_2019_sdf.withColumn("price_DoD", F.when(F.isnull(stock_data_2002_2019_sdf.adjusted_close - stock_data_2002_2019_sdf.prev_price), 0).otherwise((stock_data_2002_2019_sdf.adjusted_close - stock_data_2002_2019_sdf.prev_price)/stock_data_2002_2019_sdf.prev_price))

stock_data_2002_2019_sdf.show()


+------+----------+--------------+-------+-------------------+----+-------------+--------------------+
|symbol| timestamp|adjusted_close| volume|    timestamp_as_dt|year|   prev_price|           price_DoD|
+------+----------+--------------+-------+-------------------+----+-------------+--------------------+
|   ABM|2019-12-31| 36.4209157617| 285355|2019-12-31 00:00:00|2019|36.8774386535|-0.01237946312078...|
|   ABM|2019-12-30| 36.8774386535| 226209|2019-12-30 00:00:00|2019|37.0023817608|-0.00337662337812...|
|   ABM|2019-12-27| 37.0023817608| 241733|2019-12-27 00:00:00|2019|37.1177138598|-0.00310719834296...|
|   ABM|2019-12-26| 37.1177138598| 357479|2019-12-26 00:00:00|2019|36.7140515133|0.010994764398414835|
|   ABM|2019-12-24| 36.7140515133| 166650|2019-12-24 00:00:00|2019|36.2719451338|0.012188659248053945|
|   ABM|2019-12-23| 36.2719451338| 519962|2019-12-23 00:00:00|2019|37.1946019258|-0.02480620155152132|
|   ABM|2019-12-20| 37.1946019258|1423851|2019-12-20 00:00:00|2019|37.444

### Step 2: Integration by joining stock data with all other data.  

In [24]:
all_other_data_sdf = spark.createDataFrame(all_other_data) 

stock_data_2002_2019_sdf.createOrReplaceTempView("stock_data_0219_596")
all_other_data_sdf.createOrReplaceTempView("all_other_data")

mega_data_sdf = spark.sql(
    """
    SELECT stock_data_0219_596.symbol, stock_data_0219_596.timestamp_as_dt, stock_data_0219_596.adjusted_close, stock_data_0219_596.price_DoD, all_other_data.*
    FROM stock_data_0219_596
    LEFT JOIN all_other_data
      ON stock_data_0219_596.timestamp_as_dt = all_other_data.date;
    """
)

mega_data_sdf.show()


+------+-------------------+--------------+--------------------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------------+
|symbol|    timestamp_as_dt|adjusted_close|           price_DoD|quarter|month|         FTSE100_DoD|             DAX_DoD|             CAC_DoD|       Nikkei225_DoD|       Hang_Seng_DoD|         SHSZ300_DoD|              3M_DoD|              1Y_DoD|             10Y_DoD|          USDGBP_DoD|          USDEUR_DoD|          USDJPY_DoD|          USDHKD_DoD|          USDAUD_DoD|          USDCAD_DoD|        real_gdp_QoQ|               date|unemployment_MoM|
+------+-------------------+--------------+--------------------+-------+-----+------------------

# Section 6: Run Linear Regression model on mega data.  

### Step 1: Split the data into features and label

In [33]:
label = mega_data_sdf['price_DoD']
features = mega_data_sdf['FTSE100_DoD', 'DAX_DoD', 'CAC_DoD', 'Nikkei225_DoD', 'Hang_Seng_DoD', 'SHSZ300_DoD', '3M_DoD', '1Y_DoD', '10Y_DoD', 
                           'USDGBP_DoD', 'USDEUR_DoD', 'USDJPY_DoD', 'USDHKD_DoD', 'USDAUD_DoD', 'USDCAD_DoD', 'real_gdp_QoQ', 'unemployment_MoM']

train_sdf, test_sdf = mega_data_sdf.randomSplit([0.8, 0.2])

from pyspark.ml.feature import StringIndexer, VectorAssembler
assembler = VectorAssembler(inputCols=features, outputCol="features", handleInvalid="skip")
featured_sdf = assembler.transform(mega_data_sdf)


TypeError: ignored

### Step: Run linear regression model.  

In [31]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol=features, labelCol=label)
lr_model = lr.fit(train_sdf)

trainingSummary = lr_model.summary

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)


TypeError: ignored

# Section 7: Construct our portfolio with GLM.  