In [13]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import ParameterGrid, TimeSeriesSplit
from sklearn.metrics import make_scorer

from pycoingecko import CoinGeckoAPI

In [14]:
# Configuration
TOP_N = 10
LOOKBACK_DAYS = 180
RETURN_HORIZON = 5
RANDOM_STATE = 42

In [15]:
def fetch_top_coins_by_market_cap(n = TOP_N):
    """
    Fetches top N crytocurrencies by market cap from CoinGecko API
    """

    cg = CoinGeckoAPI()
    coins = cg.get_coins_markets(vs_currency = 'usd', order = 'market_cap_desc',
                                 per_page = n, page = 1, sparkline = False)
    return [coin['id'] for coin in coins]


In [16]:
def fetch_historical_data(coin_id, days = LOOKBACK_DAYS):
    """
    Fetch historical OHLCV data for a single coin
    """

    cg = CoinGeckoAPI()
    data = cg.get_coin_market_chart_by_id(id = coin_id, vs_currency = 'usd', days = days)

    df = pd.DataFrame({
        'timestamp': [x[0] for x in data['prices']],
        'price': [x[1] for x in data['prices']],
        'market_cap': [x[1] for x in data['market_caps']],
        'volume': [x[0] for x in data['total_volumes']],
    })

    df['Date'] = pd.to_datetime(df['timestamp'], unit = 'ms')
    df = df.drop('timestamp', axis = 1)
    df = df.rename(columns = {'price': 'Close', 'volume': 'Volume', 'market_cap': 'MarketCap'})
    df['Ticker'] = coin_id

    return df[['Date', 'Ticker', 'Close', 'Volume', 'MarketCap']]

In [17]:
def build_crypto_dataset(top_n = TOP_N, days = LOOKBACK_DAYS):
    """
    Build comprehensive dataset for top N crypto assets
    """
    
    coin_ids = fetch_top_coins_by_market_cap(top_n)
    print(f"Coins selected: {coin_ids}")

    all_data = []
    for i, coin_id in enumerate(coin_ids):
        try:
            df = fetch_historical_data(coin_id, days)
            all_data.append(df)
        except Exception as e:
            print(f"Error fetching {coin_id}: {e}")
            continue

    data = pd.concat(all_data, ignore_index = True)
    print(f"Total records fetched: {len(data)}")
    return data

In [None]:
def compute_signals(df):
    """
    Engineer cross-sectional signals from price/volume data
    """
    return_window = 21 # can change this
    group = df.groupby('Ticker', group_keys = False)
    signals = pd.DataFrame(index=df.index)

    # momentum
    signals['momentum'] = group['Close'].apply(lambda x: x.pct_change(return_window))

    # log market cap
    signals['size'] = np.log1p(df['MarketCap'])
    
    # volatility (rolling std of returns)
    signals['volatility'] = group['Close'].apply(lambda x: x.pct_change().rolling(return_window).std())

    # mean reversion (3 prev day)
    signals['meanrev'] = -group['Close'].apply(lambda x: x.pct_change(3))

    # inverse of price (supply proxy)
    signals['supply_proxy'] = 1 / df['Close']

    signals.index = df.index
    return signals


def calc_cross_sec_returns(df, horizon = RETURN_HORIZON):
    """
    Calculate forward returns for cross-sectional analysis
    """
    
    returns = df.groupby('Ticker')['Close'].shift(-horizon) / df['Close'] - 1
    return returns

In [22]:
def rank_signals(signals):
    """
    Rank signals cross-sectionally (within each date)
    """
    
    date_index = signals.index
    ranked = signals.groupby(date_index).rank(axis = 0
                                              , method = 'average', na_option = 'keep')

    # normalize to [0,1]
    ranked = ranked / signals.groupby(date_index).transform('count')
    return ranked

In [23]:
def information_ratio(returns):
    """
    Calculate Information Ratio (mean/std of returns)
    """

    mean = np.nanmean(returns)
    std = np.nanstd(returns)
    return mean/std if std != 0 else np.nan

