In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
import pandas as pd

In [None]:
#load dataset
data = pd.read_csv("sentences.csv")
data = data.dropna()
data['english'] = data['eng']
english_sentences = data['english'].tolist()
darija_sentences = data['darija'].tolist()

In [None]:
#split dataset into train, validation, and test sets
train_eng, temp_eng, train_dar, temp_dar = train_test_split(english_sentences, darija_sentences, test_size=0.2)
val_eng, test_eng, val_dar, test_dar = train_test_split(temp_eng, temp_dar, test_size=0.5)

In [None]:
#initialize tokenizer for both English and Darija (using a multilingual model)
tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-ar")

#tokenize the sentences
train_encodings = tokenizer(train_eng, padding=True, truncation=True, return_tensors="pt", max_length=50)
train_labels = tokenizer(train_dar, padding=True, truncation=True, return_tensors="pt", max_length=50)

#dataset class
class TranslationDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.encodings["input_ids"])

    def __getitem__(self, idx):
        return (
            {key: torch.tensor(val[idx]) for key, val in self.encodings.items()},
            torch.tensor(self.labels["input_ids"][idx])
        )

#create the Dataset and DataLoader
train_dataset = TranslationDataset(train_encodings, train_labels)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

#check the first batch to confirm it's working
for batch_idx, batch in enumerate(train_loader):
    src, trg = batch
    print(f"src shape: {src['input_ids'].shape}")
    print(f"trg shape: {trg.shape}")
    break  # Print just the first batch

src shape: torch.Size([32, 50])
trg shape: torch.Size([32, 50])


  {key: torch.tensor(val[idx]) for key, val in self.encodings.items()},
  torch.tensor(self.labels["input_ids"][idx])


In [None]:
#define the LSTM model
class LSTMModel(nn.Module):
    def __init__(self, input_dim, output_dim, emb_dim, hidden_dim, dropout_rate=0.3):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, (hidden, cell) = self.lstm(embedded)
        outputs = self.dropout(outputs)
        predictions = self.fc(outputs)
        return predictions

#initialize the model with the correct vocab size
vocab_size = tokenizer.vocab_size
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMModel(input_dim=vocab_size, output_dim=vocab_size, emb_dim=256, hidden_dim=512).to(device)

#adamW optimizer
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)  # Ignore padding token during loss calculation

#learning rate scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2, factor=0.5)

#gradient clipping
clip_value = 1.0

#tensorBoard for monitoring
writer = SummaryWriter()

#training loop
model.train()
for epoch in range(10):
    epoch_loss = 0
    for batch_idx, batch in enumerate(train_loader):
        #unpack the batch
        src, trg = batch
        
        #move tensors to the device
        src = src["input_ids"].to(device)
        trg = trg.to(device)

        optimizer.zero_grad()
        
        #forward pass
        output = model(src)
        
        #flatten the output and target for loss calculation
        output = output.view(-1, output.shape[-1])  # Flatten to [batch_size * seq_len, vocab_size]
        trg = trg.view(-1)  # Flatten target to [batch_size * seq_len]

        #mask padding tokens
        mask = trg != tokenizer.pad_token_id  # Mask padding tokens from target
        output = output[mask]  # Apply the mask to output
        trg = trg[mask]  # Apply the mask to target
        
        #compute loss
        loss = criterion(output, trg)
        loss.backward()  # Backpropagation
        
        #gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)
        
        optimizer.step()  # Update model parameters
        epoch_loss += loss.item()

    #adjust the learning rate using scheduler
    scheduler.step(epoch_loss)
    
    #log loss to TensorBoard
    writer.add_scalar('Loss/train', epoch_loss / len(train_loader), epoch)

    print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(train_loader)}")

#close TensorBoard writer
writer.close()

  {key: torch.tensor(val[idx]) for key, val in self.encodings.items()},
  torch.tensor(self.labels["input_ids"][idx])


Epoch 1, Loss: 5.595041040342803
Epoch 2, Loss: 4.91170379883817
Epoch 3, Loss: 4.692343031724793
Epoch 4, Loss: 4.505145769507908
Epoch 5, Loss: 4.320055384620978
Epoch 6, Loss: 4.137181234210263
Epoch 7, Loss: 3.9512719517591233
Epoch 8, Loss: 3.759408745272406
Epoch 9, Loss: 3.5723355309716585
Epoch 10, Loss: 3.393462480796168


In [None]:
import torch
from transformers import AutoTokenizer

#initialize tokenizer and model (assuming they are already defined and trained)
tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-ar")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#function to generate translations
def generate_translation(model, tokenizer, sentence, device):
    #tokenize the input sentence
    inputs = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True, max_length=50).to(device)
    
    #generate translation using the model
    with torch.no_grad():
        output = model(inputs["input_ids"])  # Get model output
        pred_tokens = torch.argmax(output, dim=-1)  # Get the index of the highest probability token for each position
        
    #decode the predicted tokens to get the translated sentence
    pred_sentence = tokenizer.decode(pred_tokens[0], skip_special_tokens=True)
    return pred_sentence

# Start interactive loop
print("Interactive Translation Test (Type 'exit' to quit)")
while True:
    #ask for user input
    input_sentence = input("Enter an English sentence: ")
    
    #exit condition
    if input_sentence.lower() == "exit":
        print("Exiting interactive mode.")
        break
    
    #generate the translation
    translated_sentence = generate_translation(model, tokenizer, input_sentence, device)
    
    #output the result
    print(f"English: {input_sentence}")
    print(f"Translated (Darija): {translated_sentence}")
    print("-" * 50)


Interactive Translation Test (Type 'exit' to quit)
English: Thank you for your help.
Translated (Darija): chokran 3la la
--------------------------------------------------
English: Thank you for your help.
Translated (Darija): chokran 3la lla
--------------------------------------------------
English: Thank you for your help.
Translated (Darija): chokran 3la 3 l
--------------------------------------------------
English: Thank you for your help.
Translated (Darija): chokran 3lik la
--------------------------------------------------
English: They're hiding something, I'm sure!
Translated (Darija): rah kaybby ha ,,,a7
--------------------------------------------------
English: They're hiding something, I'm sure!
Translated (Darija): gha kaybby ha7,,,,a
--------------------------------------------------
