# LSTM

In [None]:
import numpy as np
import pandas as pd
import re
import string
import os
import nltk

nltk.download('stopwords')
from nltk.corpus import stopwords

import matplotlib.pyplot as plt
import seaborn as sns
color = sns.color_palette()
from sklearn.metrics import confusion_matrix

import warnings
warnings.filterwarnings('ignore')

In [None]:
from pathlib import Path
from sklearn.model_selection import train_test_split

# Load the dataset. Fail fast, pointing at the notebook that produces the CSV,
# when the file is not on disk yet.
DATA_PATH = Path("../data/raw/french_to_english_product.csv")
if not DATA_PATH.exists():
    raise FileNotFoundError(
        f"Missing dataset: {DATA_PATH}. Run `notebooks/knn_naive_bayes.ipynb` first (it downloads and saves the CSV), or place the file there manually."
    )
df = pd.read_csv(DATA_PATH)

# Build one two-column frame per text column: `review` and `translation`,
# each paired with the shared `rating` column.
whole_df_fr_review = df[['review', 'rating']]
whole_df_en_translation = df[['translation', 'rating']]

# Split each frame into 80% train and 20% test (test_size=0.20).
# Both calls use random_state=42 on frames derived from the same `df`.
train_df_en_translation, test_df_en_translation = train_test_split(whole_df_en_translation, test_size=0.20, random_state=42)

train_df_fr_review, test_df_fr_review = train_test_split(whole_df_fr_review, test_size=0.20, random_state=42)

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Dense , Input , LSTM , Embedding, Dropout , Activation, Flatten
from tensorflow.keras.layers import Bidirectional, GlobalMaxPool1D, SpatialDropout1D
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import initializers, regularizers, constraints, optimizers, layers

### French LSTM

#### Word cleaning

In [None]:
# Translation table, built once: every ASCII punctuation character becomes a
# space and every digit is dropped. Mapping punctuation to a space (instead of
# deleting it) keeps French elisions and hyphenated words apart, e.g.
# "l'appareil" -> "l appareil", so the elided article can be filtered as a
# stopword instead of fusing into "lappareil".
_FR_CLEAN_TABLE = str.maketrans(
    string.punctuation, " " * len(string.punctuation), string.digits
)

# Stopword set, built once instead of once per review.
_FR_STOPWORDS = frozenset(stopwords.words("french"))


def fr_data_cleaning(raw_data):
    """Normalize one French review for tokenization.

    Punctuation is replaced by spaces, digits are removed, the text is
    lower-cased and split on whitespace, and NLTK French stopwords are dropped.

    Args:
        raw_data: The raw review text. Non-string values (for example the NaN
            pandas uses for an empty CSV cell) are treated as an empty review.

    Returns:
        The remaining words joined by single spaces (possibly an empty string).
    """
    if not isinstance(raw_data, str):
        # Missing text: return an empty review rather than failing on .translate
        return ""
    tokens = raw_data.translate(_FR_CLEAN_TABLE).lower().split()
    return " ".join(tok for tok in tokens if tok not in _FR_STOPWORDS)


# Clean the review column of both splits in place
train_df_fr_review['review'] = train_df_fr_review['review'].apply(fr_data_cleaning)
test_df_fr_review['review'] = test_df_fr_review['review'].apply(fr_data_cleaning)

In [None]:

# --- Inputs: cleaned French review text for each split ---
fr_train = train_df_fr_review["review"]
fr_test = test_df_fr_review["review"]

# --- Targets: ratings collapsed into three classes ---
#   rating <= 2 -> 0,  rating == 3 -> 1,  anything else -> 2
y = train_df_fr_review["rating"].values
y = np.array(np.where(y <= 2, 0, np.where(y == 3, 1, 2)))

y_test = test_df_fr_review["rating"].values
y_test = np.where(y_test <= 2, 0, np.where(y_test == 3, 1, 2))

# --- Vocabulary: fitted on the training text only, then applied to both splits ---
max_features = 6000
tokenizer = Tokenizer(num_words=max_features)
tokenizer.fit_on_texts(list(fr_train))
list_fr_tokenized_train, list_fr_tokenized_test = (
    tokenizer.texts_to_sequences(texts) for texts in (fr_train, fr_test)
)

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Fixed length every tokenized review is padded or cut to (tune per dataset)
max_sequence_length = 300

# Pad and truncate at the end of each sequence so all rows share one length
X_fr_train, X_fr_test = (
    pad_sequences(
        sequences,
        maxlen=max_sequence_length,
        padding='post',
        truncating='post',
    )
    for sequences in (list_fr_tokenized_train, list_fr_tokenized_test)
)

print("Shape of X_train:", X_fr_train.shape)
print("Shape of X_test:", X_fr_test.shape)

