**Read the Equipment Sensor Streaming Data from OneLake**

In [3]:
# OneLake (abfss) location of the Delta table holding the streamed sensor readings.
delta_table_path = "abfss://FabricDemo@onelake.dfs.fabric.microsoft.com/Equipment_Sensor_Lakehouse.Lakehouse/Tables/Equipment-Sensor-Streaming"

# Load the Delta table into a DataFrame.
equipmentSensorStreamingRaw = spark.read.load(delta_table_path, format="delta")

# Register the DataFrame as a temp view so later cells can reach it through
# spark.table(...) / Spark SQL under the same name.
equipmentSensorStreamingRaw.createOrReplaceTempView("equipmentSensorStreamingRaw")

# Print a preview of the rows (show() displays up to 20 rows by default).
equipmentSensorStreamingRaw.show()


StatementMeta(, df7ef874-c4dd-4c9e-ba56-6e1f0ee2cef1, 5, Finished, Available, Finished)

+--------------------+--------+--------------------+-----------+--------------+--------------+-------+---+-------------+
|            sensorid|deviceid|    readingtimestamp|temperature|vibrationlevel|operatinghours| status| op|        ts_ms|
+--------------------+--------+--------------------+-----------+--------------+--------------+-------+---+-------------+
|d7d0db14-41f9-4a4...| sen_008|2024-11-09 18:16:...|      54.82|          3.34|           124| Normal|  c|1731156392899|
|71cd95a1-7f7a-419...| sen_009|2024-11-09 18:16:...|      50.78|           1.5|           226| Normal|  c|1731156392900|
|5bf32cbf-a0d2-43b...| sen_014|2024-11-10 13:46:...|      20.78|          1.36|           486| Normal|  c|1731226594595|
+--------------------+--------+--------------------+-----------+--------------+--------------+-------+---+-------------+



**Aggregate the Data to the Day Grain**

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Daily snapshot per device: for every (DEVICEID, calendar day) keep the most
# recent non-null temperature / vibrationlevel / status and the mean of
# operatinghours over that day.

# Calendar day of each reading; used both as a partition key and as the
# output "readingtimestamp" column.
readingDate = F.col("readingtimestamp").cast("date")

# Newest reading first within each device/day.
# The frame is explicitly widened to the whole partition: when a window has an
# ORDER BY but no frame, Spark defaults to "unbounded preceding .. current row",
# which turns last()/avg() into per-row running values instead of one value
# per device/day.
windowSpec = (
    Window.partitionBy("DEVICEID", readingDate)
    .orderBy(F.col("readingtimestamp").desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

equipmentSensorStreamingAgg = (
    spark.table("equipmentSensorStreamingRaw")
    .select(
        "DEVICEID",
        readingDate.alias("readingtimestamp"),
        # With newest-first ordering, the first non-null value in the full
        # partition is the latest non-null reading of the day.
        F.first("temperature", ignorenulls=True).over(windowSpec).alias("temperature"),
        F.first("vibrationlevel", ignorenulls=True).over(windowSpec).alias("vibrationlevel"),
        # True daily mean: the frame spans every reading of the device/day.
        F.avg("operatinghours").over(windowSpec).alias("operatinghours"),
        F.first("status", ignorenulls=True).over(windowSpec).alias("status"),
    )
    # Every row of a device/day now carries identical values, so collapsing to
    # one row per key is deterministic (no reliance on first() after a shuffle).
    .dropDuplicates(["DEVICEID", "readingtimestamp"])
)
equipmentSensorStreamingAgg.show()


StatementMeta(, df7ef874-c4dd-4c9e-ba56-6e1f0ee2cef1, 15, Finished, Available, Finished)

+--------+----------------+-----------+--------------+--------------+-------+
|DEVICEID|readingtimestamp|temperature|vibrationlevel|operatinghours| status|
+--------+----------------+-----------+--------------+--------------+-------+
| sen_014|      2024-11-10|      20.78|          1.36|         486.0| Normal|
+--------+----------------+-----------+--------------+--------------+-------+



**Write the aggregated data to the Lakehouse**

In [6]:
# Storage location handed to saveAsTable as the table's "path" option.
# NOTE(review): this points at the Lakehouse "Files" root rather than a
# dedicated sub-folder — confirm that is the intended table location.
delta_target_path = "abfss://FabricDemo@onelake.dfs.fabric.microsoft.com/Equipment_Sensor_Lakehouse.Lakehouse/Files"

# Append the daily aggregate to the Delta table EQUIPMENT_SENSOR_SNAPSHOT_DAILY.
# NOTE(review): mode "append" adds the rows again each time this cell runs, so
# re-running the notebook over the same source data repeats the snapshot rows.
equipmentSensorStreamingAgg.write.saveAsTable(
    "EQUIPMENT_SENSOR_SNAPSHOT_DAILY",
    format="delta",
    mode="append",
    path=delta_target_path,
)

StatementMeta(, 25f48f30-4ab7-4822-b05a-f89644ae4f50, 8, Finished, Available, Finished)