## Parse the L2 CSV Files

In [None]:
%pip install polars pandas plotly numpy scikit-learn torch

### Process 1 CSV first

In [None]:
import polars as pl
import pandas as pd
import numpy as np
import ast
import re
import os
import plotly


In [None]:

# Load one Level 2 CSV as a first trial run (point file_path at the file to inspect).
# pandas reads the 'bids' and 'asks' cells as plain text because they hold
# bracketed, comma-separated literals; ast.literal_eval turns each cell back
# into the Python object it spells out before the frame is handed to Polars.
file_path = 'CFE_20241006_Level2.csv'
df = pd.read_csv(file_path)
for book_column in ('bids', 'asks'):
    df[book_column] = df[book_column].apply(ast.literal_eval)

# Convert to Polars for the rest of the processing and preview the result
df_plr = pl.from_pandas(df)
display(df_plr.head())

In [None]:

# Turn the textual srcTime column into a Datetime column.
# The raw files use a space between date and time, hence this format string.
src_time_parsed = pl.col("srcTime").str.to_datetime(format="%Y-%m-%d %H:%M:%S.%f")
df_plr = df_plr.with_columns(src_time_parsed)

display(df_plr.head())

In [None]:
# Flatten the nested 'bids' / 'asks' list columns into scalar columns.
# For each of the first 10 entries of a side, element 0 becomes
# <side>_qty_<level> and element 1 becomes <side>_num_<level>
# (quantity and number of orders, per the column naming).
# NOTE(review): this assumes every row carries at least 10 entries per side;
# confirm how the installed Polars version treats a shorter list.
expressions = []
for level in range(10):
    for book_col, prefix in (("bids", "bid"), ("asks", "ask")):
        entry = pl.col(book_col).list.get(level)
        expressions.append(entry.list.get(0).alias(f"{prefix}_qty_{level}"))
        expressions.append(entry.list.get(1).alias(f"{prefix}_num_{level}"))

# Attach the flattened columns, then discard the original list columns
df_plr_mod = df_plr.with_columns(expressions).drop("bids", "asks")

# Preview to verify the new layout
display(df_plr_mod.head())

In [None]:
# Show the directory that the relative file paths in this notebook resolve against
cwd = os.getcwd()
print("Current working directory:", cwd)

In [None]:
import glob

# Batch step: flatten every raw Level 2 CSV found in the current directory and
# write the result, under the same file name, into the processed_data folder.
# Combining the per-file outputs is left to a separate step that reads them
# back from disk, so no frames are accumulated in memory here.

# glob returns matches in arbitrary order; sort so runs are reproducible.
csv_files = sorted(glob.glob('CFE_*_Level2.csv'))

# Output folder, resolved once against the current working directory.
data_dir = "processed_data"
data_dir_full = os.path.join(os.getcwd(), data_dir)

# Expressions that expand the nested 'bids' / 'asks' lists into scalar columns:
# for each of the first 10 entries of a side, element 0 -> <side>_qty_<level>
# and element 1 -> <side>_num_<level>.
expressions = []
for level in range(10):
    for book_col, prefix in (("bids", "bid"), ("asks", "ask")):
        entry = pl.col(book_col).list.get(level)
        expressions.append(entry.list.get(0).alias(f"{prefix}_qty_{level}"))
        expressions.append(entry.list.get(1).alias(f"{prefix}_num_{level}"))

if csv_files:
    # Create the output folder once, at the same absolute path that is written to.
    os.makedirs(data_dir_full, exist_ok=True)
else:
    print("No CSV files found to process.")

for file_path in csv_files:
    print(f"Processing {file_path}...")

    # pandas reads the bids/asks cells as text; parse them into Python objects
    temp_df_pd = pd.read_csv(file_path)
    for book_col in ("bids", "asks"):
        temp_df_pd[book_col] = temp_df_pd[book_col].apply(ast.literal_eval)

    # Convert to Polars, parse the timestamp, flatten the book, drop the lists
    temp_df_pl = (
        pl.from_pandas(temp_df_pd)
        .with_columns(
            pl.col("srcTime").str.to_datetime(format="%Y-%m-%d %H:%M:%S.%f")
        )
        .with_columns(expressions)
        .drop("bids", "asks")
    )

    # Write the flattened frame next to its siblings in the processed folder
    processed_file_path = os.path.join(data_dir_full, os.path.basename(file_path))
    temp_df_pl.write_csv(processed_file_path)
    print(f"Processed file saved to {processed_file_path}")

In [None]:
import glob
import os
import polars as pl

# Combine every processed CSV into one frame, order it chronologically and
# persist it as a single Parquet file.

# Directory where processed CSVs are stored
data_dir = "processed_data"

# Find all processed CSV files (sorted: glob gives no ordering guarantee)
processed_csv_files = sorted(glob.glob(os.path.join(data_dir, 'CFE_*_Level2.csv')))

if not processed_csv_files:
    # NOTE: df_sorted is not defined on this path, so code that reads it
    # afterwards needs at least one processed file to exist.
    print("No processed CSV files found to concatenate.")
else:
    print(f"Found {len(processed_csv_files)} files to concatenate.")

    # scan_csv only builds lazy frames; the files are read when collect() runs
    lazy_dfs = [pl.scan_csv(f) for f in processed_csv_files]

    # Stack the lazy frames vertically and materialise the result
    df_concatenated = pl.concat(lazy_dfs, how="vertical").collect()

    # Parse srcTime BEFORE sorting so the ordering is done on real timestamps
    # rather than on their text form (which is only chronological while every
    # string has exactly the same fixed-width layout). The processed files
    # carry a 'T' between date and time, hence this format string.
    df_sorted = df_concatenated.with_columns(
        pl.col("srcTime").str.to_datetime(format="%Y-%m-%dT%H:%M:%S.%f")
    ).sort("srcTime", "pktSeqNum", "msgSeqNum")

    # Define the output path for the Parquet file
    output_parquet_path = os.path.join(data_dir, 'concatenated_l2_data.parquet')

    # Write the sorted DataFrame to a Parquet file
    df_sorted.write_parquet(output_parquet_path)

    print(f"Successfully concatenated and saved data to {output_parquet_path}")
    display(df_sorted.head())
    print(f"Total rows in concatenated dataframe: {df_sorted.shape[0]}")

In [None]:
# Keep only the rows whose Symbol is 'VXV4' and whose tradingStatus is 'T'
is_vxv4 = pl.col("Symbol") == "VXV4"
has_status_t = pl.col("tradingStatus") == "T"
df_filtered = df_sorted.filter(is_vxv4 & has_status_t)

# Preview the filtered rows
display(df_filtered.head())

# Persist the filtered subset as its own Parquet file in the data directory
filtered_parquet_path = os.path.join(data_dir, 'filtered_vxv4_trading.parquet')
df_filtered.write_parquet(filtered_parquet_path)
print(f"Filtered data saved to {filtered_parquet_path}")

# Report (rows, columns) so the number of matching rows is visible
print(f"Shape of the filtered dataframe: {df_filtered.shape}")