In [None]:
class myCallback(tf.keras.callbacks.Callback):
    """Keras callback that stops training once training accuracy exceeds 0.95."""

    def on_epoch_end(self, epochs, logs=None):
        """Check the epoch's training accuracy and request a stop above 0.95.

        Args:
            epochs: Index of the epoch that just finished (unused).
            logs: Metric dict supplied by Keras; may be None or lack
                'accuracy', in which case nothing happens.
        """
        # `logs=None` avoids a shared mutable default; `or {}` covers None.
        accuracy = (logs or {}).get('accuracy')
        # Guard against a missing metric: None > 0.95 would raise TypeError.
        if accuracy is not None and accuracy > 0.95:
            print('\n Stopped Training!\n')
            self.model.stop_training = True

def train_model(model, model_name, n_epochs, batch_size, X_data,
                y_data, validation_split):
    """Fit `model` and save a checkpoint after every epoch.

    Args:
        model: Compiled Keras model to train.
        model_name: Prefix for checkpoint files, which are written as
            "<model_name>_cp-<epoch>.keras" in the working directory.
        n_epochs: Number of training epochs.
        batch_size: Number of samples per gradient update.
        X_data: Training inputs.
        y_data: Training targets.
        validation_split: Fraction of the data passed to `fit` as
            `validation_split`.

    Returns:
        The History object returned by `model.fit`.
    """
    checkpoint_path = model_name + "_cp-{epoch:04d}.keras"
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                     verbose=1)
    history = model.fit(
        X_data,
        y_data,
        # Must be `batch_size`, not `steps_per_epoch`: the value is a batch
        # size, and using it as a step count would cap every epoch at that
        # many batches of the default size.
        batch_size=batch_size,
        epochs=n_epochs,
        validation_split=validation_split,
        verbose=1,
        callbacks=[cp_callback],
    )
    return history

In [None]:
def generate_graph(history):
    """Plot training and validation accuracy per epoch, then show the figure.

    Args:
        history: Object whose `.history` dict holds 'accuracy' and
            'val_accuracy' series (as returned by `model.fit`).
    """
    curves = history.history
    # Training curve in blue, validation curve in red
    for metric_key, line_format in (('accuracy', 'b'), ('val_accuracy', 'r')):
        plt.plot(curves[metric_key], line_format)

    plt.title('Model Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

In [None]:
%pip install optuna

In [None]:
# Build and compile the French classifier for one hyperparameter setting
def Model_FR(embed_size, dense_units, dropout_rate, activation, optimizer):
    """Return a compiled Embedding -> LSTM -> max-pool -> Dense classifier.

    The vocabulary size (`max_features`) and the number of output classes
    (distinct values in `y`) are read from module-level variables at call time.

    Args:
        embed_size: Output dimension of the embedding layer.
        dense_units: Units in the hidden dense layer.
        dropout_rate: Dropout rate applied after the hidden dense layer.
        activation: Activation of the hidden dense layer.
        optimizer: Optimizer passed to `compile`.

    Returns:
        The compiled Sequential model.
    """
    layer_stack = (
        Embedding(max_features, embed_size),
        LSTM(50, return_sequences=True),
        GlobalMaxPool1D(),
        Dense(dense_units, activation=activation),
        Dropout(dropout_rate),
        Dense(len(set(y)), activation="softmax"),
    )

    model = Sequential()
    for layer in layer_stack:
        model.add(layer)

    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=optimizer,
        metrics=['accuracy'],
    )
    return model

# Manual hyperparameter optimization: exhaustive grid search on the French data
import itertools

param_grid = {
    'embed_size': [32, 64, 128],
    'dense_units': [16, 32, 64],
    'dropout_rate': [0.2, 0.5, 0.8],
    'activation': ['relu', 'tanh'],
    'optimizer': ['adam', 'sgd'],
    'batch_size': [16, 32],
    'epochs': [10, 20],
}

# Start below any reachable accuracy so the first trial is always recorded and
# `best_params` can never be left empty.
best_val_accuracy = float("-inf")
best_params = {}

# itertools.product varies its last argument fastest, which is the same visiting
# order as nesting one `for` loop per hyperparameter in this sequence.
fr_search_space = itertools.product(
    param_grid['embed_size'],
    param_grid['dense_units'],
    param_grid['dropout_rate'],
    param_grid['activation'],
    param_grid['optimizer'],
    param_grid['batch_size'],
    param_grid['epochs'],
)

for embed_size, dense_units, dropout_rate, activation, optimizer, batch_size, epochs in fr_search_space:
    # Release the global Keras state left by the previous trial; otherwise it
    # accumulates across the hundreds of models built here.
    tf.keras.backend.clear_session()

    model = Model_FR(embed_size, dense_units, dropout_rate, activation, optimizer)
    history = model.fit(
        X_fr_train, y,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=0.2,
        verbose=0,
    )

    # Score a trial by its best validation accuracy over all epochs
    val_accuracy = max(history.history['val_accuracy'])
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_params = {
            'embed_size': embed_size,
            'dense_units': dense_units,
            'dropout_rate': dropout_rate,
            'activation': activation,
            'optimizer': optimizer,
            'batch_size': batch_size,
            'epochs': epochs,
        }

