In [None]:
import pandas as pd
from data_service import DataService

# Load the 3-minute ETH market data through the project's DataService.
eth_ds = DataService("ETH-USDT", "3min")
eth_data = eth_ds.load_market_data()

# Same for BTC.
btc_ds = DataService("BTC-USDT", "3min")
btc_data = btc_ds.load_market_data()

# Suffix every BTC column except the join key so the names stay distinct
# from the ETH columns after the merge.
btc_data.columns = [
    name if name == 'timestamp' else name + "_btc"
    for name in btc_data.columns
]

# Inner join: only timestamps present in both markets are kept.
df = pd.merge(left=eth_data, right=btc_data, on='timestamp', how='inner')
display(df.tail(25))

In [None]:
# Remove the id / symbol / timeframe columns of both markets.
metadata_columns = ['id', 'symbol', 'id_btc', 'symbol_btc', 'timeframe_btc', 'timeframe']
df = df.drop(columns=metadata_columns)
display(df)


In [None]:
import cross_validation

# Cross-validation layout: number of cycles and the size of each split
# (presumably counted in rows -- confirm against cross_validation).
cross_val_count = 3
train_segment_size, val_segment_size, test_segment_size = 300_000, 50_000, 50_000

cross_val_segmentation_map = cross_validation.create_segmentation_map(
    cross_val_count,
    train_segment_size,
    val_segment_size,
    test_segment_size,
)
display(cross_val_segmentation_map)

In [None]:
# Keep only the most recent rows that the segmentation map asks for.
required_rows = cross_val_segmentation_map['total_data_needed']
if len(df) < required_rows:
    # tail() would quietly hand back a shorter frame, and the segment bounds
    # computed for `required_rows` rows would then no longer line up with it.
    raise ValueError(
        f"Cross-validation needs {required_rows} rows but only {len(df)} are available"
    )
df = df.tail(required_rows).reset_index(drop=True)
display(df)

In [None]:
# segments[n] maps each segmentation-map key of cycle n to its slice of df.
segments = {}
for n in range(cross_val_count):
    # Match the exact "_<n>" suffix. A plain substring test (str(n) in key)
    # would also pick up e.g. "train_segment_10" for n == 1 once there are
    # more than ten cycles.
    cycle_suffix = f"_{n}"
    segments[n] = {
        key: cross_validation.get_segment(df, *start_end_tuple)
        for key, start_end_tuple in cross_val_segmentation_map.items()
        if key.endswith(cycle_suffix)
    }

In [None]:
# Pick one cross-validation cycle and pull out its three splits,
# dropping the timestamp column from each of them.
cross_val_set_n = 2
cross_val_set = segments[cross_val_set_n]

train_data, val_data, test_data = (
    cross_val_set[segment_key].drop(columns=['timestamp'])
    for segment_key in (
        f'train_segment_{cross_val_set_n}',
        f'val_segment_{cross_val_set_n}',
        f'test_segment_{cross_val_set_n}',
    )
)

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from labeler import BinaryWinFinder

# Settings handed to BinaryWinFinder by create_labels().
win_direction = "long"
candle_span = 500
distance_threshold = 0.01

# EMA spans: 50 evenly spaced values from 5 to 250, truncated to plain ints.
steps = np.linspace(5, 250, 50).astype(int).tolist()


def create_labels(df):
    """Return a copy of ``df`` with a ``wins`` label column added.

    The labels are whatever ``BinaryWinFinder.find_wins()`` returns when the
    finder is built with the module-level ``win_direction``, ``candle_span``
    and ``distance_threshold``.

    The caller's frame is left untouched. Writing ``wins`` into it in place
    makes any cell that later derives features from the same frame (for
    example to fit a scaler) pick up the label column when it is re-run.
    """
    labelled = df.copy()
    win_finder = BinaryWinFinder(labelled, win_direction, candle_span, distance_threshold)
    labelled['wins'] = win_finder.find_wins()
    return labelled


def get_scaler(scaler_fit_data):
    """Fit a new ``RobustScaler`` on ``scaler_fit_data`` and return it."""
    return RobustScaler().fit(scaler_fit_data)


