In [1]:
# !python -m pip uninstall -y dask distributed
# !python -m pip install "dask[distributed]" --upgrade
# !python -m pip install dask --upgrade
# !pip install jupyter-server-proxy

## Creation of backtesting subset for NASDAQ-100 DATASET

In [None]:
import pandas as pd
import json
import time
import os
import re
import asyncio
import concurrent.futures
from google.cloud import storage
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_exponential
import pandas as pd
import dask.dataframe as dd
import pyarrow as pa
import pyarrow.parquet as pq
from dask.diagnostics import ProgressBar
import os
import tempfile
from google.cloud import storage
import dask
from pandas.api.types import CategoricalDtype
from dask.distributed import Client, LocalCluster
import shutil
import glob

In [None]:
# GCS setup
BACKTEST_BUCKET = "backtest_bucket121545"
PROJECT_ID = "################"

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a file to a GCS bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"Uploaded {source_file_name} to {destination_blob_name}")

def download_from_gcs(bucket_name, source_blob_name, destination_file_name):
    """Download a file from a GCS bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    try:
        blob.download_to_filename(destination_file_name)
        print(f"Downloaded {source_blob_name} to {destination_file_name}")
        return True
    except Exception as e:
        print(f"Error downloading {source_blob_name}: {e}")
        return False

def check_gcs_file_exists(bucket_name, source_blob_name):
    """Check if a file exists in a GCS bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    return blob.exists()

def validate_parquet_file(file_path):
    """Validate if a file is a valid Parquet file."""
    try:
        pd.read_parquet(file_path, engine='pyarrow')
        print(f"Validated {file_path} as a Parquet file")
        return True
    except Exception as e:
        print(f"File {file_path} is not a valid Parquet file: {e}")
        return False

def upload_files_to_bucket(local_files, destination_folder="my_data"):
    """
    Uploading a list of local files to a GCS bucket under a specified folder and verify the upload.

    Args:
        local_files (list): List of local file paths to upload.
        destination_folder (str): Destination folder in the GCS bucket (default: "my_data").

    Returns:
        list: List of GCS paths for the uploaded files.
    """
    uploaded_files = []

    # Check if local files exist
    missing_local_files = [file for file in local_files if not os.path.exists(file)]
    if missing_local_files:
        raise FileNotFoundError(f"The following local files are missing: {missing_local_files}")

    # Uploading each file and verifying
    for local_file in local_files:
        destination_blob_name = f"{destination_folder}/{os.path.basename(local_file)}"
        try:
            upload_to_gcs(BACKTEST_BUCKET, local_file, destination_blob_name)
            if check_gcs_file_exists(BACKTEST_BUCKET, destination_blob_name):
                print(f"Verified: {destination_blob_name} exists in GCS")
                uploaded_files.append(destination_blob_name)
            else:
                raise RuntimeError(f"Failed to verify upload of {destination_blob_name} in GCS")
        except Exception as e:
            print(f"Failed to upload {local_file} to {destination_blob_name}: {e}")
            raise

    return uploaded_files

def download_ticker_partition(ticker, prefix="parquet_data", local_dir="temp_parquet"):
    """
    Download all parquet files for a specific ticker partition.

    Args:
        ticker (str): Ticker symbol to download.
        prefix (str): GCS prefix for the parquet data (default: "parquet_data").
        local_dir (str): Local directory to store downloaded files (default: "temp_parquet").

    Returns:
        list: List of downloaded file paths.
    """
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)

    storage_client = storage.Client()
    bucket = storage_client.bucket(BACKTEST_BUCKET)
    partition_prefix = f"{prefix}/ticker={ticker}/"
    blobs = bucket.list_blobs(prefix=partition_prefix)

    downloaded_files = []
    for blob in blobs:
        if not blob.name.endswith('.parquet'):
            continue
        local_file = os.path.join(local_dir, os.path.basename(blob.name))
        if download_from_gcs(BACKTEST_BUCKET, blob.name, local_file):
            downloaded_files.append(local_file)

    return downloaded_files

