In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
from datetime import datetime, timedelta
import pickle
import re
from datetime import date
import csv
import matplotlib.pyplot as plt
import mplfinance as mpf
import matplotlib.dates as mdates
from torch.utils.data import DataLoader, TensorDataset
import ast
from tqdm import tqdm

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention for batch-first tensors.

    Inputs are projected with learned linear maps, split into ``num_heads``
    heads of width ``d_model // num_heads``, attended independently per head,
    re-assembled and passed through a final output projection.

    Args:
        d_model: Width of the feature (last) dimension of every input.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``d_model``. Without this check the mismatch only surfaces later
            as an obscure reshape / matmul error inside ``forward``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads

        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        self.W_O = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V):
        """Attend from ``Q`` over ``K``/``V``.

        Args:
            Q: Query tensor of shape ``(batch, query_len, d_model)``.
            K: Key tensor of shape ``(batch, key_len, d_model)``.
            V: Value tensor of shape ``(batch, key_len, d_model)``.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        Q = self._split_heads(self.W_Q(Q))
        K = self._split_heads(self.W_K(K))
        V = self._split_heads(self.W_V(V))

        # Scale by sqrt(head width) with a plain Python scalar: no tensor is
        # allocated per call and the result keeps the inputs' dtype and device.
        scores = torch.matmul(Q, K.transpose(-1, -2)) / (self.depth ** 0.5)
        attention_weights = torch.softmax(scores, dim=-1)

        output = torch.matmul(attention_weights, V)
        output = self._combine_heads(output)

        return self.W_O(output)

    def _split_heads(self, tensor):
        """Reshape ``(batch, len, d_model)`` to ``(batch, num_heads, len, depth)``."""
        tensor = tensor.view(tensor.size(0), -1, self.num_heads, self.depth)
        return tensor.transpose(1, 2)

    def _combine_heads(self, tensor):
        """Inverse of ``_split_heads``: back to ``(batch, len, d_model)``."""
        # transpose() returns a non-contiguous view; view() needs contiguous memory.
        tensor = tensor.transpose(1, 2).contiguous()
        return tensor.view(tensor.size(0), -1, self.num_heads * self.depth)


class EncoderLayer(nn.Module):
    """One encoder block: self-attention then a position-wise feed-forward net.

    Each of the two sub-layers is wrapped in a residual connection followed
    by layer normalisation (post-norm arrangement).

    Args:
        d_model: Feature width of the input and output.
        num_heads: Number of heads used by the self-attention sub-layer.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)

        # The hidden layer of the feed-forward net is four times the model width.
        ff_width = 4 * d_model
        self.feedforward = nn.Sequential(
            nn.Linear(d_model, ff_width),
            nn.ReLU(),
            nn.Linear(ff_width, d_model),
        )

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Return the encoded representation of ``x`` (same shape as ``x``)."""
        # Sub-layer 1: self-attention (queries, keys and values are all x).
        attended = self.norm1(x + self.attention(x, x, x))
        # Sub-layer 2: feed-forward network.
        return self.norm2(attended + self.feedforward(attended))


class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder attention, feed-forward.

    Every sub-layer is followed by a residual connection and layer
    normalisation. No attention mask is applied in either attention step.

    Args:
        d_model: Feature width of the inputs and the output.
        num_heads: Number of heads used by both attention sub-layers.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.encoder_attention = MultiHeadAttention(d_model, num_heads)

        # The hidden layer of the feed-forward net is four times the model width.
        ff_width = 4 * d_model
        self.feedforward = nn.Sequential(
            nn.Linear(d_model, ff_width),
            nn.ReLU(),
            nn.Linear(ff_width, d_model),
        )

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, encoder_output):
        """Decode ``x`` while attending to ``encoder_output``.

        Args:
            x: Decoder-side input tensor.
            encoder_output: Tensor used as keys and values in the second
                attention step.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Sub-layer 1: the decoder input attends to itself.
        own_context = self.norm1(x + self.self_attention(x, x, x))

        # Sub-layer 2: queries from the decoder, keys/values from the encoder.
        cross = self.encoder_attention(own_context, encoder_output, encoder_output)
        cross_context = self.norm2(own_context + cross)

        # Sub-layer 3: feed-forward network.
        return self.norm3(cross_context + self.feedforward(cross_context))

class Transformer(nn.Module):
    """Encoder-decoder stack that maps a sequence to one output vector.

    The input features are projected to ``hidden_dim``, passed through
    ``num_layers`` encoder layers and ``num_layers`` decoder layers (the
    decoder is seeded with the encoder output), and the representation of the
    last time step is projected to ``output_dim``.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Model width used by every layer.
        num_heads: Attention heads per layer.
        num_layers: Number of encoder layers and of decoder layers.
        output_dim: Size of the prediction vector.
    """

    def __init__(self, input_dim, hidden_dim, num_heads, num_layers, output_dim):
        super(Transformer, self).__init__()
        self.input_layer = nn.Linear(input_dim, hidden_dim)
        self.encoder_layers = nn.ModuleList([EncoderLayer(hidden_dim, num_heads) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(hidden_dim, num_heads) for _ in range(num_layers)])
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Predict from a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_dim)``.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        x = self.input_layer(x)

        # Keep the tensor batch-first. MultiHeadAttention treats dim 0 as the
        # batch axis, so transposing to (seq_len, batch, hidden) here would
        # make attention run across the samples of a mini-batch instead of
        # across time steps, and a single-sample batch would attend to nothing.
        encoder_output = x
        for layer in self.encoder_layers:
            encoder_output = layer(encoder_output)

        decoder_output = encoder_output
        for layer in self.decoder_layers:
            decoder_output = layer(decoder_output, encoder_output)

        # Summarise each sequence by its final time step.
        # NOTE(review): no positional encoding is added anywhere in this
        # model; confirm whether time-step order is meant to be visible to it.
        last_step = decoder_output[:, -1, :]

        return self.output_layer(last_step)

