In [1]:
import sys
import pickle
import textwrap
import re
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
from torch.nn import BatchNorm1d
from torch.optim.lr_scheduler import StepLR
import json
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from tqdm import tqdm
import warnings
import os
import numpy as np
import pandas as pd


# Seed PyTorch's RNG and NumPy's legacy global RNG so repeated runs draw the same random numbers.
torch.manual_seed(42)
np.random.seed(42)

# Report which device torch will pick: CUDA when torch.cuda.is_available() is true, otherwise CPU.
print(f"Device: {torch.device('cuda' if torch.cuda.is_available() else 'cpu')}")

Device: cuda


In [3]:

# Device used for the model and handed to DatasetBuilder below.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")


# Character vocabulary: maps each supported character (Arabic letters, space and
# a few punctuation marks) to an integer id in 1..43. Id 0 is not assigned here.
CHAR_TO_INDEX = {'د': 1, '؟': 2, 'آ': 3, 'إ': 4, 'ؤ': 5, 'ط': 6, 'م': 7, '،': 8, 'ة': 9, 'ت': 10, 
                 'ر': 11, 'ئ': 12, 'ا': 13, 'ض': 14, '!': 15, ' ': 16, 'ك': 17, 'غ': 18, 'س': 19, 'ص': 20, 
                 'أ': 21, 'ل': 22, 'ف': 23, 'ظ': 24, 'ج': 25, '؛': 26, 'ن': 27, 'ع': 28, 'ب': 29, 'ث': 30, 
                 'ه': 31, 'خ': 32, 'ى': 33, 'ء': 34, 'ز': 35, 'ق': 36, 'ي': 37, 'ش': 38, 'ح': 39, ':': 40, 
                 'ذ': 41, 'و': 42, '.': 43}

# Reverse lookup: integer id -> character.
INDEX_TO_CHAR = {v: k for k, v in CHAR_TO_INDEX.items()}


# Label vocabulary: maps a diacritic key to the class index (0..15) the model predicts.
# NOTE(review): the integer keys appear to be Unicode code points of Arabic
# diacritics (1614 = U+064E, 1611 = U+064B, 1615 = U+064F, 1612 = U+064C,
# 1616 = U+0650, 1613 = U+064D, 1618 = U+0652, 1617 = U+0651); tuple keys pair
# 1617 with a second mark. Keys 0 and 15 are not diacritic code points --
# presumably "no diacritic" and padding; confirm against DatasetBuilder.
# Class index 15 is the one train_model excludes from loss and metrics.
LABELS = {
    1614: 0,   
    1611: 1,   
    1615: 2,   
    1612: 3,   
    1616: 4,   
    1613: 5,   
    1618: 6,   
    1617: 7,   
    (1617, 1614): 8,   
    (1617, 1611): 9,   
    (1617, 1615): 10,  
    (1617, 1612): 11,  
    (1617, 1616): 12,  
    (1617, 1613): 13,  
    0: 14,    
    15: 15    
}

# Reverse lookup: class index -> diacritic key (int or tuple of ints).
INDEX_TO_LABEL = {v: k for k, v in LABELS.items()}


# Sequence length passed to the preprocessor/dataset builder and used as the
# second dimension of the model input.
MAX_LENGTH = 600
TRAIN_BATCH_SIZE = 32
VAL_BATCH_SIZE = 256
NUM_EPOCHS = 15
LEARNING_RATE = 0.001
# Width of the per-position feature vector; also the RNN's input_size.
TFIDF_FEATURES = 100
HIDDEN_SIZE = 256
NUM_LAYERS = 2
DROPOUT_RATE = 0.2


# Relative directories: input data, test data, and where the preprocessor and
# training write their outputs.
DATA_PATH = 'data/'
TEST_PATH = 'test/'
OUTPUT_PATH = 'RNN_Output/'


# Create the directories up front; exist_ok=True makes re-running this cell safe.
os.makedirs(OUTPUT_PATH, exist_ok=True)
os.makedirs(DATA_PATH, exist_ok=True)
os.makedirs(TEST_PATH, exist_ok=True)


Using device: cuda


In [4]:

