In [None]:
# Mount Google Drive at /content/drive using the google.colab helper, so the
# cells below can reach files stored there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
!find /insert file path/
root = 'file path'

file_count = sum(len(files) for _, _, files in os.walk(root))
print(f"Total files in deeplob: {file_count}")

In [None]:
import os
import re
import glob
import pandas as pd

folder = 'folder path'

# Collect every log_sample / fwdsampler file sitting directly inside `folder`
log_pattern = os.path.join(folder, '*log_sample*')
fwd_pattern = os.path.join(folder, '*fwdsampler*')
log_paths = glob.glob(log_pattern)
fwd_paths = glob.glob(fwd_pattern)

#  Helper to get date out of filename
def extract_date(filepath):
    """Return the calendar date encoded at the start of a file name.

    The base name of ``filepath`` (minus its final extension) must begin with
    an eight-digit ``YYYYMMDD`` stamp. A leading "Copy of " prefix (matched
    case-insensitively) is stripped before the stamp is looked for.

    Parameters
    ----------
    filepath : str
        Path or bare file name, e.g. ``'/data/20230105.log_sample.csv'``.

    Returns
    -------
    datetime.date
        The parsed date.

    Raises
    ------
    ValueError
        If the name does not start with eight digits, or the digits do not
        form a valid ``%Y%m%d`` date.
    """
    base = os.path.basename(filepath)
    name, _ = os.path.splitext(base)

    clean = re.sub(r'(?i)^copy of\s*', '', name)
    m = re.match(r'(\d{8})', clean)
    if m is None:
        # Previously this fell through to `m.group(1)` and died with an opaque
        # AttributeError on the first file that did not follow the convention.
        raise ValueError(f"No YYYYMMDD date prefix in file name: {base!r}")

    return pd.to_datetime(m.group(1), format='%Y%m%d').date()

# Group the discovered paths per date: date -> {'log': path, 'fwd': path}.
# Log files are registered first, then fwd files; a later file for the same
# date and kind replaces the earlier one.
by_date = {}
for kind, kind_paths in (('log', log_paths), ('fwd', fwd_paths)):
    for p in kind_paths:
        by_date.setdefault(extract_date(p), {})[kind] = p
print(by_date)




In [None]:
import os
import pandas as pd
import pyarrow

#  the source directory containing CSV files
csv_dir = 'insert file path'

# the new target directory for Parquet files
parquet_new_dir = 'insert file path'

# Create the new target directory if it doesn't exist
os.makedirs(parquet_new_dir, exist_ok=True)
print(f"Ensured target directory exists: {parquet_new_dir}")

# Check if the source directory exists before listing
if not os.path.isdir(csv_dir):
    print(f"Error: Source directory not found: {csv_dir}")
else:
    print(f"Processing files from: {csv_dir}")
    # sorted() only makes the log output deterministic; each file is independent.
    for csv_file in sorted(os.listdir(csv_dir)):
        csv_path = os.path.join(csv_dir, csv_file)
        stem, ext = os.path.splitext(csv_file)

        # Only regular files can be parsed. Parquet output left behind by an
        # earlier run (source and target may be the same folder) is skipped too,
        # instead of being fed to read_csv.
        if not os.path.isfile(csv_path) or ext.lower() == '.parquet':
            print(f"Skipping {csv_file}: not a CSV input file.")
            continue

        # Name the output "<name>.parquet". A trailing ".csv" is dropped so that
        # "20230103.log_sample.csv" becomes "20230103.log_sample.parquet" (the
        # pattern the later cells look for) rather than "...log_sample.csv.parquet".
        # Files without a ".csv" suffix keep their full name, exactly as before.
        base_name = stem if ext.lower() == '.csv' else csv_file
        parquet_fname = base_name + '.parquet'
        parquet_path = os.path.join(parquet_new_dir, parquet_fname)

        # Best-effort conversion: a failure on one file is reported and the
        # loop carries on with the next one.
        try:
            # Read the CSV file into a pandas DataFrame
            df = pd.read_csv(csv_path)
            print(f"Read {csv_file}: {df.shape[0]} rows")

            # Save the DataFrame to Parquet format in the new directory
            df.to_parquet(parquet_path,
                          engine='pyarrow',
                          compression='snappy')

            print(f"Saved {parquet_fname} to {parquet_new_dir}")

        except Exception as e:
            print(f"Error processing {csv_file}: {e}")

