In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType
import os

# 1️⃣ Create SparkSession
# This configuration assumes the kernel runs inside a Linux container, where
# Spark needs no HADOOP_HOME / winutils setup. On a Windows driver, Hadoop's
# local-filesystem code depends on native helpers (winutils.exe and
# hadoop.dll); without them a streaming query fails once it touches its
# checkpoint directory with
#   'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(...)'.
# Warn up front so the cause is obvious instead of surfacing later as a
# StreamingQueryException. This is only a hint, not a hard failure, because
# hadoop.dll may also be installed somewhere else on the library path.
if os.name == "nt":
    _hadoop_home = os.environ.get("HADOOP_HOME", "")
    _native_dll = os.path.join(_hadoop_home, "bin", "hadoop.dll")
    if not _hadoop_home or not os.path.isfile(_native_dll):
        print(
            "WARNING: running on Windows and hadoop.dll was not found under "
            "HADOOP_HOME\\bin; streaming checkpoints may fail with "
            "NativeIO$Windows.access0. Run this notebook inside the Linux "
            "container, or install the Hadoop native helpers "
            "(winutils.exe and hadoop.dll)."
        )

spark = (
    SparkSession.builder
    .appName("ParcelTrackingStreaming")
    .config("spark.driver.memory", "4g")
    # Kafka source connector; keep its version in step with the Spark version
    # printed below (Scala 2.12 build).
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7",
    )
    .getOrCreate()
)

# Keep notebook output readable: only errors are logged by Spark.
spark.sparkContext.setLogLevel("ERROR")
print("Spark version:", spark.version)

# 2️⃣ Define schema
# One JSON payload schema per sensor stream; each one carries the package_id
# plus the numeric reading(s) for that stream.
gps_schema = (
    StructType()
    .add("package_id", StringType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
)
temperature_schema = (
    StructType()
    .add("package_id", StringType())
    .add("temperature", DoubleType())
)
battery_schema = (
    StructType()
    .add("package_id", StringType())
    .add("battery_level", DoubleType())
)

# 3️⃣ Read from Kafka and 4️⃣ convert the value
# The broker is addressed as host.docker.internal (the Docker-specific host
# alias), so this is meant to run from inside a container. All three topics
# are consumed by one source, starting from the latest offsets; only the
# topic name and the payload (cast from bytes to a string) are kept.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host.docker.internal:9092")
    .option("subscribe", "gps,temperature,battery")
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("topic", "CAST(value AS STRING) as value")
)


# 5️⃣ Parse messages
def _parse_topic(stream_df, topic_name, payload_schema):
    """Keep the rows of one topic and expand their JSON payload into columns."""
    payload = from_json(col("value"), payload_schema).alias("data")
    topic_rows = stream_df.filter(col("topic") == topic_name)
    return topic_rows.select(payload).select("data.*")


gps_df = _parse_topic(df, "gps", gps_schema)
temperature_df = _parse_topic(df, "temperature", temperature_schema)
battery_df = _parse_topic(df, "battery", battery_schema)

# 6️⃣ Write stream
# Every parsed stream is printed to the console sink in append mode. Each
# query gets its own checkpoint directory under the absolute path
# /tmp/spark-checkpoints/<topic> on the driver's local file system.
def _start_console_query(parsed_df, topic_name):
    """Start an append-mode console sink for one parsed topic stream.

    Returns the started StreamingQuery. Rows are printed untruncated and the
    checkpoint is kept in /tmp/spark-checkpoints/<topic_name>.
    """
    return (
        parsed_df.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", False)
        .option("checkpointLocation", f"/tmp/spark-checkpoints/{topic_name}")
        .start()
    )


# Track what this cell started so that a failure or a kernel interrupt does
# not leave the remaining queries running in the background of the session.
_started_queries = []
try:
    for _topic_name, _parsed_df in (
        ("gps", gps_df),
        ("temperature", temperature_df),
        ("battery", battery_df),
    ):
        _started_queries.append(_start_console_query(_parsed_df, _topic_name))
    gps_query, temperature_query, battery_query = _started_queries

    # 7️⃣ Await termination
    # Blocks until any query in the session stops; re-raises as a
    # StreamingQueryException if the query terminated with an error.
    print("Starting streams... waiting for termination.")
    spark.streams.awaitAnyTermination()
finally:
    # Stop only the queries started here; queries that already terminated
    # (e.g. the one that failed) are skipped.
    for _query in _started_queries:
        if _query.isActive:
            _query.stop()

Spark version: 3.5.7
Starting streams... waiting for termination.


StreamingQueryException: [STREAM_FAILED] Query [id = 90f5c898-d3c9-415d-b3c9-f85dcbdd113c, runId = 7c79783d-d01f-49da-963c-5309a23e8198] terminated with exception: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'