In [None]:
import os
import pickle
import re

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
from nltk import bigrams
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from tabulate import tabulate
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import (
    Embedding,
    SimpleRNN,
    LSTM,
    Bidirectional,
    Dense,
    Dropout,
)
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical

In [None]:
# Fetch the NLTK resources used by the text-cleaning step further down
# (tokenizer data, the stop-word list and WordNet).
for resource in ("punkt", "punkt_tab", "stopwords", "wordnet"):
    nltk.download(resource)

In [None]:
def load_data(path="/kaggle/input/flipkart-product/flipkart_product.csv"):
    """Load the Flipkart review CSV and derive a sentiment label for each row.

    Parameters
    ----------
    path : str or path-like, optional
        CSV file to read. It is decoded as ISO-8859-1 and must contain the
        ``Summary`` and ``Rate`` columns. Defaults to the Kaggle input path
        this notebook was written against.

    Returns
    -------
    pandas.DataFrame
        One row per usable review with the columns ``review`` (the original
        ``Summary`` text), ``rating`` (int) and ``sentiment`` (``"positive"``
        for ratings >= 4, ``"neutral"`` for 3, ``"negative"`` otherwise).
        Rows with a missing summary/rate, or a rate containing no digits,
        are dropped.
    """
    raw = pd.read_csv(path, encoding="ISO-8859-1")
    df = (
        raw[["Summary", "Rate"]]
        .dropna()
        .rename(columns={"Summary": "review", "Rate": "rating"})
    )

    # Raw-string pattern avoids the invalid "\d" escape; expand=False yields a
    # Series (not a one-column DataFrame) holding the first run of digits.
    digits = df["rating"].astype(str).str.extract(r"(\d+)", expand=False)
    df = df.assign(rating=pd.to_numeric(digits, errors="coerce")).dropna(
        subset=["rating"]
    )
    df = df.assign(rating=df["rating"].astype(int))

    def rate_to_sentiment(rate):
        if rate >= 4:
            return "positive"
        if rate == 3:
            return "neutral"
        return "negative"

    df["sentiment"] = df["rating"].apply(rate_to_sentiment)
    return df


# Lazily-filled cache so the lemmatizer and stop-word list are built once
# instead of once per review.
_TEXT_RESOURCES = {}


def _get_text_resources():
    """Return the shared (lemmatizer, stop-word set) pair, creating it on first use."""
    if not _TEXT_RESOURCES:
        _TEXT_RESOURCES["lemmatizer"] = WordNetLemmatizer()
        _TEXT_RESOURCES["stop_words"] = frozenset(stopwords.words("english"))
    return _TEXT_RESOURCES["lemmatizer"], _TEXT_RESOURCES["stop_words"]


def preprocess_text(text):
    """Normalise one review into a space-separated string of lemmas.

    The input is converted to ``str``, lower-cased, stripped of every
    character that is not an ASCII letter or whitespace, tokenized,
    lemmatized, and finally filtered against the English stop-word list.

    Parameters
    ----------
    text : object
        Review text; non-string values are passed through ``str()`` first.

    Returns
    -------
    str
        The remaining lemmas joined by single spaces (possibly empty).
    """
    lemmatizer, stop_words = _get_text_resources()
    letters_only = re.sub(r"[^a-zA-Z\s]", "", str(text).lower())
    lemmas = [
        lemmatizer.lemmatize(token)
        for token in word_tokenize(letters_only)
        if token.isalpha()
    ]
    # Stop words are removed after lemmatization, matching the original order.
    return " ".join(lemma for lemma in lemmas if lemma not in stop_words)


def balance_dataset(df):
    """Downsample every sentiment class to the size of the smallest one.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``sentiment`` column.

    Returns
    -------
    pandas.DataFrame
        The sampled positive rows, then neutral, then negative, each group
        holding as many rows as the rarest value found in ``sentiment``.
        Sampling uses a fixed seed (42), so the result is repeatable.
    """
    target_size = df["sentiment"].value_counts().min()
    per_class = []
    for sentiment in ("positive", "neutral", "negative"):
        subset = df.loc[df["sentiment"] == sentiment]
        per_class.append(subset.sample(target_size, random_state=42))
    return pd.concat(per_class)

In [None]:
# Load the reviews with their derived sentiment labels and print the
# (rows, columns) shape as a quick sanity check.
df = load_data()
print(df.shape)

In [None]:
# Store the output of preprocess_text for every review in a new column,
# keeping the raw text in "review" untouched.
df["cleaned_review"] = df["review"].apply(preprocess_text)

In [None]:
# Display the frame's shape now that the cleaned_review column has been added.
df.shape

In [None]:
# Integer class id for each sentiment name; the ids double as the column
# positions of the one-hot targets built later with to_categorical.
label_map = {"positive": 2, "neutral": 1, "negative": 0}
df["label"] = df["sentiment"].map(label_map)
# Inverse lookup (class id -> sentiment name) for turning predictions back into text.
label_map_reverse = {v: k for k, v in label_map.items()}

