In [None]:
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import json

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql.functions import expr

from pyspark.sql.functions import col

# BigQuery imports
# Install required libraries
!pip install google-cloud-bigquery pandas db-dtypes
!pip install -q pandas-gbq google-cloud-bigquery

from google.cloud import bigquery
import pandas as pd

PROJECT_ID = "sentiment-analysis-a"
DATASET = "outputs"

client = bigquery.Client(project=PROJECT_ID)

def pandas_to_bq(
    pdf: pd.DataFrame,
    table_name: str,
    if_exists: str = "replace"  # "append" or "replace"
):
    table_id = f"{PROJECT_ID}.{DATASET}.{table_name}"

    job_config = bigquery.LoadJobConfig(
        write_disposition={
            "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
            "append": bigquery.WriteDisposition.WRITE_APPEND
        }[if_exists],
        autodetect=True
    )

    job = client.load_table_from_dataframe(
        pdf,
        table_id,
        job_config=job_config
    )
    job.result()

    print(f"✓ Loaded {len(pdf):,} rows → {table_id}")


def spark_df_to_bq(
    spark_df,
    table_name,
    write_mode="replace",   # "replace" or "append"
    max_rows=2_000_000
):


    count = spark_df.count()
    print(f"Uploading {count:,} rows → {table_name}")

    if count > max_rows:
        raise ValueError(
            f"Too many rows ({count:,}). "
            f"Sample or aggregate before upload."
        )

    pdf = spark_df.toPandas()

    pdf.to_gbq(
        destination_table=f"{DATASET}.{table_name}",
        project_id=PROJECT_ID,
        if_exists=write_mode
    )

    print(f"✓ Uploaded to BigQuery: {DATASET}.{table_name}")



# Import libraries
from google.cloud import bigquery
from google.colab import auth
import pandas as pd

print("=== Sentiment Analysis at Scale: Customer Feedback Pipeline ===")
print("Initializing Spark Session with optimized configuration...")

# Initialize Spark Session with performance optimizations
spark = SparkSession.builder \
    .appName("SentimentAnalysisAtScale") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "100") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print(f"✓ Spark {spark.version} initialized successfully")
print(f"✓ Available cores: {spark.sparkContext.defaultParallelism}")

# ============================================================================
# PHASE 1: DATA INGESTION & EXPANSION (SPARK-NATIVE)
# ============================================================================

print("\n=== Loading original reviews.csv into Spark ===")

spark_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("reviews_37k_eng.csv")

spark_df = spark_df.filter(
    expr("try_cast(review_ts as date) IS NOT NULL OR review_ts IS NULL")
)


base_count = spark_df.count()
target_rows = 10_000_000
multiplier = (target_rows // base_count) + 1

print(f"✓ Loaded {base_count:,} original reviews")
print(f"✓ Expanding to {target_rows:,} records using Spark")

# Generate multiplier DataFrame
replication_df = spark.range(multiplier)

expanded_df = (
    spark_df
    .crossJoin(replication_df)
    .withColumn(
        "review_id",
        concat_ws(
            "-",
            lit("rev"),
            col("id"),
            monotonically_increasing_id()
        )
    )
    .withColumn(
        "review_ts_clean",
        to_date(col("review_ts"))
    )
    .withColumn(
        "review_ts",
        date_add(
            coalesce(col("review_ts_clean"), current_date()),
            - (rand() * 730).cast("int")
        )
    )
    .withColumn(
        "stars",
        when(
            rand() < 0.2,
            greatest(
                lit(1),
                least(
                    lit(5),
                    col("stars") + when(rand() < 0.5, -1).otherwise(1)
                )
            )
        ).otherwise(col("stars"))
    )
    .drop("review_ts_clean", "id")
    .limit(target_rows)
)

print("✓ Dataset expanded")


# NULL SANITIZATION (FIX BLANKS BEFORE TEXT GENERATION)

expanded_df = expanded_df \
    .withColumn(
        "stars",
        when(col("stars").isNull(), lit(3)).otherwise(col("stars"))
    ) \
    .withColumn(
        "review_type",
        when(col("review_type").isNull(), lit("product")).otherwise(col("review_type"))
    )

# INTELLIGENT TEXT FILLING
expanded_df = expanded_df \
    .withColumn(
        "review_text_eng",
        when(col("review_text_eng").isNull() | (col("review_text_eng") == ""),
            when(col("stars") >= 4, lit("Excellent product, very satisfied."))
            .when(col("stars") == 3, lit("Average product, acceptable quality."))
            .otherwise(lit("Disappointed with product quality."))
        ).otherwise(col("review_text_eng"))
    ) \
    .withColumn(
        "review_title_eng",
        when(col("review_title_eng").isNull() | (col("review_title_eng") == ""),
            when(col("stars") >= 4, lit("Great purchase"))
            .when(col("stars") == 3, lit("Okay"))
            .otherwise(lit("Not recommended"))
        ).otherwise(col("review_title_eng"))
    )

# INTELLIGENT TEXT FILLING (SPARK)

expanded_df = expanded_df \
    .withColumn(
        "stars",
        when(col("stars").isNull(), lit(3)).otherwise(col("stars"))
    ) \
    .withColumn(
        "review_type",
        when(col("review_type").isNull(), lit("product"))
        .otherwise(col("review_type"))
    ) \
    .withColumn(
        "review_text_eng",
        when(col("review_text_eng").isNull() | (col("review_text_eng") == ""),
            when(col("stars") >= 4, lit("Excellent product, very satisfied."))
            .when(col("stars") == 3, lit("Average product, acceptable quality."))
            .otherwise(lit("Disappointed with product quality."))
        ).otherwise(col("review_text_eng"))
    ) \
    .withColumn(
        "review_title_eng",
        when(col("review_title_eng").isNull() | (col("review_title_eng") == ""),
            when(col("stars") >= 4, lit("Great purchase"))
            .when(col("stars") == 3, lit("Okay"))
            .otherwise(lit("Not recommended"))
        ).otherwise(col("review_title_eng"))
    )


# Repartition + cache
expanded_df = expanded_df.repartition(200).cache()
expanded_df.count()

print(f"✓ Final record count: {expanded_df.count():,}")
print(f"✓ Partitions: {expanded_df.rdd.getNumPartitions()}")
print("\n✓ Phase 1 Complete: Data Ingestion & Expansion")
print("=" * 80)


# Cache for performance
spark_df.cache()
print(f"✓ Data loaded and cached in Spark")
print(f"  Total records: {spark_df.count():,}")
print(f"  Partitions: {spark_df.rdd.getNumPartitions()}")

# Show sample
print("\n=== Sample Data ===")
spark_df.show(5, truncate=50)

# Data profiling
print("\n=== Data Profiling ===")
spark_df.printSchema()
print("\nSummary Statistics (raw):")
spark_df.select('stars').summary().show()

print("\nNull value counts (raw):")
null_counts = spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns])
null_counts.show()

