# Import Libraries

In [1]:
import pandas as pd
import glob
import os
import re
from datetime import time, timedelta
import numpy as np
import warnings

# NOTE(review): with no category/module arguments this suppresses every warning
# for the whole process, including deprecation notices emitted by libraries.
# Narrow the filter if those notices should stay visible.
warnings.filterwarnings('ignore')

In [2]:
# Directory that is scanned for the quarterly *.parquet input files.
DATA_DIR = '../../data'

# Quarter identifiers ("<year>_<quarter>") that are flagged as in-sample.
IN_SAMPLE_QUARTERS = [
    "2023_Q1",
    "2023_Q3",
    "2023_Q4",
    "2024_Q2",
    "2024_Q4",
    "2025_Q1",
    "2025_Q2",
]

# Zone that timestamps are converted to before the tz info is dropped.
# A regional zone name such as "Europe/Warsaw" (or "Europe/Paris") follows
# both CET and its daylight-saving counterpart CEST.
TARGET_TZ = 'Europe/Warsaw'

In [3]:
# ---------------------------------------------------------
# 2. Data Loading Function
# ---------------------------------------------------------

def load_project_data(data_dir, assume_utc=True):
    """
    Load the quarterly Parquet files found in ``data_dir``.

    Only files named ``data1_<YYYY>_Q<n>.parquet`` or
    ``data2_<YYYY>_Q<n>.parquet`` are loaded; other ``*.parquet`` files are
    skipped. For each loaded file the function:

    1. uses the ``datetime`` column as index when that column exists,
    2. coerces the index to datetimes,
    3. converts it to ``TARGET_TZ`` wall-clock time and drops the tz info
       (naive indexes are treated as UTC only when ``assume_utc`` is true,
       otherwise they are left untouched),
    4. sorts by index and adds the ``Quarter_ID`` / ``Is_In_Sample`` columns.

    Parameters
    ----------
    data_dir : str
        Directory to scan (non-recursively) for ``*.parquet`` files.
    assume_utc : bool, default True
        Whether timezone-naive indexes should be interpreted as UTC.

    Returns
    -------
    dict
        ``{"group1": {quarter_id: DataFrame}, "group2": {quarter_id: DataFrame}}``.
        Both group keys are always present, even when no file was found, so
        callers can index the result unconditionally.

    Notes
    -----
    A file that fails to load is reported on stdout and skipped; the
    remaining files are still processed.
    """
    # Built up front so that every return path hands back the same shape.
    data_store = {
        "group1": {},
        "group2": {}
    }

    # Sorted so the load order does not depend on the file system.
    file_paths = sorted(glob.glob(os.path.join(data_dir, "*.parquet")))

    if not file_paths:
        print(f"[Warning] No .parquet files found in {data_dir}")
        return data_store

    print(f"Found {len(file_paths)} files. Loading...")

    for path in file_paths:
        filename = os.path.basename(path)
        match = re.match(r"(data[12])_(\d{4})_(Q\d)\.parquet", filename)
        if not match:
            continue

        raw_group, year, quarter = match.groups()
        group_id = "group" + raw_group[-1]
        quarter_id = f"{year}_{quarter}"

        try:
            df = pd.read_parquet(path)

            # Use the 'datetime' column as index when it is present.
            if 'datetime' in df.columns:
                df = df.set_index('datetime')

            df.index = pd.to_datetime(df.index)

            if df.index.tz is None:
                if assume_utc:
                    # Naive -> UTC -> TARGET_TZ -> naive local wall-clock time.
                    df.index = (
                        df.index.tz_localize('UTC')
                        .tz_convert(TARGET_TZ)
                        .tz_localize(None)
                    )
            else:
                # Aware -> TARGET_TZ -> naive local wall-clock time.
                df.index = df.index.tz_convert(TARGET_TZ).tz_localize(None)

            df = df.sort_index()

            df['Quarter_ID'] = quarter_id
            df['Is_In_Sample'] = quarter_id in IN_SAMPLE_QUARTERS

            data_store[group_id][quarter_id] = df

        except Exception as e:
            # Best effort: one unreadable file must not abort the whole load.
            print(f"Error loading {filename}: {e}")

    return data_store

In [4]:
# ---------------------------------------------------------
# 3. Helper Function to Combine Data
# ---------------------------------------------------------

