In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,org.apache.spark:spark-avro_2.12:3.3.2 pyspark-shell'

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Jupyter-Notebook") \
    .getOrCreate()

/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-aa3e9e59-bb8e-48a9-943b-3b572f3fcbfa;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.common

23/03/20 09:28:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# default for startingOffsets is "latest"
df_kafka_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "rides_csv") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

In [4]:
df_kafka_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
df_kafka_encoded = df_kafka_raw.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

In [6]:
df_kafka_encoded.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [7]:
def parse_ride_from_kafka_message(df_raw, schema):
    """ take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # split attributes to nested array in one Column
    col = F.split(df['value'], ', ')

    # expand col to multiple top-level columns
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return df.select([field.name for field in schema])

In [8]:
ride_schema = T.StructType(
    [T.StructField("vendor_id", T.IntegerType()),
     T.StructField('tpep_pickup_datetime', T.TimestampType()),
     T.StructField('tpep_dropoff_datetime', T.TimestampType()),
     T.StructField("passenger_count", T.IntegerType()),
     T.StructField("trip_distance", T.FloatType()),
     T.StructField("payment_type", T.IntegerType()),
     T.StructField("total_amount", T.FloatType()),
     ])

In [9]:
df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)

In [10]:
df_rides.printSchema()

root
 |-- vendor_id: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- total_amount: float (nullable = true)



In [11]:
# def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
#     write_query = df.writeStream \
#         .outputMode(output_mode) \
#         .trigger(processingTime=processing_time) \
#         .format("console") \
#         .option("truncate", False) \
#         .start()
#     return write_query # pyspark.sql.streaming.StreamingQuery

In [12]:
# write_query = sink_console(df_rides, output_mode='append')

23/03/20 07:01:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-febb7964-fabd-4c47-95c3-80b5e2df1a33. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/20 07:01:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/20 07:01:21 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-driver-0-1, groupId=spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/20 07:01:21 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-d

[Stage 0:>                                                          (0 + 1) / 1]

23/03/20 07:01:24 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-executor-2, groupId=spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/20 07:01:24 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-executor-2, groupId=spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------------------+---------------------+---------------+-------------+------------+------------+
|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|
+---------+--------------------+---------------------+---------------+-------------+------------+------------+
|1        |2020-07-01 00:25:32 |2020-07-01 00:33:39  |1              |1.5          |2           |9.3         |
|1        |2020-07-01 00:03:19 |2020-07-01 00:25:43  |1              |9.5          |1           |27.8        |
|2        |2020-07-01 00:15:11 |2020-07-01 00:29:24  |1              |5.85         |2           |22.3        |
|2        |2020-07-01 00:30:49 |2020-07-01 00:38:26  |1              |1.9          |1           |14.16       |
|2        |2020-07-01 00:31:26 |2020-07-01 00:38:02  |1              |1.25         |2           |7.8         |
+---------+----

In [15]:
df_rides.writeStream \
        .outputMode("append") \
        .trigger(processingTime="5 seconds") \
        .format("console") \
        .option("truncate", False) \
        .start()

23/03/20 07:05:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-25a6ebd0-5360-422b-ac5d-5ebca66a4b0a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/20 07:05:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f13aff35790>

23/03/20 07:05:27 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b20cf926-41fe-4c31-a0ae-fe44724d9dba-1510784805-driver-0-3, groupId=spark-kafka-source-b20cf926-41fe-4c31-a0ae-fe44724d9dba-1510784805-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/20 07:05:27 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b20cf926-41fe-4c31-a0ae-fe44724d9dba-1510784805-driver-0-3, groupId=spark-kafka-source-b20cf926-41fe-4c31-a0ae-fe44724d9dba-1510784805-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------------------+---------------------+---------------+-------------+------------+------------+
|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|
+---------+--------------------+--------------------

In [16]:
!spark-shell --version

/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_342
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.
23/03/20 07:38:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b20cf926-41fe-4c31-a0ae-fe44724d9dba-1510784805-driver-0-3, groupId=spark-kafka-source-b20cf926-41fe-4c31-a0ae-fe44724d9dba-1510784805-driver-0] Connection to node 1 (broker/172.21.0.7:29092) could not be established. Broker may not be available.
23/03/20 07:38:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ac58c280-a1e4-4c1b-94f9-e189d87f7a1c--1173327423-driver-0-1,