In [0]:
# Storage parameters
container_name = "iotdata"
storage_account = "vstoragedatalake"

# ** Extracted SAS token (everything after the '?' in your URL) **
sas_token = "sp=racwdlmeop&st=2025-08-16T07:32:39Z&se=2025-08-23T15:47:39Z&spr=https&sv=2024-11-04&sr=c&sig=Fei1v6hBffMNBO%2BCQIHbjXFd39%2BAbFMQz5Nnp7P2Eh4%3D"

# Configure mount with SAS
configs = {
  f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net": sas_token
}

# Define mount point
mount_point = "/mnt/iotdata"

# Unmount if already exists
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the container
dbutils.fs.mount(
  source = f"wasbs://{container_name}@{storage_account}.blob.core.windows.net",
  mount_point = mount_point,
  extra_configs = configs
)


/mnt/iotdata has been unmounted.


True

In [0]:
# Path after mount
file_path = "/mnt/iotdata/raw/iot/*/*/*.csv"

# Read into Spark DataFrame
df = spark.read.option("header", True).csv(file_path)
df.show(5)


+-------------------+--------+-------+----------+-----+-------------+--------------+------------+----+----------+--------------+--------------------+------+----------+
|           event_ts|plant_id|line_id|machine_id|shift|sensor_temp_c|vibration_mm_s|pressure_bar| rpm|energy_kwh|ambient_temp_c|ambient_humidity_pct|status|fault_code|
+-------------------+--------+-------+----------+-----+-------------+--------------+------------+----+----------+--------------+--------------------+------+----------+
|2025-08-15T08:00:00| Plant_B|     L1|     M0101|    B|        89.33|          1.96|        6.91|1457|     2.022|         28.65|                50.3|    OK|      NULL|
|2025-08-15T08:00:00| Plant_B|     L1|     M0102|    B|         77.0|          2.46|        4.82|1330|      2.59|         24.78|                62.1|    OK|      NULL|
|2025-08-15T08:00:00| Plant_B|     L1|     M0103|    B|        65.32|          2.17|        6.17|1485|     1.798|         27.03|                43.1|    OK|    

In [0]:
# --- Step 1: Widgets for Parameters ---
dbutils.widgets.text("plant_id", "", "Plant ID (leave empty for all plants)")
dbutils.widgets.text("process_date", "2025-08-15", "Process Date")

plant_id = dbutils.widgets.get("plant_id")
process_date = dbutils.widgets.get("process_date")

# --- Step 2: Define plants to process ---
all_plants = ["Plant_A", "Plant_B", "Plant_C"]

if plant_id.strip() == "":
    plants_to_process = all_plants
else:
    plants_to_process = [plant_id]

print(f"Processing plants: {plants_to_process} on date: {process_date}")

# --- Step 3: Define paths ---
mount_point = "/mnt/iotdata"
silver_base_path = f"{mount_point}/silver/iot/"

# --- Step 4: Imports and schema ---
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, NumericType
from pyspark.sql.functions import col, count, isnan, when, to_date
import json

