## Question 1: Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command `rpk help` _inside the container_. The name of the container is `redpanda-1`.

Find out what you need to execute based on the `help` output.

What's the version, based on the output of the command you executed? (copy the entire version)

```shell
docker compose exec redpanda-1 rpk help
```

```
rpk is the Redpanda CLI & toolbox

Usage:
  rpk [command]

Available Commands:
  acl         Manage ACLs and SASL users
  cloud       Interact with Redpanda cloud
  cluster     Interact with a Redpanda cluster
  container   Manage a local container cluster
  debug       Debug the local Redpanda process
  generate    Generate a configuration template for related services
  group       Describe, list, and delete consumer groups and manage their offsets
  help        Help about any command
  iotune      Measure filesystem performance and create IO configuration file
  plugin      List, download, update, and remove rpk plugins
  redpanda    Interact with a local Redpanda process
  topic       Create, delete, produce to and consume from Redpanda topics
  version     Check the current version
  wasm        Deploy and remove inline WASM engine scripts

Flags:
  -h, --help      Help for rpk
  -v, --verbose   Enable verbose logging (default: false)

Use "rpk [command] --help" for more informati
```

```shell
docker compose exec redpanda-1 rpk version
```

```
v22.3.5 (rev 28b2443)
```


## Question 2. Creating a topic

Before we can send data to the Redpanda server, we need to create a topic. We also do this with the `rpk` command we used previously to find out the Redpanda version.

Read the output of `help` and, based on it, create a topic named `test-topic`.

What's the output of the command for creating a topic?

```shell
docker compose exec redpanda-1 rpk topic create test-topic
```

```
TOPIC       STATUS
test-topic  OK
```


## Question 3. Connecting to the Kafka server

We need to make sure we can connect to the server, so that later we can send data to its topics.

First, let's install the Kafka client library (whether you use a separate virtual environment for it is up to you):

```shell
pip install kafka-python
```

You can start a Jupyter notebook in your solution folder or create a script.

Let's try to connect to our server:

```python
import json
import time

from kafka import KafkaProducer


def json_serializer(data):
    # Serialize each message value as UTF-8 encoded JSON
    return json.dumps(data).encode('utf-8')


server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

# True if the producer is connected to a bootstrap server
producer.bootstrap_connected()
```

```
True
```

Provided that you can connect to the server, what's the output of the last command?

Answer: **True**
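The `value_serializer` runs on every value passed to `producer.send()`, turning the dict into UTF-8 JSON bytes. A quick round trip shows what actually goes over the wire. `json_deserializer` is a hypothetical helper (not part of the original code) illustrating the inverse step a consumer would apply:

```python
import json


def json_serializer(data):
    # Same serializer as above: dict -> JSON text -> UTF-8 bytes
    return json.dumps(data).encode('utf-8')


def json_deserializer(raw):
    # Hypothetical inverse for the consumer side: UTF-8 bytes -> dict
    return json.loads(raw.decode('utf-8'))


payload = json_serializer({'number': 0})
print(payload)                     # b'{"number": 0}'
print(json_deserializer(payload))  # {'number': 0}
```

These bytes are what later shows up in the `value` column when Spark reads the topic.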

## Question 4. Sending data to the stream

Now we're ready to send some test data:

```python
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

t2 = time.time()
print(f'sending messages took {(t2 - t0):.2f} seconds')

producer.flush()

t1 = time.time()
# Note: this is the total elapsed time since t0 (sending + flushing)
print(f'final step took {(t1 - t0):.2f} seconds')
```

```
Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
sending messages took 0.51 seconds
final step took 0.51 seconds
```


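How much time did it take? Where did it spend most of the time?

- Sending the messages: 0.51 seconds (`t2 - t0`)
- Sending plus flushing: 0.51 seconds (`t1 - t0`), so `flush()` itself took roughly 0.00 seconds

Almost all of the time went into the sending loop, which is dominated by the 10 × 0.05 s = 0.5 s of `time.sleep` (don't remove `time.sleep` when answering this question).

Answer: **Sending the messages**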
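To see the send/flush split directly, it helps to time each phase on its own with `time.perf_counter()`. The sketch below is self-contained: `FakeProducer` is a hypothetical in-memory stand-in for `KafkaProducer` (it only buffers messages in a list), so it runs without a broker. With the real producer you would pass `producer` to `time_send_and_flush` instead.

```python
import time


class FakeProducer:
    """Hypothetical in-memory stand-in for kafka.KafkaProducer (no broker needed)."""

    def __init__(self):
        self.buffer = []
        self.delivered = []

    def send(self, topic, value):
        # Like the real producer, send() only enqueues the record
        self.buffer.append((topic, value))

    def flush(self):
        # "Deliver" everything buffered so far
        self.delivered.extend(self.buffer)
        self.buffer.clear()


def time_send_and_flush(producer, topic, n=10, delay=0.05):
    """Return (send_seconds, flush_seconds) for n messages with a sleep between sends."""
    start = time.perf_counter()
    for i in range(n):
        producer.send(topic, value={'number': i})
        time.sleep(delay)
    sent = time.perf_counter()
    producer.flush()
    flushed = time.perf_counter()
    return sent - start, flushed - sent


fake_producer = FakeProducer()
send_s, flush_s = time_send_and_flush(fake_producer, 'test-topic')
print(f'sending: {send_s:.2f} s, flushing: {flush_s:.2f} s')
```

Measuring `flushed - sent` separately avoids reading the cumulative `t1 - t0` as a flush time.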

## Reading data with `rpk`

You can see the messages that you send to the topic with `rpk`:

```shell
rpk topic consume test-topic
```

Run the command above (inside the container, e.g. via `docker compose exec redpanda-1`) and send the messages one more time to see them.

## Sending the taxi data

Now let's send our actual data:

- Read the green csv.gz file
- We will only need these columns:
    - `lpep_pickup_datetime`
    - `lpep_dropoff_datetime`
    - `PULocationID`
    - `DOLocationID`
    - `passenger_count`
    - `trip_distance`
    - `tip_amount`

Iterate over the records in the dataframe.

```python
import pandas as pd

# Download and read the October 2019 green taxi trips (gzip-compressed CSV)
df_green = pd.read_csv(
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz",
    compression="gzip",
    low_memory=False,
)
```

```python
df_green.head(5)
```

| | VendorID | lpep_pickup_datetime | lpep_dropoff_datetime | store_and_fwd_flag | RatecodeID | PULocationID | DOLocationID | passenger_count | trip_distance | fare_amount | extra | mta_tax | tip_amount | tolls_amount | ehail_fee | improvement_surcharge | total_amount | payment_type | trip_type | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2.0 | 2019-10-01 00:26:02 | 2019-10-01 00:39:58 | N | 1.0 | 112 | 196 | 1.0 | 5.88 | 18.0 | 0.5 | 0.5 | 0.0 | 0.0 |  | 0.3 | 19.3 | 2.0 | 1.0 | 0.0 |
| 1 | 1.0 | 2019-10-01 00:18:11 | 2019-10-01 00:22:38 | N | 1.0 | 43 | 263 | 1.0 | 0.8 | 5.0 | 3.25 | 0.5 | 0.0 | 0.0 |  | 0.3 | 9.05 | 2.0 | 1.0 | 0.0 |
| 2 | 1.0 | 2019-10-01 00:09:31 | 2019-10-01 00:24:47 | N | 1.0 | 255 | 228 | 2.0 | 7.5 | 21.5 | 0.5 | 0.5 | 0.0 | 0.0 |  | 0.3 | 22.8 | 2.0 | 1.0 | 0.0 |
| 3 | 1.0 | 2019-10-01 00:37:40 | 2019-10-01 00:41:49 | N | 1.0 | 181 | 181 | 1.0 | 0.9 | 5.5 | 0.5 | 0.5 | 0.0 | 0.0 |  | 0.3 | 6.8 | 2.0 | 1.0 | 0.0 |
| 4 | 2.0 | 2019-10-01 00:08:13 | 2019-10-01 00:17:56 | N | 1.0 | 97 | 188 | 1.0 | 2.52 | 10.0 | 0.5 | 0.5 | 2.26 | 0.0 |  | 0.3 | 13.56 | 1.0 | 1.0 | 0.0 |

```python
# Keep only the columns we need
cols = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

df_selection = df_green[cols]
```

```python
# Test the iteration: convert the first row (a namedtuple) into a dict
for row in df_selection.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break
```

```
{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'tip_amount': 0.0}
```
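The dict comprehension over `row._fields` works, but since `itertuples()` yields namedtuples here, `row._asdict()` builds the same dict, and `DataFrame.to_dict(orient='records')` converts the whole frame in one call. A small sketch on a hand-made two-row frame (`df_small` is hypothetical; its values are copied from the first rows shown above):

```python
import pandas as pd

# Hypothetical tiny stand-in for df_selection, using values from the rows above
df_small = pd.DataFrame({
    'lpep_pickup_datetime': ['2019-10-01 00:26:02', '2019-10-01 00:18:11'],
    'PULocationID': [112, 43],
    'DOLocationID': [196, 263],
    'trip_distance': [5.88, 0.8],
})

# Option 1: namedtuple rows -> dict via _asdict()
via_asdict = [row._asdict() for row in df_small.itertuples(index=False)]

# Option 2: the whole frame -> list of dicts in one call
via_records = df_small.to_dict(orient='records')

print(via_asdict[0])
print(via_records[0])
```

Either list can be fed to `producer.send()` one dict at a time.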


## Question 5: Sending the Trip Data

- Create a topic `green-trips` and send the data there
- How much time in seconds did it take? (You can round it to a whole number)
- Make sure you don't include sleeps in your code

```shell
docker compose exec redpanda-1 rpk topic create green-trips
```

```
TOPIC        STATUS
green-trips  OK
```


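```python
# Note: this step takes a while (about a minute here)
topic_name = 'green-trips'
t0 = time.time()

for row in df_selection.itertuples(index=False):
    # Convert the namedtuple row into a dict so the JSON serializer can handle it
    row_dict = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=row_dict)

t1 = time.time()
print(f'Time to send data {(t1 - t0):.2f} seconds')

# send() is asynchronous, so records may still be buffered here.
# flush() blocks until they are delivered; it runs after t1,
# so the time printed above covers enqueueing only.
producer.flush()
```

```
Time to send data 67.47 seconds
```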


## Creating the PySpark consumer

Now let's read the data with PySpark.

Spark needs a library (jar) to be able to connect to Kafka, so we need to tell PySpark that it needs to use it:

```python
import pyspark
from pyspark.sql import SparkSession

# The Kafka connector version must match the installed Spark version;
# "_2.12" is the Scala build, assumed here to be Scala 2.12
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()
```

Now we can connect to the stream:

```python
# Subscribe to the green-trips topic and read it from the beginning
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()
```

To check that we can consume from the stream, let's look at the first record in it.

In Spark Structured Streaming, the stream is processed as a sequence of micro-batches, each of which is a small DataFrame.

So we can execute a function over each micro-batch with `foreachBatch`. Let's run `take(1)` there to see what we have in the stream:

```python
def peek(mini_batch, batch_id):
    # Print the first row of each micro-batch, if the batch is not empty
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()
```

```
Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 17, 12, 9, 43, 905000), timestampType=0)
BatchID - 0
[Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0), Row(lpep_pickup_datetime='2019-10-01 00:18:11', lpep_dropoff_datetime='2019-10-01 00:22:38', PULocationID=43, DOLocationID=263, passenger_count=1.0, trip_distance=0.8, tip_amount=0.0), Row(lpep_pickup_datetime='2019-10-01 00:09:31', lpep_dropoff_datetime='2019-10-01 00:24:47', PULocationID=255, DOLocationID=228, passenger_count=2.0, trip_distance=7.5, tip_amount=0.0), Row(lpep_pickup_datetime='2019-10-01 00:37:40', lpep_dropoff_datetim
```

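The first line of the output is the raw Kafka record: the JSON payload sits in `value` as bytes, next to Kafka metadata such as `topic`, `partition`, `offset` and `timestamp`.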

```python
query.stop()
```
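For intuition about what `foreachBatch` does, here is a toy, pure-Python model of its calling pattern: records arrive in chunks, and the callback receives each chunk together with an increasing batch id. `run_micro_batches` and `toy_peek` are hypothetical helpers; they mimic only the callback contract, not Spark's triggers, offset tracking, or fault tolerance.

```python
def run_micro_batches(records, batch_size, callback):
    """Hypothetical toy model: call callback(batch, batch_id) once per chunk of records."""
    batch_id = 0
    for start in range(0, len(records), batch_size):
        callback(records[start:start + batch_size], batch_id)
        batch_id += 1
    return batch_id  # number of batches processed


seen = []


def toy_peek(mini_batch, batch_id):
    # Same idea as peek() above: look at the first record of each batch
    if mini_batch:
        seen.append((batch_id, mini_batch[0]))


n_batches = run_micro_batches([{'number': i} for i in range(10)], batch_size=4, callback=toy_peek)
print(n_batches, seen)
# 3 [(0, {'number': 0}), (1, {'number': 4}), (2, {'number': 8})]
```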

## Question 6. Parsing the data

The data is JSON, but currently it's in binary format. We need to parse it and turn it into a streaming DataFrame with proper columns.

As with batch PySpark, we define the schema:

```python
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())
```

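```python
from pyspark.sql import functions as F

# Cast the binary value to a string, parse it with the schema,
# then flatten the resulting struct into top-level columns
green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")
```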
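Per record, the `cast('STRING')` + `from_json(..., schema)` + `select("data.*")` chain does roughly what the pure-Python sketch below does: decode the bytes, parse the JSON, and keep only the schema's fields, with missing fields becoming null (`None`). This is an approximation for intuition, not Spark's implementation; `SCHEMA` and `parse_value` are hypothetical names, and Spark's handling of malformed records and type mismatches is not modelled here.

```python
import json

# Hypothetical Python mirror of the StructType above: field name -> Python type
SCHEMA = {
    'lpep_pickup_datetime': str,
    'lpep_dropoff_datetime': str,
    'PULocationID': int,
    'DOLocationID': int,
    'passenger_count': float,
    'trip_distance': float,
    'tip_amount': float,
}


def parse_value(raw: bytes) -> dict:
    """Decode a Kafka value and project it onto SCHEMA (missing fields -> None)."""
    data = json.loads(raw.decode('utf-8'))  # ~ value.cast('STRING') + from_json
    return {
        name: (cast(data[name]) if data.get(name) is not None else None)
        for name, cast in SCHEMA.items()
    }


raw = (b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", '
       b'"lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, '
       b'"DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}')
row = parse_value(raw)
print(row['PULocationID'], row['trip_distance'])  # 112 5.88
```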

```python
def debug_and_process(batch_df, batch_id):
    # Show the parsed rows of each micro-batch
    print(f"Batch ID: {batch_id}")
    batch_df.show()
```

```python
query = green_stream \
    .writeStream \
    .foreachBatch(debug_and_process) \
    .start()

# Blocks the cell until the query terminates
query.awaitTermination()
```

Output before parsing (JSON strings in a single `value` column):

```shell
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value
                                                           |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}|
|{"lpep_pickup_datetime": "2019-10-01 00:18:11", "lpep_dropoff_datetime": "2019-10-01 00:22:38", "PULocationID": 43, "DOLocationID": 263, "passenger_count": 1.0, "trip_distance": 0.8, "tip_amount": 0.0}  |
|{"lpep_pickup_datetime": "2019-10-01 00:09:31", "lpep_dropoff_datetime": "2019-10-01 00:24:47", "PULocationID": 255, "DOLocationID": 228, "passenger_count": 2.0, "trip_distance": 7.5, "tip_amount": 0.0} |
|{"lpep_pickup_datetime": "2019-10-01 00:37:40", "lpep_dropoff_datetime": "2019-10-01 00:41:49", "PULocationID": 181, "DOLocationID": 181, "passenger_count": 1.0, "trip_distance": 0.9, "tip_amount": 0.0} |
|{"lpep_pickup_datetime": "2019-10-01 00:08:13", "lpep_dropoff_datetime": "2019-10-01 00:17:56", "PULocationID": 97, "DOLocationID": 188, "passenger_count": 1.0, "trip_distance": 2.52, "tip_amount": 2.26}|
|{"lpep_pickup_datetime": "2019-10-01 00:35:01", "lpep_dropoff_datetime": "2019-10-01 00:43:40", "PULocationID": 65, "DOLocationID": 49, "passenger_count": 1.0, "trip_distance": 1.47, "tip_amount": 1.86} |
|{"lpep_pickup_datetime": "2019-10-01 00:28:09", "lpep_dropoff_datetime": "2019-10-01 00:30:49", "PULocationID": 7, "DOLocationID": 179, "passenger_count": 1.0, "trip_distance": 0.6, "tip_amount": 1.0}   |
|{"lpep_pickup_datetime": "2019-10-01 00:28:26", "lpep_dropoff_datetime": "2019-10-01 00:32:01", "PULocationID": 41, "DOLocationID": 74, "passenger_count": 1.0, "trip_distance": 0.56, "tip_amount": 0.0}  |
|{"lpep_pickup_datetime": "2019-10-01 00:14:01", "lpep_dropoff_datetime": "2019-10-01 00:26:16", "PULocationID": 255, "DOLocationID": 49, "passenger_count": 1.0, "trip_distance": 2.42, "tip_amount": 0.0} |
|{"lpep_pickup_datetime": "2019-10-01 00:03:03", "lpep_dropoff_datetime": "2019-10-01 00:17:13", "PULocationID": 130, "DOLocationID": 131, "passenger_count": 1.0, "trip_distance": 3.4, "tip_amount": 2.85}|
|{"lpep_pickup_datetime": "2019-10-01 00:07:10", "lpep_dropoff_datetime": "2019-10-01 00:23:38", "PULocationID": 24, "DOLocationID": 74, "passenger_count": 3.0, "trip_distance": 3.18, "tip_amount": 0.0}  |
|{"lpep_pickup_datetime": "2019-10-01 00:25:48", "lpep_dropoff_datetime": "2019-10-01 00:49:52", "PULocationID": 255, "DOLocationID": 188, "passenger_count": 1.0, "trip_distance": 4.7, "tip_amount": 1.0} |
|{"lpep_pickup_datetime": "2019-10-01 00:03:12", "lpep_dropoff_datetime": "2019-10-01 00:14:43", "PULocationID": 129, "DOLocationID": 160, "passenger_count": 1.0, "trip_distance": 3.1, "tip_amount": 0.0} |
|{"lpep_pickup_datetime": "2019-10-01 00:44:56", "lpep_dropoff_datetime": "2019-10-01 00:51:06", "PULocationID": 18, "DOLocationID": 169, "passenger_count": 1.0, "trip_distance": 1.19, "tip_amount": 0.25}|
|{"lpep_pickup_datetime": "2019-10-01 00:55:14", "lpep_dropoff_datetime": "2019-10-01 01:00:49", "PULocationID": 223, "DOLocationID": 7, "passenger_count": 1.0, "trip_distance": 1.09, "tip_amount": 1.46} |
|{"lpep_pickup_datetime": "2019-10-01 00:06:06", "lpep_dropoff_datetime": "2019-10-01 00:11:05", "PULocationID": 75, "DOLocationID": 262, "passenger_count": 1.0, "trip_distance": 1.24, "tip_amount": 2.01}|
|{"lpep_pickup_datetime": "2019-10-01 00:00:19", "lpep_dropoff_datetime": "2019-10-01 00:14:32", "PULocationID": 97, "DOLocationID": 228, "passenger_count": 1.0, "trip_distance": 3.03, "tip_amount": 3.58}|
|{"lpep_pickup_datetime": "2019-10-01 00:09:31", "lpep_dropoff_datetime": "2019-10-01 00:20:41", "PULocationID": 41, "DOLocationID": 74, "passenger_count": 1.0, "trip_distance": 2.03, "tip_amount": 2.16} |
|{"lpep_pickup_datetime": "2019-10-01 00:30:36", "lpep_dropoff_datetime": "2019-10-01 00:34:30", "PULocationID": 41, "DOLocationID": 42, "passenger_count": 1.0, "trip_distance": 0.73, "tip_amount": 1.26} |
|{"lpep_pickup_datetime": "2019-10-01 00:58:32", "lpep_dropoff_datetime": "2019-10-01 01:05:08", "PULocationID": 41, "DOLocationID": 116, "passenger_count": 1.0, "trip_distance": 1.48, "tip_amount": 0.0} |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 20 rows
```

Output after parsing:

```shell
Batch ID: 0
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
| 2019-10-01 00:26:02|  2019-10-01 00:39:58|         112|         196|            1.0|         5.88|       0.0|
| 2019-10-01 00:18:11|  2019-10-01 00:22:38|          43|         263|            1.0|          0.8|       0.0|
| 2019-10-01 00:09:31|  2019-10-01 00:24:47|         255|         228|            2.0|          7.5|       0.0|
| 2019-10-01 00:37:40|  2019-10-01 00:41:49|         181|         181|            1.0|          0.9|       0.0|
| 2019-10-01 00:08:13|  2019-10-01 00:17:56|          97|         188|            1.0|         2.52|      2.26|
| 2019-10-01 00:35:01|  2019-10-01 00:43:40|          65|          49|            1.0|         1.47|      1.86|
| 2019-10-01 00:28:09|  2019-10-01 00:30:49|           7|         179|            1.0|          0.6|       1.0|
| 2019-10-01 00:28:26|  2019-10-01 00:32:01|          41|          74|            1.0|         0.56|       0.0|
| 2019-10-01 00:14:01|  2019-10-01 00:26:16|         255|          49|            1.0|         2.42|       0.0|
| 2019-10-01 00:03:03|  2019-10-01 00:17:13|         130|         131|            1.0|          3.4|      2.85|
| 2019-10-01 00:07:10|  2019-10-01 00:23:38|          24|          74|            3.0|         3.18|       0.0|
| 2019-10-01 00:25:48|  2019-10-01 00:49:52|         255|         188|            1.0|          4.7|       1.0|
| 2019-10-01 00:03:12|  2019-10-01 00:14:43|         129|         160|            1.0|          3.1|       0.0|
| 2019-10-01 00:44:56|  2019-10-01 00:51:06|          18|         169|            1.0|         1.19|      0.25|
| 2019-10-01 00:55:14|  2019-10-01 01:00:49|         223|           7|            1.0|         1.09|      1.46|
| 2019-10-01 00:06:06|  2019-10-01 00:11:05|          75|         262|            1.0|         1.24|      2.01|
| 2019-10-01 00:00:19|  2019-10-01 00:14:32|          97|         228|            1.0|         3.03|      3.58|
| 2019-10-01 00:09:31|  2019-10-01 00:20:41|          41|          74|            1.0|         2.03|      2.16|
| 2019-10-01 00:30:36|  2019-10-01 00:34:30|          41|          42|            1.0|         0.73|      1.26|
| 2019-10-01 00:58:32|  2019-10-01 01:05:08|          41|         116|            1.0|         1.48|       0.0|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
only showing top 20 rows
```

## Question 7: Most popular destination

Now let's finally do some streaming analytics. We will find the currently most popular destination based on our stream of data (which, ideally, we would have sent with delays, as we did in workshop 2).

This is how you can do it:

- Add a column "timestamp" using the `current_timestamp` function
- Group by:
    - 5 minutes window based on the timestamp column (`F.window(F.col("timestamp"), "5 minutes")`)
    - `"DOLocationID"`
- Order by count

```python
popular_destinations = green_stream.withColumn("timestamp", F.current_timestamp()) \
    .groupBy(F.window("timestamp", "5 minutes"), "DOLocationID") \
    .count() \
    .orderBy(F.col("count").desc())
```
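`F.window(..., "5 minutes")` assigns each row to a fixed, non-overlapping (tumbling) 5-minute bucket, and the aggregation counts rows per (window, `DOLocationID`). Below is a pure-Python sketch of the same idea, with hypothetical timestamps and IDs, assuming windows aligned to multiples of 5 minutes from the Unix epoch (Spark's default when no start offset is given):

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its epoch-aligned 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


# Hypothetical (processing timestamp, DOLocationID) pairs
events = [
    (datetime(2024, 3, 17, 13, 51, 10), 74),
    (datetime(2024, 3, 17, 13, 52, 0), 74),
    (datetime(2024, 3, 17, 13, 53, 30), 42),
    (datetime(2024, 3, 17, 13, 56, 0), 42),  # falls into the next window
]

# Equivalent of groupBy(window, "DOLocationID").count()
counts = Counter(
    ((window_start(ts), window_start(ts) + WINDOW), do_id) for ts, do_id in events
)

# Equivalent of .orderBy(F.col("count").desc())
for (window, do_id), count in counts.most_common():
    print(window[0].time(), window[1].time(), do_id, count)
```

Because the `timestamp` column holds the processing time from `current_timestamp()` and the data was sent without delays, the top 20 rows in the output further down all share a single window.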

You can print the output to the console using this code:

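```python
# "complete" mode re-emits the whole aggregated table on every trigger;
# sorting a streaming aggregation is only supported in this mode
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```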

Output (from running the same code as a standalone script, `popular_dest.py`):

```shell
python popular_dest.py
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|74          |17741|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|42          |15942|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|41          |14061|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|75          |12840|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|129         |11930|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|7           |11533|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|166         |10845|
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|236         |7913 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|223         |7542 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|238         |7318 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|82          |7292 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|181         |7282 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|95          |7244 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|244         |6733 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|61          |6606 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|116         |6339 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|138         |6144 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|97          |6050 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|49          |5221 |
|{2024-03-17 13:50:00, 2024-03-17 13:55:00}|151         |5153 |
+------------------------------------------+------------+-----+
only showing top 20 rows
```