In [4]:
import pandas as pd
import csv
from pathlib import Path
from tokenizers import ByteLevelBPETokenizer
import os
from tokenizers.implementations import ByteLevelBPETokenizer
from tokenizers.processors import BertProcessing
import torch
from torch.utils.data import Dataset
import torch.nn as nn
import math
from typing import Optional
import torch.nn.functional as F
from dataclasses import dataclass
import spacy
import torchtext
from torchtext.data.functional import to_map_style_dataset
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
import torchtext.datasets as datasets
from torch.optim.lr_scheduler import LambdaLR
from torch.nn.functional import pad
from transformers import RobertaTokenizerFast, RobertaConfig

## Creating Tokenizer

### csv to txt

In this part also the dates are being removed to avoid the model learning pattern by dates

In [5]:
df = pd.read_csv('.\\data\\btc-usdt.csv', index_col = False)
df.drop(df.columns[[0]], axis=1, inplace=True)
df.Time = df.Time.agg(lambda x: x.split(' ')[1])
df.to_csv(f'{os.getcwd()}\\data\\btc-usdt-mod.csv', index= False)

  df.Time = df.Time.agg(lambda x: x.split(' ')[1])


In [6]:
# Open the CSV file and the text file
with open('.\\data\\btc-usdt-mod.csv', mode='r') as csv_file, open('.\\data\\btc-usdt-mod.txt', mode='w') as txt_file:
    # Create a CSV reader
    csv_reader = csv.reader(csv_file)
    next(csv_reader)
    # Loop through each row in the CSV
    for row in csv_reader:        
        # Join the row with commas and a space, then write it to the text file with a newline
        txt_file.write(' '.join(row) + '\n')

print('CSV has been successfully converted to a text file', 'btc-usdt-mod.txt')
data_path = '.\\data\\btc-usdt-mod.txt'


CSV has been successfully converted to a text file btc-usdt-mod.txt


### Train tokenizer on data

In [7]:
tokenizer = ByteLevelBPETokenizer()

# Customize training
tokenizer.train(files=data_path, vocab_size=52_000, min_frequency=2, special_tokens=[
    "<s>",
    "<pad>",
    "</s>",
    "<unk>",
    "<mask>",
])
try:
    os.mkdir('tokenizer')
except FileExistsError:
    pass
# Save files to disk
tokenizer.save_model(".\\tokenizer", "btc")

['.\\tokenizer\\btc-vocab.json', '.\\tokenizer\\btc-merges.txt']

### Split Dataset

In [8]:
def create_sequences(data, k):
    sequences = []
    for i in range(len(data) - k):
        sequence = " <s> " + " ".join(data[i:i+k]) + " </s>"
        yield sequence

# Load and preprocess the dataset
with open('.\\data\\btc-test.txt', 'r') as file:
    lines = [line.strip() for line in file.readlines()]

# Example: create sequences with k=3
k = 3
sequences = create_sequences(lines, k)

In [9]:
with open(data_path, 'r') as file:
    lines = file.readlines()

split_index = int(len(lines) * 0.8)

train_lines = lines[:split_index]
test_lines = lines[split_index:]

with open('.\\data\\btc-train.txt', 'w') as file:
    file.writelines(train_lines)
    
with open('.\\data\\btc-test.txt', 'w') as file:
    file.writelines(test_lines)

print(f'Data has been split into training ({len(train_lines)} lines) and testing ({len(test_lines)} lines) sets.')


Data has been split into training (1678209 lines) and testing (419553 lines) sets.


