In [28]:
import json
import time
import pandas as pd
import datetime


from kafka import KafkaProducer

In [21]:
def json_serializer(data):
    """Serialize ``data`` to JSON and return it as UTF-8 encoded bytes.

    Used as the producer's ``value_serializer`` so message values can be
    given as plain Python objects.
    """
    payload = json.dumps(data)
    return payload.encode("utf-8")

In [22]:
# Address of the Kafka broker used for bootstrapping.
server = "localhost:9092"

# Every message value goes through json_serializer before it is sent.
producer = KafkaProducer(
    value_serializer=json_serializer,
    bootstrap_servers=[server],
)

In [23]:
# Sanity check: ask the producer whether it is connected to the bootstrap broker.
producer.bootstrap_connected()


True

# Question 2

In [24]:
# Publish ten small messages to the test topic and time the whole run.
topic_name = 'test-topic'
t0 = time.time()

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    # Short pause between consecutive sends.
    time.sleep(0.05)

# send() only queues the record; flush() waits for the queued records to be
# delivered, so the measured time includes the actual sending.
producer.flush()
t1 = time.time()

print(f'took {(t1 - t0):.2f} seconds')


Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.51 seconds


In [None]:
# Read the gzip-compressed trip file and keep only the columns that are
# published to Kafka further down.
df_green = pd.read_csv(
    'green_tripdata_2019-10.csv.gz',
    compression='gzip',
).loc[:, [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]]

# Publish every trip row to the 'green-trips' topic and time the run.
topic_name = 'green-trips'
t0 = time.time()
for row in df_green.itertuples(index=False):
    # itertuples yields named tuples; _asdict() maps column name -> value.
    row_dict = row._asdict()
    # Stamp the message with its send time (seconds since the epoch).
    # The key was previously misspelled as 'timetstamp'.
    row_dict['timestamp'] = datetime.datetime.now().timestamp()
    producer.send(topic_name, value=row_dict)

# send() is asynchronous and only queues the record. Flush before stopping
# the clock so the timing covers delivery and no message is left buffered.
producer.flush()
t1 = time.time()

print(f'It took {round(t1 - t0, 2)} seconds to send these messages.')


In [43]:
import pyspark
from pyspark.sql import SparkSession

# Pin the Kafka connector to the installed PySpark release so the two match.
# NOTE(review): the `_2.12` suffix assumes a Scala 2.12 Spark build — confirm.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local session using all available cores, with the Kafka connector package.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [44]:
# Streaming source: subscribe to the 'green-trips' topic on the local broker,
# starting from the earliest available offsets.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

In [45]:
def peek(mini_batch, batch_id):
    """Print the first row of ``mini_batch``; print nothing if it is empty.

    ``batch_id`` is accepted to match the ``foreachBatch`` callback signature
    and is not used.
    """
    # take(1) yields at most one row, so this loop body runs zero or one time.
    for row in mini_batch.take(1):
        print(row)

# Run peek on every micro-batch; keep the handle so the query can be stopped.
query = (
    green_stream.writeStream
    .foreachBatch(peek)
    .start()
)

24/03/14 18:38:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-40ded9c3-de37-4d52-9a21-2cfb4bf768a9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 18:38:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/14 18:38:34 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0, "timestamp": "2024-03-14 18:03:49"}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 14, 18, 3, 49, 388000), timestampType=0)


In [46]:
# Stop the streaming query currently bound to `query`.
query.stop()


In [47]:
from pyspark.sql import types

# Schema of the JSON payload carried in each Kafka message value.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [48]:
from pyspark.sql import functions as F

# Cast the Kafka `value` column to a string, parse it as JSON against
# `schema`, then flatten the parsed struct into top-level columns.
# NOTE: this rebinds `green_stream`; after one run the `value` column is gone,
# so re-running this cell requires re-running the cell that creates the stream.
green_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

In [49]:
# Print the schema of the parsed stream to check the column names and types.
green_stream.printSchema()


root
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)



In [51]:
# Tag each row with the processing time; this column drives the windowing.
green_stream = green_stream.withColumn("timestamp", F.current_timestamp())

# Count trips per drop-off location within 5-minute windows of that timestamp.
popular_destinations = (
    green_stream
    .groupBy(
        F.window(F.col("timestamp"), "5 minutes"),
        F.col("DOLocationID"),
    )
    .count()
)

In [None]:
# Write the full aggregation result to the console on every trigger,
# without truncating wide cells.
query = (
    popular_destinations.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)

# Blocks this cell until the query stops or fails.
query.awaitTermination()

24/03/14 18:43:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0e3777b1-004c-4bcd-a1b3-36f9ccd741df. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 18:43:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/14 18:43:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+------+
|window                                    |DOLocationID|count |
+------------------------------------------+------------+------+
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|30          |149   |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|168         |26848 |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|171         |6256  |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|112         |22467 |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|127         |12396 |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|160         |15123 |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|250         |6502  |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|231         |13981 |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|120         |1250  |
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|129         |107847|
|{2024-03-14 18:40:00, 2024-03-14 18:45:00}|47          |8