# Transformer Encoder

In [25]:
import pandas as pd
import ccxt 
import numpy as np 
import pandas as pd 
from tqdm.auto import tqdm 
from xgboost import XGBClassifier
import json 
import pandas_ta as ta
from sklearn.metrics import accuracy_score 

import torch 
import torch.nn as nn 
import torch.optim as optim 
from torch.utils.data import Dataset, TensorDataset, DataLoader, RandomSampler, SequentialSampler
from sklearn.preprocessing import MinMaxScaler 
from sklearn.model_selection import train_test_split 
import zoneinfo

In [26]:
chart_df = pd.read_feather('BTC_USDT-1d (1).feather')
chart_df['date'] = pd.to_datetime(chart_df['date'])
# Extract year, month, day, and time information
chart_df['month'] = chart_df['date'].dt.month
chart_df['day'] = chart_df['date'].dt.day
chart_df.drop(columns={"date"}, inplace=True) 

In [27]:
close = chart_df["close"].values 

targets = [] 
for i in range(len(close)-1): 
    diff = (close[i+1] - close[i]) / close[i] 
    if diff < 0:
        targets.append(0) # short 
    else: 
        targets.append(1) # long 
        
targets.append(None) 

In [28]:
chart_df["targets"] = targets

chart_df.dropna(inplace=True) 

In [29]:
scaler = MinMaxScaler() 
scaled_features = scaler.fit_transform(chart_df.drop('targets', axis=1)) 

In [30]:
def create_dataset(X, y, time_step=1): 
    Xs, ys = [], [] 
    for i in range(len(X) - time_steps): 
        v = X.iloc[i:(i + time_steps)].values 
        Xs.append(v) 
        ys.append(y.iloc[i + time_steps]) 
    return np.array(Xs), np.array(ys) 

In [31]:
time_steps = 14
X, y = create_dataset(pd.DataFrame(scaled_features), chart_df['targets'], time_steps)

train_size = int(X.shape[0] * 0.8)

X_train = X[:train_size] 
y_train = y[:train_size] 

X_val = X[train_size:] 
y_val = y[train_size:] 

In [32]:
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=int)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=int)

X_train.shape, y_train.shape, X_val.shape, y_val.shape

(torch.Size([1828, 14, 7]),
 torch.Size([1828]),
 torch.Size([458, 14, 7]),
 torch.Size([458]))

In [33]:
batch_size = 32
train_data = TensorDataset(X_train, y_train)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

val_data = TensorDataset(X_val, y_val) 
val_sampler = SequentialSampler(val_data) 
val_dataloader = DataLoader(val_data, sampler=val_sampler, batch_size=batch_size) 

In [34]:
# Transformer Encoder Classifier
class TransformerClassifier(nn.Module):
    def __init__(self, input_size, num_layers, num_heads, dim_feedforward, output_size):
        super(TransformerClassifier, self).__init__()
        
        # Embedding layer for positional encoding
        self.embedding = nn.Linear(input_size, input_size)
        
        transformer_layer = nn.TransformerEncoderLayer(
            d_model=input_size,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(transformer_layer, num_layers=num_layers)
        # Linear layer for final output
        self.fc = nn.Linear(input_size, output_size)
    
    def forward(self, x):
        # Positional embedding
        x = self.embedding(x)
        # Passing through the transformer layers
        x = self.transformer_encoder(x)
        # Taking the output of the last time step
        x = x[:, -1, :] 
        # Output layer
        output = self.fc(x)
        return output

In [35]:
def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)

In [36]:
device = torch.device("cuda") 

# Hyperparameters
input_size = 7  # As per your dataset
num_layers = 3  # Number of Transformer layers
num_heads = 1   # Number of heads in Multi-Head Attention
dim_feedforward = 512  # Feedforward dimension
output_size = 2  # Number of output classes

# Model initialization
model = TransformerClassifier(input_size, num_layers, num_heads, dim_feedforward, output_size)
model.to(device)

loss_func = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-4)

# Training loop
best_val_loss = np.inf
epochs = 10

for epoch in tqdm(range(epochs), desc="Epochs"):
    model.train()
    total_loss = 0
    for step, batch in enumerate(train_dataloader):
        b_x, b_y = [t.to(device) for t in batch]
        optimizer.zero_grad()
        output = model(b_x)
        loss = loss_func(output, b_y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    
    # Validation
    model.eval()
    total_val_loss = 0
    val_accuracy = 0 
    with torch.no_grad():
        for step, batch in enumerate(val_dataloader):
            b_x, b_y = [t.to(device) for t in batch]
            output = model(b_x)
            loss = loss_func(output, b_y)
            total_val_loss += loss.item()
            val_accuracy += flat_accuracy(output.detach().cpu().numpy(), b_y.detach().cpu().numpy()) 
    
    avg_train_loss = total_loss / len(train_dataloader)
    avg_val_loss = total_val_loss / len(val_dataloader)
    avg_val_accuracy = val_accuracy / len(val_dataloader) 
    
    # Save best model
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), "best_transformer_encoder.pt")
    
    print(f"Epoch {epoch + 1}: Train Loss: {avg_train_loss}, Val Loss: {avg_val_loss}, val accuracy: {avg_val_accuracy}")


Epochs:   0%|          | 0/10 [00:00<?, ?it/s]