# Data profiling
print("\n=== Data Profiling ===")
expanded_df.printSchema()
print("\nSummary Statistics (extanded data):")
expanded_df.select('stars').summary().show()

print("\nNull value counts (extanded data):")
null_counts = expanded_df.select([count(when(col(c).isNull(), c)).alias(c) for c in expanded_df.columns])
null_counts.show()


(
    expanded_df
    .repartition(20)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("reviews_expanded_10M_20parts")
)



print("\n✓ Phase 1 Complete: Data Ingestion & Expansion")
print("=" * 80)


#================ SAVING TO BIGQUERY =========================

# Authenticate with Google Cloud
auth.authenticate_user()

# Initialize BigQuery client
project_id = 'sentiment-analysis-a'  # Replace with your GCP project ID
client = bigquery.Client(project=project_id)

# Define BigQuery dataset and table
dataset_id = 'outputs'  # Replace with your dataset name
table_id = 'spark_df'      # Replace with your table name
full_table_id = f'{project_id}.{dataset_id}.{table_id}'

# Configure the load job
job_config = bigquery.LoadJobConfig(
    write_disposition='WRITE_TRUNCATE',  # Options: WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
    autodetect=True,  # Auto-detect schema
)

# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = spark_df.toPandas()

# Load DataFrame to BigQuery
job = client.load_table_from_dataframe(pandas_df, full_table_id, job_config=job_config)
job.result()  # Wait for the job to complete

print(f'Successfully loaded {job.output_rows} rows to {full_table_id}')

=== Sentiment Analysis at Scale: Customer Feedback Pipeline ===
Initializing Spark Session with optimized configuration...
✓ Spark 4.0.1 initialized successfully
✓ Available cores: 100

=== Loading original reviews.csv into Spark ===
✓ Loaded 39,160 original reviews
✓ Expanding to 10,000,000 records using Spark
✓ Dataset expanded
✓ Final record count: 10,000,000
✓ Partitions: 200

✓ Phase 1 Complete: Data Ingestion & Expansion
✓ Data loaded and cached in Spark
  Total records: 39,160
  Partitions: 2

=== Sample Data ===
+--------+-----------+----------------------------------------+----------+-----+--------------------------------------------------+-----------------------------------------+
|   brand|review_type|                               review_id| review_ts|stars|                                   review_text_eng|                         review_title_eng|
+--------+-----------+----------------------------------------+----------+-----+----------------------------------------------

In [None]:
spark.sparkContext.getConf().get("spark.jars.packages")

In [16]:
# =============================================================================
# PHASE 2: REAL-TIME STREAMING INGESTION - FULL 1M ROWS TO BIGQUERY
# =============================================================================

import os
import time
import threading
import shutil
import builtins
from datetime import datetime

import pandas as pd

from pyspark.sql.types import *
from pyspark.sql.functions import col, when, length

print("\n" + "=" * 80)
print("=== PHASE 2: STREAMING 1M REVIEWS TO BIGQUERY ===")
print("=" * 80)


# =============================================================================
# STREAMING SCHEMA
# =============================================================================
streaming_schema = StructType([
    StructField("brand", StringType(), True),
    StructField("review_type", StringType(), True),
    StructField("review_id", StringType(), True),
    StructField("review_ts", StringType(), True),
    StructField("stars", IntegerType(), True),
    StructField("review_text_eng", StringType(), True),
    StructField("review_title_eng", StringType(), True),
    StructField("processing_time", TimestampType(), True)
])


# =============================================================================
# DIRECTORIES
# =============================================================================
streaming_dir = "/content/streaming_reviews"
checkpoint_dir = "/content/streaming_checkpoint"

for d in [streaming_dir, checkpoint_dir]:
    os.makedirs(d, exist_ok=True)

# Always reset checkpoint for notebook runs
shutil.rmtree(checkpoint_dir, ignore_errors=True)
os.makedirs(checkpoint_dir, exist_ok=True)

print("✓ Created streaming directories")


# =============================================================================
# PREPARE 1M ROWS
# =============================================================================
print("\n=== Preparing 1M rows for streaming ===")

expanded_pd = expanded_df.select(
    "brand",
    "review_type",
    "review_id",
    "review_ts",
    "stars",
    "review_text_eng",
    "review_title_eng"
).limit(1_000_000).toPandas()

print(f"✓ Converted {len(expanded_pd):,} rows to pandas")


# =============================================================================
# STREAMING DATA GENERATOR
# =============================================================================
def generate_streaming_reviews_full(batch_size=1000, interval=2):
    print("\n=== Starting Full Dataset Streaming ===")

    total_batches = (len(expanded_pd) + batch_size - 1) // batch_size
    start_time = time.time()

    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        end_idx = builtins.min(start_idx + batch_size, len(expanded_pd))

        batch = expanded_pd.iloc[start_idx:end_idx].copy()
        now = datetime.now()

        batch["review_ts"] = now.strftime("%Y-%m-%d %H:%M:%S")
        batch["processing_time"] = now

        batch.to_json(
            f"{streaming_dir}/batch_{batch_num:05d}.json",
            orient="records",
            lines=True
        )

        if (batch_num + 1) % 10 == 0:
            elapsed = time.time() - start_time
            print(
                f"  {end_idx:,}/{len(expanded_pd):,} rows | "
                f"{end_idx / elapsed:.0f} rows/sec"
            )

        time.sleep(interval)

    print("✓ Streaming generator finished")


# =============================================================================
# START STREAMING GENERATOR THREAD
# =============================================================================
streaming_thread = threading.Thread(
    target=generate_streaming_reviews_full,
    daemon=True
)
streaming_thread.start()

time.sleep(5)


# =============================================================================
# SPARK STREAMING SETUP
# =============================================================================
print("\n=== Setting up Spark Structured Streaming ===")

streaming_df = spark.readStream \
    .schema(streaming_schema) \
    .option("maxFilesPerTrigger", 2) \
    .json(streaming_dir)