# List files in the new directory
print("\nContents of the new directory:")
if os.path.exists(parquet_new_dir):
    for fname in os.listdir(parquet_new_dir):
        print(f"  {fname}")
else:
    print(f"Target directory {parquet_new_dir} was not created or is empty.")

In [None]:
# Drop windows shorter than 100 rows; truncate windows longer than 100 rows, keeping their last 100 entries.
import os
import pandas as pd
import pyarrow.parquet as pq
import numpy as np

processed_parquet_dir = '/folder path'

# Column whose value identifies the window ("chunk") a row belongs to.
target_column = 'first_tp'

# Number of rows every kept window must have: shorter windows are dropped,
# longer ones are cut down to their last WINDOW_LEN rows.
WINDOW_LEN = 100

# Check if the directory exists before listing
if not os.path.exists(processed_parquet_dir):
    print(f"Error: Directory not found: {processed_parquet_dir}")
else:
    print(f"Processing files in: {processed_parquet_dir}")
    for pq_file in os.listdir(processed_parquet_dir):
        # Only the logbook files are windowed; everything else is left alone.
        if not pq_file.endswith('.log_sample.parquet'):
            continue

        # Construct the full path to the Parquet file
        file_path = os.path.join(processed_parquet_dir, pq_file)

        if not os.path.isfile(file_path):
            print(f"Skipping {pq_file}: Not a file.")
            continue

        print(f"\nProcessing file: {pq_file}")

        # Best-effort per file: an error is reported and the next file is tried.
        try:
            df = pd.read_parquet(file_path)
            initial_rows = df.shape[0]
            print(f"Initial rows: {initial_rows}")

            if target_column not in df.columns:
                print(f"  Warning: Target column '{target_column}' not found in {pq_file}. Skipping processing.")
                continue

            # Rows per window
            chunk_sizes = df.groupby(target_column).size()

            if chunk_sizes.empty:
                print(f"  Warning: No data found in column '{target_column}' for {pq_file}. No rows to filter.")
                df_filtered = df.copy()
            else:
                # Print some stats about what we found
                total_chunks = len(chunk_sizes)
                chunks_under = (chunk_sizes < WINDOW_LEN).sum()
                chunks_over = (chunk_sizes > WINDOW_LEN).sum()
                chunks_exact = (chunk_sizes == WINDOW_LEN).sum()

                print(f"  Found {total_chunks} total chunks:")
                print(f"    {chunks_under} chunks < {WINDOW_LEN} rows (will drop)")
                print(f"    {chunks_exact} chunks = {WINDOW_LEN} rows (will keep)")
                print(f"    {chunks_over} chunks > {WINDOW_LEN} rows (will truncate)")

                # Windows with at least WINDOW_LEN rows survive
                valid_chunks = chunk_sizes[chunk_sizes >= WINDOW_LEN].index

                if len(valid_chunks) == 0:
                    print("  No valid chunks found, creating empty dataframe")
                    # Slice the original frame rather than building
                    # pd.DataFrame(columns=...): the slice keeps every column's
                    # dtype, so the rewritten Parquet file keeps its schema.
                    df_filtered = df.iloc[0:0].reset_index(drop=True)
                else:
                    # Keep the rows of valid windows, then the last WINDOW_LEN
                    # rows of each window (original row order is preserved).
                    df_filtered = (df[df[target_column].isin(valid_chunks)]
                                   .groupby(target_column, group_keys=False)
                                   .tail(WINDOW_LEN)
                                   .reset_index(drop=True))

            filtered_rows = len(df_filtered)
            dropped_rows = initial_rows - filtered_rows

            if dropped_rows > 0:
                print(f"Filtered: Dropped {dropped_rows} rows ({initial_rows} -> {filtered_rows}).")
            else:
                print("Filtered: No rows dropped.")

            # Always save if we had data to begin with (the file is overwritten in place)
            if initial_rows > 0:
                print(f"Overwriting {pq_file} with filtered data...")
                df_filtered.to_parquet(file_path,
                                      engine='pyarrow',
                                      compression='snappy')
                print(f"Successfully saved filtered {pq_file}")
            else:
                print("File was empty initially, no file written.")

        except Exception as e:
            print(f"Error processing {pq_file}: {e}")

