In [None]:
# Mount Google Drive so that paths under /content/drive (used for the input
# and output files below) can be read and written from this runtime.
import google.colab.drive as drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import gc
import os
import shutil
import subprocess
import tempfile
from datetime import datetime

import numpy as np
import pandas as pd

# --- CONFIGURATION ---
# Paths point into the Google Drive mounted at /content/drive; update them to
# match your own Drive layout.
# NOTE: the input is read with compression='gzip', so this file is expected to
# be gzip-compressed even though its name ends in .csv.
INPUT_FILE = "/content/drive/MyDrive/bitmex_incremental_book_L2.csv"
# Directory that receives regime_quiet.csv / regime_normal.csv /
# regime_volatile.csv. The trailing slash is kept for code that builds output
# paths by plain string concatenation.
OUTPUT_DIR = "/content/drive/MyDrive/"
CHUNK_SIZE = 500_000  # rows per pandas chunk; bounds peak RAM usage

def calculate_hourly_volatility(file_path, chunk_size=None, compression='gzip'):
    """
    Compute the volatility (sample std dev of log returns) of every clock hour.

    The file is streamed in chunks. Each row's log return ln(P_t / P_{t-1})
    against the previous row's price is attributed to the hour containing the
    row's timestamp. Per-hour count, mean and sum of squared deviations (M2)
    are merged across chunks with the parallel-variance formula. The result
    therefore equals the sample std (ddof=1) of all of that hour's returns,
    regardless of where chunk boundaries fall. The last valid price of each
    chunk seeds the next one, so no return is lost or zeroed at a boundary.

    Zero prices are treated as missing and replaced by the last valid price,
    which yields a zero return. The very first row has no predecessor and
    contributes no return.

    NOTE(review): the thresholds recorded in this notebook's output
    (~0.43-0.46) are very large for tick-level log returns. If the file
    interleaves several symbols, or bid/ask levels at different depths,
    consecutive 'price' values do not form a single price path. Consider
    filtering to one symbol or deriving a mid/trade price. This is unverified,
    because the full column schema is not visible here.

    Args:
        file_path: CSV with 'timestamp' (microseconds since the Unix epoch,
            parsed with unit='us') and 'price' columns.
        chunk_size: Rows per chunk. Defaults to the module-level CHUNK_SIZE.
        compression: Passed to pd.read_csv. Defaults to 'gzip'.

    Returns:
        pd.Series of float volatilities indexed by hour-floored Timestamp,
        sorted ascending. Hours with fewer than two returns are omitted,
        because their sample std is undefined.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE

    print(f"🔍 Scanning {file_path} to analyze Market Regimes...")

    # hour -> (n, mean, m2) of that hour's log returns seen so far.
    hourly_stats = {}
    # Last valid price of the previous chunk (None until one has been seen).
    last_price = None

    reader = pd.read_csv(file_path, compression=compression,
                         usecols=['timestamp', 'price'], chunksize=chunk_size)
    for chunk in reader:
        hour_key = pd.to_datetime(chunk['timestamp'], unit='us').dt.floor('h')

        # Prepend the carried-over price so ffill() and shift() see across the
        # chunk boundary. The prepended slot is dropped again via iloc[1:].
        seeded = pd.concat(
            [pd.Series([last_price], dtype=float),
             chunk['price'].replace(0, np.nan)],
            ignore_index=True,
        ).ffill()
        log_ret = np.log(seeded / seeded.shift(1)).iloc[1:]

        tail = seeded.iloc[-1]
        if not np.isnan(tail):
            last_price = float(tail)

        # Returns without a valid predecessor price (NaN) are excluded rather
        # than counted as 0.
        rets = pd.DataFrame({
            'hour_key': hour_key.to_numpy(),
            'log_ret': log_ret.to_numpy(),
        }).dropna()

        grouped = rets.groupby('hour_key')['log_ret']
        counts = grouped.count()
        chunk_stats = pd.DataFrame({
            'n': counts,
            'mean': grouped.mean(),
            'm2': grouped.var(ddof=0) * counts,
        })

        # Merge this chunk's per-hour partial stats into the running totals
        # (Chan et al. parallel variance update).
        for hour, n_b, mean_b, m2_b in chunk_stats.itertuples(name=None):
            if hour not in hourly_stats:
                hourly_stats[hour] = (n_b, mean_b, m2_b)
                continue
            n_a, mean_a, m2_a = hourly_stats[hour]
            n = n_a + n_b
            delta = mean_b - mean_a
            hourly_stats[hour] = (
                n,
                mean_a + delta * n_b / n,
                m2_a + m2_b + delta * delta * n_a * n_b / n,
            )

    print("✅ Scan complete. Aggregating stats...")

    final_volatility = {
        hour: float(np.sqrt(m2 / (n - 1)))
        for hour, (n, _mean, m2) in hourly_stats.items()
        if n >= 2
    }
    return pd.Series(final_volatility, dtype=float).sort_index()

def _hour_regimes(volatility_series):
    """Return (q33, q66, {hour: regime}) for the non-NaN hours of volatility_series."""
    q33 = volatility_series.quantile(0.33)
    q66 = volatility_series.quantile(0.66)
    hour_to_regime = {}
    # NaN volatilities are left unmapped (previously they fell into 'volatile').
    for ts, vol in volatility_series.dropna().items():
        if vol < q33:
            hour_to_regime[ts] = 'quiet'
        elif vol < q66:
            hour_to_regime[ts] = 'normal'
        else:
            hour_to_regime[ts] = 'volatile'
    return q33, q66, hour_to_regime


def _publish_staged_file(local_path, dest):
    """Move a finished staging CSV to dest: gsutil for gs:// URIs, shutil.move otherwise."""
    if dest.startswith("gs://"):
        # argv list (no shell) keeps paths with spaces/metacharacters safe, and
        # check=True raises before the staged copy is deleted if gsutil fails.
        subprocess.run(["gsutil", "cp", local_path, dest], check=True)
        os.remove(local_path)
    else:
        shutil.move(local_path, dest)


