In [32]:
!pip install findspark

import os
import sys
from datetime import datetime
import re
import findspark

# Initialize PySpark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

print("📚 All libraries imported successfully!")

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Election Gold Layer Processing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse") \
    .getOrCreate()

print("🏆 Spark Session initialized for Gold Layer Processing!")
spark.sparkContext.setLogLevel("WARN")

📚 All libraries imported successfully!
🏆 Spark Session initialized for Gold Layer Processing!


## Load Silver layer data (`load_silver_data`)




In [33]:
def load_silver_data(silver_path):
    """
    Read the Silver-layer CSV file(s) into a Spark DataFrame.

    The data is read with a header row and schema inference, then a short
    load report (record count and column count) is printed.

    Args:
        silver_path (str): Path to silver layer CSV files.

    Returns:
        pyspark.sql.DataFrame: Silver layer DataFrame, or None when reading
        or counting the data raised an exception (the error is printed).
    """
    try:
        loaded = spark.read.csv(silver_path, header=True, inferSchema=True)
        print(f"✅ Successfully loaded Silver data from {silver_path}")
        record_total = loaded.count()
        print(f"📊 Total records: {record_total}")
        print(f"📋 Columns: {len(loaded.columns)}")
    except Exception as err:
        print(f"❌ Error loading Silver data: {err}")
        return None
    return loaded

**KPIs**: the functions below build the Gold-layer summary tables and the executive KPI dashboard.




In [34]:
def create_constituency_summary(silver_df):
    """
    Build one row per constituency describing its winner, runner-up and field.

    Winners (``Is_Winner``) are left-joined to the rank-2 candidate of the same
    State/Constituency, then inner-joined to per-constituency aggregates (votes
    polled, candidates, parties, independents, sub-5% candidates, NOTA votes).
    When no runner-up row exists the margins fall back to the winner's own
    figures (runner-up values coalesced to 0). Each row gets a
    ``Margin_Category`` band based on ``Victory_Margin_Percentage``.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Constituency summary table.
    """
    print("🏛️ Creating Constituency Summary table...")

    winner_source_cols = ["State", "Constituency", "Candidate", "Party", "Party_Type",
                          "Total_Votes", "Vote_Percentage"]
    winners = silver_df.where(col("Is_Winner") == True).select(*winner_source_cols)

    # Runner-up columns are renamed up front so the join produces no duplicates.
    runner_up_renames = {
        "State": "State_RU",
        "Constituency": "Constituency_RU",
        "Candidate": "Runner_Up_Candidate",
        "Party": "Runner_Up_Party",
        "Total_Votes": "Runner_Up_Votes",
        "Vote_Percentage": "Runner_Up_Vote_Percentage",
    }
    runners_up = silver_df.where(col("Rank_in_Constituency") == 2).select(
        [col(src).alias(dst) for src, dst in runner_up_renames.items()]
    )

    def _count_if(condition):
        # Number of rows in the group for which `condition` holds.
        return sum(when(condition, 1).otherwise(0))

    constituency_stats = silver_df.groupBy("State", "Constituency").agg(
        sum("Total_Votes").alias("Total_Votes_Polled"),
        count("*").alias("Total_Candidates"),
        countDistinct("Party").alias("Total_Parties"),
        _count_if(col("Party_Type") == "Independent").alias("Independent_Candidates"),
        _count_if(col("Vote_Share_Category") == "Minimal (<5%)").alias("Candidates_Below_5_Percent"),
        sum(when(col("Party") == "None of the Above", col("Total_Votes")).otherwise(0)).alias("NOTA_Votes"),
    )

    same_seat = (winners.State == runners_up.State_RU) & \
                (winners.Constituency == runners_up.Constituency_RU)
    joined = winners.join(runners_up, same_seat, "left") \
                    .join(constituency_stats, ["State", "Constituency"], "inner")

    winner_renames = [
        ("Candidate", "Winner_Candidate"),
        ("Party", "Winner_Party"),
        ("Party_Type", "Winner_Party_Type"),
        ("Total_Votes", "Winner_Votes"),
        ("Vote_Percentage", "Winner_Vote_Percentage"),
    ]
    output_columns = (
        [col("State"), col("Constituency")]
        + [col(src).alias(dst) for src, dst in winner_renames]
        + [col(name) for name in ("Runner_Up_Candidate", "Runner_Up_Party",
                                  "Runner_Up_Votes", "Runner_Up_Vote_Percentage")]
        + [
            (col("Total_Votes") - coalesce(col("Runner_Up_Votes"), lit(0))).alias("Victory_Margin"),
            (col("Vote_Percentage") - coalesce(col("Runner_Up_Vote_Percentage"), lit(0))).alias("Victory_Margin_Percentage"),
        ]
        + [col(name) for name in ("Total_Votes_Polled", "Total_Candidates", "Total_Parties",
                                  "Independent_Candidates", "Candidates_Below_5_Percent", "NOTA_Votes")]
        + [(col("NOTA_Votes") / col("Total_Votes_Polled") * 100).alias("NOTA_Percentage")]
    )

    margin_pct = col("Victory_Margin_Percentage")
    margin_band = (
        when(margin_pct >= 20, "Comfortable (20%+)")
        .when(margin_pct >= 10, "Safe (10-19%)")
        .when(margin_pct >= 5, "Moderate (5-9%)")
        .otherwise("Close (<5%)")
    )

    constituency_summary = joined.select(output_columns).withColumn("Margin_Category", margin_band)

    print("✅ Constituency Summary table created")
    return constituency_summary

