## Incremental Append-only Bronze Table
#### When "Phase" == 20, a Cycle Ends and a New Cycle Starts

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window as W

import datetime

# Spark session for this toy example (reuses an existing session if one is active).
spark = SparkSession.builder.appName("DummyTable").getOrCreate()

# Long-format layout: one row per (DateTime, Key, Value) reading; every column is nullable.
schema = (
    T.StructType()
    .add("DateTime", T.TimestampType(), True)
    .add("Key", T.StringType(), True)
    .add("Value", T.DoubleType(), True)
)

# Every sample reading falls on 2022-01-01 within the 10 o'clock hour, so each
# row below only spells out (minute, second, key, value).
input_data = [
    (datetime.datetime(2022, 1, 1, 10, minute, second), key, value)
    for minute, second, key, value in [
        (1, 1, "Phase", 20.0),
        (1, 2, "SensorA", 1103.2),
        (1, 2, "SensorB", 1.3),
        (5, 3, "Phase", 80.0),
        (5, 3, "SensorA", 1107.7),
        (5, 4, "SensorB", 1.4),
        (11, 5, "Phase", 20.0),
        (11, 6, "SensorA", 1108.1),
        (11, 7, "SensorB", 1.6),
        (16, 3, "Phase", 80.0),
        (16, 3, "SensorA", 1109.3),
        (16, 4, "SensorB", 1.5),
        (21, 5, "Phase", 20.0),
        (21, 6, "SensorA", 1110.1),
        (21, 7, "SensorB", 1.7),
    ]
]

df = spark.createDataFrame(input_data, schema)
df.show()

+-------------------+-------+------+
|           DateTime|    Key| Value|
+-------------------+-------+------+
|2022-01-01 10:01:01|  Phase|  20.0|
|2022-01-01 10:01:02|SensorA|1103.2|
|2022-01-01 10:01:02|SensorB|   1.3|
|2022-01-01 10:05:03|  Phase|  80.0|
|2022-01-01 10:05:03|SensorA|1107.7|
|2022-01-01 10:05:04|SensorB|   1.4|
|2022-01-01 10:11:05|  Phase|  20.0|
|2022-01-01 10:11:06|SensorA|1108.1|
|2022-01-01 10:11:07|SensorB|   1.6|
|2022-01-01 10:16:03|  Phase|  80.0|
|2022-01-01 10:16:03|SensorA|1109.3|
|2022-01-01 10:16:04|SensorB|   1.5|
|2022-01-01 10:21:05|  Phase|  20.0|
|2022-01-01 10:21:06|SensorA|1110.1|
|2022-01-01 10:21:07|SensorB|   1.7|
+-------------------+-------+------+



## Current non-streaming approach
### Calculate the "CycleCount" column, which increments by 1 every time a "Phase" row has Value == 20.0

In [26]:
# A new cycle starts only on a "Phase" row whose value is 20.0. Checking the key
# as well as the value prevents a sensor reading that happens to equal 20.0 from
# being counted as a cycle boundary.
is_cycle_start = (F.col("Key") == "Phase") & (F.col("Value") == 20.0)

# Running-sum window over the whole table in time order.
# Several rows share the same DateTime, and a ROWS frame over tied rows is
# evaluated in an arbitrary order. The extra sort keys make the result
# deterministic: at equal timestamps the Phase row comes first (so same-instant
# sensor readings land in the cycle that Phase row opens), then rows are ordered
# by Key.
# NOTE: there is no partitionBy, so Spark moves all rows to a single partition
# (see the WindowExec warning in the cell output).
cumsum_window = (
    W.orderBy(
        F.col("DateTime"),
        F.when(F.col("Key") == "Phase", 0).otherwise(1),
        F.col("Key"),
    )
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df = (
    df
    # 1 on cycle-start rows, 0 everywhere else ...
    .withColumn("CycleCount", F.when(is_cycle_start, F.lit(1)).otherwise(F.lit(0)))
    # ... then the cumulative sum turns the flags into a cycle number per row.
    .withColumn("CycleCount", F.sum(F.col("CycleCount")).over(cumsum_window))
)
df.show()


+-------------------+-------+------+----------+
|           DateTime|    Key| Value|CycleCount|
+-------------------+-------+------+----------+
|2022-01-01 10:01:01|  Phase|  20.0|         1|
|2022-01-01 10:01:02|SensorA|1103.2|         1|
|2022-01-01 10:01:02|SensorB|   1.3|         1|
|2022-01-01 10:05:03|  Phase|  80.0|         1|
|2022-01-01 10:05:03|SensorA|1107.7|         1|
|2022-01-01 10:05:04|SensorB|   1.4|         1|
|2022-01-01 10:11:05|  Phase|  20.0|         2|
|2022-01-01 10:11:06|SensorA|1108.1|         2|
|2022-01-01 10:11:07|SensorB|   1.6|         2|
|2022-01-01 10:16:03|  Phase|  80.0|         2|
|2022-01-01 10:16:03|SensorA|1109.3|         2|
|2022-01-01 10:16:04|SensorB|   1.5|         2|
|2022-01-01 10:21:05|  Phase|  20.0|         3|
|2022-01-01 10:21:06|SensorA|1110.1|         3|
|2022-01-01 10:21:07|SensorB|   1.7|         3|
+-------------------+-------+------+----------+



24/03/04 21:30:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:30:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:30:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:30:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:30:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [28]:
# Drop the "Phase" marker rows; only the SensorA / SensorB readings are kept
# (each still carries its CycleCount).
is_sensor_row = F.col("Key").isin("SensorA", "SensorB")
df_sensors = df.where(is_sensor_row)
df_sensors.show()

+-------------------+-------+------+----------+
|           DateTime|    Key| Value|CycleCount|
+-------------------+-------+------+----------+
|2022-01-01 10:01:02|SensorA|1103.2|         1|
|2022-01-01 10:01:02|SensorB|   1.3|         1|
|2022-01-01 10:05:03|SensorA|1107.7|         1|
|2022-01-01 10:05:04|SensorB|   1.4|         1|
|2022-01-01 10:11:06|SensorA|1108.1|         2|
|2022-01-01 10:11:07|SensorB|   1.6|         2|
|2022-01-01 10:16:03|SensorA|1109.3|         2|
|2022-01-01 10:16:04|SensorB|   1.5|         2|
|2022-01-01 10:21:06|SensorA|1110.1|         3|
|2022-01-01 10:21:07|SensorB|   1.7|         3|
+-------------------+-------+------+----------+



24/03/04 21:31:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:31:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:31:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:31:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:31:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### Calculate Features

In [32]:
# Per-cycle, per-sensor summary statistics.
# The standard deviation is the sample standard deviation, so a group with a
# single reading (cycle 3 in the sample data) yields NULL.
value_mean = F.avg("Value").alias("Value_mean")
value_std = F.stddev_samp("Value").alias("Value_std")

df_features = df_sensors.groupBy("CycleCount", "Key").agg(value_mean, value_std)
df_features.show()

24/03/04 21:33:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:33:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:33:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/04 21:33:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-------+------------------+-------------------+
|CycleCount|    Key|        Value_mean|          Value_std|
+----------+-------+------------------+-------------------+
|         1|SensorA|           1105.45|  3.181980515339464|
|         1|SensorB|              1.35|0.07071067811865465|
|         2|SensorA|1108.6999999999998| 0.8485281374238892|
|         2|SensorB|              1.55|0.07071067811865482|
|         3|SensorA|            1110.1|               NULL|
|         3|SensorB|               1.7|               NULL|
+----------+-------+------------------+-------------------+

