In [1]:
import statsmodels.api as sm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.stats.mstats import winsorize
from binance.client import Client as bnb_client
from datetime import datetime
from scipy.stats import norm


def get_binance_px(client, symbol, freq, start_ts = '2020-12-20'):
    data = client.get_historical_klines(symbol, freq, start_ts)
    columns = ['open_time', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume',
               'num_trades', 'taker_base_volume', 'taker_quote_volume', 'ignore']
    data = pd.DataFrame(data, columns = columns)
    
    # Convert from POSIX timestamp (number of millisecond since jan 1, 1970)
    data['open_time'] = data['open_time'].map(lambda x: datetime.utcfromtimestamp(x/1000))
    data['close_time'] = data['close_time'].map(lambda x: datetime.utcfromtimestamp(x/1000))
    return data


def get_price_data(univ, freq, should_fetch_cached_data, cached_data_file):
    if should_fetch_cached_data:
        px = pd.read_csv(cached_data_file)
        date_format = "%Y-%m-%d %H:%M:%S"
        px['open_time'] = px['open_time'].apply(lambda t:  datetime.strptime(t, date_format))
        px.set_index('open_time', inplace=True)
        return px
    else:
        client = bnb_client(tld='US')
        px = {}
        for x in univ:
            print(f"Downloading data for symbol {x}")
            data = get_binance_px(client, x, freq)
            px[x] = data.set_index('open_time')['close']

        px = pd.DataFrame(px).astype(float)
        px.to_csv(cached_data_file)
        return px


# test_start_time_point must be in the index of df
def get_train_test_data(df, test_start_time_point):
    train_data = df.loc[:test_start_time_point].iloc[:-1]
    test_data = df.loc[test_start_time_point:]
    return train_data, test_data


def get_demeaned_normalized_signal(raw_signal):
    signal_mean = raw_signal.mean(axis=1)
    demeaned_signal = raw_signal.subtract(signal_mean, axis=0)
    signal_sums = demeaned_signal.abs().sum(axis=1)
    return demeaned_signal.divide(signal_sums, axis=0)


def get_rank_demeaned_normalized_signal(raw_signal):
    signal_rank = raw_signal.rank(axis=1)
    signal_mean = raw_signal.rank(axis=1).mean(axis=1)
    demeaned_signal = signal_rank.subtract(signal_mean, axis=0)
    return demeaned_signal.divide(demeaned_signal.abs().sum(axis=1), axis=0)


def get_gross_returns_and_net_returns(signal_weights, px):
    asset_returns = px / px.shift() - 1
    weighted_returns = signal_weights.shift() * asset_returns
    gross_returns = weighted_returns.sum(axis=1)
    turnover = (signal_weights.fillna(0) - signal_weights.shift().fillna(0)).abs().sum(axis=1)
    tcost_bps = 20 # (commission + slippage)
    net_returns = gross_returns.subtract(turnover * tcost_bps * 1e-4, fill_value = 0)
    return gross_returns, net_returns


def calculate_covariance_directly(ser_1, ser_2):
    available_1 = ser_1.notna()
    available_2 = ser_2.notna()
    
    common_1 = ser_1[available_1][available_2]
    common_2 = ser_2[available_1][available_2]
    
    if common_1.shape[0] <= 1 or common_2.shape[0] <= 1:
        return np.nan
    
    mean_1 = common_1.mean()
    demeaned_1 = common_1 - mean_1
    
    mean_2 = common_2.mean()
    demeaned_2 = common_2 - mean_2
    
    return (demeaned_1 * demeaned_2).sum() / (demeaned_1.shape[0] - 1)


# Using pd.corrwith() yields a large number of hardware-related warnings so I implemented my 
# own version.
def calculate_correlation_directly(ser_1, ser_2):
    cov = calculate_covariance_directly(ser_1, ser_2)
    
    if cov == np.nan:
        return np.nan

    available_1 = ser_1.notna()
    available_2 = ser_2.notna()
    
    common_1 = ser_1[available_1][available_2]
    common_2 = ser_2[available_1][available_2]
    
    if len(common_1) <= 1 or len(common_2) <= 1:
        return np.nan
    
    return cov / (common_1.std() * common_2.std())


# returns pair in the form of (alpha, beta); nan values in dependent_series or independent_series are ignored;
# neither can contain inf
def get_alpha_beta_to_asset(dependent_series, independent_series):
    cov = calculate_covariance_directly(dependent_series, independent_series)
    
    non_na_dependent_series = dependent_series[dependent_series.notna() & independent_series.notna()]
    non_na_independent_series = independent_series[
        dependent_series.notna() & independent_series.notna()]
    
    beta = cov / non_na_independent_series.var()
    alpha = (non_na_dependent_series - non_na_independent_series * beta).mean()
    return alpha, beta


def get_decorrelated_returns(strat_returns, benchmark_asset_returns):
    non_na_period_strat_returns = strat_returns[strat_returns.notna() & benchmark_asset_returns.notna()]
    non_na_period_benchmark_rets = benchmark_asset_returns[
        strat_returns.notna() & benchmark_asset_returns.notna()]
    _, beta = get_alpha_beta_to_asset(non_na_period_strat_returns, non_na_period_benchmark_rets)
    return strat_returns - beta * benchmark_asset_returns