print("\nProcessing complete.")

In [None]:
# Align each pair of "log_sample" and "fwdsampler" files on matching timestamps.
# For every 100-row chunk in a log_sample (logbook) file, the timestamp of the
# chunk's 100th row must also appear in the fwdsampler file. A logbook chunk
# without such a match is removed entirely, and fwdsampler rows whose timestamp
# does not match any kept chunk are removed as well.

import os
import pandas as pd
import pyarrow.parquet as pq
import numpy as np

# Directory holding the Parquet files to align
parquet_dir = 'file path'

# Column identifying the chunk a logbook row belongs to
chunk_id_column = 'first_tp'

# Column counting the position within each chunk (should reach 100 for the last row of a full chunk)
counter_column = 'counter'

# Name of the timestamp column shared by both file types
timestamp_column = 'time'

# Files are expected to be named YYYYMMDD.log_sample.parquet / YYYYMMDD.fwdsampler.parquet

# 1) Bucket the files by their date prefix: date -> {file_type: full path}
files_by_date = {}
if not os.path.exists(parquet_dir):
    print(f"Error: Directory not found: {parquet_dir}")
else:
    for fname in os.listdir(parquet_dir):
        parts = fname.split('.')
        # Need at least "<date>.<type>.parquet"
        if not fname.endswith('.parquet') or len(parts) < 3:
            continue
        date_str, file_type = parts[0], parts[1]
        if len(date_str) != 8 or file_type not in ('log_sample', 'fwdsampler'):
            continue
        files_by_date.setdefault(date_str, {})[file_type] = os.path.join(parquet_dir, fname)

print(f"Found {len(files_by_date)} dates with files in {parquet_dir}")

# print(f"\nAligning Logbook chunks ({chunk_id_column}/{counter_column}) with Fwdsampler timestamps...")

