In [None]:
import math
import time
import spacy
import numpy as np
import os


import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchtext
from torchtext.data.functional import to_map_style_dataset
from torch.nn.functional import pad
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
import torchtext.datasets as datasets


In [None]:
# @title
import os
import torch
import torch.nn.functional as F
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader, Dataset, random_split

# Function to read your custom dataset
def read_telugu_english_data(file_path, separator='++++$++++'):
    """Load sentence pairs from a delimiter-separated parallel corpus file.

    Every non-blank line must contain exactly one ``separator``; the text
    before it becomes the first element of the pair and the text after it the
    second. Only the line as a whole is stripped, so whitespace adjacent to
    the separator is preserved.

    NOTE(review): the translations printed later in this notebook show English
    as the source and Telugu as the target, so the first field appears to be
    English despite this function's name -- confirm against the data file.

    Args:
        file_path: Path to a UTF-8 encoded text file.
        separator: String that separates the two sentences on a line.

    Returns:
        A list of ``(first_sentence, second_sentence)`` tuples in file order.

    Raises:
        ValueError: If a non-blank line does not split into exactly two fields.
    """
    pairs = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line_number, line in enumerate(file, start=1):
            stripped = line.strip()
            # Blank lines (e.g. a trailing newline at end of file) carry no pair.
            if not stripped:
                continue
            fields = stripped.split(separator)
            if len(fields) != 2:
                raise ValueError(
                    f"{file_path}, line {line_number}: expected exactly one "
                    f"{separator!r} separator, found {len(fields) - 1}"
                )
            pairs.append((fields[0], fields[1]))
    return pairs

# Splitting the dataset
def split_dataset(data, train_split=0.7, val_split=0.15, test_split=0.15, seed=None):
    """Randomly partition ``data`` into train, validation and test lists.

    The train and validation sizes are ``int(len(data) * fraction)``; the test
    set receives every remaining item. ``test_split`` is therefore accepted
    for interface compatibility only and does not influence the sizes.

    Args:
        data: Sized, indexable collection of examples.
        train_split: Fraction of the items assigned to the training set.
        val_split: Fraction of the items assigned to the validation set.
        test_split: Unused; the test set is whatever is left over.
        seed: Optional integer. When given, a dedicated generator seeded with
            it drives both shuffles so the split is reproducible. When None,
            torch's global generator is used.

    Returns:
        A ``(train, val, test)`` tuple of lists.
    """
    total_size = len(data)
    train_size = int(total_size * train_split)
    val_size = int(total_size * val_split)
    test_size = total_size - train_size - val_size

    if seed is None:
        generator = torch.default_generator
    else:
        generator = torch.Generator().manual_seed(seed)

    train_data, remaining_data = random_split(
        data, [train_size, total_size - train_size], generator=generator
    )
    # The held-out remainder is shuffled again and divided into val / test.
    val_data, test_data = random_split(
        remaining_data, [val_size, test_size], generator=generator
    )
    return list(train_data), list(val_data), list(test_data)

# Custom Dataset class
class CustomDataset(Dataset):
    """Map-style dataset that wraps an in-memory collection of sentence pairs."""

    def __init__(self, data):
        """Keep a reference to ``data``; no copy is made."""
        self.data = data

    def __len__(self):
        """Return the number of stored examples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the example stored at position ``idx``."""
        return self.data[idx]

    def get_raw_texts(self):
        """Return every example as a fresh list of ``(src, trg)`` tuples."""
        pairs = []
        for src, trg in self.data:
            pairs.append((src, trg))
        return pairs

# Define tokenizers
# Both sides currently use torchtext's 'basic_english' tokenizer.
# NOTE(review): `tokenizer_te` is applied to the first (source) column in
# generate_batch, and the translations printed later in this notebook show an
# English source and a Telugu target, so the `_te` / `_en` names look swapped
# relative to the data -- confirm against the data file.
tokenizer_te = get_tokenizer('basic_english')  # Replace with a suitable tokenizer for Telugu
tokenizer_en = get_tokenizer('basic_english')  # Suitable tokenizer for English

