In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

24/11/13 12:18:43 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.0.108 instead (on interface wlp1s0)
24/11/13 12:18:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/13 12:18:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [4]:
earnings = spark.read.csv('lly_earnings.csv', header=True, inferSchema=True)
prices = spark.read.csv('lly_prices.csv', header=True, inferSchema=True)

In [5]:
earnings.printSchema()

root
 |-- date: date (nullable = true)
 |-- period_end: date (nullable = true)
 |-- eps: double (nullable = true)
 |-- estimated_eps: double (nullable = true)
 |-- surprise: double (nullable = true)
 |-- surprise_pct: double (nullable = true)
 |-- report_time: string (nullable = true)
 |-- eps_ms: double (nullable = true)



In [6]:
earnings['eps_ms']

Column<'eps_ms'>

In [7]:
column_rename_map = {col:col[1:-1].lower() for col in prices.columns}
prices = prices.withColumnsRenamed(column_rename_map)

In [8]:
earnings = earnings.withColumnRenamed('date', 'filed_date')

In [9]:
earnings.show()

+----------+----------+----+-------------+--------+------------+-----------+------------------+
|filed_date|period_end| eps|estimated_eps|surprise|surprise_pct|report_time|            eps_ms|
+----------+----------+----+-------------+--------+------------+-----------+------------------+
|1996-04-15|1996-03-31|0.36|         0.36|     0.0|         0.0| pre-market|              NULL|
|1996-07-16|1996-06-30|0.32|         0.32|     0.0|         0.0| pre-market|              NULL|
|1996-10-22|1996-09-30|0.38|         0.32|    0.06|       18.75| pre-market|              NULL|
|1997-01-27|1996-12-31|0.34|         0.34|     0.0|         0.0| pre-market|               1.4|
|1997-04-21|1997-03-31|0.41|          0.4|    0.01|         2.5| pre-market|1.4500000000000002|
|1997-07-23|1997-06-30|0.38|         0.38|     0.0|         0.0| pre-market|              1.51|
|1997-10-20|1997-09-30|0.41|          0.4|    0.01|         2.5| pre-market|              1.54|
|1998-01-29|1997-12-31|0.41|         0.4

In [10]:
joined = prices.join(earnings, on=(prices['date'] == earnings['filed_date']), how='outer')

In [11]:
import pyspark.sql.functions as F

In [12]:
joined = joined.withColumn(col=F.coalesce(joined['date'], joined['filed_date']), colName='date')

In [13]:
final = joined.select(['date', 'close', 'eps_ms']).sort('date')

In [14]:
from pyspark.sql.window import Window as W

In [15]:
return_lags = -64
final = final.withColumn('return', F.lag(final['close'], return_lags).over(W.orderBy('date')) / final['close'])

In [16]:
ffill_window = W.orderBy('date').rowsBetween(W.unboundedPreceding, W.currentRow)
final = final.withColumn('eps_ms', F.last(final['eps_ms'], ignorenulls=True).over(ffill_window))

In [17]:
final = final.withColumn('log_adj_ey', F.log(1 + (final['eps_ms'] / final['close'])))

In [18]:
final = final.withColumn('log_return', F.log(final['return']))

In [19]:
final = final.select(['date', 'log_adj_ey', 'log_return']).na.drop()

In [20]:
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(inputCols=['log_adj_ey'], outputCol='independent_features')
final = feature_assembler.transform(final)
final = final.select(['date', 'independent_features', 'log_return'])

In [21]:
from pyspark.ml.regression import LinearRegression
from datetime import date

train = final.where(final['date'] < date(2020,1,1))
test = final.where(final['date'] >= date(2020,1,1))

In [22]:
model = LinearRegression(featuresCol='independent_features', labelCol='log_return').fit(train)

24/11/13 12:18:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 1

In [23]:
model.coefficients

DenseVector([0.0487])

In [24]:
model.intercept

0.012593409373648898

In [25]:
predictions = model.evaluate(test)

24/11/13 12:18:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 12:18:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/13 1

In [26]:
predictions.r2

-0.5451897354293682