## Set-Up

In [1]:
import os
import time

from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

from producers import confluent_producer, kafka_python_producer
from consumers import confluent_consumer, kafka_python_consumer
from utils import download_csv, load_kafka_settings
from streaming import (
    parse_ride_df_from_kafka,
    write_batch_df_to_kafka,
    write_streaming_df_to_kafka,
    load_df_from_kafka,
    streaming_df_to_batch_df,
    prepare_df_for_kafka,
    streaming_df_row_count,
)

In [2]:
PRODUCERS = {"confluent": confluent_producer, "kafka-python": kafka_python_producer}
CONSUMERS = {"confluent": confluent_consumer, "kafka-python": kafka_python_consumer}

In [3]:
SETTINGS = load_kafka_settings()

In [4]:
green_csv_path = download_csv(
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz"
)
fhv_csv_path = download_csv(
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"
)
TAXI_CSVS = {"green": green_csv_path, "fhv": fhv_csv_path}

## Testing of Kafka Consumers and Producers Implemented in Python

In [5]:
max_messages = 3
sleep = 0.0
for i, (package_name, producer) in enumerate(PRODUCERS.items()):
    for j, (taxi, csv_path) in enumerate(TAXI_CSVS.items()):
        if i > 0 or j > 0:
            print()
        print(
            f"Producing {taxi} taxi data with Producer implemented in {package_name}:"
        )
        producer.produce_taxi_data(
            csv_path=csv_path,
            topic=f"{package_name}_{taxi}_trips",
            key_schema=SETTINGS[f"{taxi}_key_schema"],
            value_schema=SETTINGS[f"{taxi}_value_schema"],
            bootstrap_servers=SETTINGS["bootstrap_servers"],
            registry_url=SETTINGS["registry_url"],
            max_messages=max_messages,
            sleep=sleep,
            verbose=True,
        )

Producing green taxi data with Producer implemented in confluent:
Record successfully produced to Partition 0 of topic 'confluent_green_trips' at offset 3.
Record successfully produced to Partition 0 of topic 'confluent_green_trips' at offset 4.
Record successfully produced to Partition 0 of topic 'confluent_green_trips' at offset 5.
Successfully produced 3 messages to 'confluent_green_trips' topic.

Producing fhv taxi data with Producer implemented in confluent:
Record successfully produced to Partition 0 of topic 'confluent_fhv_trips' at offset 3.
Record successfully produced to Partition 0 of topic 'confluent_fhv_trips' at offset 4.
Record successfully produced to Partition 0 of topic 'confluent_fhv_trips' at offset 5.
Successfully produced 3 messages to 'confluent_fhv_trips' topic.

Producing green taxi data with Producer implemented in kafka-python:
Record successfully produced to Partition 0 of topic 'kafka-python_green_trips' at offset 3.
Record successfully produced to Partitio

In [6]:
taxi_types = ["green", "fhv"]
taxi_csvs = [green_csv_path, fhv_csv_path]
max_messages = 3
sleep = 0.0
for i, (package_name, consumer) in enumerate(CONSUMERS.items()):
    for j, (taxi, csv_path) in enumerate(TAXI_CSVS.items()):
        if i > 0 or j > 0:
            print()
        print(
            f"Consuming {taxi} taxi data with Consumer implemented in {package_name}:"
        )
        consumer.consume_taxi_data(
            topic=f"{package_name}_{taxi}_trips",
            key_schema=SETTINGS[f"{taxi}_key_schema"],
            value_schema=SETTINGS[f"{taxi}_value_schema"],
            registry_url=SETTINGS["registry_url"],
            bootstrap_servers=SETTINGS["bootstrap_servers"],
            group_id=f"{package_name}_{taxi}_taxi",
            offset="earliest",
            max_messages=max_messages,
            sleep=sleep,
            verbose=True,
        )