# Put the local Arabic-Diacritization checkout first on the import path.
# This must run before the three project imports below.
sys.path.insert(0, './Arabic-Diacritization')

from cleaner import TextCleaner
from preprocessor import TextPreprocessor
from dataset_builder import DatasetBuilder


# Shared text cleaner, reused by the preprocessor.
cleaner = TextCleaner()
# Preprocessor reading from DATA_PATH and writing to OUTPUT_PATH.
# NOTE(review): the exact meaning of max_length / with_labels is defined by the
# project's TextPreprocessor, which is not visible here -- confirm there.
preprocessor = TextPreprocessor(
    cleaner=cleaner,
    input_path=DATA_PATH,
    output_path=OUTPUT_PATH,
    max_length=MAX_LENGTH,
    with_labels=True
)


# Dataset builder wired to the character and label vocabularies defined above;
# it is later asked for the train / val / test dataloaders.
dataset_builder = DatasetBuilder(
    preprocessor=preprocessor,
    char_to_index=CHAR_TO_INDEX,
    label_map=LABELS,
    max_length=MAX_LENGTH,
    device=DEVICE
)


In [None]:
def _safe_create_dataloader(data_type, description, batch_size, with_labels):
    """Build one dataloader via ``dataset_builder``, or return None if that fails.

    Args:
        data_type: Split name forwarded to ``create_dataloader`` ('train', 'val', 'test').
        description: Human-readable split name used in the error message.
        batch_size: Batch size forwarded to ``create_dataloader``.
        with_labels: Whether the split carries labels; forwarded unchanged.

    Returns:
        Whatever ``dataset_builder.create_dataloader`` returns, or None on failure.
    """
    try:
        return dataset_builder.create_dataloader(
            data_type=data_type, batch_size=batch_size, with_labels=with_labels
        )
    except Exception as e:
        # Deliberately best-effort: later cells check the loaders for None and
        # skip their work. The exception class is printed as well because some
        # exceptions stringify to almost nothing (KeyError('(') prints only "'('").
        print(f"Error creating {description} dataloader: {type(e).__name__}: {e}")
        return None


train_loader = _safe_create_dataloader('train', 'training', TRAIN_BATCH_SIZE, True)
val_loader = _safe_create_dataloader('val', 'validation', VAL_BATCH_SIZE, True)
test_loader = _safe_create_dataloader('test', 'test', VAL_BATCH_SIZE, False)

PREPROCESSING DATASET
✓ Training dataloader created with 3355 batches
✓ Validation dataloader created with 21 batches
✗ Error creating test dataloader: '('


## Feature Extraction with TF-IDF

In [5]:
def get_tfidf_features(sentences_list, max_features=100, ngram_range=(1, 3)):
    """Fit a character-level TF-IDF vectorizer on the given sentences.

    Args:
        sentences_list: Iterable of strings to fit on.
        max_features: Upper bound on the vocabulary size kept by the vectorizer.
        ngram_range: ``(min_n, max_n)`` bounds for the character n-grams. The
            default covers unigrams through trigrams. It must have exactly two
            elements; the previous default ``(1, 2, 3)`` was rejected by
            scikit-learn.

    Returns:
        Tuple ``(tfidf_vectorizer, tfidf_matrix)``: the fitted vectorizer and
        the matrix returned by ``fit_transform`` (float32).
    """
    tfidf_vectorizer = TfidfVectorizer(
        max_features=max_features,
        ngram_range=ngram_range,
        analyzer='char',
        lowercase=False,  # keep characters exactly as they appear in the text
        dtype=np.float32
    )
    tfidf_matrix = tfidf_vectorizer.fit_transform(sentences_list)
    return tfidf_vectorizer, tfidf_matrix