In [None]:
# Tokenization and padding: turn each cleaned review into a fixed-length
# sequence of integer word ids.
# NOTE(review): the tokenizer is fit on every review, including rows that
# later land in the test split — confirm this is acceptable for the evaluation.
max_words = 10000  # vocabulary cap passed to the Tokenizer and to the Embedding layers
max_len = 100  # sequence length every review is padded/truncated to
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(df["cleaned_review"])
sequences = tokenizer.texts_to_sequences(df["cleaned_review"])
padded_sequences = pad_sequences(sequences, maxlen=max_len)

In [None]:
# Features are the padded id sequences; targets are one-hot sentiment labels.
X = padded_sequences
y = to_categorical(df["label"], num_classes=3)
# Stratify on the integer label so the train and test splits keep the same
# class proportions as the full (imbalanced) dataset.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=df["label"]
)

In [None]:
def _build_sequence_classifier(recurrent_layer):
    """Assemble and compile the classifier shared by all three architectures.

    The network is: 400-dimensional embedding over ``max_words`` ids ->
    ``recurrent_layer`` -> Dense(64, relu, L2) -> Dropout(0.6) ->
    Dense(3, softmax). It is compiled with Adam (learning rate 1e-4),
    categorical cross-entropy and accuracy.

    Parameters
    ----------
    recurrent_layer : keras layer
        The sequence-encoding layer placed after the embedding; this is the
        only part that differs between the public builders.

    Returns
    -------
    keras.Sequential
        The compiled model.
    """
    model = Sequential(
        [
            Embedding(max_words, 400, input_length=max_len),
            recurrent_layer,
            Dense(64, activation="relu", kernel_regularizer=l2(0.01)),
            Dropout(0.6),
            Dense(3, activation="softmax"),
        ]
    )
    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


def build_rnn_model():
    """Return the compiled classifier with a 256-unit SimpleRNN encoder."""
    return _build_sequence_classifier(
        SimpleRNN(256, return_sequences=False, kernel_regularizer=l2(0.01))
    )


def build_lstm_model():
    """Return the compiled classifier with a 256-unit LSTM encoder."""
    return _build_sequence_classifier(
        LSTM(256, return_sequences=False, kernel_regularizer=l2(0.01))
    )


def build_bilstm_model():
    """Return the compiled classifier with a bidirectional 256-unit LSTM encoder."""
    return _build_sequence_classifier(
        Bidirectional(LSTM(256, return_sequences=False, kernel_regularizer=l2(0.01)))
    )

In [None]:
models = {
    "RNN": build_rnn_model(),
    "LSTM": build_lstm_model(),
    "BiLSTM": build_bilstm_model(),
}
histories = {}
results = {}
early_stopping = EarlyStopping(
    monitor="val_loss", patience=3, restore_best_weights=True
)

# Compute class weights from the training labels only, so the test rows do
# not influence training, and key them by the actual class id rather than
# by position (the two differ whenever a class id is missing).
y_train_classes = np.argmax(y_train, axis=1)
train_class_ids = np.unique(y_train_classes)
class_weights = compute_class_weight(
    "balanced", classes=train_class_ids, y=y_train_classes
)
class_weights_dict = {
    int(class_id): float(weight)
    for class_id, weight in zip(train_class_ids, class_weights)
}
print("Class Weights:", class_weights_dict)

# The true test classes and the axis labels do not change between models.
y_test_classes = np.argmax(y_test, axis=1)
class_ids = [0, 1, 2]
class_names = ["Negative", "Neutral", "Positive"]

# Train each model with class weights, then report test accuracy and a
# confusion matrix for it.
for name, model in models.items():
    print(f"Training {name}...")
    history = model.fit(
        X_train,
        y_train,
        epochs=20,
        batch_size=32,
        validation_split=0.2,
        callbacks=[early_stopping],
        class_weight=class_weights_dict,
        verbose=1,
    )
    histories[name] = history.history
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    results[name] = {"loss": loss, "accuracy": accuracy}
    print(f"{name} Test Accuracy: {accuracy:.4f}")

    y_pred = model.predict(X_test, verbose=0)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Passing labels= forces a 3x3 matrix even if a class never appears, so
    # the rows/columns always line up with the tick labels below.
    cm = confusion_matrix(y_test_classes, y_pred_classes, labels=class_ids)
    plt.figure(figsize=(4, 3))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title(f"Confusion Matrix - {name}")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

In [None]:
# Persist everything needed to reuse the models outside this notebook:
# the trained networks, the fitted tokenizer and the label lookups.
# (os and pickle are imported in the notebook's import cell.)

# Directory where Kaggle allows writing
save_dir = "/kaggle/working/saved_models"
os.makedirs(save_dir, exist_ok=True)