Consuming green taxi data with Consumer implemented in confluent:
Record 1, Key: {'VendorID': 2}, Values: {'PULocationID': 264, 'lpep_pickup_datetime': '2018-12-21 15:17:29', 'lpep_dropoff_datetime': '2018-12-21 15:18:57', 'passenger_count': 5, 'trip_distance': 0.0, 'fare_amount': 3.0}
Record 2, Key: {'VendorID': 2}, Values: {'PULocationID': 97, 'lpep_pickup_datetime': '2019-01-01 00:10:16', 'lpep_dropoff_datetime': '2019-01-01 00:16:32', 'passenger_count': 2, 'trip_distance': 0.8600000143051147, 'fare_amount': 6.0}
Record 3, Key: {'VendorID': 2}, Values: {'PULocationID': 49, 'lpep_pickup_datetime': '2019-01-01 00:27:11', 'lpep_dropoff_datetime': '2019-01-01 00:31:38', 'passenger_count': 2, 'trip_distance': 0.6600000262260437, 'fare_amount': 4.5}
Succesfully consumed 3 records from topics: ['confluent_green_trips'].

Consuming fhv taxi data with Consumer implemented in confluent:
Record 1, Key: {'DOlocationID': None}, Values: {'PUlocationID': None, 'DOlocationID': None, 'pickup_datetim

## Connecting PySpark to Kafka

In [7]:
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,"
    "org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell"
)
spark = SparkSession.builder.appName("kafka-example").getOrCreate()

In [8]:
def produce_taxi_data_with_kafka_python(num_messages):
    for taxi, csv_path in TAXI_CSVS.items():
        kafka_python_producer.produce_taxi_data(
            csv_path=csv_path,
            topic=f"{taxi}_trips",
            key_schema=SETTINGS[f"{taxi}_key_schema"],
            value_schema=SETTINGS[f"{taxi}_value_schema"],
            bootstrap_servers=SETTINGS["bootstrap_servers"],
            registry_url=SETTINGS["registry_url"],
            max_messages=num_messages,
            sleep=0.0,
            verbose=False,
        )
    return None

In [9]:
produce_taxi_data_with_kafka_python(num_messages=1000)

Successfully produced 1000 messages to 'green_trips' topic.
Successfully produced 1000 messages to 'fhv_trips' topic.


### Loading Kafka Messages into PySpark

In [10]:
df_green_raw = load_df_from_kafka(spark, topic="green_trips")
df_fhv_raw = load_df_from_kafka(spark, topic="fhv_trips")

In [11]:
green_schema = T.StructType(
    [
        T.StructField("PULocationID", T.IntegerType()),
        T.StructField("lpep_pickup_datetime", T.TimestampType()),
        T.StructField("lpep_dropoff_datetime", T.TimestampType()),
        T.StructField("passenger_count", T.IntegerType()),
        T.StructField("trip_distance", T.FloatType()),
        T.StructField("fare_amount", T.FloatType()),
    ]
)
df_green = parse_ride_df_from_kafka(df_green_raw, green_schema)
print("Schema before parsing:")
streaming_df_to_batch_df(df_green_raw, spark).show()
print("After parsing:")
streaming_df_to_batch_df(df_green, spark).show()