def get_max_drawdown(net_returns):
    cumulative_net_returns = net_returns.cumsum()
    drawdowns = cumulative_net_returns / cumulative_net_returns.expanding(min_periods=1).max() - 1
    return drawdowns[drawdowns != float('-inf')].min()


def get_max_drawdown_duration(net_returns, hours_freq):
    cumulative_net_returns = net_returns.cumsum()
    
    peak = cumulative_net_returns.expanding(min_periods=1).max()
    
    max_drawdown_duration = 0
    current_drawdown_duration = 0
    
    for dt in cumulative_net_returns.index:
        if cumulative_net_returns[dt] >= peak[dt]:
            current_drawdown_duration = 0
        else:
            current_drawdown_duration += 1
            max_drawdown_duration = max(max_drawdown_duration, current_drawdown_duration)
    return max_drawdown_duration * hours_freq / 24


# trade_hours_freq = 4, 8, 12, 24 (for 1 day), ...
def get_strategy_stats(net_returns, trade_hours_freq, input_prices):
    bitcoin_returns_over_period = input_prices['BTCUSDT'] / input_prices['BTCUSDT'].shift() - 1
    
    alpha, beta = get_alpha_beta_to_asset(net_returns.iloc[2:],
                                          bitcoin_returns_over_period.iloc[2:])
    decorrelated_returns = get_decorrelated_returns(net_returns, bitcoin_returns_over_period)
    
    res = {
        "avg returns": net_returns.mean() * 24 / trade_hours_freq * 365,
        "decorrelated avg returns": decorrelated_returns.mean() * 24 / trade_hours_freq * 365,
        "volatility": net_returns.std() * np.sqrt(24 / trade_hours_freq * 365),
        "sharpe ratio": net_returns.mean() / net_returns.std() * np.sqrt(24 / trade_hours_freq * 365),
        "decorrelated sharpe ratio": decorrelated_returns.mean() / decorrelated_returns.std() * np.sqrt(24 / trade_hours_freq * 365),
        "max drawdown": get_max_drawdown(net_returns),
        "max drawdown duration": get_max_drawdown_duration(net_returns, trade_hours_freq),
        "alpha_BTC": alpha,
        "beta_BTC": beta,
    }
    return res


# For example, proportion_lo = 0.1 and proportion_hi = 0.1 makes the top and bottom 10% be the values at the
# 90th and 10th percentiles, respectively.
def get_winsorized_signal(raw_signal, proportion_lo, proportion_hi):
    winsorized_signal = raw_signal.apply(lambda row: winsorize(
        row, limits=[proportion_lo, proportion_hi]), axis=1, result_type='expand')
    winsorized_signal.columns = raw_signal.columns
    return winsorized_signal


# For example, proportion_lo = 0.1 and proportion_hi = 0.1 makes the top and bottom 10% be removed.
def get_truncated_signal(raw_signal, proportion_lo, proportion_hi):
    quantile_lo = raw_signal.quantile(proportion_lo, axis=1)
    mask_lo = raw_signal.lt(quantile_lo, axis=0)

    quantile_hi = raw_signal.quantile(1-proportion_hi, axis=1)
    mask_hi = raw_signal.gt(quantile_hi, axis=0)

    return raw_signal.mask(mask_lo).mask(mask_hi)


# For example, proportion_lo = 0.1 and proportion_hi = 0.1 makes the middle 80% of the data be removed.
def get_rank_thresholded_signal(raw_signal, proportion_lo, proportion_hi):
    quantile_lo = raw_signal.quantile(proportion_lo, axis=1)
    mask_above = raw_signal.gt(quantile_lo, axis=0)

    quantile_hi = raw_signal.quantile(1-proportion_hi, axis=1)
    mask_below = raw_signal.lt(quantile_hi, axis=0)
    
    return raw_signal.mask(mask_above & mask_below)


def get_inverse_cdf_standard_normal_signal(raw_signal):
    ranked_signal = raw_signal.rank(axis=1)
    num_non_na = ranked_signal.notna().astype(int).sum(axis=1)

    return pd.DataFrame(
        norm.ppf(ranked_signal.divide(num_non_na + 1, axis=0)),
        columns=raw_signal.columns,
        index=raw_signal.index)


# px is expected to have data for every 4 hours
def get_horizon_to_px_4h(px_df):
    start_time = px_df.index[0]
    end_time = px_df.index[-1]
    return {
        4: px_df,
        8: px_df[px_df.index.hour % 8 == 0],
        12: px_df[px_df.index.hour % 12 == 0],
        24: px_df[px_df.index.hour == 0],
        48: px_df[px_df.index.hour == 0].loc[pd.date_range(start=start_time, end=end_time, freq='2D')],
    }


def plot_gross_and_net_cumulative_returns(gross_returns, net_returns):
    fig, axes = plt.subplots(1, 2, figsize=(10, 4))

    gross_returns.cumsum().plot(ax=axes[0], title='Cumulative Gross Returns', color='blue')
    axes[0].set_xlabel('Time')
    axes[0].set_ylabel('Cumulative Return')
    axes[0].grid(True)

    # Plot the second Series on the second subplot (axes[1])
    net_returns.cumsum().plot(ax=axes[1], title='Cumulative Net Returns', color='red')
    axes[1].set_xlabel('Time')
    axes[1].set_ylabel('Cumulative Return')
    axes[1].grid(True)

    # Adjust layout to prevent overlapping titles/labels
    plt.tight_layout()

    # Display the plots
    plt.show()

