## Module 6 Homework 

In this homework, we'll extend the Module 5 homework and learn about streaming with PySpark.

Instead of Kafka, we will use Redpanda, which is a drop-in
replacement for Kafka.

Make sure you have the following set up (you should already have them if you completed the previous modules and homework):

- Docker (see [module 1](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/01-docker-terraform))
- PySpark (see [module 5](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/05-batch/setup))

For this homework, we will use the data from the Module 5 homework:

- Green 2019-10 data from [here](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz)



## Start Red Panda

Let's start Redpanda in a Docker container.

There's a `docker-compose.yml` file in the homework folder (taken from [here](https://github.com/redpanda-data-blog/2023-python-gsg/blob/main/docker-compose.yml)).

Copy this file to your homework directory and run:

```bash
docker-compose up
```

(Add `-d` if you want to run in detached mode)


## Question 1: Redpanda version

Now let's find out the Redpanda version.

For that, check the output of the command `rpk help` _inside the container_. The name of the container is `redpanda-1`.

Find out what you need to execute based on the `help` output.

What's the version, based on the output of the command you executed? (copy the entire version)


### Answer 1:
```bash
docker exec -it redpanda-1 bash
```
```bash
rpk help
rpk version
```
> v22.3.5 (rev 28b2443)


## Question 2. Creating a topic

Before we can send data to the Redpanda server, we
need to create a topic. We also do this with the `rpk`
command we used previously to find out the Redpanda
version.

Read the output of `help` and, based on it, create a topic named `test-topic`.

What's the output of the command for creating a topic?


### Answer 2:
```bash
rpk topic create test-topic 
```
```
TOPIC       STATUS
test-topic  OK
```
Docker container log:
```
redpanda-1    | INFO  2024-03-18 08:09:16,568 [shard 0] cluster - topics_frontend.cc:94 - Create topics {{configuration: { topic: {ns: {kafka}, topic: {test-topic}}, partition_count: 1, replication_factor: 1, properties: {compression: {nullopt}, cleanup_policy_bitflags: {delete}, compaction_strategy: {nullopt}, retention_bytes: {}, retention_duration_ms: {}, segment_size: {nullopt}, timestamp_type: {nullopt}, recovery_enabled: {nullopt}, shadow_indexing: {disabled}, read_replica: {nullopt}, read_replica_bucket: {nullopt} remote_topic_properties: {nullopt}, batch_max_bytes: {nullopt}, retention_local_target_bytes: {}, retention_local_target_ms: {}, remote_delete: true}}, custom_assignments: {}}}
...
redpanda-1    | INFO  2024-03-18 08:09:16,658 [shard 0] raft - [group_id:3, {kafka/test-topic/0}] consensus.cc:1341 - started raft, log offsets: {start_offset:-9223372036854775808, committed_offset:-9223372036854775808, committed_offset_term:-9223372036854775808, dirty_offset:-9223372036854775808, dirty_offset_term:-9223372036854775808, last_term_start_offset:-9223372036854775808}, term: 0, configuration: {current: {voters: {{id: {1}, revision: {15}}}, learners: {}}, old:{nullopt}, revision: 15, update: {nullopt}, version: 4, brokers: {{id: 1, kafka_advertised_listeners: {{PLAINTEXT:{host: redpanda-1, port: 29092}}, {OUTSIDE:{host: localhost, port: 9092}}}, rpc_address: {host: redpanda-1, port: 33145}, rack: {nullopt}, properties: {cores 1, mem_available 15, disk_available 937}, membership_state: active}}}
redpanda-1    | INFO  2024-03-18 08:09:16,669 [shard 0] raft - [group_id:3, {kafka/test-topic/0}] vote_stm.cc:264 - becoming the leader term:1
redpanda-1    | INFO  2024-03-18 08:09:16,669 [shard 0] storage - segment.cc:655 - Creating new segment /var/lib/redpanda/data/kafka/test-topic/0_15/0-1-v1.log
redpanda-1    | INFO  2024-03-18 08:09:16,690 [shard 0] raft - [group_id:3, {kafka/test-topic/0}] vote_stm.cc:280 - became the leader term: 1
```

## Question 3. Connecting to the Kafka server

We need to make sure we can connect to the server, so that
later we can send data to its topics.

First, let's install the Kafka client library (it's up to you
whether to use a separate virtual environment for it):

```bash
pip install kafka-python
```

You can start a Jupyter notebook in your solution folder or
create a script.

Let's try to connect to our server:

```python
import json
import time

from kafka import KafkaProducer

# Serialize each message value (a dict) to UTF-8 encoded JSON bytes
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

# True if the producer managed to connect to the bootstrap server
producer.bootstrap_connected()
```

Output:

```
True
```
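To see what the `value_serializer` actually puts on the wire, you can call it directly, without a running broker. This sketch reuses the same `json_serializer` as above; the decoding step with `json.loads` only illustrates how a consumer would read the value back.

```python
import json


def json_serializer(data):
    # Same serializer as the producer: dict -> JSON text -> UTF-8 bytes
    return json.dumps(data).encode('utf-8')


message = {'number': 0}
payload = json_serializer(message)
print(payload)  # b'{"number": 0}'

# A consumer reverses the two steps: bytes -> str -> dict
decoded = json.loads(payload.decode('utf-8'))
print(decoded == message)  # True
```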

## Question 4. Sending data to the stream

Now we're ready to send some test data:

```python
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')
```

How much time did it take? Where did it spend most of the time?

* Sending the messages
* Flushing
* Both took approximately the same amount of time

(Don't remove `time.sleep` when answering this question)

### Answer 4:

Output of the code above:

```
Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.51 seconds
```

The sending loop alone sleeps 10 × 0.05 = 0.5 seconds, so almost all of the 0.51 seconds is spent sending the messages, and flushing takes almost no time.
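To answer "where did it spend most of the time?" directly, you can time the send loop and the flush separately, using the same `t0`/`t1`/`t2` idea as the Question 5 solution. The helper `time_send_and_flush` and the `StubProducer` class below are hypothetical; the stub only stands in for `KafkaProducer` so that the sketch runs without a broker.

```python
import time


def time_send_and_flush(producer, topic, messages, delay=0.0):
    """Hypothetical helper: returns (send_seconds, flush_seconds).

    `producer` is anything with kafka-python-style send(topic, value=...)
    and flush() methods; `delay` mimics the time.sleep in the loop above.
    """
    t0 = time.perf_counter()
    for message in messages:
        producer.send(topic, value=message)
        if delay:
            time.sleep(delay)
    t1 = time.perf_counter()
    producer.flush()
    t2 = time.perf_counter()
    return t1 - t0, t2 - t1


class StubProducer:
    # Stand-in for KafkaProducer so the sketch runs without a broker
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass


stub = StubProducer()
messages = [{'number': i} for i in range(10)]
send_s, flush_s = time_send_and_flush(stub, 'test-topic', messages, delay=0.05)
print(f'sending took {send_s:.2f} s, flushing took {flush_s:.2f} s')
```

With the real producer, you would call `time_send_and_flush(producer, 'test-topic', messages, delay=0.05)` instead of passing the stub.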


## Reading data with `rpk`

You can see the messages that you send to the topic
with `rpk`:

```bash
rpk topic consume test-topic
```

Output from inside the Docker container:

```
{
  "topic": "test-topic",
  "value": "{\"number\": 0}",
  "timestamp": 1710750426908,
  "partition": 0,
  "offset": 0
}
{
  "topic": "test-topic",
  "value": "{\"number\": 1}",
  "timestamp": 1710750426958,
  "partition": 0,
  "offset": 1
}
...
{
  "topic": "test-topic",
  "value": "{\"number\": 9}",
  "timestamp": 1710750427362,
  "partition": 0,
  "offset": 9
}
```

Run the command above and send the messages one more time to
see them appear.
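In the `rpk topic consume` output, `value` is itself a JSON-encoded string (note the escaped quotes), and `timestamp` is in milliseconds since the Unix epoch. A small sketch that decodes the first record shown above:

```python
import json
from datetime import datetime, timezone

# First record printed by `rpk topic consume test-topic` above
record_text = '''
{
  "topic": "test-topic",
  "value": "{\\"number\\": 0}",
  "timestamp": 1710750426908,
  "partition": 0,
  "offset": 0
}
'''

record = json.loads(record_text)
# The value field holds the producer's JSON payload as a string, so decode it again
message = json.loads(record["value"])
# rpk timestamps are milliseconds since the epoch
sent_at = datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc)

print(message)  # {'number': 0}
print(sent_at.strftime('%Y-%m-%d %H:%M:%S UTC'))  # 2024-03-18 08:27:06 UTC
```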


## Sending the taxi data

Now let's send our actual data:

* Read the green csv.gz file
* We will only need these columns:
  * `'lpep_pickup_datetime',`
  * `'lpep_dropoff_datetime',`
  * `'PULocationID',`
  * `'DOLocationID',`
  * `'passenger_count',`
  * `'trip_distance',`
  * `'tip_amount'`

Iterate over the records in the dataframe

```python
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break

    # TODO implement sending the data here
```

Note: this way of iterating over the records is more efficient than
`iterrows`.
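One way to fill in the TODO, sketched here as an option rather than the reference solution, is a small generator (the name `iter_messages` is hypothetical) that yields one dict per row. `row._asdict()` is the namedtuple method that gives the same dict as the comprehension above. The two-row DataFrame uses values from the first rows of the green data.

```python
import pandas as pd


def iter_messages(df, columns):
    # Yield one plain dict per row, restricted to the columns we want to send
    for row in df[columns].itertuples(index=False):
        # Same result as {col: getattr(row, col) for col in row._fields}
        yield row._asdict()


# Tiny stand-in for df_green
sample = pd.DataFrame({
    'PULocationID': [112, 43],
    'DOLocationID': [196, 263],
    'tip_amount': [0.0, 0.0],
    'fare_amount': [18.0, 5.0],
})

for message in iter_messages(sample, ['PULocationID', 'DOLocationID', 'tip_amount']):
    print(message)
    # With a connected producer: producer.send('green-trips', value=message)
```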

```python
import pandas as pd

df_green = pd.read_csv('green_tripdata_2019-10.csv.gz', compression='gzip', header=0, sep=',', quotechar='"')
```

```python
df_green.head().T
```

| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| VendorID | 2.0 | 1.0 | 1.0 | 1.0 | 2.0 |
| lpep_pickup_datetime | 2019-10-01 00:26:02 | 2019-10-01 00:18:11 | 2019-10-01 00:09:31 | 2019-10-01 00:37:40 | 2019-10-01 00:08:13 |
| lpep_dropoff_datetime | 2019-10-01 00:39:58 | 2019-10-01 00:22:38 | 2019-10-01 00:24:47 | 2019-10-01 00:41:49 | 2019-10-01 00:17:56 |
| store_and_fwd_flag | N | N | N | N | N |
| RatecodeID | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |
| PULocationID | 112 | 43 | 255 | 181 | 97 |
| DOLocationID | 196 | 263 | 228 | 181 | 188 |
| passenger_count | 1.0 | 1.0 | 2.0 | 1.0 | 1.0 |
| trip_distance | 5.88 | 0.8 | 7.5 | 0.9 | 2.52 |
| fare_amount | 18.0 | 5.0 | 21.5 | 5.5 | 10.0 |


```python
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

df_green = df_green[columns]
```

```python
df_green.head().T
```

| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| lpep_pickup_datetime | 2019-10-01 00:26:02 | 2019-10-01 00:18:11 | 2019-10-01 00:09:31 | 2019-10-01 00:37:40 | 2019-10-01 00:08:13 |
| lpep_dropoff_datetime | 2019-10-01 00:39:58 | 2019-10-01 00:22:38 | 2019-10-01 00:24:47 | 2019-10-01 00:41:49 | 2019-10-01 00:17:56 |
| PULocationID | 112 | 43 | 255 | 181 | 97 |
| DOLocationID | 196 | 263 | 228 | 181 | 188 |
| passenger_count | 1.0 | 1.0 | 2.0 | 1.0 | 1.0 |
| trip_distance | 5.88 | 0.8 | 7.5 | 0.9 | 2.52 |
| tip_amount | 0.0 | 0.0 | 0.0 | 0.0 | 2.26 |
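Instead of loading every column and selecting afterwards, `pd.read_csv` can keep only the needed columns while parsing through its `usecols` parameter, which saves memory on a file with roughly 476k rows. The sketch below runs on a trimmed-down, in-memory stand-in for the green CSV (fewer columns than the real file, values taken from the rows above).

```python
import io

import pandas as pd

columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]

# Two rows in the same style as the green taxi CSV (subset of its columns)
csv_text = """VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,tip_amount
2,2019-10-01 00:26:02,2019-10-01 00:39:58,N,112,196,1.0,5.88,18.0,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,N,43,263,1.0,0.8,5.0,0.0
"""

# usecols drops the other columns while parsing; [columns] fixes the column order
df = pd.read_csv(io.StringIO(csv_text), usecols=columns)[columns]
print(df)
```

For the real file, the equivalent call would be `pd.read_csv('green_tripdata_2019-10.csv.gz', compression='gzip', usecols=columns)`. The final `[columns]` matters because `usecols` keeps the file's column order, not the order of the list.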


```python
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    print(row_dict)
    break
```

Output:

```
{'lpep_pickup_datetime': '2019-10-01 00:26:02', 'lpep_dropoff_datetime': '2019-10-01 00:39:58', 'PULocationID': 112, 'DOLocationID': 196, 'passenger_count': 1.0, 'trip_distance': 5.88, 'tip_amount': 0.0}
```


## Question 5: Sending the Trip Data

* Create a topic `green-trips` and send the data there
* How much time in seconds did it take? (You can round it to a whole number)
* Make sure you don't include sleeps in your code


### Answer 5:

#### Create Topic
```bash
rpk topic create green-trips
```

```python
t0 = time.time()
topic_name = 'green-trips'

print(f'Sending {df_green.shape[0]} messages to topic {topic_name}')

for row in df_green.itertuples(index=False):
    message = {col: getattr(row, col) for col in row._fields}
    producer.send(topic_name, value=message)

t1 = time.time()
producer.flush()

t2 = time.time()
print(f'Sending data took {(t1 - t0):.2f} seconds')
print(f'Flushing took {(t2 - t1):.2f} seconds')
```

Output:

```
Sending 476386 messages to topic green-trips
Sending data took 38.20 seconds
Flushing took 0.00 seconds
```

In total, sending the trip data took about 38 seconds.


## Creating the PySpark consumer

Now let's read the data with PySpark. 

Spark needs a library (jar) to connect to Kafka, so we
need to tell PySpark to use it:

```python
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()
```
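The package string is a Maven coordinate of the form `group:artifact:version`. The `_2.12` suffix in the artifact name is the Scala version Spark was built with, and the version part must match the installed PySpark. The hypothetical helper below makes both explicit; its `2.12` default is an assumption that holds for the default Spark 3.3 downloads.

```python
def kafka_package(spark_version: str, scala_version: str = "2.12") -> str:
    # Maven coordinate: groupId:artifactId_<scala version>:<spark version>
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


print(kafka_package("3.3.2"))  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2
```

You would then pass it to the builder as `.config("spark.jars.packages", kafka_package(pyspark.__version__))`.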

Now we can connect to the stream:

```python
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()
```

To check that we can consume from the stream,
let's look at the first record in it.

In Spark Structured Streaming, the stream is processed as a sequence
of small micro-batches. With `foreachBatch`, each micro-batch is passed
to our function as a regular DataFrame, together with its batch ID.

So we can execute a function over each micro-batch.
Let's run `take(1)` on it to see what we have in the stream:

```python
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()
```

You should see a record like this:

```
Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 12, 22, 42, 9, 411000), timestampType=0)
```

Now let's stop the query so it doesn't keep consuming messages
from the stream:

```python
query.stop()
```

```python
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
pyspark_version
```

```
'3.3.2'
```

```python
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"
kafka_jar_package
```

```
'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'
```

```python
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()
```

```
24/03/18 17:06:50 WARN Utils: Your hostname, queen resolves to a loopback address: 127.0.1.1; using 192.168.1.247 instead (on interface wlo1)
24/03/18 17:06:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/pinku/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/pinku/.ivy2/cache
The jars for the packages stored in: /home/pinku/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5be72181-c1a1-4f35-b5fb-12689c03c886;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolu
24/03/18 17:06:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
```


```python
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()
```

```python
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()
```

```
24/03/18 16:52:54 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ea811786-b64c-4d4d-911d-2aa558535e00. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 16:52:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)
```


```python
query.stop()
```

## Question 6. Parsing the data

The data is JSON, but currently it's in binary format. We need
to parse it and turn it into a streaming DataFrame with proper
columns.

As in batch PySpark, we define the schema:

```python
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())
```

And apply this schema:

```python
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")
```

How does the record look after parsing? Copy the output.

```python
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

query = green_stream.writeStream.foreachBatch(peek).start()
```

```
24/03/18 16:53:03 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-67158a2e-d614-4aba-b8d1-30382130c820. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 16:53:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
```

```python
query.stop()
```

```
24/03/18 16:53:32 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-86d7d507-dfee-4714-a89b-c60a44ba775c-1625716302-driver-0-1, groupId=spark-kafka-source-86d7d507-dfee-4714-a89b-c60a44ba775c-1625716302-driver-0] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/03/18 16:53:32 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-86d7d507-dfee-4714-a89b-c60a44ba775c-1625716302-driver-0-1, groupId=spark-kafka-source-86d7d507-dfee-4714-a89b-c60a44ba775c-1625716302-driver-0] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/03/18 16:53:32 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-86d7d507-dfee-4714-a89b-c60a44ba775c-1625716302-driver-0-1, groupId=spark-kafka-source-86d7d507-dfee-4714-a89b-c60a44ba775c-1625716302-driver-0] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
```
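Outside Spark, the parsing step corresponds to decoding the raw `value` bytes to a string (the `cast('STRING')`) and then parsing the JSON into the fields of `schema` (the `from_json`). The sketch below is a plain-Python analogue for the record shown earlier, not Spark code; the `SCHEMA` dict and `parse_value` function are hypothetical, and turning missing fields into `None` only loosely mimics the nulls `from_json` produces.

```python
import json

# Field names and Python types mirroring the Spark schema defined above
SCHEMA = {
    'lpep_pickup_datetime': str,
    'lpep_dropoff_datetime': str,
    'PULocationID': int,
    'DOLocationID': int,
    'passenger_count': float,
    'trip_distance': float,
    'tip_amount': float,
}

# Raw Kafka value, as in the Row(key=None, value=bytearray(...)) example above
value = bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}')


def parse_value(raw, schema=SCHEMA):
    # Step 1: bytes -> string (like F.col("value").cast('STRING'))
    text = raw.decode('utf-8')
    # Step 2: string -> dict (like F.from_json(..., schema))
    data = json.loads(text)
    # Keep only the schema's fields, converting types; missing fields become None
    return {name: (typ(data[name]) if data.get(name) is not None else None)
            for name, typ in schema.items()}


row = parse_value(value)
print(row)
```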

## Question 7: Most popular destination

Now let's finally do some streaming analytics. We will
find the currently most popular destination based on our
stream of data (ideally, we should have sent the data with
delays, as we did in workshop 2).


This is how you can do it:

* Add a column "timestamp" using the `current_timestamp` function
* Group by:
  * 5 minutes window based on the timestamp column (`F.window(F.col("timestamp"), "5 minutes")`)
  * `"DOLocationID"`
* Order by count (descending)
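To see what this aggregation computes, it can be mimicked in plain Python: each event is assigned to a 5-minute tumbling window aligned to the Unix epoch (the default when `F.window` gets no start time), then counted per `(window, DOLocationID)` and sorted by count, descending. This is an illustrative sketch, not Spark code; the `(timestamp, DOLocationID)` input format and the function names are assumptions.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=5)


def window_of(ts):
    # Tumbling window [start, start + 5 min) aligned to the Unix epoch
    start = ts - (ts - EPOCH) % WINDOW
    return start, start + WINDOW


def popular_destinations(events):
    # events: iterable of (timestamp, DOLocationID) pairs
    counts = Counter((window_of(ts), do_id) for ts, do_id in events)
    # most_common() sorts by count, descending
    return counts.most_common()


t = datetime(2024, 3, 18, 17, 7, 24, tzinfo=timezone.utc)
events = [(t, 74), (t, 74), (t + timedelta(seconds=30), 74), (t, 42), (t, 42), (t, 41)]
for (window, do_id), count in popular_destinations(events):
    print(window[0].strftime('%H:%M'), window[1].strftime('%H:%M'), do_id, count)
```

All six events fall into the 17:05 to 17:10 window, so location 74 comes first with 3 trips, followed by 42 and 41.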

You can print the output to the console using this
code:

```python
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```

Write the most popular destination. (You will need to re-send the data for this to work.)


### Answer 7:

```python
from pyspark.sql.functions import current_timestamp

type(green_stream)
```

```
pyspark.sql.dataframe.DataFrame
```

```python
df_with_ts = green_stream.withColumn("timestamp", current_timestamp())

popular_destinations = df_with_ts \
    .groupBy(F.window('timestamp', '5 minutes'), 'DOLocationID') \
    .count() \
    .orderBy('count', ascending=False)

query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```

```
24/03/18 17:07:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c84a2c67-0364-42fa-a9f6-1474c9f974c7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/18 17:07:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|74          |17741|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|42          |15942|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|41          |14061|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|75          |12840|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|129         |11930|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|7           |11533|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|166         |10845|
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|236         |7913 |
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|223         |7542 |
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|238         |7318 |
|{2024-03-18 17:05:00, 2024-03-18 17:10:00}|82          |7292 |
|{2024-
```

The most popular destination in this window is `DOLocationID` 74, with 17741 trips.

## Submitting the solutions

* Form for submitting: https://courses.datatalks.club/de-zoomcamp-2024/homework/hw6

## Solution

We will publish the solution here after the deadline.