## Predicting stock market sentiment using Economic Trends

In [None]:
from datetime import datetime

import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

In [None]:
import os
from urllib.parse import quote_plus

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Connection settings. Secrets can be supplied through environment variables so
# they never have to be written into the notebook; '####' is the fallback placeholder.
database = os.environ.get('MONGO_DATABASE', '####')
collection = 'ts_da'
# MongoDB connection URIs require user names / passwords containing reserved
# characters (such as '@', ':', '/', '#') to be percent-encoded; the encoded
# values are what every later f-string interpolates into the URI.
user_name = quote_plus(os.environ.get('MONGO_USER', '####'))
password = quote_plus(os.environ.get('MONGO_PASSWORD', '####'))
address = 'g14cluster.tlbgg.mongodb.net'
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"

#### Functions
- StringIndexer
- OneHotEncoding

In [None]:
def indexStringColumns(df, cols):
    """Replace every column in ``cols`` with its StringIndexer-encoded index.

    Columns are processed in order. For each one a StringIndexer is fitted on the
    current frame with handleInvalid="keep", the indexed values are written to a
    temporary "<name>-num" column, the original column is dropped and the
    temporary column is renamed back to the original name (so it moves to the
    end of the schema).

    Args:
        df: input Spark DataFrame.
        cols: names of the columns to index.

    Returns:
        A new DataFrame in which each column of ``cols`` holds its index.
    """
    result = df
    for name in cols:
        tmp_name = name + "-num"
        indexer = StringIndexer(inputCol=name, outputCol=tmp_name).setHandleInvalid("keep")
        fitted = indexer.fit(result)
        result = fitted.transform(result).drop(name).withColumnRenamed(tmp_name, name)
    return result

def oneHotEncodeColumns(df, cols):
    """Replace every column in ``cols`` with a one-hot encoded vector column.

    Columns are processed in order. For each one a OneHotEncoder with
    dropLast=False (every category keeps its own slot) is fitted on the current
    frame, its output is written to a temporary "<name>-onehot" column, the
    original column is dropped and the temporary column takes over its name.

    Args:
        df: input Spark DataFrame.
        cols: names of the columns to encode.

    Returns:
        A new DataFrame in which each column of ``cols`` holds its one-hot vector.
    """
    encoded = df
    for name in cols:
        vec_name = name + "-onehot"
        encoder_model = OneHotEncoder(inputCol=name, outputCol=vec_name, dropLast=False).fit(encoded)
        encoded = encoder_model.transform(encoded).drop(name)
        encoded = encoded.withColumnRenamed(vec_name, name)
    return encoded

### Preprocessing pipeline
- Fetch monthly Economic trends data
- Fetch Stock Quote trends data
- Preprocessing to obtain the percent change of the stock closing price and of each economic indicator relative to the previous period

In [None]:
# Economic indicators: load the series, attribute each observation to the
# preceding calendar month, and compute each indicator's percent change
# relative to the previous observation.
collection = 'econ_data'
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
df_econ = spark.read.format("mongo").option("uri",connection_string).load()

# date_n = date - 1 day; year/month are taken from date_n, so e.g. a row dated
# 2023-01-01 is assigned to year 2022, month 12.
df_econ = (
    df_econ
    .withColumn("date", col("date").cast(DateType()))
    .withColumn('date_n', date_add(col('date'), -1))
    .withColumn('month', date_format(col('date_n'), 'M').cast(LongType()))
    .withColumn('year', date_format(col('date_n'), 'y').cast(LongType()))
)

econ_cols = ['cpi', '10year_ty', '2year_ty', 'brent', 'corn', 'wheat', 'copper', 'unemployment', 'durables', 'retail_sales', 'federal_funds_rate']
# One unpartitioned window ordered by date, shared by every indicator.
windowSpec  = Window.partitionBy().orderBy('date')
for i in econ_cols:
    prev_col = 'prev_' + str(i)
    df_econ = df_econ.withColumn(i, col(i).cast(FloatType()))
    df_econ = df_econ.withColumn(prev_col, lag(i, 1).over(windowSpec))
    # 100 * (current - previous) / previous, rounded to 4 decimals
    pct = 100 * (col(i) - col(prev_col)) / col(prev_col)
    df_econ = df_econ.withColumn('pc_' + str(i), round(pct, 4).cast(FloatType()))

# Output columns: date, year, month, then one pc_* column per indicator.
pc_econ_cols = ['date', 'year', 'month'] + ['pc_' + str(i) for i in econ_cols]

dff_econ = df_econ.select(pc_econ_cols).orderBy(['year', 'month'], ascending = [False, False])
dff_econ.show(5)

