# In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, lag, col, coalesce, lit

# Build the Gold machine-feature table from the Silver table.
#
# Steps: read Silver, derive per-machine rolling/lag features, print a few
# sanity statistics, then overwrite the Gold table in Unity Catalog.

# Load Silver from Unity Catalog (update catalog/schema as needed).
# NOTE: DataFrameReader.table() resolves the table's format from the catalog,
# so the explicit .format("delta") here is informational only.
df_silver = spark.read.format("delta").table("main.default.silver_machine_failure_clean")

print(f"Silver records: {df_silver.count()}")

# Number of preceding readings averaged by each rolling feature.
ROLLING_WINDOW_ROWS = 5

# Per-machine window ordered by timestamp.
# NOTE(review): if a machine can have several rows with the same timestamp,
# lag() and the rolling averages become nondeterministic between runs; add a
# tiebreaker column to orderBy() if that can happen — confirm against the data.
w = Window.partitionBy("machine_id").orderBy("timestamp")

# Frame of the previous ROLLING_WINDOW_ROWS rows, excluding the current row,
# so each rolling average is computed from past readings only.
w_prev = w.rowsBetween(-ROLLING_WINDOW_ROWS, -1)

# Derive features.
# - Rolling averages stay null when their frame contains no non-null values
#   (e.g. a machine's first reading); they are NOT filled.
# - temp_diff is null for a machine's first reading (lag has no previous row)
#   or when either temperature is null; those nulls are replaced with 0.
df_gold = (
    df_silver
    .withColumn("temp_roll_avg", avg("temperature").over(w_prev))
    .withColumn("temp_diff", col("temperature") - lag("temperature", 1).over(w))
    .withColumn("pressure_roll_avg", avg("pressure").over(w_prev))
    .withColumn("vibration_roll_avg", avg("vibration").over(w_prev))
    .fillna(0, subset=["temp_diff"])
)

# A row has complete features only when every rolling average is present
# (temp_diff can no longer be null after the fillna above).
has_complete_features = (
    col("temp_roll_avg").isNotNull()
    & col("pressure_roll_avg").isNotNull()
    & col("vibration_roll_avg").isNotNull()
)

# Show feature statistics.
print(f"Gold records: {df_gold.count()}")
print(f"Records with complete features: {df_gold.filter(has_complete_features).count()}")
display(
    df_gold.select(
        "machine_id", "timestamp", "temperature", "temp_roll_avg", "temp_diff", "is_failure"
    ).limit(10)
)

# Save to Unity Catalog, replacing the table's current contents.
(
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.default.gold_machine_features")
)