# Notebook for Preprocessing
This notebook focuses on the preprocessing of financial market data, specifically Parquet files containing stock information for S&P 500 companies. The preprocessing steps are designed to clean, transform, and enhance the raw data, ensuring it is ready for analysis and modeling tasks.

Let's start by importing the necessary libraries. Key libraries include :
- `numpy` and `pandas`: For numerical calculations and efficient DataFrame manipulations.
- `random` and `os`: For file and folder operations and random selections.

In [None]:
# Import necessary libraries

import pandas as pd
import numpy as np

import random
import os

The following function preprocesses parquet files containing stock market data for S&P 500 companies. The goal is to clean and structure the data, making it suitable for further financial analysis. Given the messy and inconsistent nature of financial data, this function ensures consistency and quality while adding essential metrics to analyze market behavior and dynamics. It ultimately provides a structured dataset tailored for future advanced applications.

The function processes the data through the following steps :

1) Data Extraction : Ensures the raw data is loaded and the temporal information is properly structured.
- Reads Parquet files for each company from the input folder.  
- Extracts and formats the `date` and `time` from the `index` column.  

2) Filtering : Focuses on market hours removes noise from off-hour trading.
- Retains only rows within US market hours (09:30 to 16:00 EST).  

3) Renaming and Cleaning : Provides a consistent and interpretable dataset.
- Renames columns (e.g., `X.Open` → `bid_price`) for clarity and consistency.  
- Removes rows with missing values in critical fields like `bid_price` and `ask_price`.  

4) Feature Engineering : Calculates additional metrics to enrich the dataset.
- `mid_price`: The average of bid and ask prices.  
- `order_density`: The total volume of bid and ask orders.  
- `order_imbalance`: Measures the imbalance between bid and ask volumes.  
- `spread`: The absolute difference between ask and bid prices, measuring market liquidity.
- `vw_spread`: Volume-weighted spread to reflect liquidity costs.   
- `relative_spread`: The spread relative to the mid_price, providing a normalized measure of liquidity.  
- `log_return`: Logarithmic returns to capture price changes.  
- `volatility`: Rolling standard deviation of log returns to assess market variability.

5) Saving Processed Data : Stores the preprocessed data for downstream analysis and modeling.
- Exports the cleaned and enhanced dataset as CSV files to the specified output folder.  

In [None]:
def preprocess_stocks_parquet_files(input_folder, output_folder, volatility_window=10):
    """
    Preprocess Parquet files for each company's ticker in the S&P500 folder:
    - Extract data from the 'index' column into 'date' and 'time' (properly formatted).
    - Filter rows based on time range (09:30:00-05:00 to 16:00:00-05:00).
    - Rename columns for consistency.
    - Add calculated features: mid_price, order_density, order_imbalance, spread, vw_spread, relative_spread, log_return, volatility.
    - Remove rows with missing values in bid/ask columns.
    - Remove rows where spread is negative.
    - Save the preprocessed DataFrame as CSV in the specified output folder.

    Args:
        input_folder (str): Path to the folder containing company tickers with Parquet files.
        output_folder (str): Path to save the preprocessed CSV files.
        volatility_window (int): Rolling window size for volatility calculation.

    Returns:
        None: Saves preprocessed CSV files in the output folder.
    """
    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # List all tickers (subfolders) in the input directory, sorted alphabetically
    tickers = sorted([d for d in os.listdir(input_folder) if os.path.isdir(os.path.join(input_folder, d))])

    for ticker in tickers:
        ticker_folder = os.path.join(input_folder, ticker)
        output_csv = os.path.join(output_folder, f"{ticker}_2010_cleaned.csv")

        print(f"Processing ticker: {ticker}")

        # List all Parquet files in the ticker's folder
        parquet_files = [
            os.path.join(ticker_folder, f) for f in os.listdir(ticker_folder)
            if f.endswith('.parquet') and f.startswith('2010')
        ]

        if not parquet_files:
            print(f"No Parquet files for 2010 found for ticker: {ticker}")
            continue

        # Read and concatenate all 2010 Parquet files
        dataframes = []
        for parquet_file in sorted(parquet_files):  # Ensure files are processed in date order
            df = pd.read_parquet(parquet_file)
            dataframes.append(df)

        if dataframes:
            full_dataframe = pd.concat(dataframes, ignore_index=True)

            # Ensure the 'index' column is present and convert to datetime
            full_dataframe['index'] = pd.to_datetime(full_dataframe['index'], errors='coerce')

            # Split 'index' into 'date' and 'time' columns
            full_dataframe['date'] = full_dataframe['index'].dt.date
            full_dataframe['time'] = full_dataframe['index'].dt.strftime('%H:%M:%S%z')
            
            # Fix timezone formatting
            full_dataframe['time'] = full_dataframe['time'].str[:-5] + full_dataframe['time'].str[-5:-2] + ':' + full_dataframe['time'].str[-2:]
            full_dataframe.drop(columns=['index'], inplace=True)  # Remove the original index column

            # Reorder columns to place 'date' and 'time' first
            cols = ['date', 'time'] + [col for col in full_dataframe.columns if col not in ['date', 'time']]
            full_dataframe = full_dataframe[cols]

            # Filter rows based on time range (09:30:00-05:00 to 16:00:00-05:00)
            full_dataframe = full_dataframe[
                (full_dataframe['time'] >= '09:30:00-05:00') & (full_dataframe['time'] <= '16:00:00-05:00')
            ]

            # Rename columns
            full_dataframe.rename(
                columns={
                    'X.Open': 'bid_price',
                    'X.High': 'bid_volume',
                    'X.Low': 'ask_price',
                    'X.Close': 'ask_volume'
                },
                inplace=True
            )

            # Drop rows with missing values in bid/ask columns
            full_dataframe.dropna(subset=['bid_price', 'bid_volume', 'ask_price', 'ask_volume'], inplace=True)

            # Add calculated columns
            full_dataframe['mid_price'] = (full_dataframe['bid_price'] + full_dataframe['ask_price']) / 2
            full_dataframe['order_density'] = full_dataframe['bid_volume'] + full_dataframe['ask_volume']
            full_dataframe['order_imbalance'] = (full_dataframe['bid_volume'] - full_dataframe['ask_volume']) / (full_dataframe['bid_volume'] + full_dataframe['ask_volume'])
            full_dataframe['spread'] = full_dataframe['ask_price'] - full_dataframe['bid_price']
            full_dataframe['vw_spread'] = full_dataframe['spread'] * full_dataframe['order_density']
            full_dataframe['relative_spread'] = full_dataframe['spread'] / full_dataframe['mid_price']
            
            # Handle zeros or NaNs in mid_price to avoid log errors
            full_dataframe['log_return'] = np.where(
                (full_dataframe['mid_price'] > 0) & (full_dataframe['mid_price'].shift(1) > 0),
                np.log(full_dataframe['mid_price'] / full_dataframe['mid_price'].shift(1)),
                np.nan
            )
            full_dataframe['volatility'] = full_dataframe['log_return'].rolling(window=volatility_window, min_periods=1).std()

            # Remove rows where spread is negative
            full_dataframe = full_dataframe[full_dataframe['spread'] >= 0]

            # Save the preprocessed DataFrame to CSV
            full_dataframe.to_csv(output_csv, index=False)
            print(f"Preprocessed CSV saved: {output_csv}")
        else:
            print(f"No data for 2010 for ticker: {ticker}")

