In [1]:
import sys

sys.path.append("../src")
from tokenizer import get_tokenizer, get_data
from torch.utils.data import Dataset, DataLoader
from CustomDataLoader import CustomDataset


# Location of the spreadsheet inside the local kagglehub cache directory.
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
dataset_path = "".join(
    [
        "/Users/shusanketbasyal/.cache/kagglehub/datasets/jigarpanjiyar/english-to-manipuri-dataset/versions/1",
        "//english-nepali.xlsx",
    ]
)

# get_tokenizer hands back a pair; the names suggest (English, Nepali) order.
(engtokenizer, neptokenizer) = get_tokenizer(dataset_path)

# With split=True, get_data hands back a pair unpacked here as (train, test).
(df_train, df_test) = get_data(dataset_path, split=True)

In [2]:
# Query both tokenizers for their vocabulary size, then display the pair.
engvocabsize, nepvocabsize = (
    tok.get_vocab_size() for tok in (engtokenizer, neptokenizer)
)
(engvocabsize, nepvocabsize)

(30000, 30000)

In [3]:
# from tokenizer import get_tokenizer, get_data
# from torch.utils.data import Dataset, DataLoader
# from CustomDataLoader import CustomDataset


# dataset_path =  "/Users/shusanketbasyal/.cache/kagglehub/datasets/jigarpanjiyar/english-to-manipuri-dataset/versions/1"+"//english-nepali.xlsx"

# engtokenizer, neptokenizer  = get_tokenizer(dataset_path)

# df_train, df_test = get_data(dataset_path, split=True)

# df_train_dataset = CustomDataset(df_train, engtokenizer, neptokenizer, "eng", "nep", 256)
# df_test_dataset = CustomDataset(df_test, engtokenizer, neptokenizer, "eng", "nep", 256)


# df_train_dataloader = DataLoader(df_train_dataset, batch_size=2, shuffle=True)

In [4]:
# Wrap the train and test frames in CustomDataset using identical settings
# (same tokenizers, "eng"/"nep" column keys, and the literal 256).
df_train_dataset, df_test_dataset = (
    CustomDataset(frame, engtokenizer, neptokenizer, "eng", "nep", 256)
    for frame in (df_train, df_test)
)

In [5]:
# Shuffled loader over the training dataset, one sample per batch.
df_train_dataloader = DataLoader(
    dataset=df_train_dataset,
    shuffle=True,
    batch_size=1,
)

In [6]:
# Pull a single batch, keep its tensors in notebook-level names for the cells
# below, and print one shape per line.
for x in df_train_dataloader:
    enc_input, dec_input = x["encoder_input"], x["decoder_input"]
    enc_mask, dec_mask = x["encoder_mask"], x["decoder_mask"]

    print(
        *(t.shape for t in (enc_input, dec_input, x["label"], enc_mask, dec_mask)),
        sep="\n",
    )

    # Only the first batch is needed for inspection.
    break

torch.Size([1, 256])
torch.Size([1, 256])
torch.Size([1, 256])
torch.Size([1, 256])
torch.Size([1, 256, 256])


In [7]:
# Progress marker: the data-loading cells above are finished; model code follows.
print("Done with data loader working on Model Implementation")

Done with data loader working on Model Implementation


In [8]:
# Notebook-wide hyper-parameters reused by the model cells below.
seq_len, embdim = 256, 512
# The embedding demo below is sized with the English tokenizer's vocabulary.
vocab_size = engvocabsize

In [9]:
import torch
import torch.nn as nn

In [10]:
class InputEmbeddings(nn.Module):
    """Lookup table that maps integer token ids to ``embdim``-sized vectors.

    Args:
        vocab_size: number of rows in the embedding table.
        embdim: size of each embedding vector.
    """

    def __init__(self, vocab_size, embdim):
        super().__init__()
        self.embeddings = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=embdim
        )

    def forward(self, x):
        """Return the embedding rows selected by the ids in ``x``."""
        token_vectors = self.embeddings(x)
        return token_vectors

