# Kappa Architecture Implementation in Microsoft Fabric using PySpark

This notebook demonstrates a simplified Kappa Architecture by replaying publicly available NYC taxi trip records as a stream, covering ingestion, processing, storage, and serving in a Microsoft Fabric-like environment.

> Dataset: [NYC TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page). We simulate streaming by running `readStream` over a folder of taxi trip files, assumed here to be JSON exports of the TLC records.

## ✅ 1. Simulate Streaming Ingestion from NYC Taxi Data

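```python
# Create (or reuse) the Spark session; Fabric notebooks already provide `spark`
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KappaDemo").getOrCreate()

# Folder that receives taxi trip files; each new file is treated as new stream data
source_path = "/mnt/nyc/taxi_stream_input"

# Streaming file sources need an explicit schema (no schema inference by default)
trip_schema = (
    "VendorID INT, tpep_pickup_datetime TIMESTAMP, tpep_dropoff_datetime TIMESTAMP, "
    "passenger_count INT, trip_distance DOUBLE, fare_amount DOUBLE"
)

# Read the folder as a stream of JSON records
df_stream = (
    spark.readStream
    .format("json")
    .schema(trip_schema)
    .option("maxFilesPerTrigger", 1)  # one file per micro-batch to mimic a live feed
    .load(source_path)
)
```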

## ✅ 2. Store Raw Stream in Delta Table (Immutable Log)

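```python
# Append every raw record to a Delta table: the immutable log of the Kappa design
# (in Fabric this would typically be a Lakehouse table stored in OneLake)
raw_output_path = "/mnt/delta/raw_nyc_taxi"

query_raw = (
    df_stream.writeStream
    .format("delta")
    .outputMode("append")  # raw records are only ever added, never updated
    .option("checkpointLocation", "/mnt/delta/checkpoints/raw_nyc")  # tracks progress across restarts
    .start(raw_output_path)
)
```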

## ✅ 3. Process Streaming Data: Filter and Aggregate

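```python
from pyspark.sql.functions import col, window

# Keep only trips longer than 2 miles
df_filtered = df_stream.filter(col("trip_distance") > 2)

# Count trips per 1-hour pickup window and passenger count.
# No watermark is set, so Spark keeps state for every window; this is why
# the sink below uses complete output mode.
df_agg = (
    df_filtered
    .groupBy(window("tpep_pickup_datetime", "1 hour"), "passenger_count")
    .count()
    .withColumnRenamed("count", "trip_count")
    # Flatten the window struct into plain columns for easier SQL querying
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "passenger_count",
        "trip_count",
    )
)
```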
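
`window("tpep_pickup_datetime", "1 hour")` assigns each trip to a non-overlapping (tumbling) one-hour bucket. The plain-Python sketch below reproduces the filter-then-count logic on a few made-up trips so the grouping key is easy to see. It truncates naive timestamps to the hour, which matches Spark's one-hour windows only under the assumption that the session time zone has a whole-hour offset from UTC (Spark aligns windows to the Unix epoch by default). The function name `hourly_trip_counts` is hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


def hourly_trip_counts(trips: Iterable[dict]) -> Dict[Tuple[datetime, datetime, int], int]:
    """Filter trips > 2 miles and count them per (1-hour window, passenger_count)."""
    counts: Counter = Counter()
    for trip in trips:
        if trip["trip_distance"] > 2:
            # Tumbling window: truncate the pickup time to the start of its hour
            start = trip["tpep_pickup_datetime"].replace(minute=0, second=0, microsecond=0)
            counts[(start, start + timedelta(hours=1), trip["passenger_count"])] += 1
    return dict(counts)


# Made-up sample trips using the notebook's column names
trips = [
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 8, 5), "trip_distance": 3.1, "passenger_count": 1},
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 8, 55), "trip_distance": 5.0, "passenger_count": 1},
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 9, 10), "trip_distance": 2.4, "passenger_count": 2},
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 9, 20), "trip_distance": 0.8, "passenger_count": 2},  # filtered out
]
for key, n in sorted(hourly_trip_counts(trips).items()):
    print(key[0].time(), key[1].time(), key[2], n)
# 08:00:00 09:00:00 1 2
# 09:00:00 10:00:00 2 1
```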

## ✅ 4. Write Processed Data to Delta Table

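```python
processed_output_path = "/mnt/delta/processed_nyc_taxi"

# Complete mode rewrites the whole aggregate table on every trigger,
# so each hourly count reflects all trips received so far
query_processed = (
    df_agg.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "/mnt/delta/checkpoints/processed_nyc")
    .start(processed_output_path)
)
```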
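
Complete output mode fits this query because an hourly count can still change when a late trip for that hour arrives, and without a watermark Spark never knows when a window is final, so append mode is not allowed for this aggregation. The plain-Python model below (the function `run_complete_mode` and the string group keys are hypothetical) shows the consequence: the aggregation state keeps every group, and each trigger overwrites the sink with the whole result, so both grow with the number of distinct windows.

```python
from collections import Counter
from typing import Dict, Hashable, Iterable, List


def run_complete_mode(micro_batches: Iterable[Iterable[Hashable]]) -> List[Dict[Hashable, int]]:
    """Return what the sink holds after each trigger in complete output mode."""
    state: Counter = Counter()  # aggregation state carried across triggers
    sink_snapshots: List[Dict[Hashable, int]] = []
    for batch in micro_batches:
        state.update(batch)  # each element is one trip's group key
        # Complete mode: the sink is overwritten with the entire result table
        sink_snapshots.append(dict(state))
    return sink_snapshots


batches = [
    [("08:00", 1), ("08:00", 1)],
    [("08:00", 1), ("09:00", 2)],  # a late 08:00 trip updates an existing row
]
for snapshot in run_complete_mode(batches):
    print(snapshot)
# {('08:00', 1): 2}
# {('08:00', 1): 3, ('09:00', 2): 1}
```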

## ✅ 5. Serve Processed Data with Spark SQL

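```sql
-- Run as a Spark SQL cell (for example, with the %%sql magic)
-- Register the processed Delta folder as a table
CREATE TABLE IF NOT EXISTS processed_nyc_taxi
USING DELTA
LOCATION '/mnt/delta/processed_nyc_taxi';

-- Top 10 (hourly window, passenger count) groups by number of trips
SELECT *
FROM processed_nyc_taxi
ORDER BY trip_count DESC
LIMIT 10;
```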

## ✅ 6. Reprocessing Historical Data (Kappa Style)

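```python
# Wait until the raw stream has written the files currently in the source folder
# (processAllAvailable is intended for tests and demos like this one)
query_raw.processAllAvailable()

# Reprocess the full history by reading the raw Delta log as a batch
df_raw_history = spark.read.format("delta").load(raw_output_path)

# New logic applied to the same log: keep trips with fare_amount > 50
df_high_fare = df_raw_history.filter("fare_amount > 50")

# Show a sample of high-fare trips
df_high_fare.select("tpep_pickup_datetime", "trip_distance", "fare_amount").show(5)
```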
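
In Kappa terms, changing the processing logic means replaying the retained log through the new logic into a fresh output, then pointing readers at it once the replay finishes; the original records are never modified. The plain-Python sketch below models that pattern (the `reprocess` function, the `v1`/`v2` version names, and the sample rows are hypothetical), using this notebook's distance filter and fare filter as two versions of the logic.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]


def reprocess(log: List[Row], views: Dict[str, List[Row]], version: str,
              keep: Callable[[Row], bool]) -> str:
    """Replay the whole log through new logic into a new, versioned view."""
    # Start from the first record: the log, not the previous view, is the source of truth
    views[version] = [row for row in log if keep(row)]
    # Readers switch to the new version only after the replay has finished
    return version


log = [
    {"trip_distance": 1.2, "fare_amount": 8.0},
    {"trip_distance": 6.5, "fare_amount": 24.0},
    {"trip_distance": 18.0, "fare_amount": 62.5},
]
views: Dict[str, List[Row]] = {}
serving = reprocess(log, views, "v1", lambda r: r["trip_distance"] > 2)
serving = reprocess(log, views, "v2", lambda r: r["fare_amount"] > 50)
print(serving, len(views["v1"]), len(views["v2"]))  # v2 2 1
```

Keeping `v1` alongside `v2` means readers can be switched back if the new logic turns out to be wrong, since both views are derived from the same unchanged log.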

## ✅ Stop All Streaming Queries

```python
# Stop every streaming query started in this session
for query in spark.streams.active:
    query.stop()
```

## ✅ Summary

- Ingested NYC taxi trip files as a simulated stream
- Stored the raw records in Delta Lake as an append-only, immutable log
- Filtered and aggregated the stream in micro-batches (near real time)
- Queried the aggregated output with SQL for reporting
- Reprocessed historical data by replaying the raw log, without a separate batch layer

This mirrors the central **Kappa Architecture** idea (a replayable, immutable log instead of a separate batch layer), implemented using **PySpark** in a **Microsoft Fabric-style setup**.