### Spark Streaming

**Spark Streaming (DStreams) :** The original, now legacy, streaming API based on DStreams (Discretized Streams). It processes data in micro-batches, but its API is built on RDDs and is separate from the DataFrame API.

**Spark Structured Streaming (Modern) :** Built on top of the Spark SQL engine and DataFrames. It treats a live data stream as a table that is continuously appended to.

**Key concepts :**

**Continuous Table Analogy :** Structured Streaming treats incoming data streams as a table that is being continuously appended. Each record that arrives is like a new row being added to this "unbounded" table.

**Micro-batch processing :** By default, Structured Streaming processes data in small batches. Data that arrives during a short interval (e.g., 1 second) is collected and then processed as a small Spark job.
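
The two ideas above can be pictured with a small plain-Python model. This is only a sketch and does not use Spark: `micro_batches` and `unbounded_table` are hypothetical names, and batches are cut by record count here purely for simplicity, whereas Spark cuts them by trigger.

```python
from itertools import islice

def micro_batches(records, batch_size):
    """Yield successive lists of at most `batch_size` records."""
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# Stands in for the ever-growing "unbounded" input table.
unbounded_table = []
for batch_id, batch in enumerate(micro_batches(["a", "b", "c", "d", "e"], 2)):
    # Every record that arrives becomes a new row of the table.
    unbounded_table.extend(batch)
    print(batch_id, batch, len(unbounded_table))
```

Each iteration corresponds to one micro-batch: only the new rows are handed to the query, while the conceptual table keeps growing.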

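**Fault tolerant :** Progress and state are saved to a checkpoint directory, so a restarted query resumes where it left off (see Checkpointing).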

**Stateless vs stateful transformations**

**Stateless :** These transformations operate on each micro-batch independently, without needing information from previous batches, e.g. select, withColumn, filter, union.

**Stateful :** These operations maintain state across multiple micro-batches, e.g. groupBy with an aggregation.
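
A minimal plain-Python sketch of the difference, assuming two hypothetical micro-batches of rows. `stateless_filter` and `RunningCount` are illustrative names, not Spark APIs.

```python
from collections import Counter

batches = [
    [{"color": "red", "qty": 1}, {"color": "blue", "qty": 5}],
    [{"color": "red", "qty": 7}],
]

def stateless_filter(batch):
    """Needs only the current batch, like filter or select."""
    return [row for row in batch if row["qty"] > 2]

class RunningCount:
    """Carries state from one batch to the next, like groupBy(...).count()."""

    def __init__(self):
        self.state = Counter()

    def update(self, batch):
        self.state.update(row["color"] for row in batch)
        return dict(self.state)

counter = RunningCount()
filtered = [stateless_filter(b) for b in batches]
counts = [counter.update(b) for b in batches]
```

The filter gives the same answer for a batch no matter what came before it, while the count for the second batch depends on what the first batch contributed. That carried-over state is what Spark has to persist for stateful queries.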

**Output modes :** append, update, complete.

Append mode works with stateless queries out of the box (use case: logs). For stateful aggregations it works only when a watermark is defined.

#### Checkpointing

Checkpointing provides fault tolerance and idempotency by persisting the query's progress and state in a durable location called the checkpoint directory.

Offset tracking : For Kafka, the checkpoint stores the offsets that have been successfully processed; for file sources, it stores which files have been processed. On restart, the query resumes from those offsets.

Schema and Query plan : stores the logical/physical plan and schema of the dataframe.

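**Folder structure :**

**commits :** one entry per micro-batch, marking that the batch completed successfully

**offsets :** the source offsets planned for each micro-batch

**sources :** for file sources, the log of files that have already been processed

**metadata :** information about the query, such as its id

**state :** state store data (present only when stateful operations are used)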


#### How Checkpointing Works

Define Query: You define your streaming query (source, transformations, sink) and specify a checkpointLocation.

Start Query: When you start the query, Spark checks the checkpointLocation.
If it's empty: Spark starts the query from scratch, records initial offsets, and builds initial state (if any).
If it contains previous checkpoint data: Spark reads the latest checkpoint information, restores the query's state, and picks up processing from the recorded offsets.

Process Micro-batches:
For each micro-batch, Spark determines the range of new data to read and writes those offsets to the checkpoint directory before processing starts.
It reads that data from the source and processes it through the transformations.
It updates the internal state (for stateful operations).
It writes the results to the sink.
Only after the output has been written does Spark record the batch as committed in the checkpoint directory.

Fault/Restart: If the application crashes at any point, when you restart the query with the same checkpointLocation:
Spark reads the latest complete checkpoint.
It knows precisely which data has been processed (by source offsets) and what the state was.
It resumes from the last committed batch and re-runs any batch that was started but never marked as committed, so some data may be read from the source again. This replay, combined with a replayable source and an idempotent sink, is what gives end-to-end exactly-once guarantees.
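
The restart behaviour can be sketched in plain Python. This is a simplified model, not Spark's implementation: `Checkpoint`, `run`, and `crash_after_write` are hypothetical names, the source is a list, and the sink is a dict keyed by batch id so that rewriting a batch is idempotent.

```python
class Checkpoint:
    def __init__(self):
        self.offsets = {}     # batch_id -> (start, end), logged before processing
        self.commits = set()  # batch ids whose output reached the sink

def run(source, checkpoint, sink, batch_size=2, crash_after_write=None):
    """Process `source` in micro-batches; optionally crash after a sink write."""
    while True:
        batch_id = len(checkpoint.commits)
        if batch_id in checkpoint.offsets:
            # Offsets were logged but the batch never committed: replay it.
            start, end = checkpoint.offsets[batch_id]
        else:
            start = checkpoint.offsets[batch_id - 1][1] if batch_id else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return  # no new data
            checkpoint.offsets[batch_id] = (start, end)
        sink[batch_id] = source[start:end]  # idempotent: keyed by batch id
        if crash_after_write == batch_id:
            raise RuntimeError("crash before commit")
        checkpoint.commits.add(batch_id)

source = ["a", "b", "c", "d", "e"]
checkpoint, sink = Checkpoint(), {}
try:
    run(source, checkpoint, sink, crash_after_write=1)
except RuntimeError:
    pass  # batch 1 was written to the sink but never committed
run(source, checkpoint, sink)  # restart with the same checkpoint
```

After the restart, batch 1 is replayed with exactly the same offsets and overwrites its earlier output, so the sink ends up with every record once even though one batch was processed twice.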

### Reading JSON file (Batch)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
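# Batch JSON reads infer the schema automatically, so no inferSchema option is needed.
# multiLine lets a single JSON record span several lines of the file.
df=spark.read.format("json")\
        .option("multiLine","true")\
        .load("/Volumes/workspace/my-schema/my-volume/json/day1.json")
display(df)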

In [0]:
df=df.select("order_id","customer.customer_id","customer.email","customer.name","customer.address.city","customer.address.country","customer.address.postal_code","payment.method","payment.transaction_id","timestamp","metadata","items")
display(df)

In [0]:
df=df.withColumn("metadata",explode_outer("metadata")).withColumn("items",explode_outer("items"))

In [0]:
df=df.select("*","items.item_id","items.price","items.product_name","items.quantity","metadata.key","metadata.value").drop("items","metadata")
display(df)

### Streaming JSON file

In [0]:
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("customer", StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("address", StructType([
            StructField("city", StringType(), True),
            StructField("postal_code", StringType(), True),
            StructField("country", StringType(), True)
        ]))
    ])),
    StructField("items", ArrayType(StructType([
        StructField("item_id", StringType(), True),
        StructField("product_name", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True)
    ]))),
    StructField("payment", StructType([
        StructField("method", StringType(), True),
        StructField("transaction_id", StringType(), True)
    ])),
    StructField("metadata", ArrayType(StructType([
        StructField("key", StringType(), True),
        StructField("value", StringType(), True)
    ])))
])

In [0]:
_df=spark.readStream.format("json")\
        .option("multiline","true")\
        .schema(schema)\
        .load("/Volumes/workspace/my-schema/my-volume/json/")

In [0]:
_df=_df.select("order_id","customer.customer_id","customer.email","customer.name","customer.address.city","customer.address.country","customer.address.postal_code","payment.method","payment.transaction_id","timestamp","metadata","items")

_df=_df.withColumn("metadata",explode_outer("metadata")).withColumn("items",explode_outer("items"))

_df=_df.select("*","items.item_id","items.price","items.product_name","items.quantity","metadata.key","metadata.value").drop("items","metadata")


When the stream is written, only new data is transformed and stored. Data that has already been processed is not processed again, which makes reruns idempotent. This is possible only because of the saved checkpoint.

In [0]:
_df.writeStream.format("delta")\
    .trigger(once=True)\
    .outputMode("append")\
    .option("path","/Volumes/workspace/my-schema/my-volume/sink/data")\
    .option("checkpointLocation","/Volumes/workspace/my-schema/my-volume/sink/checkpoint")\
    .start()

### Reading data from sink

In [0]:
%sql
SELECT * from delta.`/Volumes/workspace/my-schema/my-volume/sink/data/`

#### Triggers

A trigger defines when the next micro-batch is executed.

**Types :**

**default** - Used when no trigger is specified. The next micro-batch starts as soon as the current one finishes.

**processingTime** - Instructs Spark to start a micro-batch at a regular, user-defined interval.

.trigger(processingTime="interval string")
interval string examples: "5 seconds", "1 minute", "10 minutes", "1 hour".

**once** - Instructs Spark to process all currently available data from the source in a single micro-batch and then shut down. Newer Spark releases deprecate it in favor of availableNow.

.trigger(once=True)

**availableNow** (Spark 3.3+) - Also processes all data currently present in the source and then stops, but splits that data into multiple micro-batches. Useful when the backlog in the source is large.

.trigger(availableNow=True)
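
The difference between the two one-shot triggers can be sketched in plain Python. `plan_batches` and `max_files_per_batch` are hypothetical names; the latter plays the role that a rate limit such as `maxFilesPerTrigger` plays for file sources. Only `day1.json` comes from the notebook, the other file names are made up.

```python
def plan_batches(pending_files, trigger, max_files_per_batch=2):
    """Return the micro-batches a one-shot trigger would run over a backlog."""
    if trigger == "once":
        # Everything that is pending goes into a single micro-batch.
        return [list(pending_files)] if pending_files else []
    if trigger == "availableNow":
        # Same data, but split into several rate-limited micro-batches.
        return [
            pending_files[i:i + max_files_per_batch]
            for i in range(0, len(pending_files), max_files_per_batch)
        ]
    raise ValueError(f"unknown trigger: {trigger}")

backlog = ["day1.json", "day2.json", "day3.json"]
once_plan = plan_batches(backlog, "once")
available_now_plan = plan_batches(backlog, "availableNow")
```

Both plans cover the same files and both stop afterwards; the second simply keeps each micro-batch small, which is why it suits large backlogs.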

#### Archiving source files

With the cleanSource option, Spark moves processed source files to an archive directory. The cleanup is lazy: a processed file is archived only when a later micro-batch runs, which requires a new, not yet processed file to arrive.

If a file that has already been processed is uploaded again, it is not treated as new and no micro-batch is triggered. The processed file therefore stays in the source directory, like garbage, until a genuinely new file arrives.

In [0]:
# cleanSource="archive" moves fully processed files into sourceArchiveDir
__df=spark.readStream.format("json")\
        .option("multiline","true")\
        .option("cleanSource","archive")\
        .option("sourceArchiveDir","/Volumes/workspace/my-schema/my-volume/archive")\
        .schema(schema)\
        .load("/Volumes/workspace/my-schema/my-volume/json_new/")

In [0]:
__df=__df.select("order_id","customer.customer_id","customer.email","customer.name","customer.address.city","customer.address.country","customer.address.postal_code","payment.method","payment.transaction_id","timestamp","metadata","items")

__df=__df.withColumn("metadata",explode_outer("metadata")).withColumn("items",explode_outer("items"))

__df=__df.select("*","items.item_id","items.price","items.product_name","items.quantity","metadata.key","metadata.value").drop("items","metadata")


In [0]:
__df.writeStream.format("delta")\
    .trigger(once=True)\
    .outputMode("append")\
    .option("path","/Volumes/workspace/my-schema/my-volume/sink_new/data")\
    .option("checkpointLocation","/Volumes/workspace/my-schema/my-volume/sink_new/checkpoint")\
    .start()

In [0]:
%sql
SELECT * from delta.`/Volumes/workspace/my-schema/my-volume/sink_new/data/`


### Output modes

**append** (default): Only rows added to the result table since the last trigger are written to the sink. This is the default mode and the most efficient one for stateless queries.
Requirement for stateful queries: If your query is stateful (e.g., aggregation), a watermark must be defined to use append mode. A row is then emitted only once, after the watermark has passed the end of its window.

**complete**: The entire updated result table is written to the sink after each trigger. All aggregated results (e.g., all word counts) are re-written. This is often used for aggregations where you want to see the full, current state.
Caveat: Can be inefficient for very large state, as the entire state is rewritten.

**update** (Spark 2.1+): Only rows that have been updated in the results table since the last trigger are written to the sink. This is more efficient than complete for stateful queries where only a few rows change.
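
To make the modes concrete, here is a plain-Python sketch of what each trigger would write for a running word count. `run_query` is a hypothetical helper, not a Spark API. Append is rejected here because the sketch has no watermark, mirroring the requirement described above.

```python
def run_query(batches, mode):
    """Return what each trigger writes to the sink for a running word count."""
    counts = {}
    outputs = []
    for batch in batches:
        changed = set()
        for word in batch:
            counts[word] = counts.get(word, 0) + 1
            changed.add(word)
        if mode == "complete":
            outputs.append(dict(counts))  # the whole result table
        elif mode == "update":
            outputs.append({w: counts[w] for w in changed})  # changed rows only
        else:
            raise ValueError("append on an aggregation needs a watermark")
    return outputs

batches = [["a", "b", "a"], ["b", "c"]]
complete_out = run_query(batches, "complete")
update_out = run_query(batches, "update")
```

On the second trigger, complete mode rewrites the unchanged row for `a`, while update mode writes only `b` and `c`.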


### ForEachBatch

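# foreachBatch passes each micro-batch to a function as a regular (batch) DataFrame,
# so one stream can be written to more than one sink.
def my_func(batch_df, batch_id):
    batch_df.persist()  # avoid recomputing the batch for the second write

    # First sink ("delta" is an assumption; the original left the format unspecified)
    batch_df.write.format("delta")\
        .mode("append")\
        .option("path","...")\
        .save()

    # Second sink
    batch_df.write.format("delta")\
        .mode("append")\
        .option("path","...")\
        .save()

    batch_df.unpersist()

# df must be a streaming DataFrame, e.g. one created with spark.readStream
df.writeStream.foreachBatch(my_func)\
    .outputMode("append")\
    .trigger(once=True)\
    .option("checkpointLocation","...")\
    .start()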

### Event vs processing time

**processing time**

It refers to the time on the system that is processing the data.

**Problems with this :**

**Out-of-order data:** Events might arrive in a different order than they occurred.
Example: Sensor A sends a reading at 9:00 AM. Sensor B sends a reading at 9:01 AM. Due to network congestion, Sensor A's reading arrives at 9:03 AM, and Sensor B's reading arrives at 9:02 AM. With a 1-minute windowed count based on processing time, Sensor B's 9:01 AM event is counted in the 9:02-9:03 window, while Sensor A's 9:00 AM event (arriving late) falls into the 9:03-9:04 window, leading to incorrect counts for those windows.

Use it only for stateless transformations and when low latency is needed.

**Event time**

It refers to the time at which the event actually occurred.

**Benefits :** Correct windowing and accurate aggregations.

Use it when time-based aggregations are needed.
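
The sensor example can be replayed in a few lines of plain Python. The date is arbitrary and `minute_window` is a hypothetical helper; only the clock times come from the example above.

```python
from collections import Counter
from datetime import datetime

def minute_window(ts):
    """Label the 1-minute window a timestamp falls into by its start time."""
    return ts.replace(second=0, microsecond=0).strftime("%H:%M")

events = [
    {"sensor": "A",
     "event_time": datetime(2025, 6, 22, 9, 0),
     "arrival_time": datetime(2025, 6, 22, 9, 3)},
    {"sensor": "B",
     "event_time": datetime(2025, 6, 22, 9, 1),
     "arrival_time": datetime(2025, 6, 22, 9, 2)},
]

# Processing time: windows follow the moment each record reached the system.
by_processing_time = Counter(minute_window(e["arrival_time"]) for e in events)
# Event time: windows follow the moment each reading was taken.
by_event_time = Counter(minute_window(e["event_time"]) for e in events)
```

Grouping by arrival time puts the readings in the 09:02 and 09:03 windows, while grouping by event time puts them in 09:00 and 09:01, where they actually belong.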

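#### Window Operations

Window operations let us aggregate data into windows of time.

**Types :**

**Tumbling window :** Fixed-size, non-overlapping windows. Each event belongs to exactly one window.

**Sliding window :** Fixed-size, overlapping windows. An event can belong to more than one window.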
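
A plain-Python sketch of how a timestamp maps to windows. `windows_for` is a hypothetical helper, and aligning window starts to the Unix epoch is an assumption of this sketch. The timestamp is the one inserted into the table in the cells that follow.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def windows_for(ts, size, slide=None):
    """Return every [start, end) window of length `size` that contains `ts`.

    With slide=None the windows tumble (slide equals size).
    """
    slide = slide or size
    # Latest window start at or before ts, aligned to the epoch.
    start = ts - ((ts - EPOCH) % slide)
    result = []
    while start + size > ts:
        result.append((start, start + size))
        start -= slide
    return sorted(result)

ts = datetime(2025, 6, 22, 10, 4, 45)
tumbling = windows_for(ts, timedelta(minutes=5))
sliding = windows_for(ts, timedelta(minutes=10), timedelta(minutes=5))
```

With a 5-minute tumbling window the event lands in a single window (10:00 to 10:05). With a 10-minute window sliding every 5 minutes it lands in two (9:55 to 10:05 and 10:00 to 10:10).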

In [0]:
%sql
CREATE TABLE workspace.`my-schema`.myTable(
  color STRING,
  event_date TIMESTAMP
)

In [0]:
%sql
INSERT INTO workspace.`my-schema`.myTable VALUES
('red',   TIMESTAMP('2025-06-22 10:04:45'));

In [0]:
df_table=spark.readStream.table("workspace.`my-schema`.myTable")

In [0]:
df_table=df_table.withWatermark("event_date", "10 seconds")
df_table=df_table.groupBy("color",window("event_date", "5 minutes")).count()

In [0]:
df_table.writeStream.format("delta")\
    .trigger(once=True)\
    .outputMode("complete")\
    .option("checkpointLocation","/Volumes/workspace/my-schema/my-volume/table/checkpoint")\
    .option("path","/Volumes/workspace/my-schema/my-volume/table/sink")\
    .start()

In [0]:
%sql
SELECT * FROM DELTA.`/Volumes/workspace/my-schema/my-volume/table/sink/`