def convert_sequences_to_tfidf_features(sequences, char_to_index, max_len, tfidf_dim, device):
    """Turn a batch of character-id sequences into a float feature tensor.

    Despite the name, no TF-IDF statistics are involved: every character id is
    divided by ``len(char_to_index)`` and that single scalar is repeated
    ``tfidf_dim`` times along a new last axis.

    Args:
        sequences: Integer tensor, expected shape ``(batch, max_len)``.
        char_to_index: Character vocabulary; only its size is used, as the
            normalisation constant.
        max_len: Expected sequence length (second dimension of the result).
        tfidf_dim: Size of the last dimension of the result.
        device: Device the result should live on.

    Returns:
        Float tensor of shape ``(batch, max_len, tfidf_dim)`` on ``device``.
        It is a broadcast view (the last axis shares memory), so call
        ``.contiguous()`` before writing to it in place.

    Raises:
        RuntimeError: from ``expand`` if ``sequences`` cannot be broadcast to
            ``(batch, max_len, tfidf_dim)``.
    """
    batch_size = sequences.shape[0]
    # Move the compact id tensor first. Copying after expand() would
    # materialise the tfidf_dim-times larger tensor during a cross-device copy.
    scaled = sequences.to(device).float().unsqueeze(-1) / float(len(char_to_index))
    return scaled.expand(batch_size, max_len, tfidf_dim)

In [6]:
class RNNWithTfidf(nn.Module):
    """Bidirectional vanilla-RNN tagger producing one score vector per position.

    Data flow: ``nn.RNN`` (bidirectional, batch-first) -> dropout ->
    ``BatchNorm1d`` over the feature axis -> linear projection to
    ``output_size`` scores.

    Args:
        tfidf_dim: Size of each position's input feature vector.
        hidden_size: Hidden units per RNN direction.
        output_size: Number of classes scored at every position.
        num_layers: Number of stacked RNN layers.
        dropout_rate: Dropout probability for the post-RNN dropout, and between
            RNN layers when ``num_layers > 1``.
    """

    def __init__(self, tfidf_dim, hidden_size, output_size, num_layers=2, dropout_rate=0.2):
        super().__init__()

        self.tfidf_dim = tfidf_dim
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers

        # Forward and backward states are concatenated, doubling the feature width.
        bidirectional_width = hidden_size * 2
        # nn.RNN only applies dropout between layers, so it is off for a single layer.
        inter_layer_dropout = dropout_rate if num_layers > 1 else 0

        self.rnn = nn.RNN(
            input_size=tfidf_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=inter_layer_dropout,
        )
        self.batchnorm = nn.BatchNorm1d(bidirectional_width)
        self.dropout = nn.Dropout(dropout_rate)
        self.output_layer = nn.Linear(bidirectional_width, output_size)

    def forward(self, x):
        """Map ``(batch, seq, tfidf_dim)`` features to ``(batch, seq, output_size)`` scores."""
        features, _final_hidden = self.rnn(x)
        features = self.dropout(features)
        # BatchNorm1d normalises dimension 1, so move the feature axis there
        # and back again afterwards.
        normalised = self.batchnorm(features.transpose(1, 2)).transpose(1, 2)
        return self.output_layer(normalised)



# Build the network from the configuration constants (one output score per
# entry in LABELS), then place its parameters on DEVICE.
model = RNNWithTfidf(
    TFIDF_FEATURES,
    HIDDEN_SIZE,
    len(LABELS),
    num_layers=NUM_LAYERS,
    dropout_rate=DROPOUT_RATE,
)
model = model.to(DEVICE)


