# Stateless Processing
Stateless processing means each record is processed independently of every other record, so no state has to be kept between micro-batches (for example, filtering rows).

### Configure SparkSession with required dependencies

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession shared by every cell below.
spark = (
    SparkSession.builder
    .appName("Spark Structured Streaming Application")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    # Default checkpoint root for queries that do not set their own location.
    .config("spark.sql.streaming.checkpointLocation", "notebook_data/w09/checkpoints/example_01")
    # Delta Lake package, plus the SQL extension and catalog that go with it.
    .config('spark.jars.packages', 'io.delta:delta-spark_2.12:3.2.0')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .getOrCreate()
)

In [None]:

# Input folder for the streaming CSV source, and the DDL-style schema that is
# applied to the files read from it.
input_path = "notebook_data/w09/data/input"
schema = ", ".join(
    [
        "id INT",
        "name STRING",
        "age INT",
        "event_time timestamp",
    ]
)

### Read Stream from CSV source

In [None]:
# Streaming DataFrame over the CSV files under input_path; files carry a
# header line and columns are typed by the explicit schema.
df = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load(input_path)
)

In [None]:
from pyspark.sql.functions import col

### Write Stream to Console (for debugging) with filter
Console Sink

In [None]:
# Stateless operation: keep only the rows where age > 18.
# The console sink itself accepts append / complete / update, but this query
# has no aggregation: "append" and "update" work here, whereas "complete" is
# rejected by Spark because it requires a streaming aggregation.
# Change the outputMode to see the differences.
query = (
    df.filter(col("age") > 18)
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
)

### Print Stream to Standard Output (for debugging)
ForeachBatch Sink

In [None]:
# Define a function to process each batch
def process_batch(batch_df, batch_id):
    """Print one micro-batch to standard output.

    Args:
        batch_df: Object exposing ``toPandas()``; it is called once to obtain
            a printable copy of the batch (a Spark DataFrame when this
            function is passed to ``foreachBatch``).
        batch_id: Identifier of the micro-batch, echoed in the header line.
    """
    # Convert first, so a failed conversion produces no partial output.
    collected = batch_df.toPandas()
    print(f"Batch ID: {batch_id}", collected, sep="\n")

In [None]:

# foreachBatch passes every micro-batch, together with its batch id, to
# process_batch.
# The sink accepts append / complete / update, but this query has no
# aggregation: "append" and "update" work here, whereas "complete" requires a
# streaming aggregation.
# Change the outputMode to see the differences.
query_01 = (
    df.filter(col("age") > 18)
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .start()
)

### Write Stream to Memory Sink (for debugging)
Memory Sink

In [None]:
# The memory sink stores the results in an in-memory table named after
# queryName, so it can be inspected with spark.sql afterwards.
# NOTE(review): the Structured Streaming guide lists append / complete for the
# memory sink; "update" is used here — confirm it behaves as intended on the
# Spark version in use. "complete" additionally requires an aggregation.
# Change the outputMode to see the differences.
query_02 = (
    df.writeStream
    .format("memory")
    .outputMode("update")
    .queryName("tmp_streaming_tbl")
    .start()
)

In [None]:
spark.sql("select * from tmp_streaming_tbl").show()

### Write Stream to a CSV-formatted table
File Sink

In [None]:
csv_table_appended = 'notebook_data/w09/data/output/csv_table_appended'

In [None]:
# Append-mode file sink: CSV files (with a header line) are written under
# csv_table_appended.
query_03 = (
    df.writeStream
    .format('csv')
    .option('header', 'true')
    .outputMode('append')
    .start(csv_table_appended)
)

In [None]:
csv_table = 'notebook_data/w09/data/output/csv_table'

In [None]:
# Expected to fail on purpose:
# AnalysisException: Data source csv does not support Complete output mode.
(
    df.writeStream
    .format('csv')
    .option('header', 'true')
    .outputMode('complete')
    .start(csv_table)
)

In [None]:
# Expected to fail on purpose:
# AnalysisException: Data source csv does not support Update output mode.
(
    df.writeStream
    .format('csv')
    .option('header', 'true')
    .outputMode('update')
    .start(csv_table)
)

### Verify CSV table using batch mode

In [None]:
# Batch-read the files written under csv_table_appended, treating the first
# line as a header and letting Spark infer the column types.
df_csv = spark.read.csv(
    path=csv_table_appended,
    inferSchema=True,
    header=True,
)

In [None]:
df_csv.show()

### Write Stream to a Parquet table
File Sink

In [None]:
parquet_table = 'notebook_data/w09/data/output/parquet_table'

In [None]:
# Expected to fail on purpose:
# AnalysisException: Data source parquet does not support Complete output mode.
(
    df.writeStream
    .format('parquet')
    .outputMode('complete')
    .start(parquet_table)
)

In [None]:
# Expected to fail on purpose:
# AnalysisException: Data source parquet does not support Update output mode.
(
    df.writeStream
    .format('parquet')
    .outputMode('update')
    .start(parquet_table)
)

In [None]:
# Append is the output mode the Parquet file sink accepts, so this one starts.
query_04 = (
    df.writeStream
    .format('parquet')
    .outputMode('append')
    .start(parquet_table)
)

### Verify data from the Parquet table

In [None]:
spark.read.parquet(parquet_table).show()

### Write Stream to a new Delta Table

In [None]:
delta_table = "notebook_data/w09/data/output/delta_table"

In [None]:

# Append the stream to a Delta table stored at delta_table.
query_05 = (
    df.writeStream
    .format("delta")
    .outputMode("append")
    .start(delta_table)
)

### Read the Delta table using batch mode

In [None]:
# Load the Delta table as a regular (batch) DataFrame to verify its contents.
df_delta = (
    spark.read
    .format('delta')
    .load(delta_table)
)

In [None]:
df_delta.show()

### Read the Delta table using Streaming Mode (Spark Structured Streaming)

In [None]:
# Use the Delta table at delta_table as a streaming source.
df_delta_streaming = (
    spark.readStream
    .format('delta')
    .load(delta_table)
)

In [None]:
# Print every micro-batch read from the Delta stream via process_batch.
query_06 = (
    df_delta_streaming.writeStream
    .foreachBatch(process_batch)
    .outputMode('append')
    .start()
)

### Stop Queries

In [None]:
query.stop()

In [None]:
query_01.stop()

In [None]:
query_02.stop()

In [None]:
query_03.stop()

In [None]:
query_04.stop()

In [None]:
query_05.stop()

In [None]:
query_06.stop()

### Clean up Checkpoints & Outputs

In [None]:
import os
import shutil

# Checkpoint root and sink output root used by the queries in this notebook.
checkpoint_dir = "notebook_data/w09/checkpoints/example_01"
output_dir = "notebook_data/w09/data/output/"
directories = [checkpoint_dir, output_dir]

# Remove each directory tree if it is present.
# The loop variable is deliberately not called `dir`: at notebook/module scope
# that name would replace the builtin dir() for the rest of the session.
for target_dir in directories:
    try:
        # Attempt the removal directly instead of checking for existence
        # first, so there is no gap between the check and the delete.
        shutil.rmtree(target_dir)
    except FileNotFoundError:
        print(f"Directory {target_dir} does not exist.")
    else:
        print(f"Directory {target_dir} cleaned up.")