## Import the Dependencies

In [None]:
# `from __future__` imports must be the first statement of the module/cell,
# otherwise Python raises a SyntaxError.
from __future__ import annotations

import os
import re
import string

import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# NLTK resources needed by the tokenizer, the POS tagger, the stop-word
# filter and the WordNet lemmatizer used below.
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt_tab')

## Putting all the csv files into one file

In [None]:
# Paths of the individual dataset splits that save_load_df() concatenates.
train_data = 'data/train.csv'
test_data = 'data/test.csv'
# NOTE(review): identical to `test_data`, so the test split is concatenated
# twice and no separate validation file is read. Looks like a copy-paste
# slip -- confirm the intended validation file name.
valid_data = 'data/test.csv'

# Location of the merged dataset; it doubles as an on-disk cache.
file = "multiclass_dataset.csv"

def save_load_df(file:str):
    """Return the merged dataset, creating the merged CSV on first use.

    When ``file`` does not exist yet, the CSVs named by the module-level
    ``train_data``, ``test_data`` and ``valid_data`` paths are concatenated
    (with a fresh 0..n-1 index) and written to ``file`` restricted to the
    columns ``id``, ``text``, ``label`` and ``sentiment``. In both cases the
    result is read back from ``file`` with its first column as the index.
    """
    if not os.path.exists(file):
        frames = [pd.read_csv(path) for path in (train_data, test_data, valid_data)]
        merged = pd.concat(frames, axis=0, ignore_index=True)
        merged.to_csv(file, columns=['id', 'text', 'label', 'sentiment'])
    return pd.read_csv(file, index_col=0)

# Load (or build) the merged dataset and discard its `id` column.
df = save_load_df(file=file).drop(columns='id')
# df = df.sample(frac=1, random_state= 1337).reset_index(drop=True)
df

## Step 1: Pre-processing the text

In [None]:
# Built once instead of on every call: cleaning_text() runs for each row of
# the dataset, and none of these values depend on the input text.
_TEXT_PATTERN = re.compile(
    r'(<.+?>)'         # HTML tags
    r'|([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'  # E-mail addresses
    r'|(https?\W+[^\s]+)'  # URLs starting with http or https
    r'|(https?://[^\s\n\r]+)' # URLs starting with http:// or https://
    r'|(www\.[^\s]+)'      # URLs starting with www
    r'|([\U00010000-\U0010ffff])'  # Emoji and other characters beyond U+FFFF
    r'|([^\x00-\xFF])'     # Anything outside the 0-255 code point range
)
# Translation table that deletes every character of string.punctuation.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)
# POS-tag prefix -> WordNet part of speech, tested in this order.
_WORDNET_POS = (('J', 'a'), ('V', 'v'), ('RB', 'r'), ('N', 'n'))
# Filled lazily so the stop-word corpus is read from disk only once.
_STOP_WORDS = None


def _english_stop_words():
    """Return the NLTK English stop words as a set, loading them once."""
    global _STOP_WORDS
    if _STOP_WORDS is None:
        _STOP_WORDS = frozenset(stopwords.words('english'))
    return _STOP_WORDS


def _wordnet_pos(tag):
    """Map a POS tag to the WordNet POS passed to the lemmatizer."""
    for prefix, wordnet_pos in _WORDNET_POS:
        if tag.startswith(prefix):
            return wordnet_pos
    # 'n' is the lemmatizer's own default part of speech.
    return 'n'


def cleaning_text(text):
    """Clean one text and return its tokens with lemma and POS tag.

    The text is converted with ``str()``, stripped of HTML tags, e-mail
    addresses, URLs and characters above code point 255, lower-cased, and
    tokenized sentence by sentence. Stop words are dropped, punctuation
    characters are removed from the remaining tokens, and tokens that end up
    empty are skipped.

    Args:
        text: The raw text; any object is accepted because it is passed
            through ``str()`` first.

    Returns:
        A list of ``[token, lemma, pos]`` lists, always terminated by the
        separator row ``['', '', '']`` that marks the end of this text.
    """
    text = _TEXT_PATTERN.sub('', str(text)).lower()
    stop_words = _english_stop_words()

    tokens = []
    for sentence in nltk.sent_tokenize(text):
        for word in nltk.word_tokenize(sentence):
            # Stop words are matched before punctuation is stripped.
            if word in stop_words:
                continue
            word = word.translate(_PUNCTUATION_TABLE)
            if word:
                tokens.append(word)

    # Tag the cleaned tokens, then lemmatize each one with the matching POS.
    lemmatizer = WordNetLemmatizer()
    data = [
        [token, lemmatizer.lemmatize(token, pos=_wordnet_pos(tag)), tag]
        for token, tag in nltk.pos_tag(tokens)
    ]
    # Separator row: lets later steps split the flat token list per text.
    data.append(['', '', ''])
    return data