def batch_download_tickers(tickers, batch_size=10, prefix="parquet_data", local_dir="temp_parquet"):
    """
    Batch download ticker partitions.

    Args:
        tickers (list): List of ticker symbols to download.
        batch_size (int): Number of tickers per batch (default: 10).
        prefix (str): GCS prefix for the parquet data (default: "parquet_data").
        local_dir (str): Local directory to store downloaded files (default: "temp_parquet").

    Returns:
        list: List of dictionaries mapping tickers to their downloaded files.
    """
    total_tickers = len(tickers)
    downloaded_batches = []

    for batch_start in range(0, total_tickers, batch_size):
        batch_end = min(batch_start + batch_size, total_tickers)
        batch_tickers = tickers[batch_start:batch_end]
        print(f"Downloading batch: tickers {batch_start + 1} to {batch_end}")

        batch_files = {}
        for ticker in batch_tickers:
            ticker_files = download_ticker_partition(ticker, prefix, local_dir)
            batch_files[ticker] = ticker_files

        downloaded_batches.append(batch_files)

    return downloaded_batches

### Dask config

In [None]:
# Progress bar for Dask operations
ProgressBar().register()

# Configuring Dask spill-to-disk
temp_dir = "/tmp/dask-spill"  
os.makedirs(temp_dir, exist_ok=True)

# Setting up LocalCluster with 8 workers for better parallelism
cluster = LocalCluster(
    n_workers=8,                     # More workers for 8 vCPUs
    threads_per_worker=2,            # 2 threads per worker
    memory_limit="6GB",              # ~48GB total
    processes=True,                  # Processes for better isolation
    local_directory=temp_dir         # Spill to fast disk
)
client = Client(cluster)
print("Dask Dashboard:", client.dashboard_link)

dask.config.set({
    "temporary-directory": temp_dir,
    "distributed.worker.memory.spill": 0.7,    
    "distributed.worker.memory.target": 0.6,   
    "distributed.worker.memory.pause": 0.85,   
    "distributed.worker.memory.terminate": 0.95,  
})

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:42395
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42619'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45189'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37385'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35151'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33113'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38199'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44493'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:32809'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:34249 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:34249
INFO:distributed.core:Starting

Dask Dashboard: http://127.0.0.1:8787/status


<dask.config.set at 0x7b52ca8f2bc0>

### Step 1: Loading and filtering

In [None]:
# Loading NASDAQ-100 tickers from filtered JSON
with open('/content/nasdaq100_ticker_dataset.json', 'r') as f:
    nasdaq100_data = json.load(f)
nasdaq_100 = sorted(set(item['Ticker'] for item in nasdaq100_data if item.get('Ticker')))
print(f"Loaded {len(nasdaq_100)} NASDAQ-100 tickers from JSON")

# Loading the Parquet file
file_path = "backtest_data_step4.parquet"
df = pd.read_parquet(file_path)

# Filtering for NASDAQ-100 tickers
df = df[df['Ticker'].isin(nasdaq_100)]
print(f"Filtered to {len(df['Ticker'].unique())} NASDAQ-100 tickers")
print(f"Rows before filtering trading hours: {len(df)}")

# Filtering for trading hours (4:00 AM to 8:00 PM, Monday to Friday)
print("Filtering to trading hours (04:00:00 to 20:00:00, Monday to Friday)")
df['date'] = pd.to_datetime(df['Date'])
df['hour'] = df['Time'].map(lambda x: x.hour)  # Extracting hour from datetime.time
df = df[
    (df['hour'].between(4, 20)) &  # 4:00 AM to 8:00 PM
    (df['date'].dt.weekday < 5)    # Monday to Friday
]
df = df.drop(columns=['date', 'hour'])  # Drop temporary columns
print(f"Filtered to trading hours, rows remaining: {len(df)}")