def split_dataset(input_path, output_dir, volatility_series, chunk_size=None, compression='gzip'):
    """
    Split the input file into quiet / normal / volatile regime CSVs.

    Each row gets the regime of the clock hour that its 'timestamp'
    (microseconds, parsed with unit='us') falls in. An hour is 'quiet' if its
    volatility is below the 33% quantile of volatility_series, 'normal' if it
    is below the 66% quantile, and 'volatile' otherwise. Rows from hours
    missing from volatility_series, or with NaN volatility, are skipped and
    counted.

    Rows are staged in a private temporary directory, one CSV per regime. The
    header is written on that regime's first write. The staged files are then
    moved to <output_dir>/regime_<name>.csv, or copied with gsutil when
    output_dir is a gs:// URI. If publishing fails, the remaining staged files
    are kept and their directory is printed.

    Args:
        input_path: CSV with a 'timestamp' column. All columns are copied through.
        output_dir: Destination directory (local path or gs:// URI).
        volatility_series: Per-hour volatility indexed by hour-floored Timestamp,
            as returned by calculate_hourly_volatility.
        chunk_size: Rows per chunk. Defaults to the module-level CHUNK_SIZE.
        compression: Passed to pd.read_csv. Defaults to 'gzip'.

    Returns:
        dict mapping regime name to destination path, for each regime that
        received at least one row.

    Raises:
        ValueError: if volatility_series has no non-NaN values.
        subprocess.CalledProcessError / OSError: if a file cannot be published.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if volatility_series.dropna().empty:
        raise ValueError("volatility_series has no non-NaN values; cannot derive regime thresholds")

    q33, q66, hour_to_regime = _hour_regimes(volatility_series)

    print("\n📊 REGIME THRESHOLDS:")
    print(f"   🟢 Quiet (Low Vol):   < {q33:.6f}")
    print(f"   🟡 Normal (Med Vol):  {q33:.6f} - {q66:.6f}")
    print(f"   🔴 Volatile (High):   >= {q66:.6f}")

    print(f"\n🚀 Starting Split Process into {output_dir}...")

    regimes = ('quiet', 'normal', 'volatile')
    dest_paths = {name: os.path.join(output_dir, f"regime_{name}.csv") for name in regimes}
    # A fresh staging directory per call, so stale files from an earlier run
    # can never be appended to.
    staging_dir = tempfile.mkdtemp(prefix="regime_split_")
    staged_paths = {name: os.path.join(staging_dir, f"{name}_temp.csv") for name in regimes}
    started = set()  # regimes whose staged file already has its header
    skipped_rows = 0

    try:
        for chunk in pd.read_csv(input_path, compression=compression, chunksize=chunk_size):
            hour_key = pd.to_datetime(chunk['timestamp'], unit='us').dt.floor('h')
            regime = hour_key.map(hour_to_regime)
            skipped_rows += int(regime.isna().sum())

            for name in regimes:
                subset = chunk[regime == name]
                if subset.empty:
                    continue
                # Header/overwrite is decided per regime, not per chunk: a
                # regime may first appear in any chunk.
                first_write = name not in started
                subset.to_csv(staged_paths[name], mode='w' if first_write else 'a',
                              header=first_write, index=False)
                started.add(name)

            print(".", end="", flush=True)
    except BaseException:
        # A partially split dataset is not worth keeping; clean up, then re-raise.
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise

    print(f"\n\n✅ Split complete. Writing regime files to {output_dir}...")
    if skipped_rows:
        print(f"   ⚠️ Skipped {skipped_rows} rows from hours without a volatility estimate.")

    written = {}
    for name in regimes:
        dest = dest_paths[name]
        if name not in started:
            print(f"   (no rows for '{name}'; {dest} not written)")
            continue
        print(f"   Uploading {name}_temp.csv -> {dest}")
        try:
            _publish_staged_file(staged_paths[name], dest)
        except (OSError, subprocess.CalledProcessError):
            print(f"\n   ❌ Failed to publish '{name}'; staged files kept in {staging_dir}")
            raise
        written[name] = dest

    shutil.rmtree(staging_dir, ignore_errors=True)
    return written

if __name__ == "__main__":
    # Pass 1: estimate a volatility figure for every hour in the input.
    vol_series = calculate_hourly_volatility(file_path=INPUT_FILE)

    # Pass 2: label each hour quiet/normal/volatile and route rows accordingly.
    split_dataset(
        input_path=INPUT_FILE,
        output_dir=OUTPUT_DIR,
        volatility_series=vol_series,
    )

🔍 Scanning /content/drive/MyDrive/bitmex_incremental_book_L2.csv to analyze Market Regimes...
✅ Scan complete. Aggregating stats...

📊 REGIME THRESHOLDS:
   🟢 Quiet (Low Vol):   < 0.431036
   🟡 Normal (Med Vol):  0.431036 - 0.460932
   🔴 Volatile (High):   > 0.460932

🚀 Starting Split Process into /content/drive/MyDrive/...
.......................................

✅ Split complete. Uploading to Cloud Storage...
   Uploading quiet_temp.csv -> /content/drive/MyDrive/regime_quiet.csv
   Uploading normal_temp.csv -> /content/drive/MyDrive/regime_normal.csv
   Uploading volatile_temp.csv -> /content/drive/MyDrive/regime_volatile.csv