def create_state_summary(silver_df):
    """
    Create state-level summary table.

    Combines, per state, the leading party (the party with the most winning
    candidates) with state-wide aggregates: constituencies, candidates, parties,
    votes polled, winners, independent candidates and independent winners.

    Seat-count ties for the leading party are broken alphabetically by party
    name. Without a secondary sort key ``row_number`` picks an arbitrary party
    among the tied ones, so the result could change between runs.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: One row per state that has at least one winner
        (inner join), ordered by number of constituencies, then state name.
    """
    print("🗺️ Creating State Summary table...")
    # Secondary key (Party) makes the rank-1 pick deterministic on ties.
    leading_party_window = Window.partitionBy("State").orderBy(desc("Seats_Won"), asc("Party"))
    leading_party = silver_df.filter(col("Is_Winner") == True) \
        .groupBy("State", "Party") \
        .agg(count("*").alias("Seats_Won")) \
        .withColumn("Rank", row_number().over(leading_party_window)) \
        .filter(col("Rank") == 1) \
        .withColumnRenamed("Party", "Leading_Party_in_State") \
        .withColumnRenamed("Seats_Won", "Leading_Party_Seats_Won") \
        .drop("Rank")

    state_aggregates = silver_df.groupBy("State") \
        .agg(
            countDistinct("Constituency").alias("Total_Constituencies"),
            count("*").alias("Total_Candidates_in_State"),
            countDistinct("Party").alias("Total_Parties_in_State"),
            sum("Total_Votes").alias("Total_Votes_Polled_in_State"),
            sum(when(col("Is_Winner"), 1).otherwise(0)).alias("Total_Winners"),
            sum(when(col("Party_Type") == "Independent", 1).otherwise(0)).alias("Independent_Candidates"),
            sum(when(col("Is_Winner") & (col("Party_Type") == "Independent"), 1).otherwise(0)).alias("Independent_Winners")
        )

    state_summary = leading_party \
        .join(state_aggregates, "State", "inner") \
        .withColumn("Total_Candidates_Per_Constituency",
                    col("Total_Candidates_in_State") / col("Total_Constituencies")) \
        .orderBy(desc("Total_Constituencies"), asc("State"))

    print("✅ State Summary table created")
    return state_summary

def create_party_performance(silver_df):
    """
    Summarise every (Party, Party_Type) pair across all constituencies.

    Reports candidates fielded, seats won, total and average votes, average
    vote percentage, win rate (seats / candidates * 100) and a Yes/No flag in
    ``Winner_Candidate`` telling whether the party won at least one seat.
    Rows are ordered by seats won, highest first.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Party performance table.
    """
    print("🎯 Creating Party Performance table...")

    won_flag = when(col("Is_Winner"), 1).otherwise(0)
    aggregates = silver_df.groupBy("Party", "Party_Type").agg(
        count("*").alias("Total_Candidates_Contested"),
        sum(won_flag).alias("Seats_Won"),
        sum("Total_Votes").alias("Total_Votes_Polled"),
        mean("Total_Votes").alias("Average_Votes_Per_Candidate"),
        mean("Vote_Percentage").alias("Average_Vote_Percentage"),
    )

    win_rate = col("Seats_Won") / col("Total_Candidates_Contested") * 100
    has_won_seat = when(col("Seats_Won") > 0, lit("Yes")).otherwise(lit("No"))

    party_performance = (
        aggregates
        .withColumn("Win_Rate_Percentage", win_rate)
        .withColumn("Winner_Candidate", has_won_seat)
        .orderBy(desc("Seats_Won"))
    )

    print("✅ Party Performance table created")
    return party_performance

def create_vote_analysis(silver_df):
    """
    Break candidates down by State, Party_Type and Vote_Share_Category.

    For each group the candidate count and total votes are computed, plus the
    group's share of all candidates in the same state (in percent).

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Vote analysis table sorted by State,
        Party_Type and Vote_Share_Category.
    """
    print("🗳️ Creating Vote Analysis table...")

    per_state = Window.partitionBy("State")
    grouped = silver_df.groupBy("State", "Party_Type", "Vote_Share_Category").agg(
        count("*").alias("Total_Candidates"),
        sum("Total_Votes").alias("Total_Votes_Polled"),
    )
    share_of_state = col("Total_Candidates") / sum("Total_Candidates").over(per_state) * 100

    vote_analysis = (
        grouped
        .withColumn("Percentage_of_Total_Candidates", share_of_state)
        .orderBy("State", "Party_Type", "Vote_Share_Category")
    )

    print("✅ Vote Analysis table created")
    return vote_analysis

