
# BTCUSDT Market Prediction Model

This notebook demonstrates how to build a predictive model for the
Bitcoin/USDT pair (BTCUSDT) using a rich set of market data hosted
within a Google Drive folder.  The available datasets include trade
tapes, order book snapshots at various depth percentages, multiple
variants of kline (candlestick) series, and metrics describing open
interest and funding rates.  Our goal is to produce a model that
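analyses the last 100 15‑minute candles and predicts the direction and
magnitude of the cumulative price move over the next 100 15‑minute
candles, i.e. the log return from the current close to the close 100
bars ahead.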

The notebook performs the following steps:

1. **Data loading** – reads Parquet files from the drive folder
   specified by `base_dir` and concatenates multiple months of data.
2. **Resampling & preprocessing** – converts 1‑minute klines to
   15‑minute bars, and computes order book top‑level imbalances,
   depth ratios at several percentage bands, and an open interest RSI.
3. **Feature engineering** – builds a feature matrix using the most
   recent 100 bars of returns, rolling statistics, volume proxies,
   order book features and premium/basis values.
4. **Label construction** – defines a binary (up/down) target and a
   continuous (log‑return) target, both measured from the current close
   to the close 100 bars ahead.
5. **Model training** – fits gradient boosted models using a
   time‑series split for cross‑validation and prints performance
   metrics.
6. **Live signal** – provides a function to compute a buy/sell/hold
   recommendation based on the latest data.

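Run the cells in order once the data directory is available in your
runtime environment.  `base_dir` defaults to
`/content/drive/MyDrive/binance_data` (on Colab the notebook tries to
mount Google Drive automatically); set the `BINANCE_DATA_DIR`
environment variable if your data live in a different location.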


In [None]:

import os
import glob
import math
import numpy as np
import pandas as pd
from datetime import datetime, timezone
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score, mean_squared_error
import matplotlib.pyplot as plt

# Display plots inline
%matplotlib inline


In [None]:

def read_parquet_files(base_path, subdir, pattern, parse_dt_cols=None, index_col=None):
    """Read every Parquet file matching a glob and stack them into one DataFrame.

    Files are read in sorted path order and concatenated with a fresh
    RangeIndex.

    Parameters
    ----------
    base_path : str
        Root directory holding all data categories
        (e.g. '/content/drive/MyDrive/binance_data').
    subdir : str
        Category folder below `base_path` (e.g. 'klines').
    pattern : str
        Glob pattern for the file names inside `subdir`
        (e.g. 'BTCUSDT-klines-*.parquet').
    parse_dt_cols : list of str, optional
        Columns converted with ``pd.to_datetime(..., utc=True)`` in each file.
        Names absent from a file are skipped.
    index_col : str, optional
        If given and present after concatenation, this column becomes the
        index and the result is sorted by it.  If absent, it is ignored.

    Returns
    -------
    pandas.DataFrame
        All matching files concatenated.

    Raises
    ------
    FileNotFoundError
        If no file matches the pattern.
    """
    search = os.path.join(base_path, subdir, pattern)
    matches = sorted(glob.glob(search))
    if len(matches) == 0:
        raise FileNotFoundError(f"No files matched pattern {search}")

    def _load(path):
        # Read one file and convert the requested datetime columns.
        frame = pd.read_parquet(path)
        for name in (parse_dt_cols or []):
            if name in frame.columns:
                frame[name] = pd.to_datetime(frame[name], utc=True)
        return frame

    combined = pd.concat([_load(m) for m in matches], ignore_index=True)
    if not index_col or index_col not in combined.columns:
        return combined
    return combined.set_index(index_col).sort_index()


def resample_klines_to_15m(k, freq='15min'):
    """Resample a DataFrame of 1-minute klines to 15-minute bars.

    OHLC fields are aggregated first/max/min/last over each interval and the
    volume/count columns are summed.  Intervals without an OHLC row are dropped.

    Parameters
    ----------
    k : pandas.DataFrame
        Klines indexed by a time-based index (need not be sorted).  Must
        contain 'open', 'high', 'low' and 'close'.  Whichever of 'volume',
        'quote_volume', 'count', 'taker_buy_volume' and
        'taker_buy_quote_volume' are present are summed.  Missing ones are
        omitted from the result instead of raising.
    freq : str, default '15min'
        Pandas offset alias for the output bar size.  '15T' is the deprecated
        spelling of the default.

    Returns
    -------
    pandas.DataFrame
        One row per non-empty interval, with columns in the order
        open, high, low, close, then the summed columns.  Bins are closed and
        labelled on the right: the row labelled T aggregates input rows whose
        index lies in (T - freq, T].

    Notes
    -----
    NOTE(review): this notebook indexes klines by ``open_time``.  With
    right-closed bins, the bar labelled T therefore holds the candles opening
    after T-15min up to and including T.  The mark/index/premium frames in
    the notebook use the same closed/label convention, so they stay aligned.
    Confirm this is intended before changing it.
    """
    ohlc = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last'}
    summed = ('volume', 'quote_volume', 'count', 'taker_buy_volume', 'taker_buy_quote_volume')
    spec = dict(ohlc)
    spec.update({col: 'sum' for col in summed if col in k.columns})

    # Sort so 'first'/'last' pick the chronologically first/last rows.
    # resample itself raises TypeError if the index is not time-based.
    bars = k.sort_index().resample(freq, label='right', closed='right').agg(spec)
    return bars.dropna(subset=list(ohlc))


def depth_ratio_frame(minute_df, top_n=3):
    """Compute order book imbalance features for a single book snapshot.

    Parameters
    ----------
    minute_df : pandas.DataFrame
        Rows for the percentage buckets of one snapshot.  Must include
        'percentage' (negative values are treated as the bid side, positive
        values as the ask side) and 'depth'.
    top_n : int, default 3
        Number of buckets closest to the mid price (smallest |percentage|)
        summed on each side for the top-of-book totals.  The output keys keep
        the ``top3`` name for backward compatibility.

    Returns
    -------
    dict
        'tob_top3_bids', 'tob_top3_asks' : depth summed over the ``top_n``
            buckets nearest the mid price on each side.
        'tob_imb' : (bids - asks) / (bids + asks) of those totals, or 0.0 when
            the total is not positive.
        'depthr_0_2p5', 'depthr_1_2p5', 'depthr_2p5_5', 'depthr_5_10' :
            the same imbalance for the depth inside each |percentage| band
            (both bounds inclusive).
    """
    def _imbalance(bid_total, ask_total):
        # Signed imbalance in [-1, 1]; 0.0 when there is no depth at all.
        total = bid_total + ask_total
        return (bid_total - ask_total) / total if total > 0 else 0.0

    pct = minute_df['percentage']

    # Top of book = buckets closest to the mid price.  Sorting by depth would
    # instead pick the largest buckets, which need not be near the mid.
    # Positional sort/head is used because all rows of a snapshot may share
    # one timestamp index value.
    bids = minute_df[pct < 0].sort_values('percentage', ascending=False, kind='stable')
    asks = minute_df[pct > 0].sort_values('percentage', ascending=True, kind='stable')
    top_bids = bids['depth'].head(top_n).sum()
    top_asks = asks['depth'].head(top_n).sum()

    def band_ratio(lower, upper):
        # Imbalance of depth with lower <= |percentage| <= upper on each side.
        bids_sum = minute_df[(pct <= -lower) & (pct >= -upper)]['depth'].sum()
        asks_sum = minute_df[(pct >= lower) & (pct <= upper)]['depth'].sum()
        return _imbalance(bids_sum, asks_sum)

    ratios = {
        'depthr_0_2p5': band_ratio(0.0, 2.5),
        'depthr_1_2p5': band_ratio(1.0, 2.5),
        'depthr_2p5_5': band_ratio(2.5, 5.0),
        'depthr_5_10': band_ratio(5.0, 10.0),
    }
    return {
        'tob_top3_bids': top_bids,
        'tob_top3_asks': top_asks,
        'tob_imb': _imbalance(top_bids, top_asks),
        **ratios,
    }


def make_book_features_15m(book_df):
    """Aggregate order book snapshots into 15-minute features.

    Parameters
    ----------
    book_df : pandas.DataFrame
        Either indexed by a DatetimeIndex, or with a 'timestamp' column that is
        converted with ``pd.to_datetime(..., unit='ms', utc=True)``.  Must have
        the 'percentage' and 'depth' columns used by ``depth_ratio_frame``.
        Each unique timestamp is one snapshot containing several percentage
        buckets.

    Returns
    -------
    pandas.DataFrame
        Indexed by right-closed, right-labelled 15-minute bars.  Each row holds
        the features (top-of-book totals, imbalance, depth ratios) of the
        latest snapshot whose timestamp, floored to the minute, is at or
        before the bar label.  Values are forward-filled across minutes with
        no snapshot.

    Raises
    ------
    ValueError
        If ``book_df`` has no rows.
    """
    # Convert timestamp to a UTC datetime index
    if not isinstance(book_df.index, pd.DatetimeIndex):
        book_df = book_df.copy()
        book_df['timestamp'] = pd.to_datetime(book_df['timestamp'], unit='ms', utc=True)
        book_df = book_df.set_index('timestamp')
    if book_df.empty:
        raise ValueError("book_df contains no order book snapshots")

    # One feature row per snapshot.  Grouping by the floored minute instead
    # would pool the buckets of several snapshots into a single feature row.
    per_snapshot = []
    for ts, grp in book_df.groupby(level=0, sort=True):
        feats = depth_ratio_frame(grp)
        feats['ts'] = ts
        per_snapshot.append(feats)
    snapshots = pd.DataFrame(per_snapshot).set_index('ts').sort_index()

    # Keep the latest snapshot of each minute, fill minutes without one, then
    # take the last minute of each 15-minute bar.
    minute_features = snapshots.groupby(snapshots.index.floor('min')).last()
    minute_features = minute_features.resample('min').last().ffill()
    return minute_features.resample('15min', label='right', closed='right').last()


def rsi(series, n=14):
    """Compute the Relative Strength Index (RSI) of a series.

    Average gains and losses are exponentially weighted with
    ``alpha = 1 / n`` and ``adjust=False``.

    Parameters
    ----------
    series : pandas.Series
        Values to analyse (e.g. prices or open interest).
    n : int, default 14
        Smoothing period.

    Returns
    -------
    pandas.Series
        RSI in [0, 100].  It is 100 when there are gains but no losses and 0
        when there are losses but no gains.  It is NaN for the first element
        and wherever both averages are zero (a flat series).
    """
    delta = series.diff()
    gain = delta.clip(lower=0.0)
    loss = (-delta).clip(lower=0.0)
    avg_gain = gain.ewm(alpha=1 / n, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1 / n, adjust=False).mean()
    # 100 - 100 / (1 + g/l) == 100 * g / (g + l).  This form stays defined
    # when l == 0 (yielding 100) instead of turning into NaN.
    total = avg_gain + avg_loss
    return 100.0 * avg_gain / total.where(total != 0)


def make_oi_rsi_15m(metrics_df, window=14):
    """Compute an RSI of open interest over 15-minute bars.

    Open interest is first sampled on a right-closed, right-labelled
    15-minute grid (last value in each bar), so ``window`` counts 15-minute
    bars regardless of how often the raw metrics are recorded.

    Parameters
    ----------
    metrics_df : pandas.DataFrame
        Either indexed by a DatetimeIndex, or with a 'create_time' column that
        is parsed with ``pd.to_datetime(..., utc=True)``.  Must contain
        'sum_open_interest'.
    window : int, default 14
        RSI period, in 15-minute bars.

    Returns
    -------
    pandas.DataFrame
        Single column 'oi_rsi', indexed by 15-minute bar labels.
    """
    if not isinstance(metrics_df.index, pd.DatetimeIndex):
        metrics_df = metrics_df.copy()
        metrics_df['create_time'] = pd.to_datetime(metrics_df['create_time'], utc=True)
        metrics_df = metrics_df.set_index('create_time')
    # Sort in both cases: 'last' below must pick the chronologically last value.
    metrics_df = metrics_df.sort_index()
    oi = metrics_df['sum_open_interest'].astype(float)
    oi_15m = oi.resample('15min', label='right', closed='right').last()
    return pd.DataFrame({'oi_rsi': rsi(oi_15m, window)})


def make_basis_15m(mark_df, index_df):
    """Return the mark-minus-index basis and the basis relative to the index.

    No resampling happens here.  Both inputs are expected to share the same
    (15-minute) index and carry a 'close' column.  The arithmetic aligns on
    the index, so timestamps missing on either side yield NaN.

    Returns
    -------
    pandas.DataFrame
        Columns 'basis' (mark close - index close) and 'nbasis'
        (basis / index close).
    """
    idx_close = index_df['close']
    spread = mark_df['close'].sub(idx_close)
    return pd.DataFrame({'basis': spread, 'nbasis': spread.div(idx_close)})


def add_price_features(features_df, k15, num_lags=100):
    """Add price- and volume-derived columns to a feature DataFrame.

    All features derive from one-bar log returns of ``k15['close']``.  In
    insertion order, the columns are:

    * for each window w in (4, 12, 24, 48, 96) bars: rolling mean
      ('ret_mean_w'), rolling standard deviation ('ret_std_w') and sum of
      squared returns ('rv_w');
    * 'rsi_14': 14-period RSI of the close;
    * 'ret_lag_1' ... 'ret_lag_{num_lags}': past one-bar log returns;
    * 'vol_sum_w' for w in (4, 12, 48): rolling sum of ``k15['volume']``.

    Parameters
    ----------
    features_df : pandas.DataFrame
        Feature matrix to extend.  Columns are assigned in place and aligned
        on its index, which should match ``k15``'s.
    k15 : pandas.DataFrame
        15-minute bars with at least 'close' and 'volume' columns.
        It is not modified.
    num_lags : int
        Number of lagged-return columns to add.

    Returns
    -------
    pandas.DataFrame
        ``features_df`` itself, after the columns have been added.
    """
    closes = k15['close']
    log_ret = np.log(closes).diff()
    sq_ret = log_ret ** 2

    # Return statistics over 1h .. 24h of 15-minute bars
    for win in (4, 12, 24, 48, 96):
        window = log_ret.rolling(win)
        features_df[f'ret_mean_{win}'] = window.mean()
        features_df[f'ret_std_{win}'] = window.std()
        features_df[f'rv_{win}'] = sq_ret.rolling(win).sum()

    features_df['rsi_14'] = rsi(closes, 14)

    lag = 1
    while lag <= num_lags:
        features_df[f'ret_lag_{lag}'] = log_ret.shift(lag)
        lag += 1

    volume = k15['volume']
    for win in (4, 12, 48):
        features_df[f'vol_sum_{win}'] = volume.rolling(win).sum()
    return features_df


In [None]:

# ---- Configuration ----
base_dir = os.getenv('BINANCE_DATA_DIR', '/content/drive/MyDrive/binance_data')
symbol = 'BTCUSDT'
horizon = 100  # prediction horizon: 100 future 15‑minute bars
num_lags = 100  # number of lagged return features (context length)
# The raw trade tapes are large and none of the features below use them;
# set this to True only if you add trade-based features.
load_trade_tapes = False

# Ensure the data directory exists (on Colab, try mounting Google Drive first)
if not os.path.isdir(base_dir):
    if base_dir.startswith('/content/'):
        try:
            from google.colab import drive
            drive.mount('/content/drive')
        except Exception as e:
            # Best effort only: the check below reports a missing directory.
            print(f'Could not mount Google Drive automatically: {e}')
    if not os.path.isdir(base_dir):
        raise FileNotFoundError(
            f"Data directory '{base_dir}' not found. Set BINANCE_DATA_DIR environment variable to your data location."
        )

# ---- Load raw data ----
kline_dt_cols = ['open_time', 'close_time']
# Klines (1m) to be resampled; pattern matches all monthly files
klines = read_parquet_files(base_dir, 'klines', f"{symbol}-klines-*.parquet", parse_dt_cols=kline_dt_cols, index_col='open_time')
# Book depth snapshots
bookdepth = read_parquet_files(base_dir, 'bookDepth', f"{symbol}-bookDepth-*.parquet")
# Trade tapes (optional; not used by the current feature set)
if load_trade_tapes:
    aggtrades = read_parquet_files(base_dir, 'aggTrades', f"{symbol}-aggTrades-*.parquet")
    trades = read_parquet_files(base_dir, 'trades', f"{symbol}-trades-*.parquet")
else:
    aggtrades = trades = None
# Metrics (open interest etc.)
metrics = read_parquet_files(base_dir, 'metrics', f"{symbol}-metrics-*.parquet", parse_dt_cols=['create_time'])
# Mark price klines
markprice = read_parquet_files(base_dir, 'markPriceKlines', f"{symbol}-markPriceKlines-*.parquet", parse_dt_cols=kline_dt_cols, index_col='open_time')
# Index price klines
indexprice = read_parquet_files(base_dir, 'indexPriceKlines', f"{symbol}-indexPriceKlines-*.parquet", parse_dt_cols=kline_dt_cols, index_col='open_time')
# Premium index klines
premium = read_parquet_files(base_dir, 'premiumIndexKlines', f"{symbol}-premiumIndexKlines-*.parquet", parse_dt_cols=kline_dt_cols, index_col='open_time')

# ---- Resample to 15‑minute bars ----
# Every frame uses right-closed, right-labelled 15-minute bins (the same
# convention as resample_klines_to_15m), so their indexes line up when joined.
# '15min' is the non-deprecated spelling of '15T'.
k15 = resample_klines_to_15m(klines)
mark15 = markprice[['close']].resample('15min', label='right', closed='right').last()
index15 = indexprice[['close']].resample('15min', label='right', closed='right').last()
premium15 = premium[['close']].resample('15min', label='right', closed='right').last().rename(columns={'close': 'premium'})

# ---- Feature construction ----
# Start with an empty DataFrame indexed on 15‑minute intervals
X = pd.DataFrame(index=k15.index)
# Price‑based features
X = add_price_features(X, k15, num_lags=num_lags)
# Order book features (depth ratios and top‑of‑book imbalance)
book15 = make_book_features_15m(bookdepth)
X = X.join(book15, how='left')
# Open interest RSI
oi15 = make_oi_rsi_15m(metrics)
X = X.join(oi15, how='left')
# Basis and normalized basis
basis15 = make_basis_15m(mark15, index15)
X = X.join(basis15, how='left')
# Premium
X = X.join(premium15, how='left')
# Replace infinities and forward-fill short gaps (at most 5 bars).  No
# back-fill: it would copy later values into earlier rows (look-ahead
# leakage) and let rows with missing leading lags pass the filter below.
X = X.replace([np.inf, -np.inf], np.nan).ffill(limit=5)

# ---- Create targets ----
close_prices = k15['close']
future_log_return = np.log(close_prices.shift(-horizon)) - np.log(close_prices)
# 1 if price goes up, else 0 (rows without a future price are dropped below)
y_direction = (future_log_return > 0).astype(int)
y_regression = future_log_return

# Keep only rows that have a label and a complete set of lagged returns
lag_cols = [f'ret_lag_{lag}' for lag in range(1, num_lags + 1)]
valid = y_regression.notna() & X[lag_cols].notna().all(axis=1)
X = X[valid]
y_direction = y_direction[valid]
y_regression = y_regression[valid]

print(f"Feature matrix shape: {X.shape}")
print(f"Number of classification samples: {y_direction.shape[0]}")


In [None]:

        # ---- Train/test split using a time‑series cross‑validation ----
        tscv = TimeSeriesSplit(n_splits=5)
        X_values = X.values
        y_dir_values = y_direction.values
        y_reg_values = y_regression.values

        cls_scores = []
        reg_scores = []
        # Out‑of‑sample predictions to evaluate overall performance
        prob_oos = pd.Series(index=X.index, dtype=float)
        ret_oos = pd.Series(index=X.index, dtype=float)

        fold_num = 1
        for train_idx, test_idx in tscv.split(X_values):
            X_train, X_test = X_values[train_idx], X_values[test_idx]
            y_train_cls, y_test_cls = y_dir_values[train_idx], y_dir_values[test_idx]
            y_train_reg, y_test_reg = y_reg_values[train_idx], y_reg_values[test_idx]

            # Classification model (direction)
            cls_model = HistGradientBoostingClassifier(max_depth=6, learning_rate=0.1, max_iter=300, l2_regularization=0.0)
            cls_model.fit(X_train, y_train_cls)
            # Regression model (return magnitude)
            reg_model = HistGradientBoostingRegressor(max_depth=6, learning_rate=0.05, max_iter=400, l2_regularization=0.0)
            reg_model.fit(X_train, y_train_reg)

            # Predictions
            prob_pred = cls_model.predict_proba(X_test)[:, 1]
            ret_pred = reg_model.predict(X_test)
            prob_oos.iloc[test_idx] = prob_pred
            ret_oos.iloc[test_idx] = ret_pred

            # Metrics
            auc = roc_auc_score(y_test_cls, prob_pred)
            acc = accuracy_score(y_test_cls, (prob_pred > 0.5).astype(int))
            f1 = f1_score(y_test_cls, (prob_pred > 0.5).astype(int))
            rmse = mean_squared_error(y_test_reg, ret_pred, squared=False)
            cls_scores.append((auc, acc, f1))
            reg_scores.append(rmse)
            print(f"Fold {fold_num}: AUC={auc:.3f}, ACC={acc:.3f}, F1={f1:.3f}, RMSE={rmse:.6f}")
            fold_num += 1

        # Overall performance summary
        mean_auc = np.mean([s[0] for s in cls_scores])
        mean_acc = np.mean([s[1] for s in cls_scores])
        mean_f1 = np.mean([s[2] for s in cls_scores])
        mean_rmse = np.mean(reg_scores)
        print(f"
Average classification performance: AUC={mean_auc:.3f}, ACC={mean_acc:.3f}, F1={mean_f1:.3f}")
        print(f"Average regression RMSE: {mean_rmse:.6f}")

        # Fit full models on entire dataset for deployment
        final_cls_model = HistGradientBoostingClassifier(max_depth=6, learning_rate=0.1, max_iter=300, l2_regularization=0.0)
        final_reg_model = HistGradientBoostingRegressor(max_depth=6, learning_rate=0.05, max_iter=400, l2_regularization=0.0)
        final_cls_model.fit(X_values, y_dir_values)
        final_reg_model.fit(X_values, y_reg_values)


In [None]:

        def live_signal(feature_matrix, k15_bars, cls_model, reg_model, horizon=horizon, buy_threshold=0.60, sell_threshold=0.40):
            # Compute a trading signal based on the latest available features.
            # A BUY signal is issued when the predicted probability of an upward
            # move exceeds `buy_threshold`; a SELL signal is issued when the
            # probability falls below `sell_threshold`.  Otherwise a HOLD is
            # returned.  Confidence is reported on a scale from 0 to 1.
            x = feature_matrix.iloc[-1].values.reshape(1, -1)
            p_up = float(cls_model.predict_proba(x)[0, 1])
            exp_ret = float(reg_model.predict(x)[0])
            exp_pct = (math.exp(exp_ret) - 1.0) * 100.0

            if p_up >= buy_threshold:
                signal = 'BUY'
                confidence = p_up
            elif p_up <= sell_threshold:
                signal = 'SELL'
                confidence = 1 - p_up
            else:
                signal = 'HOLD'
                # Confidence near mid-range: distance from 0.5 scaled into [0,1]
                confidence = 0.5 + abs(p_up - 0.5)

            latest_close = float(k15_bars['close'].iloc[-1])
            target_price = latest_close * math.exp(exp_ret)
            return {
                'latest_close': latest_close,
                'p_up': round(p_up, 4),
                'expected_log_return': round(exp_ret, 6),
                'expected_move_percent': round(exp_pct, 4),
                'target_price_t_plus_h': round(target_price, 2),
                'signal': signal,
                'confidence': round(confidence, 4),
            }

        # Example usage on the last bar (after training):
        example_signal = live_signal(X, k15, final_cls_model, final_reg_model)
        print("
Live signal based on the most recent data:")
        print(example_signal)