Schema before parsing:
+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|[32]|[32 36 34 2C 20 3...|green_trips|        0|     0|2023-03-15 23:56:...|            0|
|[32]|[39 37 2C 20 32 3...|green_trips|        0|     1|2023-03-15 23:56:...|            0|
|[32]|[34 39 2C 20 32 3...|green_trips|        0|     2|2023-03-15 23:56:...|            0|
|[32]|[31 38 39 2C 20 3...|green_trips|        0|     3|2023-03-15 23:56:...|            0|
|[32]|[38 32 2C 20 32 3...|green_trips|        0|     4|2023-03-15 23:56:...|            0|
|[32]|[34 39 2C 20 32 3...|green_trips|        0|     5|2023-03-15 23:56:...|            0|
|[32]|[32 35 35 2C 20 3...|green_trips|        0|     6|2023-03-15 23:56:...|            0|
|[31]|[37 36 2C 20 32 3...|green_trips|        0|     7|2

In [12]:
fhv_schema = T.StructType(
    [
        T.StructField("PUlocationID", T.IntegerType()),
        T.StructField("DOlocationID", T.IntegerType()),
        T.StructField("pickup_datetime", T.TimestampType()),
        T.StructField("dropOff_datetime", T.TimestampType()),
        T.StructField("dispatching_base_num", T.StringType()),
        T.StructField("SR_Flag", T.BooleanType()),
    ]
)
df_fhv = parse_ride_df_from_kafka(df_fhv_raw, fhv_schema)
print("Schema before parsing:")
df_fhv.printSchema()
print("After parsing:")
streaming_df_to_batch_df(df_fhv, spark).show()

Schema before parsing:
root
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- SR_Flag: boolean (nullable = true)

After parsing:
+------------+------------+-------------------+-------------------+--------------------+-------+
|PUlocationID|DOlocationID|    pickup_datetime|   dropOff_datetime|dispatching_base_num|SR_Flag|
+------------+------------+-------------------+-------------------+--------------------+-------+
|        null|        null|2019-01-01 00:30:00|2019-01-01 02:51:55|              B00001|   null|
|        null|        null|2019-01-01 00:45:00|2019-01-01 00:54:49|              B00001|   null|
|        null|        null|2019-01-01 00:15:00|2019-01-01 00:54:52|              B00001|   null|
|        null|        null|2019-01-01 00:19:00|2019-01-01 00:39:00|              B00008|   

### Combining Streaming Dataframes

In [13]:
df_green = df_green.withColumnRenamed(
    "lpep_pickup_datetime", "pickup_datetime"
).withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
df_fhv = df_fhv.withColumnRenamed("PUlocationID", "PULocationID").withColumnRenamed(
    "dropOff_datetime", "dropoff_datetime"
)

In [14]:
df_green = df_green.withColumn("service_type", F.lit("green").cast("string"))
df_fhv = df_fhv.withColumn("service_type", F.lit("fhv").cast("string"))

In [15]:
df_fhv_wm = df_fhv.withColumn("timestamp", F.current_timestamp()).withWatermark(
    "timestamp", "1 hours"
)
df_green_wm = df_green.withColumn("timestamp", F.current_timestamp()).withWatermark(
    "timestamp", "1 hours"
)
df_rides = df_fhv_wm.unionByName(df_green_wm, allowMissingColumns=True)
streaming_df_to_batch_df(df_rides, spark).show()

+------------+------------+-------------------+-------------------+--------------------+-------+------------+--------------------+---------------+-------------+-----------+
|PULocationID|DOlocationID|    pickup_datetime|   dropoff_datetime|dispatching_base_num|SR_Flag|service_type|           timestamp|passenger_count|trip_distance|fare_amount|
+------------+------------+-------------------+-------------------+--------------------+-------+------------+--------------------+---------------+-------------+-----------+
|        null|        null|2019-01-01 00:30:00|2019-01-01 02:51:55|              B00001|   null|         fhv|2023-03-16 00:02:...|           null|         null|       null|
|        null|        null|2019-01-01 00:45:00|2019-01-01 00:54:49|              B00001|   null|         fhv|2023-03-16 00:02:...|           null|         null|       null|
|        null|        null|2019-01-01 00:15:00|2019-01-01 00:54:52|              B00001|   null|         fhv|2023-03-16 00:02:...|     

### Querying Streaming Dataframes

In [16]:
def count_pu_locations(df, spark, query_name="count_pu_locs"):
    query = (
        df.writeStream.queryName(query_name)
        .format("memory")
        .trigger(availableNow=True)
        .start()
    )
    query.awaitTermination()
    query_results = spark.sql(
        f"""
        SELECT
            PULocationID, COUNT(PULocationID) AS count
        FROM
            {query_name}
        GROUP BY
            PULocationID
        ORDER BY
            count DESC
        LIMIT 10;
    """
    )
    return query_results

In [17]:
count_pu_locations(df_rides, spark).show()

+------------+-----+
|PULocationID|count|
+------------+-----+
|          41|  169|
|         181|  155|
|           7|  143|
|          42|  134|
|         255|  133|
|          25|  120|
|          74|   88|
|          82|   85|
|         129|   81|
|          75|   78|
+------------+-----+



In [18]:
print("Add new data to Kafka topics:")
produce_taxi_data_with_kafka_python(num_messages=500)
print("Re-run query:")
count_pu_locations(df_rides, spark).show()

Add new data to Kafka topics:
Successfully produced 500 messages to 'green_trips' topic.
Successfully produced 500 messages to 'fhv_trips' topic.
Re-run query:
+------------+-----+
|PULocationID|count|
+------------+-----+
|          41|  199|
|         181|  188|
|           7|  170|
|          42|  158|
|         255|  157|
|          25|  142|
|          74|  106|
|          82|  100|
|          75|   95|
|         129|   93|
+------------+-----+



### Writing Straming Dataframes to a Kafka Topic

In [19]:
all_taxis_topic = "all_taxi"
cols_to_write = ["service_type", "pickup_datetime", "dropoff_datetime"]

In [20]:
df_rides_kafka = prepare_df_for_kafka(df_rides, cols_to_write)

In [21]:
df_batch = streaming_df_to_batch_df(df_rides_kafka, spark)
write_batch_df_to_kafka(df_batch, all_taxis_topic)

In [22]:
df_all_taxi = load_df_from_kafka(spark, topic=all_taxis_topic)
print("Schema:")
streaming_df_to_batch_df(df_all_taxi, spark).show()

Schema:
+----+--------------------+--------+---------+------+--------------------+-------------+
| key|               value|   topic|partition|offset|           timestamp|timestampType|
+----+--------------------+--------+---------+------+--------------------+-------------+
|null|[67 72 65 65 6E 2...|all_taxi|        0|     0|2023-03-15 23:56:...|            0|
|null|[66 68 76 2C 20 3...|all_taxi|        0|     1|2023-03-15 23:56:...|            0|
|null|[66 68 76 2C 20 3...|all_taxi|        0|     2|2023-03-15 23:56:...|            0|
|null|[66 68 76 2C 20 3...|all_taxi|        0|     3|2023-03-15 23:56:...|            0|
|null|[66 68 76 2C 20 3...|all_taxi|        0|     4|2023-03-15 23:56:...|            0|
|null|[66 68 76 2C 20 3...|all_taxi|        0|     5|2023-03-15 23:56:...|            0|
|null|[67 72 65 65 6E 2...|all_taxi|        0|     6|2023-03-15 23:56:...|            0|
|null|[66 68 76 2C 20 3...|all_taxi|        0|     7|2023-03-15 23:56:...|            0|
|null|[67 72 

In [23]:
print(f"Number of rows: {streaming_df_row_count(df_all_taxi, spark)}")

Number of rows: 9246


In [24]:
write_query = write_streaming_df_to_kafka(df_rides_kafka, all_taxis_topic)

In [27]:
print(f"Number of rows: {streaming_df_row_count(df_all_taxi, spark)}")

Number of rows: 9246


In [33]:
produce_taxi_data_with_kafka_python(num_messages=123)
# Wait until all new data has streamed into new topic:
while write_query.status["isTriggerActive"]:
    time.sleep(0.1)
print(f"Number of rows: {streaming_df_row_count(df_all_taxi, spark)}")
write_query.stop()

Successfully produced 123 messages to 'green_trips' topic.
Successfully produced 123 messages to 'fhv_trips' topic.
Number of rows: 15984


In [34]:
!spark-submit --version

23/03/16 00:11:54 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.146 instead (on interface enp9s0)
23/03/16 00:11:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.17
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.
