# IST 718 Final Code
Team members: Deyu Li, Chensheng Ma, Zeyang Zhou

In [1]:
# Load the packages needed for this project
# create spark and sparkcontext objects
# Team members: Deyu Li, Chengshen Ma, Zeyang Zhou
from pyspark import sql
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.config('spark.driver.memory','8g').config("spark.driver.maxResultSize", "8g").\
                                                                config("spark.sql.execution.arrow.enabled", "true").getOrCreate()
sc = spark.sparkContext
import pyspark
from pyspark.ml import feature, regression, Pipeline, classification, pipeline, evaluation, clustering
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand

In [3]:
#####If JVM crashed, we can use it to reset the gateway
# from py4j.java_gateway import JavaGateway
# gateway = JavaGateway()                   # connect to the JVM
# random = gateway.jvm.java.util.Random()   # create a java.util.Random instance
# number1 = random.nextInt(10)              # call the Random.nextInt method
# number2 = random.nextInt(10)
# print(number1,number2)

# Descriptive Analysis

In [4]:
#Load the dataset
#book_example = spark.read.parquet('D:/Desktop/Jupyter Notebook/ist 718/project/optiver-realized-volatility-prediction/book_train.parquet')

#book_train = spark.read.option("header",True).csv('D:/Desktop/Jupyter Notebook/ist 718/project/optiver-realized-volatility-prediction/train.csv')
#book_train = book_train.select('time_id','stock_id',fn.col('target').cast("float"))
#trade_example = spark.read.parquet('D:/Desktop/Jupyter Notebook/ist 718/project/optiver-realized-volatility-prediction/trade_train.parquet')

#Load the dataset
book_example = spark.read.parquet('C:/Users/visco/Downloads/Optiver_Realized_Volatility_Prediction-main/input/optiver-realized-volatility-prediction/book_train.parquet')

book_train = spark.read.option("header",True).csv('C:/Users/visco/Downloads/Optiver_Realized_Volatility_Prediction-main/input/optiver-realized-volatility-prediction/train.csv')

trade_example = spark.read.parquet('C:/Users/visco/Downloads/Optiver_Realized_Volatility_Prediction-main/input/optiver-realized-volatility-prediction/trade_train.parquet')
book_train = book_train.select('time_id','stock_id',fn.col('target').cast("float"))

In [None]:
# The overview of the order dataset
book_example.orderBy('stock_id').show()

In [None]:
# The overview of the trade dataset
trade_example.orderBy('stock_id','time_id','seconds_in_bucket').show()

In [None]:
# Data type
book_example.printSchema()

In [None]:
# Show the stock_id 
book_example.select("stock_id").distinct().orderBy('stock_id').show()

In [None]:
# We have 112 different stocks.
stock_count= book_example.select("stock_id").distinct().count()
# In one stock, we have 3830 time windows and each of them is limited in a ten minutes scale.
time_id_count= book_example.select("time_id").distinct().count()
# One time id means an order or trade happened in an unknown 10 minutes scale in the stock market.
#plt.scatter(book_example11.select('seconds_in_bucket').toPandas())

In [5]:
# Wap 1
book_example= book_example.withColumn('wap',(fn.col('bid_price1')*fn.col('ask_size1')+fn.col('ask_price1')*fn.col('bid_size1'))/\
                           (fn.col('bid_size1')+fn.col('ask_size1')))
# Wap 2
book_example= book_example.withColumn('wap2',(fn.col('bid_price2')*fn.col('ask_size2')+fn.col('ask_price2')*fn.col('bid_size2'))/\
                           (fn.col('bid_size2')+fn.col('ask_size2')))

In [6]:
# Calculate the log return of each stock by window function
from pyspark.sql import Window
def log_return(list_stock_prices):
    log_return1 = fn.log(fn.col('wap'))
    log_return2 = fn.lag(fn.col('log_return1')).over(Window.partitionBy("stock_id").orderBy('seconds_in_bucket'))
    list_stock_prices = list_stock_prices.withColumn('log_return1',log_return1) 
    list_stock_prices = list_stock_prices.withColumn('log_return2',log_return2) 
    list_stock_prices= list_stock_prices.withColumn('log_return',fn.col('log_return1')-fn.col('log_return2'))    
    return list_stock_prices
example = log_return(book_example)
def log_return_w2(list_stock_prices):
    log_return3 = fn.log(fn.col('wap2'))
    log_return4 = fn.lag(fn.col('log_return3')).over(Window.partitionBy("stock_id").orderBy('seconds_in_bucket'))
    list_stock_prices = list_stock_prices.withColumn('log_return3',log_return3) 
    list_stock_prices = list_stock_prices.withColumn('log_return4',log_return4) 
    list_stock_prices= list_stock_prices.withColumn('log_return_w2',fn.col('log_return3')-fn.col('log_return4'))    
    return list_stock_prices
