In [1]:
import pyspark

import pyspark.sql.types as T
import pyspark.sql.functions as F

from pyspark.sql import SparkSession

In [2]:
# Show the installed PySpark version.
pyspark.__version__

'3.3.1'

In [3]:
# Maven coordinates of the connector packages handed to Spark at start-up;
# their 3.3.1 version matches the PySpark version printed above.
jar_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    "org.apache.spark:spark-avro_2.12:3.3.1",
]

# Build (or reuse) the session. The packages are passed as a single
# comma-separated string; the parentheses already allow the chain to span
# several lines.
spark = (
    SparkSession.builder
    .appName("Spark-Notebook")
    .config("spark.jars.packages", ",".join(jar_packages))
    .getOrCreate()
)

sc = spark.sparkContext
sc

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-400a85e3-93a7-46c4-a2b4-3bea90f30206;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.common

23/03/12 21:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
def read_from_kafka(topic: str, servers=None) -> pyspark.sql.DataFrame:
    """Open a streaming DataFrame over a Kafka topic, starting at the earliest offsets.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        servers: Optional iterable of ``"host:port"`` bootstrap servers.
            When omitted, the notebook's two default brokers are used.

    Returns:
        The streaming DataFrame produced by the Kafka source.
    """
    if servers is None:
        servers = [
            "localhost:9092",
            "broker:29092",
        ]

    # No "checkpointLocation" option here: checkpointing belongs to the
    # writeStream side of a query, so setting it on the reader had no effect
    # (the sinks still reported a temporary checkpoint directory).
    stream = (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", ",".join(servers))
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .load()
    )

    return stream

### Process Green Taxi

In [5]:
# JSON schema of a green-taxi ride message, built from (field name, Spark type)
# pairs; every field is declared nullable.
SCHEMA_GREEN = T.StructType([
    T.StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ('VendorID',              T.IntegerType),
        ('lpep_pickup_datetime',  T.TimestampType),
        ('lpep_dropoff_datetime', T.TimestampType),
        ('store_and_fwd_flag',    T.StringType),
        ('RatecodeID',            T.IntegerType),
        ('PULocationID',          T.IntegerType),
        ('DOLocationID',          T.IntegerType),
        ('passenger_count',       T.IntegerType),
        ('trip_distance',         T.FloatType),
        ('fare_amount',           T.FloatType),
        ('extra',                 T.FloatType),
        ('mta_tax',               T.FloatType),
        ('tip_amount',            T.FloatType),
        ('tolls_amount',          T.FloatType),
        ('ehail_fee',             T.FloatType),
        ('improvement_surcharge', T.FloatType),
        ('total_amount',          T.FloatType),
        ('payment_type',          T.IntegerType),
        ('trip_type',             T.IntegerType),
        ('congestion_surcharge',  T.FloatType),
    )
])


def parse_green_ride_from_kafka_message(df_raw):
    """Decode green-taxi Kafka messages into a flat ride DataFrame.

    The ``value`` column is cast to string, parsed as JSON with
    ``SCHEMA_GREEN``, and reduced to five columns: ``VendorID``,
    ``pickup_datetime``, ``dropoff_datetime``, ``PULocationID`` and
    ``DOLocationID``.

    Args:
        df_raw: Streaming DataFrame as returned by the Kafka source.

    Returns:
        A streaming DataFrame with the five ride columns.

    Raises:
        AssertionError: If ``df_raw`` is not a streaming DataFrame.
    """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    # Parsed JSON payload, kept under the name "value" so the expressions
    # below can address its fields as value.<field>.
    payload = F.from_json(F.col("value").cast("string"), SCHEMA_GREEN).alias("value")

    ride_columns = [
        'value.VendorID',
        'value.lpep_pickup_datetime as pickup_datetime',
        'value.lpep_dropoff_datetime as dropoff_datetime',
        'value.PULocationID',
        'value.DOLocationID',
    ]

    return df_raw.select(payload).selectExpr(*ride_columns)

In [6]:
# Subscribe to the "rides_green" topic and inspect the raw Kafka columns.
df_taxi_green_raw = read_from_kafka("rides_green")
df_taxi_green_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Parse the JSON payload of the green-taxi stream and check the resulting columns.
df_taxi_green = parse_green_ride_from_kafka_message(df_taxi_green_raw)
df_taxi_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)



