In [11]:
!pip install pyarabic nltk torchtext==0.6.0

Collecting torchtext==0.6.0
  Downloading torchtext-0.6.0-py3-none-any.whl (64 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.2/64.2 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchtext
  Attempting uninstall: torchtext
    Found existing installation: torchtext 0.17.1
    Uninstalling torchtext-0.17.1:
      Successfully uninstalled torchtext-0.17.1
Successfully installed torchtext-0.6.0


In [2]:
# Colab helper that exposes the user's Google Drive inside the runtime
from google.colab import drive

# Mount Drive at /content/drive; the dataset CSVs read below live under this path
drive.mount('/content/drive')

Mounted at /content/drive


## 2. Establish a preprocessing NLP pipeline (tokenization, stemming, lemmatization, stop words, discretization, etc.) for the collected dataset.

In [18]:
import pandas as pd
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem import SnowballStemmer
from pyarabic.araby import strip_tashkeel
from sklearn.feature_extraction.text import CountVectorizer

# Download NLTK resources (the stop-word lists used below)
nltk.download('stopwords')

# Load your dataset
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/data/arabic_text_score_data.csv')

# Ensure all values in 'Text' are strings. Missing values are mapped to '' first:
# a bare astype(str) turns NaN into the literal word "nan", which would then be
# tokenized and counted as a feature.
df['Text'] = df['Text'].fillna('').astype(str)

# Normalization: strip diacritics (tashkeel) BEFORE tokenizing. The marks are
# combining characters that the regex class \w does not match, so tokenizing first
# splits a vocalized word at every mark. Stripping first also lets vocalized
# spellings match the stop-word list.
# No Arabic lemmatizer is imported in this notebook; this normalization is the step
# used in its place.
df['Text'] = df['Text'].apply(strip_tashkeel)

# Tokenization using RegexpTokenizer (runs of word characters)
tokenizer = RegexpTokenizer(r'\w+')
df['Text'] = df['Text'].apply(tokenizer.tokenize)

# Remove stop words (the list gets the same diacritic stripping as the text)
stop_words = {strip_tashkeel(word) for word in stopwords.words('arabic')}
df['Text'] = df['Text'].apply(lambda tokens: [tok for tok in tokens if tok.lower() not in stop_words])

# Stemming
stemmer = SnowballStemmer("arabic")
df['Text'] = df['Text'].apply(lambda tokens: [stemmer.stem(tok) for tok in tokens])

# Discretization (using CountVectorizer): one column per term, one row per document
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(df['Text'].apply(' '.join))

# Convert the sparse matrix to DataFrame.
# NOTE(review): toarray() materializes a dense documents x vocabulary matrix; keep X
# itself for modelling if memory becomes a problem.
df_transformed = pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out())

# Optional: Other preprocessing steps such as removing punctuation, digits, etc.

# Display the preprocessed DataFrame
print(df_transformed.head())

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


   00  000  0000000  00000000000  0000000000000  000000000000000  000000طول  \
0   0    0        0            0              0                0          0   
1   0    0        0            0              0                0          0   
2   0    0        0            0              0                0          0   
3   0    0        0            0              0                0          0   
4   0    0        0            0              0                0          0   

   000000كسب  0000ط  0000مواط  ...  يويد  يويل  ييايد  ييبقى  ييج  ييجو  ييجى  \
0          0      0         0  ...     0     0      0      0    0     0     0   
1          0      0         0  ...     0     0      0      0    0     0     0   
2          0      0         0  ...     0     0      0      0    0     0     0   
3          0      0         0  ...     0     0      0      0    0     0     0   
4          0      0         0  ...     0     0      0      0    0     0     0   

   ييشتم  ييياس  يييش  
0      0      

In [21]:
# Show the document-term counts as a plain NumPy array (NumPy abbreviates large arrays)
print(df_transformed.to_numpy())

[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]


## 3. Train your models using RNN, bidirectional RNN, GRU, and LSTM architectures, and tune the hyperparameters to get the best performance.

In [None]:
import pandas as pd
import numpy as np
import torch
import torchtext.data as data
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from pathlib import Path

# Seed PyTorch's RNG so model weight initialisation is repeatable
torch.manual_seed(42)

# Load the dataset
df = pd.read_csv(str(Path('/content/drive/MyDrive/Colab Notebooks/data/arabic_text_data.csv')))

# Rows missing either of the first two columns (text, score) cannot be used
df = df.dropna(subset=list(df.columns[:2]))

# Split into train and test sets
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# Define fields for preprocessing.
# - Whitespace tokenization replaces tokenize='spacy', which makes torchtext load an
#   English spaCy pipeline: unsuitable for Arabic text and not installed by this notebook.
# - Labels are int64 class indices, which is what nn.CrossEntropyLoss expects.
TEXT = data.Field(tokenize=str.split, lower=True, include_lengths=True)
LABEL = data.LabelField(dtype=torch.long)

fields = [('Text', TEXT), ('Score', LABEL)]


def _dataset_from_frame(frame):
    """Build a torchtext Dataset from the first two columns of ``frame``.

    TabularDataset.splits only accepts file names, so the in-memory frames are
    converted to Examples directly. Columns are matched to ``fields`` by position
    (first column -> Text, second -> Score), the same way a list of fields is matched
    to CSV columns. Examples whose text has no tokens are dropped because a
    zero-length sequence cannot be packed by the model.
    """
    examples = [
        data.Example.fromlist([str(text), str(score)], fields)
        for text, score in zip(frame.iloc[:, 0], frame.iloc[:, 1])
    ]
    return data.Dataset(examples, fields, filter_pred=lambda ex: len(ex.Text) > 0)


train_data = _dataset_from_frame(train_df)
test_data = _dataset_from_frame(test_df)