# Instantiate the network with the hyper-parameters used in this notebook.
model = Transformer(
    input_dim=2,
    hidden_dim=64,
    num_heads=8,
    num_layers=6,
    output_dim=3,
)

In [5]:
def read_data(input_file, output_file, num_samples=None):
    """Lazily yield aligned ``(input, target)`` pairs from two text files.

    Line *i* of ``input_file`` is paired with line *i* of ``output_file``.
    Each line must hold a Python literal (e.g. a nested list of numbers);
    it is parsed with ``ast.literal_eval`` and converted to a float32 array.

    A pair is skipped when either line is blank or cannot be parsed or
    converted, so a single corrupt line does not abort the whole read.

    Args:
        input_file: Path of the file with one input sample per line.
        output_file: Path of the file with one target sample per line.
        num_samples: Maximum number of line pairs to read. ``None`` (the
            default) reads until the shorter file is exhausted.

    Yields:
        Tuple ``(input_data, output_data)`` of ``numpy.float32`` arrays.
    """
    with open(input_file, 'r') as f_input, open(output_file, 'r') as f_output:
        # zip() stops at EOF of the shorter file, so no time is spent calling
        # readline() past the end when num_samples exceeds the file length.
        for index, (raw_input, raw_output) in enumerate(zip(f_input, f_output)):
            if num_samples is not None and index >= num_samples:
                break

            input_line = raw_input.strip()
            output_line = raw_output.strip()

            if not input_line or not output_line:
                continue

            try:
                input_data = np.array(ast.literal_eval(input_line), dtype=np.float32)
                output_data = np.array(ast.literal_eval(output_line), dtype=np.float32)
            except (SyntaxError, ValueError, TypeError):
                # literal_eval raises ValueError for non-literal content and
                # np.array raises ValueError/TypeError for non-numeric or
                # ragged data; treat all of them as "bad line, skip it".
                continue

            yield input_data, output_data

Import Data

In [None]:
# Upper bound on the number of line pairs to read. By default this is every
# line of the input file; replace it with a smaller integer to load a subset.
with open('training_input.txt', 'r') as f_count:
    num_samples = sum(1 for _ in f_count)

data_generator = read_data('training_input.txt', 'training_output.txt', num_samples)

# Iterate the generator directly: it already skips blank and unparseable
# lines, so every pair it yields is valid.
inputs = []
outputs = []
for input_data, output_data in data_generator:
    inputs.append(input_data)
    outputs.append(output_data)

# Merge into one ndarray first; building a tensor from a Python list of
# ndarrays is very slow and triggers a PyTorch UserWarning.
inputs = torch.tensor(np.array(inputs), dtype=torch.float32)
outputs = torch.tensor(np.array(outputs), dtype=torch.float32)

print(inputs.shape)
print(outputs.shape)

In [None]:
# Pair each input sequence with its target so they are shuffled together.
dataset = TensorDataset(inputs, outputs)

# Mini-batch size. 32 is only a starting point; tune it for your data set
# size and available memory.
batch_size = 32

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

print(f'Number of batches: {len(data_loader)}')

Training Implementation

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build the model and move it to the target device before creating the
# optimizer, so the optimizer is set up against the parameters it will update.
model = Transformer(input_dim=2, hidden_dim=64, num_heads=8, num_layers=6, output_dim = 3)
model = model.to(device)

criterion = nn.MSELoss()
criterion = criterion.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# Number of passes over the training set. 10 is only a starting point;
# adjust it while watching the printed loss.
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    # Batch-local names are used on purpose: reusing `inputs` / `outputs`
    # would overwrite the full data set tensors created when the data was
    # loaded and make re-running the dataset cell silently wrong.
    for batch_inputs, batch_labels in data_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        predictions = model(batch_inputs)
        loss = criterion(predictions, batch_labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean per-batch loss of this epoch.
    print('Epoch [%d/%d], Loss: %.4f' % (epoch+1, num_epochs, running_loss / len(data_loader)))



# Persist only the learned weights; the architecture is rebuilt in code on load.
torch.save(model.state_dict(), 'your_model_pth_name')

Usage

In [None]:
model_params_path = 'your_model_pth_name'

# Must equal the output_dim the checkpoint was trained with (3 in the
# training cell), otherwise load_state_dict fails with a size mismatch.
output_dim = 3

# Define the device here so this cell also works on a fresh kernel where the
# training cell has not been run.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = Transformer(input_dim=2, hidden_dim=64, num_heads=8, num_layers=6, output_dim = output_dim)
# Load the weights onto the same device the input will live on; loading to
# CPU while sending the input to CUDA raises a device-mismatch error.
model.load_state_dict(torch.load(model_params_path, map_location=device))
model = model.to(device)
model.eval()

# NOTE(review): `bakctest_input` is not defined anywhere in this notebook and
# must be supplied before running this cell. Presumably it is one sequence
# shaped (seq_len, 2), since input_dim is 2 and the batch axis is added below.
new_input = bakctest_input


new_input_tensor = torch.tensor(new_input, dtype=torch.float32).to(device)
# Add the leading batch axis: (seq_len, 2) -> (1, seq_len, 2).
new_input_tensor = new_input_tensor.unsqueeze(0)


with torch.no_grad():
    output = model(new_input_tensor)


# Only one sequence was passed in, so take the first (only) prediction.
output_values = output[0].tolist()
formatted_output = [format(x, '.10f') for x in output_values]

print(formatted_output)