In [None]:
# Importing necessary libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import joblib
# from untrade.client import Client
import copy
from pathlib import Path
import warnings

import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping, LearningRateMonitor
from lightning.pytorch.loggers import TensorBoardLogger
import numpy as np
import pandas as pd
import torch

from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import MAE, SMAPE, PoissonLoss, QuantileLoss
from pytorch_forecasting.models.temporal_fusion_transformer.tuning import optimize_hyperparameters


In [None]:
# Constants
# Input CSV read by load_data().
DATA_PATH = "BTC_2019_2023_1h.csv"  # Path to your dataset
# File that receives the trained model's state_dict (see save_model()).
MODEL_PATH = "TFT_FINAL_MODEL_PATH_.pth"  # Path to save your trained model
# Decoder horizon handed to TimeSeriesDataSet; the same number of trailing
# time steps is held out of the training split.
max_prediction_length = 6
# Longest encoder history handed to TimeSeriesDataSet; half of it is used as
# the minimum encoder length.
max_encoder_length = 24
# Training batch size; the validation dataloader uses ten times this value.
batch_size = 128 

In [None]:

# Function to load data
def load_data(path):
    """Read the CSV located at ``path`` and return it as a DataFrame.

    Args:
        path: Anything ``pandas.read_csv`` accepts (file path or file-like
            object).

    Returns:
        pd.DataFrame: The parsed table, using pandas' default options.
    """
    frame = pd.read_csv(path)
    return frame


In [None]:

# Function to preprocess data
def preprocess_data(data):
    """Add the time index, calendar, target and placeholder columns.

    The work is done on a copy, so the frame passed in by the caller is left
    untouched and the function can be re-run on the same raw input.

    Args:
        data (pd.DataFrame): Raw rows. Must provide ``datetime``, ``volume``
            and ``close`` columns. The frame's index is copied into
            ``time_idx`` as-is (assumes a consecutive integer index such as
            the default one produced by ``read_csv`` -- TODO confirm for other
            inputs).

    Returns:
        pd.DataFrame: A new frame holding the input columns plus
        ``time_idx``, ``date``, ``month``, ``log_volume``, ``Target``,
        ``TargetClass``, ``group_id``, ``predictions`` and ``predictions_y``.
        ``datetime`` is converted to a datetime dtype.
    """
    frame = data.copy()

    # Time index is simply the row label of the incoming frame.
    frame["time_idx"] = frame.index

    # Parse timestamps once; `date` is kept as a second column with the same values.
    frame["datetime"] = pd.to_datetime(frame["datetime"])
    frame["date"] = frame["datetime"]

    # Month number stored as a string category ("1".."12").
    frame["month"] = frame["date"].dt.month.astype(str).astype("category")

    # Small epsilon keeps log() finite when volume is exactly zero.
    frame["log_volume"] = np.log(frame["volume"] + 1e-8)

    # Next-row close minus current close; the last row has no successor -> NaN.
    frame["Target"] = frame["close"].shift(-1) - frame["close"]
    # 1.0 when the next close is higher, else 0.0 (NaN compares False, so the
    # final row is labelled 0.0).
    frame["TargetClass"] = (frame["Target"] > 0).astype(float)

    # Single series -> one constant group label.
    frame["group_id"] = "BTC"

    # Placeholders, overwritten later for the rows that get a forecast.
    frame["predictions"] = -1
    frame["predictions_y"] = -1
    return frame


In [None]:

# Function to train the model
def train_model(trainer, tft, train_dataloader, val_dataloader):
    """Fit ``tft`` with ``trainer`` and hand the same model object back.

    Args:
        trainer: Object exposing ``fit(model, train_dataloaders=...,
            val_dataloaders=...)``.
        tft: Model to be fitted.
        train_dataloader: Loader forwarded as ``train_dataloaders``.
        val_dataloader: Loader forwarded as ``val_dataloaders``.

    Returns:
        The ``tft`` object that was passed in, after ``trainer.fit`` returns.
    """
    loaders = {
        "train_dataloaders": train_dataloader,
        "val_dataloaders": val_dataloader,
    }
    trainer.fit(tft, **loaders)
    return tft


In [None]:

# Function to save the model
def save_model(model, path):
    """Serialise ``model.state_dict()`` to ``path`` via ``torch.save``.

    Only the state dict is written -- nothing else (e.g. no scaler).

    Args:
        model: Object exposing ``state_dict()``.
        path: Destination accepted by ``torch.save`` (file path or file-like
            object).

    Returns:
        None
    """
    weights = model.state_dict()
    torch.save(weights, path)


