In [0]:

dbutils.widgets.dropdown("env", "dev", ["dev", "qa", "prod"])
env = dbutils.widgets.get("env")

bronze_schema = f"iedr_{env}_bronze"   
silver_schema = f"iedr_{env}_silver"   
platinum_schema = f"iedr_{env}_platinum"

print(f"Running in environment: {env}")
print(f"Reading from bronze schema: {bronze_schema}")
print(f"Writing to silver schema: {silver_schema}")
print(f"Platinum schema : {platinum_schema}")

Running in environment: prod
Reading from bronze schema: iedr_prod_bronze
Writing to silver schema: iedr_prod_silver
Platinum schema : iedr_prod_platinum


In [0]:
from pyspark.sql.functions import col, max as spark_max, count, lit, when, coalesce, to_timestamp

# Bronze schema
#bronze_schema = "iedr_dev_bronze"

# Create silver schema
#silver_schema = "iedr_dev_silver"
#spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_schema}")

In [0]:
# --- Silver Feeders Table ---

u1_circuits = spark.table(f"{bronze_schema}.utility1_circuits")

silver_feeders_u1 = u1_circuits.groupBy("NYHCPV_csv_NFEEDER") \
    .agg(
        spark_max("NYHCPV_csv_NMAXHC").alias("max_hosting_capacity_mw"),
        count("*").alias("segment_count"),
        spark_max("NYHCPV_csv_FHCADATE").alias("refresh_date")  # latest date as proxy
    ) \
    .withColumn("feeder_id", col("NYHCPV_csv_NFEEDER").cast("string")) \
    .withColumn("utility_id", lit("utility1")) \
    .withColumn("source_grain", lit("segment")) \
    .select("feeder_id", "max_hosting_capacity_mw", "segment_count", "utility_id", "source_grain", "refresh_date")

u2_circuits = spark.table(f"{bronze_schema}.utility2_circuits")

silver_feeders_u2 = u2_circuits.select(
    col("Master_CDF").alias("feeder_id"),
    col("feeder_max_hc").alias("max_hosting_capacity_mw"),
    lit(1).alias("segment_count"),
    lit("utility2").alias("utility_id"),
    lit("feeder").alias("source_grain"),
    to_timestamp(
        col("hca_refresh_date"),
        "yyyy/MM/dd HH:mm:ssX"  
    ).alias("refresh_date")
)

silver_feeders = silver_feeders_u1.unionByName(silver_feeders_u2)

silver_feeders.write.mode("overwrite").format("delta").saveAsTable(f"{silver_schema}.feeders")

print(f"Rows: {silver_feeders.count():,}")
silver_feeders.show(5, truncate=False)

Rows: 2,176
+---------+-----------------------+-------------+----------+------------+-------------------+
|feeder_id|max_hosting_capacity_mw|segment_count|utility_id|source_grain|refresh_date       |
+---------+-----------------------+-------------+----------+------------+-------------------+
|1105004  |10.0                   |446          |utility1  |segment     |2022-10-15 00:00:00|
|2300303  |10.0                   |291          |utility1  |segment     |2022-10-15 00:00:00|
|1501802  |1.3                    |262          |utility1  |segment     |2022-10-15 00:00:00|
|1501601  |0.4                    |660          |utility1  |segment     |2022-10-15 00:00:00|
|1501001  |0.3                    |58           |utility1  |segment     |2022-06-30 00:00:00|
+---------+-----------------------+-------------+----------+------------+-------------------+
only showing top 5 rows


In [0]:
# --- Silver DER Records Table (Installed + Planned Unified) ---

from pyspark.sql.functions import col, lit, when, substring_index, coalesce, expr

der_schema = silver_schema

# Utility1 Installed DER
u1_installed = spark.table(f"{bronze_schema}.utility1_install_der")