In [9]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    """Start a streaming query that writes ``df`` to the console sink.

    Args:
        df: Streaming DataFrame to write.
        output_mode: Output mode handed to the stream writer.
        processing_time: Interval passed as the processing-time trigger.

    Returns:
        Whatever ``start()`` returns, i.e. the handle of the started query.
    """
    writer = df.writeStream.outputMode(output_mode)
    writer = writer.trigger(processingTime=processing_time)
    # truncate=False is passed so the console sink does not shorten cell values.
    writer = writer.format("console").option("truncate", False)
    return writer.start()

In [10]:
# Keep the query handle so this console stream can be stopped later with
# `query_green_console.stop()` instead of running until the kernel dies.
query_green_console = sink_console(df_taxi_green, output_mode='append')
query_green_console

23/03/12 21:07:00 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7935ae93-a444-4ed7-a648-009d76e599fe. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/12 21:07:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f514d4b6350>

23/03/12 21:07:01 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-fad8243a-6bdd-4f94-8203-f4fb4dee6544-1129940546-driver-0-1, groupId=spark-kafka-source-fad8243a-6bdd-4f94-8203-f4fb4dee6544-1129940546-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/12 21:07:01 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-fad8243a-6bdd-4f94-8203-f4fb4dee6544-1129940546-driver-0-1, groupId=spark-kafka-source-fad8243a-6bdd-4f94-8203-f4fb4dee6544-1129940546-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+-------------------+------------+------------+
|VendorID|pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|
+--------+-------------------+-------------------+------------+------------+
|2       |2018-12-21 15:17:29|2018-12-21 15:18:57|264         |264         |
|2       |2019-01-01 00:10:16|2019-01-01 00:16:32|97          |49          |
|2       |2019-01-01 00:27:11|2019-01-01 00:31:38|49          |189         |
|2       |2019-01-01 00:46:20|2019-01-01 01:04:54|189         |17          |
|2       |2019-01-01 00:19:06|2019-01-01 00:39:43|82          |258         |
|2       |2019-01-01 00:12:35|2019-01-01 00:19:09|49          |17          |
|2       |2019-01-01 00:47:55|2019-01-01 01:00:01|255         |33          |
|1       |2019-01-01 00:12:47|2019-01-01 00:30:50|76          |225         |
|2       |2019-01-01 00:16:23|2019-01-01 00:39:46|25    

### Process FHV Taxi

In [14]:
# JSON schema of an FHV ride message, built from (field name, Spark type)
# pairs; every field is declared nullable.
SCHEMA_FHV = T.StructType([
    T.StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ('dispatching_base_num',   T.StringType),
        ('pickup_datetime',        T.TimestampType),
        ('dropOff_datetime',       T.TimestampType),
        ('PUlocationID',           T.IntegerType),
        ('DOlocationID',           T.IntegerType),
        ('SR_Flag',                T.StringType),
        ('Affiliated_base_number', T.StringType),
    )
])


def parse_fhv_ride_from_kafka_message(df_raw):
    """Decode FHV Kafka messages into a flat ride DataFrame.

    The ``value`` column is cast to string, parsed as JSON with
    ``SCHEMA_FHV``, and projected to five columns named ``VendorID``
    (taken from ``dispatching_base_num``), ``pickup_datetime``,
    ``dropoff_datetime``, ``PULocationID`` and ``DOLocationID``.

    Args:
        df_raw: Streaming DataFrame as returned by the Kafka source.

    Returns:
        A streaming DataFrame with the five ride columns.

    Raises:
        AssertionError: If ``df_raw`` is not a streaming DataFrame.
    """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.select(F.from_json(F.col("value").cast("string"), SCHEMA_FHV).alias("value"))

    df = df.selectExpr(
        'value.dispatching_base_num as VendorID',
        'value.pickup_datetime as pickup_datetime',
        # Field name spelled exactly as declared in SCHEMA_FHV
        # ("dropOff_datetime") so the lookup does not depend on
        # case-insensitive column resolution; the alias keeps the output name.
        'value.dropOff_datetime as dropoff_datetime',
        'value.PUlocationID as PULocationID',
        'value.DOlocationID as DOLocationID',
    )

    return df

In [15]:
# Subscribe to the "rides_fhv" topic and inspect the raw Kafka columns.
df_taxi_fhv_raw = read_from_kafka("rides_fhv")
df_taxi_fhv_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [16]:
# Parse the JSON payload of the FHV stream and check the resulting columns.
df_taxi_fhv = parse_fhv_ride_from_kafka_message(df_taxi_fhv_raw)
df_taxi_fhv.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)



In [17]:
# Keep the query handle so this console stream can be stopped later with
# `query_fhv_console.stop()` instead of running until the kernel dies.
query_fhv_console = sink_console(df_taxi_fhv, output_mode='append')
query_fhv_console