def create_margin_analysis(silver_df):
    """
    Compare each constituency winner with the rank-2 runner-up.

    Only constituencies that have a runner-up row are kept (inner join). The
    margin is reported in votes and in percentage points, and bucketed into
    ``Margin_Category`` bands (<1, <3, <5, <10, otherwise 10%+).

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Margin analysis table.
    """
    print("📏 Creating Margin Analysis table...")

    winners = silver_df.where(col("Is_Winner") == True).select(
        "State", "Constituency", "Candidate", "Party", "Total_Votes", "Vote_Percentage"
    )

    runner_up_renames = [
        ("State", "State_RU"),
        ("Constituency", "Constituency_RU"),
        ("Candidate", "Runner_Up_Candidate"),
        ("Party", "Runner_Up_Party"),
        ("Total_Votes", "Runner_Up_Votes"),
        ("Vote_Percentage", "Runner_Up_Percentage"),
    ]
    runners_up = silver_df.where(col("Rank_in_Constituency") == 2).select(
        [col(src).alias(dst) for src, dst in runner_up_renames]
    )

    seat_match = (winners.State == runners_up.State_RU) & \
                 (winners.Constituency == runners_up.Constituency_RU)
    paired = winners.join(runners_up, seat_match, "inner")

    # Build when(<1).when(<3).when(<5).when(<10).otherwise(...) from a table.
    margin_pct = col("Victory_Margin_Percentage")
    bands = [
        (1, "Very Close (<1%)"),
        (3, "Close (1-3%)"),
        (5, "Moderate (3-5%)"),
        (10, "Safe (5-10%)"),
    ]
    band_expr = None
    for cutoff, label in bands:
        if band_expr is None:
            band_expr = when(margin_pct < cutoff, label)
        else:
            band_expr = band_expr.when(margin_pct < cutoff, label)
    band_expr = band_expr.otherwise("Comfortable (10%+)")

    margin_analysis = paired.select(
        "State",
        "Constituency",
        col("Candidate").alias("Winner"),
        col("Party").alias("Winner_Party"),
        col("Total_Votes").alias("Winner_Votes"),
        col("Vote_Percentage").alias("Winner_Percentage"),
        "Runner_Up_Candidate",
        "Runner_Up_Party",
        "Runner_Up_Votes",
        "Runner_Up_Percentage",
        (col("Total_Votes") - col("Runner_Up_Votes")).alias("Victory_Margin_Votes"),
        (col("Vote_Percentage") - col("Runner_Up_Percentage")).alias("Victory_Margin_Percentage"),
    ).withColumn("Margin_Category", band_expr)

    print("✅ Margin Analysis table created")
    return margin_analysis

def create_kpi_dashboard(silver_df, election_year=2024):
    """
    Create executive KPI dashboard with key metrics.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.
        election_year (int): Value written to the ``Election_Year`` column of
            ``overall_kpis``. This was previously hard-coded and still defaults
            to 2024.

    Returns:
        dict[str, pyspark.sql.DataFrame]: Keys are:
            - ``overall_kpis``: one row of election-wide totals.
            - ``top_parties``: the 10 parties with the most seats, with ``Rank``.
            - ``competitive_states``: the 10 states with the highest
              ``Competition_Score``.
            - ``close_contests``: winner counts per ``Contest_Type``.
            - ``party_type_performance``: candidates, seats, votes and win rate
              per ``Party_Type``.

    Notes:
        Ties in ``top_parties`` and ``competitive_states`` are broken by name,
        so the top-10 cut-off and the ``Rank`` values are deterministic.
        ``close_contests`` classifies a seat by the *winner's* vote share
        (<35% "Very Close", <45% "Close", else "Comfortable"). It does not use
        the margin over the runner-up.
    """
    print("📈 Creating KPI Dashboard...")
    overall_kpis = silver_df.agg(
        countDistinct("State").alias("Total_States"),
        countDistinct("Constituency").alias("Total_Constituencies"),
        count("*").alias("Total_Candidates"),
        countDistinct("Party").alias("Total_Parties"),
        sum("Total_Votes").alias("Total_Votes_Polled"),
        sum(when(col("Party_Type") == "Independent", 1).otherwise(0)).alias("Independent_Candidates"),
        sum(when(col("Is_Winner") & (col("Party_Type") == "Independent"), 1).otherwise(0)).alias("Independent_Winners"),
        sum(when(col("Party") == "None of the Above", col("Total_Votes")).otherwise(0)).alias("Total_NOTA_Votes")
    ).withColumn("Election_Year", lit(election_year)) \
     .withColumn("Report_Generated_At", current_timestamp())

    # Same ordering for the limit and the Rank window, with name tie-breakers.
    seat_order = [desc("Seats_Won"), asc("Party"), asc("Party_Type")]
    top_parties = silver_df.filter(col("Is_Winner") == True) \
        .groupBy("Party", "Party_Type") \
        .agg(count("*").alias("Seats_Won")) \
        .orderBy(*seat_order) \
        .limit(10) \
        .withColumn("Rank", row_number().over(Window.orderBy(*seat_order)))

    competitive_states = silver_df.groupBy("State") \
        .agg(
            countDistinct("Constituency").alias("Constituencies"),
            (count("*") / countDistinct("Constituency")).alias("Avg_Candidates_Per_Constituency"),
            countDistinct("Party").alias("Parties_Participated")
        ) \
        .withColumn("Competition_Score",
                    col("Avg_Candidates_Per_Constituency") * col("Parties_Participated")) \
        .orderBy(desc("Competition_Score"), asc("State")) \
        .limit(10)

    # Banding uses the winner's own vote share (see Notes in the docstring).
    close_contests = silver_df.filter(col("Is_Winner") == True) \
        .select("State", "Constituency", "Vote_Percentage") \
        .withColumn("Contest_Type",
                    when(col("Vote_Percentage") < 35, "Very Close")
                    .when(col("Vote_Percentage") < 45, "Close")
                    .otherwise("Comfortable")) \
        .groupBy("Contest_Type") \
        .agg(count("*").alias("Count"))

    party_type_performance = silver_df.groupBy("Party_Type") \
        .agg(
            count("*").alias("Total_Candidates"),
            sum(when(col("Is_Winner"), 1).otherwise(0)).alias("Seats_Won"),
            sum("Total_Votes").alias("Total_Votes")
        ) \
        .withColumn("Win_Rate",
                    when(col("Total_Candidates") > 0,
                         (col("Seats_Won") / col("Total_Candidates") * 100))
                    .otherwise(0)) \
        .orderBy(desc("Seats_Won"))

    print("✅ KPI Dashboard created")

    return {
        "overall_kpis": overall_kpis,
        "top_parties": top_parties,
        "competitive_states": competitive_states,
        "close_contests": close_contests,
        "party_type_performance": party_type_performance
    }

In [35]:
def save_gold_tables(tables_dict, output_path, format="csv"):
    """
    Save Gold layer tables to specified format.

    Each table is written to ``os.path.join(output_path, <table_name>)`` in
    overwrite mode.

    Args:
        tables_dict (dict): Table name -> DataFrame to write.
        output_path (str): Root directory for the Gold layer. It is created if
            it is missing.
        format (str): Output format, case-insensitive:
            - ``"csv"``: one part file per table (``coalesce(1)``) with a header row.
            - ``"parquet"``: written without coalescing.

    Returns:
        list[str]: Names of the tables that were written successfully. A table
        whose write raises is reported and skipped, so the remaining tables
        are still saved.

    Raises:
        ValueError: If ``format`` is not supported. This is checked once,
            before anything is written. Previously the error was raised and
            swallowed once per table, and an empty list was returned silently.
    """
    fmt = format.lower()
    if fmt not in ("csv", "parquet"):
        raise ValueError(f"Unsupported format: {format}")

    print(f"💾 Saving Gold layer tables to {output_path} in {format} format...")
    os.makedirs(output_path, exist_ok=True)

    saved_tables = []
    failed_tables = []

    for table_name, df in tables_dict.items():
        table_path = os.path.join(output_path, table_name)
        try:
            if fmt == "parquet":
                df.write.mode("overwrite").parquet(table_path)
            else:
                # Single part file keeps each CSV table easy to download/zip.
                df.coalesce(1).write.mode("overwrite").option("header", "true").csv(table_path)
        except Exception as e:
            # Best-effort per table: report and continue with the others.
            print(f"❌ Error saving {table_name}: {e}")
            failed_tables.append(table_name)
            continue

        print(f"✅ Saved {table_name} to {table_path}")
        saved_tables.append(table_name)

    print(f"\n🎉 Successfully saved {len(saved_tables)} tables!")
    if failed_tables:
        print(f"⚠️ Failed to save {len(failed_tables)} tables: {', '.join(failed_tables)}")
    return saved_tables

def create_download_zip(gold_path, zip_path="/content/election_gold_tables.zip"):
    """
    Create a zip file of all Gold layer CSV tables for download.

    Every ``*.csv`` file under ``gold_path`` (searched recursively) is added,
    in sorted order. Archive member names are relative to the parent of
    ``gold_path``, so they keep the gold directory as their top-level folder,
    e.g. ``gold/state_summary/part-....csv``. Files not ending in ``.csv``
    (such as ``_SUCCESS`` or ``.crc`` files) are not included.

    Args:
        gold_path (str): Directory containing the Gold-layer tables.
        zip_path (str): Where to write the archive. Defaults to the previously
            hard-coded Colab location.

    Returns:
        str | None: ``zip_path`` on success, or None when ``gold_path`` is not
        an existing directory.
    """
    import zipfile
    from pathlib import Path

    gold_dir = Path(gold_path)
    if not gold_dir.is_dir():
        print("❌ Gold tables directory not found. Please run the processing steps first.")
        return None

    # Sorted for a deterministic archive layout.
    csv_files = sorted(gold_dir.rglob("*.csv"))
    if not csv_files:
        print(f"⚠️ No CSV files found under {gold_path}; the zip will be empty.")

    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for file_path in csv_files:
            zipf.write(file_path, file_path.relative_to(gold_dir.parent))

    print(f"✅ Created zip file: {zip_path}")
    print(f"📁 File size: {os.path.getsize(zip_path) / (1024*1024):.2f} MB")
    return zip_path



In [36]:
# NOTE(review): this cell repeats the setup from In [32]; the function
# definitions that follow in this cell redefine those from In [33]/In [34].
import os
import re
import sys
from datetime import datetime