schema = StructType([
    StructField("event_ts", TimestampType(), True),
    StructField("plant_id", StringType(), True),
    StructField("line_id", StringType(), True),
    StructField("machine_id", StringType(), True),
    StructField("shift", StringType(), True),
    StructField("sensor_temp_c", FloatType(), True),
    StructField("vibration_mm_s", FloatType(), True),
    StructField("pressure_bar", FloatType(), True),
    StructField("rpm", IntegerType(), True),
    StructField("energy_kwh", FloatType(), True),
    StructField("ambient_temp_c", FloatType(), True),
    StructField("ambient_humidity_pct", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("fault_code", StringType(), True)
])

# --- Step 5: Loop through plants ---
results = []

for plant in plants_to_process:
    raw_path = f"{mount_point}/raw/iot/plant={plant}/dt={process_date}/{plant}_{process_date}.csv"
    silver_path = f"{silver_base_path}/plant={plant}/dt={process_date}/"

    print(f"Reading raw data from: {raw_path}")
    
    # Read CSV
    df_raw = spark.read.csv(raw_path, header=True, schema=schema)
    
    # --- Robust Data Quality Checks ---
    numeric_cols = [f.name for f in df_raw.schema.fields if isinstance(f.dataType, NumericType)]
    non_numeric_cols = [f.name for f in df_raw.schema.fields if not isinstance(f.dataType, NumericType)]

    # Numeric: check nulls or NaN
    dq_numeric = df_raw.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in numeric_cols])
    # Non-numeric: check nulls only
    dq_non_numeric = df_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in non_numeric_cols])
    
    # Combine DQ results
    dq_checks = dq_numeric.crossJoin(dq_non_numeric)
    print("Data Quality - Null/NaN counts per column:")
    dq_checks.show()

    # Status validation
    valid_status = ["OK", "WARN", "ALERT"]
    invalid_status_count = df_raw.filter(~col("status").isin(valid_status)).count()
    print(f"Invalid status count: {invalid_status_count}")
    
    # --- Transform / Clean Data ---
    df_clean = df_raw.fillna({
        "energy_kwh": 0.0,
        "ambient_humidity_pct": 50.0,
        "fault_code": "NA"
    }).withColumn("event_day", to_date("event_ts"))
    
    # Write to Silver layer
    df_clean.write.mode("overwrite").parquet(silver_path)
    print(f"Written cleaned data to: {silver_path}")
    
    # Append results for orchestration
    results.append({
        "plant_id": plant,
        "process_date": process_date,
        "raw_count": df_raw.count(),
        "clean_count": df_clean.count(),
        "dq_invalid_status": invalid_status_count
    })

# --- Step 6: Return JSON for orchestration ---
dbutils.notebook.exit(json.dumps(results))


In [0]:
# --- Step 0: Storage Parameters ---
container_name = "iotdata"
storage_account = "vstoragedatalake"
mount_point = "/mnt/iotdata"

# --- Step 1: Widgets for Parameters ---
dbutils.widgets.text("plant_id", "", "Plant ID (leave empty for all plants)")
dbutils.widgets.text("process_date", "2025-08-15", "Process Date")
dbutils.widgets.text("sas_token", "", "SAS Token for ADLS")  # New widget

plant_id = dbutils.widgets.get("plant_id")
process_date = dbutils.widgets.get("process_date")
sas_token = dbutils.widgets.get("sas_token")  # Get SAS token from widget

# --- Step 2: Define plants to process ---
all_plants = ["Plant_A", "Plant_B", "Plant_C"]
plants_to_process = all_plants if plant_id.strip() == "" else [plant_id]
print(f"Processing plants: {plants_to_process} on date: {process_date}")

# --- Step 3: Mount ADLS safely ---
configs = {
  f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net": sas_token
}

# Only mount if not mounted already
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = f"wasbs://{container_name}@{storage_account}.blob.core.windows.net",
        mount_point = mount_point,
        extra_configs = configs
    )
    print(f"Mounted {mount_point} successfully.")
else:
    print(f"{mount_point} already mounted.")

# --- Step 4: Imports and schema ---
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, NumericType
from pyspark.sql.functions import col, count, isnan, when, to_date
import json

