# Case 1: Task 1 - Sentiment Analysis Modeling

In [1]:
# Import required libraries
import os
import re
import nltk
import pandas as pd
from typing import Literal
from joblib import dump, load
from sklearn import preprocessing
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score

# Models
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.naive_bayes import MultinomialNB, ComplementNB

# Module
from tweet import Tweet

In [2]:
# Paths
ABSOLUTE_PATH = os.path.abspath(os.getcwd())
data_dir = os.path.join(ABSOLUTE_PATH, "dataset")
models_dir = os.path.join(ABSOLUTE_PATH, "models")

data_sentiment_path = os.path.join(data_dir, "dataset_tweet_sentiment_cellular_service_provider.csv")
label_encoder_path = os.path.join(models_dir, "label_encoder.joblib")
vectorizer_path = os.path.join(models_dir, "vectorizer.joblib")
final_model_path = os.path.join(models_dir, "final_model.joblib")

In [3]:
# Load dataset
df = pd.read_csv(data_sentiment_path)

# Drop duplicate tweets
tweets_df = df[['Sentiment', 'Text Tweet']]
tweets_df = tweets_df.drop_duplicates()
tweets_df.head(5)

Unnamed: 0,Sentiment,Text Tweet
0,positive,<USER_MENTION> #BOIKOT_<PROVIDER_NAME> Gunakan...
1,positive,"Saktinya balik lagi, alhamdulillah :v <PROVIDE..."
2,negative,Selamat pagi <PROVIDER_NAME> bisa bantu kenap...
3,negative,Dear <PROVIDER_NAME> akhir2 ini jaringan data ...
4,negative,Selamat malam PENDUSTA <PROVIDER_NAME>


# Data Preprocessing

In [4]:
# Perform text preprocessing on tweet data using the Tweet module
tweets = []
for _, row in tweets_df.iterrows():
    tweets.append(
        Tweet(original_tweet=row['Text Tweet'], sentiment=row['Sentiment'])
    )

tweets_df['Clean_Tweet'] = [tweet.clean_tweet for tweet in tweets]
tweets_df.head(5)

Unnamed: 0,Sentiment,Text Tweet,Clean_Tweet
0,positive,<USER_MENTION> #BOIKOT_<PROVIDER_NAME> Gunakan...,#boikot produk bangsa
1,positive,"Saktinya balik lagi, alhamdulillah :v <PROVIDE...",sakti alhamdulillah cengang
2,negative,Selamat pagi <PROVIDER_NAME> bisa bantu kenap...,selamat pagi bisa bantu kamar sinyal 4g hilang...
3,negative,Dear <PROVIDER_NAME> akhir2 ini jaringan data ...,dear jaring data lambat h+
4,negative,Selamat malam PENDUSTA <PROVIDER_NAME>,selamat malam dusta


In [5]:
# Label encoding
labels = tweets_df['Sentiment']
label_encoder = preprocessing.LabelEncoder()
label_encoder = label_encoder.fit(labels)

tweets_df['Sentiment'] = label_encoder.transform(labels)

# Save label encoder for later use
dump(label_encoder, label_encoder_path)

tweets_df.head(5)

Unnamed: 0,Sentiment,Text Tweet,Clean_Tweet
0,1,<USER_MENTION> #BOIKOT_<PROVIDER_NAME> Gunakan...,#boikot produk bangsa
1,1,"Saktinya balik lagi, alhamdulillah :v <PROVIDE...",sakti alhamdulillah cengang
2,0,Selamat pagi <PROVIDER_NAME> bisa bantu kenap...,selamat pagi bisa bantu kamar sinyal 4g hilang...
3,0,Dear <PROVIDER_NAME> akhir2 ini jaringan data ...,dear jaring data lambat h+
4,0,Selamat malam PENDUSTA <PROVIDER_NAME>,selamat malam dusta


# Sentiment Analysis Modeling

In [6]:
random_seed = 42

## Vectorization

