In [7]:
import matplotlib.pyplot as plt
import numpy as np
import polars as pl
from pathlib import Path
import ta
import statsmodels.api as sm
import numba
import pandas as pd
from statsmodels.regression.rolling import RollingOLS
from concurrent.futures import ThreadPoolExecutor
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import wandb

In [None]:
%load_ext memory_profiler

In [6]:
def fix_nan(data, fix):
    idx = np.isnan(data)
    if isinstance(fix, (pd.core.series.Series, np.ndarray)):
        data[idx] = fix[idx]
        return
    data[idx] = fix


def first_valid_obs(data):
    x = np.square(data)
    fix_nan(x, 0)
    return np.where(np.cumsum(x) > 0)[0][0]


def ewma_moment(data, moment, decay_in_days, ignore_zeros=True):
    res =  np.nan * np.ones(data.shape)
    alpha = 1 - 1 / (1 + decay_in_days)
    fst_idx = first_valid_obs(data)
    seeded_value = np.nanmean(data[fst_idx : fst_idx + decay_in_days - 1] ** moment)
    res[fst_idx : fst_idx + decay_in_days - 1] = seeded_value
    for k in np.arange(fst_idx + decay_in_days - 1, len(data)):
        if np.isnan(data[k]) or (ignore_zeros and data[k] == 0):
            res[k] = res[k-1]
        else:
            res[k] = alpha * res[k-1] + (1 - alpha) * (data[k] ** moment)
    return res


def winsorize(x, window=1, low_quantile=0, high_quantile=1):
    if low_quantile==0 and high_quantile==1:
        return x
    if low_quantile==0 and high_quantile<1:
        high = x.rolling(window=window).quantile(high_quantile, interpolation='midpoint')
        fix_nan(high, x)
        return pd.Series(data=np.min([x.values, high.values], axis=0), index=x.index)
    if high_quantile==1 and low_quantile>0:
        low = x.rolling(window=window).quantile(low_quantile, interpolation='midpoint')
        fix_nan(low, x)
        return pd.Series(data=np.max([x.values, low.values], axis=0), index=x.index) 
    low = x.rolling(window=window).quantile(low_quantile, interpolation='midpoint')
    fix_nan(low, x)
    high = x.rolling(window=window).quantile(high_quantile, interpolation='midpoint')
    fix_nan(high, x)
    return pd.Series(data=np.min([np.max([x.values, low.values], axis=0), high.values], axis=0), index=x.index) 


def draw_down(data): 
    r = np.concatenate((np.arange(0,1), np.cumsum(data))) 
    return r - np.maximum.accumulate(r), r


def max_draw_down(data):
    dd, r = draw_down(data)
    mdd = np.min(dd)
    end = np.argmin(dd)
    start = np.argmax(r[:end])
    return mdd, start, end


def nice_plot(x, label):
    ax = x.plot()
    ax.grid(True)
    ax.set_ylabel(label, fontsize=12)
    ax.xaxis.label.set_visible(False)


def describe(x, label):
    x.hist(bins=50)
    plt.suptitle(label)
    print(x.describe())

def pnl_stat(x):
    res = dict()
    res['mean'] = x.mean()
    res['std'] = x.std()
    res['sharpe'] = np.sqrt(252)*res['mean']/res['std']
    mdd, start, end = max_draw_down(x.values)
    res['mdd'] = mdd 
    res['mdd_start_date']  = x.index[start]
    res['mdd_end_date']  = x.index[end]
    res['mdd_depth']  = x.index[end] - x.index[start]
    res['autocorr'] = x.autocorr(1)
    res['% win'] = (x>0).mean()
    return res
    

def score_card(x):
    grp = x.groupby([(data.index.year),(data.index.month)])
    return np.sqrt(252)*grp.mean().unstack(1)/grp.std().unstack(1), grp.sum().unstack(1), grp.std().unstack(1)

In [3]:
def kaufman_efficiency_ratio(price_col: str, period: int = 10) -> pl.Expr:
    change = pl.col(price_col).diff().abs().fill_nan(0)
    volatility = change.rolling_sum(window_size=period)
    change_total = pl.col(price_col).diff(n=period-1).abs().fill_nan(0)
    efficiency_ratio = (change_total / volatility).fill_nan(0).alias('efficiency_ratio')
    return efficiency_ratio

def rolling_volatility(price_col: str, window_size: int = 20) -> pl.Expr:
    log_returns = pl.col(price_col).log().diff().fill_nan(0)
    volatility = log_returns.rolling_std(window_size=window_size, ddof = 0).alias('volatility')
    return volatility