import findspark

# findspark has to make pyspark importable before the pyspark imports below.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

print("📚 All libraries imported successfully!")

# getOrCreate returns the already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("Election Gold Layer Processing")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse")
    .getOrCreate()
)

print("🏆 Spark Session initialized for Gold Layer Processing!")
spark.sparkContext.setLogLevel("WARN")

def load_silver_data(silver_path):
    """
    Read the Silver-layer CSV file(s) into a Spark DataFrame.

    The data is read with a header row and schema inference, then a short
    load report (record count and column count) is printed.

    Args:
        silver_path (str): Path to silver layer CSV files.

    Returns:
        pyspark.sql.DataFrame: Silver layer DataFrame, or None when reading
        or counting the data raised an exception (the error is printed).
    """
    try:
        loaded = spark.read.csv(silver_path, header=True, inferSchema=True)
        print(f"✅ Successfully loaded Silver data from {silver_path}")
        record_total = loaded.count()
        print(f"📊 Total records: {record_total}")
        print(f"📋 Columns: {len(loaded.columns)}")
    except Exception as err:
        print(f"❌ Error loading Silver data: {err}")
        return None
    return loaded

def create_constituency_summary(silver_df):
    """
    Build one row per constituency describing its winner, runner-up and field.

    Winners (``Is_Winner``) are left-joined to the rank-2 candidate of the same
    State/Constituency, then inner-joined to per-constituency aggregates (votes
    polled, candidates, parties, independents, sub-5% candidates, NOTA votes).
    When no runner-up row exists the margins fall back to the winner's own
    figures (runner-up values coalesced to 0). Each row gets a
    ``Margin_Category`` band based on ``Victory_Margin_Percentage``.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Constituency summary table.
    """
    print("🏛️ Creating Constituency Summary table...")

    winner_source_cols = ["State", "Constituency", "Candidate", "Party", "Party_Type",
                          "Total_Votes", "Vote_Percentage"]
    winners = silver_df.where(col("Is_Winner") == True).select(*winner_source_cols)

    # Runner-up columns are renamed up front so the join produces no duplicates.
    runner_up_renames = {
        "State": "State_RU",
        "Constituency": "Constituency_RU",
        "Candidate": "Runner_Up_Candidate",
        "Party": "Runner_Up_Party",
        "Total_Votes": "Runner_Up_Votes",
        "Vote_Percentage": "Runner_Up_Vote_Percentage",
    }
    runners_up = silver_df.where(col("Rank_in_Constituency") == 2).select(
        [col(src).alias(dst) for src, dst in runner_up_renames.items()]
    )

    def _count_if(condition):
        # Number of rows in the group for which `condition` holds.
        return sum(when(condition, 1).otherwise(0))

    constituency_stats = silver_df.groupBy("State", "Constituency").agg(
        sum("Total_Votes").alias("Total_Votes_Polled"),
        count("*").alias("Total_Candidates"),
        countDistinct("Party").alias("Total_Parties"),
        _count_if(col("Party_Type") == "Independent").alias("Independent_Candidates"),
        _count_if(col("Vote_Share_Category") == "Minimal (<5%)").alias("Candidates_Below_5_Percent"),
        sum(when(col("Party") == "None of the Above", col("Total_Votes")).otherwise(0)).alias("NOTA_Votes"),
    )

    same_seat = (winners.State == runners_up.State_RU) & \
                (winners.Constituency == runners_up.Constituency_RU)
    joined = winners.join(runners_up, same_seat, "left") \
                    .join(constituency_stats, ["State", "Constituency"], "inner")

    winner_renames = [
        ("Candidate", "Winner_Candidate"),
        ("Party", "Winner_Party"),
        ("Party_Type", "Winner_Party_Type"),
        ("Total_Votes", "Winner_Votes"),
        ("Vote_Percentage", "Winner_Vote_Percentage"),
    ]
    output_columns = (
        [col("State"), col("Constituency")]
        + [col(src).alias(dst) for src, dst in winner_renames]
        + [col(name) for name in ("Runner_Up_Candidate", "Runner_Up_Party",
                                  "Runner_Up_Votes", "Runner_Up_Vote_Percentage")]
        + [
            (col("Total_Votes") - coalesce(col("Runner_Up_Votes"), lit(0))).alias("Victory_Margin"),
            (col("Vote_Percentage") - coalesce(col("Runner_Up_Vote_Percentage"), lit(0))).alias("Victory_Margin_Percentage"),
        ]
        + [col(name) for name in ("Total_Votes_Polled", "Total_Candidates", "Total_Parties",
                                  "Independent_Candidates", "Candidates_Below_5_Percent", "NOTA_Votes")]
        + [(col("NOTA_Votes") / col("Total_Votes_Polled") * 100).alias("NOTA_Percentage")]
    )

    margin_pct = col("Victory_Margin_Percentage")
    margin_band = (
        when(margin_pct >= 20, "Comfortable (20%+)")
        .when(margin_pct >= 10, "Safe (10-19%)")
        .when(margin_pct >= 5, "Moderate (5-9%)")
        .otherwise("Close (<5%)")
    )

    constituency_summary = joined.select(output_columns).withColumn("Margin_Category", margin_band)

    print("✅ Constituency Summary table created")
    return constituency_summary