In [24]:
def random_forest_optimizer(X, y, n_splits = 3):
    """
    Optimize Random Forest hyperparameters for Information Ratio
    """
    
    def ir_score(y_true, y_pred):
        return information_ratio(y_pred)
    
    scorer = make_scorer(ir_score, greater_is_beter = True)

    params = {
        'n_estimators': [25, 50, 75],
        'max_depth': [3, 5, 8],
        'min_samples_leaf': [1, 4]
    }
    grid = ParameterGrid(params)

    best_score = -np.inf
    best_model = None
    best_params = None

    # time series cross-validation
    tscv = TimeSeriesSplit(n_splits = n_splits)

    for p in grid:
        model = RandomForestRegressor(random_state = RANDOM_STATE, **p)
        fold_preds = []

        for train_idx, test_idx in tscv.split(X):
            model.fit(X.iloc[train_idx], y.iloc[train_idx])
            y_pred = model.predict(X.iloc[test_idx])
            fold_preds.append(y_pred)

        cv_preds = np.concatenate(fold_preds)
        score = information_ratio(cv_preds)

        if score > best_score:
            best_score = score
            best_model = model
            best_params = p

    print(f"Best parameters: {best_params}")
    print(f"Best CV Information Ratio: {best_score:.4f}")
    
    return best_model

In [25]:
def plot_corr_heatmap(X, y, fname = 'feature_return_heatmap.png'):
    """
    Generate correlation heatmap for features and returns
    """

    allcorr = X.assign(Return = y).corr()

    plt.figure(figsize = (8, 6))
    sns.heatmap(allcorr, annot = True, fmt = '.2f', cmap = 'bwr', center = 0,
                vmin = -1, vmax = 1, square = True, linewidths = 0.5)
    plt.title('Feature-Return Correlation Heatmap', fontsize = 14, fontweight = 'bold')
    plt.tight_layout()
    plt.show()
    # plt.savefig(fname, dpi=300)
    # plt.close()


def plot_feature_importance(model, feature_names, fname = 'feature_importance.png'):
    """
    Plot Random Forest feature importance
    """

    importances = model.feature_importances_
    indices = np.argsort(importances)[::-1]

    plt.figure(figsize = (8,5))
    plt.title('Feature Importances (Random Forest)', fontsize = 14, fontweight = 'bold')
    plt.bar(range(len(importances)), importances[indices], align = 'center')
    
    plt.xticks(range(len(importances)), [feature_names[i] for i in indices], rotation = 45)
    plt.ylabel('Importance Score')
    plt.tight_layout()

    plt.show()
    # plt.savefig(fname, dpi = 300)
    # plt.close()

In [32]:
# Analysis starts here

# Get data
print(f"Gathering top {TOP_N} crypto data from the last {LOOKBACK_DAYS} days")
data = build_crypto_dataset(top_n = TOP_N, days = LOOKBACK_DAYS)

display(data.head())

Gathering top 10 crypto data from the last 180 days
Coins selected: ['bitcoin', 'ethereum', 'tether', 'binancecoin', 'ripple', 'solana', 'usd-coin', 'staked-ether', 'dogecoin', 'tron']
Total records fetched: 1810


Unnamed: 0,Date,Ticker,Close,Volume,MarketCap
0,2025-04-28,bitcoin,93809.33782,1745798400000,1862772000000.0
1,2025-04-29,bitcoin,95030.606455,1745884800000,1887067000000.0
2,2025-04-30,bitcoin,94256.359463,1745971200000,1870818000000.0
3,2025-05-01,bitcoin,94235.75331,1746057600000,1871350000000.0
4,2025-05-02,bitcoin,96426.945223,1746144000000,1914884000000.0


In [33]:
# Create signals
print("Computing signals from asset data")
# data.set_index('Date', inplace = True)
signals = compute_signals(data)
signals['Ticker'] = data['Ticker']
signals['Return'] = calc_cross_sec_returns(data.reset_index(), RETURN_HORIZON)

Computing signals from asset data


KeyError: 'meanrev'

In [None]:
# Clean data
print("Cleaning dataset")
signals.dropna(inplace = True)
X = signals[['momentum', 'size', 'volatility', 'meanrev', 'supply_proxy']]
y = signals['Return']

In [None]:
# Rank signals (cross-sectionally)
print("Ranking signals cross-sectionally")
Xr = rank_signals(X)
y = y.loc[Xr.index]

print(f"Final dataset size: {len(Xr)} observations")

In [None]:
# Random Forest Optimization
print("Training and optimizing Random Forest model")
model = random_forest_optimizer(Xr, y, n_splits = 3)

In [None]:
# Predictions
y_pred = model.predict(Xr)
final_ir = information_ratio(y_pred)

print(f"RESULTS:")

print(f"Information Ratio: {final_ir:.4f}")
print(f"Mean Predicted Return: {np.mean(y_pred):.4f}")
print(f"Std Predicted Return: {np.std(y_pred):.4f}")

In [None]:
# Visuals
print(f"Generating visuals")
plot_corr_heatmap(Xr, y)
plot_feature_importance(model, X.columns.tolist())