example = log_return_w2(example)

In [7]:
#0-112
index = list(range(121))
example_0_1 = example[example.stock_id.isin(index)]
example_0_1 = example_0_1.drop('log_return1','log_return2').orderBy('stock_id','seconds_in_bucket')

In [8]:
# Calculate the volatility in past 10 mins 
example_pt = example_0_1.groupBy('stock_id','time_id').agg(fn.sum(fn.col('log_return')**2).alias('past_target')).orderBy('stock_id','time_id')

In [9]:
# Extract distinct time_id (There are the same time windows in all datasets of 127 stocks)
example_0_1.createOrReplaceTempView('book_spark_example')
time_id_index= spark.sql('select time_id from book_spark_example').dropDuplicates().\
                       select('time_id')

## Features Engineering

In [10]:
# bid&ask spread
example_0_1 = example_0_1.withColumn('bidask1',fn.col('bid_price1')*fn.col('ask_price1'))
example_0_1 = example_0_1.withColumn('bidask2',fn.col('bid_price2')*fn.col('ask_price2'))
example_0_1 = example_0_1.withColumn('bidask3',fn.col('bid_price1')*fn.col('ask_price2'))
example_0_1 = example_0_1.withColumn('bidask4',fn.col('bid_price2')*fn.col('ask_price1'))

example_0_1 = example_0_1.withColumn('diff1',fn.col('bid_price1')*fn.col('bid_size1')-fn.col('bid_price2')*fn.col('bid_size2'))
example_0_1 = example_0_1.withColumn('diff2',fn.col('bid_price1')*fn.col('ask_size1')-fn.col('bid_price2')*fn.col('ask_size2'))
example_0_1 = example_0_1.withColumn('diff3',fn.col('ask_price1')*fn.col('ask_size1')-fn.col('ask_price2')*fn.col('ask_size2'))
example_0_1 = example_0_1.withColumn('diff4',fn.col('ask_price1')*fn.col('bid_size1')-fn.col('ask_price2')*fn.col('bid_size2'))

example_0_1 = example_0_1.withColumn('price_spread',(fn.col('bid_price1')-fn.col('ask_price1'))/(fn.col('bid_price1')+fn.col('ask_price1'))/2)
example_0_1 = example_0_1.withColumn('price_spread2',(fn.col('bid_price2')-fn.col('ask_price2'))/(fn.col('bid_price2')+fn.col('ask_price2'))/2)

example_0_1 = example_0_1.withColumn('bid_spread',fn.col('bid_price1')-fn.col('bid_price2'))
example_0_1 = example_0_1.withColumn('ask_spread',fn.col('ask_price1')-fn.col('ask_size2'))
example_0_1 = example_0_1.withColumn('bid_ask_spread',fn.col('bid_spread')-fn.col('ask_spread'))

#WAP spread
example_0_1 = example_0_1.withColumn('wap3',(fn.col('bid_price1')*fn.col('ask_size2')+fn.col('ask_price1')*fn.col('bid_size2'))/\
                           (fn.col('bid_size1')+fn.col('ask_size1')))
example_0_1 = example_0_1.withColumn('wap4',(fn.col('bid_price2')*fn.col('ask_size1')+fn.col('ask_price2')*fn.col('bid_size1'))/\
                           (fn.col('bid_size2')+fn.col('ask_size2')))
example_0_1 = example_0_1.withColumn('wap_balance',fn.col('wap')-fn.col('wap2'))
example_0_1 = example_0_1.withColumn('wap_balance2',fn.col('wap3')-fn.col('wap4'))

#size spread
example_0_1 = example_0_1.withColumn('total_volume',fn.col('bid_size1')+fn.col('bid_size2')+fn.col('ask_size1')+fn.col('ask_size2'))
example_0_1 = example_0_1.withColumn('volume_imbalance',fn.abs(fn.col('ask_size1')+fn.col('ask_size2')-fn.col('bid_size1')-fn.col('bid_size2')))

# features spread
example_0_1 = example_0_1.withColumn('wap_balance3',fn.col('wap')-fn.col('wap4'))
example_0_1 = example_0_1.withColumn('wap_balance4',fn.col('wap2')-fn.col('wap3'))
example_0_1 = example_0_1.withColumn('wap_balance5',fn.col('wap2')-fn.col('wap4'))