def create_state_summary(silver_df):
    """
    Create state-level summary table.

    Combines, per state, the leading party (the party with the most winning
    candidates) with state-wide aggregates: constituencies, candidates, parties,
    votes polled, winners, independent candidates and independent winners.

    Seat-count ties for the leading party are broken alphabetically by party
    name. Without a secondary sort key ``row_number`` picks an arbitrary party
    among the tied ones, so the result could change between runs.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: One row per state that has at least one winner
        (inner join), ordered by number of constituencies, then state name.
    """
    print("🗺️ Creating State Summary table...")
    # Secondary key (Party) makes the rank-1 pick deterministic on ties.
    leading_party_window = Window.partitionBy("State").orderBy(desc("Seats_Won"), asc("Party"))
    leading_party = silver_df.filter(col("Is_Winner") == True) \
        .groupBy("State", "Party") \
        .agg(count("*").alias("Seats_Won")) \
        .withColumn("Rank", row_number().over(leading_party_window)) \
        .filter(col("Rank") == 1) \
        .withColumnRenamed("Party", "Leading_Party_in_State") \
        .withColumnRenamed("Seats_Won", "Leading_Party_Seats_Won") \
        .drop("Rank")

    state_aggregates = silver_df.groupBy("State") \
        .agg(
            countDistinct("Constituency").alias("Total_Constituencies"),
            count("*").alias("Total_Candidates_in_State"),
            countDistinct("Party").alias("Total_Parties_in_State"),
            sum("Total_Votes").alias("Total_Votes_Polled_in_State"),
            sum(when(col("Is_Winner"), 1).otherwise(0)).alias("Total_Winners"),
            sum(when(col("Party_Type") == "Independent", 1).otherwise(0)).alias("Independent_Candidates"),
            sum(when(col("Is_Winner") & (col("Party_Type") == "Independent"), 1).otherwise(0)).alias("Independent_Winners")
        )

    state_summary = leading_party \
        .join(state_aggregates, "State", "inner") \
        .withColumn("Total_Candidates_Per_Constituency",
                    col("Total_Candidates_in_State") / col("Total_Constituencies")) \
        .orderBy(desc("Total_Constituencies"), asc("State"))

    print("✅ State Summary table created")
    return state_summary

def create_party_performance(silver_df):
    """
    Summarise every (Party, Party_Type) pair across all constituencies.

    Reports candidates fielded, seats won, total and average votes, average
    vote percentage, win rate (seats / candidates * 100) and a Yes/No flag in
    ``Winner_Candidate`` telling whether the party won at least one seat.
    Rows are ordered by seats won, highest first.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Party performance table.
    """
    print("🎯 Creating Party Performance table...")

    won_flag = when(col("Is_Winner"), 1).otherwise(0)
    aggregates = silver_df.groupBy("Party", "Party_Type").agg(
        count("*").alias("Total_Candidates_Contested"),
        sum(won_flag).alias("Seats_Won"),
        sum("Total_Votes").alias("Total_Votes_Polled"),
        mean("Total_Votes").alias("Average_Votes_Per_Candidate"),
        mean("Vote_Percentage").alias("Average_Vote_Percentage"),
    )

    win_rate = col("Seats_Won") / col("Total_Candidates_Contested") * 100
    has_won_seat = when(col("Seats_Won") > 0, lit("Yes")).otherwise(lit("No"))

    party_performance = (
        aggregates
        .withColumn("Win_Rate_Percentage", win_rate)
        .withColumn("Winner_Candidate", has_won_seat)
        .orderBy(desc("Seats_Won"))
    )

    print("✅ Party Performance table created")
    return party_performance

def create_vote_analysis(silver_df):
    """
    Break candidates down by State, Party_Type and Vote_Share_Category.

    For each group the candidate count and total votes are computed, plus the
    group's share of all candidates in the same state (in percent).

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Vote analysis table sorted by State,
        Party_Type and Vote_Share_Category.
    """
    print("🗳️ Creating Vote Analysis table...")

    per_state = Window.partitionBy("State")
    grouped = silver_df.groupBy("State", "Party_Type", "Vote_Share_Category").agg(
        count("*").alias("Total_Candidates"),
        sum("Total_Votes").alias("Total_Votes_Polled"),
    )
    share_of_state = col("Total_Candidates") / sum("Total_Candidates").over(per_state) * 100

    vote_analysis = (
        grouped
        .withColumn("Percentage_of_Total_Candidates", share_of_state)
        .orderBy("State", "Party_Type", "Vote_Share_Category")
    )

    print("✅ Vote Analysis table created")
    return vote_analysis