print("✓ Streaming DataFrame created")


# =============================================================================
# REAL-TIME TRANSFORMATIONS
# =============================================================================
streaming_processed = (
    streaming_df
    .withColumn("text_length", length(col("review_text_eng")))
    .withColumn(
        "sentiment_label",
        when(col("stars") >= 4, "Positive")
        .when(col("stars") <= 2, "Negative")
        .otherwise("Neutral")
    )
)


# =============================================================================
# FOREACH-BATCH BIGQUERY WRITER (CORRECT WAY)
# =============================================================================
def write_batch_to_bigquery(batch_df, batch_id):
    """
    Executed exactly once per micro-batch by Spark
    """
    if batch_df.count() == 0:
        return

    pdf = batch_df.toPandas()

    pandas_to_bq(
        pdf,
        table_name="phase2_streaming_reviews_full",
        if_exists="append"
    )

    print(f"✓ BQ batch {batch_id}: wrote {len(pdf):,} rows")


# =============================================================================
# START STREAMING QUERY
# =============================================================================
query = (
    streaming_processed.writeStream
    .foreachBatch(write_batch_to_bigquery)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)

print("✓ Streaming query started")


# =============================================================================
# MONITOR PROGRESS
# =============================================================================
print("\n=== Monitoring Stream Progress ===")

while streaming_thread.is_alive():
    time.sleep(15)
    print("  Streaming still running...")

streaming_thread.join()
query.awaitTermination(timeout=60)


# =============================================================================
# COMBINE BATCH + STREAMING DATA
# =============================================================================
print("\n=== Combining Batch and Streaming Data ===")

combined_df = spark_df.unionByName(
    final_streaming_df.select(spark_df.columns)
)

combined_df.cache()



print(f"✓ Combined dataset: {combined_df.count():,} total reviews")
print(f"  Batch data: {spark_df.count():,}")
print(f"  Streaming data: {streaming_count:,}")

print("\n✓ Phase 2 Complete: Streaming Integration")
print("=" * 80)


print("\n Full Dataset Streaming to BigQuery")
print("=" * 80)



=== PHASE 2: STREAMING 1M REVIEWS TO BIGQUERY ===
✓ Created streaming directories

=== Preparing 1M rows for streaming ===
✓ Loaded 1,000 rows → sentiment-analysis-a.outputs.phase2_streaming_reviews_full
✓ BigQuery batch 147: 1,000 rows written
✓ Converted 1,000,000 rows to pandas

=== Starting Full Dataset Streaming ===

=== Setting up Spark Structured Streaming ===
✓ Streaming DataFrame created
✓ Streaming query started

=== Monitoring Stream Progress ===
✓ Loaded 2,000 rows → sentiment-analysis-a.outputs.phase2_streaming_reviews_full
✓ BQ batch 0: wrote 2,000 rows
✓ Loaded 2,000 rows → sentiment-analysis-a.outputs.phase2_streaming_reviews_full
✓ BQ batch 1: wrote 2,000 rows
✓ Loaded 2,000 rows → sentiment-analysis-a.outputs.phase2_streaming_reviews_full
✓ BQ batch 2: wrote 2,000 rows
  10,000/1,000,000 rows | 551 rows/sec
✓ Loaded 2,000 rows → sentiment-analysis-a.outputs.phase2_streaming_reviews_full
✓ BQ batch 3: wrote 2,000 rows
  Streaming still running...
✓ Loaded 2,000 rows →

In [None]:
# Data Processing, Cleaning & Feature Engineering Pipeline

print("\n" + "="*80)
print("=== PHASE 3: DATA PROCESSING & FEATURE ENGINEERING ===")
print("="*80)

# ============================================================================
# DATA QUALITY CHECKS
# ============================================================================

print("\n=== Running Data Quality Checks ===")

# Check 1: Missing critical fields
print("\n1. Missing Critical Fields:")
critical_fields = ['brand', 'stars', 'review_text_eng']
for field in critical_fields:
    null_count = combined_df.filter(col(field).isNull()).count()
    null_pct = (null_count / combined_df.count()) * 100
    print(f"  {field}: {null_count:,} nulls ({null_pct:.2f}%)")

# Check 2: Invalid star ratings
print("\n2. Invalid Star Ratings:")
invalid_stars = combined_df.filter((col('stars') < 1) | (col('stars') > 5)).count()
print(f"  Invalid ratings: {invalid_stars:,}")

# Check 3: Empty text reviews
print("\n3. Empty Review Text:")
empty_text = combined_df.filter((col('review_text_eng').isNull()) | (col('review_text_eng') == '')).count()
print(f"  Empty reviews: {empty_text:,}")

# Check 4: Duplicate review IDs
print("\n4. Duplicate Review IDs:")
total_reviews = combined_df.count()
unique_ids = combined_df.select('review_id').distinct().count()
duplicates = total_reviews - unique_ids
print(f"  Duplicates: {duplicates:,}")

# ============================================================================
# DATA CLEANING
# ============================================================================

print("\n=== Data Cleaning Pipeline ===")

# Remove records with missing critical fields
cleaned_df = combined_df.filter(
    col('review_text_eng').isNotNull() &
    (col('review_text_eng') != '') &
    col('stars').isNotNull() &
    col('brand').isNotNull()
)

# Filter valid star ratings
cleaned_df = cleaned_df.filter((col('stars') >= 1) & (col('stars') <= 5))

# Remove duplicates based on review_id
cleaned_df = cleaned_df.dropDuplicates(['review_id'])

print(f"✓ Cleaned data: {cleaned_df.count():,} records")
print(f"  Removed: {combined_df.count() - cleaned_df.count():,} invalid records")

# ============================================================================
# FEATURE ENGINEERING
# ============================================================================

print("\n=== Feature Engineering ===")

# 1. Text-based features
print("\n1. Creating text-based features...")
cleaned_df = cleaned_df.withColumn('text_length', length(col('review_text_eng')))
cleaned_df = cleaned_df.withColumn('word_count', size(split(col('review_text_eng'), ' ')))
cleaned_df = cleaned_df.withColumn('has_title', when(col('review_title_eng').isNotNull(), 1).otherwise(0))

# 2. Sentiment labels (target variable)
print("2. Creating sentiment labels...")
cleaned_df = cleaned_df.withColumn('sentiment_label',
    when(col('stars') >= 4, 'Positive')
    .when(col('stars') <= 2, 'Negative')
    .otherwise('Neutral')
)