In [10]:
class BTCDataset(Dataset):
    def __init__(self, evaluate: bool = False, seq_len=11):
        tokenizer = ByteLevelBPETokenizer(
            ".\\tokenizer\\btc-vocab.json",
            ".\\tokenizer\\btc-merges.txt",
        )
        tokenizer._tokenizer.post_processor = BertProcessing(
            ("</s>", tokenizer.token_to_id("</s>")),
            ("<s>", tokenizer.token_to_id("<s>")),
        )
        tokenizer.enable_truncation(max_length=512)

        self.sequences = []

        src_files = Path(".\\data").glob("btc-test.txt") if evaluate else Path(".\\data").glob("btc-train.txt")
        for src_file in src_files:
            print("processing", src_file)
            lines = src_file.read_text(encoding="utf-8").splitlines()
            self.sequences +=  [x.ids for x in tokenizer.encode_batch(create_sequences(lines, seq_len))]
    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, i):
        return torch.tensor(self.sequences[i])


### Embedding

In [11]:
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, d_model: int):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, self.d_model)


    def forward(self, x: torch.Tensor):
        return self.embedding(x) *  math.sqrt(self.d_model)

### Position Wise Encoding

In [12]:
class PosEmbedding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000, drop_p: float = 0.1):
        super().__init__()
        self.d_model = d_model
        self.dropout = nn.Dropout(drop_p)

        pe = torch.zeros(max_len, d_model) 
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x: torch.Tensor):
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

### Create lookahead mask

In [13]:
def lookahead_mask(shape):
    mask = torch.ones(shape, shape).tril()

    return mask

### Position-Wise Feed Forward

In [14]:
class FeedForward(nn.Module):
    def __init__(
        self,
        d_model: int,
        d_ffn: int,
        drop_p: float = 0.1
    ):
        super().__init__()

        self.linear1 = nn.Linear(d_model,d_ffn)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(d_ffn,d_model)
        self.dropout = nn.Dropout(drop_p)

    def forward(self, x):
        return self.linear2(self.dropout(self.relu(self.linear1(x))))

### Add and Normalization Layer

In [15]:
class LayerAddNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()

        self.g = nn.Parameter(torch.ones(dim))
        self.b = nn.Parameter(torch.zeros(dim))
        self.eps = eps


    def forward(self, x: torch.Tensor, sub_x: torch.Tensor):
        try:
            add = x + sub_x
            mean = add.mean(-1, keepdim=True)
            std = add.std(-1, keepdim=True)
            return self.g*(add - mean)/(std+self.eps) + self.b
        except RuntimeError as e:
            error_message = str(e)
            if "size of tensor x" in error_message and "must match the size of tensor sub_x" in error_message:
                print("Caught a tensor size mismatch error in sublayer addition.")
            else:
                raise

### Multihead attention

In [36]:
def scaled_dot_product_attention(
    q: torch.Tensor, 
    k: torch.Tensor,
    v: torch.Tensor, 
    mask: Optional[torch.Tensor] = None, # 1, 1, attn_dim, attn_dim
    drop_p: float = 0.1
):
    d_k = q.size(-1)
    
    att_score = torch.matmul(q, k.transpose(-1,-2)) / math.sqrt(d_k)
    if mask is not None:
        att_score = att_score.masked_fill(mask == 0, float('-inf'))
    att_prob = F.softmax(att_score, dim = -1)
    
    dropout = nn.Dropout(drop_p)
    attn_p = dropout(att_prob)
    
    scores = torch.matmul(attn_p,v)
    
    return scores
    


In [37]:
class MultiHeadAttention(nn.Module):
    def __init__(
        self,
        d_model: int,
        num_h: int,
        drop_p: float = 0.1
    ):
        super().__init__()

        self.attn = None

        self.d_k = d_model // num_h
        self.num_h = num_h
        self.proj_q = nn.Linear(self.d_k, self.d_k)
        self.proj_k = nn.Linear(self.d_k, self.d_k)
        self.proj_v = nn.Linear(self.d_k, self.d_k)
        self.out_proj = nn.Linear(d_model, d_model)
        self.drop_p = drop_p

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        mask: torch.Tensor = None,
    ):
        if mask is not None:
            mask = mask.unsqueeze(1)
         
        bs, seq_len, _ = query.size()

        query = query.view(bs, -1, self.num_h, self.d_k).transpose(1,2)
        key = key.view(bs, -1, self.num_h, self.d_k).transpose(1,2)
        value = value.view(bs, -1, self.num_h, self.d_k).transpose(1,2)
        
        query = self.proj_q(query)
        key = self.proj_k(key)
        value = self.proj_v(value)
        
        scores = scaled_dot_product_attention(query, key ,value, mask = mask, drop_p = self.drop_p)
        scores = scores.transpose(-2,-3).reshape(bs, seq_len, self.d_k * self.num_h)
        out = self.out_proj(scores)
        
        return out
        