silver_u1_installed = u1_installed.select(
    coalesce(
        col("ProjectCircuitID"),
        substring_index(col("ProjectCircuitID"), ",", 1)
    ).alias("feeder_id"),
    
    when(col("ProjectType").isin("RESPHOTO", "NRESPHOTO"), "Solar")
     .when(col("ProjectType") == "ESS", "EnergyStorage")
     .when(col("ProjectType") == "ComSolar", "Solar")
     .when(col("ProjectType") == "ComWind", "Wind")
     .when(col("ProjectType") == "HYDRO", "Hydro")
     .otherwise("Other").alias("der_type"),
    
    # Safe cast using SQL try_cast
    expr("try_cast(NamePlateRating AS DOUBLE)").alias("nameplate_rating_raw"),
    expr("try_cast(NamePlateRating AS DOUBLE) / 1000").alias("nameplate_rating_mw"),
    
    lit("installed").alias("status"),
    col("ProjectID").alias("project_id"),
    lit("utility1").alias("utility_id"),
    col("ingest_timestamp")
).filter(
    col("feeder_id").isNotNull()
)

# Utility1 Planned DER
u1_planned = spark.table(f"{bronze_schema}.utility1_planned_der")

silver_u1_planned = u1_planned.select(
    col("ProjectCircuitID").alias("feeder_id"),
    
    when(col("ProjectType") == "NRESPHOTO", "Solar")
     .when(col("ProjectType") == "ComSolar", "Solar")
     .when(col("ProjectType") == "ESS", "EnergyStorage")
     .when(col("ProjectType") == "ComWind", "Wind")
     .when(col("ProjectType") == "HYDRO", "Hydro")
     .otherwise("Other").alias("der_type"),
    
    expr("try_cast(NamePlateRating AS DOUBLE)").alias("nameplate_rating_raw"),
    expr("try_cast(NamePlateRating AS DOUBLE) / 1000").alias("nameplate_rating_mw"),
    
    lit("planned").alias("status"),
    col("ProjectID").alias("project_id"),
    lit("utility1").alias("utility_id"),
    col("ingest_timestamp")
).filter(
    col("feeder_id").isNotNull()
)

# Utility2 Installed DER
u2_installed = spark.table(f"{bronze_schema}.utility2_install_der")

silver_u2_installed = u2_installed.select(
    col("DER_INTERCONNECTION_LOCATION").alias("feeder_id"),
    col("DER_TYPE").alias("der_type"),
    
    expr("try_cast(DER_NAMEPLATE_RATING AS DOUBLE)").alias("nameplate_rating_mw"),
    
    lit("installed").alias("status"),
    col("DER_ID").alias("project_id"),
    lit("utility2").alias("utility_id"),
    col("ingest_timestamp")
).filter(col("feeder_id").isNotNull())

# Utility2 Planned DER
u2_planned = spark.table(f"{bronze_schema}.utility2_planned_der")

silver_u2_planned = u2_planned.select(
    col("DER_INTERCONNECTION_LOCATION").alias("feeder_id"),
    col("DER_TYPE").alias("der_type"),
    
    expr("try_cast(DER_NAMEPLATE_RATING AS DOUBLE)").alias("nameplate_rating_mw"),
    
    lit("planned").alias("status"),
    col("INTERCONNECTION_QUEUE_REQUEST_ID").alias("project_id"),
    lit("utility2").alias("utility_id"),
    col("ingest_timestamp")
).filter(col("feeder_id").isNotNull())

# Union all
silver_der_records = (
    silver_u1_installed
    .unionByName(silver_u1_planned, allowMissingColumns=True)
    .unionByName(silver_u2_installed, allowMissingColumns=True)
    .unionByName(silver_u2_planned, allowMissingColumns=True)
)

# Save
silver_der_records.write.mode("overwrite").format("delta").saveAsTable(f"{silver_schema}.der_records")

# Summary
print("Silver DER records table created")
total_rows = silver_der_records.count()
print(f"Total DER rows: {total_rows:,}")

print("\nBreakdown by utility and status:")
silver_der_records.groupBy("utility_id", "status").count().show()

print("\nSample 5 rows:")
silver_der_records.show(5, truncate=False)

print("\nSilver DER transformation complete!")

Silver DER records table created
Total DER rows: 71,909

Breakdown by utility and status:
+----------+---------+-----+
|utility_id|   status|count|
+----------+---------+-----+
|  utility1|installed|13727|
|  utility1|  planned| 1688|
|  utility2|installed|25537|
|  utility2|  planned|30957|
+----------+---------+-----+


