In [122]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    """Turn ``data`` into UTF-8 encoded JSON bytes.

    Used as the Kafka producer's value serializer, so ``data`` must be
    something ``json.dumps`` accepts (dicts, lists, strings, numbers, ...).
    """
    json_text = json.dumps(data)
    return json_text.encode('utf-8')

server = 'localhost:9092'

In [123]:
# Kafka producer pointed at the broker address held in `server`; every message
# value is passed through json_serializer before it is sent.
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

In [124]:
producer.bootstrap_connected()

True

In [125]:
t0 = time.time()

topic_name = 'test-topic'

# Send ten small test messages, pausing 50 ms after each one.
for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

# t1 is taken before flush(), so "sending" covers the loop (sleeps included)
# and "flushing" covers only the flush() call.
t1 = time.time()

producer.flush()

t2 = time.time()
print(f'sending took {(t1 - t0):.2f} seconds')
print(f'flushing took {(t2 - t1):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
sending took 0.53 seconds
flushing took 0.00 seconds


In [8]:
# Download the gzipped green_tripdata_2019-10 CSV from the DataTalksClub release mirror.
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

--2024-03-20 23:53:40--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240321%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240321T045343Z&X-Amz-Expires=300&X-Amz-Signature=bbb6d7245c1bd505561a9078cfb8dd8c10840a035cf1444d9f9cbb2f46593471&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-20 23:53:40--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e

In [28]:
# -f overwrites an already-extracted green_tripdata_2019-10.csv, so this cell
# can be re-run after a fresh download instead of refusing to replace the file.
!gunzip -f green_tripdata_2019-10.csv.gz

In [126]:
import pyspark
from pyspark.sql import SparkSession

# `spark` has to exist before the first read when the notebook is run top to
# bottom on a fresh kernel. The Kafka connector is requested here as well,
# because spark.jars.packages is a launch-time setting: it must be supplied
# when the session is first created, not on a later getOrCreate().
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

# Read the raw trip CSV using its header row for column names. No schema is
# supplied, so every column is loaded as a string.
df_green = spark.read \
    .option("header", "true") \
    .csv('green_tripdata_2019-10.csv')

In [127]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-10-01 00:26:02|  2019-10-01 00:39:58|                 N|         1|         112|         196|              1|         5.88|         18|  0.5|    0.

In [128]:
# Subset of trip columns that the rest of the notebook selects and works with.
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

In [129]:
df_green = df_green.select(*columns)

In [130]:
df_green.schema

StructType([StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('passenger_count', StringType(), True), StructField('trip_distance', StringType(), True), StructField('tip_amount', StringType(), True)])

In [131]:
# Keep the first 1001 lines of the CSV (header + 1000 data rows) as a small sample file.
!head -n 1001 green_tripdata_2019-10.csv > head.csv

In [132]:
import pandas as pd

# Load the small sample with pandas; comma separator and first-row header are
# read_csv's defaults, so they are not spelled out.
df_pandas = pd.read_csv('head.csv')
df_pandas.shape

(1000, 20)

In [133]:
df_pandas = df_pandas[columns]
df_pandas.shape

(1000, 7)

In [134]:
df_pandas.dtypes

lpep_pickup_datetime      object
lpep_dropoff_datetime     object
PULocationID               int64
DOLocationID               int64
passenger_count            int64
trip_distance            float64
tip_amount               float64
dtype: object

In [135]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('tip_amount', DoubleType(), True)])

In [136]:
from pyspark.sql import types

In [137]:
# Target Spark types for the selected trip columns; every field is nullable.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('lpep_pickup_datetime', types.TimestampType()),
        ('lpep_dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('passenger_count', types.IntegerType()),
        ('trip_distance', types.DoubleType()),
        ('tip_amount', types.DoubleType()),
    )
])

In [138]:
# Load the full trip CSV again (header row as column names, no schema given).
df_green = (
    spark.read
    .option("header", "true")
    .csv('green_tripdata_2019-10.csv')
)

In [139]:
df_green = df_green.select(*columns)

In [140]:
from pyspark.sql.functions import col