In [7]:
def train_model(model, train_loader, val_loader, num_epochs=NUM_EPOCHS,
                learning_rate=LEARNING_RATE, device=DEVICE, output_path=OUTPUT_PATH):
    """Train ``model``, keep the weights with the best validation macro-F1, and save them.

    Each epoch runs one pass over ``train_loader`` (Adam, cross-entropy,
    gradient-norm clipping at 1.0) and one pass over ``val_loader``. Positions
    whose label is the padding class (15) are excluded from loss and metrics.
    The learning rate is multiplied by 0.1 every 5 epochs.

    Args:
        model: Network mapping ``(batch, seq, features)`` to ``(batch, seq, classes)``.
        train_loader: Iterable of ``(sequences, labels)`` training batches.
        val_loader: Iterable of ``(sequences, labels)`` validation batches.
        num_epochs: Number of epochs to run.
        learning_rate: Initial Adam learning rate.
        device: Device used for features and labels.
        output_path: Directory that receives ``rnn_tfidf_model.pth`` and
            ``rnn_tfidf_model_meta.json``.

    Returns:
        Tuple ``(model, history)``: ``model`` carries the best-validation-F1
        weights, and ``history`` maps 'train_loss', 'train_acc', 'train_f1',
        'val_loss', 'val_acc', 'val_f1' to per-epoch lists.
    """
    pad_label = 15  # class index excluded from loss and metrics

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=pad_label)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

    history = {
        'train_loss': [], 'train_acc': [], 'train_f1': [],
        'val_loss': [], 'val_acc': [], 'val_f1': []
    }

    best_f1 = -1
    best_model_state = None

    for epoch in range(num_epochs):

        # ---- training pass ----
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0
        train_preds, train_trues = [], []

        for batch_sequences, batch_labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]", leave=False):
            optimizer.zero_grad()
            tfidf_features = convert_sequences_to_tfidf_features(
                batch_sequences, CHAR_TO_INDEX, MAX_LENGTH, TFIDF_FEATURES, device
            )
            outputs = model(tfidf_features)
            flat_outputs = outputs.view(-1, outputs.shape[-1])
            # Labels must be on the same device as the logits; no-op if they already are.
            flat_labels = batch_labels.to(device).reshape(-1)
            mask = (flat_labels != pad_label)
            if not mask.any():
                # A padding-only batch would give a NaN loss (mean over zero
                # elements) and poison the weights through the optimizer step.
                continue
            loss = criterion(flat_outputs[mask], flat_labels[mask])
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()
            pred = flat_outputs.argmax(dim=1)
            train_correct += (pred[mask] == flat_labels[mask]).sum().item()
            train_total += mask.sum().item()
            train_preds.extend(pred[mask].cpu().tolist())
            train_trues.extend(flat_labels[mask].cpu().tolist())
        train_loss /= len(train_loader)
        train_accuracy = train_correct / train_total if train_total > 0 else 0
        train_f1 = f1_score(train_trues, train_preds, average='macro', zero_division=0)

        # ---- validation pass ----
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0
        val_preds, val_trues = [], []
        with torch.inference_mode():
            for val_seq, val_label in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]", leave=False):
                tfidf_features = convert_sequences_to_tfidf_features(
                    val_seq, CHAR_TO_INDEX, MAX_LENGTH, TFIDF_FEATURES, device
                )
                outputs = model(tfidf_features)
                flat_outputs = outputs.view(-1, outputs.shape[-1])
                flat_labels = val_label.to(device).reshape(-1)
                mask = (flat_labels != pad_label)
                if not mask.any():
                    continue
                # Same value as applying ignore_index to the unmasked tensors.
                loss = criterion(flat_outputs[mask], flat_labels[mask])
                val_loss += loss.item()
                pred = flat_outputs.argmax(dim=1)
                val_correct += (pred[mask] == flat_labels[mask]).sum().item()
                val_total += mask.sum().item()
                val_preds.extend(pred[mask].cpu().tolist())
                val_trues.extend(flat_labels[mask].cpu().tolist())
        val_loss /= len(val_loader)
        val_accuracy = val_correct / val_total if val_total > 0 else 0
        val_f1 = f1_score(val_trues, val_preds, average='macro', zero_division=0)

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_accuracy)
        history['train_f1'].append(train_f1)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_accuracy)
        history['val_f1'].append(val_f1)
        scheduler.step()
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"  Train: Loss={train_loss:.4f}, Acc={train_accuracy*100:.2f}%, F1={train_f1:.3f}")
        print(f"  Val:   Loss={val_loss:.4f}, Acc={val_accuracy*100:.2f}%, F1={val_f1:.3f}")

        if val_f1 > best_f1:
            best_f1 = val_f1
            # state_dict() hands back references to the live parameter tensors
            # and dict.copy() is shallow, so each tensor is cloned. Otherwise
            # later epochs would silently overwrite the "best" snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            print(f"  ★ New best model saved (F1={best_f1:.3f})")

    # Restore the best-scoring weights before persisting.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # A caller-supplied output_path may not exist yet.
    os.makedirs(output_path, exist_ok=True)
    model_path = os.path.join(output_path, 'rnn_tfidf_model.pth')
    meta_path = os.path.join(output_path, 'rnn_tfidf_model_meta.json')

    torch.save(model.state_dict(), model_path)

    metadata = {
        'model_type': 'RNNWithTfidf',
        'best_val_f1': float(best_f1),
        'tfidf_dim': TFIDF_FEATURES,
        'hidden_size': HIDDEN_SIZE,
        'num_layers': NUM_LAYERS,
        'dropout_rate': DROPOUT_RATE,
        # Record the rate actually used for this run, not the module default.
        'learning_rate': learning_rate,
        'num_epochs': num_epochs,
        'max_sequence_length': MAX_LENGTH,
        'train_batch_size': TRAIN_BATCH_SIZE,
        'vocab_size': len(CHAR_TO_INDEX) + 1,
        'output_classes': len(LABELS)
    }

    with open(meta_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, ensure_ascii=False, indent=4)
    return model, history



