## **Create a simple streaming query with trigger intervals.**

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType

# Explicit schema for the incoming JSON records: a string "name" and an
# integer "age". Passed to the streaming reader via .schema(schema).
schema = (
    StructType()
    .add("name", StringType())
    .add("age", IntegerType())
)


In [0]:
# Create the directory that the streaming reader below watches for new JSON files.
dbutils.fs.mkdirs("/tmp/stream_data")


Out[2]: True

In [0]:
# Start Streaming Query with Trigger Interval
#
# Source: JSON files appearing in /tmp/stream_data, read with the explicit
# schema defined above.
streaming_df = (
    spark.readStream
    .schema(schema)
    .json("/tmp/stream_data")  # Directory to watch for new files
)

# Sink: console, append output mode, with a 10-second processing-time trigger.
query = (
    streaming_df.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .start()
)


In [0]:
# Inspect the current state of the streaming query started above; the value is
# displayed as this cell's output.
query.status
# Alternatively, inspect query.lastProgress instead.


Out[4]: {'message': 'Getting offsets from FileStreamSource[dbfs:/tmp/stream_data]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [0]:
# Drop a single JSON record into the watched directory so the running stream
# has a new file to pick up.
dbutils.fs.put("/tmp/stream_data/data1.json", '{"name":"Alice", "age":30}', True)


Wrote 26 bytes.
Out[5]: True

## **Build a sample structured streaming pipeline.**

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType

# Schema for the pipeline's input records: string "name", integer "age",
# string "city". Rebinds the `schema` name used by the reader below.
schema = (
    StructType()
    .add("name", StringType())
    .add("age", IntegerType())
    .add("city", StringType())
)


In [0]:
# Create the directory the stream reads from and the directory the file sink
# writes to.
dbutils.fs.mkdirs("/tmp/input_data/")
dbutils.fs.mkdirs("/tmp/output_data/")


Out[7]: True

In [0]:
# Streaming source: JSON files under /tmp/input_data/, parsed with `schema`.
input_stream = (
    spark.readStream
    .schema(schema)
    .json("/tmp/input_data/")
)


In [0]:
# Keep only records with age >= 18, then project down to name and city.
filtered_stream = (
    input_stream
    .filter("age >= 18")
    .select("name", "city")
)


In [0]:
# Write to Sink (File) with Trigger
#
# `query` may still be bound to a running stream at this point: the
# console-sink query from the previous section is never stopped, and
# re-running this cell would otherwise start a second query against the same
# checkpoint location. Let that stream finish the files already available,
# then stop it, so rebinding `query` below does not leave a stream running
# with no handle to it.
try:
    _previous_query = query
except NameError:
    _previous_query = None  # nothing was started earlier in this session
if _previous_query is not None and _previous_query.isActive:
    _previous_query.processAllAvailable()
    _previous_query.stop()

# File sink: JSON output under /tmp/output_data/, append mode, checkpointed
# under /tmp/checkpoint/, polling on a 5-second processing-time trigger.
query = (
    filtered_stream.writeStream
    .format("json")
    .option("path", "/tmp/output_data/")
    .option("checkpointLocation", "/tmp/checkpoint/")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)


In [0]:
# Check the state of the file-sink query; at this point no input files have
# been written to /tmp/input_data/ by this notebook yet.
query.status

Out[12]: {'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [0]:
# Two single-record input files: one with age 23 (passes the age >= 18 filter)
# and one with age 17 (does not).
adult_payload = '''
{"name": "Nikita", "age": 23, "city": "Jaipur"}
'''
minor_payload = '''
{"name": "Aarav", "age": 17, "city": "Delhi"}
'''

dbutils.fs.put("/tmp/input_data/sample1.json", adult_payload, True)
dbutils.fs.put("/tmp/input_data/sample2.json", minor_payload, True)


Wrote 49 bytes.
Wrote 47 bytes.
Out[13]: True

In [0]:
# Drain before stopping: block until the query has processed the sample files
# written above. When the notebook is run top to bottom, a bare stop() would
# race the 5-second trigger and could end the stream before anything reaches
# /tmp/output_data/. A query that has already terminated is left alone.
if query.isActive:
    query.processAllAvailable()
    query.stop()

## **Implement watermarking to manage late data.**

In [0]:
from pyspark.sql.types import StructType, StringType, TimestampType

# Schema for the event records: string "user", string "action", and a
# timestamp "event_time". Rebinds the `schema` name used by the reader below.
schema = (
    StructType()
    .add("user", StringType())
    .add("action", StringType())
    .add("event_time", TimestampType())  # Event time column
)


In [0]:
# Create the directory the event stream below reads from.
dbutils.fs.mkdirs("/tmp/event_data/")


Out[19]: True

In [0]:
from datetime import datetime

# Current local time at seconds precision, formatted as YYYY-MM-DDTHH:MM:SS.
event_time = datetime.now().isoformat(timespec="seconds")

# Seed the event directory with a single "login" event stamped with that time.
dbutils.fs.put(
    "/tmp/event_data/sample1.json",
    f'''
{{"user": "Nikita", "action": "login", "event_time": "{event_time}"}}
''',
    True,
)

Wrote 76 bytes.
Out[20]: True

In [0]:
# Streaming source: JSON event files under /tmp/event_data/, parsed with the
# event schema (user, action, event_time).
input_stream = (
    spark.readStream
    .schema(schema)
    .json("/tmp/event_data/")
)


In [0]:
from pyspark.sql.functions import window

# Count events per action within 5-minute windows over event_time, with a
# 10-minute watermark declared on event_time.
aggregated_stream = (
    input_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "action")
    .count()
)

In [0]:
# Write Output to File Sink
#
# JSON files under /tmp/aggregated_output/, append mode, checkpointed under
# /tmp/checkpoint_watermarking/, with a 10-second processing-time trigger.
query = (
    aggregated_stream.writeStream
    .format("json")
    .option("path", "/tmp/aggregated_output/")
    .option("checkpointLocation", "/tmp/checkpoint_watermarking/")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .start()
)


In [0]:
from datetime import datetime, timedelta

# Use one reference instant for both events so the late event is exactly
# 15 minutes older than the on-time one.
reference_time = datetime.now()

# On-time data (processed)
event_time_ontime = reference_time.strftime('%Y-%m-%dT%H:%M:%S')
dbutils.fs.put("/tmp/event_data/on_time.json", f'''
{{"user": "Alice", "action": "click", "event_time": "{event_time_ontime}"}}
''', True)

# Let the stream ingest the on-time event before the late one arrives. The
# watermark only advances after a micro-batch has been processed; if both
# files landed in the same batch, the "late" event would be checked against
# the older watermark and could be counted instead of dropped.
query.processAllAvailable()

# Late data: 15 minutes behind the newest event, so its 5-minute window ends
# at or before the 10-minute watermark and the record is expected to be dropped.
event_time_late = (reference_time - timedelta(minutes=15)).strftime('%Y-%m-%dT%H:%M:%S')
dbutils.fs.put("/tmp/event_data/late.json", f'''
{{"user": "Bob", "action": "click", "event_time": "{event_time_late}"}}
''', True)


Wrote 75 bytes.
Wrote 73 bytes.
Out[28]: True