example_0_1 = example_0_1.withColumn('bidsize_spread',fn.col('bid_size1')-fn.col('ask_size1'))
example_0_1 = example_0_1.withColumn('bidsize_spread1',fn.col('bid_size1')-fn.col('ask_size2'))
example_0_1 = example_0_1.withColumn('asksize_spread',fn.col('bid_size2')-fn.col('ask_size2'))
example_0_1 = example_0_1.withColumn('bidsize_asksize_spread',fn.col('bidsize_spread')-fn.col('asksize_spread'))

example_0_1 = example_0_1.withColumn('wap_sqrt1',fn.sqrt('wap'))
example_0_1 = example_0_1.withColumn('wap_sqrt2',fn.sqrt('wap2'))
example_0_1 = example_0_1.withColumn('wap_sqrt3',fn.sqrt('wap3'))
example_0_1 = example_0_1.withColumn('wap_sqrt4',fn.sqrt('wap4'))

example_0_1 = example_0_1.withColumn('bid_sqrt1',fn.sqrt('bid_price1'))
example_0_1 = example_0_1.withColumn('bid_sqrt2',fn.sqrt('bid_price2'))
example_0_1 = example_0_1.withColumn('ask_sqrt1',fn.sqrt('ask_price1'))
example_0_1 = example_0_1.withColumn('ask_sqrt2',fn.sqrt('ask_price2'))

## Splits

In [11]:
# Splitting
training_df_index, validation_df_index, testing_df_index = time_id_index.randomSplit([0.6, 0.3, 0.1], seed=0)
# Index match
training_df_index = [int(row['time_id']) for row in training_df_index.collect()]
validation_df_index = [int(row['time_id']) for row in validation_df_index.collect()]
testing_df_index = [int(row['time_id']) for row in testing_df_index.collect()]

training_df = example_0_1.where(fn.col('time_id').isin(training_df_index))
validation_df = example_0_1.where(fn.col('time_id').isin(validation_df_index))
testing_df = example_0_1.where(fn.col('time_id').isin(testing_df_index))

In [12]:
book_train_0 = book_train.select('*').where("stock_id==0 or stock_id ==1")

features = ['seconds_in_bucket','bid_price1','ask_price1','bid_price2',\
           'ask_price2','bid_size1','bid_size2','ask_size2','wap','log_return','wap2','log_return_w2',\
            'wap3','wap4','bidask1','bidask2','bidask3','bidask4','diff1','diff2','diff3','diff4',\
           'price_spread','price_spread2','wap_balance','wap_balance2','bid_spread','ask_spread','bid_ask_spread',\
           'total_volume','volume_imbalance','wap_balance3','wap_balance4','wap_balance5',\
           'bidsize_spread','bidsize_spread1','asksize_spread','bidsize_asksize_spread','wap_sqrt1','wap_sqrt2','wap_sqrt3',\
           'wap_sqrt4','bid_sqrt1','bid_sqrt2','ask_sqrt1','ask_sqrt2']

expressions = [fn.avg(col).alias(col) for col in features]

train_feature= training_df.groupBy('stock_id','time_id').agg(*expressions).orderBy('stock_id','time_id')

valid_feature = validation_df.groupBy('stock_id','time_id').agg(*expressions).orderBy('stock_id','time_id')

test_feature = testing_df.groupBy('stock_id','time_id').agg(*expressions).orderBy('stock_id','time_id')

train_0_1 = train_feature.join(book_train_0, on = ['stock_id','time_id'])
valid_0_1 = valid_feature.join(book_train_0, on = ['stock_id','time_id'])
test_0_1 = test_feature.join(book_train_0, on = ['stock_id','time_id'])

train_0_1 = train_0_1.join(example_pt, on = ['stock_id','time_id'])
valid_0_1 = valid_0_1.join(example_pt, on = ['stock_id','time_id'])
test_0_1 = test_0_1.join(example_pt, on = ['stock_id','time_id'])

In [13]:
# We try different combination lists to find the best features
feature_list = ['seconds_in_bucket','bid_price1','ask_price1','bid_price2',\
           'ask_price2','bid_size1','bid_size2','ask_size2','wap','log_return','wap2','log_return_w2',\
            'wap3','wap4','bidask1','bidask2','bidask3','bidask4','diff1','diff2','diff3','diff4',\
           'price_spread','price_spread2','wap_balance','wap_balance2','bid_spread','ask_spread','bid_ask_spread',\
           'total_volume','volume_imbalance','wap_balance3','wap_balance4','wap_balance5',\
           'bidsize_spread','bidsize_spread1','asksize_spread','bidsize_asksize_spread','wap_sqrt1','wap_sqrt2','wap_sqrt3',\
           'wap_sqrt4','bid_sqrt1','bid_sqrt2','ask_sqrt1','ask_sqrt2','past_target']

