In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, LongType
from pyspark.sql.functions import to_json, struct, col  # Corrected import

In [63]:
spark = SparkSession \
        .builder \
        .appName("read-parquet-write-to-kafka") \
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.instances", "2") \
        .config("spark.sql.shuffle.partitions", "12") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2") \
        .getOrCreate()

In [64]:
df = spark.read.format("csv").option("header", "true").load("work/output/data/chunk_0.csv")

In [65]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SPEED: string (nullable = true)
 |-- TRAVEL_TIME: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- DATA_AS_OF: string (nullable = true)
 |-- LINK_ID: string (nullable = true)
 |-- LINK_POINTS: string (nullable = true)
 |-- ENCODED_POLY_LINE: string (nullable = true)
 |-- ENCODED_POLY_LINE_LVLS: string (nullable = true)
 |-- OWNER: string (nullable = true)
 |-- TRANSCOM_ID: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- LINK_NAME: string (nullable = true)



In [66]:
SCHEMA = StructType([
    StructField("ID", StringType()),
    StructField("SPEED", StringType()),
    StructField("TRAVEL_TIME", StringType()),
    StructField("STATUS", StringType()),
    StructField("DATA_AS_OF", StringType()),
    StructField("LINK_ID", StringType()),
    StructField("LINK_POINTS", StringType()),
    StructField("ENCODED_POLY_LINE", StringType()),
    StructField("ENCODED_POLY_LINE_LVLS", StringType()),
    StructField("OWNER", StringType()),
    StructField("TRANSCOM_ID", StringType()),
    StructField("BOROUGH", StringType()),
    StructField("LINK_NAME", StringType())
])

In [67]:
# Read the parquet file write it to the topic
# We need to specify the schema in the stream
# and also convert the entries to the format key, value
parquet_input_path = "work/output/parquet/" #"work/output/parquet/*/*.parquet"  # Update this path to where your Parquet files are stored
df = spark.read.parquet(parquet_input_path)

# Display schema to verify the structure
df.take(1)

AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

In [68]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SPEED: string (nullable = true)
 |-- TRAVEL_TIME: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- DATA_AS_OF: string (nullable = true)
 |-- LINK_ID: string (nullable = true)
 |-- LINK_POINTS: string (nullable = true)
 |-- ENCODED_POLY_LINE: string (nullable = true)
 |-- ENCODED_POLY_LINE_LVLS: string (nullable = true)
 |-- OWNER: string (nullable = true)
 |-- TRANSCOM_ID: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- LINK_NAME: string (nullable = true)



In [69]:
#df = spark.read.schema(SCHEMA).parquet(parquet_input_path)

In [70]:
#df.createOrReplaceTempView("traffic_data")

In [71]:
# kafka_df_traffic_stream = spark.sql(
#     """
#     select 
#         id as key,
#         to_json(struct(*)) as value
#     from traffic_data
#     """
# )

# kafka_df1 = df.selectExpr(
#     "ID as key",
#     "to_json(struct(*)) as value"
# )

In [72]:
#kafka_df.take(1)

In [73]:
#kafka_df1.take(1)

In [74]:
df_traffic_stream = spark.readStream \
    .format("parquet") \
    .option("maxFilesPerTrigger", 5) \
    .schema(SCHEMA) \
    .load(parquet_input_path) \
    .withColumn("value", to_json(struct(col("*")))) \
    .withColumn("key", col("ID").cast("string")) \
    #.limit(10)
    # .withColumn("value", to_json(struct([col(c) for c in df_traffic_stream.columns] ))) \
    # .withColumn("key", col("ID").cast("string")) \
    

In [75]:
# Convert DataFrame to JSON format required for Kafka
# df_traffic_stream = df_traffic_stream \
#     .withColumn("value", to_json(struct([col(c) for c in df_traffic_stream.columns]))) \
#     .withColumn("key", col("ID").cast("string"))

In [76]:
#json_df = df_traffic_stream.selectExpr("to_json(struct(*)) as value")

In [77]:
# query = df_traffic_stream.selectExpr("cast(key as string)", "cast(value as string)").writeStream \
#     .format("console")\
#     .outputMode("append")\
#     .option("truncate", "false") \
#     .start()

In [78]:
query = df_traffic_stream \
    .selectExpr("cast(key as string)", "cast(value as string)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "traffic_sensor") \
    .option("checkpointLocation", "/tmp/spark-kafka-checkpoint") \
    .trigger(processingTime='5 seconds') \
    .start()

In [79]:
query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:
#query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [19]:
# now checked read and attaching of schema. but since we want to simulate streaming of data, we will create readstream
df_traffic_stream = spark.readStream.format("parquet")\
    .schema(SCHEMA)\
    .load(parquet_input_path)\
    .withColumn("value", F.to_json(F.struct(F.col("*")))) \
    .withColumn("key", F.col("ID")) \
    .limit(400000)

In [22]:
df_traffic_stream \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/IPython/core/interactiveshell.py", line 3526, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_13410/2929586154.py", line 8, in <module>
    .awaitTermination()
     ^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/sql/streaming/query.py", line 221, in awaitTermination
    return self._jsq.awaitTermination()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 11a40d4b-9ca7-4c5b-91f1-dfb0672e94f9, runId = 53152a54-d11b-4dc8-bde8-11364e7ff61e] ter

StreamingQueryException: [STREAM_FAILED] Query [id = 11a40d4b-9ca7-4c5b-91f1-dfb0672e94f9, runId = 53152a54-d11b-4dc8-bde8-11364e7ff61e] terminated with exception: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 1815) (c661253c945d executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1$adapted(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$$$Lambda$2795/0x00007feb7cdfb7d0.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1862)
	at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:714)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1(Utils.scala:209)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1$adapted(Utils.scala:209)
	at org.apache.spark.util.Utils$$$Lambda$2798/0x00007feb7cdff0e8.apply(Unknown Source)
	at org.apache.spark.util.Utils$.writeByteBufferImpl(Utils.scala:187)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:209)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2$adapted(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer$$Lambda$2797/0x00007feb7cdfed08.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.util.io.ChunkedByteBuffer.writeExternal(ChunkedByteBuffer.scala:103)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:60)
	at org.apache.spark.scheduler.DirectTaskResult$$Lambda$2802/0x00007feb7cdfc800.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:35)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59)
	at java.base/java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1465)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1436)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)

Driver stacktrace:

JVM stacktrace:
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 1815) (c661253c945d executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1$adapted(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$$$Lambda$2795/0x00007feb7cdfb7d0.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1862)
	at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:714)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1(Utils.scala:209)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1$adapted(Utils.scala:209)
	at org.apache.spark.util.Utils$$$Lambda$2798/0x00007feb7cdff0e8.apply(Unknown Source)
	at org.apache.spark.util.Utils$.writeByteBufferImpl(Utils.scala:187)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:209)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2$adapted(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer$$Lambda$2797/0x00007feb7cdfed08.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.util.io.ChunkedByteBuffer.writeExternal(ChunkedByteBuffer.scala:103)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:60)
	at org.apache.spark.scheduler.DirectTaskResult$$Lambda$2802/0x00007feb7cdfc800.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:35)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59)
	at java.base/java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1465)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1436)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)

Driver stacktrace:
=== Streaming Query ===
Identifier: [id = 11a40d4b-9ca7-4c5b-91f1-dfb0672e94f9, runId = 53152a54-d11b-4dc8-bde8-11364e7ff61e]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/home/jovyan/work/output/parquet/*/*.parquet]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.ConsoleTable$@2c7c3cc4, 11a40d4b-9ca7-4c5b-91f1-dfb0672e94f9, [truncate=false], Append
+- Project [cast(key#279 as string) AS key#340, cast(value#263 as string) AS value#341]
   +- GlobalLimit 400000
      +- LocalLimit 400000
         +- Project [ID#237, SPEED#238, TRAVEL_TIME#239, STATUS#240, DATA_AS_OF#241, LINK_ID#242, LINK_POINTS#243, ENCODED_POLY_LINE#244, ENCODED_POLY_LINE_LVLS#245, OWNER#246, TRANSCOM_ID#247, BOROUGH#248, LINK_NAME#249, value#263, ID#237 AS key#279]
            +- Project [ID#237, SPEED#238, TRAVEL_TIME#239, STATUS#240, DATA_AS_OF#241, LINK_ID#242, LINK_POINTS#243, ENCODED_POLY_LINE#244, ENCODED_POLY_LINE_LVLS#245, OWNER#246, TRANSCOM_ID#247, BOROUGH#248, LINK_NAME#249, to_json(struct(ID, ID#237, SPEED, SPEED#238, TRAVEL_TIME, TRAVEL_TIME#239, STATUS, STATUS#240, DATA_AS_OF, DATA_AS_OF#241, LINK_ID, LINK_ID#242, LINK_POINTS, LINK_POINTS#243, ENCODED_POLY_LINE, ENCODED_POLY_LINE#244, ENCODED_POLY_LINE_LVLS, ENCODED_POLY_LINE_LVLS#245, OWNER, OWNER#246, TRANSCOM_ID, TRANSCOM_ID#247, BOROUGH, BOROUGH#248, ... 2 more fields), Some(Etc/UTC)) AS value#263]
               +- StreamingExecutionRelation FileStreamSource[file:/home/jovyan/work/output/parquet/*/*.parquet], [ID#237, SPEED#238, TRAVEL_TIME#239, STATUS#240, DATA_AS_OF#241, LINK_ID#242, LINK_POINTS#243, ENCODED_POLY_LINE#244, ENCODED_POLY_LINE_LVLS#245, OWNER#246, TRANSCOM_ID#247, BOROUGH#248, LINK_NAME#249]

	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 1815) (c661253c945d executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1$adapted(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$$$Lambda$2795/0x00007feb7cdfb7d0.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1862)
	at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:714)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1(Utils.scala:209)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1$adapted(Utils.scala:209)
	at org.apache.spark.util.Utils$$$Lambda$2798/0x00007feb7cdff0e8.apply(Unknown Source)
	at org.apache.spark.util.Utils$.writeByteBufferImpl(Utils.scala:187)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:209)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2$adapted(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer$$Lambda$2797/0x00007feb7cdfed08.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.util.io.ChunkedByteBuffer.writeExternal(ChunkedByteBuffer.scala:103)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:60)
	at org.apache.spark.scheduler.DirectTaskResult$$Lambda$2802/0x00007feb7cdfc800.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:35)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59)
	at java.base/java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1465)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1436)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:385)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:307)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:318)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3585)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3585)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:741)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
	... 4 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1$adapted(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$$$Lambda$2795/0x00007feb7cdfb7d0.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1862)
	at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:714)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1(Utils.scala:209)
	at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1$adapted(Utils.scala:209)
	at org.apache.spark.util.Utils$$$Lambda$2798/0x00007feb7cdff0e8.apply(Unknown Source)
	at org.apache.spark.util.Utils$.writeByteBufferImpl(Utils.scala:187)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:209)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2$adapted(ChunkedByteBuffer.scala:103)
	at org.apache.spark.util.io.ChunkedByteBuffer$$Lambda$2797/0x00007feb7cdfed08.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.util.io.ChunkedByteBuffer.writeExternal(ChunkedByteBuffer.scala:103)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:60)
	at org.apache.spark.scheduler.DirectTaskResult$$Lambda$2802/0x00007feb7cdfc800.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:35)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59)
	at java.base/java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1465)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1436)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [22]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 pyspark-shell'

KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_TOPIC = "traffic_sensor"


df_traffic_stream.take(1)

df_traffic_stream\
    .writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
    .option("topic", KAFKA_TOPIC)\
    .option("checkpointLocation", "/tmp/checkpoint")\
    .start()\
    .awaitTermination()

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
FileSource[work/output/parquet/*/*.parquet]

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.