In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Build (or reuse) the SparkSession for this notebook. The Kafka source is not
# bundled with PySpark, so the connector artifact is requested through
# spark.jars.packages and resolved when the session starts.
spark = (
    SparkSession.builder
    .appName("ElectricVehicleStreaming")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.1.1",
    )
    .getOrCreate()
)

# Keep the notebook output readable: only WARN and above from Spark.
spark.sparkContext.setLogLevel("WARN")

:: loading settings :: url = jar:file:/Users/Rom/Documents/big%20data/wikimedia/.venv3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/Rom/.ivy2.5.2/cache
The jars for the packages stored in: /Users/Rom/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e7d11ef3-dacd-48fd-ae62-98d69900ff36;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.1.1 in central
	found org.apache.kafka#kafka-clients;3.9.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.8 in central
	found org.slf4j#slf4j-api;2.0.17 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.2 in central
	found org.apache.hadoop#hadoop-client-api;3.4.2 in central
	found com.google.code.findbugs#jsr

In [2]:
# Schema used to parse the JSON payload of each Kafka message.
# Only the three fields this notebook needs are declared; every one of them is
# a nullable string (including "Model Year").
ev_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("City", "Model Year", "Make")
    ]
)

In [3]:
# Streaming source: subscribe to the "electric-cars" topic on the local Kafka
# broker, starting from the earliest offsets.
# Requires the `spark` session created earlier in the notebook.
df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "electric-cars")
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
# Model year kept by the filter below.
TARGET_MODEL_YEAR = 2023

# Decode each Kafka record: the binary `value` column is cast to a string,
# parsed as JSON against ev_schema, and the resulting struct is flattened into
# top-level columns (City, Model Year, Make).
df_ev = df_kafka.select(
    from_json(col("value").cast("string"), ev_schema).alias("data")
).select("data.*")

# "Model Year" is declared as a string in ev_schema, so convert it explicitly
# before comparing with an integer instead of relying on implicit
# string-to-number coercion. try_cast yields NULL for values that are not
# valid integers, so such rows are simply filtered out.
# NOTE(review): with ANSI SQL mode enabled (believed to be the Spark 4 default)
# the implicit coercion can raise on a malformed value and fail the stream;
# confirm against the Spark ANSI compliance docs.
df_2023 = df_ev.filter(col("Model Year").try_cast("int") == TARGET_MODEL_YEAR)

In [5]:
# Running leaderboard: number of records per city, highest count first,
# truncated to the three leading cities.
df_top_cities = (
    df_2023
    .groupBy("City")
    .agg(count("*").alias("ev_count"))
    .sort(col("ev_count").desc())
    .limit(3)
)

In [6]:
print("TOP 3 CITIES WITH MOST ELECTRIC CARS (2023)")

# Start the streaming query. The full aggregated result is re-emitted to the
# console sink ("complete" output mode) on a 10-second processing-time trigger,
# without truncating cell values and showing at most 3 rows per batch.
# start() returns immediately; the query keeps running until it is stopped.
query_top_cities = (
    df_top_cities.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .option("numRows", 3)
    .trigger(processingTime="10 seconds")
    .start()
)

print("Query started!")

TOP 3 CITIES WITH MOST ELECTRIC CARS (2023)
Query started!


26/01/16 15:15:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/n7/b6y90pg11xvbtb3j2ydggq780000gp/T/temporary-9172be91-3090-4475-8b55-1f1d2765a023. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
26/01/16 15:15:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/16 15:15:46 WARN MicroBatchExecution: Disabling AQE since AQE is not supported in stateful workloads.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11271   |
|Bellevue|4204    |
|Tukwila |3582    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11308   |
|Bellevue|4230    |
|Tukwila |3605    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11322   |
|Bellevue|4238    |
|Tukwila |3617    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11347   |
|Bellevue|4260    |
|Tukwila |3638    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11381   |
|Bellevue|4288    |
|Tukwila |3662    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11410   |
|Bellevue|4313    |
|Tukwila |3679    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11443   |
|Bellevue|4342    |
|Tukwila |3699    |
+--------+--------+



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+--------+--------+
|City    |ev_count|
+--------+--------+
|Seattle |11474   |
|Bellevue|4364    |
|Tukwila |3723    |
+--------+--------+



In [7]:
# Stop the streaming query started in the previous cell.
# NOTE(review): the ERROR/WARN lines Spark logs right after this call appear to
# come from the micro-batch that was in flight being cancelled ("was stopped"),
# not from a failure of the query itself — confirm if they look different.
query_top_cities.stop()
print("Query stopped")

26/01/16 15:17:00 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 8, writer: ConsoleWriter[numRows=3, truncate=false]] is aborting.
26/01/16 15:17:00 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 8, writer: ConsoleWriter[numRows=3, truncate=false]] aborted.
26/01/16 15:17:00 WARN TaskSetManager: Lost task 16.0 in stage 25.0 (TID 1633) (roms-air executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 8 cancelled Query [id = 40cd88e2-eec5-455f-bbf1-53c27ec55bdd, runId = 9d6a294c-e38a-4244-bbcc-9c1c397b04bf] was stopped SQLSTATE: XXKDA)
26/01/16 15:17:00 WARN TaskSetManager: Lost task 20.0 in stage 25.0 (TID 1637) (roms-air executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 8 cancelled Query [id = 40cd88e2-eec5-455f-bbf1-53c27ec55bdd, runId = 9d6a294c-e38a-4244-bbcc-9c1c397b04bf] was stopped SQLSTATE: XXKDA)
26/01/16 15:17:00 WARN TaskSetManager: Lost task 17.0 in stage 25.0 (TID 1634)

Query stopped


26/01/16 15:17:00 WARN DAGScheduler: Failed to cancel job group 9d6a294c-e38a-4244-bbcc-9c1c397b04bf. Cannot find active jobs for it.