print("Best French Parameters:", best_params)

# Rebuild the winning architecture from scratch, then fit it with the winning
# epoch count and batch size (20% of the data held out for validation).
model_fr = Model_FR(
    **{
        name: best_params[name]
        for name in ('embed_size', 'dense_units', 'dropout_rate', 'activation', 'optimizer')
    }
)
history_fr = train_model(
    model_fr,
    "model_fr",
    best_params['epochs'],
    best_params['batch_size'],
    X_fr_train,
    y,
    0.2,
)

In [None]:
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, confusion_matrix

def evaluate_model_fr(model):
    """Score `model` on the French test data and print a short report.

    Reads the module-level `X_fr_test` and `y_test` at call time.

    Args:
        model: Trained model whose `predict` returns per-class scores.

    Returns:
        Tuple (accuracy, macro precision, macro recall, macro F1).
    """
    class_scores = model.predict(X_fr_test)
    predicted = class_scores.argmax(axis=1)  # highest-scoring class per row

    accuracy = accuracy_score(y_test, predicted)
    # Macro averaging: every class contributes equally to the score
    precision, recall, f1 = (
        metric(y_test, predicted, average='macro')
        for metric in (precision_score, recall_score, f1_score)
    )
    cf_matrix = confusion_matrix(y_test, predicted)

    print(f"Accuracy: {accuracy:.3f}")
    print(f"Precision: {precision:.3f}")
    print(f"Recall: {recall:.3f}")
    print(f"F1-score: {f1:.3f}")
    print("Confusion Matrix:\n", cf_matrix)

    return accuracy, precision, recall, f1

In [None]:
# After training model_fr, evaluate it on the French test data:
accuracy, precision, recall, f1 = evaluate_model_fr(model_fr)

### English LSTM


#### Word cleaning

In [None]:
# Translation table, built once: deletes ASCII punctuation and digits.
_EN_CLEAN_TABLE = str.maketrans('', '', string.punctuation + string.digits)

# Stopword set, built once instead of once per review.
_EN_STOPWORDS = frozenset(stopwords.words("english"))


def en_data_cleaning(raw_data):
    """Normalize one English translation for tokenization.

    Punctuation and digits are removed, the text is lower-cased and split on
    whitespace, and NLTK English stopwords are dropped.

    Args:
        raw_data: The raw translated text. Non-string values (for example the
            NaN pandas uses for an empty CSV cell) are treated as empty text.

    Returns:
        The remaining words joined by single spaces (possibly an empty string).
    """
    if not isinstance(raw_data, str):
        # Missing text: return an empty string rather than failing on .translate
        return ""
    tokens = raw_data.translate(_EN_CLEAN_TABLE).lower().split()
    return " ".join(tok for tok in tokens if tok not in _EN_STOPWORDS)


# Clean the translation column of both splits in place
train_df_en_translation['translation'] = train_df_en_translation['translation'].apply(en_data_cleaning)
test_df_en_translation['translation'] = test_df_en_translation['translation'].apply(en_data_cleaning)

In [None]:
# --- Inputs: cleaned English translation text for each split ---
en_train = train_df_en_translation["translation"]
en_test = test_df_en_translation["translation"]

# --- Targets: ratings collapsed into three classes ---
#   rating <= 2 -> 0,  rating == 3 -> 1,  anything else -> 2
y = train_df_en_translation["rating"].values
y = np.array(np.where(y <= 2, 0, np.where(y == 3, 1, 2)))

y_test = test_df_en_translation["rating"].values
y_test = np.where(y_test <= 2, 0, np.where(y_test == 3, 1, 2))

# --- Vocabulary: fitted on the training text only, then applied to both splits ---
max_features = 6000
tokenizer = Tokenizer(num_words=max_features)
tokenizer.fit_on_texts(list(en_train))
list_en_tokenized_train, list_en_tokenized_test = (
    tokenizer.texts_to_sequences(texts) for texts in (en_train, en_test)
)

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Fixed length every tokenized translation is padded or cut to (tune per dataset)
max_sequence_length = 300

# Pad and truncate at the end of each sequence so all rows share one length
X_en_train, X_en_test = (
    pad_sequences(
        sequences,
        maxlen=max_sequence_length,
        padding='post',
        truncating='post',
    )
    for sequences in (list_en_tokenized_train, list_en_tokenized_test)
)

print("Shape of X_train:", X_en_train.shape)
print("Shape of X_test:", X_en_test.shape)

