In [0]:
from pyspark.sql.functions import col, sum, count, avg, max

# --- USE CASE 1: UNPREDICTABLE TIMINGS ---

# 1. Read cleansed data from Silver Layer
df_vld = spark.read.table("silver_vld")

# 2. Define "Severe Delay" Threshold (e.g., 15 minutes late)
# We want to find where these severe delays happen most often.
severe_delays = df_vld.filter(col("Delay_Minutes") >= 15)

# 3. Aggregate by Route and Hour (The "Hot Spot" Analysis)
# We calculate:
# - Total Delay Minutes: The cumulative time lost (Financial impact).
# - Count of Delays: How frequent is this? (Reliability impact).
# - Max Delay: The worst-case scenario.
gold_delay_analysis = severe_delays.groupBy("Route_ID", "Hour").agg(
    sum("Delay_Minutes").alias("Total_Delay_Minutes"),
    count("Trip_ID").alias("Severe_Delay_Count"),
    max("Delay_Minutes").alias("Max_Delay_Minutes")
)

# 4. Rank the results to find the Top Offenders
# Ordering by Total_Delay_Minutes highlights the biggest operational drains.
gold_delay_analysis = gold_delay_analysis.orderBy(col("Total_Delay_Minutes").desc())

# 5. Write to GOLD Layer
# This table is now ready for a Dashboard (PowerBI/Tableau)
gold_delay_analysis.write.format("delta").mode("overwrite").saveAsTable("gold_unpredictable_timings")

# --- Validation Output ---
print("✅ Use Case 1 (Gold Table) Created: 'gold_unpredictable_timings'")
print("\nTop 5 Worst Delayed Route Segments:")
spark.sql("SELECT * FROM gold_unpredictable_timings LIMIT 5").show()

✅ Use Case 1 (Gold Table) Created: 'gold_unpredictable_timings'

Top 5 Worst Delayed Route Segments:
+--------+----+-------------------+------------------+-----------------+
|Route_ID|Hour|Total_Delay_Minutes|Severe_Delay_Count|Max_Delay_Minutes|
+--------+----+-------------------+------------------+-----------------+
|     R5A|  21|                158|                 8|               25|
|    R12B|   3|                128|                 6|               24|
|     R7C|   8|                115|                 6|               24|
|     R5A|  13|                108|                 5|               25|
|     R5A|   1|                108|                 6|               21|
+--------+----+-------------------+------------------+-----------------+



In [0]:
from pyspark.sql.functions import col, max, avg, count, when, sum

# --- USE CASE 2: INEFFICIENT ROUTES ---

# 1. Read cleansed data from Silver
df_etm = spark.read.table("silver_etm")

# --- STAGE A: TRIP-LEVEL METRICS ---
# We aggregate transactions to understand the load profile of each unique trip.
trip_level_stats = df_etm.groupBy("Trip_ID", "Route_ID", "Hour").agg(
    max("Current_Load").alias("Max_Trip_Load"),
    avg("Current_Load").alias("Avg_Trip_Load")
)

# Define Efficiency Flags for each trip
# - Overcrowded: If the bus ever exceeded capacity (60).
# - Underutilized: If the average load was consistently low (< 30).
trip_level_stats = trip_level_stats.withColumn(
    "Is_Trip_Overcrowded", when(col("Max_Trip_Load") > 60, 1).otherwise(0)
).withColumn(
    "Is_Trip_Underutilized", when(col("Avg_Trip_Load") < 30, 1).otherwise(0)
)

# --- STAGE B: ROUTE-HOUR AGGREGATION (GOLD) ---
# Now we summarize to find the "Problem Zones" (Route + Hour).
gold_efficiency_analysis = trip_level_stats.groupBy("Route_ID", "Hour").agg(
    count("Trip_ID").alias("Total_Trips_Run"),
    sum("Is_Trip_Overcrowded").alias("Overcrowded_Trips_Count"),
    sum("Is_Trip_Underutilized").alias("Underutilized_Trips_Count"),
    avg("Avg_Trip_Load").alias("Segment_Avg_Load")
)

