In [0]:
# COMMAND ----------

from pyspark.sql.functions import avg, hour, to_timestamp

silver_path = "dbfs:/delta/silver/sensor_logs"
gold_path = "dbfs:/delta/gold/sensor_hourly_avg"

# Read the cleaned Silver Delta table
df_silver = spark.read.format("delta").load(silver_path)

# Convert timestamp if needed
df_silver = df_silver.withColumn("timestamp", to_timestamp("timestamp"))

# Calculate average temp/humidity by hour + location
df_gold = df_silver.withColumn("hour", hour("timestamp")) \
    .groupBy("location", "hour") \
    .agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity")
    ) \
    .orderBy("location", "hour")

# Save Gold Delta table
df_gold.write.format("delta").mode("overwrite").save(gold_path)

print("✅ Gold table with hourly averages created at:", gold_path)


✅ Gold table with hourly averages created at: dbfs:/delta/gold/sensor_hourly_avg


In [0]:
display(spark.read.format("delta").load(gold_path))


location,hour,avg_temperature,avg_humidity
Room_A,0,72.35609375000001,45.42554687500001
Room_B,0,71.35475409836063,45.33090163934428
Room_C,0,73.01111111111112,46.75547619047623
Room_D,0,72.27153225806454,45.69266129032256


In [0]:
display(df_gold)


location,hour,avg_temperature,avg_humidity
Room_A,0,72.35609375000001,45.42554687500001
Room_B,0,71.35475409836063,45.33090163934428
Room_C,0,73.01111111111112,46.75547619047623
Room_D,0,72.27153225806454,45.69266129032256


In [0]:
from datetime import datetime
print(f"✅ Completed step at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


✅ Completed step at 2025-05-24 00:33:38


In [0]:
# Step 1: Load and coalesce to 1 partition
df_gold = spark.read.format("delta").load("dbfs:/delta/gold/sensor_hourly_avg")
df_gold.coalesce(1).write.mode("overwrite").option("header", True).csv("dbfs:/FileStore/gold_export/tmp")

# Step 2: Use dbutils to find the part file and rename it
files = dbutils.fs.ls("dbfs:/FileStore/gold_export/tmp")

# Find the part file
part_file = [f.path for f in files if f.name.startswith("part-") and f.name.endswith(".csv")]

# Destination path
final_path = "dbfs:/FileStore/gold_export/iot_gold_export.csv"

if part_file:
    # Delete destination if it already exists
    try:
        dbutils.fs.rm(final_path)
    except:
        pass  # File might not exist yet — ignore errors

    # Move file to desired name
    dbutils.fs.mv(part_file[0], final_path)
    print(f"✅ Exported to: {final_path}")
else:
    print("❌ No CSV file found in /gold_export/tmp")


✅ Exported to: dbfs:/FileStore/gold_export/iot_gold_export.csv


In [0]:
df_gold = spark.read.format("delta").load("dbfs:/delta/gold/sensor_hourly_avg")
print(f"Gold record count: {df_gold.count()}")
display(df_gold)



Gold record count: 4


location,hour,avg_temperature,avg_humidity
Room_A,0,72.35609375000001,45.42554687500001
Room_B,0,71.35475409836063,45.33090163934428
Room_C,0,73.01111111111112,46.75547619047623
Room_D,0,72.27153225806454,45.69266129032256