def calculate_rsi(close_col: str, window: int) -> pl.Expr:
    delta = pl.col(close_col).diff().fill_nan(0)
    gain = pl.when(delta > 0).then(delta).otherwise(0)
    loss = pl.when(delta < 0).then(-delta).otherwise(0)
    avg_gain = gain.ewm_mean(alpha=1/window, adjust=False, ignore_nulls=True)
    avg_loss = loss.ewm_mean(alpha=1/window, adjust=False, ignore_nulls=True)
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi.alias("rsi")

def calculate_bollinger_bands_indicators(close_col: str, window: int, num_std_dev: float) -> (pl.Expr, pl.Expr):
    sma = pl.col(close_col).rolling_mean(window_size=window)
    std_dev = pl.col(close_col).rolling_std(window_size=window)
    upper_band = sma + (std_dev * num_std_dev)
    lower_band = sma - (std_dev * num_std_dev)
    lband_indicator = pl.when(pl.col(close_col) < lower_band).then(1).otherwise(0).alias('lband_indicator')
    hband_indicator = pl.when(pl.col(close_col) > upper_band).then(1).otherwise(0).alias('hband_indicator')
    return lband_indicator, hband_indicator

def stochastic_oscillator_expressions(high_col: str, low_col: str, close_col: str, window_stoch: int, window_smooth: int) -> (pl.Expr, pl.Expr):
    rolling_low_expr = pl.col(low_col).rolling_min(window_size=window_stoch).alias('rolling_low')
    rolling_high_expr = pl.col(high_col).rolling_max(window_size=window_stoch).alias('rolling_high')
    percent_k_expr = ((pl.col(close_col) - rolling_low_expr) / (rolling_high_expr - rolling_low_expr) * 100).alias('%K')
    percent_d_expr = percent_k_expr.rolling_mean(window_size=window_smooth).alias('%D')
    return percent_k_expr, percent_d_expr

def bullish_trend(price_direction_col: str, window: int = 4, min_periods_to_confirm: int = 3) -> pl.Expr:
    rolling_sum_expr = pl.col(price_direction_col).rolling_sum(window_size=window, min_periods=1).shift(-window + 1)
    bullish_trend_expr = (rolling_sum_expr >= min_periods_to_confirm).cast(pl.Int8).fill_null(0).alias('bullish_trend')
    return bullish_trend_expr

In [8]:
def pipeline(data,
             low_col = 'low',
             high_col = 'high',
             open_col = 'open',
             close_col = 'close',
             volume_col = 'volume',
             decay_window = 20,
             window_sma = 20,
             window_ema = 20,
             window_volume = 20,
             window_trend = 4,
             min_window_trend = 3, 
             window_rsi = 20,
             window_bb = 20,
             bb_num_std = 2,
#              window_adx = 20,
             window_vol = 20,
             window_stoch = 30, 
             window_smooth = 3):

    percentage_change = ((pl.col(close_col) - pl.col(open_col)) / pl.col(open_col)).alias('percentage_change')
    percent_k_expr, percent_d_expr = stochastic_oscillator_expressions(high_col, low_col, close_col, window_stoch, window_smooth)
    lband_indicator, hband_indicator = calculate_bollinger_bands_indicators(close_col, window_bb, bb_num_std)
    
    df = data.with_columns(
    ((pl.col(close_col) > pl.col('open')).cast(pl.Int32)).alias('price_direction'), 
    pl.col(close_col).pct_change().alias('returns'),
    pl.col(volume_col).pct_change().alias('volume_change'),
    pl.col(volume_col).rolling_mean(window_size=window_volume).alias('avg_volume'),
    pl.col(close_col).rolling_mean(window_size=window_sma).alias('sma'),
    pl.col(close_col).ewm_mean(alpha = 1/window_ema, min_periods = window_ema, ignore_nulls=True).alias('ema'),
    pl.when(percentage_change.abs() < 0.01).then(0)
    .when(percentage_change.abs() < 0.05).then(1)
    .otherwise(2).alias('signal_strength'),
    pl.arange(1, pl.len() + 1).alias('time'),
    percent_k_expr,
    percent_d_expr,
    lband_indicator,
    hband_indicator,
    rolling_volatility(close_col, window_vol),
    calculate_rsi(close_col, window_rsi),
    kaufman_efficiency_ratio(close_col, window_rsi))
    
    
    bullish_trend_expr = bullish_trend('price_direction', window_trend, min_window_trend)
    
    conditions = [
        ((pl.col(close_col) > pl.col('sma')) & pl.col('hband_indicator').is_not_null()) |
        ((pl.col(close_col) < pl.col('sma')) & pl.col('lband_indicator').is_not_null()) &
        (pl.col('volume_change') > 0.05)]
    
    choices = ['Trend', 'Mean reversion']
    df = df.with_columns(bullish_trend_expr, 
        pl.when(conditions[0]).then(pl.lit(choices[0]))
           .when(conditions[1]).then(pl.lit(choices[1]))
           .otherwise(pl.lit('Uncertain'))
           .alias('Regime')
    )
    
    return df

