In [23]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, window, col
from pyspark.sql.types import IntegerType


In [24]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

# Check if the producer is able to connect to the Kafka server
try:
    metadata = producer.metrics()
    print("Connected to Kafka server successfully")
except Exception as e:
    print("Failed to connect to Kafka server:", str(e))

Connected to Kafka server successfully


In [25]:
t0 = time.time()

topic_name = 'test-topic'

t1 = time.time()
for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)
print(f'took {(t1 - t0):.2f} seconds')

t1 = time.time()
producer.flush()
print(f'took {(t1 - t0):.2f} seconds')

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.00 seconds
took 0.51 seconds
took 0.51 seconds


In [26]:
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

In [27]:
df_green = pd.read_csv(
    "green_tripdata_2019-10.csv.gz",
    encoding='utf-8',
    compression="gzip",
    usecols=columns
)

In [28]:
df_green

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.00
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.80,0.00
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.50,0.00
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.90,0.00
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26
...,...,...,...,...,...,...,...
476381,2019-10-31 23:30:00,2019-11-01 00:00:00,65,102,,7.04,0.00
476382,2019-10-31 23:03:00,2019-10-31 23:24:00,129,136,,0.00,0.00
476383,2019-10-31 23:02:00,2019-10-31 23:23:00,61,222,,3.90,0.00
476384,2019-10-31 23:42:00,2019-10-31 23:56:00,76,39,,3.08,0.00


In [29]:
t0 = time.time()

topic_name = 'green-trips'