# Binary sentiment for some models
cleaned_df = cleaned_df.withColumn('sentiment_binary',
    when(col('stars') >= 4, 1).otherwise(0)
)

# 3. Temporal features
print("3. Creating temporal features...")
cleaned_df = cleaned_df.withColumn('review_date', to_date(col('review_ts')))
cleaned_df = cleaned_df.withColumn('review_year', year(col('review_date')))
cleaned_df = cleaned_df.withColumn('review_month', month(col('review_date')))
cleaned_df = cleaned_df.withColumn('review_quarter', quarter(col('review_date')))
cleaned_df = cleaned_df.withColumn('review_day_of_week', dayofweek(col('review_date')))

# 4. Brand encoding
print("4. Encoding categorical features...")
brand_indexer = StringIndexer(inputCol='brand', outputCol='brand_index')
cleaned_df = brand_indexer.fit(cleaned_df).transform(cleaned_df)

# 5. Review type encoding
review_type_indexer = StringIndexer(inputCol='review_type', outputCol='review_type_index')
cleaned_df = review_type_indexer.fit(cleaned_df).transform(cleaned_df)

# Cache the processed data
cleaned_df.cache()
print(f"\n✓ Feature engineering complete")
print(f"  Total features: {len(cleaned_df.columns)}")

# Show sample with new features
print("\n=== Sample Processed Data ===")
cleaned_df.select(
    'brand', 'stars', 'sentiment_label', 'text_length',
    'word_count', 'review_month', 'brand_index'
).show(10)

# =============================================================================
# EXPLORATORY DATA ANALYSIS (WITH CSV OUTPUTS)
# =============================================================================

print("\n=== Exploratory Data Analysis ===")

# ---------------------------------------------------------------------------
# 1. Sentiment distribution
# ---------------------------------------------------------------------------
sentiment_dist = cleaned_df.groupBy('sentiment_label') \
    .agg(count('*').alias('count')) \
    .withColumn('percentage', col('count') / cleaned_df.count() * 100) \
    .orderBy('sentiment_label')

sentiment_dist.coalesce(1).write.mode("overwrite").csv(
    "phase3_sentiment_distribution.csv",
    header=True
)

# ---------------------------------------------------------------------------
# 2. Top brands
# ---------------------------------------------------------------------------
brand_dist = cleaned_df.groupBy('brand') \
    .agg(
        count('*').alias('review_count'),
        avg('stars').alias('avg_rating')
    ) \
    .orderBy(desc('review_count')) \
    .limit(10)

brand_dist.coalesce(1).write.mode("overwrite").csv(
    "phase3_brand_distribution_top10.csv",
    header=True
)

# ---------------------------------------------------------------------------
# 3. Monthly trends
# ---------------------------------------------------------------------------
monthly_dist = cleaned_df.groupBy('review_year', 'review_month') \
    .agg(
        count('*').alias('count'),
        avg('stars').alias('avg_stars')
    ) \
    .orderBy('review_year', 'review_month')

monthly_dist.coalesce(1).write.mode("overwrite").csv(
    "phase3_monthly_trends.csv",
    header=True
)

# ---------------------------------------------------------------------------
# 4. Text statistics
# ---------------------------------------------------------------------------
text_stats = cleaned_df.select(
    'text_length', 'word_count'
).summary()

text_stats.coalesce(1).write.mode("overwrite").csv(
    "phase3_text_statistics.csv",
    header=True
)

# ---------------------------------------------------------------------------
# 5. Star distribution
# ---------------------------------------------------------------------------
stars_dist = cleaned_df.groupBy('stars') \
    .agg(count('*').alias('count')) \
    .withColumn('percentage', col('count') / cleaned_df.count() * 100) \
    .orderBy('stars')

stars_dist.coalesce(1).write.mode("overwrite").csv(
    "phase3_star_distribution.csv",
    header=True
)

# =============================================================================
# PARTITIONING STRATEGY
# =============================================================================

print("\n=== Implementing Partitioning Strategy ===")

partitioned_df = cleaned_df.repartition(50, 'sentiment_label')
partitioned_df.cache()
partitioned_df.count()

partitioned_df.coalesce(1).write.mode("overwrite").csv(
    "phase3_partitioned_data.csv",
    header=True
)

print("\n✓ Phase 3 Complete: Data Processing & Feature Engineering")
print("=" * 80)

# Final reference
processed_df = partitioned_df


spark_df_to_bq(
    processed_df,
    "phase3_reviews_processed",
    write_mode="replace"
)

spark_df_to_bq(
    sentiment_dist,
    "phase3_sentiment_distribution",
    write_mode="replace"
)

spark_df_to_bq(
    brand_dist,
    "phase3_brand_distribution",
    write_mode="replace"
)

spark_df_to_bq(
    monthly_dist,
    "phase3_monthly_trends",
    write_mode="replace"
)

spark_df_to_bq(
    text_stats,
    "phase3_text_statistics",
    write_mode="replace"
)

spark_df_to_bq(
    stars_dist,
    "phase3_star_distribution",
    write_mode="replace"
)


=== PHASE 3: DATA PROCESSING & FEATURE ENGINEERING ===

=== Running Data Quality Checks ===

1. Missing Critical Fields:
  brand: 0 nulls (0.00%)
  stars: 1,860 nulls (4.52%)
  review_text_eng: 3,868 nulls (9.40%)

2. Invalid Star Ratings:
  Invalid ratings: 0

3. Empty Review Text:
  Empty reviews: 3,868

4. Duplicate Review IDs:
  Duplicates: 15,676

=== Data Cleaning Pipeline ===
✓ Cleaned data: 23,292 records
  Removed: 17,868 invalid records

=== Feature Engineering ===

1. Creating text-based features...
2. Creating sentiment labels...
3. Creating temporal features...
4. Encoding categorical features...

✓ Feature engineering complete
  Total features: 19

