## Data Preprocessing
#### Hold-out split -- first ~80% of the time span for training, last ~20% for testing

In [18]:
# erdos_src/data_processing.py

import os
import pandas as pd

class CFG:
    """Configuration constants shared by the data-loading and splitting code."""

    # ---- Column names and prediction target ------------------------------
    COIN_ID_COLUMN: str = "coin_id"
    TIMESTAMP_COLUMN: str = "timestamp"
    TARGET_COLUMN: str = "target_direction"
    # NOTE: a PREDICTION_HORIZON_MINS = 10 (10-minute look-ahead) setting was
    # drafted here and is currently disabled.

    # ---- Train/test split and cross-validation ---------------------------
    TRAIN_RATIO: float = 0.8              # fraction of the time span used for training
    SPLIT_ROUND_FREQUENCY: str = "month"  # "month", "day", or "" to skip rounding
    CV_SPLITS: int = 5


def load_data(path: str) -> pd.DataFrame:
    """
    Read every coin from a (possibly partitioned) Parquet dataset.

    - path: dataset root; partition sub-folders such as ``coin_id=...`` are
      discovered automatically by pandas/pyarrow.

    The timestamp column is coerced to datetime if it is not one already,
    and the rows are ordered by coin and then by time, with a fresh
    0..n-1 index.

    Raises FileNotFoundError if ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Data file not found: {path}")

    frame = pd.read_parquet(path, engine='pyarrow')

    ts_col = CFG.TIMESTAMP_COLUMN
    already_datetime = pd.api.types.is_datetime64_any_dtype(frame[ts_col])
    if not already_datetime:
        frame[ts_col] = pd.to_datetime(frame[ts_col])

    # Sort by coin first, then chronologically within each coin.
    ordered = frame.sort_values(by=[CFG.COIN_ID_COLUMN, ts_col])
    return ordered.reset_index(drop=True)


def split_data(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Perform a single time-based hold-out split covering all coins.

    The cutoff is placed at ``CFG.TRAIN_RATIO`` of the way from the earliest
    to the latest timestamp in ``df``. It is then rounded *down* to the
    start of the month or day according to ``CFG.SPLIT_ROUND_FREQUENCY``
    ('month', 'day', or ''/None for no rounding). Rows strictly before the
    cutoff go to train and rows at or after it go to test, so the two
    splits never overlap.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain the ``CFG.TIMESTAMP_COLUMN`` datetime column.

    Returns
    -------
    (train_df, test_df) : tuple of pd.DataFrame
        Independent copies; each may contain every coin.

    Raises
    ------
    ValueError
        If ``df`` is empty, or if ``CFG.SPLIT_ROUND_FREQUENCY`` is not one
        of the supported values.
    """
    ts = df[CFG.TIMESTAMP_COLUMN]
    if ts.empty:
        raise ValueError("Cannot split an empty DataFrame")

    # Validate up front: a typo such as 'months' used to silently disable rounding.
    freq = (CFG.SPLIT_ROUND_FREQUENCY or '').lower()
    if freq not in ('month', 'day', ''):
        raise ValueError(
            f"Unsupported SPLIT_ROUND_FREQUENCY {CFG.SPLIT_ROUND_FREQUENCY!r}; "
            "expected 'month', 'day', or ''"
        )

    # compute exact cutoff
    min_ts = ts.min()
    max_ts = ts.max()
    exact_cutoff = min_ts + (max_ts - min_ts) * CFG.TRAIN_RATIO

    # Round down. normalize()/replace() keep any timezone; to_period() would
    # drop it and break the comparison below for tz-aware timestamps.
    if freq == 'month':
        cutoff = exact_cutoff.normalize().replace(day=1)
    elif freq == 'day':
        cutoff = exact_cutoff.normalize()
    else:
        cutoff = exact_cutoff

    # split (strict < / >= so no row lands in both sets)
    train_df = df[ts < cutoff].copy()
    test_df = df[ts >= cutoff].copy()

    # report
    print(f"Exact split point: {exact_cutoff}")
    print(f"Rounded split point: {cutoff}")
    print(f"Train  range: {train_df[CFG.TIMESTAMP_COLUMN].min()} → {train_df[CFG.TIMESTAMP_COLUMN].max()}")
    print(f"Test   range: {test_df[CFG.TIMESTAMP_COLUMN].min()} → {test_df[CFG.TIMESTAMP_COLUMN].max()}")

    return train_df, test_df