In [None]:
if __name__ == "__main__":
    # Paths for input and output folders
    input_folder = "S&P500"  # Folder containing company subfolders with Parquet files
    output_folder = "S&P500_cleaned"  # Folder to save preprocessed CSV files

    # Preprocess all Parquet files for S&P500
    preprocess_stocks_parquet_files(input_folder, output_folder)

Randomly selecting and displaying a cleaned CSV file provides a quick way to verify that previous preprocessing steps were applied correctly.

In [105]:
def display_random_cleaned_csv(folder_path):
    """
    Randomly selects and displays a cleaned CSV file from the specified folder.

    Args:
        folder_path (str): Path to the folder containing cleaned CSV files.

    Returns:
        None: Displays the content of the randomly selected CSV.
    """
    # List all files in the folder
    csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]

    if not csv_files:
        print("No CSV files found in the specified folder.")
        return

    # Randomly select a CSV file
    random_csv = random.choice(csv_files)
    csv_path = os.path.join(folder_path, random_csv)

    # Load and display the CSV
    print(f"Displaying CSV for {random_csv}:")
    df = pd.read_csv(csv_path)
    display(df)

# Specify the folder containing cleaned CSV files
cleaned_folder = "S&P500_cleaned"

# Display a random cleaned CSV
display_random_cleaned_csv(cleaned_folder)

Displaying CSV for FRT_2010_cleaned.csv:


Unnamed: 0,date,time,bid_price,bid_volume,ask_price,ask_volume,mid_price,order_density,order_imbalance,spread,vw_spread,relative_spread,log_return,volatility
0,2010-01-04,09:31:00-05:00,67.76,10.0,67.96,2.0,67.860,12.0,0.666667,0.20,2.40,0.002947,,
1,2010-01-04,09:32:00-05:00,68.01,11.0,68.04,1.0,68.025,12.0,0.833333,0.03,0.36,0.000441,0.002429,
2,2010-01-04,09:33:00-05:00,68.06,8.0,68.07,1.0,68.065,9.0,0.777778,0.01,0.09,0.000147,0.000588,0.001302
3,2010-01-04,09:34:00-05:00,68.07,5.0,68.14,3.0,68.105,8.0,0.250000,0.07,0.56,0.001028,0.000588,0.001063
4,2010-01-04,09:35:00-05:00,68.08,7.0,68.15,1.0,68.115,8.0,0.750000,0.07,0.56,0.001028,0.000147,0.001015
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
75820,2010-12-31,15:52:00-05:00,77.96,7.0,77.97,2.0,77.965,9.0,0.555556,0.01,0.09,0.000128,0.000128,0.000757
75821,2010-12-31,15:53:00-05:00,78.00,8.0,78.00,1.0,78.000,9.0,0.777778,0.00,0.00,0.000000,0.000449,0.000804
75822,2010-12-31,15:54:00-05:00,78.06,5.0,78.08,3.0,78.070,8.0,0.250000,0.02,0.16,0.000256,0.000897,0.000887
75823,2010-12-31,15:55:00-05:00,78.12,5.0,78.12,2.0,78.120,7.0,0.428571,0.00,0.00,0.000000,0.000640,0.000932


We are now guaranteed that the cleaned data is structured and ready for analysis.