In [12]:
# Smoke-test the embedding layer on the sample encoder batch and show the
# resulting shape.
model = InputEmbeddings(vocab_size=vocab_size, embdim=embdim)
out = model(enc_input)
out.size()

torch.Size([1, 256, 512])

In [None]:
class PositionalEmbeddings(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    The table is computed once at construction time and stored as a
    non-trainable buffer ``pe`` of shape ``(1, seq_len, embdim)``. A buffer
    (rather than an ``nn.Parameter``) still moves with ``.to(device)`` and is
    saved in ``state_dict`` under the key ``"pe"``, but it is not returned by
    ``parameters()``, so optimizers and parameter-initialisation loops cannot
    touch it.

    Args:
        seq_len: maximum number of positions the table covers.
        embdim: embedding width; both even and odd values are supported.
    """

    def __init__(self, seq_len, embdim):
        super().__init__()

        # Lookup matrix with one row per position: (seq_len, embdim).
        pe = torch.zeros(seq_len, embdim, dtype=torch.float32)
        # Column vector of positions 0 .. seq_len - 1: (seq_len, 1).
        positions = torch.arange(0, seq_len, dtype=torch.float32).unsqueeze(1)
        # Even channel indices 0, 2, 4, ...: (ceil(embdim / 2),).
        emb_skip_dim = torch.arange(0, embdim, step=2, dtype=torch.float32)
        # Broadcast (seq_len, 1) / (ceil(embdim / 2),) -> (seq_len, ceil(embdim / 2)).
        z = positions / (10000 ** (emb_skip_dim / embdim))

        # Even channels carry the sine, odd channels the cosine.
        pe[:, 0::2] = torch.sin(z)
        # With an odd embdim there is one fewer odd channel than even channels,
        # so drop the surplus column instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(z[:, : embdim // 2])

        # Leading batch axis so the table broadcasts over (B, T, C) inputs.
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``T`` positions.

        Args:
            x: tensor of shape ``(B, T, C)``.
        """
        _, T, _ = x.shape
        # During generation T can be shorter than seq_len, hence the slice.
        return x + self.pe[:, :T, :]

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with an output projection.

    Args:
        embdim: model width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
    """

    def __init__(self, embdim, num_heads):
        super().__init__()
        assert embdim % num_heads == 0, f"{embdim} is not divisible by {num_heads}"
        self.num_heads = num_heads
        self.head_dim = embdim // num_heads

        self.query = nn.Linear(embdim, embdim)
        self.key = nn.Linear(embdim, embdim)
        self.value = nn.Linear(embdim, embdim)
        # Applied after the heads are concatenated back to embdim.
        self.proj = nn.Linear(embdim, embdim)

    @staticmethod
    def attention(q, k, v, mask):
        """Scaled dot-product attention.

        Args:
            q: queries, ``(B, NH, T_q, H)`` when called from ``forward``.
            k: keys, ``(B, NH, T_k, H)``.
            v: values, ``(B, NH, T_k, H)``.
            mask: ``None`` or a tensor in which 0 marks positions that must
                not be attended to. When the scores are 4-D, a 2-D mask is
                treated as ``(B, T_k)`` and a 3-D mask as ``(B, T_q, T_k)``;
                both get a head axis (and a query axis where needed) inserted
                so they broadcast for any batch size. Masks of other ranks
                are used as given.

        Returns:
            ``(weights @ v, weights)`` where ``weights`` is the softmax over
            the key axis.
        """
        head_dim = q.shape[-1]

        attention_scores = (q @ k.transpose(-2, -1)) / head_dim ** 0.5
        if mask is not None:
            if attention_scores.dim() == 4:
                if mask.dim() == 2:
                    # (B, T_k) -> (B, 1, 1, T_k)
                    mask = mask[:, None, None, :]
                elif mask.dim() == 3:
                    # (B, T_q, T_k) -> (B, 1, T_q, T_k)
                    mask = mask[:, None, :, :]
            # masked_fill is out-of-place; the result has to be kept.
            attention_scores = attention_scores.masked_fill(
                mask == 0, float("-inf")
            )

        attention_scores = attention_scores.softmax(dim=-1)

        return (attention_scores @ v), attention_scores

    def forward(self, q, k, v, mask):
        """Project q/k/v, attend per head, and merge the heads.

        Args:
            q: ``(B, T_q, C)`` queries.
            k: ``(B, T_k, C)`` keys.
            v: ``(B, T_k, C)`` values.
            mask: forwarded to :meth:`attention`.

        Returns:
            Tensor of shape ``(B, T_q, C)``.
        """
        B, T_q, C = q.shape
        # Keys/values may be longer or shorter than the queries (cross-attention).
        T_k = k.shape[1]

        # (B, T, C) -> (B, T, C)
        q = self.query(q)
        k = self.key(k)
        v = self.value(v)

        # (B, T, C) -> (B, NH, T, H)
        q = q.view(B, T_q, self.num_heads, self.head_dim).transpose(1, 2)
        k = k.view(B, T_k, self.num_heads, self.head_dim).transpose(1, 2)
        v = v.view(B, T_k, self.num_heads, self.head_dim).transpose(1, 2)

        output, _ = MultiHeadAttention.attention(q, k, v, mask)
        # (B, NH, T_q, H) -> (B, T_q, C)
        output = output.transpose(1, 2).contiguous().view(B, T_q, C)
        return self.proj(output)

In [None]:
# Shapes of the sample batch tensors captured in the data-loader loop above.
enc_input.shape, enc_mask.shape, dec_input.shape, dec_mask.shape

In [None]:
class FeedForward(nn.Module):
    """Two-layer MLP: ``embdim -> 4 * embdim -> embdim`` with a ReLU between.

    Args:
        embdim: width of both the input and the output.
    """

    def __init__(self, embdim):
        super().__init__()
        hidden_dim = 4 * embdim
        # Built in the same order as they sit inside the Sequential container.
        expand = nn.Linear(embdim, hidden_dim)
        contract = nn.Linear(hidden_dim, embdim)
        self.model = nn.Sequential(expand, nn.ReLU(), contract)

    def forward(self, x):
        """Run ``x`` through the expand / ReLU / contract stack."""
        transformed = self.model(x)
        return transformed

In [None]:
class LayerNormalization(nn.Module):
    """Normalise the last axis to zero mean / unit std, then scale and shift.

    ``alpha`` (scale) and ``beta`` (shift) are registered as ``nn.Parameter``
    so they are trained, follow ``.to(device)``, and are saved in
    ``state_dict``.

    Args:
        embdim: size of the normalised (last) axis.
        eps: small constant added to the standard deviation so that a
            constant row divides by ``eps`` instead of by zero.
    """

    def __init__(self, embdim, eps=1e-6):
        super().__init__()
        self.embdim = embdim
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(embdim))
        self.beta = nn.Parameter(torch.zeros(embdim))

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + beta`` over the last axis."""
        xmean = x.mean(dim=-1, keepdim=True)
        # torch.std defaults to the unbiased (n - 1) estimator.
        xstd = x.std(dim=-1, keepdim=True)
        # eps belongs in the denominator; adding it to the quotient would not
        # protect against division by zero.
        return self.alpha * (x - xmean) / (xstd + self.eps) + self.beta

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward.

    Each sub-layer output is added to its input and the sum is normalised
    afterwards (normalisation follows the residual addition).

    Args:
        embdim: model width.
        num_heads: number of attention heads.
    """

    def __init__(self, embdim, num_heads):
        super().__init__()
        self.attn = MultiHeadAttention(embdim=embdim, num_heads=num_heads)
        self.feedfwd = FeedForward(embdim=embdim)
        self.layernorm1 = LayerNormalization(embdim=embdim)
        self.layernorm2 = LayerNormalization(embdim=embdim)

    def forward(self, x, mask):
        """Apply the layer to ``x``; ``mask`` is passed to the attention module."""
        # Self-attention: queries, keys and values are all ``x``.
        attended = self.layernorm1(x + self.attn(x, x, x, mask))
        return self.layernorm2(attended + self.feedfwd(attended))

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, feed-forward.

    Each sub-layer output is added to its input and the sum is normalised
    afterwards.

    Args:
        embdim: model width.
        num_heads: number of attention heads.
    """

    def __init__(self, embdim, num_heads):
        super().__init__()
        self.attn = MultiHeadAttention(embdim, num_heads)
        self.cross_attn = MultiHeadAttention(embdim, num_heads)
        self.feedfwd = FeedForward(embdim)
        self.layernorm1 = LayerNormalization(embdim)
        self.layernorm2 = LayerNormalization(embdim)
        self.layernorm3 = LayerNormalization(embdim)

    def forward(self, encoder_output, x, src_mask, tgt_mask):
        """Apply the layer.

        Args:
            encoder_output: tensor used as keys and values in cross-attention.
            x: decoder-side input; queries for both attention steps.
            src_mask: mask passed to the cross-attention step.
            tgt_mask: mask passed to the self-attention step.

        Returns:
            The normalised output of the feed-forward sub-layer.
        """
        x = x + self.attn(x, x, x, tgt_mask)
        x = self.layernorm1(x)
        # Cross-attention has its own weights; it must not reuse self.attn.
        x = x + self.cross_attn(x, encoder_output, encoder_output, src_mask)
        x = self.layernorm2(x)
        x = x + self.feedfwd(x)
        return self.layernorm3(x)



In [None]:
class Encoder(nn.Module):
    """Runs an input through a sequence of encoder layers.

    Args:
        layers: iterable of layers, each callable as ``layer(x, mask)``;
            stored on the instance as given.
    """

    def __init__(self, layers):
        super().__init__()
        self.layers = layers

    def forward(self, x, mask):
        """Feed ``x`` through every layer in order, reusing the same ``mask``."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return hidden

In [None]:
class Decoder(nn.Module):
    """Runs a decoder-side input through a sequence of decoder layers.

    Args:
        layers: iterable of layers, each callable as
            ``layer(encoder_output, x, src_mask, tgt_mask)`` — the positional
            order declared by ``DecoderBlock.forward``; stored as given.
    """

    def __init__(self, layers):
        super().__init__()
        self.layers = layers

    def forward(self, encoder_output, x, src_mask, tgt_mask):
        """Feed ``x`` through every layer, attending to ``encoder_output``.

        The parameter order mirrors ``DecoderBlock.forward`` and
        ``Transformer.decoder_func``, which both put ``encoder_output`` first.
        The first positional argument was already being handed to each layer's
        ``encoder_output`` slot; naming it accordingly keeps positional callers
        routed the same way into the first layer while making sure that only
        ``x`` is updated from one layer to the next, and that keyword callers
        (``x=..., encoder_output=...``) reach the right slots.

        Args:
            encoder_output: encoder result; passed unchanged to every layer.
            x: decoder-side input; replaced by each layer's output.
            src_mask: mask forwarded to every layer.
            tgt_mask: mask forwarded to every layer.

        Returns:
            Output of the last layer (``x`` itself when there are no layers).
        """
        for layer in self.layers:
            x = layer(encoder_output, x, src_mask, tgt_mask)
        return x

In [None]:
class FinalProjectionLayer(nn.Module):
    """Linear map from model width to vocabulary size.

    The output is the raw result of the linear layer; no softmax is applied.

    Args:
        embdim: width of the incoming features.
        vocab_size: width of the output.
    """

    def __init__(self, embdim, vocab_size):
        super().__init__()
        self.linear = nn.Linear(in_features=embdim, out_features=vocab_size)

    def forward(self, x):
        """Project the last axis of ``x`` from ``embdim`` to ``vocab_size``."""
        logits = self.linear(x)
        return logits

In [None]:
class Transformer(nn.Module):
    """Container wiring embeddings, encoder, decoder and output projection.

    The class exposes three steps — ``encoder_func``, ``decoder_func`` and
    ``projection`` — that the caller invokes explicitly; it defines no
    ``forward`` of its own.
    """

    def __init__(
        self, encoder, decoder, src_emb, tgt_emb, src_pos, tgt_pos, finalprojectionlayer
    ):
        super().__init__()
        # Assignment order determines submodule registration order.
        self.encoder, self.decoder = encoder, decoder
        self.src_emb, self.src_pos = src_emb, src_pos
        self.tgt_emb, self.tgt_pos = tgt_emb, tgt_pos
        self.finalprojectionlayer = finalprojectionlayer

    def encoder_func(self, x, src_mask):
        """Embed ``x``, add source positions, and run the encoder."""
        embedded = self.src_pos(self.src_emb(x))
        return self.encoder(embedded, src_mask)

    def decoder_func(self, encoder_output, x, src_mask, tgt_mask):
        """Embed ``x``, add target positions, and run the decoder.

        The decoder is called positionally as
        ``(encoder_output, embedded_x, src_mask, tgt_mask)``.
        """
        embedded = self.tgt_pos(self.tgt_emb(x))
        return self.decoder(encoder_output, embedded, src_mask, tgt_mask)

    def projection(self, x):
        """Apply the final projection layer to ``x``."""
        return self.finalprojectionlayer(x)

In [None]:
def build_transformer(
    src_vocab_size,
    src_seq_len,
    tgt_vocab_size,
    tgt_seq_len,
    embdim,
    encoder_depth,
    decoder_depth,
    num_heads,
):
    """Assemble a ``Transformer`` from freshly constructed components.

    Args:
        src_vocab_size: rows in the source embedding table.
        src_seq_len: positions covered by the source positional table.
        tgt_vocab_size: rows in the target embedding table and width of the
            final projection.
        tgt_seq_len: positions covered by the target positional table.
        embdim: model width shared by every component.
        encoder_depth: number of ``EncoderBlock`` layers.
        decoder_depth: number of ``DecoderBlock`` layers.
        num_heads: attention heads per block.

    Returns:
        The assembled ``Transformer`` instance.
    """
    # Components are created in a fixed order: embeddings, positional tables,
    # encoder blocks, decoder blocks, then the output projection.
    src_emb = InputEmbeddings(src_vocab_size, embdim)
    src_pos = PositionalEmbeddings(src_seq_len, embdim)

    tgt_emb = InputEmbeddings(tgt_vocab_size, embdim)
    tgt_pos = PositionalEmbeddings(tgt_seq_len, embdim)

    encoder_stack = [EncoderBlock(embdim, num_heads) for _ in range(encoder_depth)]
    decoder_stack = [DecoderBlock(embdim, num_heads) for _ in range(decoder_depth)]

    return Transformer(
        Encoder(nn.ModuleList(encoder_stack)),
        Decoder(nn.ModuleList(decoder_stack)),
        src_emb,
        tgt_emb,
        src_pos,
        tgt_pos,
        FinalProjectionLayer(embdim, tgt_vocab_size),
    )

In [None]:
# Example build with small settings; the cell displays the module's repr.
build_transformer(
    src_vocab_size=300,
    src_seq_len=300,
    tgt_vocab_size=3000,
    tgt_seq_len=300,
    embdim=300,
    encoder_depth=2,
    decoder_depth=1,
    num_heads=10,
)