# Calculate Efficiency Ratios for easier ranking
gold_efficiency_analysis = gold_efficiency_analysis.withColumn(
    "Overcrowding_Ratio", col("Overcrowded_Trips_Count") / col("Total_Trips_Run")
).withColumn(
    "Underutilization_Ratio", col("Underutilized_Trips_Count") / col("Total_Trips_Run")
)

# Write to GOLD Layer
gold_efficiency_analysis.write.format("delta").mode("overwrite").saveAsTable("gold_route_efficiency")


In [0]:
# --- Validation Output ---
print("✅ Use Case 2 (Gold Table) Created: 'gold_route_efficiency'")

print("\n🚨 TOP 5 OVERCROWDED SEGMENTS (Capacity Blind Spot):")
gold_efficiency_analysis.orderBy(col("Overcrowded_Trips_Count").desc()).display(5)

print("\n📉 TOP 5 UNDERUTILIZED SEGMENTS (Wasted Fuel/Labor):")
gold_efficiency_analysis.orderBy(col("Underutilized_Trips_Count").desc()).display(5)

✅ Use Case 2 (Gold Table) Created: 'gold_route_efficiency'

🚨 TOP 5 OVERCROWDED SEGMENTS (Capacity Blind Spot):


Route_ID,Hour,Total_Trips_Run,Overcrowded_Trips_Count,Underutilized_Trips_Count,Segment_Avg_Load,Overcrowding_Ratio,Underutilization_Ratio
R5A,7,3,3,0,64.65811965811966,1.0,0.0
R5A,18,3,3,0,63.583333333333336,1.0,0.0
R5A,8,3,3,0,66.82857142857144,1.0,0.0
R5A,9,3,2,0,57.41666666666666,0.6666666666666666,0.0
R5A,17,2,2,0,63.65,1.0,0.0
R12B,11,3,0,3,19.75,0.0,1.0
R5A,20,2,0,2,22.97222222222222,0.0,1.0
R12B,21,1,0,0,33.333333333333336,0.0,0.0
R5A,21,3,0,2,25.666666666666668,0.0,0.6666666666666666
R7C,23,4,0,0,38.65803571428572,0.0,0.0



📉 TOP 5 UNDERUTILIZED SEGMENTS (Wasted Fuel/Labor):


Route_ID,Hour,Total_Trips_Run,Overcrowded_Trips_Count,Underutilized_Trips_Count,Segment_Avg_Load,Overcrowding_Ratio,Underutilization_Ratio
R7C,9,5,0,5,20.395238095238096,0.0,1.0
R12B,11,3,0,3,19.75,0.0,1.0
R12B,18,3,0,3,24.222222222222225,0.0,1.0
R5A,5,3,0,3,29.5962962962963,0.0,1.0
R5A,1,3,0,3,25.166666666666668,0.0,1.0
R5A,20,2,0,2,22.97222222222222,0.0,1.0
R12B,12,2,0,2,26.15,0.0,1.0
R5A,0,3,0,2,28.36111111111111,0.0,0.6666666666666666
R12B,3,2,0,2,26.670454545454547,0.0,1.0
R5A,14,3,0,2,26.409523809523808,0.0,0.6666666666666666


In [0]:
from pyspark.sql.functions import col, explode, split, count, lit, sum, when, lower

# --- USE CASE 3: LOW AWARENESS (DIGITAL FAILURE) ---

# 1. Read cleansed data from Silver
df_feedback = spark.read.table("silver_feedback")

# 2. Identify 'Info Lack' Complaints
# We filter for keywords indicating an information failure.
# In a production system, this would be a Machine Learning classification model.
info_lack_filter = (
    col("Text_Clean").like("%app%") | 
    col("Text_Clean").like("%tracker%") | 
    col("Text_Clean").like("%eta%") | 
    col("Text_Clean").like("%realtime%") | 
    col("Text_Clean").like("%update%") | 
    col("Text_Clean").like("%info%")
)

df_info_lack = df_feedback.filter(info_lack_filter)

