In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from transformers import AutoTokenizer
from torch.utils.data import Dataset, DataLoader
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class FeedForward(nn.Module):
    """Two-layer feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Feature size of the input and of the output.
        d_ff: Feature size of the hidden layer between the two projections.
        droupout: Dropout probability applied after the ReLU. The keyword is
            misspelled in the public signature and is kept as-is so existing
            keyword callers keep working.
    """

    def __init__(self, d_model, d_ff, droupout=0.1):
        super().__init__()
        # Expansion followed by projection back to the model width.
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(droupout)

    def forward(self, x):
        """Apply the block to the last dimension of ``x`` and return the result."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(self.dropout(hidden))

In [3]:
class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal positional encoding that is added to its input.

    A ``(1, max_len, d_model)`` table is precomputed once and registered as the
    non-trainable buffer ``pe``. Even feature columns hold sines, odd feature
    columns hold cosines.

    Args:
        d_model: Size of the feature dimension. Odd values are supported.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=512):
        super(PositionalEmbedding, self).__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()
        division = torch.exp(torch.arange(0, d_model, 2).float() * -(torch.log(torch.tensor(10000.0)) / d_model))
        angles = position * division

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the angle table to match the destination slice.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        return x + self.pe[:, :x.size(1), :].detach()

In [4]:
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / scale) V``.

    Args:
        d_model: Dimension whose square root is used as the score divisor.
    """

    def __init__(self, d_model):
        super(ScaledDotProductAttention, self).__init__()
        self.scale = torch.sqrt(torch.FloatTensor([d_model])).item()

    def forward(self, query, key, value, mask=None):
        """Attend over ``key``/``value`` with ``query``.

        Args:
            query, key, value: Tensors whose last two dimensions are
                ``(length, features)``; the multi-head caller in this file
                passes ``(batch, heads, length, features)``.
            mask: Optional tensor; positions equal to 0 are excluded from the
                softmax. Accepted layouts:
                  * 4-D – used as-is (must broadcast against the scores),
                  * 3-D ``(batch, q_len, k_len)`` – a head axis is inserted,
                  * anything else (e.g. 2-D ``(batch, k_len)``) – head and
                    query axes are inserted.

        Returns:
            The attention-weighted sum of ``value``.
        """
        scores = torch.matmul(query, key.transpose(-2, -1)) / self.scale

        if mask is not None:
            if mask.dim() == 3:
                # (batch, q_len, k_len) -> (batch, 1, q_len, k_len). Inserting two
                # axes here would produce a 5-D mask and mis-broadcast the scores.
                mask = mask.unsqueeze(1)
            elif mask.dim() != 4:
                mask = mask.unsqueeze(1).unsqueeze(2)
            # Use the dtype's own minimum so the fill value is representable
            # in reduced-precision dtypes as well as float32.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        attention = torch.softmax(scores, dim=-1)
        return torch.matmul(attention, value)

In [5]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention with learned query/key/value/output projections.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Without this check a bad configuration only surfaces later as an
        # opaque ``view`` shape error inside ``forward``.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.d_v = d_model // num_heads
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        # Each head works on d_k-dimensional queries/keys, so the dot products
        # are scaled by sqrt(d_k) rather than sqrt(d_model).
        self.attention = ScaledDotProductAttention(self.d_k)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, and merge the heads.

        Args:
            query, key, value: Tensors whose first dimension is the batch and
                whose last dimension is ``d_model``.
            mask: Optional mask forwarded unchanged to the attention module.

        Returns:
            Tensor reshaped to ``(batch, -1, d_model)`` after the output projection.
        """
        batch_size = query.size(0)
        # (batch, len, d_model) -> (batch, heads, len, d_k)
        query = self.query(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        key = self.key(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        value = self.value(value).view(batch_size, -1, self.num_heads, self.d_v).transpose(1, 2)
        # (batch, heads, len, d_v) -> (batch, len, d_model)
        attention = self.attention(query, key, value, mask).transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.out(attention)

In [6]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention then feed-forward.

    Each sub-layer output goes through dropout, is added back to its input,
    and the sum is layer-normalised.

    Args:
        d_model: Feature size of the layer input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        droupout: Dropout probability (keyword spelling kept for callers).
    """

    def __init__(self, d_model, num_heads, d_ff, droupout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.feedforward = FeedForward(d_model, d_ff, droupout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(droupout)

    def forward(self, x, mask=None):
        """Run ``x`` through the layer; ``mask`` is passed to self-attention."""
        attended = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))
        return self.norm2(x + self.dropout(self.feedforward(x)))

