1. Data Ingestion & Cleaning Monitoring

In [0]:
!pip show py4j


Name: py4j
Version: 0.10.9.9
Summary: Enables Python programs to dynamically access arbitrary Java objects
Home-page: https://www.py4j.org/
Author: Barthelemy Dagenais
Author-email: barthelemy@infobart.com
License: BSD License
Location: /local_disk0/.ephemeral_nfs/envs/pythonEnv-aaa948bb-df6c-4543-b569-665a46d7eef7/lib/python3.11/site-packages
Requires: 
Required-by: databricks-connect, pyspark


In [0]:
# %pip install emoji langdetect langcodes iso-639 requests psutil pyspark

import requests
import pandas as pd
import emoji
import psutil
import time
from datetime import datetime, timedelta
from langdetect import detect, DetectorFactory
from iso639 import languages
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()
DetectorFactory.seed = 42  # consistent lang detect

# Evaluation log storage
eval_log = []

def log_evaluation(step, start_time, notes=""):
    cpu = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory().used / (1024 ** 3)
    duration = round(time.time() - start_time, 2)
    eval_log.append({
        "Step": step,
        "Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "CPU_Usage_Percent": cpu,           
        "Memory_Used_GB": round(mem, 2),   
        "Duration_Sec": duration,
        "Notes": notes
    })

# Constants and URLs
container_url = "https://wqd7007.blob.core.windows.net/bronze-webscrape"
sas_token = ("sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupiytfx&se=2026-06-30T20:37:08Z"
             "&st=2025-05-18T12:37:08Z&spr=https&sig=ztkYcXLKHQ9nC5CE3PThs1OY%2FTDHHzSZ8JD4J6JUc1s%3D")

# Date range for loading daily files
start_date = datetime(2024, 5, 1)
end_date = datetime(2025, 5, 18)
date_range = [(start_date + timedelta(days=i)).strftime('%Y%m%d') for i in range((end_date - start_date).days + 1)]

all_data = []
loaded_files = 0
failed_files = 0

# Unified load start time for all files
overall_start_time = time.time()

# Load combined file first
combined_filename = "agoda_reviews_20240501_to_20250518.json"
combined_url = f"{container_url}/{combined_filename}?{sas_token}"

response = requests.get(combined_url)
if response.status_code == 200:
    try:
        data = response.json()
        if isinstance(data, list) and data:
            all_data.extend(data)
        loaded_files += 1
    except Exception as e:
        failed_files += 1
else:
    failed_files += 1

# Load daily files one by one 
for date_str in date_range:
    # Uncomment below if want to skip combined file date range to avoid duplicates
    # if "20240501" <= date_str <= "20250518":
    #     continue

    file_url = f"{container_url}/agoda_reviews_{date_str}.json?{sas_token}"
    try:
        response = requests.get(file_url)
        if response.status_code == 200:
            data = response.json()
            if isinstance(data, list) and data:
                all_data.extend(data)
                loaded_files += 1
            else:
                failed_files += 1
        else:
            failed_files += 1
    except Exception:
        failed_files += 1

total_load_duration = time.time() - overall_start_time
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().used / (1024 ** 3)
log_evaluation("File Loading Summary", overall_start_time,
               f"Files loaded: {loaded_files}, failed: {failed_files}, total records: {len(all_data)}, total load time: {total_load_duration:.2f}s")

# DataFrame creation and cleaning
start_time = time.time()
df = pd.DataFrame(all_data)
if df.empty or "content" not in df.columns:
    raise ValueError("No valid review data found.")

df.drop(columns=['userImage', 'reviewCreatedVersion'], errors='ignore', inplace=True)

def remove_emojis(text):
    return emoji.replace_emoji(str(text), replace='').strip()

df['content_no_emojis'] = df['content'].apply(remove_emojis)

def detect_language(text):
    try:
        return detect(str(text))
    except:
        return "Unknown"

df['language_code'] = df['content_no_emojis'].astype(str).apply(detect_language)

def detect_language_name(code):
    try:
        return languages.get(part1=code).name
    except:
        return "Unknown"

df['language'] = df['language_code'].apply(detect_language_name)

# Heuristic English correction
english_terms = ['amazing','awesome','great','good','love','nice','perfect','helpful','worst','bad','excellent','recommended']
not_en = df["language_code"] != "en"
for term in english_terms:
    df.loc[not_en & df['content_no_emojis'].str.lower().str.contains(term, na=False), 'language_code'] = 'en'

df['language'] = df['language_code'].apply(detect_language_name)

df_cleaned = df[df['language'] != "Unknown"].reset_index(drop=True)
log_evaluation("Clean & Detect Language", start_time, f"Cleaned records: {len(df_cleaned)}")