schema = StructType([
    StructField("event_ts", TimestampType(), True),
    StructField("plant_id", StringType(), True),
    StructField("line_id", StringType(), True),
    StructField("machine_id", StringType(), True),
    StructField("shift", StringType(), True),
    StructField("sensor_temp_c", FloatType(), True),
    StructField("vibration_mm_s", FloatType(), True),
    StructField("pressure_bar", FloatType(), True),
    StructField("rpm", IntegerType(), True),
    StructField("energy_kwh", FloatType(), True),
    StructField("ambient_temp_c", FloatType(), True),
    StructField("ambient_humidity_pct", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("fault_code", StringType(), True)
])

# --- Step 5: Loop through plants ---
silver_base_path = f"{mount_point}/silver/iot/"
results = []

for plant in plants_to_process:
    raw_path = f"{mount_point}/raw/iot/plant={plant}/dt={process_date}/{plant}_{process_date}.csv"
    silver_path = f"{silver_base_path}/plant={plant}/dt={process_date}/"

    print(f"Reading raw data from: {raw_path}")
    
    # Read CSV
    df_raw = spark.read.csv(raw_path, header=True, schema=schema)
    
    # --- Data Quality Checks ---
    numeric_cols = [f.name for f in df_raw.schema.fields if isinstance(f.dataType, NumericType)]
    non_numeric_cols = [f.name for f in df_raw.schema.fields if not isinstance(f.dataType, NumericType)]

    dq_numeric = df_raw.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in numeric_cols])
    dq_non_numeric = df_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in non_numeric_cols])
    
    dq_checks = dq_numeric.crossJoin(dq_non_numeric)
    print("Data Quality - Null/NaN counts per column:")
    dq_checks.show()

    valid_status = ["OK", "WARN", "ALERT"]
    invalid_status_count = df_raw.filter(~col("status").isin(valid_status)).count()
    print(f"Invalid status count: {invalid_status_count}")
    
    # --- Transform / Clean Data ---
    df_clean = df_raw.fillna({
        "energy_kwh": 0.0,
        "ambient_humidity_pct": 50.0,
        "fault_code": "NA"
    }).withColumn("event_day", to_date("event_ts"))
    
    # Write to Silver layer
    df_clean.write.mode("overwrite").parquet(silver_path)
    print(f"Written cleaned data to: {silver_path}")
    
    # Append results for orchestration
    results.append({
        "plant_id": plant,
        "process_date": process_date,
        "raw_count": df_raw.count(),
        "clean_count": df_clean.count(),
        "dq_invalid_status": invalid_status_count
    })

# --- Step 6: Return JSON for orchestration ---
dbutils.notebook.exit(json.dumps(results))