In [None]:
# Build and compile the English classifier for one hyperparameter setting
def Model_EN(embed_size, dense_units, dropout_rate, activation, optimizer):
    """Return a compiled Embedding -> LSTM -> max-pool -> Dense classifier.

    The vocabulary size (`max_features`) and the number of output classes
    (distinct values in `y`) are read from module-level variables at call time.

    Args:
        embed_size: Output dimension of the embedding layer.
        dense_units: Units in the hidden dense layer.
        dropout_rate: Dropout rate applied after the hidden dense layer.
        activation: Activation of the hidden dense layer.
        optimizer: Optimizer passed to `compile`.

    Returns:
        The compiled Sequential model.
    """
    layer_stack = (
        Embedding(max_features, embed_size),
        LSTM(50, return_sequences=True),
        GlobalMaxPool1D(),
        Dense(dense_units, activation=activation),
        Dropout(dropout_rate),
        Dense(len(set(y)), activation="softmax"),
    )

    model = Sequential()
    for layer in layer_stack:
        model.add(layer)

    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=optimizer,
        metrics=['accuracy'],
    )
    return model

# Manual hyperparameter optimization: exhaustive grid search on the English data
import itertools

param_grid_en = {
    'embed_size': [32, 64, 128],
    'dense_units': [16, 32, 64],
    'dropout_rate': [0.2, 0.5, 0.8],
    'activation': ['relu', 'tanh'],
    'optimizer': ['adam', 'sgd'],
    'batch_size': [16, 32],
    'epochs': [10, 20],
}

# Start below any reachable accuracy so the first trial is always recorded and
# `best_params_en` can never be left empty.
best_val_accuracy_en = float("-inf")
best_params_en = {}

# itertools.product varies its last argument fastest, which is the same visiting
# order as nesting one `for` loop per hyperparameter in this sequence.
en_search_space = itertools.product(
    param_grid_en['embed_size'],
    param_grid_en['dense_units'],
    param_grid_en['dropout_rate'],
    param_grid_en['activation'],
    param_grid_en['optimizer'],
    param_grid_en['batch_size'],
    param_grid_en['epochs'],
)

for embed_size, dense_units, dropout_rate, activation, optimizer, batch_size, epochs in en_search_space:
    # Release the global Keras state left by the previous trial; otherwise it
    # accumulates across the hundreds of models built here.
    tf.keras.backend.clear_session()

    # Use the English builder so the search evaluates the same model that is
    # trained with the winning parameters afterwards.
    model = Model_EN(embed_size, dense_units, dropout_rate, activation, optimizer)
    history = model.fit(
        X_en_train, y,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=0.2,
        verbose=0,
    )

    # Score a trial by its best validation accuracy over all epochs
    val_accuracy_en = max(history.history['val_accuracy'])
    if val_accuracy_en > best_val_accuracy_en:
        best_val_accuracy_en = val_accuracy_en
        best_params_en = {
            'embed_size': embed_size,
            'dense_units': dense_units,
            'dropout_rate': dropout_rate,
            'activation': activation,
            'optimizer': optimizer,
            'batch_size': batch_size,
            'epochs': epochs,
        }

print("Best English Parameters:", best_params_en)

# Rebuild the winning architecture from scratch, then fit it with the winning
# epoch count and batch size (20% of the data held out for validation).
model_en = Model_EN(
    **{
        name: best_params_en[name]
        for name in ('embed_size', 'dense_units', 'dropout_rate', 'activation', 'optimizer')
    }
)
history_en = train_model(
    model_en,
    "model_en",
    best_params_en['epochs'],
    best_params_en['batch_size'],
    X_en_train,
    y,
    0.2,
)

In [None]:
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, confusion_matrix

def evaluate_model_en(model):
    """Score `model` on the English test data and print a short report.

    Reads the module-level `X_en_test` and `y_test` at call time.

    Args:
        model: Trained model whose `predict` returns per-class scores.

    Returns:
        Tuple (accuracy, macro precision, macro recall, macro F1).
    """
    class_scores = model.predict(X_en_test)
    predicted = class_scores.argmax(axis=1)  # highest-scoring class per row

    accuracy = accuracy_score(y_test, predicted)
    # Macro averaging: every class contributes equally to the score
    precision, recall, f1 = (
        metric(y_test, predicted, average='macro')
        for metric in (precision_score, recall_score, f1_score)
    )
    cf_matrix = confusion_matrix(y_test, predicted)

    print(f"Accuracy: {accuracy:.3f}")
    print(f"Precision: {precision:.3f}")
    print(f"Recall: {recall:.3f}")
    print(f"F1-score: {f1:.3f}")
    print("Confusion Matrix:\n", cf_matrix)

    return accuracy, precision, recall, f1


In [None]:
# After training model_en, evaluate it on the English test data:
accuracy, precision, recall, f1 = evaluate_model_en(model_en)