=== Sample Processed Data ===
+--------+-----+---------------+-----------+----------+------------+-----------+
|   brand|stars|sentiment_label|text_length|word_count|review_month|brand_index|
+--------+-----+---------------+-----------+----------+------------+-----------+
|Brand NN|    5|       Positive|         16|         4|

  pdf.to_gbq(
100%|██████████| 1/1 [00:00<00:00, 8719.97it/s]


✓ Uploaded to BigQuery: outputs.phase3_reviews_processed
Uploading 3 rows → phase3_sentiment_distribution


  pdf.to_gbq(
100%|██████████| 1/1 [00:00<00:00, 10512.04it/s]


✓ Uploaded to BigQuery: outputs.phase3_sentiment_distribution
Uploading 10 rows → phase3_brand_distribution


  pdf.to_gbq(
100%|██████████| 1/1 [00:00<00:00, 2399.49it/s]


✓ Uploaded to BigQuery: outputs.phase3_brand_distribution
Uploading 82 rows → phase3_monthly_trends


  pdf.to_gbq(
100%|██████████| 1/1 [00:00<00:00, 11397.57it/s]


✓ Uploaded to BigQuery: outputs.phase3_monthly_trends
Uploading 8 rows → phase3_text_statistics


  pdf.to_gbq(
100%|██████████| 1/1 [00:00<00:00, 11814.94it/s]


✓ Uploaded to BigQuery: outputs.phase3_text_statistics
Uploading 5 rows → phase3_star_distribution


  pdf.to_gbq(
100%|██████████| 1/1 [00:00<00:00, 8490.49it/s]

✓ Uploaded to BigQuery: outputs.phase3_star_distribution





In [None]:
# Machine Learning Pipeline - Multi-Model Sentiment Classification

print("\n" + "="*80)
print("=== PHASE 4: MACHINE LEARNING PIPELINE ===")
print("="*80)

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, Word2Vec
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline
import time
import builtins # Import builtins to access original Python functions

# ============================================================================
# DATA PREPARATION FOR ML
# ============================================================================

print("\n=== Preparing Data for ML ===")

# Sample for faster training (you can use full dataset if resources allow)
ml_sample_size = 1_000_000  # 1M records for training
current_df_count = processed_df.count()
# Calculate the fraction to sample, ensuring it doesn't exceed 1.0
fraction_to_sample = builtins.min(1.0, ml_sample_size / current_df_count)
ml_df = processed_df.sample(fraction=fraction_to_sample, seed=42)
ml_df = ml_df.cache()

print(f"✓ ML dataset size: {ml_df.count():,} records")

# Convert sentiment labels to numeric indices
label_indexer = StringIndexer(inputCol='sentiment_label', outputCol='label')
ml_df = label_indexer.fit(ml_df).transform(ml_df)

# Split data: 70% train, 15% validation, 15% test
train_df, val_df, test_df = ml_df.randomSplit([0.7, 0.15, 0.15], seed=42)

print(f"\nData splits:")
print(f"  Training:   {train_df.count():,} ({train_df.count()/ml_df.count()*100:.1f}%) {val_df.count()/ml_df.count()*100:.1f}%) ")
print(f"  Validation: {val_df.count():,} ({val_df.count()/ml_df.count()*100:.1f}%) ")
print(f"  Test:       {test_df.count():,} ({test_df.count()/ml_df.count()*100:.1f}%)")

# Cache splits
train_df.cache()
val_df.cache()
test_df.cache()

# ============================================================================
# TEXT PROCESSING PIPELINE
# ============================================================================

print("\n=== Building Text Processing Pipeline ===")

# Tokenization
tokenizer = Tokenizer(inputCol='review_text_eng', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')

# TF-IDF vectorization
cv = CountVectorizer(inputCol='filtered_words', outputCol='raw_features', vocabSize=10000)
idf = IDF(inputCol='raw_features', outputCol='tfidf_features')

# Assemble all features
assembler = VectorAssembler(
    inputCols=['tfidf_features', 'text_length', 'word_count', 'brand_index', 'review_type_index'],
    outputCol='features',
    handleInvalid='skip'
)

print("✓ Text processing pipeline created")

# ============================================================================
# MODEL 1: LOGISTIC REGRESSION (Baseline)
# ============================================================================

print("\n=== Model 1: Logistic Regression ===")
start_time = time.time()

# Build pipeline
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
    regParam=0.01
)

lr_pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, assembler, lr])

# Train model
print("Training Logistic Regression...")
lr_model = lr_pipeline.fit(train_df)
lr_train_time = time.time() - start_time

# Predictions
lr_train_pred = lr_model.transform(train_df)
lr_test_pred = lr_model.transform(test_df)

# Evaluation
evaluator_multi = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy'
)

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='f1'
)

lr_train_acc = evaluator_multi.evaluate(lr_train_pred)
lr_test_acc = evaluator_multi.evaluate(lr_test_pred)
lr_test_f1 = evaluator_f1.evaluate(lr_test_pred)

print(f"\n✓ Logistic Regression Results:")
print(f"  Training time: {lr_train_time:.2f}s")
print(f"  Training accuracy: {lr_train_acc:.4f}")
print(f"  Test accuracy: {lr_test_acc:.4f}")
print(f"  Test F1-score: {lr_test_f1:.4f}")

# ============================================================================
# MODEL 2: RANDOM FOREST (Ensemble Method)
# ============================================================================

print("\n=== Model 2: Random Forest Classifier ===")
start_time = time.time()

rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='label',
    numTrees=20,
    maxDepth=10,
    seed=42,
    maxBins=100 # Increased maxBins to handle higher cardinality categorical features
)

rf_pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, assembler, rf])

print("Training Random Forest...")
rf_model = rf_pipeline.fit(train_df)
rf_train_time = time.time() - start_time

# Predictions
rf_train_pred = rf_model.transform(train_df)
rf_test_pred = rf_model.transform(test_df)

# Evaluation
rf_train_acc = evaluator_multi.evaluate(rf_train_pred)
rf_test_acc = evaluator_multi.evaluate(rf_test_pred)
rf_test_f1 = evaluator_f1.evaluate(rf_test_pred)

print(f"\n✓ Random Forest Results:")
print(f"  Training time: {rf_train_time:.2f}s")
print(f"  Training accuracy: {rf_train_acc:.4f}")
print(f"  Test accuracy: {rf_test_acc:.4f}")
print(f"  Test F1-score: {rf_test_f1:.4f}")

# Feature importance
rf_classifier = rf_model.stages[-1]
feature_importance = rf_classifier.featureImportances
print(f"\n  Feature importances (top 5): {feature_importance.toArray()[:5]}")

# ============================================================================
# MODEL 3: NAIVE BAYES (Probabilistic)
# ============================================================================

print("\n=== Model 3: Naive Bayes Classifier ===")
start_time = time.time()

nb = NaiveBayes(
    featuresCol='features',
    labelCol='label',
    smoothing=1.0
)

