In [1]:
# Load the packages needed for this project
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.config('spark.driver.memory','16g').getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, classification, pipeline, evaluation
from pyspark.sql import functions as fn, Row
from pyspark import sql

import matplotlib.pyplot as plt
import pandas as pd

In [2]:
#If JVM crashed, we can use it to reset the gateway
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()                   # connect to the JVM
random = gateway.jvm.java.util.Random()   # create a java.util.Random instance
number1 = random.nextInt(10)              # call the Random.nextInt method
number2 = random.nextInt(10)
print(number1,number2)


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:25333)

# Descriptive Analysis

In [52]:
#Load the dataset
book_example = spark.read.parquet('D:/Desktop/Jupyter Notebook/ist 718/project/optiver-realized-volatility-prediction/book_train.parquet')

book_train = spark.read.option("header",True).csv('D:/Desktop/Jupyter Notebook/ist 718/project/optiver-realized-volatility-prediction/train.csv')

trade_example = spark.read.parquet('D:/Desktop/Jupyter Notebook/ist 718/project/optiver-realized-volatility-prediction/trade_train.parquet')

In [53]:
book_example.orderBy('stock_id').show()

+-------+-----------------+----------+----------+----------+----------+---------+---------+---------+---------+--------+
|time_id|seconds_in_bucket|bid_price1|ask_price1|bid_price2|ask_price2|bid_size1|ask_size1|bid_size2|ask_size2|stock_id|
+-------+-----------------+----------+----------+----------+----------+---------+---------+---------+---------+--------+
|      5|               47| 1.0028185| 1.0032322| 1.0023013| 1.0038011|       55|        1|      100|       34|       0|
|      5|               17| 1.0014222| 1.0023013| 1.0013704| 1.0024048|        3|      100|        2|      100|       0|
|      5|               46| 1.0028185| 1.0032322| 1.0023013| 1.0038011|      155|        1|      200|       34|       0|
|      5|               11| 1.0014222| 1.0023013| 1.0013704| 1.0024048|        3|      100|        2|      100|       0|
|      5|               16| 1.0014222| 1.0023013| 1.0013704| 1.0024048|        3|      126|        2|      100|       0|
|      5|               21| 1.00

In [54]:
trade_example.orderBy('stock_id','time_id','seconds_in_bucket').show()

+-------+-----------------+---------+----+-----------+--------+
|time_id|seconds_in_bucket|    price|size|order_count|stock_id|
+-------+-----------------+---------+----+-----------+--------+
|      5|               21|1.0023013| 326|         12|       0|
|      5|               46| 1.002778| 128|          4|       0|
|      5|               50|1.0028185|  55|          1|       0|
|      5|               57|1.0031554| 121|          5|       0|
|      5|               68|1.0036459|   4|          1|       0|
|      5|               78|1.0037625| 134|          5|       0|
|      5|              122|1.0042067| 102|          3|       0|
|      5|              127|1.0045768|   1|          1|       0|
|      5|              144|  1.00437|   6|          1|       0|
|      5|              147|1.0039636| 233|          4|       0|
|      5|              177|1.0038528|   1|          1|       0|
|      5|              183|1.0039562|   2|          1|       0|
|      5|              187|1.0042665| 16

In [55]:
# Data type
book_example.printSchema()

root
 |-- time_id: short (nullable = true)
 |-- seconds_in_bucket: short (nullable = true)
 |-- bid_price1: float (nullable = true)
 |-- ask_price1: float (nullable = true)
 |-- bid_price2: float (nullable = true)
 |-- ask_price2: float (nullable = true)
 |-- bid_size1: integer (nullable = true)
 |-- ask_size1: integer (nullable = true)
 |-- bid_size2: integer (nullable = true)
 |-- ask_size2: integer (nullable = true)
 |-- stock_id: integer (nullable = true)



In [56]:
# Show the stock_id 
book_example.select("stock_id").distinct().orderBy('stock_id').show()

+--------+
|stock_id|
+--------+
|       0|
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      13|
|      14|
|      15|
|      16|
|      17|
|      18|
|      19|
|      20|
+--------+
only showing top 20 rows