print(f'count before dropna: {dff_econ.count()}')
dff_econ = dff_econ.na.drop()
print(f'count after dropna: {dff_econ.count()}')

# ts_ma: stock quotes per symbol, with typed date/volume/close columns,
# calendar year/month keys for the join, and the log of the traded volume.
collection = 'ts_ma'
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
df_tsma = spark.read.format("mongo").option("uri",connection_string).load()
df_tsma = (
    df_tsma
    .withColumn("close", col("close").cast("float"))
    .withColumn("date", col("date").cast(DateType()))
    .withColumn("volume", col("volume").cast(LongType()))
    .withColumn('month', date_format(col('date'), 'M').cast(LongType()))
    .withColumn('year', date_format(col('date'), 'y').cast(LongType()))
    # NOTE(review): Spark's single-argument log() is the natural log and is null
    # for non-positive input — confirm volume is always > 0.
    .withColumn('logvol', log(col('volume')))
)

@udf('int')
def buy_sell(x):
    """Return 1 when the percent change ``x`` is non-negative, otherwise 0."""
    return 1 if x >= 0 else 0

# Per symbol, compare each close with the previous row's close (ordered by date).
windowSpec  = Window.partitionBy('symbol').orderBy("date")
df_tsma = df_tsma.select('date', 'year', 'month', 'symbol', 'logvol', 'close', lag('close', 1).over(windowSpec).alias('close_prev'))
# Percent change from the previous close, rounded to 4 decimals; null for the
# first row of each symbol (no previous close).
df_tsma = df_tsma.withColumn('percent_change', round(100*(col('close') - col('close_prev'))/col('close_prev'), 4).cast(FloatType()))
# Also drop rows whose logvol is null (missing or non-positive volume): logvol
# is a model feature and VectorAssembler rejects null inputs by default, so such
# rows would otherwise make the later training step fail.
df_tsma = df_tsma.na.drop(subset=['percent_change', 'logvol'])
df_tsma = df_tsma.withColumn('buysell', buy_sell(col('percent_change')))

# checking null (expected to be 0 after the drop above)
df_tsma.filter(df_tsma['percent_change'].isNull() | df_tsma['logvol'].isNull()).count()

+----------+----+-----+------+------------+-----------+--------+-------+--------+---------+---------------+-----------+---------------+---------------------+
|      date|year|month|pc_cpi|pc_10year_ty|pc_2year_ty|pc_brent|pc_corn|pc_wheat|pc_copper|pc_unemployment|pc_durables|pc_retail_sales|pc_federal_funds_rate|
+----------+----+-----+------+------------+-----------+--------+-------+--------+---------+---------------+-----------+---------------+---------------------+
|2023-01-01|2022|   12|0.7995|     -2.4862|    -1.8648|  1.9525|    0.2| -1.0993|   7.6006|        -2.8571|   -15.7878|       -17.7817|               5.6098|
|2022-12-01|2022|   11|-0.307|     -6.9409|    -4.6667|-11.4855| -5.825| -6.0044|   3.9905|        -2.7778|    14.2997|         7.8411|               8.4656|
|2022-11-01|2022|   10|-0.101|     -2.2613|     2.7397| -2.0465|-6.5852| -2.6527|   5.2121|        -2.7027|     -4.948|         2.0948|              22.7273|
|2022-10-01|2022|    9|0.4056|     13.0682|    13.47

#### Joining, Indexing, OHE

In [None]:
# Join stock rows with the economic indicators of the same year/month.
# Both frames have a 'date' column; the economic one is renamed to 'econ_date'
# so the joined frame does not contain two ambiguous 'date' columns.
df_j1 = (
    df_tsma
    .join(dff_econ.withColumnRenamed('date', 'econ_date'), ['year', 'month'])
    .orderBy(['symbol', 'year', 'month'], ascending=[True, False, False])
)
df_j1.show(3)

# String Indexing
str_cols = ['symbol']
# Categorical calendar columns, one-hot encoded below.
cat_cols = ['year', 'month']
# OneHotEncoder interprets its input as 0-based category indices, so raw values
# such as year=2022 would produce 2023-slot vectors; index these columns first.
df_stri = indexStringColumns(df_j1, str_cols + cat_cols)

# One Hot Encoding
# new joined and transformed dataframe
df_jt1 = oneHotEncodeColumns(df_stri, cat_cols)
df_jt1.show(5)