nb_pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, assembler, nb])

print("Training Naive Bayes...")
nb_model = nb_pipeline.fit(train_df)
nb_train_time = time.time() - start_time

# Predictions
nb_test_pred = nb_model.transform(test_df)

# Evaluation
nb_test_acc = evaluator_multi.evaluate(nb_test_pred)
nb_test_f1 = evaluator_f1.evaluate(nb_test_pred)

print(f"\n✓ Naive Bayes Results:")
print(f"  Training time: {nb_train_time:.2f}s")
print(f"  Test accuracy: {nb_test_acc:.4f}")
print(f"  Test F1-score: {nb_test_f1:.4f}")

# ============================================================================
# HYPERPARAMETER TUNING (Random Forest)
# ============================================================================

print("\n=== Hyperparameter Tuning (Random Forest) ==")
start_time = time.time()

# Create parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Cross-validator
cv_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')

crossval = TrainValidationSplit(
    estimator=rf_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=cv_evaluator,
    trainRatio=0.8,
    seed=42
)

# Use smaller sample for tuning
tuning_sample = train_df.sample(fraction=0.2, seed=42)
print(f"Tuning on {tuning_sample.count():,} samples...")

cv_model = crossval.fit(tuning_sample)
tuning_time = time.time() - start_time

# Best model predictions
best_model_pred = cv_model.transform(test_df)
best_model_acc = evaluator_multi.evaluate(best_model_pred)
best_model_f1 = evaluator_f1.evaluate(best_model_pred)

print(f"\n✓ Tuned Model Results:")
print(f"  Tuning time: {tuning_time:.2f}s")
print(f"  Best test accuracy: {best_model_acc:.4f}")
print(f"  Best test F1-score: {best_model_f1:.4f}")

# ============================================================================
# CLUSTERING ANALYSIS (Unsupervised)
# ============================================================================

print("\n=== Clustering Analysis (K-Means) ===")
start_time = time.time()

# Prepare features for clustering
clustering_pipeline = Pipeline(stages=[tokenizer, remover, cv, idf])
clustering_features = clustering_pipeline.fit(train_df).transform(train_df)

# Assemble features
clustering_assembler = VectorAssembler(
    inputCols=['raw_features'],
    outputCol='features',
    handleInvalid='skip'
)
clustering_data = clustering_assembler.transform(clustering_features)

# Sample for clustering
clustering_sample = clustering_data.sample(fraction=0.1, seed=42).cache()
print(f"Clustering on {clustering_sample.count():,} samples...")

# K-Means clustering
kmeans = KMeans(k=3, seed=42, featuresCol='features')
kmeans_model = kmeans.fit(clustering_sample)
clustering_time = time.time() - start_time

# Predict clusters
clustered = kmeans_model.transform(clustering_sample)

# Analyze clusters
cluster_analysis = clustered.groupBy('prediction', 'sentiment_label') \
    .agg(count('*').alias('count')) \
    .orderBy('prediction', 'sentiment_label')

print(f"\n✓ Clustering Results:")
print(f"  Training time: {clustering_time:.2f}s")
print(f"  Silhouette score: {kmeans_model.summary.trainingCost:.4f}")
print("\nCluster distribution by sentiment:")
cluster_analysis.show()

# ============================================================================
# MODEL COMPARISON SUMMARY
# ============================================================================

print("\n=== MODEL COMPARISON SUMMARY ===")
print("="*80)

model_results = [
    ("Logistic Regression", lr_model, lr_train_time, lr_test_acc, lr_test_f1),
    ("Random Forest", rf_model, rf_train_time, rf_test_acc, rf_test_f1),
    ("Naive Bayes", nb_model, nb_train_time, nb_test_acc, nb_test_f1),
    ("Tuned Random Forest", cv_model, tuning_time, best_model_acc, best_model_f1)
]

print(f"\n{'Model':<25} {'Train Time':<15} {'Test Acc':<12} {'F1-Score':<12}")
print("-" * 64)

best_model = None
best_f1 = -1

for name, model, train_time, acc, f1 in model_results:
    print(f"{name:<25} {train_time:>10.2f}s     {acc:>8.4f}     {f1:>8.4f}")
    if f1 > best_f1:
        best_f1 = f1
        best_model = model
        best_model_name = name

print(f"\n✓ Best model selected: {best_model_name} (F1 = {best_f1:.4f})")


# ============================================================================
# PATH CONFIGURATION
# ============================================================================

PREDICTION_OUTPUT_PATH = "outputs/full_dataset_predictions"

# Create output directory if needed (local / compatible with Spark submit)
os.makedirs(PREDICTION_OUTPUT_PATH, exist_ok=True)

# ============================================================================
# RUN INFERENCE
# ============================================================================

print("\nRunning inference on full dataset...")

full_predictions = best_model.transform(processed_df)

prediction_df = full_predictions.select(
    "review_id",
    "brand",
    "stars",
    "sentiment_label",
    col("prediction").alias("predicted_label"),
    col("probability").cast(StringType()).alias("probability") # Convert vector to string
)

record_count = prediction_df.count()
print(f"✓ Inference complete on {record_count:,} records")


spark_df_to_bq(
    spark_df,
    "spark_df",
    write_mode="replace"
)


comparison_df = spark.createDataFrame(
    comparison_data,
    ["model_name", "train_time_sec", "test_accuracy", "test_f1"]
)

spark_df_to_bq(
    comparison_df,
    "phase4_model_comparison",
    write_mode="replace"
)

spark_df_to_bq(
    cluster_analysis,
    "phase4_cluster_analysis",
    write_mode="replace"
)


spark_df_to_bq(
    prediction_df,
    "phase4_model_predictions",
    write_mode="replace"
)




print(f"✓ Predictions saved at: {PREDICTION_OUTPUT_PATH}")
print("="*80)



=== PHASE 4: MACHINE LEARNING PIPELINE ===

=== Preparing Data for ML ===
✓ ML dataset size: 23,292 records

Data splits:
  Training:   16,325 (70.1%) 14.7%) 
  Validation: 3,430 (14.7%) 
  Test:       3,537 (15.2%)

=== Building Text Processing Pipeline ===
✓ Text processing pipeline created

=== Model 1: Logistic Regression ===
Training Logistic Regression...

✓ Logistic Regression Results:
  Training time: 25.52s
  Training accuracy: 0.9846
  Test accuracy: 0.9189
  Test F1-score: 0.9121

