# Big Data Project - Trading at the close

Create a Spark session. The app is named 'Optiver', after the provider of this dataset.

In [25]:
from pyspark.sql import SparkSession

# Build the Spark session for this notebook (getOrCreate reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName("Optiver")
    .getOrCreate()
)

## Getting the data
The data is provided as a zip file. It is uploaded to file storage and unzipped to obtain train.csv, which contains all of the data.

In [26]:
import zipfile

# Archive to unpack and the folder that receives its contents
zip_file_path = 'train.csv.zip'
output_folder = 'optiver_bigdata'

# Unpack every member of the archive; the context manager closes the file afterwards
with zipfile.ZipFile(zip_file_path) as archive:
    archive.extractall(path=output_folder)

print('The contents of {} have been successfully extracted to {}.'.format(zip_file_path, output_folder))


The contents of train.csv.zip have been successfully extracted to optiver_bigdata.


A Spark DataFrame is created from the extracted CSV.

In [27]:
# Load the extracted CSV: the first line supplies the column names and
# Spark infers each column's type from the data
df = spark.read.options(header=True, inferSchema=True).csv("optiver_bigdata/train.csv")

# Make the DataFrame queryable from Spark SQL under the name "train_csv"
df.createOrReplaceTempView("train_csv")

                                                                                

## Data Exploration



In [28]:
# Print a tabular preview of the first rows of the DataFrame
df.show()

[Stage 167:==>              (1 + 1) / 6][Stage 169:==>              (1 + 1) / 6]

+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+---------+---------+---------+---+----------+-------+------+
|stock_id|date_id|seconds_in_bucket|imbalance_size|imbalance_buy_sell_flag|reference_price| matched_size|far_price|near_price|bid_price| bid_size|ask_price| ask_size|wap|    target|time_id|row_id|
+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+---------+---------+---------+---+----------+-------+------+
|       0|      0|                0|    3180602.69|                      1|       0.999812|1.338027664E7|     NULL|      NULL| 0.999812|  60651.5| 1.000026|  8493.03|1.0| -3.029704|      0| 0_0_0|
|       1|      0|                0|     166603.91|                     -1|       0.999896|   1642214.25|     NULL|      NULL| 0.999896|  3233.04|  1.00066| 20605.09|1.0| -5.519986|      0| 0_0_1|
|       2|     

                                                                                

In [29]:
# List the column names of the DataFrame
df.columns

['stock_id',
 'date_id',
 'seconds_in_bucket',
 'imbalance_size',
 'imbalance_buy_sell_flag',
 'reference_price',
 'matched_size',
 'far_price',
 'near_price',
 'bid_price',
 'bid_size',
 'ask_price',
 'ask_size',
 'wap',
 'target',
 'time_id',
 'row_id']

In [30]:
# Print the schema: each column's name, data type and nullability
df.printSchema()

root
 |-- stock_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- seconds_in_bucket: integer (nullable = true)
 |-- imbalance_size: double (nullable = true)
 |-- imbalance_buy_sell_flag: integer (nullable = true)
 |-- reference_price: double (nullable = true)
 |-- matched_size: double (nullable = true)
 |-- far_price: double (nullable = true)
 |-- near_price: double (nullable = true)
 |-- bid_price: double (nullable = true)
 |-- bid_size: double (nullable = true)
 |-- ask_price: double (nullable = true)
 |-- ask_size: double (nullable = true)
 |-- wap: double (nullable = true)
 |-- target: double (nullable = true)
 |-- time_id: integer (nullable = true)
 |-- row_id: string (nullable = true)



There are **17** columns in the data. Column names and their data types:

| Column Name             | Data Type   |
|-------------------------|-------------|
| stock_id                | integer     |
| date_id                 | integer     |
| seconds_in_bucket       | integer     |
| imbalance_size          | double      |
| imbalance_buy_sell_flag | integer     |
| reference_price         | double      |
| matched_size            | double      |
| far_price               | double      |
| near_price              | double      |
| bid_price               | double      |
| bid_size                | double      |
| ask_price               | double      |
| ask_size                | double      |
| wap                     | double      |
| target                  | double      |
| time_id                 | integer     |
| row_id                  | string      |


In [31]:
# Total number of rows in the DataFrame
row_count = df.count()
print(f"Number of rows in the DataFrame: {row_count}")

[Stage 167:>  (1 + 1) / 6][Stage 169:>  (1 + 1) / 6][Stage 176:==>(4 + 1) / 5]6]

Number of rows in the DataFrame: 5237980


                                                                                

In [32]:
# Number of distinct stock ids present in the "train_csv" view
stock_count_query = "select count(distinct(stock_id)) as Number_of_Stocks from train_csv"
spark.sql(stock_count_query).show()




[Stage 167:==>              (1 + 1) / 6][Stage 169:==>              (1 + 1) / 6]

+----------------+
|Number_of_Stocks|
+----------------+
|             200|
+----------------+



                                                                                

In [33]:
from pyspark.sql.functions import col

# Keep only the rows of stock 0 on day 0 (both conditions must hold)
df_stock_id_0 = df.where(col("stock_id") == 0).where(col("date_id") == 0)

In [34]:
# Number of rows recorded for stock 0 on day 0
df_stock_id_0.count()

                                                                                

55

In [35]:
# Number of distinct date ids, i.e. how many days the data spans
day_count_query = "select count(distinct(date_id)) as Number_of_Days from train_csv"
spark.sql(day_count_query).show()



[Stage 167:==>              (1 + 1) / 6][Stage 169:==>              (1 + 1) / 6]

+--------------+
|Number_of_Days|
+--------------+
|           481|
+--------------+



                                                                                

### Data understanding:
Number of stocks in data = 200

Number of days for which data is captured = 481

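For each stock on each day, number of data points = 55 (verified above only for stock 0 on day 0). Note that 200 × 481 × 55 = 5291000 is more than the 5237980 rows present, so not every stock has data on every day.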

Rows = 5237980 , Columns = 17 

## Data Cleaning

We start off by analysing the null values in our data and fixing them.

In [36]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Count the nulls of every column in ONE aggregation job instead of running a
# separate filter + count job per column.
null_count_row = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first()

# Null count per column, in the same order as df.columns
null_counts = [null_count_row[c] for c in df.columns]

# Map column names to their null counts, keeping only columns that have nulls
columns_with_nulls = {name: n for name, n in zip(df.columns, null_counts) if n > 0}

# Display columns with non-zero null values.
# The loop variables are deliberately NOT called `col`: a module-level loop
# variable with that name would overwrite the imported pyspark `col` function
# and break later cells that call col(...).
print("Columns with non-zero null values:")
for column_name, null_count in columns_with_nulls.items():
    print(f"{column_name}: {null_count} null values")



[Stage 167:>  (1 + 1) / 6][Stage 169:>  (1 + 1) / 6][Stage 242:==>(4 + 1) / 5]6]

Columns with non-zero null values:
imbalance_size: 220 null values
reference_price: 220 null values
matched_size: 220 null values
far_price: 2894342 null values
near_price: 2857180 null values
bid_price: 220 null values
ask_price: 220 null values
wap: 220 null values


                                                                                

In [37]:
# Analyse null values in the dataset: for each column below, list the
# (stock_id, date_id) groups that contain nulls together with their row count
for null_column in ['ask_price', 'bid_price', 'imbalance_size', 'reference_price', 'matched_size', 'wap']:
    print(f'Data where {null_column} is null:')
    spark.sql(
        f"select stock_id, date_id, count(*) from train_csv where {null_column} is null group by stock_id, date_id"
    ).show()


Data where ask_price is null:


                                                                                

+--------+-------+--------+
|stock_id|date_id|count(1)|
+--------+-------+--------+
|     131|     35|      55|
|     158|    388|      55|
|     101|    328|      55|
|      19|    438|      55|
+--------+-------+--------+

Data where bid_price is null:


                                                                                

+--------+-------+--------+
|stock_id|date_id|count(1)|
+--------+-------+--------+
|     131|     35|      55|
|     158|    388|      55|
|     101|    328|      55|
|      19|    438|      55|
+--------+-------+--------+

Data where imbalance_size is null:


                                                                                

+--------+-------+--------+
|stock_id|date_id|count(1)|
+--------+-------+--------+
|     131|     35|      55|
|     158|    388|      55|
|     101|    328|      55|
|      19|    438|      55|
+--------+-------+--------+

Data where reference_price is null:


                                                                                

+--------+-------+--------+
|stock_id|date_id|count(1)|
+--------+-------+--------+
|     131|     35|      55|
|     158|    388|      55|
|     101|    328|      55|
|      19|    438|      55|
+--------+-------+--------+

Data where matched_size is null:


                                                                                

+--------+-------+--------+
|stock_id|date_id|count(1)|
+--------+-------+--------+
|     131|     35|      55|
|     158|    388|      55|
|     101|    328|      55|
|      19|    438|      55|
+--------+-------+--------+

Data where wap is null:


[Stage 167:>  (1 + 1) / 6][Stage 169:>  (1 + 1) / 6][Stage 260:==>(4 + 1) / 5]6]

+--------+-------+--------+
|stock_id|date_id|count(1)|
+--------+-------+--------+
|     131|     35|      55|
|     158|    388|      55|
|     101|    328|      55|
|      19|    438|      55|
+--------+-------+--------+



                                                                                

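It can be seen that for four (stock_id, date_id) pairs — **(131, 35), (158, 388), (101, 328) and (19, 438)** — all 55 rows of the day have no values for **imbalance_size**, **reference_price**, **matched_size**, **bid_price**, **ask_price** and **wap** (4 × 55 = 220 nulls per column). **bid_size**, **ask_size** and **target** contain no nulls.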



In [38]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assuming you already have a SparkSession named spark and your DataFrame is named df

# Window over each stock's own history, in chronological order
# (date_id, then seconds_in_bucket), from its first row up to the current row.
window_spec = (
    Window.partitionBy("stock_id")
    .orderBy("date_id", "seconds_in_bucket")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Define columns to fill
columns_to_fill = ["imbalance_size", "reference_price", "matched_size", "bid_price", "ask_price", "wap"]

# Forward fill (last observation carried forward): each null is replaced by the
# most recent non-null value of the same stock. Because the nulls cover whole
# stock/day groups, the value comes from that stock's previous day. A null with
# no earlier non-null value for its stock stays null.
# All columns are rewritten in a single select (keeping the column order)
# rather than one withColumn call per column, which would stack projections.
df = df.select(
    [
        F.last(name, ignorenulls=True).over(window_spec).alias(name)
        if name in columns_to_fill
        else F.col(name)
        for name in df.columns
    ]
)

# Show the DataFrame after filling null values
df.show()


[Stage 167:>  (1 + 1) / 6][Stage 169:>  (1 + 1) / 6][Stage 265:>  (0 + 1) / 1]6]

+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+--------+---------+--------+--------+-----------+-------+--------+
|stock_id|date_id|seconds_in_bucket|imbalance_size|imbalance_buy_sell_flag|reference_price| matched_size|far_price|near_price|bid_price|bid_size|ask_price|ask_size|     wap|     target|time_id|  row_id|
+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+--------+---------+--------+--------+-----------+-------+--------+
|      12|      0|                0| 1.173994544E7|                      1|       0.999794| 1.35971187E7|     NULL|      NULL| 0.999794| 33221.0| 1.000296| 47824.0|     1.0|   8.399487|      0|  0_0_12|
|      12|      0|               10| 1.019305804E7|                      1|       1.000882|1.512392596E7|     NULL|      NULL| 1.000673| 1674.47| 1.000965| 5503.44|1.000741|   4.990101|   

                                                                                

## Fixing Null Values

In [39]:
# Re-count the nulls of every column after the fill, in ONE aggregation job.
# Uses F.col rather than the bare `col` name so this cell does not depend on
# `col` still being bound to the pyspark function.
null_count_row = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first()

# Null count per column, in the same order as df.columns
null_counts = [null_count_row[c] for c in df.columns]

# Map column names to their null counts, keeping only columns that have nulls
columns_with_nulls = {name: n for name, n in zip(df.columns, null_counts) if n > 0}

# Display columns with non-zero null values (loop variables avoid the name
# `col` so the imported pyspark function is not overwritten)
print("Columns with non-zero null values:")
for column_name, null_count in columns_with_nulls.items():
    print(f"{column_name}: {null_count} null values")

[Stage 167:>  (1 + 1) / 6][Stage 169:>  (1 + 1) / 6][Stage 332:==>(4 + 1) / 5]  

Columns with non-zero null values:
far_price: 2894342 null values
near_price: 2857180 null values


                                                                                

In [40]:
# Replace the nulls that remain in "far_price" and "near_price" with 0
df = df.na.fill(0, subset=["far_price", "near_price"])

# Display the first 60 rows now that every null has been filled
df.show(n=60)


[Stage 167:==>              (1 + 1) / 6][Stage 169:==>              (1 + 1) / 6]

+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+---------+---------+---------+--------+-----------+-------+--------+
|stock_id|date_id|seconds_in_bucket|imbalance_size|imbalance_buy_sell_flag|reference_price| matched_size|far_price|near_price|bid_price| bid_size|ask_price| ask_size|     wap|     target|time_id|  row_id|
+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+---------+---------+---------+--------+-----------+-------+--------+
|      12|      0|                0| 1.173994544E7|                      1|       0.999794| 1.35971187E7|      0.0|       0.0| 0.999794|  33221.0| 1.000296|  47824.0|     1.0|   8.399487|      0|  0_0_12|
|      12|      0|               10| 1.019305804E7|                      1|       1.000882|1.512392596E7|      0.0|       0.0| 1.000673|  1674.47| 1.000965|  5503.44|1.000741|   4.

                                                                                

In [41]:
# Register the filled DataFrame so it can be checked with Spark SQL
df.createOrReplaceTempView("filled_csv")

# For each price column, list any (stock_id, date_id) groups that still hold nulls
for price_column in ("near_price", "far_price"):
    print(f'Data where {price_column} is null:')
    spark.sql(
        f"select stock_id, date_id, count(*) as Number_of_null_values from filled_csv "
        f"where {price_column} is null group by stock_id, date_id"
    ).show(200)

Data where near_price is null:
+--------+-------+---------------------+
|stock_id|date_id|Number_of_null_values|
+--------+-------+---------------------+
+--------+-------+---------------------+

Data where far_price is null:
+--------+-------+---------------------+
|stock_id|date_id|Number_of_null_values|
+--------+-------+---------------------+
+--------+-------+---------------------+



In [None]:
# Export the cleaned data as CSV before normalization, for Plotly visualization.
#
# The export goes through a single-partition copy so the output is written by
# one task, but `df` itself is NOT rebound: the following cells (normalization,
# table write) keep the original partitioning instead of running on one partition.
# mode="overwrite" lets the cell be re-run when the output folder already exists.
df.repartition(1).write.csv("cleaned_optiver_data", header=True, mode="overwrite")

## Data Normalization

In [42]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Numeric columns that are packed into the feature vector
columns_to_scale = [
    'imbalance_size',
    'reference_price',
    'matched_size',
    'far_price',
    'near_price',
    'bid_price',
    'bid_size',
    'ask_price',
    'ask_size',
    'wap',
]

# Stage 1: combine the columns into one vector column. Despite its name,
# "scaled_features" holds the raw (not yet scaled) values.
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="scaled_features")

# Stage 2: min-max scaler that reads the assembled vector and writes "normalized_features"
scaler = MinMaxScaler(inputCol="scaled_features", outputCol="normalized_features")

# Assemble, fit the scaler on the assembled data, then apply it
assembled_df = assembler.transform(df)
scaler_model = scaler.fit(assembled_df)
df = scaler_model.transform(assembled_df)

# Show normalized features
df.show(truncate=True)


[Stage 167:>  (1 + 1) / 6][Stage 169:>  (1 + 1) / 6][Stage 346:>  (0 + 1) / 1]6]

+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+--------+---------+--------+--------+-----------+-------+--------+--------------------+--------------------+
|stock_id|date_id|seconds_in_bucket|imbalance_size|imbalance_buy_sell_flag|reference_price| matched_size|far_price|near_price|bid_price|bid_size|ask_price|ask_size|     wap|     target|time_id|  row_id|     scaled_features| normalized_features|
+--------+-------+-----------------+--------------+-----------------------+---------------+-------------+---------+----------+---------+--------+---------+--------+--------+-----------+-------+--------+--------------------+--------------------+
|      12|      0|                0| 1.173994544E7|                      1|       0.999794| 1.35971187E7|      0.0|       0.0| 0.999794| 33221.0| 1.000296| 47824.0|     1.0|   8.399487|      0|  0_0_12|[1.173994544E7,0....|[0.00393690004969...|
|      12|      0|  

                                                                                

In [51]:
# Persist the normalized data as table 't', stored under 'hive_table_bigdata',
# for downstream tasks.
# NOTE(review): no save mode is set, so re-running this cell presumably fails
# once table 't' exists — confirm and choose a mode explicitly if needed.
df.write.saveAsTable('t', path='hive_table_bigdata')

[Stage 167:==>              (1 + 1) / 6][Stage 169:==>              (1 + 1) / 6]