+----+-----+----------+------+------------------+--------+----------+--------------+-------+----------+------+------------+-----------+--------+-------+--------+---------+---------------+-----------+---------------+---------------------+
|year|month|      date|symbol|            logvol|   close|close_prev|percent_change|buysell|      date|pc_cpi|pc_10year_ty|pc_2year_ty|pc_brent|pc_corn|pc_wheat|pc_copper|pc_unemployment|pc_durables|pc_retail_sales|pc_federal_funds_rate|
+----+-----+----------+------+------------------+--------+----------+--------------+-------+----------+------+------------+-----------+--------+-------+--------+---------+---------------+-----------+---------------+---------------------+
|2022|   12|2022-12-30|  AAPL|21.239515506344638|129.7324|  147.6183|      -12.1163|      0|2023-01-01|0.7995|     -2.4862|    -1.8648|  1.9525|    0.2| -1.0993|   7.6006|        -2.8571|   -15.7878|       -17.7817|               5.6098|
|2022|   11|2022-11-30|  AAPL| 21.26846286900817

### Create a dataframe with Features and Labels
- Using VectorAssembler

In [None]:
# Candidate feature lists: class_incols names every joined column of interest,
# class_incols_num only the numeric volume/economic features.
class_incols = ['year', 'month', 'logvol', 'pc_cpi', 'pc_10year_ty', 'pc_2year_ty', 'pc_brent', 'pc_corn', 'pc_wheat',
                'pc_copper', 'pc_unemployment', 'pc_durables', 'pc_retail_sales', 'pc_federal_funds_rate', 'symbol']
class_incols_num = ['logvol', 'pc_cpi', 'pc_10year_ty', 'pc_2year_ty', 'pc_brent', 'pc_corn', 'pc_wheat',
                    'pc_copper', 'pc_unemployment', 'pc_durables', 'pc_retail_sales', 'pc_federal_funds_rate']

# Variant 1: assemble only the numeric features; 'buysell' becomes the label.
va = VectorAssembler(inputCols=class_incols_num, outputCol="features")
df_va = (
    va.transform(df_jt1)
    .select('features', 'buysell')
    .withColumnRenamed('buysell', 'label')
)

# Seeded 80/20 random split; both parts are cached because they are reused by several fits.
splits = df_va.randomSplit([0.8, 0.2], seed = 1)
train, validation = (part.cache() for part in splits)
# Binary classification evaluator (its metric name is printed with each score).
reval = BinaryClassificationEvaluator()

In [None]:
# Candidate feature lists: class_incols names every joined column of interest,
# class_incols_num only the numeric volume/economic features.
class_incols = ['year', 'month', 'logvol', 'pc_cpi', 'pc_10year_ty', 'pc_2year_ty', 'pc_brent', 'pc_corn', 'pc_wheat',
                'pc_copper', 'pc_unemployment', 'pc_durables', 'pc_retail_sales', 'pc_federal_funds_rate', 'symbol']
class_incols_num = ['logvol', 'pc_cpi', 'pc_10year_ty', 'pc_2year_ty', 'pc_brent', 'pc_corn', 'pc_wheat',
                    'pc_copper', 'pc_unemployment', 'pc_durables', 'pc_retail_sales', 'pc_federal_funds_rate']

# Variant 2 (replaces the frames built by variant 1): the numeric features plus
# the StringIndexer-encoded 'symbol' column; 'buysell' becomes the label.
feature_inputs = class_incols_num + ['symbol']
va = VectorAssembler(inputCols=feature_inputs, outputCol="features")
df_va = (
    va.transform(df_jt1)
    .select('features', 'buysell')
    .withColumnRenamed('buysell', 'label')
)

# Seeded 80/20 random split; both parts are cached because they are reused by several fits.
splits = df_va.randomSplit([0.8, 0.2], seed = 1)
train, validation = (part.cache() for part in splits)
# Binary classification evaluator (its metric name is printed with each score).
reval = BinaryClassificationEvaluator()

### Random Forest Classifier with Cross Validation

In [None]:
# Random forest tuned with 5-fold cross-validation over a 3 x 3 grid of
# maxDepth x numTrees: each of the 9 combinations is fitted once per fold,
# which is why this cell takes a long time to run.
rf = RandomForestClassifier()

# ParamGridBuilder() – combinations of parameters and their values.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.numTrees, [3, 7, 10])
    .build()
)
cv = (
    CrossValidator()
    .setEstimator(rf)
    .setEvaluator(reval)
    .setNumFolds(5)
    .setEstimatorParamMaps(paramGrid)
)

cvmodel = cv.fit(train)
cvrf_pred = cvmodel.transform(validation)
print(reval.getMetricName() + ":" + str(reval.evaluate(cvrf_pred)))
cvrf_pred.show()