# Build vocabulary function
def build_vocabulary(tokenizer, dataset, min_freq=2):
    """Build a torchtext vocabulary from both sides of every sentence pair.

    Source and target sentences are tokenized with the same ``tokenizer`` and
    fed into a single vocabulary, so the result is a joint vocabulary over
    both columns of ``dataset``.

    Args:
        tokenizer: Callable mapping a string to a list of tokens.
        dataset: Object exposing ``get_raw_texts()`` that returns
            ``(src, trg)`` string pairs.
        min_freq: Minimum number of occurrences for a token to be kept.

    Returns:
        The vocabulary, with ``<unk>`` configured as the default index for
        out-of-vocabulary lookups.
    """
    special_tokens = ["<unk>", "<pad>", "<bos>", "<eos>"]

    def token_lists(pairs):
        # One token list per sentence: source first, then target.
        for src, trg in pairs:
            for sentence in (src, trg):
                yield tokenizer(sentence)

    vocab = build_vocab_from_iterator(
        token_lists(dataset.get_raw_texts()),
        specials=special_tokens,
        min_freq=min_freq,
    )
    # Unknown tokens resolve to the <unk> index instead of raising.
    vocab.set_default_index(vocab['<unk>'])
    return vocab

# Read the dataset
# The path is hard-coded to the Colab working directory (/content).
file_path = '/content/english_telugu_data.txt'
raw_data = read_telugu_english_data(file_path)
# Random train / validation / test partition using split_dataset's default fractions.
train_data_raw, val_data_raw, test_data_raw = split_dataset(raw_data)

# Create datasets
train_dataset = CustomDataset(train_data_raw)
valid_dataset = CustomDataset(val_data_raw)
test_dataset = CustomDataset(test_data_raw)

# Load vocabularies
# Both vocabularies are built from the training split only. build_vocabulary
# consumes both columns of every pair and the two tokenizers are the same
# 'basic_english' tokenizer, so vocab_src and vocab_trg are built from the
# same token stream.
vocab_src = build_vocabulary(tokenizer_te, train_dataset)
vocab_trg = build_vocabulary(tokenizer_en, train_dataset)