In [None]:

def generate_signals(data, n, pred):
    """
    Turn forecasts into position-change signals.

    For every index ``i >= 1`` the expected move is
    ``(pred[i] - data[i - 1]) / pred[i] * 100`` (note: relative to the
    forecast, not to the previous observation). The move selects a target
    position and the emitted signal is ``target - current position``:

    * move >  n            -> target long  (+1)
    * move < -n            -> target short (-1)
    * 0 < |move| < n       -> target flat  (0), i.e. close any open position
    * otherwise (move is exactly 0, exactly +/-n, or NaN) -> hold

    Args:
        data (np.array): Observed values; only ``data[i - 1]`` is read.
        n (float): Threshold, in percent, that a move must exceed to open a
            position.
        pred (np.array): Forecasts indexed like ``data``; ``pred[0]`` is
            never read.

    Returns:
        np.array: Integer array shaped like ``data`` with values in
        {2, 1, 0, -1, -2}: +1/-1 open or close one unit, +2/-2 flip between
        short and long, 0 no trade. Index 0 is always 0.
    """
    signals = np.zeros_like(data, dtype=int)
    position = 0  # 0: flat, 1: long, -1: short

    for idx in range(1, len(data)):
        move = ((pred[idx] - data[idx - 1]) / pred[idx]) * 100

        if move > n:
            desired = 1
        elif move < -1 * n:
            desired = -1
        elif (0 < move < n) or (-1 * n < move < 0):
            desired = 0
        else:
            desired = position  # boundary value or NaN: keep what we have

        # The trade size is exactly the distance between the two positions.
        signals[idx] = desired - position
        position = desired

    return signals


In [None]:

# Function to generate backtest result
def backtest(signals, jupyter_id="jenish", leverage=1, result_type="Q"):
    """
    Perform backtesting using the untrade SDK.

    Parameters:
    - signals (str): Path to the CSV file containing historical price data and signals.
    - jupyter_id (str): Account name used to log in to jupyter.untrade.io.
    - leverage (int): Leverage forwarded to the backtester.
    - result_type (str): Result type forwarded to the backtester (optional on the SDK side).

    Returns:
    - result: Whatever ``Client.backtest`` returns (described by the original
      author as a generator that yields backtest results).

    Raises:
    - ImportError: if the ``untrade`` package is not installed.
    """
    # Imported lazily: without this import `Client` is undefined here, and a
    # local import lets the rest of the notebook run without the SDK installed.
    from untrade.client import Client

    # Create an instance of the untrade client
    client = Client()

    # Perform backtest using the provided CSV file path
    result = client.backtest(
        jupyter_id=jupyter_id,
        file_path=signals,
        leverage=leverage,
        result_type=result_type,
    )

    return result


In [None]:
if __name__ == "__main__":
    # Seed Python, NumPy and torch so that weight initialisation and batch
    # shuffling are repeatable across Restart & Run All.
    pl.seed_everything(42)

    # Load data
    data = load_data(DATA_PATH)

    # Preprocess data
    data = preprocess_data(data)

    # Everything up to the cutoff is used for training; the final
    # `max_prediction_length` time steps are held out.
    training_cutoff = data["time_idx"].max() - max_prediction_length
    training = TimeSeriesDataSet(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        group_ids=["group_id"],
        min_encoder_length=max_encoder_length // 2,  # keep encoder length long (as it is in the validation set)
        max_encoder_length=max_encoder_length,
        min_prediction_length=1,
        max_prediction_length=max_prediction_length,
        static_categoricals=["group_id"],
        static_reals=[],
        # NOTE(review): preprocess_data() builds `month` as a string category,
        # yet it is registered as a real here -- confirm this is intended.
        # time_varying_known_categoricals=["time_idx", "month"],
        time_varying_known_reals=["time_idx", "month"],
        time_varying_unknown_categoricals=[],
        time_varying_unknown_reals=[
            "volume",
        ],
        target_normalizer=GroupNormalizer(
            groups=["group_id"], transformation="softplus"
        ),  # use softplus and normalize by group
        add_relative_time_idx=True,
        add_target_scales=True,
        add_encoder_length=True,
    )

    # Validation reuses the training dataset's configuration on the full
    # frame, in prediction mode and with randomisation switched off.
    validation = TimeSeriesDataSet.from_dataset(training, data, predict=True, stop_randomization=True)

    train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
    val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)

    # Early stopping is currently disabled:
    # early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min")
    lr_logger = LearningRateMonitor()  # log the learning rate
    logger = TensorBoardLogger("lightning_logs")  # logging results to a tensorboard

    trainer = pl.Trainer(
        max_epochs=25,
        enable_model_summary=True,
        gradient_clip_val=0.1,
        limit_train_batches=50,  # cap each epoch at 50 training batches
        callbacks=[lr_logger],
        logger=logger,
    )