areaUnderROC:0.6564704635601175
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[21.2441692119621...|    0|[5.43452380952380...|[0.77636054421768...|       0.0|
|[21.2608902109102...|    0|[1.16666666666666...|[0.16666666666666...|       1.0|
|[20.2476123058214...|    1|[1.92692307692307...|[0.27527472527472...|       1.0|
|[20.4425239337101...|    0|[6.89858793324775...|[0.98551256189253...|       0.0|
|[21.7863606414366...|    0|[5.71428571428571...|[0.81632653061224...|       0.0|
|[22.0808502543149...|    0|[3.28518518518518...|[0.46931216931216...|       1.0|
|[19.9736344874276...|    1|[1.44514767932489...|[0.20644966847498...|       1.0|
|[20.3427602511708...|    0|[1.23269454123112...|[0.17609922017587...|       1.0|
|[20.2757991857990...|    0|[0.36616161616161...|[0.05230880230880

### Random Forest

In [None]:
# Single random forest with fixed hyper-parameters (no cross-validation),
# scored on the validation split with the same evaluator.
rf = RandomForestClassifier(maxDepth=14, numTrees = 5)
rfmodel = rf.fit(train)

rf_pred = rfmodel.transform(validation)
metric_value = reval.evaluate(rf_pred)
print(reval.getMetricName() + ":" + str(metric_value))
rf_pred.show()

areaUnderROC:0.6460774517818755
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[21.2441692119621...|    0|           [1.0,4.0]|           [0.2,0.8]|       1.0|
|[21.2608902109102...|    0|           [4.0,1.0]|           [0.8,0.2]|       0.0|
|[20.2476123058214...|    1|[0.97656691134952...|[0.19531338226990...|       1.0|
|[20.4425239337101...|    0|           [4.2,0.8]|[0.84000000000000...|       0.0|
|[21.7863606414366...|    0|[4.05555555555555...|[0.81111111111111...|       0.0|
|[22.0808502543149...|    0|           [2.0,3.0]|           [0.4,0.6]|       1.0|
|[19.9736344874276...|    1|[1.30817610062893...|[0.26163522012578...|       1.0|
|[20.3427602511708...|    0|[0.16278166278166...|[0.03255633255633...|       1.0|
|[20.2757991857990...|    0|[0.54073171314550...|[0.10814634262910

### Observations
- Raw data fetched from MongoDB was pre-processed with StringIndexer and OneHotEncoder; percent changes in the economic indicators and in stock prices were computed with lag window functions, and a UDF derived the buy/sell label from each stock's percent change.
- This was then compared to stock price percentage change for the same time period
- Following features were taken into consideration in the final model
  - 'logvol': natural log of the traded volume
  - 'pc_cpi': percent change in Consumer Price index
  - 'pc_10year_ty': percent change in 10 year Treasury Yield
  - 'pc_2year_ty': percent change in 2 year treasury yield
  - 'pc_brent': percent change in brent (crude) price
  - 'pc_corn': percent change in corn price
  - 'pc_wheat': percent change in wheat price
  - 'pc_copper': percent change in copper price
  - 'pc_unemployment': percent change in unemployment
  - 'pc_durables': percent change in durables purchase by consumers
  - 'pc_retail_sales': percent change in retail sales
  - 'pc_federal_funds_rate': percent change in federal funds rate
  - 'symbol': string-indexed stock ticker symbol
- The models' area under the ROC curve on the validation split is ~0.65, which indicates that they perform better than a random model, whose area under the ROC curve would be 0.50.
- Two random forest variants were tried: the cross-validated random forest (grid over maxDepth = 5, 10, 15 and numTrees = 3, 7, 10) reached an AUC of ~0.656, slightly above the ~0.646 of the random forest with fixed maxDepth = 14 and numTrees = 5.

#### VectorAssembler feature and label data - Saved to Mongo

In [None]:
def sparseToDenseArray(sparse_array):
    """Convert an ML feature vector into a plain Python list of floats.

    Despite the name, any object with ``toArray()`` works; presumably both the
    sparse and dense vectors VectorAssembler can emit — verify if other inputs
    are passed.
    """
    return sparse_array.toArray().tolist()

# DoubleType (not FloatType) keeps the full precision of the assembled features;
# FloatType would silently truncate values such as logvol to 32-bit floats
# before they are written to MongoDB.
udf_sparse_dense_array = udf(sparseToDenseArray, ArrayType(DoubleType()))
df_va_dense_v_to_array =  df_va.select(udf_sparse_dense_array(df_va["features"]).alias("features"), df_va["label"])

In [None]:
# Persist the array-valued features and labels to the 'ts_ma_econ_data_va'
# collection using mode("overwrite").
collection = 'ts_ma_econ_data_va'
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
writer = (
    df_va_dense_v_to_array.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("uri", connection_string)
)
writer.save()