# --- STAGE A: THE NER BLIND SPOT ANALYSIS ---
# Calculate how many of these digital complaints could NOT be linked to a bus.
# If this is high, you are losing critical operational feedback.
blind_spot_stats = df_info_lack.select(
    count("*").alias("Total_Info_Complaints"),
    sum(when(col("Link_Status") == "Unlinked", 1).otherwise(0)).alias("Unlinked_Complaints")
).withColumn(
    "Blind_Spot_Percentage", (col("Unlinked_Complaints") / col("Total_Info_Complaints")) * 100
)

# --- STAGE B: KEYWORD FREQUENCY ANALYSIS (GOLD) ---
# 1. Tokenize: Split sentences into words
words_df = df_info_lack.select(explode(split(col("Text_Clean"), " ")).alias("word"))

# 2. Define Stopwords (Noise to remove)
# In a real project, use pyspark.ml.feature.StopWordsRemover
stopwords = ["the", "is", "to", "and", "a", "of", "in", "for", "on", "was", "it", "this", "my", "bus", "route", "trip"]

# 3. Filter & Count
gold_keywords = words_df.filter(
    ~col("word").isin(stopwords) & (col("word") != "")
).groupBy("word").agg(
    count("word").alias("Frequency")
).orderBy(col("Frequency").desc()).limit(20)

# 4. Write to GOLD Layer
gold_keywords.write.format("delta").mode("overwrite").saveAsTable("gold_awareness_failure")

# --- Validation Output ---
print("✅ Use Case 3 (Gold Table) Created: 'gold_awareness_failure'")

print("\n🚨 NER BLIND SPOT METRICS:")
blind_spot_stats.show()

print("\n🔍 TOP 10 PASSENGER PAIN POINTS (Keywords):")
spark.sql("SELECT * FROM gold_awareness_failure LIMIT 10").show()

✅ Use Case 3 (Gold Table) Created: 'gold_awareness_failure'

🚨 NER BLIND SPOT METRICS:
+---------------------+-------------------+---------------------+
|Total_Info_Complaints|Unlinked_Complaints|Blind_Spot_Percentage|
+---------------------+-------------------+---------------------+
|                   85|                 23|   27.058823529411764|
+---------------------+-------------------+---------------------+


🔍 TOP 10 PASSENGER PAIN POINTS (Keywords):
+----------+---------+
|      word|Frequency|
+----------+---------+
|       app|      106|
|     whole|       85|
|     times|       85|
|    system|       85|
|     still|       85|
|unreliable|       85|
|         3|       85|
|   checked|       85|
|     makes|       85|
|        no|       53|
+----------+---------+



In [0]:
from pyspark.sql.functions import col, udf, avg, count, when, lit, hour
from pyspark.sql.types import StringType

# --- USE CASE 4: ROOT CAUSE & TRIAGE ---

# 1. Read cleansed data from Silver
df_feedback = spark.read.table("silver_feedback")
df_vld = spark.read.table("silver_vld")

# --- STAGE A: CLASSIFICATION & TRIAGE (SIMULATED NLP) ---
# In production, use Spark NLP or a pre-trained Transformer here.
def classify_topic(text):
    t = str(text).lower()
    if 'rude' in t or 'driver' in t or 'staff' in t: return 'Staff'
    if 'late' in t or 'delay' in t or 'wait' in t: return 'Delay'
    if 'crowd' in t or 'full' in t or 'space' in t: return 'Crowding'
    if 'app' in t or 'tracker' in t: return 'Info Lack'
    return 'Other'

def classify_urgency(text):
    t = str(text).lower()
    if 'danger' in t or 'accident' in t or 'harass' in t or 'emergency' in t: return 'High'
    if 'rude' in t or 'skip' in t: return 'Medium'
    return 'Low'

# Register UDFs
topic_udf = udf(classify_topic, StringType())
urgency_udf = udf(classify_urgency, StringType())