In [9]:
%%time
base_path = Path()
file_path = base_path / 'data/candles/binance/futures'
parquet_files = list(file_path.glob('*.parquet'))

CPU times: total: 0 ns
Wall time: 210 ms


In [None]:
granularities = ['1m', '30m', '1h', '1d']

In [None]:
#getting file parquets by granularity
parquet_files_1d = [file for file in parquet_files if '1d' in file.name]
parquet_files_1d

In [None]:
#getting the list of tickers
tickers = []
with open(base_path / 'tickers.txt', 'r') as f:
    tickers.extend(line.strip().split('/')[0][1:].replace(",", "") for line in f)

print('No duplicates in tickers') if len(tickers) == len(list(set(tickers))) else 'Duplicates in tickers'

In [None]:
%%time
results = {key:pipeline(lazy_frames[key]) for key in lazy_frames.keys()}
lazy_frames_list = list(results.values())

In [None]:
datas = pl.collect_all(lazy_frames_list)
collected_results['BTC']

## BTC

In [None]:
data = collected_results['BTC'].to_pandas()
data.set_index('date', inplace = True)
data.index = pd.to_datetime(data.index)

In [None]:
vol_window = 252
vol_low_quantile = 0.15
vol_high_quantile = 0.95
vol_decay_in_days = 18
daily_risk = 100

data['Raw_vol'] = np.sqrt(ewma_moment(data['returns'].values, 2, vol_decay_in_days)) 
data['Vol'] = winsorize(data['Raw_vol'], vol_window, vol_low_quantile, vol_high_quantile)
data['Vol_1Lot'] = data['Vol'] * data['close']
data['A_returns'] = data['close'].diff()


data['VN_positions'] =  daily_risk / data['Vol_1Lot']
data['VN_trades'] = data['VN_positions'].diff()
data['VN_PnL'] = data['VN_positions'].shift(1) * data['A_returns']

In [None]:
class MarketEnvironment(gym.Env):
    def __init__(self, data):
        super(MarketEnvironment, self).__init__()
        self.data = data
        self.current_step = 0
        self.position_size = 100
        self.max_steps = len(data) - 1  # Length of the dataframe
        self.transaction_cost = 0
        self.entry_price = data['close'].iloc[0]
        self.action_space = spaces.Discrete(3)  # Two possible actions: Buy (1) or Sell (-1) no action (0)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(3,), dtype=np.float32)  # Shape = (3,) means 3 features of type float32. If you have more features, adjust accordingly
        self.lookback_window = 2
        
    def reset(self):
        self.current_step = 0
        return self.data.iloc[self.current_step][['entry_exit', 'Price_Direction', 'Signal_Strength']].values # sample features, change depending on configurations/test

    def step(self, action):
        assert self.action_space.contains(action)
        self.current_step += 1
        if self.current_step >= self.max_steps:
            done = True
        else:
            done = False

        reward = self._calculate_reward_v2(action)

        next_state = self.data.iloc[self.current_step][['entry_exit', 'Price_Direction', 'Signal_Strength']].values

        return next_state, reward, done, {}
    

    def _calculate_reward_v2(self, action):
        
        if self.current_step == 0 or self.current_step == self.max_steps:
            return 0  # No reward for the first step or if it's the end of the episode

        current_price = self.data.iloc[self.current_step]['close']
        previous_price = self.data.iloc[self.current_step - 1]['close']
        position_size = self.position_size
        entry_price = self.entry_price

        # Calculate raw PnL
        pnl = (current_price - previous_price) * position_size

        # Subtract transaction costs
        transaction_cost = self.transaction_cost * abs(action)
        pnl -= transaction_cost

        # Risk adjustment (example using the Sharpe ratio)
#         recent_returns = self.data['returns'].iloc[self.current_step - self.lookback_window : self.current_step]
#         risk_adjustment = recent_returns.mean() / (recent_returns.std() + 1e-9)  # Avoid division by zero

        reward = pnl
        return reward
