In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    current_timestamp, lit, count, sum, avg, max, min, when, col,
    year, month, dayofmonth, dayofweek, date_format, row_number, quarter, countDistinct
)
from pyspark.sql import Window
from delta.tables import DeltaTable
import pyspark.sql.functions as F

# Assuming Spark session is already created
spark = SparkSession.builder.appName("ENPPI_Gold_Dimensional").getOrCreate()

# Read Silver table
silver_df = spark.table("sicinc.silver.enppi_smart_data_inc")

# Quick schema check
print("Silver Schema:")
silver_df.printSchema()
silver_df.select("timestamp").show(5, truncate=False)

# --------------------------------------
# 1) Add time dimensions for Fact
# --------------------------------------
fact_df = silver_df \
    .withColumn("year", year("timestamp")) \
    .withColumn("month", month("timestamp")) \
    .withColumn("day", dayofmonth("timestamp")) \
    .withColumn("weekday", dayofweek("timestamp")) \
    .withColumn("date_key", date_format("timestamp", "yyyy-MM-dd"))

print("Fact DF with Time Dimensions Sample:")
fact_df.select("timestamp", "date_key", "year", "month", "day", "weekday").show(10)

Silver Schema:
root
 |-- timestamp: timestamp (nullable = true)
 |-- facility_id: long (nullable = true)
 |-- facility_name: string (nullable = true)
 |-- sensor_id: long (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- gas_type: string (nullable = true)
 |-- gas_concentration_ppm: double (nullable = true)
 |-- emission_rate_kg_h: double (nullable = true)
 |-- methane_leak_detected: boolean (nullable = true)
 |-- h2s_alert_level: long (nullable = true)
 |-- temperature_celsius: double (nullable = true)
 |-- pressure_bar: double (nullable = true)
 |-- unit_status: string (nullable = true)
 |-- maintenance_required: boolean (nullable = true)
 |-- power_consumption_kw: double (nullable = true)
 |-- wind_speed_m_s: double (nullable = true)
 |-- wind_direction_deg: double (nullable = true)
 |-- ambient_temperature_celsius: double (nullable = true)
 |-- ambient_humidity_percent: double (nullable = true)
 |-- safety_threshold_exceeded: boolean (nullable = true)
 |-- date: str

In [0]:
# --------------------------------------
# 2) Dim_Time (Incremental)
# --------------------------------------
from delta.tables import DeltaTable

dim_time_table = "sic.gold.dim_time"

# اختبر لو الجدول موجود
table_exists = spark.catalog.tableExists(dim_time_table)

# ابني dim_time من fact_df (استخدم فقط الـ dates الجديدة أو كلها لو الجدول مش موجود)
dim_time_df = fact_df.select(
    "date_key", "year", "month", "day", "weekday"
).distinct() \
    .withColumn("quarter", quarter(col("date_key"))) \
    .withColumn("month_name", date_format(col("date_key"), "MMMM")) \
    .withColumn("day_name", date_format(col("date_key"), "EEEE"))

time_window = Window.partitionBy(lit(1)).orderBy("date_key")
dim_time_df = dim_time_df.withColumn("time_sk", row_number().over(time_window))

if not table_exists:
    # Initial load
    dim_time_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .partitionBy("year") \
        .saveAsTable(dim_time_table)
    print(f"Dim Time initial load done: {dim_time_table}")
else:
    # Incremental merge
    dim_time_delta = DeltaTable.forName(spark, dim_time_table)
    dim_time_delta.alias("target").merge(
        dim_time_df.alias("source"),
        "target.date_key = source.date_key"
    ).whenNotMatchedInsertAll().execute()
    print(f"Dim Time incremental merge done: {dim_time_table}")

# Optimize Z-order
DeltaTable.forName(spark, dim_time_table).optimize().executeZOrderBy("date_key")

# Preview
dim_time_df.show(10)
dim_time_df.printSchema()

Dim Time incremental merge done: sic.gold.dim_time
+----------+----+-----+---+-------+-------+----------+---------+-------+
|  date_key|year|month|day|weekday|quarter|month_name| day_name|time_sk|
+----------+----+-----+---+-------+-------+----------+---------+-------+
|2025-01-01|2025|    1|  1|      4|      1|   January|Wednesday|      1|
|2025-01-02|2025|    1|  2|      5|      1|   January| Thursday|      2|
|2025-01-03|2025|    1|  3|      6|      1|   January|   Friday|      3|
|2025-01-04|2025|    1|  4|      7|      1|   January| Saturday|      4|
|2025-01-05|2025|    1|  5|      1|      1|   January|   Sunday|      5|
|2025-01-06|2025|    1|  6|      2|      1|   January|   Monday|      6|
|2025-01-07|2025|    1|  7|      3|      1|   January|  Tuesday|      7|
|2025-01-08|2025|    1|  8|      4|      1|   January|Wednesday|      8|
|2025-01-09|2025|    1|  9|      5|      1|   January| Thursday|      9|
|2025-01-10|2025|    1| 10|      6|      1|   January|   Friday|     10|


In [0]:
dim_facility_table = "sic.gold.dim_facility"
table_exists = spark.catalog.tableExists(dim_facility_table)

facility_window = Window.partitionBy(lit(1)).orderBy("facility_id")
dim_facility_df = silver_df.select(
    "facility_id", "facility_name"
).distinct() \
    .withColumn("facility_sk", row_number().over(facility_window))

if not table_exists:
    dim_facility_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(dim_facility_table)
    print(f"Dim Facility initial load done: {dim_facility_table}")
else:
    dim_facility_delta = DeltaTable.forName(spark, dim_facility_table)
    dim_facility_delta.alias("target").merge(
        dim_facility_df.alias("source"),
        "target.facility_id = source.facility_id"
    ).whenNotMatchedInsertAll().execute()
    print(f"Dim Facility incremental merge done: {dim_facility_table}")

dim_facility_df.show(10)

Dim Facility incremental merge done: sic.gold.dim_facility
+-----------+---------------+-----------+
|facility_id|  facility_name|facility_sk|
+-----------+---------------+-----------+
|       6997| Zohr Gas Plant|          1|
|       8020|West Nile Delta|          2|
|       9782|  Ras Gas Plant|          3|
+-----------+---------------+-----------+



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# --------------------------------------
# 0) Silver incremental table
# --------------------------------------
silver_inc_df = spark.table("sicinc.silver.enppi_smart_data_inc")

# --------------------------------------
# 1) Prepare new sensors DataFrame
# --------------------------------------
new_sensors_df = silver_inc_df.select(
    "sensor_id",
    "sensor_type",
    "gas_type"
).distinct().withColumn("load_date", F.current_date())

# --------------------------------------
# 2) Dim Sensor table
# --------------------------------------
dim_sensor_table = "sic.gold.dim_sensor_inc"

if not spark.catalog.tableExists(dim_sensor_table):

    dim_sensor_df = new_sensors_df \
        .withColumn("sensor_sk", F.monotonically_increasing_id().cast("long")) \
        .withColumn("start_date", F.col("load_date")) \
        .withColumn("end_date", F.lit(None).cast("date")) \
        .withColumn("current_flag", F.lit(True)) \
        .drop("load_date")

    dim_sensor_df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(dim_sensor_table)

    print("Initial Dim Sensor (SCD Type 2) created successfully!")

else:

    dim_sensor_delta = DeltaTable.forName(spark, dim_sensor_table)
    dim_sensor_existing_df = dim_sensor_delta.toDF()
    current_active_df = dim_sensor_existing_df.filter("current_flag = true")

    # -------------------------
    # New + Changed sensors
    # -------------------------
    new_only_df = new_sensors_df.join(
        current_active_df.select("sensor_id"), "sensor_id", "left_anti"
    )

    changed_df = new_sensors_df.alias("new").join(
        current_active_df.alias("old"), "sensor_id"
    ).filter(
        (F.col("new.sensor_type") != F.col("old.sensor_type")) |
        (F.col("new.gas_type") != F.col("old.gas_type"))
    ).select(
        "new.sensor_id", "new.sensor_type", "new.gas_type", "new.load_date"
    )

    to_insert_df = new_only_df.union(changed_df)

    if to_insert_df.count() > 0:

        # generate surrogate key
        max_sk = current_active_df.agg(F.max("sensor_sk")).collect()[0][0] or 0
        window_sk = Window.orderBy("sensor_id")

        to_insert_df = to_insert_df \
            .withColumn("sensor_sk", (F.row_number().over(window_sk) + max_sk).cast("long")) \
            .withColumn("start_date", F.col("load_date")) \
            .withColumn("end_date", F.lit(None).cast("date")) \
            .withColumn("current_flag", F.lit(True)) \
            .drop("load_date")

        # ----------------------------------------
        # Reorder columns to match target schema
        # ----------------------------------------
        final_cols = [
            "sensor_sk", "sensor_id", "sensor_type", "gas_type",
            "start_date", "end_date", "current_flag"
        ]

        to_insert_df = to_insert_df.select(final_cols)

        # ----------------------------------------
        # Append with mergeSchema = true
        # ----------------------------------------
        to_insert_df.write.format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .saveAsTable(dim_sensor_table)

        # expire old rows
        updated_keys = [r.sensor_id for r in to_insert_df.select("sensor_id").distinct().collect()]

        dim_sensor_delta.update(
            condition=F.col("sensor_id").isin(updated_keys) & (F.col("current_flag") == True),
            set={
                "current_flag": F.lit(False),
                "end_date": F.date_sub(F.current_date(), 1)
            }
        )

    print("Incremental SCD Type 2 update for Dim Sensor completed!")

# Preview
spark.table(dim_sensor_table).orderBy("sensor_id", F.desc("start_date")).show(10)




Incremental SCD Type 2 update for Dim Sensor completed!
+---------+------------+--------+---------+----------+----------+------------+
|sensor_id| sensor_type|gas_type|sensor_sk|start_date|  end_date|current_flag|
+---------+------------+--------+---------+----------+----------+------------+
|      100|Gas Detector|     H2S|     1101|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|    VOCs|        8|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|     CO2|      769|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|     H2S|      377|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|    VOCs|     1100|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|    VOCs|     1097|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|     CO2|     1099|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|     CH4|     1102|2025-11-26|2025-11-25|       false|
|      100|Gas Detector|     H2S|     1098|2025-11-26|2025-11-25|       fal



In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, Window
from delta.tables import DeltaTable

# --------------------------------------
# 0) Setup & Read Tables (Fixing NameError)
# --------------------------------------
spark = SparkSession.builder.appName("ENPPI_Gold_Fact_Daily").getOrCreate()

# Read the incremental Silver table
silver_df_inc = spark.table("sicinc.silver.enppi_smart_data_inc")

# Read Dimensions
dim_time_df = spark.table("sic.gold.dim_time")
dim_facility_df = spark.table("sic.gold.dim_facility")
dim_sensor_df = spark.table("sic.gold.dim_sensor_inc")

# --------------------------------------
# 1) Prepare Silver for Aggregation
# --------------------------------------
# We need to ensure 'year', 'month', and 'date_key' exist before grouping
silver_prepared_df = silver_df_inc \
    .withColumn("date_key", F.col("date")) \
    .withColumn("year", F.year(F.col("timestamp"))) \
    .withColumn("month", F.month(F.col("timestamp")))

# --------------------------------------
# 2) Perform Daily Aggregation
# --------------------------------------
fact_daily_df = silver_prepared_df.groupBy(
    "date_key", "facility_id", "sensor_id", "year", "month"
).agg(
    F.sum("emission_rate_kg_h").alias("total_daily_emissions_kg"),
    F.avg("gas_concentration_ppm").alias("avg_daily_concentration_ppm"),
    F.avg("emission_rate_kg_h").alias("avg_daily_emission_rate_kg_h"),
    F.sum(F.when(F.col("methane_leak_detected") == True, 1).otherwise(0)).alias("methane_leak_count"),
    F.max("h2s_alert_level").alias("max_h2s_alert_level"),
    F.sum(F.when(F.col("safety_threshold_exceeded") == True, 1).otherwise(0)).alias("safety_violation_count"),
    F.sum(F.when(F.col("unit_status") == "Running", 1).otherwise(0)).alias("running_units_count"),
    F.avg("temperature_celsius").alias("avg_temperature_c"),
    F.avg("pressure_bar").alias("avg_pressure_bar"),
    F.avg("wind_speed_m_s").alias("avg_wind_speed_m_s"),
    F.avg("ambient_temperature_celsius").alias("avg_ambient_temp_c"),
    F.avg("ambient_humidity_percent").alias("avg_ambient_humidity_pct"),
    F.count("*").alias("record_count")
)

# --------------------------------------
# 3) Join with Dimensions to get Surrogate Keys
# --------------------------------------
# A. Join Fact with Facility
# Note: Using left join is safer to keep data even if dimension is slightly out of sync
fact_join_facility = fact_daily_df.alias("f").join(
    dim_facility_df.alias("dfac"), 
    F.col("f.facility_id") == F.col("dfac.facility_id"), 
    "left"
)

# B. Join Fact with Sensor (Active sensors logic)
# Note: In a daily snapshot, we ideally check if the sensor was active *on that day*, 
# but for simplicity here we join with current active sensors or use date logic.
# Below uses simple join on active sensors:
active_dim_sensor = dim_sensor_df.filter(F.col("current_flag") == True)

fact_join_sensor = fact_join_facility.join(
    active_dim_sensor.alias("dsen"), 
    F.col("f.sensor_id") == F.col("dsen.sensor_id"), 
    "left"
)

# C. Join Fact with Time
fact_with_dims_df_inc = fact_join_sensor.join(
    dim_time_df.alias("dt"), 
    F.col("f.date_key") == F.col("dt.date_key"), 
    "left"
).select(
    F.col("dt.time_sk").alias("time_key"),
    F.col("dfac.facility_sk").alias("facility_key"),
    F.col("dsen.sensor_sk").alias("sensor_key"),
    F.col("f.date_key"),
    F.col("f.facility_id"),
    F.col("f.sensor_id"),
    F.col("f.total_daily_emissions_kg"),
    F.col("f.avg_daily_concentration_ppm"),
    F.col("f.avg_daily_emission_rate_kg_h"),
    F.col("f.methane_leak_count"),
    F.col("f.max_h2s_alert_level"),
    F.col("f.safety_violation_count"),
    F.col("f.running_units_count"),
    F.col("f.avg_temperature_c"),
    F.col("f.avg_pressure_bar"),
    F.col("f.avg_wind_speed_m_s"),
    F.col("f.avg_ambient_temp_c"),
    F.col("f.avg_ambient_humidity_pct"),
    F.col("f.record_count"),
    F.col("f.year"),
    F.col("f.month")
)

# Handle potential nulls in keys if dimensions were missing
fact_with_dims_df_inc = fact_with_dims_df_inc.fillna(-1, subset=["time_key", "facility_key", "sensor_key"])

# --------------------------------------
# 4) Write/Merge into Gold
# --------------------------------------
fact_table_inc = "sicinc.gold.fact_sensor_daily_readings_inc"

if not spark.catalog.tableExists(fact_table_inc):
    print(f"Creating new table: {fact_table_inc}")
    fact_with_dims_df_inc.write.format("delta").mode("overwrite") \
        .partitionBy("year", "month") \
        .option("overwriteSchema", "true") \
        .saveAsTable(fact_table_inc)
else:
    print(f"Merging into existing table: {fact_table_inc}")
    fact_delta = DeltaTable.forName(spark, fact_table_inc)
    fact_delta.alias("target").merge(
        fact_with_dims_df_inc.alias("source"),
        "target.date_key = source.date_key AND target.sensor_id = source.sensor_id AND target.facility_id = source.facility_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# --------------------------------------
# 5) Optimize
# --------------------------------------
try:
    DeltaTable.forName(spark, fact_table_inc).optimize().executeZOrderBy("facility_key", "date_key")
except Exception as e:
    print(f"Optimization skipped (table might be too small or empty): {e}")

# --------------------------------------
# 6) Quality Check
# --------------------------------------
print("Incremental Fact Table Quality Check:")
fact_with_dims_df_inc.select(
    F.count("*").alias("total_rows"),
    F.countDistinct("time_key").alias("unique_days"),
    F.countDistinct("facility_key").alias("unique_facilities"),
    F.countDistinct("sensor_key").alias("unique_sensors")
).show()

In [0]:
fact_with_dims_df_inc.printSchema()

root
 |-- time_key: integer (nullable = false)
 |-- facility_key: integer (nullable = false)
 |-- sensor_key: long (nullable = false)
 |-- date_key: string (nullable = true)
 |-- facility_id: long (nullable = true)
 |-- sensor_id: long (nullable = true)
 |-- total_daily_emissions_kg: double (nullable = true)
 |-- avg_daily_concentration_ppm: double (nullable = true)
 |-- avg_daily_emission_rate_kg_h: double (nullable = true)
 |-- methane_leak_count: long (nullable = true)
 |-- max_h2s_alert_level: long (nullable = true)
 |-- safety_violation_count: long (nullable = true)
 |-- running_units_count: long (nullable = true)
 |-- avg_temperature_c: double (nullable = true)
 |-- avg_pressure_bar: double (nullable = true)
 |-- avg_wind_speed_m_s: double (nullable = true)
 |-- avg_ambient_temp_c: double (nullable = true)
 |-- avg_ambient_humidity_pct: double (nullable = true)
 |-- record_count: long (nullable = false)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

# --------------------------------------
# 0) Setup Spark Session
# --------------------------------------
spark = SparkSession.builder.appName("ENPPI_Gold_Maintenance").getOrCreate()

# --------------------------------------
# 1) Add metadata to all incremental tables
# --------------------------------------

# CORRECTED: Using 'sicinc.gold' for all tables as requested
inc_tables = [
    "sicinc.gold.dim_time",
    "sicinc.gold.dim_facility",
    "sicinc.gold.dim_sensor_inc",
    "sicinc.gold.fact_sensor_daily_readings_inc"
]

print("Starting metadata update for tables:", inc_tables)

for table in inc_tables:
    try:
        # Check if table exists before proceeding
        if spark.catalog.tableExists(table):
            print(f"Updating metadata for: {table}")
            
            temp_df = spark.table(table).withColumn("gold_ingestion_timestamp", current_timestamp())
            
            temp_df.write.format("delta") \
                .mode("overwrite") \
                .option("overwriteSchema", "true") \
                .saveAsTable(table)
            
            print(f"Successfully updated: {table}")
        else:
            print(f"WARNING: Table not found, skipping: {table}")
            
    except Exception as e:
        print(f"Error processing {table}: {str(e)}")

# Optional: ZOrder optimization (Commented out as per your request)
# try:
#     DeltaTable.forName(spark, "sicinc.gold.dim_sensor_inc").optimize().executeZOrderBy("sensor_sk")
# except Exception as e:
#     print(f"Optimization skipped: {e}")

print("Incremental Gold Dimensional Model metadata updated successfully!")

# --------------------------------------
# 2) Previews
# --------------------------------------
print("\n--- Dim Time Preview ---")
spark.table("sicinc.gold.dim_time").show(5)

print("\n--- Dim Facility Preview ---")
spark.table("sicinc.gold.dim_facility").show(5)

print("\n--- Dim Sensor Incremental Preview ---")
spark.table("sicinc.gold.dim_sensor_inc").show(5)

print("\n--- Fact Table Incremental Preview ---")
spark.table("sicinc.gold.fact_sensor_daily_readings_inc").show(5)

Starting metadata update for tables: ['sicinc.gold.dim_time', 'sicinc.gold.dim_facility', 'sicinc.gold.dim_sensor_inc', 'sicinc.gold.fact_sensor_daily_readings_inc']
Updating metadata for: sicinc.gold.fact_sensor_daily_readings_inc
Successfully updated: sicinc.gold.fact_sensor_daily_readings_inc
Incremental Gold Dimensional Model metadata updated successfully!

--- Dim Time Preview ---


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7647759673664445>, line 56[0m
[1;32m     52[0m [38;5;66;03m# --------------------------------------[39;00m
[1;32m     53[0m [38;5;66;03m# 2) Previews[39;00m
[1;32m     54[0m [38;5;66;03m# --------------------------------------[39;00m
[1;32m     55[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124m--- Dim Time Preview ---[39m[38;5;124m"[39m)
[0;32m---> 56[0m spark[38;5;241m.[39mtable([38;5;124m"[39m[38;5;124msicinc.gold.dim_time[39m[38;5;124m"[39m)[38;5;241m.[39mshow([38;5;241m5[39m)
[1;32m     58[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124m--- Dim Facility Preview ---[39m[38;5;124m"[39m)
[1;32m     59[0m spark[38;5;241m.[39mtable([38;5;124m"[39m[38;5;124msicinc.gold.dim_facility[39m[38;5;124m

In [0]:
spark.catalog.listTables("sicinc.gold")


[Table(name='fact_sensor_daily_readings_inc', catalog='sicinc', namespace=['gold'], description=None, tableType='MANAGED', isTemporary=False)]