In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import StructType, StringType, TimestampType, IntegerType
from datetime import datetime

# Build the SparkSession used by every cell below; getOrCreate() reuses an
# already-running session instead of creating a second one.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting — confirm it is needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [20]:
# Explicit schema for the sample events: an integer id plus the event timestamp.
schema = (
    StructType()
    .add("eventId", IntegerType())
    .add("timestamp", TimestampType())
)

In [21]:
# Seven sample events, all on 2025-07-18 between 12:00 and 12:23.
# Each (minute, second) pair becomes one (eventId, timestamp) row; ids start at 1.
data = [
    (event_id, datetime(2025, 7, 18, 12, minute, second))
    for event_id, (minute, second) in enumerate(
        [(0, 3), (5, 7), (9, 12), (10, 17), (15, 17), (20, 27), (22, 7)],
        start=1,
    )
]


In [22]:
# Materialise the sample rows as a DataFrame with the explicit schema.
df = spark.createDataFrame(data=data, schema=schema)

### 1. What if I need to count the number of events in each 10-minute interval?

In [29]:
# SQL-like approach: derive a window_start column by hand.
# First step: truncate every event timestamp to the start of its hour.
df2 = df.withColumn("hour", sf.date_trunc("hour", sf.col("timestamp")))

In [31]:
# Extract the minute-of-hour so it can be bucketed into 10-minute steps.
df3 = df2.withColumn("minutes", sf.minute(sf.col("timestamp")))

In [58]:
# Round the minute down to a multiple of 10 and add it to the hour start,
# giving the start of the 10-minute bucket each event falls into.
df4 = df3.withColumn(
    "window_start",
    sf.timestamp_add(
        "minute",
        sf.floor(sf.col("minutes") / 10) * 10,
        sf.col("hour"),
    ),
)

In [60]:
# Display the events together with the derived hour, minutes and window_start columns.
df4.show()

+-------+-------------------+-------------------+-------+-------------------+
|eventId|          timestamp|               hour|minutes|       window_start|
+-------+-------------------+-------------------+-------+-------------------+
|      1|2025-07-18 12:00:03|2025-07-18 12:00:00|      0|2025-07-18 12:00:00|
|      2|2025-07-18 12:05:07|2025-07-18 12:00:00|      5|2025-07-18 12:00:00|
|      3|2025-07-18 12:09:12|2025-07-18 12:00:00|      9|2025-07-18 12:00:00|
|      4|2025-07-18 12:10:17|2025-07-18 12:00:00|     10|2025-07-18 12:10:00|
|      5|2025-07-18 12:15:17|2025-07-18 12:00:00|     15|2025-07-18 12:10:00|
|      6|2025-07-18 12:20:27|2025-07-18 12:00:00|     20|2025-07-18 12:20:00|
|      7|2025-07-18 12:22:07|2025-07-18 12:00:00|     22|2025-07-18 12:20:00|
+-------+-------------------+-------------------+-------+-------------------+



In [62]:
# Count the events that share the same hand-computed 10-minute bucket.
df4.groupBy("window_start").count().show()

+-------------------+-----+
|       window_start|count|
+-------------------+-----+
|2025-07-18 12:00:00|    3|
|2025-07-18 12:10:00|    2|
|2025-07-18 12:20:00|    2|
+-------------------+-----+



## PySpark window options

1. Tumbling window - |--10 mins ---|---10 mins---|---
2. Sliding window - overlapping tumbling windows |-----10 mins----|
                                                       |---- 10 mins----|
                                                               |------10 mins ----|

3. Session windows - windows of arbitrary length; the length can be dynamic, based on a column value
                        |-----15 mins ----|         |----2 mins----|    |-------14mins ----------|
   