# 2) Walk the dates in order and align each log_sample / fwdsampler pair
for date_str, paths in sorted(files_by_date.items()):
    log_path = paths.get('log_sample')
    fwd_path = paths.get('fwdsampler')

    # Both files are needed to align anything
    if not (log_path and fwd_path):
        print(f"\nSkipping date {date_str}: Missing log_sample or fwdsampler file.")
        continue

    # Per-date bookkeeping, reset before any file is touched
    modified_this_date = False
    dropped_log_rows = 0
    dropped_fwd_rows = 0

    try:
        df_log = pd.read_parquet(log_path)
        initial_log_rows = len(df_log)

        df_fwd = pd.read_parquet(fwd_path)
        initial_fwd_rows = len(df_fwd)

        # The logbook needs the chunk id, the in-chunk counter and the timestamp
        missing_log_cols = [
            col
            for col in (chunk_id_column, counter_column, timestamp_column)
            if col not in df_log.columns
        ]
        if missing_log_cols:
            print(f"  Warning: Logbook missing required columns {missing_log_cols} in {os.path.basename(log_path)} for date {date_str}. Skipping alignment for this date.")
            continue

        # The fwdsampler only needs the timestamp
        if timestamp_column not in df_fwd.columns:
            print(f"  Warning: Fwdsampler missing '{timestamp_column}' column in {os.path.basename(fwd_path)} for date {date_str}. Skipping alignment for this date.")
            continue

        # --- Which logbook timestamps have to be matched? ---
        # Only chunks with exactly 100 rows are candidates ...
        chunk_counts = df_log[chunk_id_column].value_counts()
        valid_chunks_by_count = chunk_counts[chunk_counts == 100].index.tolist()

        # ... and within them, the row whose counter equals 100
        is_100th_row = df_log[counter_column] == 100
        in_full_chunk = df_log[chunk_id_column].isin(valid_chunks_by_count)
        log_100th_rows = df_log[is_100th_row & in_full_chunk].copy()

        # Default: nothing matches, so nothing is kept
        chunk_ids_with_matching_fwd_ts = set()
        fwd_timestamps_to_keep = set()
        if not log_100th_rows.empty:
            # Inner-join the distinct (chunk id, timestamp) pairs against the
            # distinct fwdsampler timestamps
            log_keys = log_100th_rows[[chunk_id_column, timestamp_column]].drop_duplicates()
            fwd_keys = df_fwd[[timestamp_column]].drop_duplicates()
            timestamp_matches = log_keys.merge(fwd_keys, on=timestamp_column, how='inner')

            chunk_ids_with_matching_fwd_ts = set(timestamp_matches[chunk_id_column])
            fwd_timestamps_to_keep = set(timestamp_matches[timestamp_column])

        # Keep only the logbook rows of matched chunks
        keep_log = df_log[chunk_id_column].isin(chunk_ids_with_matching_fwd_ts)
        df_log_aligned = df_log[keep_log].copy()
        aligned_log_rows = len(df_log_aligned)
        dropped_log_rows = initial_log_rows - aligned_log_rows

        # Keep only the fwdsampler rows carrying a matched timestamp
        keep_fwd = df_fwd[timestamp_column].isin(fwd_timestamps_to_keep)
        df_fwd_aligned = df_fwd[keep_fwd].copy()
        aligned_fwd_rows = len(df_fwd_aligned)
        dropped_fwd_rows = initial_fwd_rows - aligned_fwd_rows

        modified_this_date = dropped_log_rows > 0 or dropped_fwd_rows > 0

        # Nothing dropped on either side: one summary line and on to the next date
        if not modified_this_date:
            print(f"\nDate {date_str}: No modifications needed for {os.path.basename(log_path)} and {os.path.basename(fwd_path)} based on chunk timestamp alignment.")
            continue

        print(f"\nProcessing date {date_str}: Aligning {os.path.basename(log_path)} and {os.path.basename(fwd_path)}.")
        print(f"  Found {len(chunk_counts)} unique {chunk_id_column} values initially.")
        print(f"  Found {len(valid_chunks_by_count)} chunks with exactly 100 rows in logbook initially.")
        print(f"  Identified {len(chunk_ids_with_matching_fwd_ts)} chunks whose 100th row timestamp matched a fwdsampler timestamp.")

        # Logbook: overwrite in place only when rows were dropped
        if dropped_log_rows > 0:
            print(f"  Logbook initial rows: {initial_log_rows}")
            print(f"  Aligned: Dropped {dropped_log_rows} rows ({initial_log_rows} -> {aligned_log_rows}) from {os.path.basename(log_path)}.")
            print(f"  Overwriting {os.path.basename(log_path)} with aligned data...")
            df_log_aligned.to_parquet(log_path,
                                      engine='pyarrow',
                                      compression='snappy')
            print(f"  Successfully saved aligned {os.path.basename(log_path)}")
        else:
            print(f"  Logbook ({os.path.basename(log_path)}) required no row dropping based on chunk timestamp alignment.")

        # Fwdsampler: same rule
        if dropped_fwd_rows > 0:
            print(f"  Fwdsampler initial rows: {initial_fwd_rows}")
            print(f"  Aligned: Dropped {dropped_fwd_rows} rows ({initial_fwd_rows} -> {aligned_fwd_rows}) from {os.path.basename(fwd_path)} to match kept logbook chunk 100th timestamps.")
            print(f"  Overwriting {os.path.basename(fwd_path)} with aligned data...")
            df_fwd_aligned.to_parquet(fwd_path,
                                      engine='pyarrow',
                                      compression='snappy')
            print(f"  Successfully saved aligned {os.path.basename(fwd_path)}")
        else:
            print(f"  Fwdsampler ({os.path.basename(fwd_path)}) required no row dropping based on logbook chunk timestamp alignment.")

    except Exception as e:
        # Best-effort: report the failure and carry on with the next date
        print(f"  Error processing date {date_str}: {e}")