def create_margin_analysis(silver_df):
    """
    Compare each constituency winner with the rank-2 runner-up.

    Only constituencies that have a runner-up row are kept (inner join). The
    margin is reported in votes and in percentage points, and bucketed into
    ``Margin_Category`` bands (<1, <3, <5, <10, otherwise 10%+).

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.

    Returns:
        pyspark.sql.DataFrame: Margin analysis table.
    """
    print("📏 Creating Margin Analysis table...")

    winners = silver_df.where(col("Is_Winner") == True).select(
        "State", "Constituency", "Candidate", "Party", "Total_Votes", "Vote_Percentage"
    )

    runner_up_renames = [
        ("State", "State_RU"),
        ("Constituency", "Constituency_RU"),
        ("Candidate", "Runner_Up_Candidate"),
        ("Party", "Runner_Up_Party"),
        ("Total_Votes", "Runner_Up_Votes"),
        ("Vote_Percentage", "Runner_Up_Percentage"),
    ]
    runners_up = silver_df.where(col("Rank_in_Constituency") == 2).select(
        [col(src).alias(dst) for src, dst in runner_up_renames]
    )

    seat_match = (winners.State == runners_up.State_RU) & \
                 (winners.Constituency == runners_up.Constituency_RU)
    paired = winners.join(runners_up, seat_match, "inner")

    # Build when(<1).when(<3).when(<5).when(<10).otherwise(...) from a table.
    margin_pct = col("Victory_Margin_Percentage")
    bands = [
        (1, "Very Close (<1%)"),
        (3, "Close (1-3%)"),
        (5, "Moderate (3-5%)"),
        (10, "Safe (5-10%)"),
    ]
    band_expr = None
    for cutoff, label in bands:
        if band_expr is None:
            band_expr = when(margin_pct < cutoff, label)
        else:
            band_expr = band_expr.when(margin_pct < cutoff, label)
    band_expr = band_expr.otherwise("Comfortable (10%+)")

    margin_analysis = paired.select(
        "State",
        "Constituency",
        col("Candidate").alias("Winner"),
        col("Party").alias("Winner_Party"),
        col("Total_Votes").alias("Winner_Votes"),
        col("Vote_Percentage").alias("Winner_Percentage"),
        "Runner_Up_Candidate",
        "Runner_Up_Party",
        "Runner_Up_Votes",
        "Runner_Up_Percentage",
        (col("Total_Votes") - col("Runner_Up_Votes")).alias("Victory_Margin_Votes"),
        (col("Vote_Percentage") - col("Runner_Up_Percentage")).alias("Victory_Margin_Percentage"),
    ).withColumn("Margin_Category", band_expr)

    print("✅ Margin Analysis table created")
    return margin_analysis

def create_kpi_dashboard(silver_df, election_year=2024):
    """
    Create executive KPI dashboard with key metrics.

    Args:
        silver_df (pyspark.sql.DataFrame): Silver-layer candidate results.
        election_year (int): Value written to the ``Election_Year`` column of
            ``overall_kpis``. This was previously hard-coded and still defaults
            to 2024.

    Returns:
        dict[str, pyspark.sql.DataFrame]: Keys are:
            - ``overall_kpis``: one row of election-wide totals.
            - ``top_parties``: the 10 parties with the most seats, with ``Rank``.
            - ``competitive_states``: the 10 states with the highest
              ``Competition_Score``.
            - ``close_contests``: winner counts per ``Contest_Type``.
            - ``party_type_performance``: candidates, seats, votes and win rate
              per ``Party_Type``.

    Notes:
        Ties in ``top_parties`` and ``competitive_states`` are broken by name,
        so the top-10 cut-off and the ``Rank`` values are deterministic.
        ``close_contests`` classifies a seat by the *winner's* vote share
        (<35% "Very Close", <45% "Close", else "Comfortable"). It does not use
        the margin over the runner-up.
    """
    print("📈 Creating KPI Dashboard...")
    overall_kpis = silver_df.agg(
        countDistinct("State").alias("Total_States"),
        countDistinct("Constituency").alias("Total_Constituencies"),
        count("*").alias("Total_Candidates"),
        countDistinct("Party").alias("Total_Parties"),
        sum("Total_Votes").alias("Total_Votes_Polled"),
        sum(when(col("Party_Type") == "Independent", 1).otherwise(0)).alias("Independent_Candidates"),
        sum(when(col("Is_Winner") & (col("Party_Type") == "Independent"), 1).otherwise(0)).alias("Independent_Winners"),
        sum(when(col("Party") == "None of the Above", col("Total_Votes")).otherwise(0)).alias("Total_NOTA_Votes")
    ).withColumn("Election_Year", lit(election_year)) \
     .withColumn("Report_Generated_At", current_timestamp())

    # Same ordering for the limit and the Rank window, with name tie-breakers.
    seat_order = [desc("Seats_Won"), asc("Party"), asc("Party_Type")]
    top_parties = silver_df.filter(col("Is_Winner") == True) \
        .groupBy("Party", "Party_Type") \
        .agg(count("*").alias("Seats_Won")) \
        .orderBy(*seat_order) \
        .limit(10) \
        .withColumn("Rank", row_number().over(Window.orderBy(*seat_order)))

    competitive_states = silver_df.groupBy("State") \
        .agg(
            countDistinct("Constituency").alias("Constituencies"),
            (count("*") / countDistinct("Constituency")).alias("Avg_Candidates_Per_Constituency"),
            countDistinct("Party").alias("Parties_Participated")
        ) \
        .withColumn("Competition_Score",
                    col("Avg_Candidates_Per_Constituency") * col("Parties_Participated")) \
        .orderBy(desc("Competition_Score"), asc("State")) \
        .limit(10)

    # Banding uses the winner's own vote share (see Notes in the docstring).
    close_contests = silver_df.filter(col("Is_Winner") == True) \
        .select("State", "Constituency", "Vote_Percentage") \
        .withColumn("Contest_Type",
                    when(col("Vote_Percentage") < 35, "Very Close")
                    .when(col("Vote_Percentage") < 45, "Close")
                    .otherwise("Comfortable")) \
        .groupBy("Contest_Type") \
        .agg(count("*").alias("Count"))

    party_type_performance = silver_df.groupBy("Party_Type") \
        .agg(
            count("*").alias("Total_Candidates"),
            sum(when(col("Is_Winner"), 1).otherwise(0)).alias("Seats_Won"),
            sum("Total_Votes").alias("Total_Votes")
        ) \
        .withColumn("Win_Rate",
                    when(col("Total_Candidates") > 0,
                         (col("Seats_Won") / col("Total_Candidates") * 100))
                    .otherwise(0)) \
        .orderBy(desc("Seats_Won"))

    print("✅ KPI Dashboard created")

    return {
        "overall_kpis": overall_kpis,
        "top_parties": top_parties,
        "competitive_states": competitive_states,
        "close_contests": close_contests,
        "party_type_performance": party_type_performance
    }