### Encoder

In [41]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, expansion_factor=4, n_heads=8):
        super(TransformerBlock, self).__init__()
        
        """
        Args:
           embed_dim: dimension of the embedding
           expansion_factor: fator ehich determines output dimension of linear layer
           n_heads: number of attention heads
        
        """
        self.attention = MultiHeadAttention(embed_dim, n_heads)
        self.addnorm1 = LayerAddNorm(embed_dim) 
        self.addnorm2 = LayerAddNorm(embed_dim) 
        self.feedfwd = FeedForward(embed_dim, expansion_factor*embed_dim)

    def forward(self,key,query,value):
        
        """
        Args:
           key: key vector
           query: query vector
           value: value vector
           norm2_out: output of transformer block
        
        """
        att_out = self.attention(key,query,value) 
        addnorm1_out = self.addnorm1(att_out,value)

        feed_fwd_out = self.feedfwd(addnorm1_out)
        addnorm2_out = self.addnorm2(feed_fwd_out, addnorm1_out)

        return addnorm2_out



class TransformerEncoder(nn.Module):
    """
    Args:
        seq_len : length of input sequence
        embed_dim: dimension of embedding
        num_layers: number of encoder layers
        expansion_factor: factor which determines number of linear layers in feed forward layer
        n_heads: number of heads in multihead attention
        
    Returns:
        out: output of the encoder
    """
    def __init__(self, seq_len, vocab_size, embed_dim, num_layers=2, expansion_factor=4, n_heads=8):
        super(TransformerEncoder, self).__init__()
        
        self.embedding_layer = TokenEmbedding(vocab_size, embed_dim)
        self.positional_encoder = PosEmbedding(embed_dim, max_len=seq_len)

        self.layers = nn.ModuleList([TransformerBlock(embed_dim, expansion_factor, n_heads) for i in range(num_layers)])
    
    def forward(self, x):
        embed_out = self.embedding_layer(x)
        out = self.positional_encoder(embed_out)
        for layer in self.layers:
            out = layer(out,out,out)

        return out

In [59]:
enc = TransformerEncoder(10,512,6,4,8)
x = torch.randint(1,10,(1000,32))
out = enc(x)

RuntimeError: The size of tensor a (1000) must match the size of tensor b (10) at non-singleton dimension 1

In [35]:
class DecoderBlock(nn.Module):
    def __init__(self, embed_dim, expansion_factor=4, n_heads=8):
        super(DecoderBlock, self).__init__()

        """
        Args:
           embed_dim: dimension of the embedding
           expansion_factor: fator ehich determines output dimension of linear layer
           n_heads: number of attention heads
        
        """
        self.attention = MultiHeadAttention(embed_dim, n_heads)
        self.addnorm = nn.LayerAddNorm(embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, expansion_factor, n_heads)
        
    
    def forward(self, key, query, x,mask):
        
        """
        Args:
           key: key vector
           query: query vector
           value: value vector
           mask: mask to be given for multi head attention 
        Returns:
           out: output of transformer block
    
        """
        
        #we need to pass mask mask only to fst attention
        att_out = self.attention(x,x,x,mask=mask) #32x10x512
        addnorm_out = self.addnorm(att_out,x)
        
        out = self.transformer_block(key, query, addnorm_out)

        
        return out