def ema(df, span, target_col):
    """Exponentially weighted mean of a single column.

    Returns ``df[target_col].ewm(span=span).mean()`` as a Series named
    ``"<target_col>_ema_<span>"``.
    """
    return (
        df[target_col]
        .ewm(span=span)
        .mean()
        .rename(f"{target_col}_ema_{span}")
    )


def apply_emas(df):
    """Return a copy of ``df`` extended with EMA columns.

    For every span in the module-level ``steps`` list, the EMA of ``close``
    and then of ``close_btc`` is added under the name produced by ``ema()``.
    The frame passed in is not modified.
    """
    enriched = df.copy()
    for span in steps:
        for source_col in ('close', 'close_btc'):
            smoothed = ema(enriched, span, source_col)
            enriched[smoothed.name] = smoothed
    return enriched


def log_series_if_ascending(df):
    """Log-transform every column of ``df`` in place and drop unusable rows.

    Despite the name, no "ascending" check is made: the natural log is applied
    to all columns unconditionally, and each one is announced on stdout.
    Infinite results are then turned into NaN and every row holding a NaN is
    removed. The same, mutated frame is returned.
    """
    for name in df.columns:
        logged = np.log(df[name])
        df[name] = logged
        print(f"Log transformation applied on column '{name}'")

    infinities = [np.inf, -np.inf]
    df.replace(infinities, np.nan, inplace=True)
    df.dropna(inplace=True)
    return df


def get_X_and_y(df):
    """Split ``df`` into features and the ``wins`` target.

    Returns ``(X, y)``: ``X`` is ``df`` without the ``wins`` column, ``y`` is
    the ``wins`` column. Both get a fresh 0-based index.
    """
    target = df['wins'].reset_index(drop=True)
    features = df.drop(columns=['wins']).reset_index(drop=True)
    return features, target


def atr(df, period=14):
    """Average True Range of the ``high`` / ``low`` / ``close`` columns.

    The true range of a bar is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``. It is then
    smoothed exponentially with ``alpha = 1 / period`` (``adjust=False``).

    Args:
        df: frame with ``high``, ``low`` and ``close`` columns.
        period: smoothing period; the result is named ``f"atr{period}"``.

    Returns:
        A Series aligned with ``df``. The first bar has no previous close, so
        its true range falls back to ``high - low``.
    """
    high = df['high']
    low = df['low']
    # shift(1) lines each bar up with the close of the bar BEFORE it.
    # shift(-1) would pull in the next bar's close, i.e. leak future data
    # into the feature.
    prev_close = df['close'].shift(1)

    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)  # max() skips the NaN candidates of the first bar

    smoothed = true_range.ewm(alpha=1 / period, adjust=False).mean()
    return smoothed.rename(f"atr{period}")


def obv(df, window=250):
    """Rolling signed-volume sum (a windowed On-Balance Volume).

    Each bar's ``volume`` is signed by the direction of its ``close`` change
    (+1 up, -1 down, 0 unchanged; the first bar counts as 0) and the signed
    volumes are summed over the last ``window`` bars.

    Args:
        df: frame with ``close`` and ``volume`` columns; it is not modified.
        window: number of bars in the rolling sum (default 250).

    Returns:
        A Series aligned with ``df``; the first ``window - 1`` entries are NaN.
    """
    direction = np.sign(df["close"].diff())
    signed_volume = (direction * df["volume"]).fillna(0)
    return signed_volume.rolling(window).sum()


def process(df, scaler):
    """Turn a raw segment into model inputs ``(X, y)``.

    Steps, in order: add the ``wins`` labels, add the EMA columns, add the
    ``atr`` and ``obv`` columns, scale every non-label column with the already
    fitted ``scaler``, replace those columns by their 1-step percentage
    change, drop rows containing NaN / +-inf, and split off the target.
    """
    frame = apply_emas(create_labels(df))
    frame['atr'] = atr(frame)
    frame['obv'] = obv(frame)

    feature_cols = [name for name in frame.columns if name != "wins"]
    frame[feature_cols] = scaler.transform(frame[feature_cols])
    # NOTE(review): the percentage change is taken on the already scaled
    # values. If the scaler centres the data, values close to zero produce
    # very large ratios -- confirm this ordering is intended.
    frame[feature_cols] = frame[feature_cols].pct_change(1)

    # Infinite ratios become NaN so a single dropna removes every bad row.
    frame = frame.replace([np.inf, -np.inf], np.nan).dropna()
    return get_X_and_y(frame)