In [7]:
# Convert dataset to vector (Vectorization)
vectorizer = TfidfVectorizer()
vectorizer = vectorizer.fit(tweets_df['Clean_Tweet'])

X = vectorizer.transform(tweets_df['Clean_Tweet'])
y = tweets_df['Sentiment']

# Save vectorizer for later use
dump(vectorizer, vectorizer_path)

print(f"Data: {X.shape[0]} | Columns: {X.shape[1]}")
print(f"Label: {len(y)}")

Data: 298 | Columns: 686
Label: 298


## Train/Test Split

In [8]:
# Split dataset to training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3, random_state=random_seed)

print(f"Train data: {X_train.shape[0]} | label {len(y_train)}")
print(f"Test data: {X_test.shape[0]} | label {len(y_test)}")

Train data: 208 | label 208
Test data: 90 | label 90


In [9]:
# Convert sparse matrices to DataFrames with feature names (needed by LGBM)
feature_names = vectorizer.get_feature_names_out()
X_train_df = pd.DataFrame(X_train.toarray(), columns=feature_names)
X_test_df = pd.DataFrame(X_test.toarray(), columns=feature_names)

## Modeling with Various Models

In [10]:
# Define models and its paramaeters
models = {
    'XGB': (
        XGBClassifier(eval_metric='logloss', random_state=random_seed),
        {
            'n_estimators': [100, 200],
            'learning_rate': [0.05, 0.1],
            'max_depth': [4, 6],
            'subsample': [0.7, 1.0]
        }
    ),
    'LGBM': (
        LGBMClassifier(random_state=random_seed),
        {
            'n_estimators': [100, 200],
            'learning_rate': [0.05, 0.1],
            'num_leaves': [20, 31, 50],
            'max_depth': [-1, 5, 10],
            'min_child_samples': [1, 5, 10],
            'min_gain_to_split': [0.0, 0.01],
        }
    ),
    'CatBoost': (
        CatBoostClassifier(verbose=0, random_seed=random_seed),
        {
            'iterations': [100, 200],
            'learning_rate': [0.03, 0.1],
            'depth': [4, 6, 8]
        }
    ),
    'LinearSVC': (
        LinearSVC(random_state=random_seed),
        {
            'C': [0.1, 1, 10],
            'max_iter': [1000, 2000]
        }
    ),
    'LogReg': (
        LogisticRegression(solver='liblinear', random_state=random_seed),
        {
            'C': [0.1, 1, 10],
            'penalty': ['l1', 'l2']
        }
    ),
    'Ridge': (
        RidgeClassifier(random_state=random_seed),
        {
            'alpha': [0.5, 1.0, 2.0],
            'tol': [1e-3, 1e-4]
        }
    ),
    'RandomForest': (
        RandomForestClassifier(random_state=random_seed),
        {
            'n_estimators': [100, 200],
            'max_depth': [None, 10, 20],
            'min_samples_split': [2, 5]
        }
    ),
    'ExtraTrees': (
        ExtraTreesClassifier(random_state=random_seed),
        {
            'n_estimators': [100, 200],
            'max_depth': [None, 10, 20],
            'min_samples_split': [2, 5]
        }
    ),
    'MultinomialNB': (
        MultinomialNB(),
        {
            'alpha': [0.5, 1.0]
        }
    ),
    'ComplementNB': (
        ComplementNB(),
        {
            'alpha': [0.5, 1.0],
            'norm': [True, False]
        }
    ),
}

print(f"Models used: {len(list(models.keys()))}\n\n{list(models.keys())}")

Models used: 10

['XGB', 'LGBM', 'CatBoost', 'LinearSVC', 'LogReg', 'Ridge', 'RandomForest', 'ExtraTrees', 'MultinomialNB', 'ComplementNB']


In [11]:
%%time

best_models = {}

