In [2]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

# The Kafka source is not part of the base PySpark install, so request the
# connector package whose version matches the installed PySpark. This is set
# before the session is built below so the Spark launcher can pick it up.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__} pyspark-shell'

spark = SparkSession.builder.appName("ElectricCarsStreaming").getOrCreate()

# Streaming reader over the "electric_cars" topic on the local broker,
# starting from the earliest available offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "electric_cars")
    .option("startingOffsets", "earliest")
    .load()
)

# The record value is cast to a string and treated as one comma-separated
# line; the pieces land in an array column named "data".
value_as_text = kafka_df.selectExpr("CAST(value AS STRING)")
split_lines = value_as_text.select(split(col("value"), ",").alias("data"))

# Pick fields by position: 0 -> vin, 2 -> city, 3 -> state.
cars_df = split_lines.select(
    col("data").getItem(0).alias("vin"),
    col("data").getItem(2).alias("city"),
    col("data").getItem(3).alias("state"),
)

# Print each micro-batch of newly arrived rows to the console.
console_sink = cars_df.writeStream.outputMode("append").format("console")
query = console_sink.start()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/16 14:59:55 WARN Utils: Your hostname, Mika, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/16 14:59:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/m6699/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/m6699/.ivy2.5.2/cache
The jars for the packages stored in: /home/m6699/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4233b959-0080-494e-a53a-3a2cdb33c702;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.1 in central
	found org.apache.kafka#kafka-clients;3.9.1 in central
	found org.lz4#lz4-java;

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+------------+-----+
|       vin|        city|state|
+----------+------------+-----+
|KM8K33AGXL|     Seattle|   WA|
|1C4RJYB61N|     Bothell|   WA|
|1C4RJYD61P|      Yakima|   WA|
|5YJ3E1EA7J|    Kirkland|   WA|
|WBY7Z8C5XJ|     Olympia|   WA|
|5YJ3E1EAXL|  Marysville|   WA|
|2C4RC1N77H|        Kent|   WA|
|5YJYGDEE3L| Woodinville|   WA|
|5YJ3E1EA1J|  Coupeville|   WA|
|7SAYGDEF0P|    Bellevue|   WA|
|5YJ3E1EA7J|    Kirkland|   WA|
|3FA6P0SU9G|Port Orchard|   WA|
|JTDKARFP9H|Port Orchard|   WA|
|5YJ3E1EB8K|    Mukilteo|   WA|
|5YJ3E1EA5K|     Redmond|   WA|
|3FA6P0SU0D|   Rochester|   WA|
|WA1VABGE4K|     Seattle|   WA|
|1N4AZ0CP6F|     Seattle|   WA|
|KNDCC3LD7K|   Bremerton|   WA|
|1N4AZ0CP1E|     Poulsbo|   WA|
+----------+------------+-----+
only showing top 20 rows


In [3]:
from pyspark.sql.functions import desc

# Re-parse the raw Kafka stream, this time keeping only the city (field 2)
# and the model year (field 5) of each comma-separated record.
parsed_values = kafka_df.selectExpr("CAST(value AS STRING)").select(
    split(col("value"), ",").alias("data")
)
cars_data = parsed_values.select(
    col("data").getItem(2).alias("city"),
    col("data").getItem(5).alias("model_year"),
)

# Count 2023 model-year cars per city and keep the three largest counts.
# model_year is still a string at this point, hence the string literal.
cars_2023 = cars_data.where(col("model_year") == "2023")
city_counts = cars_2023.groupBy("city").count()
top3_cities = city_counts.orderBy(desc("count")).limit(3)

# Sorting a streaming aggregation is only supported in "complete" output
# mode, so the whole top-3 table is written to the console on every trigger.
query_top3 = (
    top3_cities.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

26/01/16 15:06:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-795b3a51-f046-4ee6-ac67-9382789a2da9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
26/01/16 15:06:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/16 15:06:38 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+
|    city|count|
+--------+-----+
| Seattle|11220|
|Bellevue| 4176|
| Tukwila| 3546|
+--------+-----+