Epoch 1: Train Loss: 0.7022328561749952, Val Loss: 0.6972587466239929, val accuracy: 0.48333333333333334
Epoch 2: Train Loss: 0.6965796238389509, Val Loss: 0.694024900595347, val accuracy: 0.49083333333333334
Epoch 3: Train Loss: 0.6980716997179491, Val Loss: 0.6938722411791484, val accuracy: 0.4845833333333333
Epoch 4: Train Loss: 0.69346975560846, Val Loss: 0.6937342643737793, val accuracy: 0.49083333333333334
Epoch 5: Train Loss: 0.6965873775811031, Val Loss: 0.6936309655507406, val accuracy: 0.47625
Epoch 6: Train Loss: 0.6961075057243479, Val Loss: 0.693880005677541, val accuracy: 0.5304166666666666
Epoch 7: Train Loss: 0.6928976527575789, Val Loss: 0.6934242010116577, val accuracy: 0.495
Epoch 8: Train Loss: 0.6933132944435909, Val Loss: 0.6937866568565368, val accuracy: 0.4791666666666667
Epoch 9: Train Loss: 0.6975081429399294, Val Loss: 0.6936341603597005, val accuracy: 0.46375
Epoch 10: Train Loss: 0.6961903674849148, Val Loss: 0.6933500369389852, val accuracy: 0.528333333333

In [41]:
# save scaler 
import joblib 
joblib.dump(scaler, 'Transformer_minmax_scaler.pkl')

['Transformer_minmax_scaler.pkl']

# Inference

In [38]:
# run inference on CPU 
input_size = 7  # As per your dataset
num_layers = 3  # Number of Transformer layers
num_heads = 1   # Number of heads in Multi-Head Attention
dim_feedforward = 512  # Feedforward dimension
output_size = 2  # Number of output classes

Transformer_test = TransformerClassifier(input_size, num_layers, num_heads, dim_feedforward, output_size)  
checkpoint = torch.load("best_transformer_encoder.pt") 
Transformer_test.load_state_dict(checkpoint)

<All keys matched successfully>

In [42]:
import joblib
test_scaler = joblib.load("Transformer_minmax_scaler.pkl")

In [43]:
def preprocess(df):
    # Convert timestamps to datetime objects in Korean timezone
    korean_timezone = zoneinfo.ZoneInfo("Asia/Seoul")
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
    df['datetime'] = df['datetime'].dt.tz_convert(korean_timezone)
    return df

In [44]:
bitget = ccxt.bitget()
ohlcv = bitget.fetch_ohlcv("BTC/USDT:USDT", "1d")
chart_df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
chart_df = preprocess(chart_df)
days, months = [], []
for dt in chart_df["datetime"]:
    dtobj = pd.to_datetime(dt)
    day = dtobj.day
    month = dtobj.month
    days.append(day)
    months.append(month)
chart_df["month"] = months
chart_df["day"] = days
chart_df.drop(columns={"timestamp", "datetime"}, inplace=True)

In [45]:
test_scaled_features = test_scaler.transform(chart_df)  

In [48]:
test_input = test_scaled_features[-15:-1]  
test_input = test_input.reshape((-1, 14, 7)) 
test_input = torch.tensor(test_input, dtype=torch.float32)

Transformer_test.eval() 
with torch.no_grad():
    output = Transformer_test(test_input) 
    probs = nn.Softmax(dim=-1)(output)[0] 
    print(f"long prob: {probs[1]}%")  
    print(f"short prob: {probs[0]}%") 

long prob: 0.47848591208457947%
short prob: 0.5215141177177429%


In [49]:
# inference function 
def infer_Transformer(): 
    input_size = 7  # As per your dataset
    num_layers = 3  # Number of Transformer layers
    num_heads = 1   # Number of heads in Multi-Head Attention
    dim_feedforward = 512  # Feedforward dimension
    output_size = 2  # Number of output classes

    Transformer_test = TransformerClassifier(input_size, num_layers, num_heads, dim_feedforward, output_size)  
    checkpoint = torch.load("best_transformer_encoder.pt") 
    Transformer_test.load_state_dict(checkpoint)
    
    test_scaler = joblib.load("Transformer_minmax_scaler.pkl")
    
    bitget = ccxt.bitget()
    ohlcv = bitget.fetch_ohlcv("BTC/USDT:USDT", "1d")
    chart_df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    chart_df = preprocess(chart_df)
    days, months = [], []
    for dt in chart_df["datetime"]:
        dtobj = pd.to_datetime(dt)
        day = dtobj.day
        month = dtobj.month
        days.append(day)
        months.append(month)
    chart_df["month"] = months
    chart_df["day"] = days
    chart_df.drop(columns={"timestamp", "datetime"}, inplace=True)
    
    test_input = test_scaled_features[-15:-1]  
    test_input = test_input.reshape((-1, 14, 7)) 
    test_input = torch.tensor(test_input, dtype=torch.float32)

    Transformer_test.eval() 
    with torch.no_grad():
        output = Transformer_test(test_input) 
        probs = nn.Softmax(dim=-1)(output)[0] 
        probs = probs.detach().cpu().numpy() 

    Transformer_long_prob = round(probs[1] * 100, 2)
    Transformer_short_prob = round(probs[0] * 100, 2)
    return Transformer_long_prob, Transformer_short_prob
