In [99]:
import torch
import math

In [100]:
import torch.nn as nn
import torch.optim as optim

In [101]:
import sys, time

def print_(*values, sep=' ', end='\n', file=None, flush=True):
    """Write ``values`` to a text stream, flushing immediately by default.

    Args:
        *values: Objects to write; each is converted with ``str``.
        sep: Separator inserted between the converted values.
        end: String appended after the last value.
        file: Object with ``write`` (and ``flush`` when ``flush`` is true).
            ``None`` means "whatever ``sys.stdout`` is at call time".
        flush: When true, call ``flush()`` on the stream after writing.
    """
    # Look up sys.stdout on every call instead of freezing it in the default
    # argument: a stream captured at definition time goes stale as soon as
    # stdout is redirected or replaced.
    stream = sys.stdout if file is None else file
    stream.write(sep.join(map(str, values)) + end)
    if flush:
        stream.flush()
    
def cls_line():
    """Emit a lone carriage return (no newline) through ``print_``.

    Intended to move the cursor back to the start of the current line so the
    next write can overwrite it; whether that happens depends on the frontend.
    """
    print_(end='\r')


# Demo of the in-place status line: print a message, wait, then rewrite it.
# NOTE(review): the author reports this misbehaves under "Run All"; the cause
# is not established by this cell — TODO investigate how the frontend handles '\r'.
print_('Loading model...', end='')  # no trailing newline, so the line can be rewritten
time.sleep(1)  # keep the first message visible for a moment
cls_line()  # carriage return only
print_('Loading model....')

Loading model...

Loading model....


In [102]:
# Global hyperparameters. None of them is referenced by the cells visible
# here, so the meanings below are inferred from the names — TODO confirm.
vocab_size: int = 100  # presumably the number of distinct token ids
n_embed: int = 64  # presumably the embedding width
max_tokens: int = 10  # presumably the maximum sequence length