=== Model 2: Random Forest Classifier ===
Training Random Forest...

✓ Random Forest Results:
  Training time: 65.94s
  Training accuracy: 0.9151
  Test accuracy: 0.9138
  Test F1-score: 0.8729

  Feature importances (top 5): [0.00281224 0.00258494 0.0004981  0.00075189 0.00101027]

=== Model 3: Naive Bayes Classifier ===
Training Naive Bayes...

✓ Naive Bayes Results:
  Training time: 6.45s
  Test accuracy: 0.9005
  Test F1-score: 0.9090

=== Hyperparameter Tuning (Random Forest) ==
Tuning on 3,2

In [None]:
# Performance Analysis & Optimization

print("\n" + "="*80)
print("=== PHASE 6: PERFORMANCE OPTIMIZATION & BENCHMARKING ===")
print("="*80)

import time
import pandas as pd
from pyspark.sql.functions import *
import builtins # Import builtins to access original Python functions

# ============================================================================
# BENCHMARK 1: PARTITIONING IMPACT
# ============================================================================

print("\n=== Benchmark 1: Impact of Partitioning ===")

# Test different partition strategies
partition_configs = [
    ("No Repartition", None),
    ("10 Partitions", 10),
    ("50 Partitions", 50),
    ("100 Partitions", 100),
    ("200 Partitions", 200)
]

benchmark_results = []

# Sample data for testing
test_df = processed_df.sample(fraction=0.1, seed=42)
test_count = test_df.count()
print(f"Testing on {test_count:,} records")

for config_name, num_partitions in partition_configs:
    print(f"\nTesting: {config_name}")

    # Apply partitioning
    if num_partitions:
        test_data = test_df.repartition(num_partitions)
    else:
        test_data = test_df

    # Benchmark: Aggregation operation
    start_time = time.time()
    result = test_data.groupBy('sentiment_label', 'brand') \
        .agg(
            count('*').alias('count'),
            avg('stars').alias('avg_stars'),
            avg('text_length').alias('avg_length')
        ).collect()
    execution_time = time.time() - start_time

    actual_partitions = test_data.rdd.getNumPartitions()

    benchmark_results.append({
        'Configuration': config_name,
        'Partitions': actual_partitions,
        'Execution_Time': execution_time,
        'Records_Processed': test_count
    })

    print(f"  Partitions: {actual_partitions}")
    print(f"  Execution time: {execution_time:.3f}s")
    print(f"  Throughput: {test_count/execution_time:,.0f} records/sec")

# Display results
print("\n" + "="*70)
print("Partitioning Benchmark Results:")
print("="*70)
benchmark_df = pd.DataFrame(benchmark_results)
print(benchmark_df.to_string(index=False))

# ============================================================================
# BENCHMARK 2: CACHING STRATEGY
# ============================================================================

print("\n\n=== Benchmark 2: Caching Strategy Impact ===")

cache_results = []

# Test without cache
print("\nTest 1: Without caching")
test_data = processed_df.sample(fraction=0.05, seed=42)

# Multiple operations without cache
start_time = time.time()
count1 = test_data.count()
agg1 = test_data.groupBy('sentiment_label').count().collect()
filter1 = test_data.filter(col('stars') >= 4).count()
no_cache_time = time.time() - start_time

print(f"  Total time (3 operations): {no_cache_time:.3f}s")

# Test with cache
print("\nTest 2: With caching")
test_data_cached = test_data.cache()
test_data_cached.count()  # Materialize cache

start_time = time.time()
count2 = test_data_cached.count()
agg2 = test_data_cached.groupBy('sentiment_label').count().collect()
filter2 = test_data_cached.filter(col('stars') >= 4).count()
cache_time = time.time() - start_time

print(f"  Total time (3 operations): {cache_time:.3f}s")
print(f"  Speedup: {no_cache_time/cache_time:.2f}x")

cache_results.append({
    'Strategy': 'Without Cache',
    'Time': no_cache_time,
    'Speedup': 1.0
})
cache_results.append({
    'Strategy': 'With Cache',
    'Time': cache_time,
    'Speedup': no_cache_time/cache_time
})

test_data_cached.unpersist()

# ============================================================================
# BENCHMARK 3: DIFFERENT PROCESSING STRATEGIES
# ============================================================================

print("\n\n=== Benchmark 3: Processing Strategy Comparison ===")

strategy_results = []
sample_data = processed_df.sample(fraction=0.05, seed=42)

