## Testing already trained model saved in "trained_model.pth"

mounting the drive

In [94]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


importing the necessary packages

In [115]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

Loading the test data

In [116]:
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/LLM_Model/data/btc_data.csv')
df = df.dropna()

In [117]:
df

Unnamed: 0,timestamp,open,high,low,close,volume,comment_count,sentiment_score,predict,target
0,2020-01-22 07:00:00,8753.66,8818.00,8706.07,8740.27,2896.788689,1.0,0.0,8678.69,0
1,2020-01-22 08:00:00,8740.49,8746.34,8656.00,8678.69,3374.764471,0.0,0.0,8686.28,1
2,2020-01-22 09:00:00,8678.67,8688.99,8643.31,8686.28,1638.063198,0.0,0.0,8663.36,0
3,2020-01-22 10:00:00,8684.56,8686.68,8651.00,8663.36,1303.479686,0.0,0.0,8659.87,0
4,2020-01-22 11:00:00,8662.90,8670.49,8622.00,8659.87,1708.384236,0.0,0.0,8664.50,1
...,...,...,...,...,...,...,...,...,...,...
34516,2023-12-31 19:00:00,42656.69,42695.44,42586.91,42619.04,703.557910,0.0,0.0,42558.01,0
34517,2023-12-31 20:00:00,42619.04,42680.36,42504.56,42558.01,1015.140350,0.0,0.0,42520.72,0
34518,2023-12-31 21:00:00,42558.02,42670.44,42520.72,42520.72,923.438130,0.0,0.0,42257.88,0
34519,2023-12-31 22:00:00,42520.73,42591.10,42056.00,42257.88,1811.594110,0.0,0.0,42283.58,1


In [118]:
# Calculate moving averages and add them as features
df['ma_5'] = df['close'].rolling(window=5).mean()
df['ma_10'] = df['close'].rolling(window=10).mean()
df['ma_20'] = df['close'].rolling(window=20).mean()

# Drop rows with NaN values created by moving averages
df = df.dropna()

Normalizing the data

In [119]:
scaler = MinMaxScaler()

# Updating the features
features = df[['open', 'high', 'low', 'close', 'volume', 'comment_count', 'sentiment_score', 'ma_5', 'ma_10', 'ma_20']]
features_scaled = scaler.fit_transform(features)

features_scaled = scaler.fit_transform(features)

Feature scaling is necessary get similar range of data when feeding the model

In [120]:
sequence_length = 24  # using 24 hours of data to predict the next hour

# Create sequences
X, y = [], []
for i in range(len(features_scaled) - sequence_length):
    X.append(features_scaled[i:i+sequence_length])
    y.append(features_scaled[i+sequence_length, 3])  # predicting the 'close' price

X, y = np.array(X), np.array(y)

In [121]:
# Split the data
_, X_test, _, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Convert to PyTorch tensors
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)


Preparing data for model input (adjusting according to your specific needs)
Converting data to tokenized input suitable for your model,
And defining your training and evaluation setup using pytorch's Trainer and TrainingArguments

In [122]:
# the transformer model
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, hidden_dim):
        super(TransformerModel, self).__init__()
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads, dim_feedforward=hidden_dim)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(input_dim * sequence_length, 1)

    def forward(self, src):
        src = self.transformer_encoder(src)
        src = src.view(src.size(0), -1)  # Flatten
        output = self.fc(src)
        return output

In [123]:
# Model parameters
input_dim = 10
num_heads = 1
num_layers = 2
hidden_dim = 128
sequence_length = 24 

# Initializing the model
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim)



Loading the trained model parameters

In [124]:
model.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/trained_model.pth'))
model.eval()

TransformerModel(
  (encoder_layer): TransformerEncoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=10, out_features=10, bias=True)
    )
    (linear1): Linear(in_features=10, out_features=128, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=128, out_features=10, bias=True)
    (norm1): LayerNorm((10,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((10,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=10, out_features=10, bias=True)
        )
        (linear1): Linear(in_features=10, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (lin

Evaluating the model

In [125]:
criterion = nn.MSELoss()

model.eval()
with torch.no_grad():
    test_outputs = model(X_test_tensor)
    test_loss = criterion(test_outputs.squeeze(), y_test_tensor)
    print(f"Test Loss: {test_loss.item()}")

Test Loss: 0.00019210625032428652


Backtest

In [126]:
def backtest(prices, predictions):
    initial_balance = 1000  # Starting with $1000
    balance = initial_balance
    position = 0  # 0 means no position, 1 means holding BTC
    returns = []

    for i in range(len(predictions) - 1):
        if predictions[i] < prices[i+1] and position == 0:  # Buy signal
            balance -= prices[i+1]
            position = 1
        elif predictions[i] > prices[i+1] and position == 1:  # Sell signal
            balance += prices[i+1]
            position = 0

        # Calculate daily return
        daily_return = (balance - initial_balance) / initial_balance
        returns.append(daily_return)
        initial_balance = balance  # Update initial balance for next day's return

    # If holding a position at the end, sell it
    if position == 1:
        balance += prices[-1]

    return balance, returns

Convert test outputs to numpy array and denormalize prices

In [127]:
test_outputs = test_outputs.flatten()
y_test_denormalized = scaler.inverse_transform(np.hstack((np.zeros((y_test_tensor.shape[0], 9)), y_test_tensor.numpy().reshape(-1, 1))))[:, -1]
test_outputs_denormalized = scaler.inverse_transform(np.hstack((np.zeros((test_outputs.shape[0], 9)), test_outputs.reshape(-1, 1))))[:, -1]

In [128]:
final_balance, returns = backtest(y_test_denormalized, test_outputs_denormalized)
total_return = (final_balance - 1000) / 1000

Calculating Sharpe Ratio

In [129]:
risk_free_rate = 0.01 / 365  # Daily risk-free rate assuming 1% annual rate
excess_returns = np.array(returns) - risk_free_rate
sharpe_ratio = np.mean(excess_returns) / np.std(excess_returns) * np.sqrt(365)  # Annualize the Sharpe Ratio

In [130]:
print(f"Final Balance: ${final_balance:.2f}")
print(f"Total Return: {total_return * 100:.2f}%")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")

Final Balance: $1149.10
Total Return: 14.91%
Sharpe Ratio: -0.91
