In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

In [2]:
print('Sending messages')

t0 = time.time()

topic_name = 'test-topic'


for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)
    
t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

print('Flushing messages')

t0 = time.time()

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sending messages
Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.56 seconds
Flushing messages
took 0.00 seconds


In [5]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

--2024-03-19 22:38:50--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240319%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240319T223850Z&X-Amz-Expires=300&X-Amz-Signature=9efa78681198c7c01d01d6120e955221307b232bbe500d91278bfadffef23288&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-19 22:38:50--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea58

In [6]:
!ls -a

.   .ipynb_checkpoints	docker-compose.yml	       homework.md
..  06-streaming.ipynb	green_tripdata_2019-10.csv.gz


In [3]:
import pandas as pd

In [4]:
df_green_all = pd.read_csv('green_tripdata_2019-10.csv.gz')

  df_green_all = pd.read_csv('green_tripdata_2019-10.csv.gz')


In [5]:
df_green_all.count()

VendorID                 387007
lpep_pickup_datetime     476386
lpep_dropoff_datetime    476386
store_and_fwd_flag       387007
RatecodeID               387007
PULocationID             476386
DOLocationID             476386
passenger_count          387007
trip_distance            476386
fare_amount              476386
extra                    476386
mta_tax                  476386
tip_amount               476386
tolls_amount             476386
ehail_fee                     0
improvement_surcharge    476386
total_amount             476386
payment_type             387007
trip_type                387005
congestion_surcharge     387007
dtype: int64

In [10]:
df_green_all.columns

Index(['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
       'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID',
       'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',
       'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',
       'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge'],
      dtype='object')

In [6]:
taxi_columns = [
     'lpep_pickup_datetime', 'lpep_dropoff_datetime',
      'PULocationID', 'DOLocationID',
       'passenger_count', 'trip_distance',
       'tip_amount'
]

In [12]:
df_green_all[taxi_columns].head()

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26


In [9]:
df_green = df_green_all[taxi_columns]

In [10]:
print('Sending messages')

t0 = time.time()

topic_name = 'green-trips'


for row in df_green.itertuples(index=False):
    message = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=message)
#    print(f"Sent: {message}")
    
t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

print('Flushing messages')

t0 = time.time()

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sending messages
took 52.00 seconds
Flushing messages
took 0.00 seconds


In [11]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

:: loading settings :: url = jar:file:/home/cnaung/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/cnaung/.ivy2/cache
The jars for the packages stored in: /home/cnaung/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8413798a-7067-42a7-b153-0aff41391036;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: reso

24/03/20 21:56:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [12]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [13]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/20 21:56:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1ed52899-cff4-42f4-adfe-a8e220e48f71. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 21:56:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 20, 21, 54, 47, 864000), timestampType=0)


In [14]:
query.stop()

In [15]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [16]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [17]:
green_stream.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [18]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query 

In [19]:
write_query = sink_console(green_stream, output_mode='append')

24/03/20 21:57:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-cf69ebeb-f6f1-488a-9f2a-721057abba86. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 21:57:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|2019-10-01 00:26:02 |2019-10-01 00:39:58  |112         |196         |1.0            |5.88         |0.0       |
|2019-10-01 00:18:11 |2019-10-01 00:22:38  |43          |263         |1.0            |0.8          |0.0       |
|2019-10-01 00:09:31 |2019-10-01 00:24:47  |255         |228         |2.0            |7.5          |0.0       |
|2019-10-01 00:37:40 |2019-10-01 00:41:49  |181         |181         |1.0            |0.9          |0.0       |
|2019-10-01 00:08:13 |2019-10-01 00:17:56  |97          |188         |1.0            |2.52         |2.26      |
|2019-1

In [20]:
green_stream_ts = green_stream.withColumn("timestamp",F.current_timestamp())

In [21]:
green_stream_ts.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- timestamp: timestamp (nullable = false)



In [22]:
def op_windowed_groupby(df, window_duration):
    df_windowed_aggregation = df.groupBy(
        F.window(timeColumn=df.timestamp, windowDuration=window_duration),
        df.DOLocationID
    ).count().sort(F.desc("count"))
    return df_windowed_aggregation

In [23]:
popular_destinations = op_windowed_groupby(green_stream_ts,"5 minutes")

In [24]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination(90)

24/03/20 21:58:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-31c1b66c-bb47-4518-86be-c66dfcdb598e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/20 21:58:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|74          |17741|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|42          |15942|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|41          |14061|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|75          |12840|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|129         |11930|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|7           |11533|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|166         |10845|
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|236         |7913 |
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|223         |7542 |
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|238         |7318 |
|{2024-03-20 21:55:00, 2024-03-20 22:00:00}|82          |7292 |
|{2024-

False