print("\nChunk timestamp alignment process complete.")



In [None]:
# Row-count comparison between the log_sample and fwdsampler files of each date
import os
import pandas as pd
import pyarrow.parquet as pq

# Define the directory containing the cleaned and aligned Parquet files
parquet_dir = 'file path'


def group_parquet_files_by_date(directory):
    """Map each date prefix to the log_sample / fwdsampler files found for it.

    Only names of the form ``YYYYMMDD.<type>.parquet`` are considered, where
    ``<type>`` is ``log_sample`` or ``fwdsampler`` and the prefix is eight
    characters long.

    Parameters
    ----------
    directory : str
        Folder to scan (not recursive).

    Returns
    -------
    dict
        ``{date_str: {file_type: full_path}}``; empty when ``directory`` does
        not exist or holds no matching file.
    """
    grouped = {}
    if not os.path.isdir(directory):
        return grouped
    for fname in os.listdir(directory):
        if not fname.endswith('.parquet'):
            continue
        parts = fname.split('.')
        # Need at least "<date>.<type>.parquet"
        if len(parts) < 3:
            continue
        date_str, file_type = parts[0], parts[1]
        if len(date_str) == 8 and file_type in ('log_sample', 'fwdsampler'):
            # This registration was missing before, so the report below never
            # had any date to print.
            grouped.setdefault(date_str, {})[file_type] = os.path.join(directory, fname)
    return grouped


def parquet_row_count(path):
    """Return the row count stored in a Parquet file's metadata.

    Returns the string ``"N/A"`` when ``path`` is empty/None or does not exist,
    and ``"Error (<message>)"`` when the file cannot be opened, so the result
    can always be printed as-is.
    """
    if not (path and os.path.exists(path)):
        return "N/A"
    try:
        # Reads the footer metadata only, not the data pages
        return pq.ParquetFile(path).metadata.num_rows
    except Exception as e:
        return f"Error ({e})"


# 1) Group files by date prefix
if not os.path.isdir(parquet_dir):
    print(f"Error: Directory not found: {parquet_dir}")
files_by_date = group_parquet_files_by_date(parquet_dir)

print(f"Found {len(files_by_date)} dates with files in {parquet_dir}")


print("\nChecking Logbook vs Fwdsampler Row Counts...")

# 2) Iterate through dates and print row counts side-by-side
# Sort by date string for consistent output
for date_str, paths in sorted(files_by_date.items()):
    log_path = paths.get('log_sample')
    fwd_path = paths.get('fwdsampler')

    log_rows = parquet_row_count(log_path)
    fwd_rows = parquet_row_count(fwd_path)

    # Print the results for the date
    print(f"  Date {date_str}: Log rows: {log_rows}, Fwd rows: {fwd_rows}")

print("\nRow count check complete.")

In [None]:
"""
-----------------------------------------------------------
ONE-PASS loader that builds **aligned** book-and-tick tensors
and the target vector straight from the cleaned  *.parquet files.

• Normalises prices by (price – fwd.mid) / fwd.mid
• Normalises sizes and trade qty by (value / totalDepth@t=99)
• Streams Parquet in ≤200 k-row batches → fits 15 M rows on 32 GB RAM
• Drops any window whose row-99 ‘time’ key is missing in the
  forward-sampler file (rare data glitches)
-----------------------------------------------------------
Usage
------
X, y = dates_to_dataset(["YYYYMMDD", ...], raw_dir)   # X: (N,100,49), y: (N,)

"""

from __future__ import annotations
from pathlib import Path
from collections import defaultdict
import numpy as np
import pyarrow.dataset as ds
import torch, gc, warnings

import numpy as np, torch, pyarrow.dataset as ds, gc
from pathlib import Path