📚 All libraries imported successfully!
🏆 Spark Session initialized for Gold Layer Processing!


In [37]:
# --- Main Execution Block ---

SILVER_PATH = "/content/sample_data/silver.csv"  # UPDATE THIS PATH
GOLD_PATH = "/content/gold"
OUTPUT_FORMAT = "csv"  # create_download_zip only bundles *.csv files

silver_df = load_silver_data(SILVER_PATH)

# load_silver_data signals failure with None; test for that explicitly rather
# than relying on the truthiness of a DataFrame object.
if silver_df is not None:
    # Every Gold table below is derived from the same Silver data.
    silver_df.cache()

    constituency_summary = create_constituency_summary(silver_df)
    state_summary = create_state_summary(silver_df)
    party_performance = create_party_performance(silver_df)
    vote_analysis = create_vote_analysis(silver_df)
    margin_analysis = create_margin_analysis(silver_df)
    kpi_dashboard = create_kpi_dashboard(silver_df)

    all_gold_tables = {
        "constituency_summary": constituency_summary,
        "state_summary": state_summary,
        "party_performance": party_performance,
        "vote_analysis": vote_analysis,
        "margin_analysis": margin_analysis,
        **kpi_dashboard  # Unpack dictionary to merge KPI tables
    }

    saved_tables = save_gold_tables(all_gold_tables, GOLD_PATH, OUTPUT_FORMAT)

    print("\n📊 Gold Layer Processing Summary:")
    print("=" * 50)
    for table_name, df in all_gold_tables.items():
        try:
            # Must not be named `count`: that would rebind the global
            # pyspark.sql.functions.count used by every create_* function.
            row_count = df.count()
            print(f"• {table_name}: {row_count} rows, {len(df.columns)} columns")
        except Exception as e:
            print(f"• {table_name}: Error getting stats - {e}")

    if len(saved_tables) == len(all_gold_tables):
        print("\n🎉 Gold Layer Processing completed successfully!")
        print(f"📁 All tables saved to {GOLD_PATH}/ directory")
    else:
        not_saved = sorted(set(all_gold_tables) - set(saved_tables))
        print(f"\n⚠️ Gold Layer Processing finished with errors; not saved: {', '.join(not_saved)}")

    create_download_zip(GOLD_PATH)

    # Stopping the session means this cell cannot be re-run without first
    # re-running the setup cell that creates `spark`.
    spark.stop()
    print("🛑 Spark session stopped successfully")
    print("✅ Gold Layer Processing completed!")

else:
    print("❌ Silver data not available. Please load data first and update the SILVER_PATH.")

✅ Successfully loaded Silver data from /content/sample_data/silver.csv
📊 Total records: 8902
📋 Columns: 19
🏛️ Creating Constituency Summary table...
✅ Constituency Summary table created
🗺️ Creating State Summary table...
✅ State Summary table created
🎯 Creating Party Performance table...
✅ Party Performance table created
🗳️ Creating Vote Analysis table...
✅ Vote Analysis table created
📏 Creating Margin Analysis table...
✅ Margin Analysis table created
📈 Creating KPI Dashboard...
✅ KPI Dashboard created
💾 Saving Gold layer tables to /content/gold in csv format...
✅ Saved constituency_summary to /content/gold/constituency_summary
✅ Saved state_summary to /content/gold/state_summary
✅ Saved party_performance to /content/gold/party_performance
✅ Saved vote_analysis to /content/gold/vote_analysis
✅ Saved margin_analysis to /content/gold/margin_analysis
✅ Saved overall_kpis to /content/gold/overall_kpis
✅ Saved top_parties to /content/gold/top_parties
✅ Saved competitive_states to /content/g