In [103]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding followed by dropout.

    A table ``pe`` of shape ``(max_len, 1, d_model)`` is precomputed and
    registered as a buffer. For position ``p`` and frequency index ``k``:

        pe[p, 0, 2k]     = sin(p * 10000 ** (-2k / d_model))
        pe[p, 0, 2k + 1] = cos(p * 10000 ** (-2k / d_model))

    Args:
        d_model: Size of the feature (last) dimension. Odd values are
            supported; the last feature then only has a sine component.
        dropout: Dropout probability applied to the sum in ``forward``.
        max_len: Number of positions precomputed in the table.
        device: Device the ``pe`` buffer is placed on, so it matches layers
            created with the same ``device``. ``None`` leaves it on the CPU.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000, device = torch.device('cuda')):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2)
                             * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd slot than even slot, so
        # trim the angle table to the number of cosine columns.
        pe[:, 0, 1::2] = torch.cos(angles[:, :d_model // 2])

        # The table is built on the CPU and then moved: previously `device`
        # was ignored, leaving `pe` on the CPU next to layers on `device`.
        if device is not None:
            pe = pe.to(device)
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the encoding for the first ``x.size(0)`` positions, then apply dropout.

        Args:
            x: Tensor whose dim 0 indexes positions and whose last dim is
                ``d_model``; the table's singleton dim 1 broadcasts over the
                middle dimension of a 3-D input.

        Returns:
            Tensor with the broadcast shape of ``x`` and the sliced table.
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [104]:
class Encoder(nn.Module):
    """One encoder layer: self-attention and a feed-forward net, each with a residual.

    Each sub-layer output is added to its input and passed through
    ``LayerNorm`` followed by ``ReLU``.

    Args:
        input_size: Stored on the instance; not used by this layer's computation.
        hidden_size: Feature width of the input, of every sub-layer and of the output.
        output_size: Stored on the instance; not used by this layer's computation.
        dropout: Dropout probability passed to the attention module.
        num_heads: Number of attention heads.
        device: Device on which the layer's parameters are created.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout=0.1, num_heads=8, device=torch.device('cuda')):
        super(Encoder, self).__init__()

        self.hidden_size = hidden_size
        self.input_size = input_size
        self.output_size = output_size
        self._dropout = dropout
        self._num_heads = num_heads
        self.device = device

        self.attention = nn.MultiheadAttention(hidden_size, num_heads, dropout=dropout, device=device)
        self.ffwd = nn.Sequential(
            nn.Linear(hidden_size, hidden_size, device=device),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size, device=device),
        )
        # Must be named `add_norm1`: that is the attribute `forward` reads.
        # It was previously registered as `add_norm`, so every forward pass
        # failed with AttributeError.
        self.add_norm1 = nn.Sequential(
            nn.LayerNorm(hidden_size, device=device),
            nn.ReLU()
        )
        self.add_norm2 = nn.Sequential(
            nn.LayerNorm(hidden_size, device=device),
            nn.ReLU()
        )

    def forward(self, x):
        """Run the layer on ``x`` and return a tensor of the same shape.

        Args:
            x: Input whose last dimension is ``hidden_size``. The attention
                module is built with its default layout, i.e. sequence-first.
        """
        res, _ = self.attention(x, x, x)  # self-attention: query = key = value
        x = self.add_norm1(res + x)
        res = self.ffwd(x)
        x = self.add_norm2(res + x)
        return x

In [105]:
class Decoder(nn.Module):
    """One decoder layer: causal self-attention, cross-attention, feed-forward.

    Each sub-layer output is added to its input and passed through
    ``LayerNorm`` followed by ``ReLU``.

    Args:
        input_size: Stored on the instance; not used by this layer's computation.
        hidden_size: Feature width of the input, of every sub-layer and of the output.
        output_size: Stored on the instance; not used by this layer's computation.
        dropout: Dropout probability passed to both attention modules.
        num_heads: Number of attention heads.
        device: Device on which the layer's parameters are created.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout=0.1, num_heads=8, device=torch.device('cuda')):
        super(Decoder, self).__init__()

        self._hidden_size = hidden_size
        self._input_size = input_size
        self._output_size = output_size
        self._dropout = dropout
        self._num_heads = num_heads
        self._device = device

        self.masked_attention = nn.MultiheadAttention(
            hidden_size, num_heads, dropout=dropout, device=device)
        self.enc_dec_attention = nn.MultiheadAttention(
            hidden_size, num_heads, dropout=dropout, device=device)
        self.ffwd = nn.Sequential(
            nn.Linear(hidden_size, hidden_size, device=device),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size, device=device)
        )
        self.add_norm1 = nn.Sequential(
            nn.LayerNorm(hidden_size, device=device),
            nn.ReLU()
        )
        self.add_norm2 = nn.Sequential(
            nn.LayerNorm(hidden_size, device=device),
            nn.ReLU()
        )
        self.add_norm3 = nn.Sequential(
            nn.LayerNorm(hidden_size, device=device),
            nn.ReLU()
        )

    def forward(self, x, hidden_state):
        """Run the layer and return a tensor shaped like ``x``.

        Args:
            x: Decoder input, sequence-first (the attention modules use their
                default layout), with last dimension ``hidden_size``.
            hidden_state: Encoder output used as key and value of the
                cross-attention.
        """
        # Causal mask of shape (seq_len, seq_len). For a boolean attn_mask,
        # True means "may NOT attend", so entry (i, j) is True exactly when
        # j > i (a future position). The previous mask was a hidden_size-wide
        # diagonal, which had the wrong shape and blocked only self-attention.
        steps = torch.arange(x.size(0), device=x.device)
        future_mask = steps.unsqueeze(0) > steps.unsqueeze(1)

        # Call the modules (not `.forward`) so that registered hooks run.
        res, _ = self.masked_attention(x, x, x, attn_mask=future_mask)
        x = self.add_norm1(res + x)
        # Cross-attention: queries from the decoder, keys/values from the encoder.
        res, _ = self.enc_dec_attention(x, hidden_state, hidden_state)
        x = self.add_norm2(res + x)
        res = self.ffwd(x)
        x = self.add_norm3(res + x)
        return x

In [106]:
class EncoderBlock(nn.Module):
    """Token embedding, positional encoding and a stack of ``Encoder`` layers.

    Args:
        input_size: Number of rows of the embedding table.
        hidden_size: Embedding width and feature width of every layer.
        output_size: Forwarded to each ``Encoder``.
        dropout: Forwarded to each ``Encoder``.
        num_heads: Forwarded to each ``Encoder``.
        n_encoders: Number of stacked ``Encoder`` layers.
        device: Device on which the parameters are created.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout=0.1, num_heads=8, n_encoders=6, device=torch.device('cuda')):
        super(EncoderBlock, self).__init__()

        self._hidden_size = hidden_size
        self._input_size = input_size
        self._output_size = output_size
        self._dropout = dropout
        self._num_heads = num_heads
        self._n_encoders = n_encoders
        self._device = device

        self.embedding = nn.Embedding(input_size, hidden_size, device=device)
        self.pos_encoder = PositionalEncoding(hidden_size, device=device)

        # nn.ModuleList (not a plain list) registers the layers as submodules,
        # so their weights appear in parameters()/state_dict() and follow
        # .to(), .train() and .eval().
        self._encoders = nn.ModuleList(
            [Encoder(input_size, hidden_size, output_size, dropout, num_heads, device)
             for _ in range(n_encoders)])

    def forward(self, x):
        """Embed ``x``, add positional encodings and apply every layer in order.

        Args:
            x: Integer index tensor accepted by ``nn.Embedding``.

        Returns:
            The output of the last encoder layer.
        """
        x = self.embedding(x)
        x = self.pos_encoder(x)

        for encoder in self._encoders:
            x = encoder(x)

        return x

In [107]:
class DecoderBlock(nn.Module):
    """Embedding, positional encoding, stacked ``Decoder`` layers and an output head.

    The head is a linear projection to ``output_size`` followed by a softmax
    over that last dimension.

    Args:
        input_size: Number of rows of the embedding table.
        hidden_size: Embedding width and feature width of every layer.
        output_size: Width of the final linear projection.
        dropout: Forwarded to each ``Decoder``.
        num_heads: Forwarded to each ``Decoder``.
        n_decoders: Number of stacked ``Decoder`` layers.
        device: Device on which the parameters are created.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout=0.1, num_heads=8, n_decoders=6, device=torch.device('cuda')):
        super(DecoderBlock, self).__init__()

        self._hidden_size = hidden_size
        self._input_size = input_size
        self._output_size = output_size
        self._dropout = dropout
        self._num_heads = num_heads
        self._n_decoders = n_decoders
        self._device = device

        self.embedding = nn.Embedding(input_size, hidden_size, device=device)
        self.pos_encoder = PositionalEncoding(hidden_size, device=device)
        self.linear = nn.Linear(hidden_size, output_size, device=device)
        # Normalize over the last axis (the `output_size` scores). dim=1 is
        # that axis only for 2-D input; for 3-D input it normalized across a
        # different axis. dim=-1 is correct for both.
        self.softmax = nn.Softmax(dim=-1)

        # nn.ModuleList (not a plain list) registers the layers as submodules,
        # so their weights appear in parameters()/state_dict() and follow
        # .to(), .train() and .eval().
        self._decoders = nn.ModuleList(
            [Decoder(input_size, hidden_size, output_size, dropout, num_heads, device=device)
             for _ in range(n_decoders)])

    def forward(self, x, hidden_state):
        """Decode ``x`` against ``hidden_state`` and return output probabilities.

        Args:
            x: Integer index tensor accepted by ``nn.Embedding``.
            hidden_state: Encoder output handed to every decoder layer.

        Returns:
            Tensor whose last dimension has size ``output_size`` and sums to 1.
            NOTE(review): these are probabilities, not logits — confirm the
            loss used for training does not apply its own softmax.
        """
        x = self.embedding(x)
        x = self.pos_encoder(x)

        for decoder in self._decoders:
            x = decoder(x, hidden_state)

        x = self.linear(x)
        x = self.softmax(x)

        return x

