In [1]:
!pip install kafka-python



In [2]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

In [3]:
t0, tmp = time.time(), time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    
    t_m = time.time()
    print(f"Sent: {message}, took {(t_m - tmp):.2f}")
    tmp = t_m
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}, took 0.00
Sent: {'number': 1}, took 0.06
Sent: {'number': 2}, took 0.06
Sent: {'number': 3}, took 0.06
Sent: {'number': 4}, took 0.06
Sent: {'number': 5}, took 0.05
Sent: {'number': 6}, took 0.06
Sent: {'number': 7}, took 0.05
Sent: {'number': 8}, took 0.06
Sent: {'number': 9}, took 0.06
took 0.55 seconds


In [4]:
import pandas as pd

topic_name = 'green-trips'
t0 = time.time()

df_green = pd.read_csv("green_tripdata_2019-10.csv", dtype={'store_and_fwd_flag': str})

for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)
    
producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')


took 80.99 seconds


In [5]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

:: loading settings :: url = jar:file:/Users/ishu/anaconda3/envs/zoomcamp/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/ishu/.ivy2/cache
The jars for the packages stored in: /Users/ishu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8f96aa12-7879-4de6-a68a-524dfaddaad0;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.5 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.5 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 456ms :: artifacts dl 24ms
	:: m

In [6]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [7]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

25/03/09 12:45:06 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/yp/8hj8s1x90jxff61lt4rls01r0000gn/T/temporary-5602f771-d27b-4723-9809-e6d2b57d13c4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/09 12:45:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [8]:
query.stop()

In [9]:
# Q6
from pyspark.sql import types
from pyspark.sql import functions as F

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())


green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [10]:
green_stream

DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]

In [13]:
df = green_stream.withColumn('timestamp', F.current_timestamp())
df = df.withWatermark("timestamp", "10 minutes")
df = df.groupBy(
    F.window(timeColumn=F.col('timestamp'), windowDuration='5 minutes'),
    df.DOLocationID
).count()
df = df.orderBy(F.col("count").desc())

query = df \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()


25/03/09 12:49:16 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/yp/8hj8s1x90jxff61lt4rls01r0000gn/T/temporary-b26b1dbe-b6f7-4302-83be-537a4d80a491. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/09 12:49:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/09 12:49:16 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|74          |70238|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|42          |62791|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|41          |55740|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|75          |50661|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|129         |47281|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|7           |45706|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|166         |43124|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|236         |31426|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|223         |29989|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|238         |29052|
|{2025-03-09 12:45:00, 2025-03-09 12:50:00}|82          |28733|
|{2025-

25/03/09 14:33:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1273934 ms exceeds timeout 120000 ms
25/03/09 14:33:39 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/09 14:33:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

Py4JError: An error occurred while calling o107.awaitTermination