def get_infos(texts):
    """Run cleaning_text() on every text and return all rows in one flat list.

    The rows of each text are appended in input order, so the result is the
    concatenation of the per-text outputs of cleaning_text().
    """
    return [row for text in texts for row in cleaning_text(text=text)]

# Flat table of every row produced by get_infos() for the `text` column.
texts = df['text']
token_rows = get_infos(texts)
df_tokens = pd.DataFrame(token_rows, columns=['token', 'lemma', 'pos'])
df_tokens

## Step 2: TF-IDF

In [None]:

def get_document(element: list[str]):
    """Split a flat list of strings into space-joined documents.

    An empty string ``''`` in ``element`` marks the end of a document. Each
    time one is met, the items collected since the previous marker are
    joined with single spaces and emitted (an empty document yields ``''``).
    Items after the last marker are not emitted.

    Args:
        element: Flat list of strings with ``''`` as document terminator.

    Returns:
        One joined string per ``''`` marker, in order of appearance.
    """
    docs = []
    current = []
    for item in element:
        if item == '':
            docs.append(' '.join(current))
            current = []
        else:
            current.append(item)
    return docs

# Turn the flat lemma column into one space-joined string per '' separator.
documents = get_document(df_tokens['lemma'].tolist())

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
from scipy.sparse import csr_matrix

docs = np.array(documents)
tfidf = TfidfVectorizer(use_idf=True, norm='l2', smooth_idf=True)
# fit_transform() already returns a SciPy sparse matrix, so build `csr`
# from it directly instead of re-sparsifying a dense copy.
csr = csr_matrix(tfidf.fit_transform(docs), dtype=float)
# NOTE(review): dense (n_documents x vocabulary) copy kept only because the
# name was defined here originally; drop it if nothing reads it, it can be
# very large.
tfidf_matrix = csr.toarray()


In [None]:
# Cache the sparse TF-IDF matrix on disk and reload it from there.
# np.savez()/np.load() cannot round-trip a SciPy sparse matrix: the matrix is
# pickled as an object array and np.load() hands back an NpzFile, not the
# matrix. save_npz()/load_npz() store and restore the sparse matrix itself
# and do not need pickle.
# NOTE(review): a 'tfidf_matrix.npz' written by the previous np.savez() code
# is not in this format and must be deleted once.
from scipy.sparse import load_npz, save_npz

TFIDF_CACHE = 'tfidf_matrix.npz'
if not os.path.exists(TFIDF_CACHE):
    save_npz(TFIDF_CACHE, csr)
csr = load_npz(TFIDF_CACHE)

## GLOVE

In [None]:

file_path = r'glove.840B.300d.txt'
glove_dim = 300  # number of vector components expected on every line
glove = {}

# Load the embeddings into a {token: float32 vector} dictionary.
with open(file_path, encoding="utf-8") as f:
    for line in f:
        line = line.rstrip()
        if not line:
            # A blank line has no token; skip it instead of failing.
            continue
        # Split from the right so that exactly `glove_dim` fields are taken
        # as the vector. A plain split() breaks tokens that themselves
        # contain whitespace, and such lines were silently lost.
        word, *components = line.rsplit(' ', glove_dim)
        if len(components) != glove_dim:
            print(f"Skipping malformed line: {line}")
            continue
        try:
            glove[word] = np.asarray(components, dtype="float32")
        except ValueError:
            print(f"Skipping malformed line: {line}")
        

In [None]:
# Vocabulary covered by the embeddings, then one vector (or None when the
# lemma is not in that vocabulary) per row of df_tokens.
vocab_glove = set(glove)
glove_embeddings = df_tokens['lemma'].apply(glove.get)

In [None]:
# Keep only the (lemma, vector) pairs for which an embedding was found.
matched_rows = [
    pair
    for pair in zip(df_tokens['lemma'], glove_embeddings)
    if pair[1] is not None
]
df_glove = pd.DataFrame(data=matched_rows, columns=['lemma', 'glove'])

In [None]:
df_glove