def _book_cols(field):
    """Column names for one book field: ask levels 0-9 first, then bid levels 0-9."""
    names = []
    for side in ("ask", "bid"):
        for level in range(10):
            names.append(f"{side}_{field}{level}")
    return names


# Order-book columns: the 20 price columns first, then the 20 size columns
PRICE_COLS = _book_cols("price")
SIZE_COLS  = _book_cols("size")
LOB_COLS   = [*PRICE_COLS, *SIZE_COLS]
# Raw per-row tick fields
TICK_RAW   = ["tick_type", "side", "price", "quantity", "old_price"]
# Everything read from a log_sample file
ALL_COLS   = ["time", "first_tp", *LOB_COLS, *TICK_RAW]
# Integer codes for the categorical tick fields
TT = dict(NEW=0, MODIFY=1, TRADE=2, CANCEL=3)
SD = dict(Buy=0, Sell=1)

def day_fast(date:str, raw_dir:Path, batch_rows=200_000):
    """Build the normalised window tensor and target vector for one day.

    Reads ``<raw_dir>/<date>.log_sample.parquet`` in batches and
    ``<raw_dir>/<date>.fwdsampler.parquet`` in full. Within each batch, rows
    are numbered per ``first_tp``; a window is kept when it has a row numbered
    99 whose ``time`` exists in the forward-sampler file.

    Output channels (last axis, 49 in total):
      * 0-19  : book prices, ``(price - mid) / mid``
      * 20-39 : book sizes, divided by the summed sizes of the window's row 99
      * 40-43 : one-hot tick_type (NEW / MODIFY / TRADE / CANCEL)
      * 44-45 : one-hot side (Buy / Sell)
      * 46    : tick price, ``(price - mid) / mid``
      * 47    : tick quantity, divided by the same size total
      * 48    : tick old_price, ``(old_price - mid) / mid`` where non-zero
    where ``mid`` is the forward-sampler ``mid`` at the window's row-99 time.

    Parameters
    ----------
    date : str
        File-name prefix of the day (``YYYYMMDD``).
    raw_dir : Path
        Directory holding both Parquet files (a plain string is accepted too).
    batch_rows : int
        Maximum number of log rows requested per batch from the scanner.

    Returns
    -------
    (torch.Tensor, torch.Tensor)
        ``X`` of shape (W, 100, 49) and ``y`` of shape (W,), both float32,
        with ``y = (mid_10.000s - mid) / mid``. Windows are ordered by
        ``first_tp`` within each batch. ``W`` is 0 when no window qualifies.

    Notes
    -----
    Kept windows must have exactly 100 rows inside their batch, otherwise the
    reshape to (W, 100, ...) raises.
    NOTE(review): a window whose rows are split across two batches never
    reaches row 99 inside either batch and is therefore dropped — confirm
    this is acceptable for the data at hand.
    """
    raw_dir = Path(raw_dir)
    log_p = raw_dir/f"{date}.log_sample.parquet"
    fwd_p = raw_dir/f"{date}.fwdsampler.parquet"

    # Load only the three forward-sampler columns that are used
    fwd = (ds.dataset(fwd_p, format="parquet")
             .to_table(columns=["time", "mid", "mid_10.000s"])
             .to_pandas())
    fwd["target"] = (fwd["mid_10.000s"]-fwd["mid"])/fwd["mid"]
    # One row per timestamp: a repeated `time` would duplicate windows in the
    # merge below and make W disagree with the number of rows in `block`.
    fwd_lookup = fwd.drop_duplicates("time").set_index("time")

    scanner = ds.dataset(log_p, format="parquet").scanner(columns=ALL_COLS,
                                                          batch_size=batch_rows)

    rows_out, tgt_out = [], []

    for batch in scanner.to_batches():
        df = batch.to_pandas()

        # ------- row position 0,1,2,… inside each first_tp group of this batch
        df["idx"] = df.groupby("first_tp").cumcount()
        complete  = df["idx"]==99
        ref_rows  = df.loc[complete, ["first_tp", "time"] + SIZE_COLS]
        # attach fwd mid / target to every window's row 99
        ref = ref_rows.merge(fwd_lookup, left_on="time", right_index=True, how="inner")
        if ref.empty:
          print(f"DROPPED BATCH: {len(ref_rows)} complete windows")
          print(f"Sample ref times: {ref_rows['time'].head().values}")
          print(f"Sample fwd times: {fwd_lookup.index[:5].values}")   # no full blocks in this chunk
          continue

        # `block` below is sorted by first_tp, so the per-window constants must
        # follow the same order; otherwise mid / vol / target would be paired
        # with the wrong window whenever first_tp is not already ascending.
        ref = ref.sort_values("first_tp", kind="stable")

        # -------- per-window constants ---------------------------------
        mid = ref["mid"].to_numpy(np.float32)              # (W,)
        vol = ref[SIZE_COLS].to_numpy(np.float32).reshape(-1,20).sum(1)
        vol[vol<=0] = 1.0                                  # avoid dividing by zero

        # rows of the windows we keep, ordered window by window
        keep_ftps  = ref["first_tp"].values
        mask_win   = np.isin(df["first_tp"].values, keep_ftps, assume_unique=False)
        block = df.loc[mask_win].sort_values(["first_tp","idx"])

        W = len(keep_ftps)
        book = block[LOB_COLS].to_numpy(np.float32).reshape(W,100,40)
        tick = np.zeros((W,100,9), dtype=np.float32)

        # ---------- vectorised tick encoding ---------------------------
        # unknown / missing categories map to -1 and get no one-hot bit
        tt = block["tick_type"].map(TT).fillna(-1).astype(np.int8).values.reshape(W, 100)
        sd = block["side"     ].map(SD).fillna(-1).astype(np.int8).values.reshape(W, 100)

        # one-hot NEW / MODIFY / TRADE / CANCEL → channels 0-3
        mask_tt = tt >= 0
        if mask_tt.any():
           w_idx, t_idx = np.nonzero(mask_tt)
           tick[w_idx, t_idx, tt[w_idx, t_idx]] = 1.0

        # one-hot Buy / Sell → channels 4-5
        mask_sd = sd >= 0
        if mask_sd.any():
           w_idx, t_idx = np.nonzero(mask_sd)
           tick[w_idx, t_idx, 4 + sd[w_idx, t_idx]] = 1.0

        col = lambda c: block[c].to_numpy(np.float32).reshape(W,100)
        tick[:,:,6] = col("price")
        tick[:,:,7] = col("quantity")
        tick[:,:,8] = col("old_price")

        # ---------- reference normalisation ----------------------------
        mid_bc = mid[:,None,None]             # (W,1,1)
        vol_bc = vol[:,None,None]

        book[:,:,:20] = (book[:,:,:20] - mid_bc) / mid_bc
        book[:,:, 20:] = book[:,:,20:] / vol_bc

        mid_plane = np.repeat(mid[:, None], 100, axis=1)   # (W,100), needed for the masked update

        tick[:, :, 6] = (tick[:, :, 6] - mid_plane) / mid_plane
        # old_price == 0 is left at 0 rather than normalised
        mask_old = tick[:, :, 8] != 0
        tick[:, :, 8][mask_old] = (tick[:, :, 8][mask_old] - mid_plane[mask_old]) / mid_plane[mask_old]
        tick[:, :, 7] /= vol[:, None]

        rows_out.append(np.concatenate([book,tick],axis=2))  # (W,100,49)
        tgt_out.append(ref["target"].to_numpy(np.float32))

        del df, block, book, tick, ref, mid, vol
        gc.collect()

    if not rows_out:
        # np.concatenate([]) raises; hand back correctly-shaped empty tensors so
        # callers can still concatenate this day with others.
        return (torch.empty((0, 100, 49), dtype=torch.float32),
                torch.empty((0,), dtype=torch.float32))

    X = torch.from_numpy(np.concatenate(rows_out,0))
    y = torch.from_numpy(np.concatenate(tgt_out,0))
    return X, y