23/03/12 21:07:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-df5f310a-88cd-4143-b859-a8ec37e2308b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/12 21:07:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f514d45f910>

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+-------------------+------------+------------+
|VendorID|pickup_datetime    |dropoff_datetime   |PUlocationID|DOlocationID|
+--------+-------------------+-------------------+------------+------------+
|B00254  |2019-01-01 00:33:03|2019-01-01 01:37:24|null        |null        |
|B00254  |2019-01-01 00:03:00|2019-01-01 00:34:25|null        |null        |
|B00254  |2019-01-01 00:45:48|2019-01-01 01:26:01|null        |null        |
|B00254  |2019-01-01 00:37:39|2019-01-01 01:44:59|null        |null        |
|B00254  |2019-01-01 00:35:06|2019-01-01 01:30:21|null        |null        |
|B00254  |2019-01-01 00:55:23|2019-01-01 01:48:27|null        |null        |
|B00254  |2019-01-01 00:49:23|2019-01-01 01:38:38|null        |null        |
|B00254  |2019-01-01 00:11:10|2019-01-01 00:44:31|null        |null        |
|B00254  |2019-01-01 00:00:06|2019-01-01 00:32:21|null  

### Calculate Statistics

In [18]:
# Stack the green and FHV ride streams, then keep only the rides whose
# pickup location is not null.
df_taxi = (
    df_taxi_green
    .union(df_taxi_fhv)
    .filter(F.col('PULocationID').isNotNull())
)
df_taxi

DataFrame[VendorID: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int]

In [20]:
def sink_memory(df, query_name, query_template):
    """Stream ``df`` into the memory sink and prepare a SQL query over it.

    Args:
        df: Streaming DataFrame to write.
        query_name: Name given to the streaming query; it is also substituted
            for ``{table_name}`` in ``query_template``.
        query_template: SQL text containing a ``{table_name}`` placeholder.

    Returns:
        A ``(write_query, query_results)`` tuple: the started streaming query
        and the DataFrame returned by ``spark.sql`` for the filled-in template.
    """
    memory_writer = df.writeStream.queryName(query_name).format('memory')
    streaming_query = memory_writer.start()

    # The stream is started before the SQL text is handed to spark.sql.
    filled_sql = query_template.format(table_name=query_name)
    return streaming_query, spark.sql(filled_sql)

In [21]:
# SQL template: the five pickup locations with the most rides.
# {table_name} is filled in by sink_memory with the streaming query name.
sql_query = """
select PUlocationID, count(*) as cnt
from {table_name}
group by PUlocationID
order by cnt desc
limit 5
"""

# Start streaming the merged rides into the "taxi_merged" in-memory table and
# get back both the query handle and the aggregate DataFrame.
df_taxi_write, df_taxi_agg = sink_memory(df_taxi, "taxi_merged", sql_query)

23/03/12 21:20:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1c6624ef-f255-43b1-b67e-4f0e6cf31b78. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/12 21:20:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/12 21:20:26 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aa04c6bb-dd54-4dfd-9906-ec4e0ca0ccec--419651977-driver-0-5, groupId=spark-kafka-source-aa04c6bb-dd54-4dfd-9906-ec4e0ca0ccec--419651977-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/12 21:20:26 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aa04c6bb-dd54-4dfd-9906-ec4e0ca0ccec--419651977-driv

                                                                                

In [22]:
# Current status of the in-memory streaming query.
df_taxi_write.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [23]:
# Evaluate the top-5 pickup-location query against the in-memory table.
df_taxi_agg.show()

23/03/12 21:21:05 WARN TaskSetManager: Stage 3 contains a task of very large size (5687 KiB). The maximum recommended task size is 1000 KiB.




+------------+------+
|PUlocationID|   cnt|
+------------+------+
|        null|186779|
|          74|  8628|
|          41|  7110|
|          75|  6814|
|           7|  5762|
+------------+------+



                                                                                

In [24]:
# Stop the in-memory streaming query.
df_taxi_write.stop()

### Write to Kafka

In [27]:
def prepare_dataframe_to_kafka_sink(df):
    """Shape ``df`` into the ``key`` / ``value`` columns written to Kafka.

    Args:
        df: DataFrame that contains a ``PULocationID`` column.

    Returns:
        A DataFrame with two columns: ``key`` (``PULocationID`` cast to
        string) and ``value`` (a JSON document built from all columns of ``df``).
    """
    message_key = F.col('PULocationID').cast('string').alias('key')

    # Pack every input column into one struct, then serialise it as JSON.
    all_columns = F.struct([F.col(column_name) for column_name in df.columns])
    message_value = F.to_json(all_columns).alias("value")

    return df.select(message_key, message_value)