In [None]:
def get_mean_vector_glove(phrase, glove, vector_dim=300):
    """Return the mean embedding of the words of ``phrase``.

    Args:
        phrase: Iterable of words, or a single string. A string is split on
            whitespace first; iterating it directly would look up individual
            characters instead of words.
        glove: Mapping from word to embedding vector.
        vector_dim: Length of the zero vector returned when no word of
            ``phrase`` is found in ``glove``.

    Returns:
        The element-wise mean of the vectors of the words found in
        ``glove``, or a float32 zero vector of length ``vector_dim`` when
        none is found.
    """
    words = phrase.split() if isinstance(phrase, str) else phrase
    vectors = [glove[word] for word in words if word in glove]
    if not vectors:
        # float32 like the vectors loaded from the GloVe file, so stacking
        # phrase vectors later does not silently upcast to float64.
        return np.zeros(vector_dim, dtype=np.float32)
    return np.mean(vectors, axis=0)

In [None]:
# One {'phrase', 'phrase2vec'} record per non-empty phrase.
# NOTE(review): `lemmas` is not assigned anywhere in the visible code --
# confirm where it is defined.
phr2vec_glove = []
for phrase in lemmas:
    if len(phrase) > 0:
        mean_vect = get_mean_vector_glove(phrase, glove, vector_dim=300)
        phr2vec_glove.append(dict(phrase=phrase, phrase2vec=mean_vect))

In [None]:
# Table with one row per record of phr2vec_glove (columns `phrase` and
# `phrase2vec`). This rebinds `df_glove`, replacing the per-lemma table
# built earlier under the same name.
df_glove = pd.DataFrame(phr2vec_glove)
df_glove

In [None]:
import torch

# Concatenate the tensors in `embeddings` along dimension 0.
# NOTE(review): `embeddings` is not assigned anywhere in the visible code
# before this line -- confirm where it is defined, otherwise this raises a
# NameError.
embeddings = torch.cat(embeddings, dim = 0)
embeddings.shape

## Vectorization

In [None]:
# Inputs for a dataframe of lemmas, POS tags and labels:
# one list of POS tags per document, plus the label column as a list.
pos = [tag_string.split() for tag_string in get_document(df_tokens['pos'].tolist())]
labels = df['label'].tolist()

In [None]:
# Pair every document with its POS tags and label, then drop the rows whose
# text is empty and renumber the remaining rows from 0.
corpus = pd.DataFrame(data=list(zip(documents, pos, labels)), columns=['text', 'pos', 'label'])
corpus = corpus.loc[corpus['text'] != ''].reset_index(drop=True)
corpus

In [None]:
from sklearn.preprocessing import OneHotEncoder

# Sorted so the column order of the count vectors is identical on every run
# (plain set iteration order of strings changes between interpreter runs).
all_pos_tags = sorted({tag for tags in corpus['pos'] for tag in tags})
one_hot_encoder = OneHotEncoder(sparse_output=False, categories=[all_pos_tags])
# The categories are fixed, so fit once here instead of refitting the
# encoder for every document.
one_hot_encoder.fit([[tag] for tag in all_pos_tags])

# One vector per document: how many times each POS tag occurs in it.
pos_vectors = []
for tags in corpus['pos']:
    if len(tags) == 0:
        # transform() rejects an empty sample list; no tags means all zeros.
        pos_vectors.append(np.zeros(len(all_pos_tags)))
        continue
    one_hot = one_hot_encoder.transform([[tag] for tag in tags])
    pos_vectors.append(np.sum(one_hot, axis=0))


## Training phase

In [None]:
# Attach the labels as column `Y` and drop the `phrase` column.
# NOTE(review): `df3` is not assigned anywhere in the visible code;
# presumably it is the phrase-vector table with `phrase`/`phrase2vec`
# columns -- confirm. The assignment aligns on the index, so `df3` and
# `corpus` must describe the same rows in the same order -- verify.
df3['Y'] = corpus['label']
final_dataset = df3.drop(columns='phrase')


In [None]:
final_dataset

In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader, random_split

# Features are the phrase vectors, targets the labels; 60 % of the rows go
# to training and 40 % are held out, shuffled with a fixed seed.
X, Y = final_dataset['phrase2vec'], final_dataset['Y']
x_train, x_test, y_train, y_test = train_test_split(
    X,
    Y,
    train_size=0.6,
    test_size=0.4,
    shuffle=True,
    random_state=42,
)

In [None]:
# Explicit dtypes: the model weights are float32 and F.cross_entropy needs
# int64 class indices. Without them a single float64 feature vector upcasts
# the whole stacked array and the forward pass fails on a dtype mismatch.
# NOTE(review): assumes the labels are integer class indices -- confirm.
train_dataset = TensorDataset(
    torch.tensor(np.stack(x_train, axis=0), dtype=torch.float32),
    torch.tensor(np.stack(y_train, axis=0), dtype=torch.long),
)
test_dataset = TensorDataset(
    torch.tensor(np.stack(x_test, axis=0), dtype=torch.float32),
    torch.tensor(np.stack(y_test, axis=0), dtype=torch.long),
)