In [None]:
# Build the feature frame the scaler is fitted on: training data plus the
# EMA, atr and obv columns. apply_emas() works on its own copy, so
# train_data itself is not changed here.
# NOTE: train_data must not already contain a 'wins' column at this point;
# process() never passes that column to scaler.transform().
train_data_with_emas = apply_emas(train_data)
train_data_with_emas = train_data_with_emas.assign(
    atr=atr(train_data_with_emas),
    obv=obv(train_data_with_emas),
)

# An earlier experiment converted these features to 1-step percentage changes
# (and dropped NaN / inf rows) before fitting; the scaler is currently fitted
# on the untransformed feature values.
scaler = get_scaler(train_data_with_emas)

# Every split goes through the same pipeline with the train-fitted scaler.
X_train, y_train = process(train_data, scaler)
X_val, y_val = process(val_data, scaler)
X_test, y_test = process(test_data, scaler)


In [None]:
# Inspect the training features returned by process().
display(X_train)

In [None]:
import optuna
from xgboost import XGBClassifier
from sklearn.metrics import precision_score, recall_score, make_scorer
from sklearn.model_selection import train_test_split


def objective(trial):
    """Optuna objective: fit an XGBoost classifier and score it on validation.

    The model is trained on ``X_train`` / ``y_train`` with early stopping on
    ``(X_val, y_val)``. Validation probabilities of class 1 are cut at 0.75,
    and the returned score is ``0.75 * precision + 0.25 * recall`` for the
    positive class (higher is better).

    Args:
        trial: the ``optuna`` trial used to sample hyperparameters.

    Returns:
        The weighted precision/recall score as a float.
    """
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        'alpha': trial.suggest_float('alpha', 0.1, 1.0),
        'lambda': trial.suggest_float('lambda', 0.1, 1.0),
        'scale_pos_weight': trial.suggest_float('scale_pos_weight', 1, 10),
        'early_stopping_rounds': 10
    }

    model = XGBClassifier(**param, use_label_encoder=False, eval_metric='logloss')

    # Train on the training set; the validation set drives early stopping.
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

    # Probability of the positive class on the validation set.
    y_proba = model.predict_proba(X_val)[:, 1]

    threshold = 0.75  # Adjust this threshold value as needed
    y_pred = (y_proba >= threshold).astype(int)

    # With a high cut-off a trial may predict no positives at all; precision
    # is then undefined. zero_division=0 scores it as 0 explicitly instead of
    # emitting an UndefinedMetricWarning on every such trial.
    precision = precision_score(y_val, y_pred, pos_label=1, zero_division=0)
    recall = recall_score(y_val, y_pred, pos_label=1, zero_division=0)

    # Custom metric that weights precision three times as much as recall.
    return precision * 0.75 + recall * 0.25


# Seed the sampler so repeated runs of the notebook propose the same sequence
# of trials; an unseeded study gives different "best" parameters every run.
sampler = optuna.samplers.TPESampler(seed=42)
study = optuna.create_study(direction='maximize', sampler=sampler)
study.optimize(objective, n_trials=2000)

In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import precision_score, confusion_matrix, classification_report

# Rebuild the classifier from the best trial, plus the fixed settings that
# objective() also passes to XGBClassifier.
best_params = {
    **study.best_params,
    'use_label_encoder': False,
    'eval_metric': 'logloss',
    'early_stopping_rounds': 10,
}
model = XGBClassifier(**best_params)

# Train on the training set; the validation set drives early stopping.
model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

# Positive-class probabilities on the held-out test set.
y_proba = model.predict_proba(X_test)[:, 1]

threshold = 0.75  # same cut-off as in objective(); adjust both together
y_pred = (y_proba >= threshold).astype(int)

# zero_division=0: if nothing clears the cut-off, precision is undefined;
# report it as 0 explicitly rather than via an UndefinedMetricWarning.
precision = precision_score(y_test, y_pred, pos_label=1, zero_division=0)
print(f"Precision: {precision:.2f}")

conf_matrix = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(conf_matrix)

class_report = classification_report(y_test, y_pred, zero_division=0)
print("Classification Report:")
print(class_report)