# ───────────────────────────────────────── multi-day convenience wrapper ──
def dates_to_dataset(dates: list[str], raw_dir: str | Path,
                     gpu: bool = False
                     ) -> tuple[torch.Tensor, torch.Tensor]:
    """
    Load, normalise and stack several days, in the order given.

    Parameters
    ----------
    dates   : date prefixes (``YYYYMMDD``), one ``day_fast`` call each.
    raw_dir : directory holding the per-day Parquet files.
    gpu     : when True and CUDA is available, the final tensors are moved
              to CUDA with ``non_blocking=True``.

    Returns
    -------
    (X, y) : the per-day tensors concatenated along axis 0.
    """
    raw_dir = Path(raw_dir)
    xs, ys  = [], []
    for d in dates:
        print(f"⋯ {d}", end="", flush=True)
        # Use the directory passed in; this used to read the global RAW_DIR,
        # which ignored the argument and failed when that global was undefined.
        Xi, yi = day_fast(d, raw_dir)

        xs.append(Xi); ys.append(yi)
        print(f"  ✓ {Xi.size(0):,} windows")
    X = torch.cat(xs, 0)
    y = torch.cat(ys, 0)
    if gpu and torch.cuda.is_available():
        X = X.cuda(non_blocking=True)
        y = y.cuda(non_blocking=True)
    return X, y

