# Step A1: Data Loading & Validation

“Am I training on what I think I’m training on?”

We load the cleaned dataset, validate the columns, handle null/empty values, and inspect the label distribution.

In [1]:
import pandas as pd
import numpy as np

data_path = "../data/cleaned.csv"
try:
    df = pd.read_csv(data_path)
    print("Dataset loaded successfully.")
except FileNotFoundError:
    print(f"Error: File not found at {data_path}")
    raise  # stop here: every later cell needs `df`

print(f"Shape: {df.shape}")
df.head()

Dataset loaded successfully.
Shape: (1599999, 5)


|   | target | text | sentiment_label | text_length | cleaned_text |
|---|---|---|---|---|---|
| 0 | 0 | is upset that he can't update his Facebook by ... | Negative | 111 | is upset that he can't update his facebook by ... |
| 1 | 0 | @Kenichan I dived many times for the ball. Man... | Negative | 89 | i dived many times for the ball. managed to s... |
| 2 | 0 | my whole body feels itchy and like its on fire | Negative | 47 | my whole body feels itchy and like its on fire |
| 3 | 0 | @nationwideclass no, it's not behaving at all.... | Negative | 111 | no, it's not behaving at all. i'm mad. why am... |
| 4 | 0 | @Kwesidei not the whole crew | Negative | 29 | not the whole crew |


## Validate Columns

Explicitly check that `cleaned_text` and `target` (label) columns exist. This avoids hidden assumptions later.

In [2]:
required_cols = ['cleaned_text', 'target']
missing_cols = [col for col in required_cols if col not in df.columns]

if missing_cols:
    raise ValueError(f"CRITICAL ERROR: Missing required columns: {missing_cols}. Please check the previous preprocessing step.")

print("Validation Passed: 'cleaned_text' and 'target' columns exist.")

Validation Passed: 'cleaned_text' and 'target' columns exist.


## Null & Empty Checks

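This matters for TF-IDF: `TfidfVectorizer` rejects `NaN` documents, and if no document yields a single token, fitting fails with an empty-vocabulary error. The same check must guard streaming inference so the model never receives empty input.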

In [3]:
# 1. Drop rows where cleaned_text or target is null
initial_count = len(df)
df = df.dropna(subset=['cleaned_text', 'target'])

# 2. Drop rows where cleaned_text is empty string (after conversion to string to be safe)
df = df[df['cleaned_text'].astype(str).str.strip() != ""]

cleaned_count = len(df)
dropped_count = initial_count - cleaned_count

print(f"Initial rows: {initial_count}")
print(f"Dropped rows (null/empty): {dropped_count}")
print(f"Remaining rows: {cleaned_count}")

if cleaned_count == 0:
    raise ValueError("CRITICAL: Dataset is empty after cleaning!")

Initial rows: 1599999
Dropped rows (null/empty): 2815
Remaining rows: 1597184
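Dropping empty strings does not guarantee a non-empty feature vector. Below is a minimal sketch of an inference-time guard, assuming the streaming job receives raw strings one at a time; `is_valid_text` is a hypothetical helper and the two-tweet corpus is a toy stand-in for the real data. It also shows that a non-empty tweet made only of stop words passes the guard but still maps to an all-zero TF-IDF row when `stop_words='english'` is used, as in Step A2.

```python
from sklearn.feature_extraction.text import TfidfVectorizer


def is_valid_text(text) -> bool:
    """Hypothetical guard: accept only strings with at least one non-whitespace character."""
    return isinstance(text, str) and text.strip() != ""


# Toy vectorizer configured like Step A2 (stop words removed), fit on two made-up tweets.
vec = TfidfVectorizer(stop_words="english").fit(["great game today", "bad day at work"])

stop_only = "it is what it is"            # non-empty, but every token is a stop word
row = vec.transform([stop_only])          # sparse 1 x vocabulary row
print(is_valid_text(stop_only), row.nnz)  # True 0

for value in ["", "   ", None, float("nan")]:
    print(repr(value), is_valid_text(value))  # each line ends in False
```

An all-zero row still receives a prediction (driven only by the classifier's intercept), so a streaming job may also want to check `row.nnz` before trusting the label.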


## Label Sanity Check

Check whether the task is binary or multiclass, and whether the classes are imbalanced.

In [4]:
# Unique labels
unique_labels = df['target'].unique()
print(f"Unique labels: {unique_labels}")

# Count occurrences
label_counts = df['target'].value_counts()
print("\nLabel Distribution:")
print(label_counts)

# Decide binary vs. multiclass from the number of distinct labels
if len(unique_labels) > 2:
    print("\nObservation: Multi-class problem.")
else:
    print("\nObservation: Binary classification problem.")

Unique labels: [0 4]

Label Distribution:
target
4    798688
0    798496
Name: count, dtype: int64

Observation: Binary classification problem.
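The counts show the two classes are almost perfectly balanced. A quick way to quantify that is sketched below, with the printed counts hard-coded so the snippet runs on its own; inside the notebook, `df['target'].value_counts(normalize=True)` gives the shares directly.

```python
import pandas as pd

# Label counts as printed by the label sanity check (hard-coded for a standalone example).
counts = pd.Series({4: 798688, 0: 798496}, name="count")

shares = counts / counts.sum()        # fraction of rows per label
ratio = counts.max() / counts.min()   # 1.0 means perfectly balanced

print(shares.round(4))
print(f"Majority/minority ratio: {ratio:.4f}")
```

A ratio this close to 1 suggests plain accuracy is a reasonable headline metric here; with a skewed ratio, per-class recall and F1 would deserve more weight.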


## Define Training Variables

Prepare `X_text` and `y` for vectorization and splitting.

In [5]:
X_text = df['cleaned_text']
y = df['target']

print("Training variables defined:")
print(f"X_text (Series): {X_text.shape}")
print(f"y (Series): {y.shape}")

# Peek at the data
print("\nSample X_text:")
print(X_text.head(3))
print("\nSample y:")
print(y.head(3))

Training variables defined:
X_text (Series): (1597184,)
y (Series): (1597184,)

Sample X_text:
0    is upset that he can't update his facebook by ...
1     i dived many times for the ball. managed to s...
2      my whole body feels itchy and like its on fire 
Name: cleaned_text, dtype: object

Sample y:
0    0
1    0
2    0
Name: target, dtype: int64


# Step A2: Feature Extraction with TF-IDF

"How do we turn cleaned text into numbers that a model can learn from?"

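We use TF-IDF (Term Frequency-Inverse Document Frequency) to convert text into numerical vectors. Each term is weighted by how often it occurs in a tweet (term frequency), scaled down by how many tweets contain it (inverse document frequency), so distinctive terms score higher than terms that appear almost everywhere.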

### Configuration
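- `max_features=5000`: Keeps only the 5,000 most frequent terms across the corpus, which keeps the model small and inference fast.
- `ngram_range=(1, 2)`: Captures single words and two-word phrases (e.g., "wish luck").
- `min_df=5`: Ignores terms (words or bigrams) that appear in fewer than 5 tweets, reducing noise.
- `stop_words='english'`: Removes common English stop words. Stop words are dropped *before* bigrams are formed, and scikit-learn's built-in English list includes negations such as "not" and "no", so a phrase like "not good" is reduced to "good".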
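A quick check of the stop-word caveat, using the vectorizer's own analyzer on a made-up sentence. `build_analyzer()` returns the callable that turns a document into the terms TF-IDF counts, so comparing the two configurations shows which terms the model can and cannot see.

```python
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS, TfidfVectorizer

# Same n-gram setting as the configuration above, with and without built-in English stop words.
with_stop = TfidfVectorizer(ngram_range=(1, 2), stop_words="english").build_analyzer()
without_stop = TfidfVectorizer(ngram_range=(1, 2)).build_analyzer()

sentence = "this is not good"
print("not" in ENGLISH_STOP_WORDS)  # True
print(with_stop(sentence))          # ['good']
print(without_stop(sentence))       # ['this', 'is', 'not', 'good', 'this is', 'is not', 'not good']
```

If negation matters, one option is a custom stop-word list that leaves out negations; that would change the vocabulary and therefore the results reported in Step A3.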

In [6]:
from sklearn.feature_extraction.text import TfidfVectorizer
import time

print("Starting TF-IDF vectorization...")
start_time = time.time()

vectorizer = TfidfVectorizer(
    max_features=5000,
    ngram_range=(1, 2),
    min_df=5,
    stop_words='english'
)

X_features = vectorizer.fit_transform(X_text)

end_time = time.time()
print(f"Vectorization complete in {end_time - start_time:.2f} seconds.")
print(f"Shape: {X_features.shape}")
print(f"Vocabulary size: {len(vectorizer.get_feature_names_out())}")

Starting TF-IDF vectorization...
Vectorization complete in 48.37 seconds.
Shape: (1597184, 5000)
Vocabulary size: 5000


# Step A3: Train & Evaluate Model (Logistic Regression)

"Can the model learn useful patterns from TF-IDF features?"

We choose **Logistic Regression** because it is:
- **Fast** to train and predict (crucial for real-time streaming).
- **Interpretable** (we can see which words drive sentiment).
- A **strong baseline** for text classification.

We will split the data (80/20), train the model, and evaluate it using standard classification metrics.

In [7]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score

print("Splitting data (80/20)...")
X_train, X_test, y_train, y_test = train_test_split(
    X_features, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Train size: {X_train.shape[0]}, Test size: {X_test.shape[0]}")

print("Training Logistic Regression...")
# n_jobs only parallelizes over classes in one-vs-rest fits; it has no effect on this binary model
log_reg = LogisticRegression(max_iter=1000, random_state=42, n_jobs=-1)
log_reg.fit(X_train, y_train)

print("Training complete.")

Splitting data (80/20)...
Train size: 1277747, Test size: 319437
Training Logistic Regression...
Training complete.
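One caveat on this split: in Step A2 the vectorizer is fit on every row before splitting, so its vocabulary and IDF weights already reflect the test tweets, which can make the test score optimistic. Below is a sketch of a leakage-free alternative (not what this notebook ran) that splits the raw text first and fits both stages inside a scikit-learn `Pipeline`. It uses a made-up toy corpus, and `min_df`, `max_features` and `stop_words` are left out because the corpus is tiny.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline

# Toy stand-ins for X_text and y (labels follow the notebook's 0/4 convention).
texts = ["love this", "great day", "so happy", "thank you",
         "hate this", "awful day", "so sad", "very bad"] * 5
labels = [4, 4, 4, 4, 0, 0, 0, 0] * 5

# Split the raw text first so TF-IDF statistics come only from the training split.
txt_train, txt_test, lab_train, lab_test = train_test_split(
    texts, labels, test_size=0.2, random_state=42, stratify=labels
)

pipe = make_pipeline(
    TfidfVectorizer(ngram_range=(1, 2)),
    LogisticRegression(max_iter=1000, random_state=42),
)
pipe.fit(txt_train, lab_train)  # the vectorizer sees txt_train only
print(f"Test accuracy: {pipe.score(txt_test, lab_test):.2f}")
```

A fitted `Pipeline` can also be saved as one object with `joblib.dump`, an alternative to the dictionary bundle used in Step A4.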


### Model Evaluation

We evaluate the model on the held-out test set using Accuracy, Precision, Recall, and F1-Score.

In [8]:
y_pred = log_reg.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)

print(f"Model Accuracy: {accuracy:.4f}\n")
print("Classification Report:")
print(report)

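import os

# Step A4 also creates ../ml, but this cell runs first, so make sure the directory exists
metrics_path = "../ml/metrics.txt"
os.makedirs(os.path.dirname(metrics_path), exist_ok=True)
with open(metrics_path, "w") as f:
    f.write(f"Accuracy: {accuracy:.4f}\n\n")
    f.write("Classification Report:\n")
    f.write(report)
print(f"Metrics saved to {metrics_path}")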

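feature_names = vectorizer.get_feature_names_out()  # features are unigrams and bigrams
# A binary model has one coefficient row: positive weights push toward
# log_reg.classes_[1] (label 4, Positive), negative weights toward classes_[0] (label 0).
coefficients = log_reg.coef_[0]

if len(log_reg.classes_) == 2:
    order = np.argsort(coefficients)    # ascending by weight
    top_positive_indices = order[-10:]  # strongest positive term is listed last
    top_negative_indices = order[:10]   # strongest negative term is listed first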

    print("\nTop 10 Words Driving POSITIVE Sentiment:")
    print([feature_names[i] for i in top_positive_indices])

    print("\nTop 10 Words Driving NEGATIVE Sentiment:")
    print([feature_names[i] for i in top_negative_indices])

Model Accuracy: 0.7692

Classification Report:
              precision    recall  f1-score   support

           0       0.79      0.74      0.76    159699
           4       0.75      0.80      0.78    159738

    accuracy                           0.77    319437
   macro avg       0.77      0.77      0.77    319437
weighted avg       0.77      0.77      0.77    319437

Metrics saved to ../ml/metrics.txt

Top 10 Words Driving POSITIVE Sentiment:
['proud', 'pleasure', 'blessed', 'smile', 'thanks', 'thank', 'welcome', 'congratulations', 'smiling', 'wish luck']

Top 10 Words Driving NEGATIVE Sentiment:
['sad', 'bummed', 'sadly', 'gutted', 'poor', 'sick', 'unfortunately', 'missing', 'sucks', 'disappointed']
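For reference, the per-class numbers in the report come from confusion-matrix counts. The sketch below uses made-up predictions with the notebook's label values (0 and 4), computes precision, recall and F1 for label 4 by hand, and checks them against `precision_recall_fscore_support`. In the report, `macro avg` is the unweighted mean of the per-class scores and `weighted avg` weights each class by its support.

```python
import numpy as np
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support

# Made-up labels and predictions (0 = Negative, 4 = Positive).
y_true = [0, 0, 0, 0, 4, 4, 4, 4, 4, 4]
y_hat = [0, 0, 4, 4, 4, 4, 4, 4, 0, 4]

# Rows are true labels, columns are predictions, both in the order [0, 4].
(tn, fp), (fn, tp) = confusion_matrix(y_true, y_hat, labels=[0, 4])

precision = tp / (tp + fp)  # of the tweets predicted Positive, the share that really are
recall = tp / (tp + fn)     # of the truly Positive tweets, the share that were found
f1 = 2 * precision * recall / (precision + recall)  # harmonic mean of the two

p, r, f, support = precision_recall_fscore_support(y_true, y_hat, labels=[4])
print(f"by hand: P={precision:.3f} R={recall:.3f} F1={f1:.3f}")  # P=0.714 R=0.833 F1=0.769
print(f"sklearn: P={p[0]:.3f} R={r[0]:.3f} F1={f[0]:.3f}")
```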


# Step A4: Persist the Trained ML Pipeline

"How can Spark later load and use the exact same model that was trained offline?"

We must save the **entire inference pipeline**, not just the classifier. This includes:
1.  **TF-IDF Vectorizer**: To transform raw text into exactly the same numerical features (same vocabulary, same term-to-column mapping).
2.  **Logistic Regression Classifier**: To predict sentiment from those features.

We will bundle these into a single dictionary and serialize it to `sentiment_model.pkl`.

In [9]:
import joblib
import os

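# The classifier predicts the raw labels 0 and 4 (no Neutral class exists in this data),
# so store names keyed by label rather than as a positional list.
model_pipeline = {
    "vectorizer": vectorizer,
    "classifier": log_reg,
    "target_names": {0: "Negative", 4: "Positive"},
}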

model_dir = "../ml"
os.makedirs(model_dir, exist_ok=True)
model_path = os.path.join(model_dir, "sentiment_model.pkl")

print(f"Saving pipeline to {model_path}...")
joblib.dump(model_pipeline, model_path)

print("Model saved successfully.")

Saving pipeline to ../ml/sentiment_model.pkl...
Model saved successfully.
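On the consuming side (for example, inside a Spark job), the bundle is loaded once and new text goes through `transform`, never `fit_transform`, so the vocabulary and column order stay exactly as trained. A minimal sketch under stated assumptions: the toy bundle stands in for `sentiment_model.pkl`, `predict_sentiment` is a hypothetical helper, and label names are stored as a dict keyed by the classifier's raw labels (0 and 4), because a positional list cannot be indexed by the label 4. Pickled scikit-learn objects should be loaded with the same scikit-learn version that saved them.

```python
import os
import tempfile

import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# Toy stand-in bundle with the same keys as Step A4 (not the real model).
train_texts = ["love this", "great day", "hate this", "awful day"]
train_labels = [4, 4, 0, 0]
vec = TfidfVectorizer().fit(train_texts)
clf = LogisticRegression().fit(vec.transform(train_texts), train_labels)
bundle = {"vectorizer": vec, "classifier": clf, "target_names": {0: "Negative", 4: "Positive"}}


def predict_sentiment(pipeline: dict, texts: list[str]) -> list[str]:
    """Hypothetical helper: vectorize with the fitted vectorizer, predict, map labels to names."""
    features = pipeline["vectorizer"].transform(texts)  # transform only, never refit
    raw_labels = pipeline["classifier"].predict(features)
    return [pipeline["target_names"][int(label)] for label in raw_labels]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sentiment_model.pkl")
    joblib.dump(bundle, path)
    loaded = joblib.load(path)  # round-trip through disk, as a consumer would

preds = predict_sentiment(loaded, ["love this day", "awful"])
print(preds)
```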
