In [12]:
import numpy as np
import pandas as pd
import torch
from torch import nn, Tensor, inf
import matplotlib.pyplot as plt
from datasets import load_dataset

In [2]:
ds = load_dataset("wmt/wmt14", "fr-en")

Resolving data files:   0%|          | 0/30 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/30 [00:00<?, ?it/s]

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

class DataEncoding:
    def __init__(self, corpus):
        self.corpus = corpus
    
    def __tfidf_matrix_generator(self):
        vectorizer = TfidfVectorizer()
        return vectorizer.fit_transform(self.corpus)
    
    def __positional_encoding(self):
        tfidf = self.__tfidf_matrix_generator()
        seq_length, no_terms = tfidf.shape
        pos_vals = torch.arange(seq_length).unsqueeze(0)
        
        positional_encoder = nn.Embedding(seq_length, 512)
        return positional_encoder(pos_vals)
    
    def input_data(self):
        tfidf_dense = torch.tensor(self.__tfidf_matrix_generator().toarray(), dtype=torch.float32)
        return tfidf_dense + self.__positional_encoding()

In [14]:
from typing import Tuple

def generate_self_weights_QKV(inpt_mat: Tensor, h: int = 8) -> Tuple[Tensor]:
    _, d_model = inpt_mat.shape
    Q_weight = nn.Parameter(torch.randn(d_model, d_model))
    K_weight = nn.Parameter(torch.randn(d_model, d_model))
    V_weight = nn.Parameter(torch.randn(d_model, d_model))
    O_weight = nn.Parameter(torch.randn(h*d_model, d_model))
    return Q_weight, K_weight, V_weight, O_weight

def generate_cross_weights_QKV(inpt_mat: Tensor, otpt_mat: Tensor, h: int = 8) -> Tuple[Tensor]:
    _, d_ipt = inpt_mat.shape
    _, d_otpt = otpt_mat.shape
    Q_weight = nn.Parameter(torch.randn(d_ipt, d_ipt))
    K_weight = nn.Parameter(torch.randn(d_ipt, d_ipt))
    V_weight = nn.Parameter(torch.randn(d_otpt, d_otpt))
    O_weight = nn.Parameter(torch.randn(h*d_otpt, d_otpt))
    return Q_weight, K_weight, V_weight, O_weight

def layer_norm(ipt: Tensor) -> Tensor:
    layer_norm = nn.LayerNorm(ipt.size()[1])
    return layer_norm(ipt)

In [15]:
train_set = ds["train"]
translation = train_set["translation"][:10]
english = []
french = []
for sentences in translation:
    english.append(sentences["en"])
    french.append(sentences["fr"])
eng_word2Vec = DataEncoding(english)
eng_embedded = eng_word2Vec.input_data()
fr_word2Vec = DataEncoding(french)
fr_embedded = fr_word2Vec.input_data()


KeyboardInterrupt: 

In [None]:
class FeedForwardNetwork(nn.Module):
    def __init__(self, ipt: Tensor):
        super(FeedForwardNetwork, self).__init__()
        self.ipt = ipt
    
    def forward(self) -> Tensor:
        return layer_norm(nn.ReLU(self.ipt)+self.ipt)

In [None]:
import math
class ScaledDotProductAttention(nn.Module):
    def __init__(self, Q: Tensor, K: Tensor, V: Tensor):
        super(ScaledDotProductAttention, self).__init__()
        self.Q = Q
        self.K = K
        self.V = V
    
    def simple_attention(self, isMask: bool = False) -> Tensor:
        _, d_k = self.K.shape
        QK = torch.matmul(self.Q, self.K)
        scale = 1/math.sqrt(d_k)
        if isMask:
            mask = torch.triu(torch.ones_like(QK), diagonal=1).bool()
            QK = QK.masked_fill(mask, -inf)
        softmax = nn.Softmax.forward(scale * QK)
        attention = torch.matmul(softmax, self.V)
        return attention    

In [None]:

class Linear(nn.Module):
    def __init__(self, ipt: Tensor):
        super(Linear, self).__init__()
        self.ipt = ipt
    
    def forward(self, weight: Tensor) -> Tensor:
        return torch.matmul(weight, self.ipt)

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, ipt: Tensor, h: int):
        super(MultiHeadAttention, self).__init__()
        self.ipt = ipt
        self.h = h
        self.W_q, self.W_k, self.W_v, self.W_o = generate_self_weights_QKV(self.ipt)
        self.Q = Tensor()
        self.K = Tensor()
        self.V = Tensor()
    
    def __init__(self, ipt: Tensor, opt: Tensor, h: int):
        super(MultiHeadAttention, self).__init__()
        self.ipt = ipt
        self.opt = opt
        self.h = h
        self.W_q, self.W_k, self.W_v, self.W_o = generate_cross_weights_QKV(self.ipt, self.opt)
        self.Q = Tensor()
        self.K = Tensor()
        self.V = Tensor()
        
    def set_QKV(self):
        self.Q = Linear(self.ipt).forward(self.W_q)
        self.K = Linear(self.ipt).forward(self.W_k)
        self.V = Linear(self.ipt).forward(self.W_v)
        
    def forward(self, isMask: bool = False) -> Tensor:
        self.set_QKV()
        _,d_q = self.Q.shape
        _,d_k = self.K.shape
        _,d_v = self.V.shape
        Q_head = self.Q.view(self.ipt.size(0), self.ipt.size(1), self.h, d_q)
        K_head = self.K.view(self.ipt.size(0), self.ipt.size(1), self.h, d_k)
        V_head = self.V.view(self.ipt.size(0), self.ipt.size(1), self.h, d_v)
        
        scale_attention_opt = ScaledDotProductAttention(Q_head, K_head, V_head).simple_attention(isMask)
        concat_opt = scale_attention_opt.transpose(1,2).contiguous().view(self.ipt.size(0), self.ipt.size(1), self.h * d_v)
        ma_opt = Linear(concat_opt).forward(self.W_o) + self.ipt
        return layer_norm(ma_opt)