# Data quality metrics
raw_count = len(all_data)
cleaned_count = len(df_cleaned)
dropped_count = raw_count - cleaned_count
duplicate_count = df_cleaned.duplicated().sum()
missing_content_count = df_cleaned['content'].isnull().sum()
lang_dist = df_cleaned['language'].value_counts()

print(f"Data Quality Report:")
print(f"  Total raw records: {raw_count}")
print(f"  Records after cleaning: {cleaned_count}")
print(f"  Dropped records: {dropped_count}")
print(f"  Duplicate records: {duplicate_count}")
print(f"  Missing 'content' records: {missing_content_count}")
print("\nLanguage distribution:")
print(lang_dist)

log_evaluation("Data Quality Monitoring", time.time(),
               f"Raw: {raw_count}, Cleaned: {cleaned_count}, Dropped: {dropped_count}, "
               f"Duplicates: {duplicate_count}, Missing content: {missing_content_count}")

# Save cleaned data to Delta table
start_time = time.time()
spark_df = spark.createDataFrame(df_cleaned)
spark.sql("CREATE SCHEMA IF NOT EXISTS evaluation")
spark_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("evaluation.silver_reviews_copy")
log_evaluation("Save to Evaluation Table", start_time, "Saved to evaluation.silver_reviews_copy")

# Show evaluation summary table
eval_summary_df = pd.DataFrame(eval_log)

# Clean column names for Delta
eval_summary_df.columns = eval_summary_df.columns.str.replace(r'[ ,;{}()\n\t=%]', '_', regex=True)

display(spark.createDataFrame(eval_summary_df))

# Save evaluation logs to CSV and Delta table
eval_df = eval_summary_df.copy()
eval_df.to_csv("evaluation_log.csv", index=False)
spark.createDataFrame(eval_df).write.format("delta").mode("overwrite").saveAsTable("evaluation.pipeline_logs")

print("Evaluation logs saved to CSV and Delta table.")


Data Quality Report:
  Total raw records: 11496
  Records after cleaning: 11387
  Dropped records: 109
  Duplicate records: 0
  Missing 'content' records: 0