def combine_quarters(data_store, group_id, only_in_sample=True):
    """
    Concatenate the per-quarter frames of one group into a single frame.

    Parameters
    ----------
    data_store : dict
        Mapping ``group_id -> {quarter_id: DataFrame}``.
    group_id : str
        Key of the group to combine.
    only_in_sample : bool, default True
        When true, keep only quarters whose first ``Is_In_Sample`` value is
        truthy. Empty frames carry no such value and are skipped in this mode.

    Returns
    -------
    pandas.DataFrame
        The selected frames concatenated and sorted by index, or an empty
        ``DataFrame`` when the group is unknown or nothing was selected.
    """
    if group_id not in data_store:
        return pd.DataFrame()

    selected = []
    for frame in data_store[group_id].values():
        # An empty frame has no first row to read the flag from; guarding
        # here avoids an IndexError from .iloc[0].
        if only_in_sample and (frame.empty or not frame['Is_In_Sample'].iloc[0]):
            continue
        selected.append(frame)

    if not selected:
        return pd.DataFrame()

    return pd.concat(selected).sort_index()

In [5]:
# ---------------------------------------------------------
# 4. Common Trading Assumptions (Session Windows & Exclusion Zones)
# ---------------------------------------------------------

def apply_common_assumptions(df, group_type):
    """
    Apply trading rules, session times, and exclusion zones.

    The input frame is never modified; a new frame is returned (except for an
    empty input, which is returned unchanged). Two boolean columns are added:
    ``can_trade`` and ``force_exit``.

    * ``"group1"``: every column except the metadata/flag columns is set to
      NaN for 09:31-09:40 and 15:51-16:00 (inclusive); ``can_trade`` is True
      from 09:56 to 15:40 (inclusive); ``force_exit`` is True at 15:40.
    * ``"group2"``: rows from 16:50 up to (but not including) 18:10 are
      dropped; ``can_trade`` is True for every remaining row; ``force_exit``
      is True at 16:45.
    * any other ``group_type``: the flags are added and left False.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose index exposes ``.time`` (i.e. a ``DatetimeIndex``).
    group_type : str
        ``"group1"`` or ``"group2"``.

    Returns
    -------
    pandas.DataFrame
    """
    if df.empty:
        return df

    # NOTE(review): the clock times below are compared with the index as-is;
    # confirm the index is expressed in the time zone these rules refer to.
    t = df.index.time

    if group_type == "group2":
        # Row removal creates the intraday gap; copying the selection keeps
        # the caller's frame untouched.
        keep_mask = (t < time(16, 50)) | (t >= time(18, 10))
        out = df.loc[keep_mask].copy()
        t = out.index.time
    else:
        # Copy so that the flag columns and NaN-ing below never leak into
        # the caller's frame.
        out = df.copy()

    # Initialize flags
    out['can_trade'] = False
    out['force_exit'] = False

    # =========================================================
    # GROUP 1: SP, NQ (1-min)
    # =========================================================
    if group_type == "group1":
        # 1. Calculation exclusion: blank out every non-metadata column.
        mask_morning_nan = (t >= time(9, 31)) & (t <= time(9, 40))
        mask_evening_nan = (t >= time(15, 51)) & (t <= time(16, 0))

        protected = ('Quarter_ID', 'Is_In_Sample', 'can_trade', 'force_exit')
        cols_to_nan = [c for c in out.columns if c not in protected]
        out.loc[mask_morning_nan | mask_evening_nan, cols_to_nan] = np.nan

        # 2. Trading window (09:56 - 15:40, both inclusive). Rows after the
        #    exit time keep the initial can_trade=False.
        trade_start = time(9, 56)
        exit_time = time(15, 40)
        out.loc[(t >= trade_start) & (t <= exit_time), 'can_trade'] = True

        # 3. Force exit on the last tradable bar.
        out.loc[t == exit_time, 'force_exit'] = True

    # =========================================================
    # GROUP 2: FX/Metals (5-min)
    # =========================================================
    elif group_type == "group2":
        # Every row that survived the removal above is tradable.
        out['can_trade'] = True

        # Force exit (16:45)
        out.loc[t == time(16, 45), 'force_exit'] = True

    return out

In [6]:
# ---------------------------------------------------------
# 5. Return & Volatility Function (Session-based Grouping)
# ---------------------------------------------------------

