In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler, StringIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [6]:
# init spark session
spark = SparkSession \
        .builder \
        .appName("StockAnalysis") \
        .getOrCreate()
        


In [14]:
# Load Data
df = spark.read.format('parquet').load('AMZN.parquet')
# multiple stocks
# df = spark.read.format('parquet').load('**.parquet')

In [15]:
df.schema

StructType([StructField('TradeDate', DateType(), True), StructField('BarDateTime', TimestampType(), True), StructField('Ticker', StringType(), True), StructField('SecId', LongType(), True), StructField('OpenBarTimeOffset', DecimalType(1,0), True), StructField('OpenBidPrice', DecimalType(6,2), True), StructField('OpenBidSize', LongType(), True), StructField('OpenAskPrice', DecimalType(6,2), True), StructField('OpenAskSize', LongType(), True), StructField('FirstTradeTimeOffset', DecimalType(11,9), True), StructField('FirstTradePrice', DecimalType(8,4), True), StructField('FirstTradeSize', LongType(), True), StructField('HighBidTimeOffset', DecimalType(11,9), True), StructField('HighBidPrice', DecimalType(6,2), True), StructField('HighBidSize', LongType(), True), StructField('HighAskTimeOffset', DecimalType(11,9), True), StructField('HighAskPrice', DecimalType(6,2), True), StructField('HighAskSize', LongType(), True), StructField('HighTradeTimeOffset', DecimalType(11,9), True), StructFiel

In [16]:
df.head()

Row(TradeDate=datetime.date(2022, 4, 14), BarDateTime=datetime.datetime(2022, 4, 14, 8, 0), Ticker='AMZN', SecId=33127, OpenBarTimeOffset=Decimal('0'), OpenBidPrice=Decimal('0.00'), OpenBidSize=0, OpenAskPrice=Decimal('0.00'), OpenAskSize=0, FirstTradeTimeOffset=Decimal('0.030081174'), FirstTradePrice=Decimal('3117.0800'), FirstTradeSize=3, HighBidTimeOffset=Decimal('0.080006998'), HighBidPrice=Decimal('3113.00'), HighBidSize=100, HighAskTimeOffset=Decimal('0.030556032'), HighAskPrice=Decimal('3505.00'), HighAskSize=100, HighTradeTimeOffset=Decimal('0.077200828'), HighTradePrice=Decimal('3120.0000'), HighTradeSize=3, LowBidTimeOffset=Decimal('0.030282024'), LowBidPrice=Decimal('2727.70'), LowBidSize=100, LowAskTimeOffset=Decimal('33.972592287'), LowAskPrice=Decimal('3117.69'), LowAskSize=100, LowTradeTimeOffset=Decimal('0.862502277'), LowTradePrice=Decimal('3113.0000'), LowTradeSize=2, CloseBarTimeOffset=Decimal('59.999999999'), CloseBidPrice=Decimal('3113.00'), CloseBidSize=100, Close

In [17]:
df = df.withColumn("Return", F.log(df["LastTradePrice"]/df["FirstTradePrice"]) * 100)
df.select('Return').describe().show()

+-------+--------------------+
|summary|              Return|
+-------+--------------------+
|  count|              228934|
|   mean|-9.41443904713050...|
| stddev|  0.2000002763424231|
|    min|  -55.86041421949761|
|    max|  24.087798249203324|
+-------+--------------------+



## Month with the highest average return

In [19]:
df = df.withColumn('Month', F.month(df['TradeDate']))
df.groupBy('Month').agg({'Return': 'mean'}).orderBy('avg(Return)').collect()


[Row(Month=10, avg(Return)=-0.003955481201327299),
 Row(Month=4, avg(Return)=-0.0037670491957626337),
 Row(Month=1, avg(Return)=-0.002735197910440544),
 Row(Month=3, avg(Return)=-0.0018026123002441684),
 Row(Month=8, avg(Return)=-0.0009237852396056522),
 Row(Month=9, avg(Return)=-0.0007859928816761018),
 Row(Month=5, avg(Return)=-0.00043664948961306423),
 Row(Month=6, avg(Return)=0.00015944247885182574),
 Row(Month=11, avg(Return)=0.0002233009375146218),
 Row(Month=12, avg(Return)=0.00027493932919336493),
 Row(Month=2, avg(Return)=0.0009285135468963352),
 Row(Month=7, avg(Return)=0.0013669028175564034)]

## Day with the highest average return

In [20]:
df = df.withColumn('Date', F.dayofyear(df['TradeDate']))
df.groupBy('Date').agg({'Return': 'mean'}).orderBy('avg(Return)').collect()

[Row(Date=293, avg(Return)=-0.06981610179606702),
 Row(Date=77, avg(Return)=-0.03099927410520282),
 Row(Date=66, avg(Return)=-0.01857422129038664),
 Row(Date=300, avg(Return)=-0.012712887430613454),
 Row(Date=138, avg(Return)=-0.012568052601052666),
 Row(Date=33, avg(Return)=-0.0113173038106261),
 Row(Date=89, avg(Return)=-0.010316042197160516),
 Row(Date=26, avg(Return)=-0.009890792974510575),
 Row(Date=301, avg(Return)=-0.009691582888972787),
 Row(Date=7, avg(Return)=-0.009094643862512368),
 Row(Date=161, avg(Return)=-0.008623506401786005),
 Row(Date=104, avg(Return)=-0.008485827903697618),
 Row(Date=14, avg(Return)=-0.008294086315457524),
 Row(Date=112, avg(Return)=-0.008227457583214734),
 Row(Date=54, avg(Return)=-0.007988419886487443),
 Row(Date=206, avg(Return)=-0.007895402784956618),
 Row(Date=256, avg(Return)=-0.007798469643998162),
 Row(Date=21, avg(Return)=-0.007698915045168004),
 Row(Date=119, avg(Return)=-0.007550608690625525),
 Row(Date=27, avg(Return)=-0.00737862726071606

## 50 Day Moving Average

In [23]:
windowSpec = Window.orderBy(F.col("TradeDate")).rowsBetween(-50, 0)

# Calculate new moving average column using 'avg' and the windowSpec
df = df.withColumn('50DMA', F.avg("LastTradePrice").over(windowSpec)) 

In [24]:
deviation = F.log(F.col('LastTradePrice')/F.col('50DMA'))*100
df = df.withColumn("50_DMA_DEV", deviation)
df.select(['TradeDate','LastTradePrice','50DMA','50_DMA_DEV']).show()

+----------+--------------+-------------+------------------+
| TradeDate|LastTradePrice|        50DMA|        50_DMA_DEV|
+----------+--------------+-------------+------------------+
|2021-10-14|     3305.6700|3305.67000000|               0.0|
|2021-10-14|        0.0000|1652.83500000|              null|
|2021-10-14|     3308.0000|2204.55666667| 40.58173466770914|
|2021-10-14|     3306.5000|2480.04250000|28.761453100575334|
|2021-10-14|     3306.5000|2645.33400000|22.309289480202082|
|2021-10-14|     3306.5000|2755.52833333|18.228103138012862|
|2021-10-14|     3307.6700|2834.40571429|15.441172340689203|
|2021-10-14|     3306.5000|2893.41750000|13.345189868978158|
|2021-10-14|     3306.5000|2939.31555556|11.771347811639389|
|2021-10-14|     3308.0000|2976.18400000|10.570183414487957|
|2021-10-14|     3308.8800|3006.42909091| 9.585673759192895|
|2021-10-14|     3308.1100|3031.56916667|  8.72966670711106|
|2021-10-14|     3308.8800|3052.90076923| 8.051755303364086|
|2021-10-14|     3310.00

In [25]:
df.orderBy(df["50_DMA_DEV"].desc()).head(1)[0].asDict()


{'TradeDate': datetime.date(2021, 12, 31),
 'BarDateTime': datetime.datetime(2021, 12, 31, 10, 51),
 'Ticker': 'AMZN',
 'SecId': 33127,
 'OpenBarTimeOffset': Decimal('0'),
 'OpenBidPrice': Decimal('3360.00'),
 'OpenBidSize': 100,
 'OpenAskPrice': Decimal('3384.00'),
 'OpenAskSize': 100,
 'FirstTradeTimeOffset': Decimal('25.235214081'),
 'FirstTradePrice': Decimal('3375.7500'),
 'FirstTradeSize': 1,
 'HighBidTimeOffset': Decimal('0E-9'),
 'HighBidPrice': Decimal('3360.00'),
 'HighBidSize': 100,
 'HighAskTimeOffset': Decimal('0E-9'),
 'HighAskPrice': Decimal('3384.00'),
 'HighAskSize': 100,
 'HighTradeTimeOffset': Decimal('25.235214081'),
 'HighTradePrice': Decimal('3375.7500'),
 'HighTradeSize': 1,
 'LowBidTimeOffset': Decimal('0E-9'),
 'LowBidPrice': Decimal('3360.00'),
 'LowBidSize': 100,
 'LowAskTimeOffset': Decimal('0E-9'),
 'LowAskPrice': Decimal('3384.00'),
 'LowAskSize': 100,
 'LowTradeTimeOffset': Decimal('25.235214081'),
 'LowTradePrice': Decimal('3375.7500'),
 'LowTradeSize': 

## Most Active Trading Month

In [26]:
df.groupBy('Month').agg({'Volume': 'mean'}).orderBy('avg(Volume)').collect()

[Row(Month=12, avg(Volume)=1962.4917613636364),
 Row(Month=3, avg(Volume)=2188.501811594203),
 Row(Month=4, avg(Volume)=2242.108385416667),
 Row(Month=11, avg(Volume)=2308.9234734734737),
 Row(Month=1, avg(Volume)=2487.0499479166665),
 Row(Month=2, avg(Volume)=2827.877850877193),
 Row(Month=5, avg(Volume)=3102.7077876984126),
 Row(Month=10, avg(Volume)=12671.743697916667),
 Row(Month=8, avg(Volume)=27687.835670524168),
 Row(Month=9, avg(Volume)=32572.577529761904),
 Row(Month=7, avg(Volume)=35150.92723958333),
 Row(Month=6, avg(Volume)=35530.05119047619)]

## Most Actve Trading Day

In [27]:
df.groupBy('Date').agg({'Volume': 'mean'}).orderBy('avg(Volume)').collect()

[Row(Date=364, avg(Volume)=1080.2572916666666),
 Row(Date=363, avg(Volume)=1131.0291666666667),
 Row(Date=357, avg(Volume)=1217.04375),
 Row(Date=108, avg(Volume)=1217.303125),
 Row(Date=294, avg(Volume)=1251.7364583333333),
 Row(Date=315, avg(Volume)=1340.4302083333334),
 Row(Date=320, avg(Volume)=1371.8072916666667),
 Row(Date=287, avg(Volume)=1372.940625),
 Row(Date=98, avg(Volume)=1396.3916666666667),
 Row(Date=298, avg(Volume)=1396.6864583333333),
 Row(Date=84, avg(Volume)=1421.0479166666667),
 Row(Date=293, avg(Volume)=1428.2885416666666),
 Row(Date=342, avg(Volume)=1440.190625),
 Row(Date=292, avg(Volume)=1456.7895833333334),
 Row(Date=328, avg(Volume)=1487.7291666666667),
 Row(Date=101, avg(Volume)=1493.3052083333334),
 Row(Date=12, avg(Volume)=1515.665625),
 Row(Date=60, avg(Volume)=1516.9760416666666),
 Row(Date=103, avg(Volume)=1518.6375),
 Row(Date=343, avg(Volume)=1519.6385416666667),
 Row(Date=319, avg(Volume)=1541.821875),
 Row(Date=61, avg(Volume)=1547.540625),
 Row(Dat

In [29]:
df.groupBy('Ticker').agg({'Return':'mean'}).orderBy('avg(Return)').collect()

[Row(Ticker='AMZN', avg(Return)=-0.0009414439047130509)]

In [34]:
df.groupBy('Ticker').agg({'Return':'mean'}).orderBy('avg(Return)').collect()

[Row(Ticker='AMZN', avg(Return)=-0.0009414439047130509)]

In [33]:
df.select(
    F.percentile_approx("Return", [0.25, 0.5, 0.75], 1000000).alias("quantiles")
).collect()

[Row(quantiles=[-0.03243540252799592, 0.0, 0.03170792498609782])]

In [36]:
df.groupBy('Ticker').agg(F.corr('Return','UpTickVolume').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.005646206190619229)]

In [41]:
df.groupBy('Ticker').agg(F.corr('Return','DownTickVolume').alias('corr')).collect()

[Row(Ticker='AMZN', corr=-0.00014415274828344282)]

In [42]:
df.groupBy('Ticker').agg(F.corr('Return','RepeatUpTickVolume').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.025555694553735218)]

In [43]:
df.groupBy('Ticker').agg(F.corr('Return','RepeatDownTickVolume').alias('corr')).collect()

[Row(Ticker='AMZN', corr=-0.013325918547784615)]

In [45]:
df.groupBy('Ticker').agg(F.corr('Return','VolumeWeightPrice').alias('corr')).collect()

[Row(Ticker='AMZN', corr=-0.0020864728945817054)]

In [46]:
df.groupBy('Ticker').agg(F.corr('Return','TradeAtAsk').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.009754992453063048)]

In [53]:
df.groupBy('Ticker').agg(F.corr('Return','TradeAtMidAsk').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.01957501064714033)]

In [62]:
df.groupBy('Ticker').agg(F.corr('Return','TradeToMidVolWeight').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.02114204431785122)]

In [63]:
df.groupBy('Ticker').agg(F.corr('Return','TradeToMidVolWeightRelative').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.020302774326473715)]

In [73]:
df.groupBy('Ticker').agg(F.corr('Return','60_MMA_DEV').alias('corr')).collect()

[Row(Ticker='AMZN', corr=0.01465999866198453)]

In [37]:
df.groupBy('Ticker').agg(F.kurtosis('Return')).collect()

[Row(Ticker='AMZN', kurtosis(Return)=30545.774986581426)]

In [38]:
df.groupBy('Ticker').agg(F.skewness('Return')).collect()

[Row(Ticker='AMZN', skewness(Return)=-104.50196234582617)]

In [47]:
df.columns

['TradeDate',
 'BarDateTime',
 'Ticker',
 'SecId',
 'OpenBarTimeOffset',
 'OpenBidPrice',
 'OpenBidSize',
 'OpenAskPrice',
 'OpenAskSize',
 'FirstTradeTimeOffset',
 'FirstTradePrice',
 'FirstTradeSize',
 'HighBidTimeOffset',
 'HighBidPrice',
 'HighBidSize',
 'HighAskTimeOffset',
 'HighAskPrice',
 'HighAskSize',
 'HighTradeTimeOffset',
 'HighTradePrice',
 'HighTradeSize',
 'LowBidTimeOffset',
 'LowBidPrice',
 'LowBidSize',
 'LowAskTimeOffset',
 'LowAskPrice',
 'LowAskSize',
 'LowTradeTimeOffset',
 'LowTradePrice',
 'LowTradeSize',
 'CloseBarTimeOffset',
 'CloseBidPrice',
 'CloseBidSize',
 'CloseAskPrice',
 'CloseAskSize',
 'LastTradeTimeOffset',
 'LastTradePrice',
 'LastTradeSize',
 'MinSpread',
 'MaxSpread',
 'CancelSize',
 'VolumeWeightPrice',
 'NBBOQuoteCount',
 'TradeAtBid',
 'TradeAtBidMid',
 'TradeAtMid',
 'TradeAtMidAsk',
 'TradeAtAsk',
 'TradeAtCrossOrLocked',
 'Volume',
 'TotalTrades',
 'FinraVolume',
 'FinraVolumeWeightPrice',
 'UptickVolume',
 'DowntickVolume',
 'RepeatUptick

## 60 Min Moving Average

In [39]:
windowSpec1 = Window.orderBy(F.col("BarDateTime")).rowsBetween(-60, 0)

# Calculate new moving average column using 'avg' and the windowSpec
df = df.withColumn('60MMA', F.avg("LastTradePrice").over(windowSpec)) 

In [40]:
deviation1 = F.log(F.col('LastTradePrice')/F.col('60MMA'))*100
df = df.withColumn("60_MMA_DEV", deviation1)
df.select(['BarDateTime','LastTradePrice','60MMA','60_MMA_DEV']).show()

+-------------------+--------------+-------------+------------------+
|        BarDateTime|LastTradePrice|        60MMA|        60_MMA_DEV|
+-------------------+--------------+-------------+------------------+
|2021-10-14 08:00:00|     3305.6700|3305.67000000|               0.0|
|2021-10-14 08:01:00|        0.0000|1652.83500000|              null|
|2021-10-14 08:02:00|     3308.0000|2204.55666667| 40.58173466770914|
|2021-10-14 08:03:00|     3306.5000|2480.04250000|28.761453100575334|
|2021-10-14 08:04:00|     3306.5000|2645.33400000|22.309289480202082|
|2021-10-14 08:05:00|     3306.5000|2755.52833333|18.228103138012862|
|2021-10-14 08:06:00|     3307.6700|2834.40571429|15.441172340689203|
|2021-10-14 08:07:00|     3306.5000|2893.41750000|13.345189868978158|
|2021-10-14 08:08:00|     3306.5000|2939.31555556|11.771347811639389|
|2021-10-14 08:09:00|     3308.0000|2976.18400000|10.570183414487957|
|2021-10-14 08:10:00|     3308.8800|3006.42909091| 9.585673759192895|
|2021-10-14 08:11:00

In [86]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

stages = []
num_cols = ['UptickVolume','RepeatUptickVolume','DowntickVolume','RepeatDowntickVolume','TradeAtMidAsk','TradeToMidVolWeight','TradeToMidVolWeightRelative']
assembler = VectorAssembler(inputCols=num_cols, outputCol="features")
stages += [assembler]

# Create pipeline and use on dataset
pipeline = Pipeline(stages=stages)
df1 = df.dropna()
df1 = pipeline.fit(df1).transform(df1)

# Load training data
# training = spark.read.format("libsvm")\.load("data/mllib/sample_linear_regression_data.txt")
# training = df.select(['BarDateTime','UpTickVolume','RepeatUptickVolume','DownTickVolume','RepeatDownTickVolume','TradeAtMidAsk','TradeToMidVolWeight','TradeToMidVolWeightRelative'])


In [88]:
train, test = df1.randomSplit([0.90, 0.1], seed=123)
print('Train dataset count:', train.count())
print('Test dataset count:', test.count())

Train dataset count: 205903
Test dataset count: 23031


In [89]:
from pyspark.ml.feature import StandardScaler

# Fit scaler to train dataset
scaler = StandardScaler().setInputCol('features') \
        .setOutputCol('scaled_features')
scaler_model = scaler.fit(train)

# Scale train and test features
train = scaler_model.transform(train)
test = scaler_model.transform(test)

In [90]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='scaled_features', labelCol='Return')
lr_model = lr.fit(train)

In [91]:
train_predictions = lr_model.transform(train)
test_predictions = lr_model.transform(test)

In [93]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Return", metricName="r2")

print("Train R2:", evaluator.evaluate(train_predictions))
print("Test R2:", evaluator.evaluate(test_predictions))

Train R2: 0.002298775509172124
Test R2: 0.0034769443012746537


In [92]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.0005880700166696911,0.005599020490284347,-0.0006682534583530403,-0.006391555765650179,0.00493085247806078,0.0039021395152557654,0.003477107832033288]
Intercept: -0.0019232421413790447


In [94]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [-0.0005880700166696911,0.005599020490284347,-0.0006682534583530403,-0.006391555765650179,0.00493085247806078,0.0039021395152557654,0.003477107832033288]
Intercept: -0.0019232421413790447
numIterations: 0
objectiveHistory: [0.0]
+--------------------+
|           residuals|
+--------------------+
|-0.02670529268366...|
|-0.00876405895121...|
|-0.00432970866258...|
|-0.03678609376815...|
| 0.04718192417699304|
|-0.00432909976555...|
|-0.00680775822307484|
|0.008882151143198477|
|-0.00332308931570...|
| 0.01978328044928357|
|0.012599285076998815|
|0.008931458985716793|
| 0.03560868794403341|
|0.006489195994968741|
|0.003387074804273282|
|0.028696786821407194|
|0.010648582461821194|
|-0.03186954056903963|
| -0.0938085974207427|
| 0.09433766905688638|
+--------------------+
only showing top 20 rows

RMSE: 0.202513
r2: 0.002299


In [95]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

list_extract = []
for i in df1.schema['features'].metadata["ml_attr"]["attrs"]:
    list_extract = list_extract + df1.schema['features'] \
                    .metadata["ml_attr"]["attrs"][i]
varlist = pd.DataFrame(list_extract)
varlist['weight'] = varlist['idx'].apply(lambda x: coef[x])
weights = varlist.sort_values('weight', ascending = False)

NameError: name 'coef' is not defined