# Batch generation function
def generate_batch(data_batch, max_len=20):
    """Collate raw sentence pairs into two fixed-width index tensors.

    Each sentence is tokenized, mapped to vocabulary indices, wrapped in
    ``<bos>`` / ``<eos>`` and then brought to exactly ``max_len`` positions:
    shorter sequences are right-padded with ``<pad>``, longer ones are cut to
    their first ``max_len`` ids (which drops the trailing ``<eos>``).

    Reads the module-level ``tokenizer_te`` / ``vocab_src`` for the first
    element of each pair and ``tokenizer_en`` / ``vocab_trg`` for the second.

    Args:
        data_batch: Iterable of ``(source_text, target_text)`` string pairs.
        max_len: Width of every returned row. The default matches the value
            this notebook trains with.

    Returns:
        ``(src, trg)``: two ``torch.long`` tensors of shape
        ``(len(data_batch), max_len)`` placed on CUDA when available,
        otherwise on the CPU.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def encode(text, tokenizer, vocab):
        # <bos> + token ids + <eos>, then truncate / right-pad to max_len.
        ids = [vocab['<bos>']]
        ids.extend(vocab[token] for token in tokenizer(text))
        ids.append(vocab['<eos>'])
        ids = ids[:max_len]
        ids.extend([vocab['<pad>']] * (max_len - len(ids)))
        return ids

    src_rows, trg_rows = [], []
    for (src_text, trg_text) in data_batch:
        src_rows.append(encode(src_text, tokenizer_te, vocab_src))
        trg_rows.append(encode(trg_text, tokenizer_en, vocab_trg))

    # Build each batch tensor once so there is a single host-to-device copy
    # per side instead of one per sentence.
    src_batch = torch.tensor(src_rows, dtype=torch.long, device=device)
    trg_batch = torch.tensor(trg_rows, dtype=torch.long, device=device)
    return src_batch, trg_batch


# DataLoader setup
BATCH_SIZE = 128
MAX_PADDING = 20

# Training batches are reshuffled every epoch; the incomplete final batch is dropped.
train_iter = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True, collate_fn=generate_batch)
# Evaluation loaders keep every example (no drop_last) and a fixed order, so
# the reported loss covers the whole split and a split smaller than
# BATCH_SIZE still produces one batch instead of an empty loader.
valid_iter = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False, collate_fn=generate_batch)
test_iter = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False, collate_fn=generate_batch)

# Special-token indices of the target vocabulary.
BOS_IDX = vocab_trg['<bos>']
EOS_IDX = vocab_trg['<eos>']
PAD_IDX = vocab_trg['<pad>']


In [None]:
class Embeddings(nn.Module):
    """Token embedding lookup whose output is scaled by ``sqrt(embed_size)``."""

    def __init__(self, vocab_size, embed_size):
        """Create a ``vocab_size`` x ``embed_size`` lookup table."""
        super().__init__()
        self.lut = nn.Embedding(vocab_size, embed_size)
        self.d_model = embed_size

    def forward(self, x):
        """Embed the index tensor ``x`` and multiply by ``sqrt(d_model)``."""
        scale = math.sqrt(self.d_model)
        embedded = self.lut(x)
        return embedded * scale


In [None]:
class PositionalEncoding(nn.Module):
    """Adds a fixed sinusoidal position table to its input, then applies dropout.

    For position ``k`` and pair index ``i`` (``0 <= i < d_model // 2``):

        theta        = k / base ** (2 * i / d_model)
        pe[k, 2*i]   = sin(theta)
        pe[k, 2*i+1] = cos(theta)

    When ``d_model`` is odd the last column stays zero.

    NOTE(review): "Attention Is All You Need" uses a base of 10000. This
    notebook uses 100, which is kept as the default so results are unchanged;
    pass ``base=10000`` for the formulation from the paper.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000, base=100):
        """Precompute the table.

        Args:
            d_model: Size of the feature dimension.
            dropout: Dropout probability applied after the addition.
            max_len: Number of positions to precompute.
            base: Base of the geometric frequency progression.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        n_pairs = d_model // 2
        # Angles are computed in float64 and rounded to the table's dtype on
        # assignment, matching a per-element Python float computation.
        positions = torch.arange(max_len, dtype=torch.float64).unsqueeze(1)
        pair_index = torch.arange(n_pairs, dtype=torch.float64)
        theta = positions / (base ** ((2 * pair_index) / d_model))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0:2 * n_pairs:2] = torch.sin(theta)
        pe[:, 1:2 * n_pairs:2] = torch.cos(theta)

        # Registered exactly once, so the buffer exists even when the table is
        # empty, and it moves with the module across devices.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Add the first ``x.size(1)`` table rows to ``x`` and apply dropout.

        The ``(x.size(1), d_model)`` slice is broadcast against ``x``.
        """
        x = x + self.pe[:x.size(1)]
        return self.dropout(x)