# Training needs both loaders; if either could not be built, report it and move on.
if train_loader is None or val_loader is None:
    print("Error: Could not create train/val loaders. Skipping training.")
else:
    model, training_history = train_model(
        model,
        train_loader,
        val_loader,
        num_epochs=NUM_EPOCHS,
        learning_rate=LEARNING_RATE,
        device=DEVICE,
        output_path=OUTPUT_PATH,
    )

NameError: name 'train_loader' is not defined

## Model Evaluation

In [8]:
def evaluate_model(model, test_loader, device=DEVICE):
    """Score ``model`` on a labelled loader and return accuracy and F1 figures.

    Args:
        model: Network producing ``(batch, seq, classes)`` scores.
        test_loader: Iterable yielding ``(sequences, labels)`` batches; a
            loader without labels cannot be unpacked by this loop.
        device: Device the features are placed on.

    Returns:
        Dict with 'accuracy', 'macro_f1', 'weighted_f1', and the flattened
        'predictions' and 'ground_truth' sequences of the scored positions.
    """
    model.eval()
    kept_preds, kept_trues = [], []
    n_correct = 0
    n_scored = 0
    with torch.inference_mode():
        for sequences, labels in tqdm(test_loader, desc="Evaluating"):
            features = convert_sequences_to_tfidf_features(
                sequences, CHAR_TO_INDEX, MAX_LENGTH, TFIDF_FEATURES, device
            )
            flat_preds = model(features).argmax(dim=2).view(-1)
            flat_trues = labels.view(-1)
            # Positions labelled 15 or 16 are left out of every metric.
            # NOTE(review): LABELS only defines classes 0..15 -- confirm where
            # a label value of 16 could come from.
            keep = ~((flat_trues == 15) | (flat_trues == 16))
            scored_preds = flat_preds[keep]
            scored_trues = flat_trues[keep]
            n_correct += (scored_preds == scored_trues).sum().item()
            n_scored += keep.sum().item()
            kept_preds.extend(scored_preds.cpu().numpy())
            kept_trues.extend(scored_trues.cpu().numpy())

    results = {'accuracy': n_correct / n_scored if n_scored > 0 else 0}
    results['macro_f1'] = f1_score(kept_trues, kept_preds, average='macro', zero_division=0)
    results['weighted_f1'] = f1_score(kept_trues, kept_preds, average='weighted', zero_division=0)
    results['predictions'] = kept_preds
    results['ground_truth'] = kept_trues
    return results



# Evaluate only when the test loader was built successfully.
if test_loader is None:
    print("Error:Test loader not available. Skipping evaluation.")
else:
    test_results = evaluate_model(model, test_loader, device=DEVICE)

NameError: name 'test_loader' is not defined

## Model Loading and Prediction

In [None]:
def load_trained_model(model_path, device=DEVICE):
    """Rebuild ``RNNWithTfidf`` from the notebook constants and load saved weights.

    Args:
        model_path: Path to a state dict written by ``torch.save``.
        device: Device the weights are mapped to and the model is moved onto.

    Returns:
        The model on ``device``, in eval mode.
    """
    # Keyword names must match RNNWithTfidf.__init__ (tfidf_dim / dropout_rate).
    # The previous input_size= / dropout= spelling raised TypeError.
    model = RNNWithTfidf(
        tfidf_dim=TFIDF_FEATURES,
        hidden_size=HIDDEN_SIZE,
        output_size=len(LABELS),
        num_layers=NUM_LAYERS,
        dropout_rate=DROPOUT_RATE
    )

    # NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model