kmeans = clustering.KMeans(k=10, featuresCol='features_2', predictionCol='clustering')
# Feature pipeline

featurize = Pipeline(stages=[
    feature.VectorAssembler(inputCols=feature_list,
                           outputCol='features_1'),
    feature.StandardScaler(withMean=True,
                           inputCol='features_1', outputCol='features_2'),
    kmeans,
    feature.VectorAssembler(inputCols=['clustering','features_2'],
                           outputCol='features')
]).fit(train_0_1)

## RMSPE

In [14]:
# Rmspe
expr = fn.sqrt(fn.mean((((fn.col('target')-fn.col('prediction'))/fn.col('target')))**2))

## Linear Regression

In [15]:
# Linear model: all features
features_df = featurize.transform(train_0_1)
features_df = features_df.select('stock_id','time_id','target','features')
validation_df_tf=featurize.transform(valid_0_1)
validation_df_tf = validation_df_tf.select('stock_id','time_id','target','features')
lr_estimator = regression.LinearRegression(featuresCol='features',labelCol='target')
lr_estimator = lr_estimator.fit(features_df)
prediction_m= lr_estimator.transform(validation_df_tf)
predictions_df = prediction_m.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
# RMSPE
rmspe = predictions_df.select(expr.alias('rmspe'))
print(rmspe.toPandas().values[0][0])
# R2 and AdjR2
print(lr_estimator.summary.r2)
print(lr_estimator.summary.r2adj)

0.48678633325165616
0.5740882868901542
0.569511666735851


## Stepwise

In [35]:
# We use a for-loop to run our stepwise method
feature_list_1 = ['time_id','seconds_in_bucket','bid_price1','ask_price1','bid_price2',\
           'ask_price2','bid_size1','bid_size2','ask_size2','wap','log_return','wap2','log_return_w2',\
            'wap3','wap4','bidask1','bidask2','bidask3','bidask4','diff1','diff2','diff3','diff4',\
           'price_spread','price_spread2','wap_balance','wap_balance2','bid_spread','ask_spread','bid_ask_spread',\
           'total_volume','volume_imbalance','wap_balance3','wap_balance4','wap_balance5',\
           'bidsize_spread','bidsize_spread1','asksize_spread','bidsize_asksize_spread','wap_sqrt1','wap_sqrt2','wap_sqrt3',\
           'wap_sqrt4','bid_sqrt1','bid_sqrt2','ask_sqrt1','ask_sqrt2','past_target']
RMSPE = []
for i in range(len(feature_list_1)):
    feature_list1 = feature_list_1[:i+2]
    featurize = Pipeline(stages=[
    feature.VectorAssembler(inputCols=feature_list1,
                           outputCol='features_1'),
    feature.StandardScaler(withMean=True,
                           inputCol='features_1', outputCol='features_2'),
    kmeans,
    feature.VectorAssembler(inputCols=['clustering','features_2'],
                           outputCol='features')
]).fit(train_0_1)
    features_df = featurize.transform(train_0_1)
    features_df = features_df.select('stock_id','time_id','target','features')
    validation_df_tf=featurize.transform(valid_0_1)
    validation_df_tf = validation_df_tf.select('stock_id','time_id','target','features')
    lr_estimator = regression.LinearRegression(featuresCol='features',labelCol='target')
    lr_estimator = lr_estimator.fit(features_df)
    prediction_m= lr_estimator.transform(validation_df_tf)
    predictions_df = prediction_m.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
    rmspe = predictions_df.select(expr.alias('rmspe'))
    rmspe = rmspe.toPandas().values[0][0]
    print(rmspe)

0.7704907395797467
0.7684489409757145
0.5051339281156756
0.5044966409441641
0.48317289440273675
0.5014564715604184
0.4992756528522388
0.49286218213040867
0.49028674511329956


## GBRT

In [25]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import feature, regression, evaluation, Pipeline
## GBTRegressor
gbr = GBTRegressor(labelCol='target',seed=0)
gbr1 = GBTRegressor(labelCol='target',maxDepth=3, maxBins=20,seed=0)
gbr2 = GBTRegressor(labelCol='target',maxDepth=4, maxBins=30,seed=0)
gbr3 = GBTRegressor(labelCol='target',maxDepth=5, maxBins=40,seed=0)

