Producer

In [1]:
import datetime
import random
import time

In [2]:
from kafka import KafkaProducer

def json_serializer(data):
    """Serialize ``data`` to UTF-8 encoded JSON bytes.

    Used as the ``value_serializer`` of the ``KafkaProducer`` created in this
    notebook, so it is called once for every message value that is sent.

    Args:
        data: Any object that ``json.dumps`` can serialize (e.g. a dict of
            plain Python scalars).

    Returns:
        bytes: The JSON document encoded as UTF-8.

    Raises:
        TypeError: If ``data`` contains a value ``json.dumps`` cannot encode.
    """
    # Imported locally because the notebook only runs ``import json`` in a
    # later cell; without this the function raises NameError when it is called
    # before that cell has been executed. Re-importing an already loaded
    # module is a cheap cache lookup.
    import json

    return json.dumps(data).encode('utf-8')

# Address of the Kafka broker the producer bootstraps from.
server = 'localhost:9092'

# Every message value is passed through json_serializer before it is sent.
producer = KafkaProducer(value_serializer=json_serializer, bootstrap_servers=[server])

# Last expression of the cell: displays the result of the producer's
# bootstrap connectivity check.
producer.bootstrap_connected()

True

In [3]:
import pandas as pd
# Only these columns are read from the CSV; all others are skipped at parse time.
columns_of_interest = [
    'lpep_pickup_datetime', 'lpep_dropoff_datetime',
    'PULocationID', 'DOLocationID',
    'passenger_count', 'trip_distance', 'tip_amount',
]

# Load the gzip-compressed trip file, restricted to the columns above.
df_green = pd.read_csv(
    'green_tripdata_2019-10.csv.gz',
    usecols=columns_of_interest,
    compression='gzip',
)

# Preview the first rows as the cell output.
df_green.head()

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26


In [8]:
from tqdm.notebook import tqdm
import json

In [9]:
# Kafka topic that receives one JSON message per row of df_green.
topic_name = 'green-trips'
time0 = time.time()

for _, row in tqdm(df_green.iterrows(), total=len(df_green)):
    row_dict = row.to_dict()
    # Stamp the message with the current UTC wall-clock time.
    # datetime.utcnow() is deprecated since Python 3.12; the timezone-aware
    # call below formats to exactly the same "%Y-%m-%d %H:%M:%S" string.
    row_dict['timestamp'] = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Throttle the stream: pause 1.5-1.7 ms (randrange yields 15, 16 or 17).
    time.sleep(random.randrange(15, 18) / 10000)
    producer.send(topic_name, value=row_dict)

# Flush before reading the end time so the measurement also covers any
# messages that are still buffered in the producer.
producer.flush()
t1 = time.time()
print(f'took {(t1 - time0):.2f} seconds')

  0%|          | 0/476386 [00:00<?, ?it/s]

took 971.82 seconds


Consumer

In [10]:
import pyspark
from pyspark.sql import SparkSession

# Request the Kafka connector whose version equals the installed PySpark
# version (Scala 2.12 artifact), so the connector and Spark releases agree.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local session using all available cores; the connector is passed through
# spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/home/cat53/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/cat53/.ivy2/cache
The jars for the packages stored in: /home/cat53/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-08d32781-c092-4447-8bfe-5832c4550ccf;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolu

24/03/20 10:20:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [11]:
# Streaming source: subscribe to the "green-trips" topic on the local broker
# and start from the earliest available offsets.
green_stream = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "green-trips",
        "startingOffsets": "earliest",
    })
    .load()
)

Analytics

In [12]:
from pyspark.sql import types

# Schema applied to each JSON message value. The three date/time fields are
# declared as plain strings here.
schema = (
    types.StructType()
    .add("lpep_pickup_datetime", types.StringType())
    .add("lpep_dropoff_datetime", types.StringType())
    .add("PULocationID", types.IntegerType())
    .add("DOLocationID", types.IntegerType())
    .add("passenger_count", types.DoubleType())
    .add("trip_distance", types.DoubleType())
    .add("tip_amount", types.DoubleType())
    .add("timestamp", types.StringType())
)

In [13]:
from pyspark.sql import functions as F

# Decode the Kafka "value" column as a JSON string, parse it with `schema`,
# and flatten the resulting struct into top-level columns.
# NOTE(review): this rebinds green_stream, so the cell is not idempotent --
# running it a second time fails because the "value" column is gone.
green_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

In [14]:
# Count trips per drop-off location in 5-minute windows over the message
# "timestamp" field, most frequent first.
# "timestamp" is declared as a string in the schema, while F.window is
# documented to take a timestamp column; the cast is made explicit instead of
# relying on Spark's implicit string-to-timestamp coercion.
popular_destinations = (
    green_stream
    .groupBy(
        F.window(F.col("timestamp").cast("timestamp"), "5 minutes"),
        'DOLocationID',
    )
    .agg(F.count("*").alias("total_count"))
    .orderBy("total_count", ascending=False)
)

In [None]:
# Print the full aggregation result to the console on every trigger
# ("complete" output mode), without truncating column values.
query = (
    popular_destinations.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Block the cell until the streaming query stops.
query.awaitTermination()

24/03/20 10:22:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ade6b35c-6513-43cc-9abe-609354c5c3da. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 10:22:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----------+
|window                                    |DOLocationID|total_count|
+------------------------------------------+------------+-----------+
|{2024-03-20 10:10:00, 2024-03-20 10:15:00}|74          |6299       |
|{2024-03-20 10:05:00, 2024-03-20 10:10:00}|74          |6090       |
|{2024-03-20 10:10:00, 2024-03-20 10:15:00}|42          |5615       |
|{2024-03-20 10:05:00, 2024-03-20 10:10:00}|42          |5549       |
|{2024-03-20 10:10:00, 2024-03-20 10:15:00}|41          |5069       |
|{2024-03-20 10:05:00, 2024-03-20 10:10:00}|41          |5025       |
|{2024-03-20 10:10:00, 2024-03-20 10:15:00}|129         |4581       |
|{2024-03-20 10:05:00, 2024-03-20 10:10:00}|75          |4439       |
|{2024-03-20 10:10:00, 2024-03-20 10:15:00}|75          |4380       |
|{2024-03-20 10:10:00, 2024-03-20 10:15:00}|7           |4322  