In [108]:
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True, slots=True)
class DataProcessor:

    processor: Callable

    def __call__(self, *args, **kwargs):
        return self.processor(*args, **kwargs)
    
@dataclass(frozen=True, slots=True)
class Model:
    """Immutable three-stage pipeline: preprocess, run the network, postprocess."""

    # Receives the caller's arguments unchanged.
    preprocessor: DataProcessor
    # Called with the single value returned by `preprocessor`.
    ai: nn.Module
    # Called with the single value returned by `ai`; its result is returned.
    postprocessor: DataProcessor

    def __call__(self, *args, **kwargs):
        """Run the three stages in order and return the final result."""
        prepared = self.preprocessor(*args, **kwargs)
        raw_output = self.ai(prepared)
        return self.postprocessor(raw_output)
    

In [109]:
# Training loop using Adam optimizer, CrossEntropyLoss and EarlyStopping
class Trainer:

    _valid_metrics = ['accuracy', 'loss']

    @property
    def metrics(self):
        return self._metrics

    @metrics.setter
    def metrics(self, metrics):
        for element in metrics:
            if element not in self._valid_metrics:
                raise ValueError(f'Invalid metric: {element}')
        self._metrics = metrics

    @property
    def epochs(self):
        return self._epochs

    @epochs.setter
    def epochs(self, epochs):
        if epochs <= 0:
            raise ValueError('Number of epochs must be greater than zero')
        elif not isinstance(epochs, int):
            raise TypeError('Number of epochs must be an integer')

        self._epochs = epochs

    @property
    def batch_size(self):
        return self._batch_size

    @batch_size.setter
    def batch_size(self, batch_size):
        if batch_size <= 0:
            raise ValueError('Batch size must be greater than zero')
        elif not isinstance(batch_size, int):
            raise TypeError('Batch size must be an integer')

        self._batch_size = batch_size

    @property
    def model(self):
        return self._model
    
    @model.setter
    def model(self, model):
        if not isinstance(model, Model):
            raise TypeError('Model must be a Model object')
        self._model = model
    
    @property
    def full_dataset(self):
        return self._full_dataset
    
    @full_dataset.setter
    def full_dataset(self, full_dataset):
        if not isinstance(full_dataset, str | None):
            raise TypeError('The dataset must be passed as a string')
        self._full_dataset = full_dataset

    @property
    def test_split(self):
        return self._test_split
    
    @test_split.setter
    def test_split(self, test_split):
        if not isinstance(test_split, float):
            raise TypeError('Test split must be a float')
        elif test_split <= 0 or test_split >= 1:
            raise ValueError('Test split must be between 0 and 1')
        self._test_split = test_split

    @property
    def valid_split(self):
        return self._valid_split
    
    @valid_split.setter
    def valid_split(self, valid_split):
        if not isinstance(valid_split, float):
            raise TypeError('Validation split must be a float')
        elif valid_split <= 0 or valid_split >= 1:
            raise ValueError('Validation split must be between 0 and 1')
        self._valid_split = valid_split
    

    def __init__(self, model: Model,
                 metrics: list[str], 
                 epochs: int = 100, 
                 batch_size: int = 64, 
                 full_dataset: str|None = None, 
                 test_split: float = 0.2,
                 valid_split: float = 0.2) -> None:
        self.metrics = metrics
        self.epochs = epochs
        self.batch_size = batch_size
        self.model = model
        self.full_dataset = full_dataset
        self.test_split = test_split
        self.valid_split = valid_split

    def _train_one_epoch(self):
        pass

    def _validate(self):
        pass

    def fit(self, optimizer, loss_function, callbacks, **settings):
        pass

    def report(self):
        pass

    def graphics(self):
        pass