In [1]:
import json
import time 

In [2]:
from kafka import KafkaProducer

In [3]:
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

In [4]:
producer.bootstrap_connected()

True

In [None]:
#Now we're ready to send some test data:

t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

In [None]:
#Green taxi trip data - web to local

#!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz

#!gunzip -c green_tripdata_2019-10.csv.gz > green_tripdata_2019-10.csv

#!rm green_tripdata_2019-10.csv.gz

In [5]:
import findspark
findspark.init("/usr/local/sdkman/candidates/spark/current")

import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

24/04/08 20:04:22 WARN Utils: Your hostname, codespaces-68eae1 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/04/08 20:04:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/usr/local/sdkman/candidates/spark/3.3.2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/codespace/.ivy2/cache
The jars for the packages stored in: /home/codespace/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6fbd12ae-7105-4d8a-be37-8a4a42b5959b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:

24/04/08 20:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
df = spark.read \
    .option("header", "true") \
    .csv('green_tripdata_2019-10.csv')

In [None]:
df.show()

In [7]:
df_green = df.select('lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount')

In [None]:
df_green.show()

In [8]:
df_green_pd = df_green.toPandas()

for row in df_green_pd.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break

                                                                                

{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': '112', 'DOLocationID': '196', 'passenger_count': '1', 'trip_distance': '5.88', 'tip_amount': '0'}


In [None]:
# producer sends row by row the green taxi data to the green-trips topic

t0 = time.time()

topic_name = 'green-trips'

for row in df_green_pd.itertuples(index=False):
    message = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=message)
    #print(f"Sent: {message}")
    #time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

In [9]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [None]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:
query.stop()

In [None]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [None]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [None]:
#for the output result of this query see query cell above
query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:
query.stop()

In [29]:
#add timestamp column

#from pyspark.sql.functions import current_timestamp
from pyspark.sql import functions as F

df_green_with_ts = df_green.withColumn("timestamp", F.current_timestamp())
        
#popular_destinations = df_green_with_ts.groupby([F.window("timestamp", "5 minutes"), df_green_with_ts.DOLocationID])\
#    .count().sort(F.col("count").desc())
    
popular_destinations = df_green_with_ts.groupby([F.window("timestamp", "5 minutes"), df_green_with_ts.DOLocationID])\
    .agg(F.count("*").alias("count")).sort(F.col("count").desc())

 

In [24]:
popular_destinations_pd = popular_destinations.toPandas()

for row in popular_destinations_pd.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)

                                                                                

{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '74', 'count': 17741}
{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '42', 'count': 15942}
{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '41', 'count': 14061}
{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '75', 'count': 12840}
{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '129', 'count': 11930}
{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '7', 'count': 11533}
{'window': Row(start=datetime.datetime(2024, 4, 8, 21, 25), end=datetime.datetime(2024, 4, 8, 21, 30)), 'DOLocationID': '166', 'count': 10845}
{'win

In [13]:
#from pyspark.sql import types

#schema = types.StructType() \
#    .add("window", types.StringType()) \
#    .add("DOLocationID", types.IntegerType()) \
#    .add("count", types.IntegerType())

In [25]:
popular_destinations_pd.columns

Index(['window', 'DOLocationID', 'count'], dtype='object')

In [26]:
# producer sends row by row the df data to the pop-dest topic
popular_destinations_pd['window'] = popular_destinations_pd['window'].astype(str)


t0 = time.time()

topic_name = 'pop-dest'

for row in popular_destinations_pd.itertuples(index=False):
    message = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=message)
    #print(f"Sent: {message}")
    #time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

took 0.03 seconds


In [31]:
pop_dest_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "pop-dest") \
    .option("startingOffsets", "earliest") \
    .load()

#popular_destinations = popular_destinations \
#  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
#  .select("data.*")

In [32]:
query = pop_dest_stream \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

24/04/08 21:35:03 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b3d71725-ee16-44f1-bcd4-834e5ebb4773. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 21:35:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
kafka

In [None]:
query.stop()