class TransformerDecoder(nn.Module):
    def __init__(self, target_vocab_size, embed_dim, seq_len, num_layers=2, expansion_factor=4, n_heads=8):
        super(TransformerDecoder, self).__init__()
        """  
        Args:
           target_vocab_size: vocabulary size of taget
           embed_dim: dimension of embedding
           seq_len : length of input sequence
           num_layers: number of encoder layers
           expansion_factor: factor which determines number of linear layers in feed forward layer
           n_heads: number of heads in multihead attention
        
        """
        self.word_embedding = TokenEmbedding(target_vocab_size, embed_dim)
        self.position_embedding = PosEmbedding(seq_len, embed_dim)

        self.layers = nn.ModuleList(
            [
                DecoderBlock(embed_dim, expansion_factor=4, n_heads=8) 
                for _ in range(num_layers)
            ]

        )
        self.fc_out = nn.Linear(embed_dim, target_vocab_size)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x, enc_out, mask):
        
        """
        Args:
            x: input vector from target
            enc_out : output from encoder layer
            trg_mask: mask for decoder self attention
        Returns:
            out: output vector
        """
            
        
        x = self.word_embedding(x)  #32x10x512
        x = self.position_embedding(x) #32x10x512
        x = self.dropout(x)
     
        for layer in self.layers:
            x = layer(enc_out, x, enc_out, mask) 

        out = F.softmax(self.fc_out(x))

        return out

In [None]:
class Transformer(nn.Module):
    def __init__(self, embed_dim, src_vocab_size, target_vocab_size, seq_length,num_layers=2, expansion_factor=4, n_heads=8):
        super(Transformer, self).__init__()
        
        """  
        Args:
           embed_dim:  dimension of embedding 
           src_vocab_size: vocabulary size of source
           target_vocab_size: vocabulary size of target
           seq_length : length of input sequence
           num_layers: number of encoder layers
           expansion_factor: factor which determines number of linear layers in feed forward layer
           n_heads: number of heads in multihead attention
        
        """
        
        self.target_vocab_size = target_vocab_size

        self.encoder = TransformerEncoder(seq_length, src_vocab_size, embed_dim, num_layers=num_layers, expansion_factor=expansion_factor, n_heads=n_heads)
        self.decoder = TransformerDecoder(target_vocab_size, embed_dim, seq_length, num_layers=num_layers, expansion_factor=expansion_factor, n_heads=n_heads)
        
    
    def make_trg_mask(self, trg):
        """
        Args:
            trg: target sequence
        Returns:
            trg_mask: target mask
        """
        batch_size, trg_len = trg.shape
        # returns the lower triangular part of matrix filled with ones
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
            batch_size, 1, trg_len, trg_len
        )
        return trg_mask    

    def decode(self,src,trg):
        """
        for inference
        Args:
            src: input to encoder 
            trg: input to decoder
        out:
            out_labels : returns final prediction of sequence
        """
        trg_mask = self.make_trg_mask(trg)
        enc_out = self.encoder(src)
        out_labels = []
        batch_size,seq_len = src.shape[0],src.shape[1]
        out = trg
        for i in range(seq_len): #10
            out = self.decoder(out,enc_out,trg_mask) #bs x seq_len x vocab_dim
            # taking the last token
            out = out[:,-1,:]
     
            out = out.argmax(-1)
            out_labels.append(out.item())
            out = torch.unsqueeze(out,axis=0)
          
        
        return out_labels
    
    def forward(self, src, trg):
        """
        Args:
            src: input to encoder 
            trg: input to decoder
        out:
            out: final vector which returns probabilities of each target word
        """
        trg_mask = self.make_trg_mask(trg)
        enc_out = self.encoder(src)
   
        outputs = self.decoder(trg, enc_out, trg_mask)
        return outputs