In [28]:
# Convert the merged rides into key/value string columns for the Kafka sink.
# NOTE(review): this rebinds `df_taxi_write`, which an earlier cell bound to
# the StreamingQuery returned by sink_memory — consider a distinct name.
df_taxi_write = prepare_dataframe_to_kafka_sink(df_taxi)
df_taxi_write

DataFrame[key: string, value: string]

In [29]:
def sink_kafka(df, topic, output_mode='append', servers=None, checkpoint_location="checkpoint"):
    """Start a streaming query that writes ``df`` to a Kafka topic.

    Args:
        df: Streaming DataFrame to write (prepared with ``key`` / ``value``
            columns by the caller).
        topic: Name of the destination Kafka topic.
        output_mode: Output mode handed to the stream writer.
        servers: Optional iterable of ``"host:port"`` bootstrap servers.
            When omitted, the notebook's two default brokers are used.
        checkpoint_location: Directory passed as the query's
            ``checkpointLocation``. Defaults to the previously hard-coded
            ``"checkpoint"``; pass a distinct directory per query when more
            than one Kafka sink is started.

    Returns:
        Whatever ``start()`` returns, i.e. the handle of the started query.
    """
    if servers is None:
        servers = [
            "localhost:9092",
            "broker:29092",
        ]

    write_query = (
        df.writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", ",".join(servers))
            .outputMode(output_mode)
            .option("topic", topic)
            .option("checkpointLocation", checkpoint_location)
            .start()
    )

    return write_query

In [30]:
# Keep the query handle so the Kafka sink stream can be stopped later with
# `query_rides_all.stop()` instead of running until the kernel dies.
query_rides_all = sink_kafka(df_taxi_write, 'rides_all')
query_rides_all

23/03/12 21:43:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f514c4b8be0>

23/03/12 21:43:24 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-04cf63c5-fb37-4398-b8f3-add7336acf19-1721993562-executor-11, groupId=spark-kafka-source-04cf63c5-fb37-4398-b8f3-add7336acf19-1721993562-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/12 21:43:24 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-04cf63c5-fb37-4398-b8f3-add7336acf19-1721993562-executor-11, groupId=spark-kafka-source-04cf63c5-fb37-4398-b8f3-add7336acf19-1721993562-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
23/03/12 21:43:24 WARN NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {rides_all=LEADER_NOT_AVAILABLE}
23/03/12 21:43:24 WARN NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {rides_all=LEADER_NOT_AVAILABLE}


                                                                                

```bash
> kcat -C -b localhost:9092 -t  rides_all -q | head

{"VendorID":"B00254","pickup_datetime":"2019-01-01T00:33:03.000Z","dropoff_datetime":"2019-01-01T01:37:24.000Z"}
{"VendorID":"2","pickup_datetime":"2018-12-21T15:17:29.000Z","dropoff_datetime":"2018-12-21T15:18:57.000Z","PULocationID":264,"DOLocationID":264}
{"VendorID":"2","pickup_datetime":"2019-01-01T00:10:16.000Z","dropoff_datetime":"2019-01-01T00:16:32.000Z","PULocationID":97,"DOLocationID":49}
{"VendorID":"B00254","pickup_datetime":"2019-01-01T00:03:00.000Z","dropoff_datetime":"2019-01-01T00:34:25.000Z"}
{"VendorID":"2","pickup_datetime":"2019-01-01T00:27:11.000Z","dropoff_datetime":"2019-01-01T00:31:38.000Z","PULocationID":49,"DOLocationID":189}
{"VendorID":"B00254","pickup_datetime":"2019-01-01T00:45:48.000Z","dropoff_datetime":"2019-01-01T01:26:01.000Z"}
{"VendorID":"2","pickup_datetime":"2019-01-01T00:46:20.000Z","dropoff_datetime":"2019-01-01T01:04:54.000Z","PULocationID":189,"DOLocationID":17}
{"VendorID":"B00254","pickup_datetime":"2019-01-01T00:37:39.000Z","dropoff_datetime":"2019-01-01T01:44:59.000Z"}
{"VendorID":"2","pickup_datetime":"2019-01-01T00:19:06.000Z","dropoff_datetime":"2019-01-01T00:39:43.000Z","PULocationID":82,"DOLocationID":258}
{"VendorID":"B00254","pickup_datetime":"2019-01-01T00:35:06.000Z","dropoff_datetime":"2019-01-01T01:30:21.000Z"}
```