In [None]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over ``n_heads`` parallel heads."""

    def __init__(self, d_model, n_heads, dropout=0.1):
        """Create the four ``d_model`` x ``d_model`` projections.

        Args:
            d_model: Feature size of the inputs and of the output.
            n_heads: Number of heads; each head works on
                ``d_model // n_heads`` features.
            dropout: Dropout probability applied to the attention weights.
        """
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_key = d_model // n_heads
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)
        self.wo = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def _split_heads(self, projected, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, d_key)``."""
        per_head = projected.view(batch_size, -1, self.n_heads, self.d_key)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key`` / ``value``.

        Args:
            query, key, value: Tensors whose first dimension is the batch and
                whose last dimension is ``d_model``.
            mask: Optional tensor broadcastable to the score tensor; positions
                where it equals 0 receive a score of -1e10 before the softmax.

        Returns:
            ``(output, attention_probs)``: the projected context of shape
            ``(batch, query_len, d_model)`` and the pre-dropout attention
            weights of shape ``(batch, n_heads, query_len, key_len)``.
        """
        batch_size = query.shape[0]
        q = self._split_heads(self.wq(query), batch_size)
        k = self._split_heads(self.wk(key), batch_size)
        v = self._split_heads(self.wv(value), batch_size)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_key)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e10)
        attention_probs = torch.softmax(scores, dim=-1)

        context = torch.matmul(self.dropout(attention_probs), v)
        # Merge the heads back into a single feature dimension.
        context = context.transpose(1, 2).contiguous()
        context = context.view(batch_size, -1, self.n_heads * self.d_key)
        return self.wo(context), attention_probs


In [None]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model, d_ff, dropout=0.1):
        """Create the ``d_model -> d_ff -> d_model`` projections."""
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Expand to ``d_ff`` features, activate, drop out and project back."""
        hidden = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(hidden)

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output passes through dropout, is added to the sub-layer
    input and is then layer-normalised (post-norm residual).
    """

    def __init__(self, d_model, n_heads, d_ff, dropout):
        """Create the attention, feed-forward and normalisation modules."""
        super().__init__()
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.attn_layer_norm = nn.LayerNorm(d_model)
        self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.ffn_layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src, src_mask):
        """Run the block and return ``(output, self_attention_probs)``."""
        attended, attn_probs = self.attention(src, src, src, src_mask)
        src = self.attn_layer_norm(src + self.dropout(attended))

        transformed = self.positionwise_ffn(src)
        src = self.ffn_layer_norm(src + self.dropout(transformed))
        return src, attn_probs


In [None]:
class Encoder(nn.Module):
    """Stack of ``n_layers`` EncoderLayer blocks applied in sequence."""

    def __init__(self, d_model, n_layers, n_heads, d_ff, dropout):
        """Build the layer stack; ``self.dropout`` is created but not used in forward."""
        super().__init__()
        stack = [EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]
        self.layers = nn.ModuleList(stack)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """Pass ``src`` through every layer and return the final output.

        The attention weights of the last layer are stored on
        ``self.attn_probs`` as a side effect.
        """
        hidden = src
        for layer in self.layers:
            hidden, layer_attn = layer(hidden, src_mask)
        self.attn_probs = layer_attn
        return hidden

In [None]:
class DecodeLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention, feed-forward.

    Each sub-layer output passes through dropout, is added to the sub-layer
    input and is then layer-normalised.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout):
        """Create the two attention modules, the feed-forward net and the norms."""
        super().__init__()
        self.masked_attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.masked_attn_layer_norm = nn.LayerNorm(d_model)
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.attn_layer_norm = nn.LayerNorm(d_model)
        self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.ffn_layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, trg, src, trg_mask, src_mask):
        """Run the block and return ``(output, cross_attention_probs)``.

        ``trg`` attends to itself under ``trg_mask`` and then to ``src``
        under ``src_mask``. Only the cross-attention weights are returned.
        """
        self_attended, _ = self.masked_attention(trg, trg, trg, trg_mask)
        trg = self.masked_attn_layer_norm(trg + self.dropout(self_attended))

        cross_attended, attn_probs = self.attention(trg, src, src, src_mask)
        trg = self.attn_layer_norm(trg + self.dropout(cross_attended))

        transformed = self.positionwise_ffn(trg)
        trg = self.ffn_layer_norm(trg + self.dropout(transformed))
        return trg, attn_probs


In [None]:
class Decoder(nn.Module):
    """Stack of DecodeLayer blocks followed by a projection to vocabulary logits."""

    def __init__(self, vocab_size, d_model, n_layers, n_heads, d_ffn, dropout):
        """Build the layer stack and the output projection.

        ``self.dropout`` is created but not used in forward.
        """
        super().__init__()
        stack = [DecodeLayer(d_model, n_heads, d_ffn, dropout) for _ in range(n_layers)]
        self.layers = nn.ModuleList(stack)
        self.dropout = nn.Dropout(dropout)
        self.wo = nn.Linear(d_model, vocab_size)

    def forward(self, trg, src, trg_mask, src_mask):
        """Decode ``trg`` against the encoder output ``src`` and return logits.

        The cross-attention weights of the last layer are stored on
        ``self.attn_probs`` as a side effect.
        """
        hidden = trg
        for layer in self.layers:
            hidden, layer_attn = layer(hidden, src, trg_mask, src_mask)
        self.attn_probs = layer_attn
        return self.wo(hidden)


In [None]:
class Transformer(nn.Module):
    """Encoder-decoder wrapper that builds the attention masks from padding indices."""

    def __init__(self, encoder, decoder, src_embed, trg_embed, src_pad_idx, trg_pad_idx, device):
        """Store the sub-modules and mask configuration.

        Args:
            encoder: Module called as ``encoder(src_embedded, src_mask)``.
            decoder: Module called as
                ``decoder(trg_embedded, memory, trg_mask, src_mask)``.
            src_embed: Module mapping source indices to embeddings.
            trg_embed: Module mapping target indices to embeddings.
            src_pad_idx: Padding index in the source vocabulary.
            trg_pad_idx: Padding index in the target vocabulary.
            device: Device the returned masks are moved to.
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.trg_embed = trg_embed
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_src_mask(self, src):
        """Return a boolean ``(batch, 1, 1, src_len)`` mask, True at non-pad tokens."""
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        """Return a boolean ``(batch, 1, trg_len, trg_len)`` mask.

        Entry ``[b, 0, i, j]`` is True when target token ``j`` is not padding
        and ``j <= i``, so each position sees only itself and earlier tokens.
        """
        seq_len = trg.shape[1]
        pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        # Build the lower-triangular mask on the same device as `trg` so the
        # `&` below never mixes devices.
        causal_mask = torch.ones((seq_len, seq_len), device=trg.device).tril().bool()
        return (pad_mask & causal_mask).to(self.device)

    def forward(self, src, trg):
        """Compute decoder outputs for source / target index tensors.

        Args:
            src: Source token indices; ``(batch, src_len)`` is assumed by the
                mask construction.
            trg: Target token indices; ``(batch, trg_len)`` is assumed.

        Returns:
            Whatever ``self.decoder`` returns.
        """
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        memory = self.encoder(self.src_embed(src), src_mask)
        return self.decoder(self.trg_embed(trg), memory, trg_mask, src_mask)