# Cast each column of df_green to the data type declared for it in `schema`.
df_green = df_green.select(
    *(col(name).cast(schema[name].dataType) for name in df_green.columns)
)

In [141]:
from pyspark.sql.functions import current_timestamp
# Add a `timestamp` column populated by Spark's current_timestamp().
df_green = df_green.withColumn("timestamp", current_timestamp())

In [142]:
df_green.schema

StructType([StructField('lpep_pickup_datetime', TimestampType(), True), StructField('lpep_dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('passenger_count', IntegerType(), True), StructField('trip_distance', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('timestamp', TimestampType(), False)])

In [143]:
# Spread the data over 6 partitions, persist it as parquet, and continue from
# the parquet copy.
df_green = df_green.repartition(6)
# mode("overwrite") lets the notebook be re-run from the top; the default save
# mode raises an error once the output directory already exists.
# NOTE(review): the path says 2020/10 while the source file is
# green_tripdata_2019-10 — confirm which year is intended before renaming.
df_green.write.mode("overwrite").parquet('green/2020/10/')
df_green = spark.read.parquet('green/2020/10/')
df_green.printSchema()



root
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



                                                                                

In [144]:
df_green.show()

+--------------------+---------------------+------------+------------+---------------+-------------+----------+--------------------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|           timestamp|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+--------------------+
| 2019-10-04 03:41:28|  2019-10-04 03:56:09|         244|         241|              1|         3.86|      3.16|2024-03-21 02:33:...|
| 2019-10-01 11:23:28|  2019-10-01 11:43:08|         119|         183|              1|         7.06|       0.0|2024-03-21 02:33:...|
| 2019-10-02 16:23:39|  2019-10-02 16:31:39|          95|          95|              2|         0.86|      1.66|2024-03-21 02:33:...|
| 2019-10-03 08:23:27|  2019-10-03 08:32:28|          74|          75|              1|         0.69|      1.56|2024-03-21 02:33:...|
| 2019-10-05 15:20:43|  2019-10-05 15:48:52|          74|         114

In [145]:
# `F` is needed by this cell; importing it here keeps a top-to-bottom run on a
# fresh kernel from failing with NameError.
from pyspark.sql import functions as F

# Cast the three timestamp columns to strings so the rows carry plain strings
# in those fields when they are later turned into JSON messages.
df_green = df_green.withColumn('timestamp', F.col('timestamp').cast('string'))
df_green = df_green.withColumn('lpep_pickup_datetime', F.col('lpep_pickup_datetime').cast('string'))
df_green = df_green.withColumn('lpep_dropoff_datetime', F.col('lpep_dropoff_datetime').cast('string'))

In [146]:
df_green.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [147]:
pandas_df_green = df_green.toPandas()



In [148]:
# Print only the first trip, as a {column name: value} dict, then stop.
for row in pandas_df_green.itertuples(index=False):
    row_dict = dict(zip(row._fields, row))
    print(row_dict)
    break

{'lpep_pickup_datetime': '2019-10-04 03:41:28', 'lpep_dropoff_datetime': '2019-10-04 03:56:09', 'PULocationID': 244, 'DOLocationID': 241, 'passenger_count': 1.0, 'trip_distance': 3.86, 'tip_amount': 3.16, 'timestamp': '2024-03-21 02:33:10.384989'}


In [169]:
t0 = time.time()

topic_name = 'green-trips'

# Publish every trip as one message whose value is a {column name: value} dict.
for row in pandas_df_green.itertuples(index=False):
    row_dict = dict(zip(row._fields, row))
    producer.send(topic_name, value=row_dict)

# flush() is called before the end time is taken, so it is part of the
# reported duration.
producer.flush()

t1 = time.time()
print(f'sending took {(t1 - t0):.2f} seconds')

sending took 33.91 seconds


In [170]:
import pyspark
from pyspark.sql import SparkSession

# Kafka source package for Structured Streaming, versioned to match the
# installed PySpark.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# NOTE(review): getOrCreate() hands back a session that is already running if
# there is one, and earlier cells already use `spark` — confirm the Kafka
# package is actually loaded in that session.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [171]:
# Streaming read of the green-trips topic from the local broker, starting at
# the earliest available offsets.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

In [172]:
def peek(mini_batch, batch_id):
    """Print the first row of a micro-batch, or nothing if the batch is empty.

    Has the (batch, batch_id) signature expected by ``foreachBatch``;
    ``batch_id`` is accepted but not used.
    """
    rows = mini_batch.take(1)
    if not rows:
        return
    print(rows[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/21 02:47:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/mk/7b5_0wpd11dcy9mtjysp1trw0000gn/T/temporary-1c7595e9-9bfa-457c-8a71-b6b183aba60b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/21 02:47:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/21 02:47:56 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-04 03:41:28", "lpep_dropoff_datetime": "2019-10-04 03:56:09", "PULocationID": 244, "DOLocationID": 241, "passenger_count": 1.0, "trip_distance": 3.86, "tip_amount": 3.16, "timestamp": "2024-03-21 02:33:10.384989"}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 21, 2, 46, 49, 709000), timestampType=0)


In [173]:
query.stop()

In [174]:
from pyspark.sql import types

# Schema of the JSON payload carried in each Kafka message. The datetime
# fields and `timestamp` are declared as strings and passenger_count as a
# double, matching what the producer sent (e.g. "passenger_count": 1.0).
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
    types.StructField("timestamp", types.StringType()),
])

In [175]:
from pyspark.sql import functions as F

# Decode the Kafka `value` bytes as a string, parse it as JSON using `schema`,
# and flatten the parsed struct into top-level columns.
green_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

In [176]:
query = green_stream.writeStream.foreachBatch(peek).start()

24/03/21 02:48:04 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/mk/7b5_0wpd11dcy9mtjysp1trw0000gn/T/temporary-7f0f004a-dcde-4047-9cdd-9883650d577c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/21 02:48:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/21 02:48:04 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(lpep_pickup_datetime='2019-10-04 03:41:28', lpep_dropoff_datetime='2019-10-04 03:56:09', PULocationID=244, DOLocationID=241, passenger_count=1.0, trip_distance=3.86, tip_amount=3.16, timestamp='2024-03-21 02:33:10.384989')


In [177]:
query.stop()

In [158]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2024-03-21 02:38:28--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.114.64, 52.216.215.40, 52.217.195.80, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.114.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2024-03-21 02:38:28 (88.4 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [159]:
!head taxi+_zone_lookup.csv

"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"


In [160]:
# Static zone lookup table; read with a header row and no schema.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

In [161]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [199]:
# Attach zone details to each streamed trip by drop-off location.
# DOLocationID is an integer column while LocationID is a string column, so
# the equality relies on Spark's implicit type coercion.
df_result = green_stream.join(df_zones, green_stream.DOLocationID == df_zones.LocationID)

In [200]:
df_result.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [201]:
# Count trips per 5-minute window of `timestamp`, per drop-off location and zone.
grouped_df = (
    df_result
    .select('DOLocationID', 'Zone', 'timestamp')
    .groupBy(F.window(F.col("timestamp"), "5 minutes"), 'DOLocationID', 'Zone')
    .count()
)

In [202]:
# Order the grouped counts from highest to lowest.
popular_destinations = grouped_df.orderBy(col("count").desc())

In [None]:
# Stream the sorted counts to the console in "complete" output mode, with
# column truncation turned off.
query = (
    popular_destinations
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Keeps this cell running until the streaming query terminates.
query.awaitTermination()

24/03/21 03:07:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/mk/7b5_0wpd11dcy9mtjysp1trw0000gn/T/temporary-195da432-5b33-4ed7-915c-9fdf603820aa. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/21 03:07:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/21 03:07:12 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+------------------------+-----+
|window                                    |DOLocationID|Zone                    |count|
+------------------------------------------+------------+------------------------+-----+
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|74          |East Harlem North       |17741|
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|42          |Central Harlem North    |15942|
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|41          |Central Harlem          |14061|
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|75          |East Harlem South       |12840|
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|129         |Jackson Heights         |11930|
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|7           |Astoria                 |11533|
|{2024-03-21 02:30:00, 2024-03-21 02:35:00}|166         |Morningside Heights     |10845|
|{2024-03-21 