def add_return_and_volatility(df, assets, interval_min, vol_window=20):
    """
    Add per-asset log returns and annualized rolling volatility.

    Rows are grouped into sessions: a new session starts whenever the gap to
    the previous timestamp exceeds ``interval_min`` minutes plus 30 seconds.
    Returns and volatility are computed inside each session only, so no value
    spans a gap in the index.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``DatetimeIndex`` sorted in time order. It is copied;
        the caller's frame is not modified.
    assets : iterable of str
        Price columns to process. Names missing from ``df`` are skipped.
    interval_min : int or float
        Nominal bar length in minutes.
    vol_window : int, default 20
        Number of bars in the rolling standard-deviation window.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with ``<asset>_rtn`` and ``<asset>_vol`` columns added.
    """
    df = df.copy()

    if df.empty:
        return df

    # 1. Dynamic annualization factor: sqrt(most common bars-per-day * 252).
    daily_counts = df.groupby(df.index.date).size()
    bars_per_day_mode = daily_counts.mode()[0] if not daily_counts.empty else (24*60/interval_min)
    annual_factor = np.sqrt(bars_per_day_mode * 252)

    print(f"Stats for {assets}:")
    print(f"  - Freq: {interval_min} min")
    print(f"  - Bars/Day (Mode): {bars_per_day_mode}")
    print(f"  - Annual Factor: {annual_factor:.2f}")

    # 2. Session ID from time gaps (interval + 30s tolerance).
    time_diff = df.index.to_series().diff()
    max_gap = pd.Timedelta(minutes=interval_min) + pd.Timedelta(seconds=30)

    # The leading NaT difference compares False, so the first row simply
    # belongs to session 0; every later over-sized gap increments the id.
    is_new_session = (time_diff > max_gap).fillna(True)
    session_id = is_new_session.cumsum()

    # 3. Returns & volatility
    for asset in assets:
        if asset not in df.columns:
            continue

        # transform() returns a result aligned with df's own index.
        # groupby(...).apply() prepends the group key as an extra index
        # level in recent pandas, which cannot be assigned back as a column.
        df[f'{asset}_rtn'] = df.groupby(session_id)[asset].transform(
            lambda x: np.log(x / x.shift(1))
        )

        # Rolling std of the returns inside each session, annualized.
        df[f'{asset}_vol'] = df.groupby(session_id)[f'{asset}_rtn'].transform(
            lambda x: x.rolling(window=vol_window).std()
        ) * annual_factor

    return df

In [9]:
# ---------------------------------------------------------
# 5b. Return & Volatility Function (Session-based Grouping) - redefinition; overrides the In [6] version
# ---------------------------------------------------------

def add_return_and_volatility(df, assets, interval_min, vol_window=20):
    """
    Append log-return and annualized-volatility columns for each asset.

    The frame is copied first. Rows are split into sessions wherever the
    spacing between consecutive timestamps exceeds ``interval_min`` minutes
    plus 30 seconds; ``<asset>_rtn`` and ``<asset>_vol`` are then computed
    inside each session. Asset names that are not columns of ``df`` are
    ignored. The volatility is the ``vol_window``-bar rolling standard
    deviation of the returns, scaled by sqrt(modal bars per day * 252).
    """
    df = df.copy()
    if df.empty:
        return df

    # --- Annualization factor from the most frequent daily bar count ---
    bars_by_date = df.groupby(df.index.date).size()
    if bars_by_date.empty:
        bars_per_day_mode = 24*60/interval_min
    else:
        bars_per_day_mode = bars_by_date.mode()[0]
    annual_factor = np.sqrt(bars_per_day_mode * 252)

    print(f"  [Stats] Freq: {interval_min}m | Bars/Day: {bars_per_day_mode} | AnnFactor: {annual_factor:.2f}")

    # --- Session labels: cumulative count of over-sized time gaps ---
    gap_limit = pd.Timedelta(minutes=interval_min) + pd.Timedelta(seconds=30)
    stamps = df.index.to_series()
    breaks = (stamps.diff() > gap_limit).fillna(True)
    session_id = breaks.cumsum()

    def _log_return(prices):
        # log(P_t / P_{t-1}); the first bar of a session has no predecessor.
        return np.log(prices / prices.shift(1))

    def _rolling_std(returns):
        return returns.rolling(window=vol_window).std()

    # --- Per-asset metrics, each confined to its own session ---
    for asset in assets:
        if asset in df.columns:
            rtn_col = f'{asset}_rtn'
            vol_col = f'{asset}_vol'

            df[rtn_col] = df.groupby(session_id)[asset].transform(_log_return)

            session_std = df.groupby(session_id)[rtn_col].transform(_rolling_std)
            df[vol_col] = session_std * annual_factor

    return df