Sample 5 rows:
+---------+--------+--------------------+-------------------+---------+----------+----------+--------------------------+
|feeder_id|der_type|nameplate_rating_raw|nameplate_rating_mw|status   |project_id|utility_id|ingest_timestamp          |
+---------+--------+--------------------+-------------------+---------+----------+----------+--------------------------+
|NULL     |Other   |0.0                 |0.0                |installed|3875      |utility1  |2026-01-20 03:34:32.112254|
|NULL     |Wind    |78400.0             |78.4               |installed|7389      |utility1  |2026-01-20 03:34:32.112254|
|NULL     |Other   |NULL                |NULL           

In [0]:
# Platinum Layer (API-ready tables)

from pyspark.sql.functions import sum as spark_sum, countDistinct

#platinum_schema = "iedr_dev_platinum"
#spark.sql(f"CREATE SCHEMA IF NOT EXISTS {platinum_schema}")

# 1. Feeder Capacity Table (with DER counts)
silver_feeders = spark.table(f"{silver_schema}.feeders")
silver_ders = spark.table(f"{silver_schema}.der_records")

# Aggregate DER counts per feeder
der_counts = silver_ders.groupBy("feeder_id").agg(
    spark_sum(when(col("status") == "installed", 1).otherwise(0)).alias("installed_der_count"),
    spark_sum(when(col("status") == "planned", 1).otherwise(0)).alias("planned_der_count"),
    countDistinct("der_type").alias("unique_der_types")
)

platinum_feeders = silver_feeders.join(
    der_counts,
    "feeder_id",
    "left_outer"
).fillna({"installed_der_count": 0, "planned_der_count": 0, "unique_der_types": 0})

platinum_feeders.write.mode("overwrite").format("delta").saveAsTable(f"{platinum_schema}.feeder_capacity")

print("Platinum feeder_capacity created")
print(f"Rows: {platinum_feeders.count():,}")
platinum_feeders.orderBy(col("max_hosting_capacity_mw").desc()).show(5, truncate=False)

# 2. DER Details Table (simple copy with some cleanup)
platinum_der_details = silver_ders.select(
    "feeder_id",
    "der_type",
    "nameplate_rating_mw",
    "status",
    "project_id",
    "utility_id",
    "ingest_timestamp"
)

platinum_der_details.write.mode("overwrite").format("delta").saveAsTable(f"{platinum_schema}.der_details")

print("\nPlatinum der_details created")
print(f"Rows: {platinum_der_details.count():,}")
platinum_der_details.show(5, truncate=False)

# 3. Data Quality Summary (as per task: refresh dates, volume, missing data)
quality_summary = spark.createDataFrame([
    {"table": "feeders", "row_count": silver_feeders.count(), "last_refresh": silver_feeders.agg(spark_max("refresh_date")).collect()[0][0]},
    {"table": "der_records", "row_count": silver_ders.count(), "last_refresh": silver_ders.agg(spark_max("ingest_timestamp")).collect()[0][0]},
    {"table": "feeder_capacity", "row_count": platinum_feeders.count(), "note": "Aggregated from silver"},
    {"table": "der_details", "row_count": platinum_der_details.count(), "note": "Detailed DER view"}
])

quality_summary.write.mode("overwrite").format("delta").saveAsTable(f"{platinum_schema}.data_quality_summary")

print("\nData Quality Summary:")
quality_summary.show(truncate=False)

print("\nPlatinum layer complete!")

Platinum feeder_capacity created
Rows: 2,176
+-----------+-----------------------+-------------+----------+------------+-------------------+-------------------+-----------------+----------------+
|feeder_id  |max_hosting_capacity_mw|segment_count|utility_id|source_grain|refresh_date       |installed_der_count|planned_der_count|unique_der_types|
+-----------+-----------------------+-------------+----------+------------+-------------------+-------------------+-----------------+----------------+
|36_14_25453|10.0                   |1            |utility2  |feeder      |2022-10-01 00:00:00|9                  |10               |1               |
|36_17_64656|10.0                   |1            |utility2  |feeder      |2022-10-01 00:00:00|18                 |29               |1               |
|36_14_29951|10.0                   |1            |utility2  |feeder      |2022-10-01 00:00:00|4                  |6                |1               |
|36_14_25458|10.0                   |1           