Language distribution:
English                    9775
Indonesian                  423
Slovak                      131
Afrikaans                   125
Tagalog                     105
French                      101
Italian                      86
Romanian                     66
Danish                       63
Norwegian                    59
German                       58
Spanish                      43
Dutch                        38
Somali                       36
Welsh                        30
Estonian                     30
Arabic                       26
Catalan                      24
Polish                       23
Portuguese                   21
Vietnamese                   21
Thai                         16
Finnish                      12
Swedish                      11
Croatian                     11
Swahili (macrolangu

Step,Time,CPU_Usage_Percent,Memory_Used_GB,Duration_Sec,Notes
File Loading Summary,2025-06-06 22:08:34,31.1,11.0,390.46,"Files loaded: 1, failed: 383, total records: 11496, total load time: 388.46s"
Clean & Detect Language,2025-06-06 22:09:20,21.9,11.0,45.68,Cleaned records: 11387
Data Quality Monitoring,2025-06-06 22:09:21,28.5,11.01,1.0,"Raw: 11496, Cleaned: 11387, Dropped: 109, Duplicates: 0, Missing content: 0"
Save to Evaluation Table,2025-06-06 22:09:25,30.1,11.05,3.67,Saved to evaluation.silver_reviews_copy


Evaluation logs saved to CSV and Delta table.


2. Sentiment Analysis Evaluation

In [0]:
# Install required package 
%pip install nltk psutil

import nltk
import time
import psutil
from datetime import datetime
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf, when
from pyspark.sql.types import *

# Download VADER lexicon 
nltk.download('vader_lexicon')

# Initialize VADER sentiment analyzer
analyser = SentimentIntensityAnalyzer()

def sentiment_analyzer_scores(sentence):
    return analyser.polarity_scores(sentence)

# Define Spark UDF for sentiment scoring
sentiment_udf = udf(lambda x: sentiment_analyzer_scores(str(x)), MapType(StringType(), FloatType()))

start_time = time.time()

# Load cleaned review data from your independent evaluation table
df_sentiment = spark.sql("SELECT * FROM evaluation.silver_reviews_copy")

# Apply VADER sentiment analysis UDF
df_sentiment = df_sentiment.withColumn('vader_scores', sentiment_udf(df_sentiment['content_no_emojis']))
df_sentiment = df_sentiment.withColumn('compound', df_sentiment['vader_scores']['compound'])

# Classify sentiment based on compound score thresholds
df_sentiment = df_sentiment.withColumn(
    'sentiment',
    when(df_sentiment['compound'] >= 0.05, 'Positive')
    .when(df_sentiment['compound'] <= -0.05, 'Negative')
    .otherwise('Neutral')
)

# Drop intermediate score columns for clean output
df_sentiment = df_sentiment.drop('vader_scores', 'compound')

# Performance monitoring: record duration, CPU, and memory usage
duration = round(time.time() - start_time, 2)
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().used / (1024 ** 3)

print(f"Sentiment Analysis completed in {duration} sec, CPU usage: {cpu}%, Memory used: {mem:.2f} GB")

# Prepare evaluation log
eval_log = [{
    "Step": "Sentiment Analysis",
    "Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "CPU_Usage_Percent": cpu,
    "Memory_Used_GB": round(mem, 2),
    "Duration_Sec": duration,
    "Notes": f"Processed {df_sentiment.count()} records"
}]

# Save sentiment results to your independent evaluation sentiment table
df_sentiment.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("evaluation.silver_reviews_sentiment")

# Save evaluation logs to a separate table for monitoring
eval_df = spark.createDataFrame(eval_log)
eval_df.write.format("delta").mode("overwrite").saveAsTable("evaluation.sentiment_analysis_logs")

print("Sentiment analysis results and evaluation logs saved to the evaluation database.")


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


[nltk_data] Downloading package vader_lexicon to /home/spark-aaa948bb-
[nltk_data]     df6c-4543-b569-66/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


Sentiment Analysis completed in 33.82 sec, CPU usage: 27.5%, Memory used: 10.84 GB
Sentiment analysis results and evaluation logs saved to the evaluation database.


3. Model Training Evaluation

In [0]:
%sql
DESCRIBE TABLE evaluation.silver_reviews_copy;



col_name,data_type,comment
reviewId,string,
userName,string,
content,string,
score,bigint,
thumbsUpCount,bigint,
at,string,
replyContent,string,
repliedAt,string,
appVersion,string,
content_no_emojis,string,


In [0]:
import time
import psutil
from datetime import datetime
import pandas as pd
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Initialize evaluation log
eval_log = []

# Step 1: Data Loading
start_time = time.time()
df = spark.sql("SELECT content_no_emojis, sentiment FROM silver_dataprocessing.default.silver_agoda_reviews_details_2")
pdf = df.toPandas()
load_duration = round(time.time() - start_time, 2)
eval_log.append({
    "Step": "Load Data",
    "Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "CPU_Usage_Percent": psutil.cpu_percent(interval=1),
    "Memory_Used_GB": round(psutil.virtual_memory().used / (1024 ** 3), 2),
    "Duration_Sec": load_duration,
    "Notes": f"Loaded {len(pdf)} rows"
})

# Step 2: TF-IDF Vectorization
from sklearn.feature_extraction.text import TfidfVectorizer
vector_start = time.time()
vectorizer = TfidfVectorizer(max_features=1000)
X = vectorizer.fit_transform(pdf["content_no_emojis"].astype(str))
vector_duration = round(time.time() - vector_start, 2)
eval_log.append({
    "Step": "TF-IDF Vectorization",
    "Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "CPU_Usage_Percent": psutil.cpu_percent(interval=1),
    "Memory_Used_GB": round(psutil.virtual_memory().used / (1024 ** 3), 2),
    "Duration_Sec": vector_duration,
    "Notes": f"TF-IDF shape: {X.shape}"
})

# Step 3: Label Encoding
label_map = {"Positive": 1, "Neutral": 0, "Negative": 2}
y = pdf["sentiment"].map(label_map)

# Step 4: Split data
from sklearn.model_selection import train_test_split
split_start = time.time()
pdf = pdf.sample(frac=1, random_state=42).reset_index(drop=True)  # shuffle for fairness
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)
split_duration = round(time.time() - split_start, 2)
eval_log.append({
    "Step": "Train-Test Split",
    "Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "CPU_Usage_Percent": psutil.cpu_percent(interval=1),
    "Memory_Used_GB": round(psutil.virtual_memory().used / (1024 ** 3), 2),
    "Duration_Sec": split_duration,
    "Notes": f"Train={X_train.shape[0]}, Test={X_test.shape[0]}"
})

# Step 5: Model Training & Evaluation
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score, classification_report

models = {
    "Logistic Regression": LogisticRegression(C=1.0, penalty='l2', solver='liblinear', max_iter=1000),
    "Random Forest": RandomForestClassifier(n_estimators=150, max_depth=15, min_samples_split=5, random_state=42),
    "Naive Bayes": MultinomialNB(alpha=0.5),
    "Linear SVM": LinearSVC(C=0.8, max_iter=2000)
}

best_accuracy = 0
best_model = None
best_model_name = ""
best_report = ""

