# **Predict the score and class of hotel reviews**

This notebook develops a model that predicts both the score and the class of hotel reviews by combining a bidirectional RNN with dense layers.

In [None]:
!pip install tensorflow
!pip install --upgrade tensorflow keras

In [None]:
# Import everything from tensorflow.keras to avoid mixing the two namespaces
from tensorflow.keras.layers import Input, Embedding, Bidirectional, LSTM, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam


from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.callbacks import EarlyStopping

# Import the data


In [None]:
import pandas as pd

# Download CSV using GitHub
url = "https://raw.githubusercontent.com/jcerri23/DLexam/refs/heads/main/input_data.csv"
df = pd.read_csv(url)

df.head()


The `Review_Score` and `Review_Type` columns will be our target columns.

In [None]:
df.columns

## Preprocessing

The two main preprocessing steps are applied to the `Review_Type` column and to the `Review` field.

In [None]:
df['Review_Type'] = df['Review_Type'].replace({'Good_review': 1, 'Bad_review': 0})
df['Review_Type']


The positive and negative classes contain roughly the same number of reviews, so with respect to this target the dataset is balanced.

In [None]:
# checking balance
y2 = df['Review_Type']

print(f"Number of 0:{(y2==0).sum()}")
print(f"Number of 1:{(y2==1).sum()}")


A minimal cleaning will be performed on the review text:

- Normalize all words to lowercase to reduce the vocabulary size
- Remove all punctuation, numbers and special characters

In [None]:
import re
def cleaning(text):
    text = text.lower() # LOWERCASE
    text = re.sub(r'[^a-z\s]', '', text)  # REMOVE punctuation, numbers, special characters
    return text

df['Review'] = df['Review'].astype(str).apply(cleaning)

# rename the review column
df.rename(columns={'Review': 'Clean_Review'}, inplace=True)
df['Clean_Review'].head()

# Splitting


In [None]:
from sklearn.model_selection import train_test_split

X = df['Clean_Review']
y = df[['Review_Score', 'Review_Type']]

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42
)

# Tokenization and Padding

To prepare the data for the first layer of the model, the `Embedding` layer, we need to identify the number of unique words in the reviews and build a mapping from each unique word to an integer index.

To avoid data leakage, the `Tokenizer` learns the word-to-index mapping on the training set only.

Finally, all sequences are padded to the same length with the `pad_sequences` function.
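
To make these steps concrete, here is a simplified pure-Python sketch of the idea behind the `Tokenizer` and `pad_sequences`. The helper names (`build_vocab`, `to_sequence`, `pad`) and the toy reviews are illustrative assumptions, not Keras functions. Like the Keras `Tokenizer` with an `oov_token`, the sketch keeps index 0 for padding and index 1 for out-of-vocabulary words.

```python
from collections import Counter

def build_vocab(train_texts, oov_token="<OOV>"):
    """Map each training word to an index; 0 is kept for padding, 1 for OOV."""
    counts = Counter(word for text in train_texts for word in text.split())
    word_index = {oov_token: 1}
    # More frequent words receive smaller indices, starting at 2
    for rank, (word, _) in enumerate(counts.most_common(), start=2):
        word_index[word] = rank
    return word_index

def to_sequence(text, word_index, oov_token="<OOV>"):
    """Replace each word by its index; unseen words map to the OOV index."""
    return [word_index.get(word, word_index[oov_token]) for word in text.split()]

def pad(seq, maxlen):
    """Truncate or right-pad a sequence with zeros ('post' padding and truncation)."""
    seq = seq[:maxlen]
    return seq + [0] * (maxlen - len(seq))

# Hypothetical toy data: the vocabulary is learned on the training reviews only
train_reviews = ["great hotel great staff", "bad room"]
word_index = build_vocab(train_reviews)

# "view" never appears in the training reviews, so it becomes the OOV index
padded = pad(to_sequence("great view", word_index), maxlen=4)
print(padded)
```

Because the mapping is built from the training reviews only, words that appear exclusively in the test set collapse to the OOV index instead of leaking information into the vocabulary.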

In [None]:
all_words = [word for review in df['Clean_Review'] for word in review.split()]
print(f"Total words: {len(all_words)}")

VOCAB_SIZE = len(set(all_words))
print(f"Unique words: {VOCAB_SIZE}")

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Split each review into tokens
tokenizer = Tokenizer(num_words=VOCAB_SIZE, oov_token="<OOV>")
tokenizer.fit_on_texts(X_train)  # only on train data

X_train_seq = tokenizer.texts_to_sequences(X_train)
X_test_seq = tokenizer.texts_to_sequences(X_test)


Choosing an appropriate `MAX_LEN` value is important for reducing model complexity while maintaining performance.



In [None]:
# Just for the analysis of the MAX_LENGTH
review_lengths = df['Clean_Review'].apply(lambda x: len(x.split()))
print(review_lengths.describe())

In [None]:
import matplotlib.pyplot as plt

df['Clean_Review'].apply(lambda x: len(x.split())).hist(bins=50)
plt.xlabel('Review Length (words)')
plt.ylabel('Number of Reviews')
plt.title('Distribution of Review Lengths')
plt.show()


Although the maximum number of words per review is stated as 400, the analysis shows that 75% of the reviews are under 30 words.

Given this distribution, setting `MAX_LEN` to 256 is a safe choice; even 128 could be a valid and more efficient option.
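
One way to compare candidate lengths is to measure how many reviews would fit without truncation. The sketch below is a minimal illustration; the `coverage` helper and the word counts in `toy_lengths` are hypothetical and not taken from the dataset.

```python
def coverage(lengths, max_len):
    """Fraction of reviews whose word count fits within max_len (no truncation)."""
    return sum(1 for n in lengths if n <= max_len) / len(lengths)

# Hypothetical word counts, NOT taken from the dataset
toy_lengths = [5, 12, 18, 25, 29, 40, 75, 130, 260, 400]

for candidate in (64, 128, 256):
    print(candidate, coverage(toy_lengths, candidate))
```

On the real data, `review_lengths.tolist()` from the cell above could be passed in place of `toy_lengths`.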

In [None]:
MAX_LEN = 256

X_train_pad = pad_sequences(X_train_seq, maxlen=MAX_LEN, padding='post', truncating='post')
X_test_pad = pad_sequences(X_test_seq, maxlen=MAX_LEN, padding='post', truncating='post')


In [None]:
X_train_pad[367]

# Input

The dataset has been transformed into an input of shape `(N_SAMPLES, MAX_LEN)`.

In [None]:
print("X_train_pad shape:", X_train_pad.shape)
print("X_test_pad shape:", X_test_pad.shape)

# Initial Model


In [None]:
EMBEDDING_DIM = 128
n_units = 64
n_hidden = 64
input_seq = Input(shape=(MAX_LEN,), name='input')

# Embedding layer
x = Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM)(input_seq)  # input_length is deprecated in recent Keras

# Bidirectional LSTM layer
x = Bidirectional(LSTM(n_units, return_sequences=False))(x)

# dense + dropout layers
x = Dense(n_hidden, activation='relu')(x)
x = Dropout(0.5)(x)


#  Regression head
output_score = Dense(1, activation='linear', name='score_output')(x)

# Classification head
output_class = Dense(1, activation='sigmoid', name='class_output')(x)


model = Model(inputs=input_seq,
              outputs=[output_score, output_class]
              )

optimizer = Adam(learning_rate=0.001, clipvalue=1.0)


# Composite loss
model.compile(
    optimizer=optimizer,
    loss={
        'score_output': 'mse',          # regression loss
        'class_output': 'binary_crossentropy'  # classification loss
    },
    metrics={
        'score_output': ['mse'],
        'class_output': ['accuracy']
    }
)

model.summary()


In [None]:
from tensorflow.keras.utils import plot_model
plot_model(model)

A quick sanity check that the model trains:

In [None]:
y_train_score = y_train['Review_Score'].values.reshape(-1, 1)
y_train_class = y_train['Review_Type'].values.reshape(-1, 1)


batch_size = 128
epochs = 5


model.fit(
    X_train_pad,
    y={'score_output':y_train_score, 'class_output': y_train_class},
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.2
)

Even without any tuning, the model already performs well, reaching about 90% validation accuracy on the review class and a validation MSE of about 1.9 on the score:


 - val_class_output_accuracy: 0.8975
 - val_class_output_loss: 0.3148
 - val_loss: 2.2076
 - val_score_output_loss: 1.9724
 - val_score_output_mse: 1.9011





# Hyperparameters tuning


The key parameters worth tuning for the Bidirectional LSTM are the `number of units` and the `number of epochs`.

Additionally, when encoding the text, finding the optimal `embedding size` can be very beneficial.

Finally, tuning regularization parameters such as the `dropout rate` and the optimizer’s `clip value` may further enhance model performance.

* Number of units
* Number of epochs
* Embedding size
* Dropout rate
* Clip value
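
To get a feel for how the number of units and the embedding size drive model size, the trainable parameters can be counted by hand. The sketch below uses the standard LSTM parameter formula (four gates, each with input weights, recurrent weights and a bias); the function names are illustrative, and the vocabulary size of 10,000 is a hypothetical value, not the one measured in this notebook.

```python
def embedding_params(vocab_size, embedding_dim):
    """One embedding vector of size embedding_dim per vocabulary index."""
    return vocab_size * embedding_dim

def bilstm_params(input_dim, n_units):
    """Parameters of a bidirectional LSTM with biases (two independent LSTMs)."""
    # 4 gates, each with input weights, recurrent weights and a bias vector
    one_direction = 4 * (n_units * (input_dim + n_units) + n_units)
    return 2 * one_direction

# Hypothetical vocabulary size, for illustration only
print("embedding:", embedding_params(vocab_size=10_000, embedding_dim=128))

for units in (8, 16, 32, 64):
    print(units, bilstm_params(input_dim=128, n_units=units))
```

The counts should match what `model.summary()` reports for the corresponding layers, which makes this a convenient cross-check.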


We can introduce an `EarlyStopping` callback to avoid running unnecessary epochs once the validation loss stops decreasing.
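
The patience logic behind early stopping can be sketched in a few lines of plain Python. This is a simplified illustration, not the Keras implementation: the function name `early_stopping_epoch` and the loss values in `history` are made up for the example.

```python
def early_stopping_epoch(val_losses, patience=3):
    """Return (stop_epoch, best_epoch), 0-indexed, for per-epoch validation losses."""
    best_loss = float("inf")
    best_epoch = 0
    wait = 0  # epochs elapsed since the last improvement
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    # Training ran to completion without exhausting the patience
    return len(val_losses) - 1, best_epoch

# Hypothetical validation losses
history = [1.30, 1.10, 1.05, 1.07, 1.06, 1.08, 1.02]
stop_epoch, best_epoch = early_stopping_epoch(history, patience=3)
print(stop_epoch, best_epoch)
```

With `restore_best_weights=True`, the weights from `best_epoch` are the ones kept after training stops.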

A builder function can then be defined to create the model for each set of hyperparameter values under test.

In [None]:
import numpy as np
import random
import tensorflow as tf

# Set seeds for reproducibility
np.random.seed(42)
random.seed(42)
tf.random.set_seed(42)

# EarlyStopping callback
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
    verbose=0
)


The initial model achieved a total loss of 2.017, which serves as our baseline for comparison (see the Initial Model section).

Inspecting the loss components, it became evident that the regression loss is significantly higher than the classification loss.

* `score_output_loss ≈ 1.9`
* `class_output_loss ≈ 0.3`

This discrepancy causes the total loss to be dominated by the regression task, since Keras by default computes the total loss as a simple sum of the individual task losses.


To address this imbalance and let both tasks contribute more equally to training, we manually assign a custom weight to each loss.
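
The effect of the weights can be checked with simple arithmetic. The sketch below plugs in the approximate loss values listed above, together with the weights 0.5 and 1.0 used in the builder function; the `total_loss` helper is an illustrative name, not a Keras function.

```python
def total_loss(losses, weights=None):
    """Weighted sum of per-task losses; with no weights every task counts as 1.0."""
    weights = weights or {name: 1.0 for name in losses}
    return sum(weights[name] * value for name, value in losses.items())

losses = {"score_output": 1.9, "class_output": 0.3}

unweighted = total_loss(losses)
weighted = total_loss(losses, {"score_output": 0.5, "class_output": 1.0})

# Share of the total loss contributed by the regression task
share_before = losses["score_output"] / unweighted
share_after = 0.5 * losses["score_output"] / weighted

print(round(unweighted, 2), round(weighted, 2))
print(round(share_before, 2), round(share_after, 2))
```

Even with a weight of 0.5, the regression term still accounts for most of the total loss, so the weighting reduces the imbalance rather than removing it.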

In [None]:
# Builder function with weighted loss

def create_model(VOCAB_SIZE, MAX_LEN, EMBEDDING_DIM, n_units, dropout_rate, clipvalue):
    input_ = Input(shape=(MAX_LEN,))
    x = Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM)(input_)
    x = Bidirectional(LSTM(n_units, return_sequences=False))(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(dropout_rate)(x)

    score_output = Dense(1, name='score_output')(x)  # Regression
    class_output = Dense(1, activation='sigmoid', name='class_output')(x)  # Classification

    model = Model(inputs=input_, outputs=[score_output, class_output])

    optimizer = Adam(learning_rate=0.001, clipvalue=clipvalue)

    model.compile(
        # Pass the optimizer explicitly: without it compile() falls back to its
        # default optimizer and the clipvalue setting is never applied
        optimizer=optimizer,
        loss={
            'score_output': 'mse',
            'class_output': 'binary_crossentropy'
        },
        loss_weights={
            'score_output': 0.5,   # downweight the regression task
            'class_output': 1.0    # emphasize classification
        },
        metrics={
            'score_output': ['mse'],
            'class_output': ['accuracy']
        }
    )
    return model


Define the candidate values for each hyperparameter:

In [None]:
param_distributions = {
    'n_units': [8,16,32,64],
    'EMBEDDING_DIM': [32, 64, 128],
    'dropout_rate': [0.2, 0.3, 0.5],
    'clipvalue': [0.5, 1.0, 2.0],
    'epochs': [4,5,6]
}
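
It helps to know how large this search space is compared with the number of trials. The sketch below counts the combinations for the candidate values above and draws one random configuration in the same way the search loops do; `param_grid`, `n_combinations` and `sample_params` are illustrative names.

```python
import math
import random

# Same candidate values as param_distributions above
param_grid = {
    'n_units': [8, 16, 32, 64],
    'EMBEDDING_DIM': [32, 64, 128],
    'dropout_rate': [0.2, 0.3, 0.5],
    'clipvalue': [0.5, 1.0, 2.0],
    'epochs': [4, 5, 6],
}

# Size of the full grid: product of the number of options per hyperparameter
n_combinations = math.prod(len(values) for values in param_grid.values())

def sample_params(grid, rng):
    """Draw one value per hyperparameter, independently and uniformly."""
    return {name: rng.choice(values) for name, values in grid.items()}

rng = random.Random(42)
print(n_combinations)
print(sample_params(param_grid, rng))
```

With 7 trials, a random search explores only about 2% of the 324 possible configurations, so the "best" configuration is best among those sampled, not necessarily the best overall.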

**INITIAL PROPOSAL**

My initial approach was to perform Stratified K-Fold Cross-Validation based on the classification labels.

This is a robust and reliable method for model evaluation, but in practice, it proved to be quite time-consuming, taking approximately 15 minutes per tuning session, likely due to memory and computational limitations.
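
The idea behind stratification can be sketched without any library: samples are dealt to folds class by class, so every fold keeps the overall class ratio. This is a simplified illustration with made-up labels; unlike scikit-learn's `StratifiedKFold` with `shuffle=True`, it does not shuffle, and `stratified_folds` is a hypothetical helper name.

```python
from collections import defaultdict

def stratified_folds(labels, n_splits):
    """Assign each sample index to a fold so that every fold keeps the class ratio."""
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    folds = [[] for _ in range(n_splits)]
    for indices in by_class.values():
        # Deal the samples of each class round-robin across the folds
        for position, idx in enumerate(indices):
            folds[position % n_splits].append(idx)
    return folds

# Hypothetical balanced labels: six bad reviews (0) and six good reviews (1)
labels = [0] * 6 + [1] * 6
folds = stratified_folds(labels, n_splits=3)

# Share of positive reviews in each fold
fold_ratios = [sum(labels[i] for i in fold) / len(fold) for fold in folds]
print(fold_ratios)
```

Each fold serves once as the validation set while the remaining folds are used for training.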

This is the result obtained:



```
Best hyperparameters found:
{'n_units': 64, 'EMBEDDING_DIM': 64, 'dropout_rate': 0.2, 'clipvalue': 0.5, 'epochs': 5}
Best average validation loss: 1.0737
```



In [None]:
# SKIP THIS PART AND PERFORM THE FASTER TUNING BELOW

In [None]:
n_trials = 7


skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

best_avg_val_loss = float('inf')
best_params = None

In [None]:
for trial in range(n_trials):
    print(f"\n Trial {trial+1}/{n_trials}")

    # Sample hyperparameters
    params = {k: random.choice(v) for k, v in param_distributions.items()}
    print("Testing parameters:", params)

    fold_val_losses = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(X_train_pad, y_train_class)):
        print(f"  Fold {fold+1}")

        X_tr, X_val = X_train_pad[train_idx], X_train_pad[val_idx]
        y_score_tr, y_score_val = y_train_score[train_idx], y_train_score[val_idx]
        y_class_tr, y_class_val = y_train_class[train_idx], y_train_class[val_idx]

        model = create_model(
            VOCAB_SIZE=VOCAB_SIZE,
            MAX_LEN=MAX_LEN,
            EMBEDDING_DIM=params['EMBEDDING_DIM'],
            n_units=params['n_units'],
            dropout_rate=params['dropout_rate'],
            clipvalue=params['clipvalue']
        )

        model.fit(
            X_tr,
            {'score_output': y_score_tr, 'class_output': y_class_tr},
            epochs=params['epochs'],
            batch_size=32,
            verbose=0,
            validation_data=(X_val, {'score_output': y_score_val, 'class_output': y_class_val}),
            callbacks=[early_stop]
        )

        # evaluate the trial model on the validation
        val_loss = model.evaluate(
            X_val,
            {'score_output': y_score_val, 'class_output': y_class_val},
            verbose=0
        )[0]  # total loss

        print(f"    Validation loss: {val_loss:.4f}")
        fold_val_losses.append(val_loss)

    avg_val_loss = np.mean(fold_val_losses)
    print(f"  ➤ Avg validation loss: {avg_val_loss:.4f}")

    if avg_val_loss < best_avg_val_loss:
        best_avg_val_loss = avg_val_loss
        best_params = params

# 15 minutes


 Trial 1/7
Testing parameters: {'n_units': 8, 'EMBEDDING_DIM': 32, 'dropout_rate': 0.5, 'clipvalue': 1.0, 'epochs': 4}
  Fold 1
    Validation loss: 1.1498
  Fold 2
    Validation loss: 1.1323
  Fold 3
    Validation loss: 1.1127
  ➤ Avg validation loss: 1.1316

 Trial 2/7
Testing parameters: {'n_units': 32, 'EMBEDDING_DIM': 32, 'dropout_rate': 0.2, 'clipvalue': 1.0, 'epochs': 4}
  Fold 1
    Validation loss: 1.0190
  Fold 2
    Validation loss: 1.1119
  Fold 3
    Validation loss: 1.0540
  ➤ Avg validation loss: 1.0616

 Trial 3/7
Testing parameters: {'n_units': 16, 'EMBEDDING_DIM': 64, 'dropout_rate': 0.3, 'clipvalue': 0.5, 'epochs': 6}
  Fold 1
    Validation loss: 0.9942
  Fold 2
    Validation loss: 1.0950
  Fold 3
    Validation loss: 1.0058
  ➤ Avg validation loss: 1.0317

 Trial 4/7
Testing parameters: {'n_units': 64, 'EMBEDDING_DIM': 64, 'dropout_rate': 0.5, 'clipvalue': 1.0, 'epochs': 4}
  Fold 1
    Validation loss: 1.1077
  Fold 2
    Validation loss: 1.1453
  Fold 3
    Validation loss: 1.0848
  ➤ Avg validation loss: 1.1126

 Trial 5/7
Testing parameters: {'n_units': 8, 'EMBEDDING_DIM': 128, 'dropout_rate': 0.5, 'clipvalue': 1.0, 'epochs': 6}
  Fold 1
    Validation loss: 1.0299
  Fold 2
    Validation loss: 1.0923
  Fold 3
    Validation loss: 1.0888
  ➤ Avg validation loss: 1.0703

 Trial 6/7
Testing parameters: {'n_units': 32, 'EMBEDDING_DIM': 32, 'dropout_rate': 0.2, 'clipvalue': 0.5, 'epochs': 6}
  Fold 1

In [None]:
# Final result
print("\nBest hyperparameters found:")
print(best_params)
print(f"Best average validation loss: {best_avg_val_loss:.4f}")


**CHANGE --> Holdout setting**

To speed up the process, I switched to performing hyperparameter tuning using a holdout validation split.

While slightly less robust than K-Fold, this method significantly reduces computation time and still provides meaningful performance estimates during model selection.
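
The saving can be estimated by counting model fits. The sketch below uses the settings from this notebook (7 trials, 3 folds) and the approximate run times noted in the code comments (15 and 4 minutes); the variable names are illustrative. It is only a rough estimate, since each fit also trains on a different amount of data in the two settings.

```python
n_trials = 7
n_splits = 3

kfold_fits = n_trials * n_splits   # one model per trial and per fold
holdout_fits = n_trials            # one model per trial

expected_speedup = kfold_fits / holdout_fits

# Approximate run times reported in the notebook comments, in minutes
observed_speedup = 15 / 4

print(kfold_fits, holdout_fits, expected_speedup, observed_speedup)
```

The observed speed-up is of the same order as the threefold reduction in the number of fits.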

In [None]:
# Holdout validation split (stratified)
X_tr, X_val, y_score_tr, y_score_val, y_class_tr, y_class_val = train_test_split(
    X_train_pad, y_train_score, y_train_class,
    test_size=0.2,
    stratify=y_train_class,
    random_state=42
)

# Defined here as well so that this cell runs even when the K-Fold cells are skipped
n_trials = 7

best_val_loss = float('inf')
best_params = None

# Run random search
for trial in range(n_trials):
    print(f"\nTrial {trial+1}/{n_trials}")

    # Sample random parameters
    params = {k: random.choice(v) for k, v in param_distributions.items()}
    print("Testing parameters:", params)


    model = create_model(
        VOCAB_SIZE=VOCAB_SIZE,
        MAX_LEN=MAX_LEN,
        EMBEDDING_DIM=params['EMBEDDING_DIM'],
        n_units=params['n_units'],
        dropout_rate=params['dropout_rate'],
        clipvalue=params['clipvalue']
    )

    # Train with early stopping
    model.fit(
        X_tr,
        {'score_output': y_score_tr, 'class_output': y_class_tr},
        epochs=params['epochs'],
        batch_size=32,
        verbose=0,
        validation_data=(X_val, {'score_output': y_score_val, 'class_output': y_class_val}),
        callbacks=[early_stop]
    )

    # Evaluate total loss
    val_loss = model.evaluate(
        X_val,
        {'score_output': y_score_val, 'class_output': y_class_val},
        verbose=0
    )[0]


    print(f"  ➤ Validation loss: {val_loss:.4f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_params = params


print("\nBest hyperparameters:")
print(best_params)
print(f"Best validation loss: {best_val_loss:.4f}")

# 4 minutes

The best configuration found is:



```
Best hyperparameters:
{'n_units': 8, 'EMBEDDING_DIM': 128, 'dropout_rate': 0.3, 'clipvalue': 2.0, 'epochs': 4}
Best validation loss: 1.0067
```



# Re-train the model with the best configuration


Once the best configuration has been found, a standard approach is to re-train the final model on the full training set.

In [None]:
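best_config = {
    'n_units': 8,
    'EMBEDDING_DIM': 128,
    'dropout_rate': 0.3,
    'clipvalue': 2.0,
    'epochs': 6  # note: the random search above reported 4 epochs for this configuration
}

# Build the model from best_config instead of repeating the values by hand
final_model2 = create_model(
    VOCAB_SIZE=VOCAB_SIZE,
    MAX_LEN=MAX_LEN,
    EMBEDDING_DIM=best_config['EMBEDDING_DIM'],
    n_units=best_config['n_units'],
    dropout_rate=best_config['dropout_rate'],
    clipvalue=best_config['clipvalue']
)

final_model2.fit(
    X_train_pad,
    {'score_output': y_train_score, 'class_output': y_train_class},
    epochs=best_config['epochs'],
    batch_size=32,
    verbose=1
)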

# Experiment : Autoencoder to learn embedding

BUILD THE AUTOENCODER

In [None]:
def build_autoencoder(VOCAB_SIZE, EMBEDDING_DIM, MAX_LEN):
    input_seq = Input(shape=(MAX_LEN,), name='input_seq')

    # Encoder
    embed = Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM, name='embedding')(input_seq)
    encoded = LSTM(64)(embed)

    # Decoder
    decoded = Dense(VOCAB_SIZE, activation='softmax', name='decoder')(encoded)

    autoencoder = Model(inputs=input_seq, outputs=decoded)

    autoencoder.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
    return autoencoder


CUSTOM TASK : PREDICT THE CENTRAL TOKEN

Instead of reconstructing the entire input sequence, the autoencoder is trained to predict the middle token of the padded input sequence (`X_train_pad[:, MAX_LEN // 2]`).

This custom task encourages the model to learn meaningful semantic representations in the embedding layer, which are then used as input features for the BiLSTM model
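
Since the sequences are post-padded and most reviews are much shorter than `MAX_LEN`, it may be worth checking how often the target at position `MAX_LEN // 2` is simply the padding index 0. The sketch below illustrates the check on hypothetical word counts; `middle_token_is_padding` and `toy_lengths` are illustrative and not taken from the dataset.

```python
def middle_token_is_padding(lengths, max_len):
    """Fraction of post-padded sequences whose token at max_len // 2 is padding."""
    middle = max_len // 2
    # A review with n words fills positions 0..n-1, so position `middle`
    # holds padding whenever n <= middle
    return sum(1 for n in lengths if n <= middle) / len(lengths)

# Hypothetical word counts, NOT taken from the dataset
toy_lengths = [5, 12, 18, 25, 29, 40, 75, 130, 260, 400]

share = middle_token_is_padding(toy_lengths, max_len=256)
print(share)
```

On the real data the same quantity could be computed with `(X_train_pad[:, MAX_LEN // 2] == 0).mean()`. A value close to 1 would mean the model mostly learns to predict padding, which limits how much the embedding can learn from this task.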

In [None]:
y_auto = np.expand_dims(X_train_pad[:, MAX_LEN // 2], axis=1)

autoencoder = build_autoencoder(VOCAB_SIZE=VOCAB_SIZE, EMBEDDING_DIM=128, MAX_LEN=MAX_LEN)

autoencoder.fit(
    X_train_pad,
    y_auto,
    batch_size=32,
    epochs=5,
    verbose=1
)


FEED THE MODEL WITH THE LEARNED EMBEDDINGS

In [None]:
# Get the trained embedding layer weights
embedding_weights = autoencoder.get_layer('embedding').get_weights()

def create_model_with_ae_embedding(VOCAB_SIZE, MAX_LEN, EMBEDDING_DIM, n_units, dropout_rate, clipvalue, embedding_weights):
    input_ = Input(shape=(MAX_LEN,))

    # AUTOENCODER EMBEDDING
    x = Embedding(
        input_dim=VOCAB_SIZE,
        output_dim=EMBEDDING_DIM,
        weights=embedding_weights,
        trainable=True, # update embedding during training
        name="pretrained_embedding"
    )(input_)

    x = Bidirectional(LSTM(n_units, return_sequences=False))(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(dropout_rate)(x)

    score_output = Dense(1, name='score_output')(x)
    class_output = Dense(1, activation='sigmoid', name='class_output')(x)

    model = Model(inputs=input_, outputs=[score_output, class_output])

    optimizer = Adam(learning_rate=0.001, clipvalue=clipvalue)

    model.compile(
        optimizer=optimizer,
        loss={
            'score_output': 'mse',
            'class_output': 'binary_crossentropy'
        },
        loss_weights={
            'score_output': 0.5,
            'class_output': 1.0
        },
        metrics={
            'score_output': ['mse'],
            'class_output': ['accuracy']
        }
    )
    return model


COMPARE PERFORMANCES

In [None]:
final_model1 = create_model_with_ae_embedding(
    VOCAB_SIZE=VOCAB_SIZE,
    MAX_LEN=MAX_LEN,
    EMBEDDING_DIM=128,
    n_units=32,
    dropout_rate=0.3,
    clipvalue=2.0,
    embedding_weights=embedding_weights
)

final_model1.fit(
    X_train_pad,
    {'score_output': y_train_score, 'class_output': y_train_class},
    epochs=4,
    batch_size=32,
    verbose=1
)


Initializing the embedding layer from the autoencoder gives reasonable results, with good classification accuracy and a moderate score loss.

However, the overall loss is slightly higher than that of the standard embedding layer trained from scratch.

- Best loss with the standard `Embedding` layer --> `1.1304`
- Best loss with AE embedding --> `1.2057`

In this case, the pretrained embeddings do not improve performance, so the simpler standard approach is kept.



# Final Evaluation

Finally, the best model should be evaluated on unseen data to assess its generalization ability and to detect any signs of overfitting.

Two different metrics can be used to quantify performance on the respective tasks:

- **Type classification – Accuracy**

  This metric indicates how often the model correctly predicts the class label.
  The F1 score is also an effective metric, but since the "bad" and "good" classes are balanced here, accuracy alone gives a reliable assessment of the model's performance.

- **Score prediction – Mean Squared Error (MSE)**

  MSE measures the average squared difference between the predicted scores and the actual scores, providing an indication of how far off the predictions are from the true values.
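
Both metrics are easy to compute by hand, which can help when interpreting the numbers. The sketch below uses made-up labels and predictions; the function names `accuracy` and `mse` are illustrative, and the 0.5 threshold is the one applied in the evaluation cell.

```python
def accuracy(y_true, y_prob, threshold=0.5):
    """Share of samples whose thresholded probability matches the true label."""
    preds = [1 if p > threshold else 0 for p in y_prob]
    return sum(int(p == t) for p, t in zip(preds, y_true)) / len(y_true)

def mse(y_true, y_pred):
    """Mean of the squared differences between true and predicted scores."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

# Hypothetical targets and model outputs
toy_types = [1, 0, 1, 1]
toy_probs = [0.9, 0.2, 0.4, 0.7]
toy_scores = [8.0, 3.0, 9.0, 7.0]
toy_preds = [7.0, 4.0, 9.0, 5.0]

print(accuracy(toy_types, toy_probs))
print(mse(toy_scores, toy_preds))
```

In the toy example a single error of 2 points contributes 4 of the 6 total squared-error units, which shows how strongly MSE reacts to larger deviations.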

In [None]:
# Evaluate model performance on the test set

from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score, classification_report, confusion_matrix

y_pred = final_model2.predict(X_test_pad)

y_pred_score = y_pred[0]
y_pred_class = y_pred[1]

#  Score Prediction (Regression)
mse_score = mean_squared_error(y_test['Review_Score'], y_pred_score)
mae_score = mean_absolute_error(y_test['Review_Score'], y_pred_score)

print("--- Score Prediction Performance ---")
print(f"Mean Squared Error (MSE): {mse_score:.4f}")
print(f"Mean Absolute Error (MAE): {mae_score:.4f}")

#  Class Prediction (Classification)
y_pred_class_labels = (y_pred_class > 0.5).astype(int)

accuracy_class = accuracy_score(y_test['Review_Type'], y_pred_class_labels)
report_class = classification_report(y_test['Review_Type'], y_pred_class_labels)


print("\n--- Class Prediction Performance ---")
print(f"Accuracy: {accuracy_class:.4f}")
print("\nClassification Report:")
print(report_class)

# Optional : Confusion Matrix
conf_matrix_class = confusion_matrix(y_test['Review_Type'], y_pred_class_labels)
print("\nConfusion Matrix:")
conf_matrix_class

# Bonus : Comparing MSE and MAE

Computing the MAE alongside the MSE provides valuable insight into the nature of the model’s prediction errors.

- `MSE: 1.75`
- `MAE: 0.9885`

With an MAE below 1, the predicted score is off by less than one point on average.

However, the MSE is expressed in squared units, so it should not be compared with the MAE directly. The like-for-like comparison is RMSE against MAE: RMSE = √1.75 ≈ 1.32, about 34% higher than the MAE. This gap may suggest that, while most predictions are close to the true values, a few large errors disproportionately inflate the squared-error metrics because of their sensitivity to larger deviations.
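
Because the MSE is in squared units, the root mean squared error (RMSE) is the quantity to set against the MAE. The sketch below shows, on made-up error values, how two error distributions with the same MAE can have very different RMSE, and then applies the same comparison to the values reported above; the function and variable names are illustrative.

```python
import math

def mae(errors):
    """Mean absolute error."""
    return sum(abs(e) for e in errors) / len(errors)

def rmse(errors):
    """Root mean squared error, in the same units as the errors."""
    return math.sqrt(sum(e * e for e in errors) / len(errors))

# Hypothetical prediction errors with the same MAE
uniform_errors = [1.0, -1.0, 1.0, -1.0]   # every error has the same size
skewed_errors = [0.0, 0.0, 0.0, 4.0]      # one large error, the rest exact

print(mae(uniform_errors), rmse(uniform_errors))
print(mae(skewed_errors), rmse(skewed_errors))

# Values reported in this notebook: MSE 1.75 and MAE 0.9885
reported_rmse = math.sqrt(1.75)
ratio = reported_rmse / 0.9885
print(round(reported_rmse, 2), round(ratio, 2))
```

RMSE equals MAE when all errors have the same magnitude and grows relative to it as errors become more uneven, so the ratio between the two is a quick indicator of occasional large misses.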