In [0]:


print("Test 1: Feeders with max_hosting_capacity_mw > 5 (platinum.feeder_capacity)")
api1_query = """
SELECT *
FROM iedr_dev_platinum.feeder_capacity
WHERE max_hosting_capacity_mw > 5
ORDER BY max_hosting_capacity_mw DESC
LIMIT 10
"""
spark.sql(api1_query).show(truncate=False)

print("\nTest 2: All installed + planned DER for a specific feeder (platinum.der_details) ")

feeder_example = "36_14_25453"  

api2_query = f"""
SELECT 
    feeder_id,
    der_type,
    nameplate_rating_mw,
    status,
    project_id,
    utility_id
FROM iedr_dev_platinum.der_details
WHERE feeder_id = '{feeder_example}'
ORDER BY nameplate_rating_mw DESC
"""
spark.sql(api2_query).show(truncate=False)

print("\n=== Data Quality Summary ===")
spark.table("iedr_dev_platinum.data_quality_summary").show(truncate=False)

Test 1: Feeders with max_hosting_capacity_mw > 5 (platinum.feeder_capacity)
+---------+-----------------------+-------------+----------+------------+-------------------+-------------------+-----------------+----------------+
|feeder_id|max_hosting_capacity_mw|segment_count|utility_id|source_grain|refresh_date       |installed_der_count|planned_der_count|unique_der_types|
+---------+-----------------------+-------------+----------+------------+-------------------+-------------------+-----------------+----------------+
|2307210  |10.0                   |2400         |utility1  |segment     |2022-10-15 00:00:00|84                 |4                |2               |
|1107241  |10.0                   |1785         |utility1  |segment     |2022-10-15 00:00:00|108                |12               |2               |
|1109706  |10.0                   |1415         |utility1  |segment     |2022-10-15 00:00:00|140                |12               |2               |
|1105249  |10.0               

In [0]:

from pyspark.sql.functions import max as spark_max, count

def test_feeder_aggregation():
  
    mock_data = [
        ("1001", 4.8, 10.5, "2022-10-01"),   
        ("1001", 4.8, 12.0, "2022-10-01"),   
        ("1002", 4.8, 8.0, "2022-10-02"),    
        ("1002", 4.8, 9.5, "2022-10-02"),    
        ("1001", 4.8, 7.0, "2022-10-01")     
    ]
    
    columns = ["NYHCPV_csv_NFEEDER", "NYHCPV_csv_NVOLTAGE", "NYHCPV_csv_NMAXHC", "NYHCPV_csv_FHCADATE"]
    df = spark.createDataFrame(mock_data, columns)
    
    # Your aggregation logic (copy-paste from your notebook)
    aggregated = df.groupBy("NYHCPV_csv_NFEEDER").agg(
        spark_max("NYHCPV_csv_NMAXHC").alias("max_hosting_capacity_mw"),
        count("*").alias("segment_count")
    )
    
    # Collect results
    result = aggregated.orderBy("NYHCPV_csv_NFEEDER").collect()
    
    # Assertions
    assert len(result) == 2, "Should have 2 unique feeders"
    
    feeder1 = result[0]
    assert feeder1["NYHCPV_csv_NFEEDER"] == "1001"
    assert feeder1["max_hosting_capacity_mw"] == 12.0, "Max HC for feeder1 wrong"
    assert feeder1["segment_count"] == 3, "Segment count for feeder1 wrong"
    
    feeder2 = result[1]
    assert feeder2["NYHCPV_csv_NFEEDER"] == "1002"
    assert feeder2["max_hosting_capacity_mw"] == 9.5
    assert feeder2["segment_count"] == 2
    
    print("Test 2: Feeder aggregation logic → PASSED ✅")

# Run the test
test_feeder_aggregation()

Test 2: Feeder aggregation logic → PASSED ✅
