### Question 1: Redpanda version

```bash
$ rpk version
v22.3.5 (rev 28b2443)
```

### Question 2. Creating a topic

```bash
$ rpk topic create test-topic
TOPIC       STATUS
test-topic  OK
```

### Question 3. Connecting to the Kafka server

In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

### Question 4. Sending data to the stream

In [3]:
import timeit

t0 = time.time()
measured_time_dict = {"send": [], "flush": []}

topic_name = 'test-topic'

for i in range(100):
    message = {'number': i}
    start = timeit.default_timer()
    producer.send(topic_name, value=message)
    measured_time_dict["send"].append(timeit.default_timer() - start)
    print(f"Sent: {message}")
    time.sleep(0.05)

start = timeit.default_timer()
producer.flush()
measured_time_dict["flush"].append(timeit.default_timer() - start)

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
Sent: {'number': 10}
Sent: {'number': 11}
Sent: {'number': 12}
Sent: {'number': 13}
Sent: {'number': 14}
Sent: {'number': 15}
Sent: {'number': 16}
Sent: {'number': 17}
Sent: {'number': 18}
Sent: {'number': 19}
Sent: {'number': 20}
Sent: {'number': 21}
Sent: {'number': 22}
Sent: {'number': 23}
Sent: {'number': 24}
Sent: {'number': 25}
Sent: {'number': 26}
Sent: {'number': 27}
Sent: {'number': 28}
Sent: {'number': 29}
Sent: {'number': 30}
Sent: {'number': 31}
Sent: {'number': 32}
Sent: {'number': 33}
Sent: {'number': 34}
Sent: {'number': 35}
Sent: {'number': 36}
Sent: {'number': 37}
Sent: {'number': 38}
Sent: {'number': 39}
Sent: {'number': 40}
Sent: {'number': 41}
Sent: {'number': 42}
Sent: {'number': 43}
Sent: {'number': 44}
Sent: {'number': 45}
Sent: {'number': 46}
Sent: {'number': 47}
Se

In [4]:
display(sum(measured_time_dict["send"]) / len(measured_time_dict["send"]))
display(sum(measured_time_dict["flush"]) / len(measured_time_dict["flush"]))

display("When tested over 100 runs, send and flush almost took save time. with ~.1ms difference")

0.0004262909997487441

0.0002246999938506633

'When tested over 100 runs, send and flush almost took save time. with ~.1ms difference'

### Question 5: Sending the Trip Data

In [2]:
import pandas as pd
import timeit

df_green = pd.read_csv("green_tripdata_2019-10.csv.gz", low_memory=False)

start = timeit.default_timer()
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    message = row_dict
    producer.send("green-trips", value=message)

print(timeit.default_timer() - start)

49.79525179998018


In [3]:
import logging

logging.basicConfig(level=logging.INFO)

In [4]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

In [5]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [5]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

In [None]:
query = green_stream.writeStream.foreachBatch(peek).start()

In [24]:
display(query.isActive)
query.stop()
display(query.isActive)

True

False

### Question 6. Parsing the data

In [6]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [7]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

# query = green_stream.writeStream.foreachBatch(peek).start()

In [8]:
green_stream = green_stream.withColumn("timestamp", F.current_timestamp())

popular_destinations = green_stream.groupBy(
    F.window(F.col("timestamp"), "5 minutes"),
    F.col("DOLocationID")
).count().orderBy(F.desc("count"))

query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("res_table") \
    .start()

query.awaitTermination(120)

False

In [9]:
spark.sql("SELECT * FROM res_table ORDER BY count DESC").show()
query.stop()

+--------------------+------------+-----+
|              window|DOLocationID|count|
+--------------------+------------+-----+
|{2024-04-09 18:30...|          74|35482|
|{2024-04-09 18:30...|          42|31884|
|{2024-04-09 18:30...|          41|28122|
|{2024-04-09 18:30...|          75|25680|
|{2024-04-09 18:30...|         129|23860|
|{2024-04-09 18:30...|           7|23066|
|{2024-04-09 18:30...|         166|21690|
|{2024-04-09 18:30...|         236|15826|
|{2024-04-09 18:30...|         223|15084|
|{2024-04-09 18:30...|         238|14636|
|{2024-04-09 18:30...|          82|14584|
|{2024-04-09 18:30...|         181|14564|
|{2024-04-09 18:30...|          95|14488|
|{2024-04-09 18:30...|         244|13466|
|{2024-04-09 18:30...|          61|13212|
|{2024-04-09 18:30...|         116|12678|
|{2024-04-09 18:30...|         138|12288|
|{2024-04-09 18:30...|          97|12100|
|{2024-04-09 18:30...|          49|10442|
|{2024-04-09 18:30...|         151|10306|
+--------------------+------------