# Save models
for name, model in models.items():
    model.save(os.path.join(save_dir, f"{name}_model.h5"))

# Save tokenizer
with open(os.path.join(save_dir, "tokenizer.pkl"), "wb") as f:
    pickle.dump(tokenizer, f)

# Save label maps
with open(os.path.join(save_dir, "label_maps.pkl"), "wb") as f:
    pickle.dump({"label_map": label_map, "label_map_reverse": label_map_reverse}, f)

# Report the directory actually used instead of repeating the literal path.
print(f"Models and tokenizer saved to {save_dir}/")


In [None]:
# Compare the models epoch by epoch on the validation split:
# one figure for accuracy, then one for loss.
for metric_key, metric_label in (("val_accuracy", "Accuracy"), ("val_loss", "Loss")):
    plt.figure(figsize=(10, 6))
    for name, history in histories.items():
        plt.plot(history[metric_key], label=f"{name} Val {metric_label}")
    plt.title(f"Model Validation {metric_label} Comparison")
    plt.xlabel("Epoch")
    plt.ylabel(f"Validation {metric_label}")
    plt.legend()
    plt.show()

In [None]:
# Print per-class precision/recall/F1 on the test split for every model.
class_names = ["Negative", "Neutral", "Positive"]
for name, model in models.items():
    y_pred = model.predict(X_test, verbose=0)
    # Collapse probabilities / one-hot rows to integer class ids.
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)
    print(f"\nClassification Report for {name}:")
    report_text = classification_report(
        y_test_classes, y_pred_classes, target_names=class_names
    )
    print(report_text)

In [None]:
# Predict unseen data
def predict_unseen_data(models, tokenizer, reviews, max_len=100):
    """Run every model on raw review strings and collect its predictions.

    Parameters
    ----------
    models : dict
        Mapping of model name to a trained model exposing ``predict``.
    tokenizer : object
        Fitted tokenizer exposing ``texts_to_sequences``.
    reviews : iterable of str
        Raw review texts; each is cleaned with ``preprocess_text`` first.
    max_len : int, optional
        Length the id sequences are padded/truncated to. It should equal the
        length used when the models were trained; the default of 100 matches
        this notebook's training configuration.

    Returns
    -------
    dict
        ``{model_name: [(predicted_class, class_probabilities), ...]}`` with
        one tuple per review, in input order. For empty input every model
        maps to an empty list and no model is invoked.
    """
    reviews = list(reviews)
    if not reviews:
        # Nothing to score: skip tokenization and the model calls entirely.
        return {name: [] for name in models}

    cleaned_reviews = [preprocess_text(review) for review in reviews]
    sequences = tokenizer.texts_to_sequences(cleaned_reviews)
    padded = pad_sequences(sequences, maxlen=max_len)
    predictions = {}
    for name, model in models.items():
        probs = model.predict(padded, verbose=0)
        predicted_classes = np.argmax(probs, axis=1)
        predictions[name] = list(zip(predicted_classes, probs))
    return predictions


# Hand-written examples, one per sentiment, to eyeball each model's behaviour.
unseen_reviews = [
    "This product is fantastic and works perfectly!",
    "Absolutely terrible, broke after one use.",
    "It's okay, nothing special but gets the job done.",
]
print("\nPredicting sentiments for unseen reviews:")
predictions = predict_unseen_data(models, tokenizer, unseen_reviews, max_len)

# For each review, show every model's predicted label and class probabilities.
for review_idx, review in enumerate(unseen_reviews):
    print(f"\nReview: {review}")
    for model_name, per_review in predictions.items():
        pred_class, pred_probs = per_review[review_idx]
        sentiment = label_map_reverse[pred_class]
        formatted_probs = []
        for class_id, prob in enumerate(pred_probs):
            formatted_probs.append(f"{label_map_reverse[class_id]}: {prob:.4f}")
        probs_str = ", ".join(formatted_probs)
        print(f"{model_name} Prediction: {sentiment} ({probs_str})")

In [None]:
# Performance table: test accuracy and macro-averaged F1 for every model.
performance_table = []
# The true classes are the same for every model, so decode them once.
y_test_classes = np.argmax(y_test, axis=1)
for i, (name, model) in enumerate(models.items(), 1):
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    y_pred = model.predict(X_test, verbose=0)
    y_pred_classes = np.argmax(y_pred, axis=1)
    report = classification_report(y_test_classes, y_pred_classes, output_dict=True)
    macro_f1 = report["macro avg"]["f1-score"]
    performance_table.append([i, name, f"{accuracy:.2f}", f"{macro_f1:.2f}"])

print("\nPerformance Table:")
headers = ["S.No", "Type of Model", "Accuracy", "F1-Score"]
# The original line was missing its final closing parenthesis.
print(tabulate(performance_table, headers=headers, tablefmt="grid"))