#### Data loading & hold-out split

In [19]:
# Load the full multi-coin dataset and split it once into train/test by time.
import os

# 1. Locate the data file: <current working directory>/data/OHLCV_ffill.parquet
#    (load_data reads it with pandas/pyarrow, so it may be a single file or a
#    partitioned dataset directory).
curr_path = os.getcwd()
root_path = os.path.join(curr_path, "data", "OHLCV_ffill.parquet")

# 2. Load ALL coins (no filter); load_data returns rows sorted by
#    (coin_id, timestamp).
df = load_data(root_path)

# 3. Perform the time-based hold-out split (cutoff at CFG.TRAIN_RATIO of the
#    time span, rounded down per CFG.SPLIT_ROUND_FREQUENCY).
train_df, test_df = split_data(df)

# 4. Inspect the (rows, columns) shape of each split
print("Train shape:", train_df.shape)
print("Test  shape:", test_df.shape)

Exact split point: 2024-06-15 16:48:00
Rounded split point: 2024-06-01 00:00:00
Train  range: 2021-01-01 00:00:00 → 2024-05-31 23:59:00
Test   range: 2024-06-01 00:00:00 → 2025-04-27 03:00:00
Train shape: (8978400, 14)
Test  shape: (2376905, 14)


#### Check the ordering of coin_id values

In [21]:
# Inspect coin_id ordering in df (all coins, sorted by (coin_id, timestamp)).
# This cell only reads df; it does not modify it.

# Method 1: order in which each coin first appears (follows df's row order)
order_appear = df['coin_id'].drop_duplicates().tolist()
print("Order of first appearance:", order_appear)

# Method 2: coins in lexicographical (alphabetical) order
unique_ids    = df['coin_id'].unique().tolist()
order_sorted  = sorted(unique_ids)
print("Lexicographical order:", order_sorted)

# Method 3: category order of a Categorical view of coin_id.
# Cast a temporary Series instead of assigning back into df, so an
# inspection cell does not silently change the dtype of the shared frame.
coin_categories = df['coin_id'].astype('category').cat.categories.tolist()
print("Categories (in defined order):", coin_categories)