In [None]:
# Build the Temporal Fusion Transformer; input/output sizes are derived
# from the `training` dataset, the remaining values are hyperparameters.
tft = TemporalFusionTransformer.from_dataset(
    training,
    # objective
    loss=QuantileLoss(),
    # optimisation
    learning_rate=0.03,
    reduce_on_plateau_patience=4,
    # network size / regularisation
    hidden_size=16,
    hidden_continuous_size=8,
    attention_head_size=2,
    dropout=0.1,
    # logging: every 10 batches
    log_interval=10,
)

In [None]:
# Fit the TFT on the training loader, validating on the hold-out loader
tft = train_model(
    trainer=trainer,
    tft=tft,
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
)




In [None]:
# Persist the trained weights (state_dict) to MODEL_PATH
save_model(model=tft, path=MODEL_PATH)

In [None]:
# Load the model (just to confirm it works).
# map_location="cpu" lets a checkpoint written on a GPU machine be restored on
# a CPU-only one; load_state_dict then copies the values into the existing
# parameters of `tft`, wherever they currently live.
tft.load_state_dict(torch.load(MODEL_PATH, map_location="cpu"))


In [None]:
   # Lists to store predictions and actual values
all_preds = []
all_ys = []

N_TEST_ROWS = 1000  # number of trailing rows that receive a forecast
WINDOW_ROWS = 24    # rows handed to the model for each single forecast
ALIGN_SHIFT = 6     # same value as max_prediction_length; see re-alignment below

# First window start such that exactly N_TEST_ROWS windows fit before the end.
start_point = len(data) - N_TEST_ROWS - WINDOW_ROWS
if start_point < 0:
    raise ValueError(
        f"need at least {N_TEST_ROWS + WINDOW_ROWS} rows for the walk-forward test, got {len(data)}"
    )

# Slide a WINDOW_ROWS-long window one row at a time and keep the first
# forecast step (and the matching actual value) of every window.
for i in range(start_point, len(data) - WINDOW_ROWS):
    test = TimeSeriesDataSet.from_dataset(
        training, data[i:i + WINDOW_ROWS], predict=True, stop_randomization=True
    )

    # Make predictions
    predictions = tft.predict(test, return_y=True, trainer_kwargs=dict(accelerator="cpu"))

    # Store predictions and actual values
    all_preds.append(predictions.output[0][0].item())
    all_ys.append(predictions.y[0][0][0].item())

# The placeholder columns hold integer -1; cast first so that writing float
# forecasts does not rely on an implicit (deprecated) dtype upcast.
data['predictions'] = data['predictions'].astype(float)
data['predictions_y'] = data['predictions_y'].astype(float)

# Window i is stored on row i + WINDOW_ROWS (label-based, inclusive slice on
# the default integer index -> exactly N_TEST_ROWS rows).
data.loc[start_point + WINDOW_ROWS:, 'predictions'] = all_preds
data.loc[start_point + WINDOW_ROWS:, 'predictions_y'] = all_ys
data['p_class'] = -1

# Work on an explicit copy so the column writes below hit a real frame
# rather than a slice of `data`.
test_data = data.tail(N_TEST_ROWS).copy()

# Re-alignment: moves every stored forecast ALIGN_SHIFT rows earlier.
# NOTE(review): presumably this puts the forecast on the row it was made for
# (first decoder step of its window) -- confirm against predictions_y/close.
test_data['predictions_y'] = test_data['predictions_y'].shift(-ALIGN_SHIFT)
test_data['predictions'] = test_data['predictions'].shift(-ALIGN_SHIFT)
# The shift leaves NaN in the final ALIGN_SHIFT rows; drop them.
test_data = test_data.iloc[:-ALIGN_SHIFT].copy()

backtesting_signals = generate_signals(
    data=test_data['close'].values,
    n=0.12,  # threshold in percent
    pred=test_data['predictions'].values,
)
test_data['signals'] = backtesting_signals
test_data.to_csv('test_data.csv')

In [None]:
# Run the exported signal file through the backtester
results = backtest(signals='test_data.csv')