In [None]:
def make_model(device, src_vocab, trg_vocab, n_layers: int = 3, d_model: int = 256,
               d_ffn: int = 2048, n_heads: int = 8, dropout: float = 0.1,
               max_length: int = 5000):
    """Assemble a Transformer and Xavier-initialise its weight matrices.

    Args:
        device: Device handed to the Transformer for mask placement.
        src_vocab: Source vocabulary; must support ``len()`` and ``get_stoi()``.
        trg_vocab: Target vocabulary; same requirements.
        n_layers: Number of encoder layers and of decoder layers.
        d_model: Embedding / hidden size.
        d_ffn: Inner size of the position-wise feed-forward networks.
        n_heads: Number of attention heads.
        dropout: Dropout probability used throughout.
        max_length: Number of positions in the positional-encoding table.

    Returns:
        The constructed model; it is not moved to ``device`` here.
    """
    encoder = Encoder(d_model, n_layers, n_heads, d_ffn, dropout)
    decoder = Decoder(len(trg_vocab), d_model, n_layers, n_heads, d_ffn, dropout)
    src_embed = Embeddings(len(src_vocab), d_model)
    trg_embed = Embeddings(len(trg_vocab), d_model)
    # A single positional-encoding module is shared by both embedding pipelines.
    pos_enc = PositionalEncoding(d_model, dropout, max_length)

    src_pipeline = nn.Sequential(src_embed, pos_enc)
    trg_pipeline = nn.Sequential(trg_embed, pos_enc)
    src_pad = src_vocab.get_stoi()["<pad>"]
    trg_pad = trg_vocab.get_stoi()["<pad>"]

    model = Transformer(encoder, decoder, src_pipeline, trg_pipeline,
                        src_pad_idx=src_pad,
                        trg_pad_idx=trg_pad,
                        device=device)

    # Only parameters with more than one dimension are re-initialised; biases
    # and LayerNorm parameters keep their defaults.
    weight_matrices = (param for param in model.parameters() if param.dim() > 1)
    for matrix in weight_matrices:
        nn.init.xavier_uniform_(matrix)

    return model




In [None]:
import torch as tc
device = tc.device('cuda' if tc.cuda.is_available() else 'cpu')