# Apply Classification
enriched_feedback = df_feedback.withColumn(
    "Issue_Topic", topic_udf(col("Text_Content"))
).withColumn(
    "Urgency_Flag", urgency_udf(col("Text_Content"))
)

# --- STAGE B: ROOT CAUSE LINKAGE (GOLD) ---
# Link Complaints to GPS Data to see the *actual* operational conditions
# Key Keys: Simulated_NER_Bus_ID (Feedback) == Bus_ID (VLD), and Hour match.
linked_complaints = enriched_feedback.filter(col("Link_Status") == "Linked").join(
    df_vld,
    (enriched_feedback.Simulated_NER_Bus_ID == df_vld.Bus_ID) & 
    (hour(enriched_feedback.Complaint_Time_Approx) == df_vld.Hour),
    how="inner"
)

# Aggregate to find the "Truth" behind the complaints
gold_root_cause = linked_complaints.groupBy("Issue_Topic").agg(
    count("Feedback_ID").alias("Complaint_Volume"),
    avg("Delay_Minutes").alias("Avg_Actual_Delay_Minutes")
).orderBy(col("Complaint_Volume").desc())

# Aggregate Triage Summary (for Customer Support dashboard)
gold_triage_queue = enriched_feedback.groupBy("Urgency_Flag").agg(
    count("Feedback_ID").alias("Queue_Size")
).orderBy(col("Queue_Size").asc()) # High urgency at top (if coded 1/2/3) or alphanumeric

# Write to GOLD Layer
gold_root_cause.write.format("delta").mode("overwrite").saveAsTable("gold_complaint_root_cause")
gold_triage_queue.write.format("delta").mode("overwrite").saveAsTable("gold_triage_summary")

# --- Validation Output ---
print("✅ Use Case 4 (Gold Tables) Created.")

print("\n🔍 ROOT CAUSE TRUTH (Complaint vs. Reality):")
spark.sql("SELECT * FROM gold_complaint_root_cause").show()

print("\n🚨 TRIAGE QUEUE SUMMARY:")
spark.sql("SELECT * FROM gold_triage_summary").show()

✅ Use Case 4 (Gold Tables) Created.

🔍 ROOT CAUSE TRUTH (Complaint vs. Reality):
+-----------+----------------+------------------------+
|Issue_Topic|Complaint_Volume|Avg_Actual_Delay_Minutes|
+-----------+----------------+------------------------+
|      Other|             997|       6.945837512537613|
|      Staff|             416|       7.197115384615385|
|  Info Lack|             396|        7.22979797979798|
|      Delay|             335|       7.176119402985075|
|   Crowding|             300|       7.316666666666666|
+-----------+----------------+------------------------+


🚨 TRIAGE QUEUE SUMMARY:
+------------+----------+
|Urgency_Flag|Queue_Size|
+------------+----------+
|      Medium|        34|
|         Low|       466|
+------------+----------+



In [0]:
from pyspark.sql.functions import col, sum, count, avg, lit, desc

# --- USE CASE 5: FINANCIAL FEASIBILITY (OPPORTUNITY COST) ---

# 1. Read cleansed data from Silver
df_vld = spark.read.table("silver_vld")
df_etm = spark.read.table("silver_etm")

# --- METRIC A: COST OF LOST TIME (Severe Delays) ---
# Filter for severe delays (e.g., > 15 mins) where costs spike (OT, penalties).
severe_delays = df_vld.filter(col("Delay_Minutes") >= 15)

delay_cost_gold = severe_delays.groupBy("Route_ID").agg(
    sum("Delay_Minutes").alias("Total_Lost_Minutes"),
    count("Trip_ID").alias("Severe_Delay_Events")
).withColumn(
    # Estimated Cost: Assuming $1.50 per minute of delay (Driver + Fuel + Penalty)
    "Est_Delay_Cost_Impact", col("Total_Lost_Minutes") * 1.50
)

# --- METRIC B: COST OF WASTE (Underutilization) ---
# First, get the average load per trip
trip_loads = df_etm.groupBy("Trip_ID", "Route_ID").agg(
    avg("Current_Load").alias("Avg_Load")
)