# Build vocabulary: words from the training split only; labels from both splits so a
# class that happens to appear only in the test split still has an index.
TEXT.build_vocab(train_data, min_freq=2)
LABEL.build_vocab(train_data, test_data)

# Define iterators. sort_within_batch=True needs a sort_key; the plain Dataset built
# above does not define one, so examples are ordered by token count.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, test_data),
    batch_size=64,
    sort_key=lambda ex: len(ex.Text),
    sort_within_batch=True,
    device=device,
)

# Define RNN model
class RNN(nn.Module):
    """Bidirectional recurrent text classifier.

    Token ids are embedded, run through one bidirectional recurrent layer, and the
    final forward and backward hidden states are concatenated and projected to
    ``output_dim`` logits.

    Args:
        input_dim: Number of rows in the embedding table (vocabulary size).
        embedding_dim: Size of each token embedding.
        hidden_dim: Hidden size per direction.
        output_dim: Number of output logits.
        rnn_type: 'RNN', 'GRU' or 'LSTM' (default 'LSTM').

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """

    # Supported recurrent layers, keyed by the public rnn_type name
    _RNN_LAYERS = {'RNN': nn.RNN, 'GRU': nn.GRU, 'LSTM': nn.LSTM}

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, rnn_type='LSTM'):
        super().__init__()
        if rnn_type not in self._RNN_LAYERS:
            raise ValueError(f"Invalid RNN type: {rnn_type}")
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn_type = rnn_type
        self.rnn = self._RNN_LAYERS[rnn_type](embedding_dim, hidden_dim, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # Double hidden_dim for bidirectional

    def forward(self, text, text_lengths):
        """Return logits of shape (batch, output_dim).

        Args:
            text: Token-id tensor laid out (seq_len, batch); the recurrent layer and
                the packing call both use the default ``batch_first=False``.
            text_lengths: Unpadded length of each sequence in the batch.
        """
        embedded = self.embedding(text)
        # Packing makes the recurrent layer stop at each sequence's true length
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            embedded, text_lengths.cpu(), enforce_sorted=False
        )
        _, state = self.rnn(packed_embedded)
        # LSTM returns (h_n, c_n); GRU and plain RNN return h_n alone
        hidden = state[0] if isinstance(state, tuple) else state
        # h_n is (num_directions, batch, hidden_dim): join the last forward and backward
        # states per example instead of reshaping by text.size(0), which is seq_len
        final_hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        return self.fc(final_hidden)

# Model dimensions
INPUT_DIM = len(TEXT.vocab)  # one embedding row per vocabulary entry
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
# One logit per score class. When a label vocabulary has been built its size is used,
# so the output layer cannot be smaller than the number of distinct labels; otherwise
# fall back to the assumption of 4 classes.
OUTPUT_DIM = len(LABEL.vocab) if hasattr(LABEL, 'vocab') else 4

model = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)

# Define optimizer and loss function (Adam with default settings, multi-class cross-entropy)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Train the model
def train(model, iterator, optimizer, criterion):
    """Run one training epoch and return the mean loss per batch.

    Args:
        model: Module called as ``model(text, text_lengths)`` that returns
            per-class logits.
        iterator: Sized iterable of batches exposing ``Text`` (a
            ``(text, text_lengths)`` pair) and ``Score`` (class labels).
        optimizer: Optimizer stepping ``model``'s parameters.
        criterion: Classification loss taking ``(logits, class_indices)``.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()
    epoch_loss = 0.0
    for batch in iterator:
        text, text_lengths = batch.Text
        optimizer.zero_grad()
        # Logits are kept as (batch, n_classes); no squeeze is applied because that
        # would drop the class dimension when it has size 1
        predictions = model(text, text_lengths)
        # Index-target losses such as CrossEntropyLoss require int64 labels
        loss = criterion(predictions, batch.Score.long())
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# Evaluate the model
def evaluate(model, iterator, criterion):
    """Score ``model`` on ``iterator`` without updating it.

    Args:
        model: Module called as ``model(text, text_lengths)`` that returns
            per-class logits.
        iterator: Sized iterable of batches exposing ``Text`` (a
            ``(text, text_lengths)`` pair) and ``Score`` (class labels).
        criterion: Classification loss taking ``(logits, class_indices)``.

    Returns:
        ``(mean_loss, accuracy)``: the per-batch losses summed and divided by
        ``len(iterator)``, and the accuracy of the argmax predictions over all
        examples seen.
    """
    model.eval()
    epoch_loss = 0.0
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for batch in iterator:
            text, text_lengths = batch.Text
            # Index-target losses such as CrossEntropyLoss require int64 labels; the
            # same integer labels are compared against the integer predictions
            labels = batch.Score.long()
            predictions = model(text, text_lengths)
            epoch_loss += criterion(predictions, labels).item()
            all_predictions.extend(predictions.argmax(dim=1).tolist())
            all_labels.extend(labels.tolist())
    return epoch_loss / len(iterator), accuracy_score(all_labels, all_predictions)

N_EPOCHS = 10
best_valid_loss = float('inf')  # lowest evaluation loss seen so far

# NOTE(review): the test iterator is used both for the reported metrics and for
# choosing which checkpoint to keep; there is no separate validation split here.
for epoch in range(N_EPOCHS):
    # One optimisation pass over the training batches, then a scoring pass
    train_loss = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, test_iterator, criterion)

    # Keep the weights from the epoch with the lowest evaluation loss
    if valid_loss < best_valid_loss:
        torch.save(model.state_dict(), 'rnn_model.pt')
        best_valid_loss = valid_loss

    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Test Loss: {valid_loss:.3f} | Test Acc: {valid_acc:.2f}')