# Window operation on event time
1. This document is mostly based on the 
<a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time"> [link] </a>

2. Run the generator from data_generator.ipynb in the current folder. 

# Load SparkSession and Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

from pyspark.sql.functions import to_timestamp
from pyspark.sql import types

spark = SparkSession \
    .builder \
    .appName("Window Operations on Event Time") \
    .getOrCreate()

static_df = spark.read.json("./data/sample.txt")
schema = static_df.schema

df = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", 1).load("./data/")

print(df)
static_df.show(5, truncate=False)

DataFrame[created_at: string, sentiment_level: string]
+-------------------+---------------+
|created_at         |sentiment_level|
+-------------------+---------------+
|2021-10-29T18:10:09|2              |
+-------------------+---------------+



# Window operation

In [2]:
from pyspark.sql.functions import window

windowsedCounts = df.groupBy(
    window(col(created_at), "10 seconds", "2 seconds").alias("created_at"),
    df.sentiment_level
).count()

In [3]:
numLevelBySec = windowsedCounts.groupBy(
    col("sentiment_level"),
    col("count")
).sum()

In [6]:
launch = numLevelBySec \
    .writeStream \
    .outputMode("complete") \
    .queryName("df") \
    .format("memory") \
.start()

AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;
Aggregate [sentiment_level#12, count#31L], [sentiment_level#12, count#31L, sum(count#31L) AS sum(count)#40L]
+- Aggregate [window#32, sentiment_level#12], [window#32 AS created_at#27, sentiment_level#12, count(1) AS count#31L]
   +- Filter ((cast(created_at#11 as timestamp) >= window#32.start) AND (cast(created_at#11 as timestamp) < window#32.end))
      +- Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(5 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(5 as bigint)) * 2000000) + 0) + 10000000), LongType, TimestampType)), created_at#11, sentiment_level#12), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(1 as bigint)) - cast(5 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(1 as bigint)) - cast(5 as bigint)) * 2000000) + 0) + 10000000), LongType, TimestampType)), created_at#11, sentiment_level#12), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(2 as bigint)) - cast(5 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(2 as bigint)) - cast(5 as bigint)) * 2000000) + 0) + 10000000), LongType, TimestampType)), created_at#11, sentiment_level#12), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(3 as bigint)) - cast(5 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(3 as bigint)) - cast(5 as bigint)) * 2000000) + 0) + 10000000), LongType, TimestampType)), created_at#11, sentiment_level#12), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(4 as bigint)) - cast(5 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(created_at#11 as timestamp), TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(4 as bigint)) - cast(5 as bigint)) * 2000000) + 0) + 10000000), LongType, TimestampType)), created_at#11, sentiment_level#12)], [window#32, created_at#11, sentiment_level#12]
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6d31bb30,json,List(),Some(StructType(StructField(created_at,StringType,true), StructField(sentiment_level,StringType,true))),List(),None,Map(maxFilesPerTrigger -> 1, path -> ./data/),None), FileSource[./data/], [created_at#11, sentiment_level#12]


In [None]:
import time

for _ in range(3):
    spark.sql("select * from df").sort("created_at", ascending=False).show(5, truncate=False)
    time.sleep(5)