# Filter for trips running < 50% capacity (e.g., < 30 passengers)
wasted_trips = trip_loads.filter(col("Avg_Load") < 30)

underutilization_cost_gold = wasted_trips.groupBy("Route_ID").agg(
    count("Trip_ID").alias("Wasted_Trip_Count"),
    avg("Avg_Load").alias("Avg_Wasted_Load")
).withColumn(
    # Estimated Cost: Assuming fixed cost of $50 per trip (Fuel + Labor)
    "Est_Wasted_Trip_Cost", col("Wasted_Trip_Count") * 50.00
)

# --- STAGE C: MERGED FINANCIAL VIEW (GOLD) ---
# Join the two cost drivers to see the full picture per route
gold_financial_summary = delay_cost_gold.join(
    underutilization_cost_gold, 
    on="Route_ID", 
    how="outer"
).na.fill(0) # Fill nulls with 0 for clean math

# Calculate Total Opportunity Cost
gold_financial_summary = gold_financial_summary.withColumn(
    "Total_Opportunity_Cost", 
    col("Est_Delay_Cost_Impact") + col("Est_Wasted_Trip_Cost")
).orderBy(col("Total_Opportunity_Cost").desc())

# Write to GOLD Layer
gold_financial_summary.write.format("delta").mode("overwrite").saveAsTable("gold_financial_feasibility")

# --- Validation Output ---
print("✅ Use Case 5 (Gold Table) Created: 'gold_financial_feasibility'")

print("\n💰 FINANCIAL DRAIN REPORT (Where are we losing money?):")
spark.sql("SELECT * FROM gold_financial_feasibility").show()

✅ Use Case 5 (Gold Table) Created: 'gold_financial_feasibility'

💰 FINANCIAL DRAIN REPORT (Where are we losing money?):
+--------+------------------+-------------------+---------------------+-----------------+------------------+--------------------+----------------------+
|Route_ID|Total_Lost_Minutes|Severe_Delay_Events|Est_Delay_Cost_Impact|Wasted_Trip_Count|   Avg_Wasted_Load|Est_Wasted_Trip_Cost|Total_Opportunity_Cost|
+--------+------------------+-------------------+---------------------+-----------------+------------------+--------------------+----------------------+
|     R5A|              1658|                 83|               2487.0|                8| 26.12446348940914|               400.0|                2887.0|
|     R7C|              1018|                 51|               1527.0|               15|25.421460401769995|               750.0|                2277.0|
|    R12B|               709|                 35|               1063.5|               10|25.609860972360973|       

In [0]:
print("\n🔮 Sample ETA Predictions:")
final_results.display(5)

# Optional: Save the trained model for production
# model.save("/Models/bus_eta_rf_v1")