Processing plants: ['Plant_A', 'Plant_B', 'Plant_C'] on date: 2025-08-15
/mnt/iotdata already mounted.
Reading raw data from: /mnt/iotdata/raw/iot/plant=Plant_A/dt=2025-08-15/Plant_A_2025-08-15.csv


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-4739748706150695>, line 69[0m
[1;32m     66[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mReading raw data from: [39m[38;5;132;01m{[39;00mraw_path[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m     68[0m [38;5;66;03m# Read CSV[39;00m
[0;32m---> 69[0m df_raw [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mcsv(raw_path, header[38;5;241m=[39m[38;5;28;01mTrue[39;00m, schema[38;5;241m=[39mschema)
[1;32m     71[0m [38;5;66;03m# --- Data Quality Checks ---[39;00m
[1;32m     72[0m numeric_cols [38;5;241m=[39m [f[38;5;241m.[39mname [38;5;28;01mfor[39;00m f [38;5;129;01min[39;00m df_raw[38;5;241m.[39mschema[38;5;241m.[39mfields [38;5;28;01mif[39;00m [38;5;28misinstance[39m(f[38;5;241m.[39mdataType, NumericType)]

File [0;32m/

In [0]:
mount_point = "/mnt/iotdata"

# Check mounts
for m in dbutils.fs.mounts():
    print(m.mountPoint, m.source)

# Unmount if it exists
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted {mount_point} successfully.")


/mnt/iotdata wasbs://iotdata@vstoragedatalake.blob.core.windows.net
/databricks-datasets databricks-datasets
/Volumes UnityCatalogVolumes
/databricks/mlflow-tracking databricks/mlflow-tracking
/databricks-results databricks-results
/databricks/mlflow-registry databricks/mlflow-registry
/Volume DbfsReserved
/volumes DbfsReserved
/ DatabricksRoot
/volume DbfsReserved
/mnt/iotdata has been unmounted.
Unmounted /mnt/iotdata successfully.


In [0]:
# --- Step 2: Re-mount with SAS token ---
container_name = "iotdata"
storage_account = "vstoragedatalake"
mount_point = "/mnt/iotdata"

# Your SAS token
sas_token = "sp=racwdlmeop&st=2025-08-16T07:32:39Z&se=2025-08-23T15:47:39Z&spr=https&sv=2024-11-04&sr=c&sig=Fei1v6hBffMNBO%2BCQIHbjXFd39%2BAbFMQz5Nnp7P2Eh4%3D"

configs = {
  f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net": sas_token
}

# Unmount if already exists
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted {mount_point} successfully.")

# Mount container
dbutils.fs.mount(
    source = f"wasbs://{container_name}@{storage_account}.blob.core.windows.net",
    mount_point = mount_point,
    extra_configs = configs
)

print(f"Mounted {mount_point} successfully.")

# Test access
display(dbutils.fs.ls(f"{mount_point}/raw/iot/plant=Plant_A/dt=2025-08-15/"))


Mounted /mnt/iotdata successfully.


path,name,size,modificationTime
dbfs:/mnt/iotdata/raw/iot/plant=Plant_A/dt=2025-08-15/Plant_A_2025-08-15.csv,Plant_A_2025-08-15.csv,58135,1755327609000


In [0]:
# --- Step 0: Widgets for Parameters ---
dbutils.widgets.text("plant_id", "", "Plant ID (leave empty for all plants)")
dbutils.widgets.text("process_date", "2025-08-15", "Process Date")
dbutils.widgets.text("sas_token", "sp=racwdlmeop&st=2025-08-16T07:32:39Z&se=2025-08-23T15:47:39Z&spr=https&sv=2024-11-04&sr=c&sig=Fei1v6hBffMNBO%2BCQIHbjXFd39%2BAbFMQz5Nnp7P2Eh4%3D", "SAS Token for ADLS")


plant_id = dbutils.widgets.get("plant_id")
process_date = dbutils.widgets.get("process_date")
sas_token = dbutils.widgets.get("sas_token")

# --- Step 1: Define plants to process ---
all_plants = ["Plant_A", "Plant_B", "Plant_C"]
plants_to_process = all_plants if plant_id.strip() == "" else [plant_id]
print(f"Processing plants: {plants_to_process} on date: {process_date}")

# --- Step 2: Mount ADLS safely ---
container_name = "iotdata"
storage_account = "vstoragedatalake"
mount_point = "/mnt/iotdata"

configs = {
  f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net": sas_token
}

# Unmount any stale mount
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted {mount_point} successfully.")

# Mount container
dbutils.fs.mount(
    source = f"wasbs://{container_name}@{storage_account}.blob.core.windows.net",
    mount_point = mount_point,
    extra_configs = configs
)
print(f"Mounted {mount_point} successfully.")

# Test access
display(dbutils.fs.ls(f"{mount_point}/raw/iot/plant=Plant_A/dt={process_date}/"))

# --- Step 3: Imports and schema ---
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, NumericType
from pyspark.sql.functions import col, count, isnan, when, to_date
import json

schema = StructType([
    StructField("event_ts", TimestampType(), True),
    StructField("plant_id", StringType(), True),
    StructField("line_id", StringType(), True),
    StructField("machine_id", StringType(), True),
    StructField("shift", StringType(), True),
    StructField("sensor_temp_c", FloatType(), True),
    StructField("vibration_mm_s", FloatType(), True),
    StructField("pressure_bar", FloatType(), True),
    StructField("rpm", IntegerType(), True),
    StructField("energy_kwh", FloatType(), True),
    StructField("ambient_temp_c", FloatType(), True),
    StructField("ambient_humidity_pct", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("fault_code", StringType(), True)
])

# --- Step 4: Loop through plants ---
silver_base_path = f"{mount_point}/silver/iot/"
results = []

for plant in plants_to_process:
    raw_path = f"{mount_point}/raw/iot/plant={plant}/dt={process_date}/{plant}_{process_date}.csv"
    silver_path = f"{silver_base_path}/plant={plant}/dt={process_date}/"

    print(f"Reading raw data from: {raw_path}")
    
    df_raw = spark.read.csv(raw_path, header=True, schema=schema)
    
    # --- Data Quality Checks ---
    numeric_cols = [f.name for f in df_raw.schema.fields if isinstance(f.dataType, NumericType)]
    non_numeric_cols = [f.name for f in df_raw.schema.fields if not isinstance(f.dataType, NumericType)]

    dq_numeric = df_raw.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in numeric_cols])
    dq_non_numeric = df_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in non_numeric_cols])
    dq_checks = dq_numeric.crossJoin(dq_non_numeric)
    print("Data Quality - Null/NaN counts per column:")
    dq_checks.show()

    valid_status = ["OK", "WARN", "ALERT"]
    invalid_status_count = df_raw.filter(~col("status").isin(valid_status)).count()
    print(f"Invalid status count: {invalid_status_count}")
    
    # --- Transform / Clean Data ---
    df_clean = df_raw.fillna({
        "energy_kwh": 0.0,
        "ambient_humidity_pct": 50.0,
        "fault_code": "NA"
    }).withColumn("event_day", to_date("event_ts"))
    
    # Write to Silver layer
    df_clean.write.mode("overwrite").parquet(silver_path)
    print(f"Written cleaned data to: {silver_path}")
    
    results.append({
        "plant_id": plant,
        "process_date": process_date,
        "raw_count": df_raw.count(),
        "clean_count": df_clean.count(),
        "dq_invalid_status": invalid_status_count
    })

# --- Step 5: Return JSON for orchestration ---
dbutils.notebook.exit(json.dumps(results))


Processing plants: ['Plant_A', 'Plant_B', 'Plant_C'] on date: 2025-08-15
/mnt/iotdata has been unmounted.
Unmounted /mnt/iotdata successfully.
Mounted /mnt/iotdata successfully.


[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-4739748706150698>, line 39[0m
[1;32m     36[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mMounted [39m[38;5;132;01m{[39;00mmount_point[38;5;132;01m}[39;00m[38;5;124m successfully.[39m[38;5;124m"[39m)
[1;32m     38[0m [38;5;66;03m# Test access[39;00m
[0;32m---> 39[0m display(dbutils[38;5;241m.[39mfs[38;5;241m.[39mls([38;5;124mf[39m[38;5;124m"[39m[38;5;132;01m{[39;00mmount_point[38;5;132;01m}[39;00m[38;5;124m/raw/iot/plant=Plant_A/dt=[39m[38;5;132;01m{[39;00mprocess_date[38;5;132;01m}[39;00m[38;5;124m/[39m[38;5;124m"[39m))
[1;32m     41[0m [38;5;66;03m# --- Step 3: Imports and schema ---[39;00m
[1;32m     42[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;

In [0]:
#here 1
mount_point = "/mnt/iotdata"

# 1. Unmount any existing mount
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted {mount_point} successfully.")

# 2. Mount with SAS
container_name = "iotdata"
storage_account = "vstoragedatalake"
sas_token = "sp=racwdlmeop&st=2025-08-16T07:32:39Z&se=2025-08-23T15:47:39Z&spr=https&sv=2024-11-04&sr=c&sig=Fei1v6hBffMNBO%2BCQIHbjXFd39%2BAbFMQz5Nnp7P2Eh4%3D"  # paste the full SAS here

configs = {
    f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net": sas_token
}

dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs=configs
)

print(f"Mounted {mount_point} successfully.")

# 3. Test access
display(dbutils.fs.ls(f"{mount_point}/raw/iot/plant=Plant_A/dt=2025-08-15/"))


/mnt/iotdata has been unmounted.
Unmounted /mnt/iotdata successfully.
Mounted /mnt/iotdata successfully.


path,name,size,modificationTime
dbfs:/mnt/iotdata/raw/iot/plant=Plant_A/dt=2025-08-15/Plant_A_2025-08-15.csv,Plant_A_2025-08-15.csv,58135,1755327609000


In [0]:
# --- Step 0: Mount ADLS (if not already mounted) ---
mount_point = "/mnt/iotdata"
container_name = "iotdata"
storage_account = "vstoragedatalake"

# Replace with your valid SAS token
sas_token = "sp=racwdlmeop&st=2025-08-16T07:32:39Z&se=2025-08-23T15:47:39Z&spr=https&sv=2024-11-04&sr=c&sig=Fei1v6hBffMNBO%2BCQIHbjXFd39%2BAbFMQz5Nnp7P2Eh4%3D"

# Unmount if exists
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted {mount_point} successfully.")

# Mount
configs = {f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net": sas_token}

dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs=configs
)
print(f"Mounted {mount_point} successfully.")

# --- Step 1: Widgets for Parameters ---
dbutils.widgets.text("plant_id", "", "Plant ID (leave empty for all plants)")
dbutils.widgets.text("process_date", "2025-08-15", "Process Date")

plant_id = dbutils.widgets.get("plant_id")
process_date = dbutils.widgets.get("process_date")

# --- Step 2: Define plants to process ---
all_plants = ["Plant_A", "Plant_B", "Plant_C"]

if plant_id.strip() == "":
    plants_to_process = all_plants
else:
    plants_to_process = [plant_id]

print(f"Processing plants: {plants_to_process} on date: {process_date}")

# --- Step 3: Define paths ---
silver_base_path = f"{mount_point}/silver/iot/"

# --- Step 4: Imports and schema ---
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import col, count, when, to_date
import json

schema = StructType([
    StructField("event_ts", TimestampType(), True),
    StructField("plant_id", StringType(), True),
    StructField("line_id", StringType(), True),
    StructField("machine_id", StringType(), True),
    StructField("shift", StringType(), True),
    StructField("sensor_temp_c", FloatType(), True),
    StructField("vibration_mm_s", FloatType(), True),
    StructField("pressure_bar", FloatType(), True),
    StructField("rpm", IntegerType(), True),
    StructField("energy_kwh", FloatType(), True),
    StructField("ambient_temp_c", FloatType(), True),
    StructField("ambient_humidity_pct", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("fault_code", StringType(), True)
])

# --- Step 5: Loop through plants ---
results = []

for plant in plants_to_process:
    raw_path = f"{mount_point}/raw/iot/plant={plant}/dt={process_date}/{plant}_{process_date}.csv"
    silver_path = f"{silver_base_path}/plant={plant}/dt={process_date}/"

    print(f"Reading raw data from: {raw_path}")
    
    # Read CSV
    df_raw = spark.read.csv(raw_path, header=True, schema=schema)
    
    # --- Data Quality Checks ---
    numeric_cols = [f.name for f in df_raw.schema.fields if isinstance(f.dataType, (FloatType, IntegerType))]
    non_numeric_cols = [f.name for f in df_raw.schema.fields if f.name not in numeric_cols]

    dq_numeric = df_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in numeric_cols])
    dq_non_numeric = df_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in non_numeric_cols])

    dq_checks = dq_numeric.crossJoin(dq_non_numeric)
    print("Data Quality - Null counts per column:")
    dq_checks.show()

    # Status validation
    valid_status = ["OK", "WARN", "ALERT"]
    invalid_status_count = df_raw.filter(~col("status").isin(valid_status)).count()
    print(f"Invalid status count: {invalid_status_count}")
    
    # --- Clean Data ---
    df_clean = df_raw.fillna({
        "energy_kwh": 0.0,
        "ambient_humidity_pct": 50.0,
        "fault_code": "NA"
    }).withColumn("event_day", to_date("event_ts"))
    
    # Write to Silver layer
    df_clean.write.mode("overwrite").parquet(silver_path)
    print(f"Written cleaned data to: {silver_path}")
    
    # Append results
    results.append({
        "plant_id": plant,
        "process_date": process_date,
        "raw_count": df_raw.count(),
        "clean_count": df_clean.count(),
        "dq_invalid_status": invalid_status_count
    })

# --- Step 6: Return JSON for orchestration ---
dbutils.notebook.exit(json.dumps(results))