cross_validation = 5
for model_name, (model, param_grid) in models.items():
    print(f"Training {model_name} model ...")
    
    # Training models using GridSearchCV
    grid = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring="f1_weighted",
        cv=cross_validation,
        n_jobs=-1,
        verbose=0,
    )
    grid.fit(X_train_df, y_train)

    # Get the best model
    best_model = grid.best_estimator_
    best_param = grid.best_params_
    best_score = grid.best_score_
    
    # Evaluate best model
    y_pred = best_model.predict(X_test_df)

    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average="weighted")
    recall = recall_score(y_test, y_pred, average="weighted")
    report = classification_report(y_test, y_pred)
    
    print(f"{report}\n")

    # Save the best model
    best_models[model_name] = {
        "model": best_model,
        "parameters": best_param,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": best_score,
        "report": report
    }

Training XGB model ...
              precision    recall  f1-score   support

           0       0.85      0.69      0.76        48
           1       0.71      0.86      0.77        42

    accuracy                           0.77        90
   macro avg       0.78      0.77      0.77        90
weighted avg       0.78      0.77      0.77        90


Training LGBM model ...
[LightGBM] [Info] Number of positive: 96, number of negative: 112
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000215 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 224
[LightGBM] [Info] Number of data points in the train set: 208, number of used features: 48
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.461538 -> initscore=-0.154151
[LightGBM] [Info] Start training from score -0.154151
              precision    recall  f1-score   support

           0       0

# Get the Best Model

## Using Specific Metric

In [12]:
def get_best_model(models: dict = best_models, metric: Literal["accuracy", "precision", "recall", "f1_score"] = "accuracy"):
    """Get and save final model"""
    # Get the best of the best model (final model) based on specific metric
    try:
        best_model_name = max(models.keys(), key=lambda k: models[k][metric])
    except Exception as e:
        raise Exception("Metric unknown.")

    # Save the final model
    best_model = models[best_model_name]
    print(f"model: {best_model_name}")
    print(f"param: {best_model['parameters']}")
    print(f"{metric}: {best_model[metric]:,.03f}")
    
    return best_model["model"]
    
final_model = get_best_model(metric="f1_score")

model: ComplementNB
param: {'alpha': 1.0, 'norm': False}
f1_score: 0.860


## Using Calculation

In [13]:
# Get the best model based on metrics calculation
def calculate_best_model(models: dict = best_models):
    """Get and save final model based on weights calculation"""
    scores = {}

    # Custom weights
    weights = {
        'accuracy': 0.25,
        'precision': 0.25, 
        'recall': 0.25,
        'f1_score': 0.25
    }

    # Calculate overall score based on custom weights and get the best of the best model (final model)
    for model_name in models:
        score = 0
        for metric in weights:
            score += weights[metric] * models[model_name][metric]
        scores[model_name] = score
    best_model_name = max(scores, key=scores.get)

    # Save the final model
    best_model = models[best_model_name]
    print(f"model: {best_model_name}")
    print(f"param: {best_model['parameters']}")
    print(f"score: {scores[best_model_name]:,.03f}")

    return best_model['model']
    
final_model = calculate_best_model(best_models)

model: MultinomialNB
param: {'alpha': 0.5}
score: 0.845


In [14]:
# Save the final model for later use
dump(final_model, final_model_path)

['D:\\Learn\\LLM\\llm_engineering\\my_projects\\nawatech_test\\case1\\models\\final_model.joblib']

# Test Model (Prediction)

In [15]:
text = "buruan pakai bisa dapat banyak poin lho"

# Text preprocessing
text_clean = Tweet(original_tweet=text, sentiment="")

# Text to vector/matrix (vectorization)
text_vec = vectorizer.transform([text_clean.clean_tweet])

# Sparse matrix to DataFrame (needed by LGBM)
feature_names = vectorizer.get_feature_names_out()
text_df = pd.DataFrame(text_vec.toarray(), columns=feature_names)

# Prediction
prediction = final_model.predict(text_df)

# Inverse label encoding
prediction = label_encoder.inverse_transform(prediction)

print(f"Tweet\t : {text}")
print(f"Sentiment: {prediction[0]}")

Tweet	 : buruan pakai bisa dapat banyak poin lho
Sentiment: positive