Loaded 101 NASDAQ-100 tickers from JSON
Filtered to 101 NASDAQ-100 tickers
Rows before filtering trading hours: 466547
Filtering to trading hours (04:00:00 to 20:00:00, Monday to Friday)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['date'] = pd.to_datetime(df['Date'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['hour'] = df['Time'].map(lambda x: x.hour)  # Extract hour from datetime.time


Filtered to trading hours, rows remaining: 451073


### Step 2: Creating complete time index - Timestamp column, Deduplicating, Identifying daily bounds

In [None]:
# Creating a 'timestamp' column by combining 'Date' and 'Time'
df['timestamp'] = pd.to_datetime(df['Date']) + pd.to_timedelta(df['Time'].astype(str))

# Rename columns
df = df.rename(columns={
    'Ticker': 'ticker',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
    'Prev_Session_High': 'prev_session_high',
    'Prev_Session_Low': 'prev_session_low',
    'Bid-Ask Spread (Estimated)': 'estimated_bid_ask_spread',
    'Estimated OBD': 'estimated_obd',
    '50-day SMA': '50_day_sma'
})

# Drop unnecessary columns
columns_to_drop = ['Date', 'Time']
df = df.drop(columns=columns_to_drop)
print("Step 1: Columns after dropping 'Date' and 'Time':", df.columns.tolist())

Step 1: Columns after dropping 'Date' and 'Time': ['ticker', 'open', 'high', 'low', 'close', 'volume', 'prev_session_high', 'prev_session_low', 'estimated_bid_ask_spread', 'estimated_obd', '50_day_sma', 'timestamp']


In [None]:
# Deduplicating
df = df.set_index(['ticker', 'timestamp'])
duplicates = df.index[df.index.duplicated()].unique()
print("Step 2: Duplicate index entries:", duplicates)
df = df[~df.index.duplicated(keep='last')]

Step 2: Duplicate index entries: MultiIndex([], names=['ticker', 'timestamp'])


In [None]:
# Identifying daily bounds
df_reset = df.reset_index()
daily_bounds = df_reset.groupby(['ticker', df_reset['timestamp'].dt.date]).agg({
    'timestamp': ['min', 'max']
}).reset_index()
daily_bounds.columns = ['ticker', 'date', 'first_timestamp', 'last_timestamp']
daily_bounds['date'] = pd.to_datetime(daily_bounds['date'])

In [None]:
# Generating complete time index as a DataFrame
def generate_time_index(row):
    start_time = row['first_timestamp']
    end_time = row['last_timestamp']
    return pd.date_range(start=start_time, end=end_time, freq='1min')

# Creating DataFrame chunks per ticker
index_chunks = []
for ticker in daily_bounds['ticker'].unique():
    ticker_bounds = daily_bounds[daily_bounds['ticker'] == ticker]
    time_indices = []
    for _, row in ticker_bounds.iterrows():
        timestamps = generate_time_index(row)
        for ts in timestamps:
            time_indices.append((ticker, ts))
    ticker_df = pd.DataFrame(time_indices, columns=['ticker', 'timestamp'])
    index_chunks.append(ticker_df)

# Concatenating into complete_index_df
complete_index_df = pd.concat(index_chunks, ignore_index=True)
print("Created complete_index_df with", len(complete_index_df), "rows")

# Optimizing ticker as categorical
complete_index_df['ticker'] = complete_index_df['ticker'].astype('category')

# Saving
complete_index_df.to_parquet('complete_index.parquet', engine='pyarrow', index=False)

Created complete_index_df with 667578 rows


### Step 3: Reindexing and forward-filling 

In [None]:
complete_index_ddf = dd.read_parquet('complete_index.parquet', engine='pyarrow')
print("Complete index row count:", complete_index_ddf.shape[0].compute())
print("Unique tickers:", complete_index_ddf['ticker'].nunique().compute())

Complete index row count: 667578




Unique tickers: 101


In [None]:
# Converting 'ticker' column to ordered categorical type
complete_index_ddf['ticker'] = complete_index_ddf['ticker'].astype(CategoricalDtype(categories=complete_index_ddf['ticker'].unique().compute(), ordered=True))

# Sorting and repartitioning by ticker
complete_index_ddf = complete_index_ddf.sort_values(['ticker', 'timestamp']).repartition(partition_size="100MB").persist()



In [None]:
# Reindexing and forward-fill using Dask
print(df_reset.columns.tolist())

# Consistent ticker categories
unique_tickers = pd.concat([complete_index_ddf['ticker'].unique().compute(),
                           pd.Series(df_reset['ticker'].unique())]).unique()
ticker_categories = CategoricalDtype(categories=unique_tickers, ordered=True)
df_reset['ticker'] = df_reset['ticker'].astype(ticker_categories)
complete_index_ddf['ticker'] = complete_index_ddf['ticker'].astype(ticker_categories)

# Numeric columns to float64
numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'prev_session_high',
                'prev_session_low', 'estimated_bid_ask_spread', 'estimated_obd', '50_day_sma']
df_reset[numeric_cols] = df_reset[numeric_cols].astype('float64', errors='ignore')

# Converting to Dask DataFrame with optimized partitions
ddf = dd.from_pandas(df_reset, npartitions=50).repartition(partition_size="100MB").persist()

# Merging with complete index - left merge
print("Original row count:", ddf.shape[0].compute())
print("Complete index row count:", complete_index_ddf.shape[0].compute())
ddf = complete_index_ddf.merge(ddf, on=['ticker', 'timestamp'], how='left').persist()
print("Merged row count:", ddf.shape[0].compute())
print("Columns after merge:", ddf.columns.tolist())

# Missing percentages after merge
missing_percentages = ddf.isna().mean().compute() * 100
for col, percentage in missing_percentages.items():
    print(f"  {col}: {percentage:.2f}%")

# Sorting and forward-fill
ddf = ddf.sort_values(['ticker', 'timestamp']).persist()
ddf['date_only'] = ddf['timestamp'].dt.date
cols_to_ffill = ['open', 'high', 'low', 'close', 'prev_session_high', 'prev_session_low', '50_day_sma']
ddf[cols_to_ffill] = ddf.groupby(['ticker', 'date_only'], observed=True)[cols_to_ffill].ffill()
ddf = ddf.reset_index(drop=True)

# Session masks for volume handling
ddf['hour'] = ddf['timestamp'].dt.hour
ddf['minute'] = ddf['timestamp'].dt.minute
ddf['is_regular'] = ((ddf['hour'] == 9) & (ddf['minute'] >= 30)) | (ddf['hour'].between(10, 15)) | ((ddf['hour'] == 16) & (ddf['minute'] == 0))
ddf['is_extended'] = (ddf['hour'].between(4, 20)) & (~ddf['is_regular'])

# Volume handling functions
def interpolate_volume_regular_hours(df):
    df['volume'] = df['volume'].interpolate(method='linear', limit_direction='both')  
    df['volume'] = df['volume'].clip(lower=0)
    return df

def fill_volume_extended_hours(df):
    df['volume'] = df['volume'].fillna(0.0000000001)  # Using small value instead of 0
    return df

# Processing volume for regular and extended hours - Interpolation and small value fill
regular_ddf = ddf[ddf['is_regular']].copy()
regular_ddf = regular_ddf.map_partitions(interpolate_volume_regular_hours, meta=ddf)

extended_ddf = ddf[ddf['is_extended']].copy()
extended_ddf = extended_ddf.map_partitions(fill_volume_extended_hours, meta=ddf)

# Handling non-trading hours
non_trading_ddf = ddf[~(ddf['is_regular'] | ddf['is_extended'])].copy()
non_trading_ddf['volume'] = non_trading_ddf['volume'].fillna(0.0000000001)  # small value

# Combining and sorting
print("Step 5: Combining regular, extended, and non-trading hours data")
ddf = dd.concat([regular_ddf, extended_ddf, non_trading_ddf]).sort_values(['ticker', 'timestamp'])

# Final fill to catch any remaining NaN in volume
print("Step 5: Applying final fill for any remaining missing volume")
ddf['volume'] = ddf['volume'].fillna(0.0000000001)

# Cleaning
ddf = ddf.dropna(subset=['ticker', 'timestamp'])
ddf = ddf.drop(columns=['date_only', 'hour', 'minute', 'is_regular', 'is_extended'])

# Missing percentages
missing_percentages = ddf.isna().mean().compute() * 100
for column, percentage in missing_percentages.items():
    print(f"  {column}: {percentage:.2f}%")

# PyArrow schema
schema_forward = pa.schema([
    ('ticker', pa.dictionary(pa.int16(), pa.string())),
    ('timestamp', pa.timestamp('ns')),
    ('open', pa.float64()),
    ('high', pa.float64()),
    ('low', pa.float64()),
    ('close', pa.float64()),
    ('volume', pa.float64()),
    ('prev_session_high', pa.float64()),
    ('prev_session_low', pa.float64()),
    ('estimated_bid_ask_spread', pa.float64()),
    ('estimated_obd', pa.float64()),
    ('50_day_sma', pa.float64())
])



Step 5: Starting reindex and forward-fill process
Step 5: Columns before converting to Dask: ['ticker', 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'prev_session_high', 'prev_session_low', 'estimated_bid_ask_spread', 'estimated_obd', '50_day_sma']
Step 5: Ensuring consistent ticker categories


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Step 5: Casting numeric columns to float64
Step 5: Converting Pandas DataFrame to Dask DataFrame


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Step 5: Performing left merge with complete index
Step 5: Original row count: 451073
Step 5: Complete index row count: 667578
Step 5: Merged row count: 667578
Step 5: Columns after merge: ['ticker', 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'prev_session_high', 'prev_session_low', 'estimated_bid_ask_spread', 'estimated_obd', '50_day_sma']
Step 5: Missing data percentages after merge:
  ticker: 0.00%
  timestamp: 0.00%
  open: 32.43%
  high: 32.43%
  low: 32.43%
  close: 32.43%
  volume: 32.43%
  prev_session_high: 32.93%
  prev_session_low: 32.93%
  estimated_bid_ask_spread: 32.43%
  estimated_obd: 32.43%
  50_day_sma: 32.43%
Step 5: Sorting by ticker and timestamp
Step 5: Performing forward-fill within ticker-date groups
Step 5: Defining session masks for regular and extended hours
Step 5: Interpolating volume for regular hours
Step 5: Setting volume to small value for extended hours
Step 5: Handling volume for non-trading hours
Step 5: Combining regular, extended, and no



Missing percentages (%):
  ticker: 0.00%
  timestamp: 0.00%
  open: 0.00%
  high: 0.00%
  low: 0.00%
  close: 0.00%
  volume: 0.00%
  prev_session_high: 0.61%
  prev_session_low: 0.61%
  estimated_bid_ask_spread: 32.43%
  estimated_obd: 32.43%
  50_day_sma: 0.00%
Step 5: Defining PyArrow schema for parquet output


### Step 4: Computing prev_session_high/low, backward-fill and metrics

In [None]:
# Optimizing data types once again
ddf['open'] = ddf['open'].astype('float32')
ddf['high'] = ddf['high'].astype('float32')
ddf['low'] = ddf['low'].astype('float32')
ddf['close'] = ddf['close'].astype('float32')
ddf['volume'] = ddf['volume'].astype('float32')
ddf['50_day_sma'] = ddf['50_day_sma'].astype('float32')

# Sorting
ddf = ddf.sort_values(['ticker', 'timestamp']).persist()

# Computing daily high and low per ticker
daily_high_low = ddf.groupby(['ticker', ddf['timestamp'].dt.date], observed=False).agg({
    'high': 'max',
    'low': 'min'
}).reset_index()
daily_high_low = daily_high_low.rename(columns={
    'timestamp': 'date_only',
    'high': 'prev_session_high',
    'low': 'prev_session_low'
})

# Shift to get previous trading day's values
meta_shift = {
    'ticker': 'category',
    'date_only': 'object',
    'prev_session_high': 'float32',
    'prev_session_low': 'float32',
    'prev_trading_date': 'object'
}
daily_high_low['prev_trading_date'] = daily_high_low.groupby('ticker', observed=False)['date_only'].shift(1, meta=('prev_trading_date', 'object'))
daily_high_low['prev_session_high'] = daily_high_low.groupby('ticker', observed=False)['prev_session_high'].shift(1, meta=('prev_session_high', 'float32'))
daily_high_low['prev_session_low'] = daily_high_low.groupby('ticker', observed=False)['prev_session_low'].shift(1, meta=('prev_session_low', 'float32'))

# Forward-filling missing values
daily_high_low['prev_trading_date'] = daily_high_low.groupby('ticker', observed=False)['prev_trading_date'].ffill()
daily_high_low['prev_session_high'] = daily_high_low.groupby('ticker', observed=False)['prev_session_high'].ffill()
daily_high_low['prev_session_low'] = daily_high_low.groupby('ticker', observed=False)['prev_session_low'].ffill()

# Merging computed prev_session_high/low
ddf['date_only'] = ddf['timestamp'].dt.date
ddf = ddf.merge(
    daily_high_low[['ticker', 'date_only', 'prev_session_high', 'prev_session_low']],
    left_on=['ticker', 'date_only'],
    right_on=['ticker', 'date_only'],
    how='left',
    suffixes=('', '_computed')
)

# Filling missing prev_session_high/low
ddf['prev_session_high'] = ddf['prev_session_high'].where(
    ~ddf['prev_session_high'].isna(), ddf['prev_session_high_computed']
)
ddf['prev_session_low'] = ddf['prev_session_low'].where(
    ~ddf['prev_session_low'].isna(), ddf['prev_session_low_computed']
)

# Dropping temp
ddf = ddf.drop(columns=['prev_session_high_computed', 'prev_session_low_computed'])

# Missing percentages
missing_percentages = ddf.isna().mean().compute() * 100
print("Missing percentages (%):")
for column, percentage in missing_percentages.items():
    print(f"  {column}: {percentage:.2f}%")

# Handling remaining missing prev_session_high/low
ddf['prev_session_high'] = ddf['prev_session_high'].fillna(ddf['high'])
ddf['prev_session_low'] = ddf['prev_session_low'].fillna(ddf['low'])

# Backward-filling
meta_bfill = {
    'ticker': 'category',
    'timestamp': 'datetime64[ns]',
    'open': 'float32',
    'high': 'float32',
    'low': 'float32',
    'close': 'float32',
    'volume': 'float32',
    'prev_session_high': 'float32',
    'prev_session_low': 'float32',
    'estimated_bid_ask_spread': 'float32',
    'estimated_obd': 'float32',
    '50_day_sma': 'float32',
    'date_only': 'object'
}

def bfill_partition(partition):
    partition = partition.set_index(['ticker', 'timestamp'])
    partition['date_only'] = partition.index.get_level_values('timestamp').date
    cols_to_bfill = ['open', 'high', 'low', 'close', '50_day_sma']
    partition[cols_to_bfill] = partition.groupby(['ticker', 'date_only'], observed=True)[cols_to_bfill].bfill()
    return partition.reset_index().drop(columns=['index'], errors='ignore')

ddf = ddf.map_partitions(bfill_partition, meta=meta_bfill)

# Computing metrics - estimated bid-ask spread and obd
def compute_metrics_partition(partition):
    partition['estimated_bid_ask_spread'] = partition['high'] - partition['low']
    partition['estimated_bid_ask_spread'] = partition['estimated_bid_ask_spread'].clip(lower=0.0001)
    partition['estimated_obd'] = partition['volume'] / partition['estimated_bid_ask_spread']
    return partition

ddf = ddf.map_partitions(compute_metrics_partition, meta=meta_bfill)

# Drop temp
ddf = ddf.drop(columns=['date_only'])

missing_percentages = ddf.isna().mean().compute() * 100
print("Missing percentages (%):")
for column, percentage in missing_percentages.items():
    print(f"  {column}: {percentage:.2f}%")

Step 6: Starting computation of prev_session_high/low, backward-fill, and metrics
Step 6: Optimizing data types to float32
Step 6: Sorting by ticker and timestamp


INFO:distributed.core:Event loop was unresponsive in Nanny for 3.02s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 3.03s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.


Step 6: Computing daily high and low per ticker
Step 6: Shifting to get previous trading day's values
Step 6: Forward-filling missing previous trading dates and values
Step 6: Merging computed prev_session_high/low into main DataFrame
Step 6: Filling missing prev_session_high/low with computed values
Step 6: Dropping temporary computed columns
Step 6: Calculating missing percentages for each column




Missing percentages (%):
  ticker: 0.00%
  timestamp: 0.00%
  open: 0.00%
  high: 0.00%
  low: 0.00%
  close: 0.00%
  volume: 0.00%
  prev_session_high: 0.58%
  prev_session_low: 0.58%
  estimated_bid_ask_spread: 32.43%
  estimated_obd: 32.43%
  50_day_sma: 0.00%
  date_only: 0.00%
Step 6: Handling remaining missing prev_session_high/low
Step 6: Performing backward-fill for specified columns
Step 6: Computing estimated bid-ask spread and order book depth
Step 6: Dropping temporary date_only column
Step 6: Calculating missing percentages for each column




Missing percentages (%):
  ticker: 0.00%
  timestamp: 0.00%
  open: 0.00%
  high: 0.00%
  low: 0.00%
  close: 0.00%
  volume: 0.00%
  prev_session_high: 0.00%
  prev_session_low: 0.00%
  estimated_bid_ask_spread: 0.00%
  estimated_obd: 0.00%
  50_day_sma: 0.00%


### Step 5: Adding News impact binary indicator

In [None]:
news_df = pd.read_csv('final_news.csv')

# Converting Impact_Date to datetime (date only)
news_df['Impact_Date'] = pd.to_datetime(news_df['Impact_Date']).dt.date
news_df['Date'] = pd.to_datetime(news_df['Date']).dt.date

# Filtering news for NASDAQ-100 tickers
news_df = news_df[news_df['Ticker'].isin(nasdaq_100) | news_df['Ticker'].isna()]

# Handling Ticker column
news_df['Ticker'] = news_df['Ticker'].replace('', None).replace('None', None)

# Splitting into general and ticker-specific news
general_news = news_df[news_df['Ticker'].isna()][['Impact_Date']].drop_duplicates()
ticker_news = news_df[news_df['Ticker'].notna()][['Ticker', 'Impact_Date']].drop_duplicates()

# Converting to sets for efficient lookup
general_news_dates = set(general_news['Impact_Date'])
ticker_news_pairs = set(ticker_news.apply(lambda row: (row['Ticker'], row['Impact_Date']), axis=1))

# Adding news_impact column
def add_news_impact(partition):
    partition['date_only'] = partition['timestamp'].dt.date
    partition['news_impact'] = 0
    partition['news_impact'] = partition['news_impact'].where(
        ~partition['date_only'].isin(general_news_dates), 1
    )
    partition['ticker_date'] = partition.apply(
        lambda row: (row['ticker'], row['date_only']), axis=1
    )
    partition['news_impact'] = partition['news_impact'].where(
        ~partition['ticker_date'].isin(ticker_news_pairs), 1
    )
    partition = partition.drop(columns=['date_only', 'ticker_date'])
    return partition

meta_with_news = {
    'ticker': 'category',
    'timestamp': 'datetime64[ns]',
    'open': 'float64',
    'high': 'float64',
    'low': 'float64',
    'close': 'float64',
    'volume': 'float64',
    'prev_session_high': 'float64',
    'prev_session_low': 'float64',
    'estimated_bid_ask_spread': 'float64',
    'estimated_obd': 'float64',
    '50_day_sma': 'float64',
    'news_impact': 'int8'
}

ddf = ddf.map_partitions(add_news_impact, meta=meta_with_news)

Step 7: Adding news impact


### Step 6: Finalizing and saving

In [None]:
# Finalizing dataframe
def finalize_partition(partition):
    final_columns = [
        'ticker', 'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'prev_session_high', 'prev_session_low', 'estimated_bid_ask_spread',
        'estimated_obd', '50_day_sma', 'news_impact'
    ]
    partition = partition.drop_duplicates(subset=final_columns, keep='first')
    return partition[final_columns]

meta_final = {
    'ticker': 'category',
    'timestamp': 'datetime64[ns]',
    'open': 'float64',
    'high': 'float64',
    'low': 'float64',
    'close': 'float64',
    'volume': 'float64',
    'prev_session_high': 'float64',
    'prev_session_low': 'float64',
    'estimated_bid_ask_spread': 'float64',
    'estimated_obd': 'float64',
    '50_day_sma': 'float64',
    'news_impact': 'int8'
}

ddf = ddf.map_partitions(finalize_partition, meta=meta_final)

print("Row count:", ddf.shape[0].compute())

#ddf = ddf.repartition(partition_size="50MB").persist()
#ddf = ddf.sort_values(['ticker', 'timestamp']).persist()

Step 8: Finalizing DataFrame
Step 8»; Repartitioning and sorting DataFrame


In [None]:
# Saving to parquet
unique_tickers = sorted(nasdaq_100)
print(f"Number of unique tickers: {len(unique_tickers)}")


# Output directory
output_dir_final = '/content/backtesting_final'
os.makedirs(output_dir_final, exist_ok=True)

# PyArrow schema
schema_forward = pa.schema([
    ('ticker', pa.dictionary(pa.int16(), pa.string(), ordered=True)),  # Specifying ordered=True
    ('timestamp', pa.timestamp('ns')),
    ('open', pa.float32()),
    ('high', pa.float32()),
    ('low', pa.float32()),
    ('close', pa.float32()),
    ('volume', pa.float32()),
    ('prev_session_high', pa.float32()),
    ('prev_session_low', pa.float32()),
    ('estimated_bid_ask_spread', pa.float32()),
    ('estimated_obd', pa.float32()),
    ('50_day_sma', pa.float32()),
    ('news_impact', pa.int8())
])

# Writing to Parquet with partitioning by ticker
start_time = time.time()
ddf.to_parquet(
    output_dir_final,
    engine='pyarrow',
    schema=schema_forward,
    partition_on=['ticker'],  # Partitioning by ticker column
    write_metadata_file=False,
    compression=None,  # mAKING it faster without compression
    append=False
)
end_time = time.time()
print(f"Time to save Parquet to {output_dir_final}: {end_time - start_time:.2f} seconds")

Step 9: Saving to Parquet
Number of unique tickers: 101




Time to save Parquet to /content/backtesting_final: 4.14 seconds


### AAPL inspection

In [None]:
# Path
parquet_path = "/content/backtesting_final/"

# path - AAPL subfolder
aapl_subfolder = os.path.join(parquet_path, "ticker=AAPL")
if not os.path.exists(aapl_subfolder):
    raise FileNotFoundError(f"Subfolder for ticker=AAPL not found at {aapl_subfolder}")

# All Parquet files in the AAPL subfolder
parquet_files = glob.glob(os.path.join(aapl_subfolder, "*.parquet"))
if not parquet_files:
    raise FileNotFoundError(f"No Parquet files found in {aapl_subfolder}")

# Reading all Parquet files into a list of DataFrames
tables = [pq.read_table(file) for file in parquet_files]

# Into single table
table = pa.concat_tables(tables)

df_aapl = table.to_pandas()

# Correct data types based on my schema
df_aapl['ticker'] = df_aapl['ticker'].astype('category')
df_aapl['timestamp'] = pd.to_datetime(df_aapl['timestamp'])
df_aapl['open'] = df_aapl['open'].astype('float64')
df_aapl['high'] = df_aapl['high'].astype('float64')
df_aapl['low'] = df_aapl['low'].astype('float64')
df_aapl['close'] = df_aapl['close'].astype('float64')
df_aapl['volume'] = df_aapl['volume'].astype('float64')
df_aapl['prev_session_high'] = df_aapl['prev_session_high'].astype('float64')
df_aapl['prev_session_low'] = df_aapl['prev_session_low'].astype('float64')
df_aapl['estimated_bid_ask_spread'] = df_aapl['estimated_bid_ask_spread'].astype('float64')
df_aapl['estimated_obd'] = df_aapl['estimated_obd'].astype('float64')
df_aapl['50_day_sma'] = df_aapl['50_day_sma'].astype('float64')
df_aapl['news_impact'] = df_aapl['news_impact'].astype('int8')

# Sorting by timestamp for inspection
df_aapl = df_aapl.sort_values('timestamp')

# Saving
output_csv = "aapl_data.csv"
df_aapl.to_csv(output_csv, index=False, date_format='%Y-%m-%d %H:%M:%S')

print(f"Rows: {len(df_aapl)}")
print({df_aapl.columns.tolist()})
print(df_aapl.head(5))

AAPL data saved to aapl_data.csv
Number of rows: 3809
Columns: ['timestamp', 'open', 'high', 'low', 'close', 'volume', 'prev_session_high', 'prev_session_low', 'estimated_bid_ask_spread', 'estimated_obd', '50_day_sma', 'news_impact', 'ticker']
First 5 rows for verification:
            timestamp       open       high        low      close  \
0 2020-03-06 09:00:00  72.250000  72.250000  72.004997  72.004997   
1 2020-03-06 09:01:00  72.025002  72.025002  72.025002  72.025002   
2 2020-03-06 09:02:00  72.025002  72.025002  72.025002  72.025002   
3 2020-03-06 09:03:00  72.025002  72.025002  72.025002  72.025002   
4 2020-03-06 09:04:00  72.232498  72.250000  72.232498  72.250000   

         volume  prev_session_high  prev_session_low  \
0  2.880000e+03          74.887497         72.852501   
1  4.532000e+03          74.887497         72.852501   
2  1.000000e-10          74.887497         72.852501   
3  1.000000e-10          74.887497         72.852501   
4  5.668000e+03          74.88

#### Cleaning directory

In [None]:
directory_to_delete = '/content/backtesting_final'
try:
    shutil.rmtree(directory_to_delete)
    print(f"Directory '{directory_to_delete}' and its contents successfully deleted.")
except FileNotFoundError:
    print(f"Directory '{directory_to_delete}' not found.")
except OSError as e:
    print(f"Error deleting directory '{directory_to_delete}': {e}")

In [None]:
# Closing the client and cluster
client.close()
cluster.close()

INFO:distributed.scheduler:Remove client Client-8ee56be6-2458-11f0-96c1-0242c0a80a0a
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:38072; closing.
INFO:distributed.scheduler:Remove client Client-8ee56be6-2458-11f0-96c1-0242c0a80a0a
INFO:distributed.scheduler:Close client connection: Client-8ee56be6-2458-11f0-96c1-0242c0a80a0a
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1745865296.6417468') (0, 1, 2, 3, 4, 5, 6, 7)
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:33273'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:39025'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:37055'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.