In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

In [2]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.63 seconds


In [None]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

In [3]:
import pandas as pd

In [4]:
data = pd.read_csv('green_tripdata_2019-10.csv.gz', compression='gzip')
data

  data = pd.read_csv('green_tripdata_2019-10.csv.gz', compression='gzip')


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1.0,112,196,1.0,5.88,18.00,0.50,0.5,0.00,0.00,,0.3,19.30,2.0,1.0,0.0
1,1.0,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1.0,43,263,1.0,0.80,5.00,3.25,0.5,0.00,0.00,,0.3,9.05,2.0,1.0,0.0
2,1.0,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1.0,255,228,2.0,7.50,21.50,0.50,0.5,0.00,0.00,,0.3,22.80,2.0,1.0,0.0
3,1.0,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1.0,181,181,1.0,0.90,5.50,0.50,0.5,0.00,0.00,,0.3,6.80,2.0,1.0,0.0
4,2.0,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1.0,97,188,1.0,2.52,10.00,0.50,0.5,2.26,0.00,,0.3,13.56,1.0,1.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
476381,,2019-10-31 23:30:00,2019-11-01 00:00:00,,,65,102,,7.04,29.57,2.75,0.5,0.00,0.00,,0.0,32.82,,,
476382,,2019-10-31 23:03:00,2019-10-31 23:24:00,,,129,136,,0.00,39.83,2.75,0.5,0.00,6.12,,0.0,49.20,,,
476383,,2019-10-31 23:02:00,2019-10-31 23:23:00,,,61,222,,3.90,23.11,2.75,0.5,0.00,0.00,,0.0,26.36,,,
476384,,2019-10-31 23:42:00,2019-10-31 23:56:00,,,76,39,,3.08,15.23,2.75,0.5,0.00,0.00,,0.0,18.48,,,


In [5]:
df_green = data[['lpep_pickup_datetime',
'lpep_dropoff_datetime',
'PULocationID',
'DOLocationID',
'passenger_count',
'trip_distance',
'tip_amount']]
df_green.head()

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26


In [6]:
t0 = time.time()

topic_name = 'green-trips'

In [7]:
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)

    # TODO implement sending the data 
producer.flush()
t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

took 92.18 seconds


In [8]:
import findspark
findspark.init()

In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


pyspark_version = pyspark.__version__
kafka_jar_package = f'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.3'


spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

In [10]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [None]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:
query.stop()

In [11]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [12]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [13]:
green_stream

DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]

In [14]:
from pyspark.sql.functions import window, col
# Add a timestamp column
green_stream = green_stream.withColumn("timestamp", F.current_timestamp())

# Group by 5 minutes window based on timestamp and DOLocationID
popular_destinations = green_stream.groupBy(window(col("timestamp"), "5 minutes"), col("DOLocationID")) \
    .count() \
    .orderBy("count", ascending=False)

In [None]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()