Order of first appearance: ['BNBUSDT', 'BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'XRPUSDT']
Lexicographical order: ['BNBUSDT', 'BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'XRPUSDT']
Categories (in defined order): ['BNBUSDT', 'BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'XRPUSDT']


In [15]:
# Preview the first five rows of the training split.
print(train_df.head())

            timestamp     open     high      low    close    volume  \
0 2021-01-01 00:00:00  37.3596  37.3702  37.3381  37.3700   807.624   
1 2021-01-01 00:01:00  37.3700  37.4221  37.3487  37.3910  2734.241   
2 2021-01-01 00:02:00  37.3905  37.4020  37.3298  37.3311   778.868   
3 2021-01-01 00:03:00  37.3311  37.3367  37.2729  37.2800   890.907   
4 2021-01-01 00:04:00  37.2753  37.2997  37.2502  37.2639   483.711   

               close_time  quote_asset_volume  number_of_trades  \
0 2021-01-01 00:00:59.999        30170.884305               146   
1 2021-01-01 00:01:59.999       102217.111475               230   
2 2021-01-01 00:02:59.999        29092.024959               141   
3 2021-01-01 00:03:59.999        33230.401819               156   
4 2021-01-01 00:04:59.999        18028.653726               126   

   taker_buy_base_asset_volume  taker_buy_quote_asset_volume  missing_flag  \
0                      486.275                  18167.580104             0   
1             

In [16]:
# Preview the first five rows of the test split.
print(test_df.head())

                  timestamp   open   high    low  close  volume  \
1795680 2024-06-01 00:00:00  593.8  594.1  593.7  594.1  94.447   
1795681 2024-06-01 00:01:00  594.1  594.1  593.9  594.0  19.360   
1795682 2024-06-01 00:02:00  594.0  594.0  593.9  594.0   5.455   
1795683 2024-06-01 00:03:00  594.0  594.2  593.9  594.2  23.386   
1795684 2024-06-01 00:04:00  594.1  594.2  594.1  594.2  54.754   

                     close_time  quote_asset_volume  number_of_trades  \
1795680 2024-06-01 00:00:59.999          56094.6704                95   
1795681 2024-06-01 00:01:59.999          11499.6144                78   
1795682 2024-06-01 00:02:59.999           3240.0273                31   
1795683 2024-06-01 00:03:59.999          13891.7071                44   
1795684 2024-06-01 00:04:59.999          32531.5678                32   

         taker_buy_base_asset_volume  taker_buy_quote_asset_volume  \
1795680                       88.239                    52407.8786   
1795681           

#### Resampling -- 10-min, 1-hour, 4-hour, 1-day intervals

In [23]:
import os

# 1) Make sure the destination folder data/resampled/ is present;
#    exist_ok=True lets the cell be re-run without failing.
os.makedirs(os.path.join("data", "resampled"), exist_ok=True)

# 2) Helper: resample minute bars to a coarser frequency, per coin
def resample_ohlcv(df, freq):
    """
    Resample OHLCV bars to a coarser frequency, separately for each coin.

    Parameters
    ----------
    df : pd.DataFrame
        Bars with a 'coin_id' column and a datetime 'timestamp' *column*.
        It must be a column, not the index; it is moved to the index
        internally. The frame must also contain every column aggregated
        below. Any other column (e.g. 'close_time', 'missing_flag') is
        dropped from the result. The caller's frame is not modified.
    freq : str
        pandas offset alias, e.g. '10min', '1h', '4h', '1d'.

    Returns
    -------
    pd.DataFrame
        One row per (coin_id, interval), with columns 'coin_id' and
        'timestamp' (the resample bin label) followed by the aggregated
        OHLCV columns. Intervals with no input bars (NaN 'open') are
        dropped.
    """
    # OHLCV-style aggregation for each retained column.
    agg_dict = {
        "open":                        "first",
        "high":                        "max",
        "low":                         "min",
        "close":                       "last",
        "volume":                      "sum",
        "quote_asset_volume":          "sum",
        "number_of_trades":            "sum",
        "taker_buy_base_asset_volume": "sum",
        "taker_buy_quote_asset_volume":"sum"
    }

    # resample() needs a DatetimeIndex; set_index returns a new frame.
    indexed = df.set_index("timestamp")

    out = (
        indexed
        .groupby("coin_id", observed=True)  # observed=True: skip unused categories
        .resample(freq)
        .agg(agg_dict)
        .dropna(subset=["open"])  # empty intervals have NaN open -> drop them
        .reset_index()            # restore 'coin_id' and 'timestamp' as columns
    )
    return out


# 3) Output frequencies: pandas offset alias -> label used in the file name
freq_map = {
    "10min": "10min",
    "1h":    "1h",
    "4h":    "4h",
    "1d":    "1d",
}

# 4) Resample each split at each frequency and save it to
#    data/resampled/<split>_<label>.parquet.
#    The loop variable is `split_df`, not `df`: reusing `df` would rebind the
#    notebook-global full dataset to test_df once the loop finishes.
for df_name, split_df in [("train", train_df), ("test", test_df)]:
    for freq, label in freq_map.items():
        print(f"Resampling {df_name} at {label} intervals…")
        rs = resample_ohlcv(split_df, freq)
        out_path = os.path.join("data", "resampled", f"{df_name}_{label}.parquet")
        rs.to_parquet(out_path)
        print(f"  → wrote {out_path}, shape {rs.shape}")

Resampling train at 10min intervals…
  → wrote data/resampled/train_10min.parquet, shape (897840, 11)
Resampling train at 1h intervals…
  → wrote data/resampled/train_1h.parquet, shape (149640, 11)
Resampling train at 4h intervals…
  → wrote data/resampled/train_4h.parquet, shape (37410, 11)
Resampling train at 1d intervals…
  → wrote data/resampled/train_1d.parquet, shape (6235, 11)
Resampling test at 10min intervals…
  → wrote data/resampled/test_10min.parquet, shape (237695, 11)
Resampling test at 1h intervals…
  → wrote data/resampled/test_1h.parquet, shape (39620, 11)
Resampling test at 4h intervals…
  → wrote data/resampled/test_4h.parquet, shape (9905, 11)
Resampling test at 1d intervals…
  → wrote data/resampled/test_1d.parquet, shape (1655, 11)