# Split the held-out part in two halves: validation and test.
torch.manual_seed(1337)
valid_dataset, test_dataset = random_split(test_dataset, lengths=[0.5, 0.5])
batch_size = 32
train_dl = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
# Evaluation must see every sample exactly once: no shuffling needed and no
# dropped last batch, otherwise metrics divided by the dataset size are off.
valid_dl = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=False)
test_dl = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)


In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Model(nn.Module):
    """Bidirectional LSTM followed by a two-layer classifier with 3 outputs.

    Args:
        embed_dim: Size of each input vector.
        hidden_size: Hidden size of the LSTM (per direction) and of the
            intermediate linear layer.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout applied between LSTM layers.
    """

    def __init__(self, embed_dim = 300, hidden_size = 64, n_layers= 2, dropout = 0.5):
        super().__init__()
        self.lstm = nn.LSTM(embed_dim, hidden_size, num_layers = n_layers,
                            batch_first=True, dropout = dropout, bidirectional=True)
        # The LSTM is bidirectional, so its features are 2 * hidden_size wide
        # whatever n_layers is (n_layers * hidden_size only matched for 2).
        self.fc1 = nn.Linear(2 * hidden_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, 3)

    def forward(self, x, y=None):
        """Compute class scores and, when targets are given, the loss.

        Args:
            x: Tensor of shape (batch, embed_dim) or (batch, seq, embed_dim).
            y: Optional tensor of class indices of shape (batch,).

        Returns:
            ``(logits, loss)`` where ``logits`` has shape (batch, 3) and
            ``loss`` is the cross-entropy against ``y``, or ``None`` when
            ``y`` is ``None``.
        """
        if x.dim() == 2:
            # nn.LSTM reads a 2-D input as ONE unbatched sequence, which
            # would chain the samples of a batch together. Give each sample
            # its own sequence of length 1 instead.
            x = x.unsqueeze(1)
        _, (h_n, _) = self.lstm(x)
        # Final hidden state of the last layer, forward and backward.
        features = torch.cat((h_n[-2], h_n[-1]), dim=1)
        logits = self.fc2(self.relu(self.fc1(features)))
        # cross_entropy applies log-softmax itself and expects raw scores,
        # so no sigmoid is applied beforehand.
        loss = F.cross_entropy(logits, y) if y is not None else None
        return logits, loss


In [None]:
# Seed the RNG before the model is built so runs are repeatable.
torch.manual_seed(1337)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = Model()
model.to(device=device)
optimizer = optim.Adam(model.parameters(), lr = 0.001)
# Report the parameter count in millions, as the "M" suffix says.
print(sum(p.numel() for p in model.parameters()) / 1e6, "M")

for _ in range(10):
    # ---- training pass ----
    model.train()
    train_loss = 0.0
    train_acc = 0.0
    train_seen = 0
    for xb_train, yb_train in train_dl:
        xb_train = xb_train.to(device)
        yb_train = yb_train.to(device)

        optimizer.zero_grad(set_to_none=False)
        # forward pass
        logits, loss = model(xb_train, yb_train)
        # backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Weight by the real batch size and count what was actually seen, so
        # the averages stay right with dropped or partial batches.
        n_batch = yb_train.size(0)
        train_loss += loss.item() * n_batch
        train_acc += (torch.argmax(logits, dim = 1) == yb_train).float().sum().item()
        train_seen += n_batch

    train_loss /= max(train_seen, 1)
    train_acc /= max(train_seen, 1)

    # ---- validation pass ----
    model.eval()
    valid_loss = 0.0
    valid_acc = 0.0
    valid_seen = 0
    with torch.no_grad():
        for xb_valid, yb_valid in valid_dl:
            # Move the VALIDATION batch to the device and score against the
            # VALIDATION targets (the training batch was used here before).
            xb_valid = xb_valid.to(device)
            yb_valid = yb_valid.to(device)
            logits, loss = model(xb_valid, yb_valid)

            n_batch = yb_valid.size(0)
            valid_loss += loss.item() * n_batch
            valid_acc += (torch.argmax(logits, dim = 1) == yb_valid).float().sum().item()
            valid_seen += n_batch

    valid_loss /= max(valid_seen, 1)
    valid_acc /= max(valid_seen, 1)
    # `.4f` (four decimals) instead of `4f` (minimum width four).
    print(f'train_loss {train_loss:.4f}, valid loss {valid_loss:.4f} ')
    print(f'accuracy {train_acc:.4f} || , valid acc {valid_acc:.4f}')