def _label_index_to_diacritic(label_index):
    """Translate a predicted class index into the text to append after a character.

    Looks the index up in ``INDEX_TO_LABEL`` and converts the stored code
    point(s) to characters; tuple entries become a two-character string.
    Unknown indices and the non-diacritic keys 0 and 15 give an empty string.
    NOTE(review): keys 0 and 15 are presumably "no diacritic" and padding --
    confirm against the dataset builder.
    """
    code = INDEX_TO_LABEL.get(int(label_index))
    if code is None:
        return ''
    if isinstance(code, tuple):
        return ''.join(chr(point) for point in code)
    if code in (0, 15):
        return ''
    return chr(code)


def predict_on_test_set(model, test_data_path, output_csv_path=None):
    """Diacritize every line of a text file and write the results to CSV.

    Args:
        model: Trained network producing ``(batch, seq, classes)`` scores.
        test_data_path: UTF-8 text file with one sentence per line.
        output_csv_path: Destination CSV; defaults to
            ``OUTPUT_PATH/test_predictions.csv``.

    Returns:
        DataFrame with the columns 'original', 'cleaned' and 'diacritized'.
    """
    model.eval()

    with open(test_data_path, 'r', encoding='utf-8') as f:
        test_sentences = [line.strip() for line in f]

    predictions_list = []

    with torch.inference_mode():
        for sentence in tqdm(test_sentences, desc="Predicting"):
            # Use the notebook's `cleaner` / `preprocessor` instances; the
            # names text_cleaner / text_preprocessor are not defined anywhere.
            cleaned = cleaner.clean_text(sentence)
            sequences = preprocessor.preprocess_text(cleaned)

            # Pad or truncate to exactly MAX_LENGTH entries.
            # NOTE(review): 15 is the padding *label* index, but as a character
            # id it means '!' in CHAR_TO_INDEX -- confirm which pad id the
            # dataset builder uses for inputs.
            padded_seq = sequences + [15] * (MAX_LENGTH - len(sequences))
            padded_seq = padded_seq[:MAX_LENGTH]

            seq_tensor = torch.tensor([padded_seq], dtype=torch.long, device=DEVICE)
            tfidf_features = convert_sequences_to_tfidf_features(
                seq_tensor, CHAR_TO_INDEX, MAX_LENGTH, TFIDF_FEATURES, DEVICE
            )

            outputs = model(tfidf_features)
            pred_indices = outputs.argmax(dim=2)[0].cpu().numpy()

            # Map class index -> diacritic text. LABELS maps the other way
            # (diacritic -> index), so it cannot be used for this lookup.
            diacritics = [_label_index_to_diacritic(idx) for idx in pred_indices]

            # NOTE(review): assumes position i of the prediction corresponds to
            # character i of `cleaned` -- confirm against preprocess_text.
            diacritized = ''.join(
                char + (diacritics[i] if i < len(diacritics) else '')
                for i, char in enumerate(cleaned)
            )

            predictions_list.append({
                'original': sentence,
                'cleaned': cleaned,
                'diacritized': diacritized
            })

    df = pd.DataFrame(predictions_list)
    if output_csv_path is None:
        output_csv_path = os.path.join(OUTPUT_PATH, 'test_predictions.csv')
    df.to_csv(output_csv_path, index=False, encoding='utf-8')
    return df

# Raw, undiacritized sample file that the prediction pass reads line by line.
TEST_DATA_PATH="data/sample_test_no_diacritics.txt"

# Bail out with a message when the sample file is missing; otherwise predict
# and preview the first rows of the result.
if not os.path.exists(TEST_DATA_PATH):
    print(f"Error: Test data not found at {TEST_DATA_PATH}")
else:
    test_predictions = predict_on_test_set(model, TEST_DATA_PATH)
    print("\nFirst few predictions:")
    print(test_predictions.head())