gbr_pipe = Pipeline(stages=[featurize ,gbr]).fit(train_0_1)
gbr_pipe1 = Pipeline(stages=[featurize ,gbr1]).fit(train_0_1)
gbr_pipe2 = Pipeline(stages=[featurize ,gbr2]).fit(train_0_1)
gbr_pipe3 = Pipeline(stages=[featurize ,gbr3]).fit(train_0_1)

In [26]:
prediction_gbr= gbr_pipe.transform(valid_0_1)
predictions_gbr = prediction_gbr.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
prediction_gbr1= gbr_pipe1.transform(valid_0_1)
predictions_gbr1 = prediction_gbr1.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
prediction_gbr2= gbr_pipe2.transform(valid_0_1)
predictions_gbr2 = prediction_gbr2.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
prediction_gbr3= gbr_pipe3.transform(valid_0_1)
predictions_gbr3 = prediction_gbr3.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')

In [29]:
rmspe = predictions_gbr.select(expr.alias('rmspe'))
rmspe = rmspe.toPandas().values[0][0]
print(rmspe)
rmspe1 = predictions_gbr1.select(expr.alias('rmspe1'))
rmspe1 = rmspe1.toPandas().values[0][0]
print(rmspe1)
rmspe2 = predictions_gbr2.select(expr.alias('rmspe2'))
rmspe2 = rmspe2.toPandas().values[0][0]
print(rmspe2)
rmspe3 = predictions_gbr3.select(expr.alias('rmspe3'))
rmspe3 = rmspe3.toPandas().values[0][0]
print(rmspe3)

0.40711654681934684
0.4164272519431899
0.405455024684439
0.3969574037163013


## RF

In [17]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
rf1 = RandomForestRegressor(labelCol='target',maxDepth=6, maxBins=60, seed=0)
pipe_rf1 = Pipeline(stages=[featurize ,rf1]).fit(train_0_1)
rf2 = RandomForestRegressor(labelCol='target',maxDepth=10, maxBins=40, seed=0)
pipe_rf2 = Pipeline(stages=[featurize ,rf2]).fit(train_0_1)
rf3 = RandomForestRegressor(labelCol='target',maxDepth=30, maxBins=20, seed=0)
pipe_rf3 = Pipeline(stages=[featurize ,rf3]).fit(train_0_1)

In [21]:
prediction_rf1= pipe_rf1.transform(valid_0_1)
predictions_rf1 = prediction_rf1.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
prediction_rf2= pipe_rf2.transform(valid_0_1)
predictions_rf2 = prediction_rf2.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')
prediction_rf3= pipe_rf3.transform(valid_0_1)
predictions_rf3 = prediction_rf3.select('stock_id','time_id','target','prediction').orderBy('stock_id','time_id')

In [22]:
example_tree = pipe_rf1.stages[-1].trees[1].toDebugString
print(example_tree)

DecisionTreeRegressionModel: uid=dtr_b4a328c9dda7, depth=6, numNodes=109, numFeatures=48
  If (feature 23 <= -0.19374832230393635)
   If (feature 17 <= -1.5381100210787697)
    If (feature 13 <= 2.118605676237279)
     If (feature 35 <= 0.9328709252406794)
      If (feature 38 <= -2.0480748532179027)
       Predict: 0.01970827206969261
      Else (feature 38 > -2.0480748532179027)
       If (feature 42 <= 0.6744653867740833)
        Predict: 0.008895230970235282
       Else (feature 42 > 0.6744653867740833)
        Predict: 0.005359369197062084
     Else (feature 35 > 0.9328709252406794)
      If (feature 25 <= 0.95957836465277)
       If (feature 1 <= -0.4173028031630112)
        Predict: 0.011618120595812798
       Else (feature 1 > -0.4173028031630112)
        Predict: 0.016164269670844077
      Else (feature 25 > 0.95957836465277)
       If (feature 42 <= 0.47241367678926593)
        Predict: 0.007682507392019033
       Else (feature 42 > 0.47241367678926593)
        Predict: 0.012

In [23]:
rmspe1 = predictions_rf1.select(expr.alias('rmspe'))
rmspe1 = rmspe1.toPandas().values[0][0]
print(rmspe1)
rmspe2 = predictions_rf2.select(expr.alias('rmspe'))
rmspe2 = rmspe2.toPandas().values[0][0]
print(rmspe2)
rmspe3 = predictions_rf3.select(expr.alias('rmspe'))
rmspe3 = rmspe3.toPandas().values[0][0]
print(rmspe3)

0.4178629738533463
0.4070248202754393
0.42254949601863223