🔮 Sample ETA Predictions:


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-5593978075013220>, line 2[0m
[1;32m      1[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124m🔮 Sample ETA Predictions:[39m[38;5;124m"[39m)
[0;32m----> 2[0m final_results[38;5;241m.[39mdisplay([38;5;241m5[39m)

[0;31mNameError[0m: name 'final_results' is not defined

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression  # Smallest possible model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, avg, round, hash

# --- 1. DATA PREPARATION ---
df_vld = spark.read.table("silver_vld")
df_etm = spark.read.table("silver_etm")

# Feature Engineering
trip_load_features = df_etm.groupBy("Trip_ID").agg(
    avg("Current_Load").alias("Avg_Load")
)

training_data = df_vld.join(trip_load_features, on="Trip_ID", how="left").na.fill({"Avg_Load": 0})

# --- KEY FIX: Hash Route_ID instead of StringIndexer ---
# StringIndexer creates huge models with many categories
# Hash function maps routes to a fixed range (0-99)
ml_dataset = training_data.select(
    "Route_ID",
    "Hour",
    "Latitude",
    "Longitude",
    "Avg_Load",
    "Delay_Minutes",
    "Scheduled_Arrival"
).withColumn(
    "Route_Hash", (hash(col("Route_ID")) % 100).cast("int")  # Map to 0-99
)

# --- 2. ULTRA-COMPACT ML PIPELINE ---

# Feature Assembly (NO StringIndexer!)
assembler = VectorAssembler(
    inputCols=["Route_Hash", "Hour", "Latitude", "Longitude", "Avg_Load"],
    outputCol="features",
    handleInvalid="skip"
)

# Linear Regression: Smallest model possible (~1-5 MB)
lr = LinearRegression(
    featuresCol="features",
    labelCol="Delay_Minutes",
    maxIter=20,
    regParam=0.01,  # Regularization to prevent overfitting
    elasticNetParam=0.5  # Mix of L1 and L2 regularization
)

# Build Pipeline (only 2 stages)
pipeline = Pipeline(stages=[assembler, lr])

# --- 3. TRAINING & EVALUATION ---

# Sample data aggressively if dataset is huge
train_data, test_data = ml_dataset.randomSplit([0.8, 0.2], seed=42)

# Uncomment if still facing issues:
# train_data = train_data.sample(fraction=0.3, seed=42)

print("🚀 Training Ultra-Compact ETA Model (Linear Regression)...")
model = pipeline.fit(train_data)

print("✅ Model training successful!")

# Predictions
predictions = model.transform(test_data)

# Evaluate
evaluator = RegressionEvaluator(
    labelCol="Delay_Minutes", 
    predictionCol="prediction", 
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

# Also calculate MAE for interpretability
mae_evaluator = RegressionEvaluator(
    labelCol="Delay_Minutes",
    predictionCol="prediction",
    metricName="mae"
)
mae = mae_evaluator.evaluate(predictions)

print(f"\n📊 Model Performance:")
print(f"   RMSE: {rmse:.2f} minutes")
print(f"   MAE:  {mae:.2f} minutes (average prediction error)")

# --- 4. FINAL ETA CALCULATION ---
final_results = predictions.withColumn(
    "Predicted_Delay_Minutes", round(col("prediction"), 1)
).select(
    "Route_ID", 
    "Hour", 
    "Scheduled_Arrival", 
    "Delay_Minutes",
    "Predicted_Delay_Minutes"
)

# Display sample predictions
print("\n📋 Sample Predictions:")
final_results.show(10, truncate=False)

# Save predictions
final_results.write.mode("overwrite").saveAsTable("gold_eta_predictions")
print("\n💾 Predictions saved to: gold_eta_predictions")

# Optional: Show feature importance approximation
print("\n🔍 Model Coefficients (feature importance):")
print(f"   Route Hash:  {model.stages[-1].coefficients[0]:.4f}")
print(f"   Hour:        {model.stages[-1].coefficients[1]:.4f}")
print(f"   Latitude:    {model.stages[-1].coefficients[2]:.4f}")
print(f"   Longitude:   {model.stages[-1].coefficients[3]:.4f}")
print(f"   Avg Load:    {model.stages[-1].coefficients[4]:.4f}")

🚀 Training Ultra-Compact ETA Model (Linear Regression)...
✅ Model training successful!

📊 Model Performance:
   RMSE: 7.20 minutes
   MAE:  6.12 minutes (average prediction error)

📋 Sample Predictions:
+--------+----+-------------------+-------------+-----------------------+
|Route_ID|Hour|Scheduled_Arrival  |Delay_Minutes|Predicted_Delay_Minutes|
+--------+----+-------------------+-------------+-----------------------+
|R12B    |3   |2025-11-18 03:45:00|3            |7.8                    |
|R12B    |3   |2025-11-18 03:25:00|1            |8.2                    |
|R12B    |3   |2025-11-18 02:50:00|22           |7.6                    |
|R12B    |3   |2025-11-18 03:10:00|1            |6.2                    |
|R12B    |4   |2025-11-18 04:00:00|11           |7.7                    |
|R12B    |4   |2025-11-18 04:10:00|1            |7.7                    |
|R12B    |4   |2025-11-18 04:20:00|19           |7.7                    |
|R12B    |4   |2025-11-18 04:15:00|0            |7.7     