In [57]:
# We have 12 different stocks.
stock_count= book_example.select("stock_id").distinct().count()
# In one stock, we have 3830 time windows and each of them is limited in a ten minutes scale.
time_id_count= book_example.select("time_id").distinct().count()
# One time id means order or trade happened in an unknown 10 minutes scale in the stock market.
#plt.scatter(book_example11.select('seconds_in_bucket').toPandas())

In [58]:
# Wap
book_example= book_example.withColumn('wap',(fn.col('bid_price1')*fn.col('ask_size1')+fn.col('ask_price1')*fn.col('bid_size1'))/\
                           (fn.col('bid_size1')+fn.col('ask_size1')))

In [59]:
# Calculate the log return of each stock by window function
from pyspark.sql import Window
def log_return(list_stock_prices):
    log_return1 = fn.log(fn.col('wap'))
    log_return2 = fn.lag(fn.col('log_return1')).over(Window.partitionBy("stock_id").orderBy('seconds_in_bucket'))
    list_stock_prices = list_stock_prices.withColumn('log_return1',log_return1) 
    list_stock_prices = list_stock_prices.withColumn('log_return2',log_return2) 
    list_stock_prices= list_stock_prices.withColumn('log_return',fn.col('log_return1')-fn.col('log_return2'))    
    return list_stock_prices
example = log_return(book_example)

In [60]:
# Calculate the volatility in past 10 mins 
example_pt = example.groupBy('stock_id','time_id').agg(fn.sum(fn.col('log_return')**2).alias('past_target')).orderBy('stock_id','time_id')

In [76]:
# Take example form the large dataset
example_0_1= example.select('*').where("stock_id==0 or stock_id ==1")
example_0_1=example.drop('log_return1','log_return2').orderBy('stock_id','seconds_in_bucket')

In [77]:
# Extract distinct time_id (There are the same time windows in all datasets of 127 stocks)
example_0_1.registerTempTable('book_spark_example')
time_id_index= spark.sql('select time_id from book_spark_example').dropDuplicates().\
                       select('time_id')

In [78]:
# Splits
training_df_index, validation_df_index, testing_df_index = time_id_index.randomSplit([0.6, 0.3, 0.1], seed=0)
# Index match
training_df_index = [int(row['time_id']) for row in training_df_index.collect()]
validation_df_index = [int(row['time_id']) for row in validation_df_index.collect()]
testing_df_index = [int(row['time_id']) for row in testing_df_index.collect()]

training_df = example_0_1.where(fn.col('time_id').isin(training_df_index))
validation_df = example_0_1.where(fn.col('time_id').isin(validation_df_index))
testing_df = example_0_1.where(fn.col('time_id').isin(testing_df_index))

## LINEAR REGRESSION

In [146]:
book_train_0 = book_train.select('*').where("stock_id==0 or stock_id ==1")

features = ['seconds_in_bucket','bid_price1','ask_price1','bid_price2',\
           'ask_price2','bid_size1','bid_size2','ask_size2','wap','log_return']

expressions = [fn.avg(col).alias(col) for col in features]

train_feature= training_df.groupBy('stock_id','time_id').agg(*expressions).orderBy('stock_id','time_id')

In [150]:
train_feature.select('time_id').show()

+-------+
|time_id|
+-------+
|     11|
|     16|
|     31|
|     72|
|     97|
|    123|
|    128|
|    152|
|    169|
|    207|
|    211|
|    213|
|    218|
|    227|
|    229|
|    250|
|    254|
|    266|
|    303|
|    310|
+-------+
only showing top 20 rows



In [46]:
var = feature.VectorAssembler(inputCols=['time_id','seconds_in_bucket','bid_price1','ask_price1','bid_price2',\
           'ask_price2','bid_size1','bid_size2','ask_size2','stock_id','wap','log_return'], outputCol='features')
features_df = var.transform(training_df)
validation_df_tf=var.transform(validation_df)

In [47]:
# Linear model
lr_estimator = regression.LinearRegression(featuresCol='features',labelCol='target')
lr_estimator = lr_estimator.fit(features_df)
prediction_m= lr_estimator.transform(features_df)
predictions_df = prediction_m.select('stock_id','time_id','target','prediction')
predictions_df.show(5)

IllegalArgumentException: target does not exist. Available: time_id, seconds_in_bucket, bid_price1, ask_price1, bid_price2, ask_price2, bid_size1, ask_size1, bid_size2, ask_size2, stock_id, wap, log_return, features