In [10]:
# ---------------------------------------------------------
# 6. Main Execution Block
# ---------------------------------------------------------

def main():
    """
    Run the full pipeline: load, combine, apply rules, add metrics, save.

    For each of the two groups the in-sample quarters are combined, the
    trading assumptions are applied, and returns/volatility are added.
    Non-empty results are pickled to ``df_g1_processed.pkl`` and
    ``df_g2_processed.pkl`` in the current working directory. Progress is
    reported on stdout; nothing is returned.
    """
    print("=== Starting Data Processing pipeline ===")

    # 1. Load Data
    # assume_utc=True: Naive timestamps are treated as UTC, then converted to CET
    raw_data = load_project_data(DATA_DIR, assume_utc=True)

    # .get() so that a result without the group keys (e.g. an empty dict)
    # is handled as "nothing loaded" instead of raising KeyError.
    if not raw_data.get("group1") and not raw_data.get("group2"):
        print("No data loaded. Exiting.")
        return

    # Explicit placeholders; a group that yields no rows stays None.
    df_g1_clean = None
    df_g2_clean = None

    # -----------------------------------------------------
    # Process Group 1 (SP, NQ)
    # -----------------------------------------------------
    print("\n--- Processing Group 1 (SP, NQ) ---")
    df_g1 = combine_quarters(raw_data, "group1", only_in_sample=True)

    if not df_g1.empty:
        # Step A: Apply Rules (on a copy, so df_g1 stays as combined)
        df_g1_clean = apply_common_assumptions(df_g1.copy(), "group1")

        # Step B: Add Metrics
        df_g1_clean = add_return_and_volatility(
            df_g1_clean,
            assets=['SP', 'NQ'],
            interval_min=1
        )
        print(f"Group 1 processed shape: {df_g1_clean.shape}")

    # -----------------------------------------------------
    # Process Group 2 (FX/Metals)
    # -----------------------------------------------------
    print("\n--- Processing Group 2 (FX, Metals) ---")
    df_g2 = combine_quarters(raw_data, "group2", only_in_sample=True)

    if not df_g2.empty:
        # Step A: Apply Rules (on a copy, so df_g2 stays as combined)
        df_g2_clean = apply_common_assumptions(df_g2.copy(), "group2")

        # Restrict the asset list to columns that are actually present.
        g2_assets = ['AUD', 'CAD', 'XAU', 'XAG']
        existing_assets = [c for c in g2_assets if c in df_g2_clean.columns]

        # Step B: Add Metrics
        df_g2_clean = add_return_and_volatility(
            df_g2_clean,
            assets=existing_assets,
            interval_min=5
        )
        print(f"Group 2 processed shape: {df_g2_clean.shape}")

    # -----------------------------------------------------
    # Save Output
    # -----------------------------------------------------
    print("\n--- Saving Data ---")

    # Pickle keeps column dtypes and the index for the next step (EDA).
    saved_any = False
    outputs = (
        (df_g1_clean, 'df_g1_processed.pkl'),
        (df_g2_clean, 'df_g2_processed.pkl'),
    )
    for frame, out_path in outputs:
        if frame is not None and not frame.empty:
            frame.to_pickle(out_path)
            print(f"Saved: {out_path}")
            saved_any = True

    # Report success only when at least one file was actually written.
    if saved_any:
        print("Data saved successfully to .pkl files.")
    else:
        print("No processed data to save.")

# Run the pipeline only when this file is executed as the main program,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

=== Starting Data Processing pipeline ===
Found 14 files. Loading...

--- Processing Group 1 (SP, NQ) ---
  [Stats] Freq: 1m | Bars/Day: 390 | AnnFactor: 313.50
Group 1 processed shape: (175812, 10)

--- Processing Group 2 (FX, Metals) ---
  [Stats] Freq: 5m | Bars/Day: 272 | AnnFactor: 261.81
Group 2 processed shape: (122630, 16)

--- Saving Data ---
Saved: df_g1_processed.pkl
Saved: df_g2_processed.pkl
Data saved successfully to .pkl files.