# ───────────────────────────────────────────────────────────── quick demo ──
if __name__ == "__main__":
    RAW_DIR      = Path("/content/drive/MyDrive/new_data_parquet")

    # Distinct 8-character date prefixes of the log_sample files, ascending;
    # the last six dates form the test split, everything before them the train split.
    all_dates    = sorted({p.name[:8] for p in RAW_DIR.glob("*.log_sample.parquet")})
    TRAIN_DATES  = all_dates[:-6]
    TEST_DATES   = all_dates[-6:]

    print("Loading train …")
    X_train, y_train = dates_to_dataset(TRAIN_DATES, RAW_DIR, gpu=False)
    print("Loading test  …")
    X_test , y_test  = dates_to_dataset(TEST_DATES , RAW_DIR, gpu=False)

    print("\nFinal shapes")
    print("X_train", X_train.shape, "y_train", y_train.shape)
    print("X_test ", X_test.shape , "y_test ", y_test.shape)


In [None]:
import torch, gc

def add_cross_and_normalise(X_train, X_test):
    """
    Append price×size cross terms, then z-score both sets with TRAIN statistics.

    Channels 0-9 / 10-19 / 20-29 / 30-39 of the last axis are treated as
    ask prices / bid prices / ask sizes / bid sizes. Two 10-channel products
    are appended after the existing channels: ask price × bid size, then
    bid price × ask size (level by level).

    X_train : (N_train, 100, 49)
    X_test  : (N_test, 100, 49)
    returns
        X_train_z : (N_train, 100, 69)
        X_test_z  : (N_test, 100, 69)
    Mean and standard deviation are taken per channel over the first two axes
    of the augmented TRAIN tensor; the std is clamped to at least 1e-6.
    """

    def with_cross_terms(feats):
        # four consecutive 10-wide slices of the last axis
        ask_px, bid_px, ask_qty, bid_qty = (
            feats[..., start:start + 10] for start in (0, 10, 20, 30)
        )
        return torch.cat([feats, ask_px * bid_qty, bid_px * ask_qty], dim=-1)

    train_aug = with_cross_terms(X_train)
    test_aug  = with_cross_terms(X_test)

    # per-channel statistics from the training tensor only, shape (1,1,C)
    mean = train_aug.mean(dim=(0, 1), keepdim=True)
    std  = train_aug.std(dim=(0, 1), keepdim=True).clamp_min(1e-6)

    X_train_z = (train_aug - mean) / std
    X_test_z  = (test_aug - mean) / std

    # release the un-normalised copies before returning
    del train_aug, test_aug
    gc.collect()

    return X_train_z, X_test_z