for name, model in models.items():
    model_start = time.time()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    report = classification_report(y_test, y_pred, zero_division=0)

    print(f"=== {name} ===")
    print("Accuracy:", acc)
    print(report)
    print("-" * 50)

    eval_log.append({
        "Step": f"Train & Evaluate {name}",
        "Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "CPU_Usage_Percent": psutil.cpu_percent(interval=1),
        "Memory_Used_GB": round(psutil.virtual_memory().used / (1024 ** 3), 2),
        "Duration_Sec": round(time.time() - model_start, 2),
        "Notes": f"Accuracy: {acc:.4f}"
    })

    if acc > best_accuracy:
        best_accuracy = acc
        best_model_name = name
        best_model = model
        best_report = report

# Step 6: Write evaluation logs to Delta table
try:
    eval_df = spark.createDataFrame(pd.DataFrame(eval_log))
    eval_df.write.format("delta").mode("overwrite").saveAsTable("evaluation.model_training_logs")
    print("Evaluation logs written to Delta table: evaluation.model_training_logs")
except Exception as e:
    print(f"Failed to save evaluation logs: {e}")




=== Logistic Regression ===
Accuracy: 0.8964458095655989
              precision    recall  f1-score   support

           0       0.78      0.84      0.81       336
           1       0.96      0.93      0.94      1497
           2       0.80      0.84      0.82       446

    accuracy                           0.90      2279
   macro avg       0.85      0.87      0.86      2279
weighted avg       0.90      0.90      0.90      2279

--------------------------------------------------
=== Random Forest ===
Accuracy: 0.759104870557262
              precision    recall  f1-score   support

           0       0.63      0.04      0.07       336
           1       0.77      0.93      0.84      1497
           2       0.73      0.74      0.73       446

    accuracy                           0.76      2279
   macro avg       0.71      0.57      0.55      2279
weighted avg       0.74      0.76      0.71      2279

--------------------------------------------------
=== Naive Bayes ===
Accuracy:



=== Linear SVM ===
Accuracy: 0.9021500658183413
              precision    recall  f1-score   support

           0       0.79      0.87      0.83       336
           1       0.95      0.93      0.94      1497
           2       0.83      0.83      0.83       446

    accuracy                           0.90      2279
   macro avg       0.86      0.88      0.87      2279
weighted avg       0.90      0.90      0.90      2279

--------------------------------------------------
✅ Evaluation logs written to Delta table: evaluation.model_training_logs


In [0]:

eval_logs_df = spark.sql("SELECT * FROM evaluation.model_training_logs ORDER BY Time DESC LIMIT 20")
display(eval_logs_df)

Step,Time,CPU_Usage_Percent,Memory_Used_GB,Duration_Sec,Notes
Train & Evaluate Linear SVM,2025-06-06 22:34:29,19.3,10.89,1.07,Accuracy: 0.9022
Train & Evaluate Naive Bayes,2025-06-06 22:34:28,24.4,10.88,1.01,Accuracy: 0.8109
Train & Evaluate Random Forest,2025-06-06 22:34:27,15.9,10.89,2.9,Accuracy: 0.7591
Train & Evaluate Logistic Regression,2025-06-06 22:34:24,18.3,10.89,1.08,Accuracy: 0.8964
Train-Test Split,2025-06-06 22:34:23,17.9,10.89,0.0,"Train=9116, Test=2279"
TF-IDF Vectorization,2025-06-06 22:34:22,20.4,10.89,0.16,"TF-IDF shape: (11395, 1000)"
Load Data,2025-06-06 22:34:21,20.9,10.89,1.03,Loaded 11395 rows


4. Pipeline Monitoring & Evaluation Dashboard

In [0]:
pipeline_logs = spark.sql("SELECT * FROM evaluation.pipeline_logs ORDER BY Time DESC LIMIT 100")
sentiment_logs = spark.sql("SELECT * FROM evaluation.sentiment_analysis_logs ORDER BY Time DESC LIMIT 50")
training_logs = spark.sql("SELECT * FROM evaluation.model_training_logs ORDER BY Time DESC LIMIT 100")


In [0]:
import matplotlib.pyplot as plt
import pandas as pd

data = {
    "Model": ["Logistic Regression", "Random Forest", "Naive Bayes", "Linear SVM"],
    "Accuracy": [0.8964, 0.7591, 0.8109, 0.9022]
}

df = pd.DataFrame(data)

colors = {
    "Logistic Regression": "#1f77b4",  
    "Random Forest": "#ff7f0e",        
    "Naive Bayes": "#2ca02c",          
    "Linear SVM": "#d62728"            
}

df["Color"] = df["Model"].map(colors)

plt.figure(figsize=(10, 6))
bars = plt.bar(df["Model"], df["Accuracy"], color=df["Color"])

for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, height + 0.01, f'{height:.4f}',
             ha='center', va='bottom', fontsize=10)

plt.title("Model Accuracy Comparison", fontsize=14)
plt.ylabel("Accuracy")
plt.ylim(0, 1)
plt.xticks(rotation=15)
plt.grid(axis='y', linestyle='--', alpha=0.4)
plt.tight_layout()
plt.show()