for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)
#     print(row_dict)
    # break

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-01 00:00:48', lpep_dropoff_datetime='2019-10-01 00:05:42', PULocationID=210, DOLocationID=108, passenger_count=2.0, trip_distance=1.03, tip_amount=2.19)
Row(lpep_pickup_datetime='2019-10-01 12:02:24', lpep_dropoff_datetime='2019-10-01 12:08:23', PULocationID=75, DOLocationID=74, passenger_count=1.0, trip_distance=1.56, tip_amount=2.19)
Row(lpep_pickup_datetime='2019-10-01 14:07:00', lpep_dropoff_datetime='2019-10-01 14:11:27', PULocationID=74, DOLocationID=74, passenger_count=1.0, trip_distance=0.58, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-01 17:57:46', lpep_dropoff_datetime='2019-10-01 18:07:38', PULocationID=65, DOLocationID=49, passenger_count=1.0, trip_distance=1.26, tip_amount=2.94)
Row(lpep_pickup_datetime='2019-10-01 20:04:27', lpep_dropoff_datetime='20

Row(lpep_pickup_datetime='2019-10-06 22:04:18', lpep_dropoff_datetime='2019-10-06 22:20:48', PULocationID=173, DOLocationID=83, passenger_count=2.0, trip_distance=2.67, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-07 07:21:21', lpep_dropoff_datetime='2019-10-07 07:27:39', PULocationID=7, DOLocationID=7, passenger_count=1.0, trip_distance=1.05, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-07 09:09:43', lpep_dropoff_datetime='2019-10-07 09:45:19', PULocationID=66, DOLocationID=132, passenger_count=1.0, trip_distance=17.58, tip_amount=7.24)
Row(lpep_pickup_datetime='2019-10-07 11:52:45', lpep_dropoff_datetime='2019-10-07 12:08:33', PULocationID=82, DOLocationID=129, passenger_count=1.0, trip_distance=1.9, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-07 06:19:44', lpep_dropoff_datetime='2019-10-07 06:29:33', PULocationID=97, DOLocationID=49, passenger_count=1.0, trip_distance=1.3, tip_amount=1.76)
Row(lpep_pickup_datetime='2019-10-07 16:00:30', lpep_dropoff_datetime='2019-10

Row(lpep_pickup_datetime='2019-10-12 15:00:21', lpep_dropoff_datetime='2019-10-12 15:31:16', PULocationID=145, DOLocationID=181, passenger_count=1.0, trip_distance=8.67, tip_amount=3.0)
Row(lpep_pickup_datetime='2019-10-12 16:54:47', lpep_dropoff_datetime='2019-10-12 17:09:07', PULocationID=116, DOLocationID=41, passenger_count=2.0, trip_distance=2.39, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-12 18:06:35', lpep_dropoff_datetime='2019-10-12 18:26:44', PULocationID=66, DOLocationID=231, passenger_count=1.0, trip_distance=2.46, tip_amount=3.61)
Row(lpep_pickup_datetime='2019-10-12 19:22:47', lpep_dropoff_datetime='2019-10-12 19:26:50', PULocationID=97, DOLocationID=97, passenger_count=1.0, trip_distance=0.49, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-12 21:39:00', lpep_dropoff_datetime='2019-10-12 21:51:12', PULocationID=255, DOLocationID=232, passenger_count=3.0, trip_distance=2.2, tip_amount=1.0)
Row(lpep_pickup_datetime='2019-10-12 23:30:05', lpep_dropoff_datetime='201

Row(lpep_pickup_datetime='2019-10-17 21:27:24', lpep_dropoff_datetime='2019-10-17 21:42:08', PULocationID=254, DOLocationID=200, passenger_count=1.0, trip_distance=4.9, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-17 23:01:10', lpep_dropoff_datetime='2019-10-17 23:09:31', PULocationID=95, DOLocationID=130, passenger_count=1.0, trip_distance=2.28, tip_amount=2.16)
Row(lpep_pickup_datetime='2019-10-18 03:01:11', lpep_dropoff_datetime='2019-10-18 03:05:30', PULocationID=7, DOLocationID=146, passenger_count=2.0, trip_distance=1.03, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-18 08:49:37', lpep_dropoff_datetime='2019-10-18 08:55:34', PULocationID=42, DOLocationID=74, passenger_count=1.0, trip_distance=0.76, tip_amount=1.0)
Row(lpep_pickup_datetime='2019-10-18 09:58:24', lpep_dropoff_datetime='2019-10-18 10:13:29', PULocationID=35, DOLocationID=177, passenger_count=1.0, trip_distance=0.9, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-18 12:54:43', lpep_dropoff_datetime='2019-1

Row(lpep_pickup_datetime='2019-10-23 07:32:46', lpep_dropoff_datetime='2019-10-23 07:44:25', PULocationID=74, DOLocationID=166, passenger_count=1.0, trip_distance=1.42, tip_amount=2.94)
Row(lpep_pickup_datetime='2019-10-23 10:58:35', lpep_dropoff_datetime='2019-10-23 11:05:23', PULocationID=260, DOLocationID=157, passenger_count=1.0, trip_distance=1.4, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-23 12:13:42', lpep_dropoff_datetime='2019-10-23 12:26:18', PULocationID=74, DOLocationID=152, passenger_count=1.0, trip_distance=1.44, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-23 14:17:46', lpep_dropoff_datetime='2019-10-23 14:39:58', PULocationID=116, DOLocationID=75, passenger_count=1.0, trip_distance=4.11, tip_amount=3.56)
Row(lpep_pickup_datetime='2019-10-23 16:57:03', lpep_dropoff_datetime='2019-10-23 17:04:28', PULocationID=159, DOLocationID=159, passenger_count=1.0, trip_distance=0.93, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-23 18:15:42', lpep_dropoff_datetime='2

Row(lpep_pickup_datetime='2019-10-28 12:56:25', lpep_dropoff_datetime='2019-10-28 13:44:04', PULocationID=244, DOLocationID=41, passenger_count=1.0, trip_distance=7.28, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-28 14:32:13', lpep_dropoff_datetime='2019-10-28 14:36:49', PULocationID=75, DOLocationID=151, passenger_count=1.0, trip_distance=0.99, tip_amount=1.26)
Row(lpep_pickup_datetime='2019-10-28 17:00:52', lpep_dropoff_datetime='2019-10-28 17:11:49', PULocationID=43, DOLocationID=74, passenger_count=1.0, trip_distance=1.64, tip_amount=1.0)
Row(lpep_pickup_datetime='2019-10-28 19:54:57', lpep_dropoff_datetime='2019-10-28 20:05:00', PULocationID=75, DOLocationID=262, passenger_count=1.0, trip_distance=1.27, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-28 21:28:09', lpep_dropoff_datetime='2019-10-28 21:32:33', PULocationID=244, DOLocationID=244, passenger_count=1.0, trip_distance=0.0, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-29 05:07:15', lpep_dropoff_datetime='2019

Row(lpep_pickup_datetime='2019-10-08 15:10:00', lpep_dropoff_datetime='2019-10-08 15:25:00', PULocationID=124, DOLocationID=218, passenger_count=nan, trip_distance=5.23, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-09 08:50:00', lpep_dropoff_datetime='2019-10-09 09:16:00', PULocationID=182, DOLocationID=237, passenger_count=nan, trip_distance=9.42, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-09 13:24:00', lpep_dropoff_datetime='2019-10-09 13:53:00', PULocationID=76, DOLocationID=72, passenger_count=nan, trip_distance=6.23, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-10 06:48:00', lpep_dropoff_datetime='2019-10-10 07:48:00', PULocationID=11, DOLocationID=225, passenger_count=nan, trip_distance=9.3, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-10 13:44:00', lpep_dropoff_datetime='2019-10-10 13:48:00', PULocationID=250, DOLocationID=182, passenger_count=nan, trip_distance=0.43, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-10 20:36:00', lpep_dropoff_datetime='201

Row(lpep_pickup_datetime='2019-10-29 07:32:00', lpep_dropoff_datetime='2019-10-29 08:20:00', PULocationID=210, DOLocationID=95, passenger_count=nan, trip_distance=15.35, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-29 12:02:00', lpep_dropoff_datetime='2019-10-29 12:23:00', PULocationID=48, DOLocationID=244, passenger_count=nan, trip_distance=7.63, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-29 16:02:00', lpep_dropoff_datetime='2019-10-29 16:38:00', PULocationID=218, DOLocationID=168, passenger_count=nan, trip_distance=15.23, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-30 07:50:00', lpep_dropoff_datetime='2019-10-30 08:08:00', PULocationID=198, DOLocationID=82, passenger_count=nan, trip_distance=3.14, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-30 13:36:00', lpep_dropoff_datetime='2019-10-30 14:13:00', PULocationID=225, DOLocationID=39, passenger_count=nan, trip_distance=6.72, tip_amount=0.0)
Row(lpep_pickup_datetime='2019-10-30 18:19:00', lpep_dropoff_datetime='2

In [30]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

Row(lpep_pickup_datetime='2019-10-31 13:05:00', lpep_dropoff_datetime='2019-10-31 13:29:00', PULocationID=159, DOLocationID=242, passenger_count=nan, trip_distance=4.11, tip_amount=0.0)


In [31]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [32]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/25 23:12:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b7928375-103c-4681-a311-65ba70afedf4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/25 23:12:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/25 23:12:08 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"number": 9}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 25, 19, 27, 13, 171000), timestampType=0)


In [33]:
query.stop()

In [34]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [35]:
schema

StructType([StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('tip_amount', DoubleType(), True)])

In [36]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [37]:
green_stream

DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]

In [38]:
green_stream.writeStream.foreachBatch(peek).start()

24/03/25 23:12:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8afdd339-7a0a-4489-8337-8ca5b95692f8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/25 23:12:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7fe904e7ef50>

24/03/25 23:12:32 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(lpep_pickup_datetime=None, lpep_dropoff_datetime=None, PULocationID=None, DOLocationID=None, passenger_count=None, trip_distance=None, tip_amount=None)


In [39]:
green_stream_with_timestamp = green_stream.withColumn("timestamp", current_timestamp())


In [40]:
popular_destinations = green_stream_with_timestamp \
    .groupBy(window(col("timestamp"), "5 minutes"), col("DOLocationID")) \
    .count() \
    .orderBy("count", ascending=False)


In [None]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

24/03/25 23:14:00 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-46f071e9-9073-424b-be79-c1a0584a4310. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/25 23:14:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/25 23:14:00 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+------+
|window                                    |DOLocationID|count |
+------------------------------------------+------------+------+
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|NULL        |952772|
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|74          |53223 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|42          |47826 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|41          |42183 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|75          |38520 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|129         |35790 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|7           |34599 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|166         |32535 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|236         |23739 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|223         |22626 |
|{2024-03-25 23:10:00, 2024-03-25 23:15:00}|238         |2