model = make_model(device, vocab_src, vocab_trg,
                   n_layers=3, n_heads=8, d_model=256,
                   d_ffn=512, max_length=50)
# Move to the selected device rather than calling .cuda() unconditionally, so
# the cell also runs on CPU-only machines. As the last expression of the cell
# this still displays the model summary.
model.to(device)

Transformer(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-2): 3 x EncoderLayer(
        (attention): MultiHeadAttention(
          (wq): Linear(in_features=256, out_features=256, bias=True)
          (wk): Linear(in_features=256, out_features=256, bias=True)
          (wv): Linear(in_features=256, out_features=256, bias=True)
          (wo): Linear(in_features=256, out_features=256, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (attn_layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (positionwise_ffn): PositionwiseFeedForward(
          (fc1): Linear(in_features=256, out_features=512, bias=True)
          (fc2): Linear(in_features=512, out_features=256, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (ffn_layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (

In [None]:
def count_parameters(model):
    """Return the total number of elements across parameters with ``requires_grad``."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the number of trainable parameters of `model` with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 23,393,215 trainable parameters


In [None]:
# Constant learning rate for Adam (no scheduler is configured here).
LEARNING_RATE = 0.0005

optimizer = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE)
# Target positions equal to PAD_IDX (the target vocabulary's <pad> index) are
# excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index = PAD_IDX)

In [None]:
def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module called as ``model(src, decoder_input)`` that returns
            logits whose last dimension is the vocabulary size.
        iterator: Sized iterable of ``(src, trg)`` batches.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss taking ``(flat_logits, flat_targets)``.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        Sum of the batch losses divided by ``len(iterator)``.
    """
    model.train()
    running_loss = 0

    for src, trg in iterator:
        optimizer.zero_grad()

        # Teacher forcing: the decoder sees every target token except the
        # last and is scored against the same sequence shifted left by one.
        decoder_input = trg[:, :-1]
        expected_output = trg[:, 1:]

        logits = model(src, decoder_input)
        flat_logits = logits.contiguous().view(-1, logits.shape[-1])
        flat_targets = expected_output.contiguous().view(-1)
        loss = criterion(flat_logits, flat_targets)

        loss.backward()
        # Clip the gradient norm (not the weights) before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(iterator)

In [None]:

def evaluate(model, iterator, criterion):
    """Compute the mean per-batch loss over ``iterator`` without updating the model.

    Args:
        model: Module called as ``model(src, decoder_input)`` that returns
            logits whose last dimension is the vocabulary size.
        iterator: Sized iterable of ``(src, trg)`` batches.
        criterion: Loss taking ``(flat_logits, flat_targets)``.

    Returns:
        Sum of the batch losses divided by ``len(iterator)``.
    """
    model.eval()
    running_loss = 0

    # No gradients are tracked during evaluation.
    with torch.no_grad():
        for src, trg in iterator:
            # Same shifted input / target scheme as in training.
            decoder_input = trg[:, :-1]
            expected_output = trg[:, 1:]

            logits = model(src, decoder_input)
            flat_logits = logits.contiguous().view(-1, logits.shape[-1])
            flat_targets = expected_output.contiguous().view(-1)

            running_loss += criterion(flat_logits, flat_targets).item()

    return running_loss / len(iterator)

In [None]:
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into whole minutes and seconds.

    Returns:
        ``(minutes, seconds)`` as integers, each truncated toward zero.
    """
    elapsed = end_time - start_time
    whole_minutes = int(elapsed / 60)
    leftover_seconds = int(elapsed - whole_minutes * 60)
    return whole_minutes, leftover_seconds

In [None]:
# Training configuration: number of passes over train_iter and the maximum
# gradient norm handed to train().
N_EPOCHS =10
CLIP = 1

# Lowest validation loss seen so far; used to decide when to checkpoint.
best_valid_loss = float('inf')

# loop through each epoch
for epoch in range(N_EPOCHS):

  start_time = time.time()

  # calculate the train loss and update the parameters
  train_loss = train(model, train_iter, optimizer, criterion, CLIP)

  # calculate the loss on the validation set
  valid_loss = evaluate(model, valid_iter, criterion)

  end_time = time.time()

  # calculate how long the epoch took (training plus validation)
  epoch_mins, epoch_secs = epoch_time(start_time, end_time)

  # checkpoint the weights whenever the validation loss reaches a new minimum;
  # the file is overwritten each time, so it always holds the best epoch so far
  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'transformer-model_tel.pt')

  # perplexity is reported as exp(loss)
  print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
  print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
  print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

Epoch: 01 | Time: 1m 27s
	Train Loss: 2.932 | Train PPL:  18.770
	 Val. Loss: 1.529 |  Val. PPL:   4.615
Epoch: 02 | Time: 1m 29s
	Train Loss: 1.353 | Train PPL:   3.869
	 Val. Loss: 1.062 |  Val. PPL:   2.891
Epoch: 03 | Time: 1m 28s
	Train Loss: 0.920 | Train PPL:   2.509
	 Val. Loss: 0.926 |  Val. PPL:   2.525
Epoch: 04 | Time: 1m 28s
	Train Loss: 0.716 | Train PPL:   2.046
	 Val. Loss: 0.877 |  Val. PPL:   2.405
Epoch: 05 | Time: 1m 28s
	Train Loss: 0.599 | Train PPL:   1.819
	 Val. Loss: 0.862 |  Val. PPL:   2.367
Epoch: 06 | Time: 1m 28s
	Train Loss: 0.523 | Train PPL:   1.687
	 Val. Loss: 0.864 |  Val. PPL:   2.374
Epoch: 07 | Time: 1m 27s
	Train Loss: 0.469 | Train PPL:   1.598
	 Val. Loss: 0.863 |  Val. PPL:   2.371
Epoch: 08 | Time: 1m 28s
	Train Loss: 0.426 | Train PPL:   1.531
	 Val. Loss: 0.857 |  Val. PPL:   2.357
Epoch: 09 | Time: 1m 27s
	Train Loss: 0.389 | Train PPL:   1.476
	 Val. Loss: 0.867 |  Val. PPL:   2.379
Epoch: 10 | Time: 1m 28s
	Train Loss: 0.362 | Train PPL

In [None]:
# load the weights of the best checkpoint saved during training;
# map_location remaps the stored tensors onto the current device so a
# checkpoint written on a GPU can also be restored on a CPU-only machine
model.load_state_dict(torch.load('transformer-model_tel.pt', map_location=device))

# calculate the loss on the test set
test_loss = evaluate(model, test_iter, criterion)

print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}')

Test Loss: 0.860 | Test PPL:   2.362


In [None]:
import torch

def translate_sentence(sentence, model, device, vocab_src, vocab_trg, tokenizer_te, max_length=50):
    """Greedily decode a translation of ``sentence``.

    Args:
        sentence: Either a raw string (tokenized with ``tokenizer_te`` and
            lower-cased) or an already tokenized sequence of tokens, which is
            used as given.
        model: Module called as ``model(src_tensor, trg_tensor)`` returning
            logits of shape ``(1, trg_len, vocab_size)``.
        device: Device the index tensors are moved to.
        vocab_src: Source vocabulary supporting ``vocab[token]`` and
            ``token in vocab``.
        vocab_trg: Target vocabulary supporting ``vocab[token]`` and
            ``lookup_token(index)``.
        tokenizer_te: Callable applied to a raw source string.
        max_length: Maximum number of decoding steps. The token predicted on
            the final step is not appended, so at most ``max_length - 1``
            tokens are returned.

    Returns:
        ``(src_tokens, trg_tokens)``: the source tokens including ``<bos>`` /
        ``<eos>``, and the generated target tokens without ``<bos>`` /
        ``<eos>``. When ``max_length <= 0`` the target list is empty.
    """
    model.eval()

    if isinstance(sentence, str):
        tokens = [token.lower() for token in tokenizer_te(sentence)]
    else:
        # Accept any token sequence (list, tuple, ...), not only lists.
        tokens = list(sentence)
    src = ['<bos>'] + tokens + ['<eos>']

    # Map tokens to source indices, falling back to <unk> explicitly.
    unk_idx = vocab_src['<unk>']
    src_indexes = [vocab_src[token] if token in vocab_src else unk_idx for token in src]
    src_tensor = torch.tensor(src_indexes, dtype=torch.long).unsqueeze(0).to(device)

    eos_idx = vocab_trg['<eos>']
    trg_indexes = [vocab_trg['<bos>']]

    with torch.no_grad():
        for step in range(max_length):
            # Re-run the model on everything generated so far and keep the
            # most likely token at the last position.
            trg_tensor = torch.tensor(trg_indexes, dtype=torch.long).unsqueeze(0).to(device)
            output = model(src_tensor, trg_tensor)
            pred_token = output.argmax(2)[:, -1].item()

            if pred_token == eos_idx or step == max_length - 1:
                break
            trg_indexes.append(pred_token)

    # Skip the leading <bos> when converting indices back to tokens.
    trg_tokens = [vocab_trg.lookup_token(index) for index in trg_indexes[1:]]
    return src, trg_tokens

# Example usage: translate one sentence with the trained model and print both sides.
src_text = "how was it?"
model = model  # no-op self-assignment; placeholder for substituting a different model
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # rebinds `device` to a plain string; falls back to CPU when CUDA is unavailable
src, trg_tokens = translate_sentence(src_text, model, device, vocab_src, vocab_trg, tokenizer_te)
# src[1:-1] drops the <bos> / <eos> markers added by translate_sentence.
print(f'source --> {" ".join(src[1:-1])}')
print(f'target translation --> {" ".join(trg_tokens)}')


source --> how was it ?
target translation --> ఇది ఎలా ఉంది ?


In [None]:
# Translate another sample sentence; src[1:-1] drops the <bos> / <eos> markers.
src_text = "The technology is increasing rapidly"
src, trg_tokens = translate_sentence(src_text, model, device, vocab_src, vocab_trg, tokenizer_te)
print(f'source --> {" ".join(src[1:-1])}')
print(f'target translation --> {" ".join(trg_tokens)}')


source --> the technology is increasing rapidly
target translation --> సాంకేతిక పరిజ్ఞానం వేగంగా పెరుగుతోంది .


In [None]:
# Translate another sample sentence; src[1:-1] drops the <bos> / <eos> markers.
src_text = "I love learning new languages"
src, trg_tokens = translate_sentence(src_text, model, device, vocab_src, vocab_trg, tokenizer_te)
print(f'source --> {" ".join(src[1:-1])}')
print(f'target translation --> {" ".join(trg_tokens)}')


source --> i love learning new languages
target translation --> కొత్త భాషలు నేర్చుకోవడం నాకు చాలా ఇష్టం .


In [None]:
# Translate another sample sentence; src[1:-1] drops the <bos> / <eos> markers.
src_text = "This is a wonderful experience"
src, trg_tokens = translate_sentence(src_text, model, device, vocab_src, vocab_trg, tokenizer_te)
print(f'source --> {" ".join(src[1:-1])}')
print(f'target translation --> {" ".join(trg_tokens)}')



source --> this is a wonderful experience
target translation --> ఇది అద్భుతమైన అనుభవం .


In [None]:
# Translate another sample sentence; src[1:-1] drops the <bos> / <eos> markers.
src_text = "Can you help me with this task?"
src, trg_tokens = translate_sentence(src_text, model, device, vocab_src, vocab_trg, tokenizer_te)
print(f'source --> {" ".join(src[1:-1])}')
print(f'target translation --> {" ".join(trg_tokens)}')


source --> can you help me with this task ?
target translation --> ఈ పనికి మీరు నాకు సహాయం చేయగలరా ?


In [None]:
# Translate another sample sentence; src[1:-1] drops the <bos> / <eos> markers.
src_text = "Our company is doing great"
src, trg_tokens = translate_sentence(src_text, model, device, vocab_src, vocab_trg, tokenizer_te)
print(f'source --> {" ".join(src[1:-1])}')
print(f'target translation --> {" ".join(trg_tokens)}')


source --> our company is doing great
target translation --> మా కంపెనీ గొప్ప పని చేస్తోంది .