# Strategy 1: RDD-based processing
print("\nStrategy 1: RDD-based approach")
start_time = time.time()
rdd_result = sample_data.rdd \
    .map(lambda row: (row['sentiment_label'], (row['stars'], 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .map(lambda x: (x[0], x[1][0] / x[1][1])) \
    .collect()
rdd_time = time.time() - start_time
print(f"  Execution time: {rdd_time:.3f}s")

# Strategy 2: DataFrame API
print("\nStrategy 2: DataFrame API approach")
start_time = time.time()
df_result = sample_data.groupBy('sentiment_label') \
    .agg(avg('stars').alias('avg_stars')) \
    .collect()
df_time = time.time() - start_time
print(f"  Execution time: {df_time:.3f}s")

# Strategy 3: Spark SQL
print("\nStrategy 3: Spark SQL approach")
sample_data.createOrReplaceTempView("reviews_temp")
start_time = time.time()
sql_result = spark.sql("""
    SELECT sentiment_label, AVG(stars) as avg_stars
    FROM reviews_temp
    GROUP BY sentiment_label
""").collect()
sql_time = time.time() - start_time
print(f"  Execution time: {sql_time:.3f}s")

strategy_results = pd.DataFrame({
    'Strategy': ['RDD API', 'DataFrame API', 'Spark SQL'],
    'Execution_Time': [rdd_time, df_time, sql_time],
    'Relative_Performance': [
        rdd_time / builtins.min(rdd_time, df_time, sql_time),
        df_time / builtins.min(rdd_time, df_time, sql_time),
        sql_time / builtins.min(rdd_time, df_time, sql_time)
    ]
})

print("\n" + "="*60)
print("Processing Strategy Comparison:")
print("="*60)
print(strategy_results.to_string(index=False))

# ============================================================================
# BENCHMARK 4: SCALABILITY TEST
# ============================================================================

print("\n\n=== Benchmark 4: Scalability Analysis ===")

scalability_results = []
data_sizes = [10000, 50000, 100000, 500000, 1000000]

for size in data_sizes:
    print(f"\nProcessing {size:,} records...")

    # Sample data
    scale_data = processed_df.sample(fraction=builtins.min(1.0, size/processed_df.count()), seed=42)
    actual_size = scale_data.count()

    # Benchmark operation
    start_time = time.time()
    result = scale_data.groupBy('sentiment_label', 'brand') \
        .agg(
            count('*').alias('count'),
            avg('stars').alias('avg_stars')
        ).count()
    execution_time = time.time() - start_time

    throughput = actual_size / execution_time

    scalability_results.append({
        'Records': actual_size,
        'Time': execution_time,
        'Throughput': throughput
    })

    print(f"  Actual records: {actual_size:,}")
    print(f"  Execution time: {execution_time:.3f}s")
    print(f"  Throughput: {throughput:,.0f} records/sec")

scalability_df = pd.DataFrame(scalability_results)

print("\n" + "="*60)
print("Scalability Test Results:")
print("="*60)
print(scalability_df.to_string(index=False))

# ============================================================================
# BENCHMARK 5: RESOURCE UTILIZATION
# ============================================================================

print("\n\n=== Benchmark 5: Resource Utilization Analysis ===")

# Get Spark configuration
spark_conf = spark.sparkContext.getConf().getAll()
print("\nCurrent Spark Configuration:")
important_configs = ['spark.driver.memory', 'spark.executor.memory',
                     'spark.sql.shuffle.partitions', 'spark.default.parallelism']
for key, value in spark_conf:
    if any(config in key for config in important_configs):
        print(f"  {key}: {value}")

# Analyze task distribution
print("\nTask Distribution Analysis:")
test_data = processed_df.sample(fraction=0.1, seed=42).repartition(20)

# Count records per partition
partition_counts = test_data.rdd.mapPartitions(lambda it: [builtins.sum(1 for _ in it)]).collect()
print(f"  Total partitions: {len(partition_counts)}")
print(f"  Min records per partition: {builtins.min(partition_counts):,}")
print(f"  Max records per partition: {builtins.max(partition_counts):,}")
print(f"  Avg records per partition: {builtins.sum(partition_counts)/len(partition_counts):,.0f}")
print(f"  Partition imbalance ratio: {builtins.max(partition_counts)/builtins.min(partition_counts):.2f}x")

# ============================================================================
# PERFORMANCE SUMMARY
# ============================================================================

print("\n\n" + "="*80)
print("=== PERFORMANCE OPTIMIZATION SUMMARY ===")
print("="*80)

print("\n1. PARTITIONING RECOMMENDATIONS:")
optimal_partitions = benchmark_df.loc[benchmark_df['Execution_Time'].idxmin()]
print(f"   ✓ Optimal partition count: {optimal_partitions['Partitions']}")
print(f"   ✓ Best execution time: {optimal_partitions['Execution_Time']:.3f}s")
print(f"   ✓ Throughput: {optimal_partitions['Records_Processed']/optimal_partitions['Execution_Time']:,.0f} records/sec")

print("\n2. CACHING IMPACT:")
speedup = cache_results[1]['Speedup']
print(f"   ✓ Performance improvement: {speedup:.2f}x faster with caching")
print(f"   ✓ Recommendation: Cache frequently accessed DataFrames")

print("\n3. PROCESSING STRATEGY:")
best_strategy = strategy_results.loc[strategy_results['Execution_Time'].idxmin(), 'Strategy']
print(f"   ✓ Fastest approach: {best_strategy}")
print(f"   ✓ Recommendation: Use DataFrame API for best Catalyst optimization")

print("\n4. SCALABILITY:")
avg_throughput = scalability_df['Throughput'].mean()
print(f"   ✓ Average throughput: {avg_throughput:,.0f} records/second")
print(f"   ✓ Linear scalability: {'Yes' if scalability_df['Throughput'].std() / avg_throughput < 0.3 else 'Needs optimization'}")

print("\n5. RESOURCE OPTIMIZATION:")
print(f"   ✓ Partition balance: {builtins.max(partition_counts)/builtins.min(partition_counts):.2f}x imbalance")
print(f"   ✓ Recommendation: {'Good balance' if builtins.max(partition_counts)/builtins.min(partition_counts) < 2 else 'Consider repartitioning'}")

# Save performance results
print("\n=== Saving Performance Results ===")

benchmark_df.to_csv('performance_partitioning.csv', index=False)
pd.DataFrame(cache_results).to_csv('performance_caching.csv', index=False)
strategy_results.to_csv('performance_strategies.csv', index=False)
scalability_df.to_csv('performance_scalability.csv', index=False)

print("✓ Saved performance_partitioning.csv")
print("✓ Saved performance_caching.csv")
print("✓ Saved performance_strategies.csv")
print("✓ Saved performance_scalability.csv")



pandas_to_bq(
    benchmark_df,
    "phase5_partition_benchmark",
    if_exists="replace"
)


pandas_to_bq(
    pd.DataFrame(cache_results),
    "phase5_caching_benchmark",
    if_exists="replace"
)


pandas_to_bq(
    strategy_results,
    "phase5_processing_strategies",
    if_exists="replace"
)


pandas_to_bq(
    scalability_df,
    "phase5_scalability",
    if_exists="replace"
)


print("\n✓ Phase 6 Complete: Performance Optimization & Benchmarking")
print("="*80)










=== PHASE 6: PERFORMANCE OPTIMIZATION & BENCHMARKING ===

=== Benchmark 1: Impact of Partitioning ===
Testing on 2,342 records

Testing: No Repartition
  Partitions: 50
  Execution time: 0.951s
  Throughput: 2,464 records/sec

Testing: 10 Partitions
  Partitions: 10
  Execution time: 1.005s
  Throughput: 2,330 records/sec

Testing: 50 Partitions
  Partitions: 50
  Execution time: 1.559s
  Throughput: 1,502 records/sec

Testing: 100 Partitions
  Partitions: 100
  Execution time: 2.060s
  Throughput: 1,137 records/sec

Testing: 200 Partitions
  Partitions: 200
  Execution time: 1.941s
  Throughput: 1,207 records/sec

Partitioning Benchmark Results:
 Configuration  Partitions  Execution_Time  Records_Processed
No Repartition          50        0.950585               2342
 10 Partitions          10        1.005250               2342
 50 Partitions          50        1.558914               2342
100 Partitions         100        2.060377               2342
200 Partitions         200        