In [7]:
class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, cross-attention, then feed-forward.

    Each sub-layer output goes through dropout, is added back to its input,
    and the sum is layer-normalised.

    Args:
        d_model: Feature size of the layer input and output.
        num_heads: Number of attention heads in both attention sub-layers.
        d_ff: Hidden size of the feed-forward sub-layer.
        droupout: Dropout probability (keyword spelling kept for callers).
    """

    def __init__(self, d_model, num_heads, d_ff, droupout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.cross_attention = MultiHeadAttention(d_model, num_heads)
        self.feedforward = FeedForward(d_model, d_ff, droupout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(droupout)

    def forward(self, x, memory, maskSelf=None, maskCross=None):
        """Decode ``x`` against ``memory``.

        Args:
            x: Decoder-side input.
            memory: Tensor used as keys and values of the cross-attention.
            maskSelf: Optional mask for the self-attention sub-layer.
            maskCross: Optional mask for the cross-attention sub-layer.
        """
        # Self-attention over the decoder input.
        self_out = self.attention(x, x, x, maskSelf)
        x = self.norm1(x + self.dropout(self_out))

        # Queries come from the decoder, keys/values from ``memory``.
        cross_out = self.cross_attention(x, memory, memory, maskCross)
        x = self.norm2(x + self.dropout(cross_out))

        return self.norm3(x + self.dropout(self.feedforward(x)))

In [8]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer that maps source token ids to target logits.

    Token id 0 is treated as padding on both the source and the target side.

    Args:
        src_vocab_size: Number of rows in the source embedding table.
        tgt_vocab_size: Number of rows in the target embedding table and size
            of the output logits.
        d_model: Embedding / hidden feature size.
        num_heads: Attention heads per layer.
        d_ff: Hidden size of each feed-forward sub-layer.
        num_layers: Number of encoder layers and of decoder layers.
        dropout: Dropout probability passed to every layer.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8, d_ff=2048, num_layers=6, dropout=0.1):
        super(Transformer, self).__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_embedding = PositionalEmbedding(d_model)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.out = nn.Linear(d_model, tgt_vocab_size)
        # sqrt(d_model); stored on the module but not used by ``forward``.
        self.scale = torch.sqrt(torch.FloatTensor([d_model])).item()

    def forward(self, src, tgt):
        """Compute target-vocabulary logits for every target position.

        Args:
            src: Integer tensor of source token ids, ``(batch, src_len)``.
            tgt: Integer tensor of target token ids, ``(batch, tgt_len)``.

        Returns:
            Tensor whose last dimension is ``tgt_vocab_size``.
        """
        # Padding masks: True where the token is not padding (id != 0).
        src_pad_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # [batch_size, 1, 1, src_len]
        tgt_pad_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)  # [batch_size, 1, 1, tgt_len]

        # Causal mask: position i may only attend to positions <= i. Without
        # it, teacher-forced training lets the decoder read the very tokens
        # it is asked to predict.
        tgt_len = tgt.size(1)
        causal_mask = torch.tril(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt.device)
        )
        tgt_mask = tgt_pad_mask & causal_mask  # [batch_size, 1, tgt_len, tgt_len]

        # Embeddings
        src = self.src_embedding(src)
        tgt = self.tgt_embedding(tgt)
        src = self.positional_embedding(src)
        tgt = self.positional_embedding(tgt)

        # Encoder
        for layer in self.encoder_layers:
            src = layer(src, src_pad_mask)

        # Decoder: causal+padding mask for self-attention, source padding
        # mask for cross-attention.
        for layer in self.decoder_layers:
            tgt = layer(tgt, src, tgt_mask, src_pad_mask)

        return self.out(tgt)

In [10]:
def load_texts(eng_file, hin_file):
    """Read two UTF-8 text files and return their lines, whitespace-stripped.

    Args:
        eng_file: Path of the first file (English side).
        hin_file: Path of the second file (Hindi side).

    Returns:
        A tuple ``(eng_lines, hin_lines)`` of two lists of strings, one entry
        per line of the corresponding file.
    """
    def _read_stripped(path):
        # One entry per line, with surrounding whitespace/newline removed.
        with open(path, "r", encoding="utf-8") as handle:
            return [line.strip() for line in handle]

    eng_lines = _read_stripped(eng_file)
    hin_lines = _read_stripped(hin_file)
    return eng_lines, hin_lines

In [11]:
# Locations of the two sides of the parallel corpus.
# NOTE(review): these are absolute paths on one machine; consider deriving
# them from a configurable data directory.
english_corpus_path = "/Users/vishalsankarram/Desktop/github/document-level-mt-project/PM India en hi/pmindia.en-hi.en"
hindi_corpus_path = "/Users/vishalsankarram/Desktop/github/document-level-mt-project/PM India en hi/pmindia.en-hi.hi"
english_sentences, hindi_sentences = load_texts(english_corpus_path, hindi_corpus_path)

In [12]:
# The two lists are indexed in parallel below, so a length mismatch would
# silently pair sentences with the wrong translations.
if len(english_sentences) != len(hindi_sentences):
    raise ValueError(
        f"Parallel corpus is misaligned: {len(english_sentences)} English "
        f"vs {len(hindi_sentences)} Hindi lines"
    )

# Fixed seed so the same subset is drawn on every run.
np.random.seed(42)
# Cap the request: sampling without replacement cannot draw more items than exist.
sample_size = min(1000, len(english_sentences))
sample_indices = np.random.choice(len(english_sentences), sample_size, replace=False)
english_sample = [english_sentences[i].strip() for i in sample_indices]
hindi_sample = [hindi_sentences[i].strip() for i in sample_indices]

In [13]:
# Pretrained tokenizers: one for the source side, one for the target side.
src_tokenizer_name = "bert-base-uncased"
tgt_tokenizer_name = "ai4bharat/indic-bert"
src_tokenizer = AutoTokenizer.from_pretrained(src_tokenizer_name)
tgt_tokenizer = AutoTokenizer.from_pretrained(tgt_tokenizer_name)

In [14]:
class TranslationDataset(Dataset):
    """Dataset of (source ids, target ids) pairs built from parallel texts.

    Args:
        src_texts: Sequence of source strings.
        tgt_texts: Sequence of target strings, indexed in parallel with
            ``src_texts``.
        src_tokenizer: Object with an ``encode`` method used for source texts.
        tgt_tokenizer: Object with an ``encode`` method used for target texts.
        max_length: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, src_texts, tgt_texts, src_tokenizer, tgt_tokenizer, max_length=128):
        self.src_texts, self.tgt_texts = src_texts, tgt_texts
        self.src_tokenizer, self.tgt_tokenizer = src_tokenizer, tgt_tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of pairs, taken from the source side."""
        return len(self.src_texts)

    def _encode(self, tokenizer, text):
        # ``encode`` returns a batch of one; drop that leading dimension.
        encoded = tokenizer.encode(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return encoded.squeeze(0)

    def __getitem__(self, idx):
        """Return the encoded ``(source, target)`` tensors for pair ``idx``."""
        src_ids = self._encode(self.src_tokenizer, self.src_texts[idx])
        tgt_ids = self._encode(self.tgt_tokenizer, self.tgt_texts[idx])
        return src_ids, tgt_ids

In [15]:
# Train on the seeded 1000-pair sample drawn above. The sample lists were
# previously computed but never used, so the dataset covered the full corpus.
train_dataset = TranslationDataset(english_sample, hindi_sample, src_tokenizer, tgt_tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)

In [16]:
# Device preference order: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Vocabulary sizes are taken from the tokenizers' lengths.
src_vocab_size = len(src_tokenizer)
tgt_vocab_size = len(tgt_tokenizer)

model = Transformer(src_vocab_size, tgt_vocab_size)
model = model.to(device)

# Targets equal to 0 (the padding id the model masks on) are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.AdamW(model.parameters(), lr=3e-4)

In [17]:
def train_model(num_epochs=10):
    """Train the global ``model`` on ``train_dataloader`` with teacher forcing.

    Uses the module-level ``model``, ``train_dataloader``, ``criterion``,
    ``optimizer``, ``device`` and ``tgt_vocab_size``. After each epoch the mean
    batch loss is printed.

    Args:
        num_epochs: Number of passes over ``train_dataloader``.
    """
    for epoch in range(num_epochs):
        model.train()
        batch_losses = []
        for src_batch, tgt_batch in train_dataloader:
            src_batch = src_batch.to(device)
            tgt_batch = tgt_batch.to(device)

            # Decoder sees tokens [0, n-1); the labels are tokens [1, n).
            decoder_input = tgt_batch[:, :-1]
            labels = tgt_batch[:, 1:].reshape(-1)

            optimizer.zero_grad()
            logits = model(src_batch, decoder_input)
            loss = criterion(logits.view(-1, tgt_vocab_size), labels)
            loss.backward()
            optimizer.step()

            batch_losses.append(loss.item())

        mean_loss = sum(batch_losses) / len(train_dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {mean_loss}")


train_model()

def translate(sentence, model, src_tokenizer, tgt_tokenizer, max_length=128):
    """Greedily decode a translation of a single sentence.

    Decoding starts from the target tokenizer's ``cls_token_id`` and appends
    the arg-max token at each step until ``sep_token_id`` is produced or
    ``max_length`` steps have run. The model is switched to eval mode and is
    left in that mode.

    Args:
        sentence: Source text to translate.
        model: Callable ``model(src_ids, tgt_ids)`` returning logits of shape
            ``(1, tgt_len, vocab)``.
        src_tokenizer: Tokenizer providing ``encode`` for the source text.
        tgt_tokenizer: Tokenizer providing ``cls_token_id``, ``sep_token_id``
            and ``decode``.
        max_length: Source padding/truncation length and maximum number of
            decoding steps.

    Returns:
        The decoded target string with special tokens skipped.
    """
    model.eval()

    # Run on whatever device the model lives on instead of relying on a
    # notebook-level global; fall back to CPU for parameter-less models.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    src_tokens = src_tokenizer.encode(
        sentence,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    ).to(run_device)
    tgt_tokens = torch.tensor([[tgt_tokenizer.cls_token_id]], device=run_device)

    # Inference only: without no_grad every step would record an autograd
    # graph over the whole model, wasting memory and time.
    with torch.no_grad():
        for _ in range(max_length):
            output = model(src_tokens, tgt_tokens)
            # Logits of the last position -> id of the most likely token, shape (1, 1).
            next_token = output[:, -1, :].argmax(dim=-1, keepdim=True)
            tgt_tokens = torch.cat([tgt_tokens, next_token], dim=1)
            if next_token.item() == tgt_tokenizer.sep_token_id:
                break

    return tgt_tokenizer.decode(tgt_tokens[0].tolist(), skip_special_tokens=True)

# Quick smoke test: translate one short English sentence with the trained model.
sample_translation = translate("Hello, how are you?", model, src_tokenizer, tgt_tokenizer)
print(sample_translation)


KeyboardInterrupt: 

In [16]:
# Persist only the model's parameters/buffers (its state dict), not the module object.
checkpoint_path = "transformer.pt"
torch.save(model.state_dict(), checkpoint_path)