In [66]:
# Tumbling window using PySpark's built-in window() function:
# group events into fixed, non-overlapping 10-minute windows and count them.
df2 = df.groupBy(sf.window(sf.col("timestamp"), "10 minutes")).count()
df2.show()

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2025-07-18 12:00...|    3|
|{2025-07-18 12:10...|    2|
|{2025-07-18 12:20...|    2|
+--------------------+-----+



In [64]:
# Sliding window: 10-minute windows that start every 5 minutes, so
# neighbouring windows overlap and one event can be counted in two of them.
df.groupBy(
    sf.window(
        timeColumn=sf.col("timestamp"),
        windowDuration="10 minutes",
        slideDuration="5 minutes",
    )
).count().show()

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2025-07-18 12:05...|    3|
|{2025-07-18 12:00...|    3|
|{2025-07-18 11:55...|    1|
|{2025-07-18 12:10...|    2|
|{2025-07-18 12:20...|    2|
|{2025-07-18 12:15...|    3|
+--------------------+-----+



In [68]:
# Inspect the aggregated result's schema: `window` is a struct holding the
# window's start and end timestamps, next to the long `count` column.
df2.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [124]:
# Batch stand-in kept for reference (sample of one input line):
# df = spark.createDataFrame( [["2025-7-18 12:03:10 hello world"]],"lines  String" )
# Streaming source: read text lines from a local socket on port 9001.
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9001)
    .load()
)

25/07/19 01:06:18 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [138]:
# Turn every incoming line into one (TimeStamp, word) row per word.
# NOTE(review): assumes lines look like "<date> <time> <word> <word> ..." — confirm against the sender.
df2 = (
    df
    # Split the raw line on single spaces.
    .withColumn("SplitData", sf.split(sf.col("value"), " "))
    # The first two tokens, joined by a space, form the timestamp text.
    .withColumn(
        "TimeStamp",
        sf.concat(
            sf.col("SplitData").getItem(0).cast("string"),
            sf.lit(" "),
            sf.col("SplitData").getItem(1).cast("string"),
        ),
    )
    # Everything from the third token onwards is the payload.
    .withColumn(
        "lineData",
        sf.slice(sf.col("SplitData"), 3, sf.size(sf.col("SplitData"))),
    )
    # One output row per payload word.
    .withColumn("word", sf.explode(sf.col("lineData")))
    # Parse the timestamp text into a real timestamp column.
    .withColumn("TimeStamp", sf.to_timestamp(sf.col("TimeStamp")))
    .select("TimeStamp", "word")
)

In [126]:
# Count words per 10-minute event-time window and print the complete
# result table to the console on every trigger.
df3 = df2.groupBy(sf.window(sf.col("TimeStamp"), "10 minutes")).count()
(
    df3.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

In [139]:
# Check the parsed stream's schema: a timestamp column and a string word column.
df2.printSchema()

root
 |-- TimeStamp: timestamp (nullable = true)
 |-- word: string (nullable = false)



In [141]:
# Windowed word count with a 10-minute watermark, written in update mode.
# The window column is referenced by name so that it resolves against the
# watermarked DataFrame. The previous `df2.TimeStamp` reference pointed at the
# pre-watermark frame.
# NOTE(review): that older reference may not carry the watermark metadata —
# confirm late-data handling on your Spark version.
df3 = (
    df2.withWatermark("TimeStamp", "10 minutes")
    .groupBy(sf.window(sf.col("TimeStamp"), "10 minutes"))
    .count()
)
# Last expression of the cell: the returned StreamingQuery handle is displayed.
df3.writeStream.outputMode("update").format("console").start()

25/07/19 01:14:02 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6c624bbd-e464-4a43-8032-1bb582552099. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/07/19 01:14:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7905d7b233b0>

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2025-07-18 12:00...|    4|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2025-07-18 12:00...|    4|
|{2025-07-18 11:50...|    3|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2025-07-18 12:00...|    4|
|{2025-07-18 11:50...|    7|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2025-07-18 12:00...|    4|
|{2025-07-18 11:00...|    4|
|{2025-07-18 11:50...|    7|
+--------------------+-----+



In [142]:
# Stop every streaming query that is still running before tearing down the
# session, so no query is left active when the session goes away.
# NOTE(review): background state-store maintenance may still log a warning
# during shutdown — confirm on your Spark version.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()

25/07/19 01:17:19 WARN TextSocketMicroBatchStream: Stream closed by localhost:9001
25/07/19 01:17:44 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:950)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:924)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:725)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadP