In [1]:
import os
# Extra Maven packages for the Spark JVM: the Kafka connector and Avro support,
# both Scala 2.12 builds of Spark 3.3.1. PySpark reads this variable when it
# launches the JVM, so it only takes effect if set before the SparkSession exists.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    ','.join([
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1',
        'org.apache.spark:spark-avro_2.12:3.3.1',
    ]),
    'pyspark-shell',
])

In [44]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Single SparkSession for the whole notebook; getOrCreate() hands back the
# already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .appName("Spark-Notebook")
    .getOrCreate()
)

In [45]:
# Streaming source: every record of the `PULocations` topic, starting from the
# earliest retained offset. The bootstrap list names both a host listener
# (localhost:9092) and an in-network one (broker:29092); the captured logs show
# `broker` being skipped with a DNS warning when it does not resolve here.
#
# Note: `checkpointLocation` is a sink (writeStream) option and had no effect on
# this reader, so it is not set here. Configure it on the writing query instead.
df_kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092")
    .option("subscribe", "PULocations")
    .option("startingOffsets", "earliest")
    .load()
)
df_kafka_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [46]:
# Exploratory view: the Kafka key/value arrive as binary, so cast both to
# strings to see the payload. (parse_ride_from_kafka_message does its own cast.)
df_kafka_encoded = (
    df_kafka_raw
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)
df_kafka_encoded.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [47]:
def parse_ride_from_kafka_message(df_raw, schema, delimiter=', '):
    """Parse the delimited Kafka ``value`` of a streaming DataFrame into typed columns.

    The message value is cast to a string and split on ``delimiter``. Part *i*
    of the split becomes a column named ``schema[i].name``, cast to
    ``schema[i].dataType``. Fields are therefore matched purely by position, so
    ``schema`` must list them in the same order the producer writes them.

    Args:
        df_raw: streaming DataFrame with a ``value`` column (e.g. from the Kafka
            source, where it is binary).
        schema: ``StructType`` describing the fields, in message order.
        delimiter: separator between fields in the value string. ``F.split``
            treats it as a Java regular expression. Defaults to ``', '``.

    Returns:
        Streaming DataFrame containing exactly the columns of ``schema``, in order.

    Raises:
        AssertionError: if ``df_raw`` is not a streaming DataFrame.
    """
    assert df_raw.isStreaming, "DataFrame doesn't receive streaming data"

    # Decode the payload to text before splitting it into an array of parts.
    value_parts = F.split(df_raw["value"].cast("string"), delimiter)

    # Build every output column in one projection. Chaining withColumn per field
    # grows the query plan with each call, and a schema field named "value"
    # would overwrite the source column halfway through the loop.
    return df_raw.select([
        value_parts.getItem(idx).cast(field.dataType).alias(field.name)
        for idx, field in enumerate(schema)
    ])

# Layout of one ride message. Order matters: parse_ride_from_kafka_message maps
# the comma-separated parts of each message to these fields by position.
RIDE_SCHEMA = (
    T.StructType()
    .add("id", T.StringType())
    .add("pickup_datetime", T.TimestampType())
    .add("pulocationid", T.IntegerType())
    .add("dolocationid", T.IntegerType())
)

In [48]:
# Typed ride stream: one column per RIDE_SCHEMA field.
df_rides = parse_ride_from_kafka_message(
    schema=RIDE_SCHEMA,
    df_raw=df_kafka_raw,
)
df_rides.printSchema()

root
 |-- id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pulocationid: integer (nullable = true)
 |-- dolocationid: integer (nullable = true)



In [49]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    """Start a streaming query that prints each micro-batch of ``df`` to the console.

    Rows are printed without truncation. A micro-batch is triggered every
    ``processing_time``.

    Args:
        df: streaming DataFrame to print.
        output_mode: output mode handed to ``DataStreamWriter.outputMode``.
        processing_time: trigger interval string, e.g. ``'5 seconds'``.

    Returns:
        The started ``pyspark.sql.streaming.StreamingQuery``.
    """
    console_writer = df.writeStream.outputMode(output_mode)
    console_writer = console_writer.trigger(processingTime=processing_time)
    console_writer = console_writer.format("console").option("truncate", False)
    return console_writer.start()

# NOTE(review): 'append' is passed explicitly because sink_console defaults to
# 'complete', which Spark normally accepts only for aggregated streams; df_rides
# is a plain per-row projection. `write_query` is reassigned by a later
# memory-sink cell, so this console query cannot be stopped through it later.
write_query = sink_console(df_rides, output_mode='append')

23/03/11 20:14:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-17790190-7dc9-4878-a132-4423fd2b31ed. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/11 20:14:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


23/03/11 20:14:30 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker
23/03/11 20:14:30 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+------------+------------+
|id |pickup_datetime    |pulocationid|dolocationid|
+---+-------------------+------------+------------+
|254|2019-01-01 00:33:03|140         |52          |
|254|2019-01-01 00:03:00|141         |237         |
|254|2019-01-01 00:45:48|237         |236         |
|254|2019-01-01 00:37:39|162         |85          |
|254|2019-01-01 00:35:06|237         |246         |
|254|2019-01-01 00:55:23|145         |224         |
|254|2019-01-01 00:49:23|261         |14          |
|254|2019-01-01 00:11:10|162         |233         |
|254|2019-01-01 00:00:06|13          |87          |
|254|2019-01-01

In [50]:
def sink_memory(df, query_name, query_template, output_mode='append'):
    """Stream ``df`` into Spark's in-memory sink and run a SQL query over it.

    The memory sink registers a temporary table named ``query_name``.
    ``query_template`` is formatted with ``table_name=query_name``. The result is
    run through ``sql`` on the SparkSession that owns ``df``, so the helper does
    not depend on a notebook-global ``spark``.

    Args:
        df: streaming DataFrame to write.
        query_name: name for both the streaming query and the in-memory table.
        query_template: SQL text containing a ``{table_name}`` placeholder.
        output_mode: streaming output mode. Defaults to ``'append'``, which is
            also Spark's default when no mode is set; use ``'complete'`` when
            ``df`` is itself an aggregation.

    Returns:
        Tuple ``(StreamingQuery, DataFrame)``: the running query handle and the
        lazily evaluated result of the formatted SQL query.
    """
    write_query = (
        df.writeStream
        .queryName(query_name)
        .format('memory')
        .outputMode(output_mode)
        .start()
    )
    query_str = query_template.format(table_name=query_name)
    # DataFrame.sparkSession (PySpark >= 3.3) avoids relying on hidden global state.
    query_results = df.sparkSession.sql(query_str)
    return write_query, query_results

In [38]:
# Number of distinct pickup locations seen so far.
# This query gets its own name because the next cell starts another memory-sink
# query named 'pulocationid_counts'. Spark refuses to start a query whose name is
# already active in the session, so sharing the name breaks Restart & Run All.
# Separate handles also keep this query reachable so it can be stopped.
query_name = 'pulocationid_distinct_count'
query_template = 'select count(distinct(pulocationid)) from {table_name}'
distinct_count_query, df_pulocationid_distinct_count = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)

23/03/11 20:11:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a9556c94-5658-42f8-9641-05064c5c3958. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/11 20:11:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


23/03/11 20:11:33 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker
23/03/11 20:11:33 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker


                                                                                

In [51]:
# Trips per pickup location. GROUP BY already yields exactly one row per
# location, so no DISTINCT is needed.
query_name = 'pulocationid_counts'
query_template = '''
    SELECT pulocationid AS PUlocation,
        COUNT(*) AS trips_taken
    FROM {table_name}
    GROUP BY pulocationid
'''
write_query, df_pulocationid_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)

23/03/11 20:14:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d973a32c-1762-46f6-bc48-220fc7d7448a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/11 20:14:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/11 20:14:41 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker


23/03/11 20:14:41 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker


In [52]:
# Confirm the handle type, then show what the memory-sink query is doing right now.
print(write_query.__class__)  # pyspark.sql.streaming.StreamingQuery
write_query.status

<class 'pyspark.sql.streaming.StreamingQuery'>


{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [53]:
# Snapshot the per-location trip counts, then stop the query feeding the table.
df_pulocationid_counts.show(n=20)
write_query.stop()

+----------+-----------+
|PUlocation|trips_taken|
+----------+-----------+
|       236|          4|
|       140|          2|
|       206|          2|
|        13|          1|
|        16|          2|
|       142|          2|
|       257|          1|
|        88|          1|
|       229|          1|
|       173|          5|
|       162|          2|
|         7|          5|
|        87|          1|
|       171|          2|
|       261|          1|
|       249|          1|
|       181|          8|
|        82|          2|
|       245|          1|
|        25|          7|
+----------+-----------+
only showing top 20 rows



23/03/11 20:44:34 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@12aecb5d is aborting.
23/03/11 20:44:34 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@12aecb5d aborted.
23/03/11 20:44:34 ERROR MicroBatchExecution: Query pulocationid_counts [id = 5c394c61-2a3c-4f49-888f-5add66bae430, runId = 1207edad-3b6e-4058-a816-cf4235a5d5c7] terminated with error
org.apache.spark.SparkException: Writing job aborted
	at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2E

Exception in thread "stream execution thread for pulocationid_counts [id = 5c394c61-2a3c-4f49-888f-5add66bae430, runId = 1207edad-3b6e-4058-a816-cf4235a5d5c7]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninte

23/03/11 20:44:35 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@75f7806a is aborting.
23/03/11 20:44:35 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@75f7806a aborted.
23/03/11 20:44:35 ERROR MicroBatchExecution: Query [id = 00311256-fe9d-4ff4-897b-dc53689e405b, runId = 09f7e67a-ceef-4512-8f22-401428afd9e5] terminated with error
org.apache.spark.SparkException: Writing job aborted
	at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:302)
	at o

Exception in thread "stream execution thread for [id = 00311256-fe9d-4ff4-897b-dc53689e405b, runId = 09f7e67a-ceef-4512-8f22-401428afd9e5]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(Uninterrup

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-------------------+------------+------------+
|id |pickup_datetime    |pulocationid|dolocationid|
+---+-------------------+------------+------------+
|254|2019-01-01 00:33:03|140         |52          |
|254|2019-01-01 00:03:00|141         |237         |
|254|2019-01-01 00:45:48|237         |236         |
|254|2019-01-01 00:37:39|162         |85          |
|254|2019-01-01 00:35:06|237         |246         |
|254|2019-01-01 00:55:23|145         |224         |
|254|2019-01-01 00:49:23|261         |14          |
|254|2019-01-01 00:11:10|162         |233         |
|254|2019-01-01 00:00:06|13          |87          |
|254|2019-01-01 00:36:32|249         |236         |
|254|2019-01-01 00:15:15|236         |229         |
|445|2019-01-01 00:32:02|145         |16          |
|445|2019-01-01 00:25:50|171         |15          |
|445|2019-01-01 00:45:47|252         |82          |
|445|2019-01-01 00:

In [54]:
# Stop every streaming query still attached to the session before shutting it down.
# Earlier handles (e.g. the console query's `write_query`) are overwritten by later
# cells, so those queries would otherwise still be running at session shutdown.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()

23/03/11 21:43:05 ERROR MicroBatchExecution: Query [id = d19b4770-f2d8-4f43-9b9c-ed2fd2ddf81c, runId = 9079b8f4-14cc-4b78-a15b-ff0676cf5152] terminated with error
org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
	at org.apache.spark.sql.execution.QueryExecuti

Exception in thread "stream execution thread for [id = d19b4770-f2d8-4f43-9b9c-ed2fd2ddf81c, runId = 9079b8f4-14cc-4b78-a15b-ff0676cf5152]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(Uninterrup

In [None]:
def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):
    """Reshape ``df`` into the ``key``/``value`` layout written by the Kafka sink.

    ``value_columns`` are joined, in order, into a single ``value`` string
    separated by ``', '``, mirroring the ``', '`` split used when parsing ride
    messages.

    Args:
        df: DataFrame (batch or streaming) to convert.
        value_columns: names of the columns to join into ``value``.
        key_column: optional column to rename to ``key`` and cast to string.
            When omitted, only ``value`` is returned. The Kafka sink treats the
            key as optional.

    Returns:
        DataFrame with columns ``['key', 'value']``, or ``['value']`` when no
        ``key_column`` is given.
    """
    df = df.withColumn("value", F.concat_ws(', ', *value_columns))
    if key_column:
        df = df.withColumnRenamed(key_column, "key")
        df = df.withColumn("key", df.key.cast('string'))
        return df.select(['key', 'value'])
    # Without a key column there is no `key` to select; selecting it would fail.
    return df.select(['value'])
    
def sink_kafka(df, topic, output_mode='append',
               bootstrap_servers="localhost:9092,broker:29092",
               checkpoint_location="checkpoint"):
    """Start a streaming query that writes ``df`` to the Kafka topic ``topic``.

    ``df`` should already have the sink layout (a ``value`` column and optionally
    ``key``), e.g. as produced by ``prepare_dataframe_to_kafka_sink``.

    Args:
        df: streaming DataFrame to publish.
        topic: destination Kafka topic.
        output_mode: streaming output mode. Defaults to ``'append'``.
        bootstrap_servers: comma-separated Kafka bootstrap servers.
        checkpoint_location: directory the query uses for its checkpoint. The
            default ``"checkpoint"`` is shared by every call. NOTE(review): give
            each concurrently running query its own directory.

    Returns:
        The started ``pyspark.sql.streaming.StreamingQuery``.
    """
    write_query = (
        df.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .outputMode(output_mode)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_location)
        .start()
    )
    return write_query