## Module 6 Homework 

In this homework, we're going to extend Module 5 Homework and learn about streaming with PySpark.

Instead of Kafka, we will use Redpanda, which is a drop-in replacement for Kafka.

Make sure you have the following set up (you should already have it if you did the previous module and its homework):

- Docker (see [module 1](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/01-docker-terraform))
- PySpark (see [module 5](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/05-batch/setup))

For this homework, we will use the files from the Module 5 homework:

- Green 2019-10 data from [here](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz)



## Start Red Panda

Let's start Redpanda in a Docker container.

There's a `docker-compose.yml` file in the homework folder (taken from [here](https://github.com/redpanda-data-blog/2023-python-gsg/blob/main/docker-compose.yml)).

Copy this file to your homework directory and run:

```bash
docker-compose up
```

(Add `-d` if you want to run in detached mode)
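In detached mode, `docker-compose up -d` can return before the broker accepts connections. A minimal stdlib sketch for waiting until something listens on the broker port, assuming Redpanda is exposed on `localhost:9092` (the address the producer uses later); `wait_for_broker` is a hypothetical helper, and an open port alone does not prove the broker is fully ready:

```python
import socket
import time


def wait_for_broker(host: str = "localhost", port: int = 9092, timeout_s: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after timeout_s.

    Hypothetical helper; assumes the broker is published on localhost:9092.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # Raises OSError (e.g. ConnectionRefusedError) while nothing listens yet
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.2)  # back off briefly before retrying
    return False
```

For example, call `wait_for_broker()` before creating the producer and stop early if it returns `False`.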


## Question 1: Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command `rpk help` _inside the container_. The name of the container is `redpanda-1`.

Find out what you need to execute based on the `help` output.


> [!TIP]
> Set up an alias so you don't have to prefix every `rpk` command with `docker exec`:
>
> ```bash
> alias rpk="docker exec -ti redpanda-1 rpk"
> ```
>
> Command:
>
> ```bash
> rpk version
>
> v22.3.5 (rev 28b2443)
> ```


What's the version, based on the output of the command you executed? (copy the entire version)

### Answer 1: `v22.3.5 (rev 28b2443)`

## Question 2. Creating a topic

Before we can send data to the Redpanda server, we need to create a topic. We do this with the same `rpk` command we used to find out the Redpanda version.

Read the output of `help` and, based on it, create a topic named `test-topic`.

What's the output of the command for creating a topic?

### Answer 2: 
```bash
TOPIC       STATUS
test-topic  OK
```

> [!TIP]
> 
> Command: 
> 
> ```bash
> rpk topic create test-topic
>
> TOPIC       STATUS
> test-topic  OK
> ```

## Question 3. Connecting to the Kafka server



We need to make sure we can connect to the server, so that later we can send data to its topics.

First, let's install the Kafka client library (whether you use a separate virtual environment for it is up to you):

```bash
pip install kafka-python
```

You can start a Jupyter notebook in your solution folder or create a script.

Let's try to connect to our server:

```python
import json
import time

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = "localhost:9092"

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()
```

```
True
```


Provided that you can connect to the server, what's the output of the last command?

### Answer 3: `True`
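The `value_serializer` passed to `KafkaProducer` is what turns each Python dict into the bytes stored in the topic. A stdlib-only sketch of that round trip; `json_deserializer` is a hypothetical consumer-side counterpart (a function like it could be passed as `value_deserializer` to kafka-python's `KafkaConsumer`):

```python
import json


def json_serializer(data):
    # Same as the producer's value_serializer: dict -> JSON text -> UTF-8 bytes
    return json.dumps(data).encode('utf-8')


def json_deserializer(raw):
    # Hypothetical reverse step: UTF-8 bytes -> JSON text -> dict
    return json.loads(raw.decode('utf-8'))


payload = json_serializer({'number': 0})
print(payload)                     # b'{"number": 0}'
print(json_deserializer(payload))  # {'number': 0}
```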

## Question 4. Sending data to the stream

Now we're ready to send some test data:

(Don't remove `time.sleep` when answering this question)

```python
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent 'number': {i}")
    print(f'Sending {i+1} messages took {(time.time() - t0):.2f} seconds')
    time.sleep(0.05)

# Time the flush separately from the send loop
f0 = time.time()
producer.flush()
f1 = time.time()
print(f'Flushing took {(f1 - f0):.2f} seconds')

t1 = time.time()
print(f'this cell block took {(t1 - t0):.2f} seconds')
```

```
Sent 'number': 0
Sending 1 messages took 0.00 seconds
Sent 'number': 1
Sending 2 messages took 0.05 seconds
Sent 'number': 2
Sending 3 messages took 0.10 seconds
Sent 'number': 3
Sending 4 messages took 0.15 seconds
Sent 'number': 4
Sending 5 messages took 0.20 seconds
Sent 'number': 5
Sending 6 messages took 0.25 seconds
Sent 'number': 6
Sending 7 messages took 0.30 seconds
Sent 'number': 7
Sending 8 messages took 0.35 seconds
Sent 'number': 8
Sending 9 messages took 0.40 seconds
Sent 'number': 9
Sending 10 messages took 0.45 seconds
Flushing took 0.00 seconds
this cell block took 0.51 seconds
```


How much time did it take? Where did it spend most of the time?

* Sending the messages
* Flushing
* Both took approximately the same amount of time
  
### Answer 4: `0.51 seconds, Sending the messages`
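The loop above cannot finish in less than 10 × 0.05 s = 0.5 s, and kafka-python's `producer.send()` is asynchronous (it queues the record and returns a future), so nearly all of the measured time is the `time.sleep` calls; by the time `flush()` runs, little is left to deliver. A stdlib-only sketch that times the same loop shape, with a plain list standing in for the producer's buffer (an assumption for illustration; `time_send_loop` is hypothetical):

```python
import time


def time_send_loop(send, n_messages=10, sleep_s=0.05):
    """Time a loop shaped like the one above: one send() plus one sleep per message."""
    t0 = time.perf_counter()
    for i in range(n_messages):
        send({'number': i})  # with kafka-python this only enqueues the record
        time.sleep(sleep_s)
    return time.perf_counter() - t0


sent = []
elapsed = time_send_loop(sent.append)  # list.append stands in for producer.send
print(f'{len(sent)} messages, {elapsed:.2f} s (lower bound: {10 * 0.05:.2f} s)')
```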

## Reading data with `rpk`

You can see the messages that you send to the topic with `rpk`:

> [!TIP]
> 
> Command: 
> 
> ```bash
> rpk topic consume test-topic
> ```

Run the command above, then send the messages one more time to see them.


---

## Sending the taxi data (Question 5)

Now let's send our actual data:

* Read the green csv.gz file
* We will only need these columns:
  * `'lpep_pickup_datetime',`
  * `'lpep_dropoff_datetime',`
  * `'PULocationID',`
  * `'DOLocationID',`
  * `'passenger_count',`
  * `'trip_distance',`
  * `'tip_amount'`
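The notebook below loads the file with pandas. As an alternative, here is a stdlib-only sketch that streams rows straight from the gzipped CSV and keeps only the columns listed above; `iter_trip_records` is a hypothetical helper, and unlike pandas with a `dtype` mapping, every value stays a string:

```python
import csv
import gzip

COLUMNS = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]


def iter_trip_records(path, columns=COLUMNS):
    """Yield one dict per CSV row, keeping only `columns` (values stay strings).

    Files ending in .gz are opened with gzip; anything else with open().
    """
    opener = gzip.open if path.endswith('.gz') else open
    with opener(path, 'rt', newline='') as f:
        for row in csv.DictReader(f):
            yield {col: row[col] for col in columns}
```

For example, `next(iter_trip_records('green_tripdata_2019-10.csv.gz'))` would return the first trip as a dict.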



### Declare column names

```python
column_names = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount']
```

### Import packages

```python
import json
import time

import pandas as pd
from kafka import KafkaProducer

# Decompressed copy of green_tripdata_2019-10.csv.gz
# (pd.read_csv could also read the .gz file directly)
input_path = './resources/green_tripdata_2019-10.csv'
```

### Declare schema

The two datetime columns are declared as `object`, so pandas keeps them as plain strings. If they were parsed into datetimes instead (dtype `datetime64[ns]`), the producer's `json_serializer` would fail on them with an error like `TypeError: Object of type datetime is not JSON serializable`, because `json.dumps` cannot encode datetime values.
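A minimal stdlib reproduction of that error, plus one alternative that this notebook does not use: passing `default=str` to `json.dumps` so unsupported values are converted with `str()` instead of raising. (With pandas the values would be `Timestamp` objects, so the message would name that type instead.)

```python
import json
from datetime import datetime

pickup = datetime(2019, 10, 1, 0, 26, 2)

# 1. The json module's default encoder rejects datetime values
try:
    json.dumps({'lpep_pickup_datetime': pickup})
except TypeError as exc:
    print(exc)  # Object of type datetime is not JSON serializable

# 2. Keeping the column as a string (dtype=object) avoids the problem
print(json.dumps({'lpep_pickup_datetime': '2019-10-01 00:26:02'}))

# 3. Alternative: fall back to str() for types json cannot encode
print(json.dumps({'lpep_pickup_datetime': pickup}, default=str))
# {"lpep_pickup_datetime": "2019-10-01 00:26:02"}
```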


```python
csv_schema = {
    'lpep_pickup_datetime': object,
    'lpep_dropoff_datetime': object,
    'PULocationID': 'int',
    'DOLocationID': 'int',
    'passenger_count': 'float',
    'trip_distance': 'float',
    'tip_amount': 'float'
    }
```

### Declare options

```python
options = {"header": 0, "usecols": column_names, "dtype": csv_schema}
```

### Read the CSV into a pandas DataFrame
[docs](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html)

```python
df = pd.read_csv(input_path, **options)
```

```python
df.head(2)
```

| | lpep_pickup_datetime | lpep_dropoff_datetime | PULocationID | DOLocationID | passenger_count | trip_distance | tip_amount |
|---|---|---|---|---|---|---|---|
| 0 | 2019-10-01 00:26:02 | 2019-10-01 00:39:58 | 112 | 196 | 1.0 | 5.88 | 0.0 |
| 1 | 2019-10-01 00:18:11 | 2019-10-01 00:22:38 | 43 | 263 | 1.0 | 0.8 | 0.0 |


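### Confirming record count of CSV file and DataFrame

Count the lines in the file (prefix the command with `!` to run it from a notebook cell):

```bash
wc -l ./resources/green_tripdata_2019-10.csv
```

```
476387 ./resources/green_tripdata_2019-10.csv
```

```python
df.shape
```

```
(476386, 7)
```

The file has 476,387 lines: one header line plus 476,386 records, which matches the number of rows in the DataFrame.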

### Confirming schema and null counts

```python
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 476386 entries, 0 to 476385
Data columns (total 7 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   lpep_pickup_datetime   476386 non-null  object 
 1   lpep_dropoff_datetime  476386 non-null  object 
 2   PULocationID           476386 non-null  int64  
 3   DOLocationID           476386 non-null  int64  
 4   passenger_count        387007 non-null  float64
 5   trip_distance          476386 non-null  float64
 6   tip_amount             476386 non-null  float64
dtypes: float64(3), int64(2), object(2)
memory usage: 25.4+ MB
```
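`df.info()` shows that `passenger_count` is missing for 89,379 of the 476,386 rows. Python's `json.dumps` writes a missing float as the bare token `NaN`, which is not valid standard JSON and is rejected by some parsers (passing `allow_nan=False` makes `json.dumps` raise `ValueError` instead). A minimal sketch of replacing NaN with `None`, which serializes as `null`, before sending; `clean_record` is a hypothetical helper, not part of the original solution:

```python
import json
import math


def clean_record(record):
    """Hypothetical helper: replace float NaN values with None (JSON null)."""
    return {
        key: None if isinstance(value, float) and math.isnan(value) else value
        for key, value in record.items()
    }


row = {'PULocationID': 82, 'passenger_count': float('nan'), 'trip_distance': 0.72}

print(json.dumps(row))                # {"PULocationID": 82, "passenger_count": NaN, "trip_distance": 0.72}
print(json.dumps(clean_record(row)))  # {"PULocationID": 82, "passenger_count": null, "trip_distance": 0.72}
```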


Before running the next cell, create the `green-trips` topic:

> [!TIP]
> 
> Command: 
> 
> ```bash
> rpk topic create green-trips
>
> TOPIC        STATUS
> green-trips  OK
> ```

Iterate over the records in the DataFrame and send each one to the topic.

Note: iterating with `itertuples` is more efficient than iterating with `iterrows`.
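`itertuples(index=False)` yields namedtuples (pandas names the type `Pandas` by default), so the dict comprehension in the next cell builds the same mapping as the namedtuple's built-in `_asdict()`. A stdlib sketch with a hand-built namedtuple standing in for a DataFrame row:

```python
from collections import namedtuple

# Stand-in for one row yielded by df.itertuples(index=False)
Row = namedtuple('Pandas', ['PULocationID', 'DOLocationID', 'trip_distance'])
row = Row(112, 196, 5.88)

# The dict comprehension used in the next cell...
row_dict = {col: getattr(row, col) for col in row._fields}

# ...gives the same result as the built-in _asdict()
assert row_dict == row._asdict()
print(row._asdict())  # {'PULocationID': 112, 'DOLocationID': 196, 'trip_distance': 5.88}
```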

```python
t0 = time.time()

topic_name = 'green-trips'

for row in df.itertuples(index=False):
    # Each row is a namedtuple; turn it into a {column: value} dict
    row_dict = {col: getattr(row, col) for col in row._fields}

    # send() queues the record; the producer's value_serializer encodes it as JSON
    producer.send(topic=topic_name, value=row_dict)

# Block until all queued records have been sent
f0 = time.time()
producer.flush()
f1 = time.time()
print(f'flush took {(f1 - f0):.2f} seconds')

t1 = time.time()
print(f'producer took {(t1 - t0):.2f} seconds to populate "green-trips" topic')
```

                                                                                

The windowed counts below were not printed by the producer code. They appear to be console output from a streaming query (counts per `DOLocationID` in 5-minute windows) that was running in the same session while the producer was sending. The first batch was:

```
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|74          |48214|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|42          |43011|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|41          |38226|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|75          |34857|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|129         |32427|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|7           |31456|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|166         |29782|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|236         |21615|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|223         |20704|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|238         |19997|
|{2024-04-08 22:50:00, 2024-04-08 22:55:00}|82          |19684|
```

                                                                                


                                                                                

```
flush took 0.01 seconds
producer took 37.40 seconds to populate "green-trips" topic
```


## Question 5: Sending the Trip Data

* Create a topic `green-trips` and send the data there
* How much time in seconds did it take? (You can round it to a whole number)
* Make sure you don't include sleeps in your code

### Answer 5: `26 seconds`

The run recorded above took 37.40 seconds; the exact time varies between runs and machines.


## Questions 6-7: Creating the PySpark consumer

### Import packages

```python
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

print(pyspark.__version__)
```

```
3.5.1
```


Now let's read the data with PySpark. 

Spark needs a library (jar) to be able to connect to Kafka, so we need to tell PySpark that it needs to use it:

```python
scala_version = '2.12'
pyspark_version = pyspark.__version__

# The connector's Spark and Scala versions must match your Spark installation
kafka_jar_package = f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{pyspark_version}'

# Alternative: pass the package via an environment variable before the session is created
# (the value has to end with "pyspark-shell"):
# os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {kafka_jar_package} pyspark-shell'

spark = SparkSession.builder \
    .appName("GreenTripsConsumer") \
    .master("local[*]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()
```


```python
spark
```

Now we can connect to the stream:

```python
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

# Note: "checkpointLocation" is a sink option. Set it on writeStream
# (e.g. .option("checkpointLocation", "checkpoint")), not on readStream.
```

To test that we can consume from the stream, let's look at its first record.

In Spark Structured Streaming, the stream is processed as a sequence of micro-batches, and each micro-batch is available as a regular (small) DataFrame.

So we can execute a function on each micro-batch. Let's call `take(1)` on it to see what we have in the stream:

```python
# Print the first row of each micro-batch, if there is one
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])
```
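Spark calls the `foreachBatch` function with two arguments: the micro-batch as a DataFrame and an integer batch id. The toy pure-Python model below is not Spark's scheduler; it only slices a list into fixed-size batches and calls a handler with the same `(batch, batch_id)` shape, which can help when reasoning about what `peek` sees on each call. `run_micro_batches` and `toy_peek` are hypothetical names.

```python
from typing import Callable, List, Sequence


def run_micro_batches(records: Sequence, batch_size: int,
                      handler: Callable[[List, int], None]) -> int:
    """Call handler(batch, batch_id) on consecutive slices; return the number of batches."""
    batch_id = 0
    for start in range(0, len(records), batch_size):
        handler(list(records[start:start + batch_size]), batch_id)
        batch_id += 1
    return batch_id


def toy_peek(mini_batch: List, batch_id: int) -> None:
    # Same idea as peek, but on a plain list instead of a DataFrame
    if mini_batch:
        print(batch_id, mini_batch[0])


run_micro_batches(["trip-1", "trip-2", "trip-3"], 2, toy_peek)
# prints "0 trip-1" and then "1 trip-3"
```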


```python
query = green_stream.writeStream.foreachBatch(peek).start()
```

```
24/04/08 23:19:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1724b8c3-4fdf-4c03-8cc5-2c1f225b982a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 23:19:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 01:41:39", "lpep_dropoff_datetime": "2019-10-01 01:45:06", "PULocationID": 82, "DOLocationID": 173, "passenger_count": 2.0, "trip_distance": 0.72, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 4, 8, 21, 39, 22, 150000), timestampType=0)
```


You should see a record like this:

```
Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 12, 22, 42, 9, 411000), timestampType=0)
```


Now let's stop the query so it doesn't keep consuming messages from the stream:

```python
query.stop()
```
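If you lose track of a query handle (for example after re-running a cell that reassigns `query`), the old query keeps running in the background and keeps printing batches. A small helper can stop everything through `spark.streams.active`, which lists the session's running `StreamingQuery` objects. This is a sketch that assumes you pass the notebook's active `SparkSession`; `stop_all_streams` is a hypothetical name.

```python
def stop_all_streams(spark) -> int:
    """Stop every running streaming query of this SparkSession; return how many were stopped."""
    # Take a snapshot of the running queries before stopping them
    active = list(spark.streams.active)
    for q in active:
        q.stop()
    return len(active)


# Usage in the notebook:
# stop_all_streams(spark)
```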

## Question 6. Parsing the data

### Read csv into pyspark dataframe
[docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_csv.html)

The data is JSON, but the Kafka `value` column currently holds it as binary. We need to cast it to a string, parse it, and turn it into a streaming DataFrame with proper columns.
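For a single record, the cast-and-parse step amounts to decoding the UTF-8 bytes in `value` and reading the JSON object. The plain-Python sketch below does exactly that for the sample record shown above, only to illustrate what the resulting columns will contain; in the stream, Spark does this with `cast('STRING')` and `F.from_json`. `parse_value` is a hypothetical helper.

```python
import json


def parse_value(value: bytes) -> dict:
    """Decode a Kafka message value (UTF-8 encoded JSON) into a Python dict."""
    return json.loads(bytes(value).decode("utf-8"))


# Sample value copied from the expected record shown above
raw = bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", '
                b'"lpep_dropoff_datetime": "2019-10-01 00:39:58", '
                b'"PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, '
                b'"trip_distance": 5.88, "tip_amount": 0.0}')
record = parse_value(raw)
print(record["DOLocationID"], record["trip_distance"])  # 196 5.88
```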

### Declare the PySpark schema

As in the batch PySpark work from Module 5, we define the schema explicitly:

```python
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

ps_schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())
```

```python
scala_version = '2.12'
pyspark_version = pyspark.__version__

kafka_jar_package = f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{pyspark_version}'

spark = SparkSession.builder \
    .appName("GreenTripsConsumer") \
    .master("local[*]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()
```

```
24/04/08 23:19:56 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
```


And apply this schema:

```python
green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), ps_schema).alias("data")) \
  .select("data.*")
```

```python
# Print the parsed records to the console as they arrive
query = green_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# awaitTermination() without a timeout blocks the notebook until interrupted;
# wait a bounded time instead, then stop the query (the session is reused below)
query.awaitTermination(30)
query.stop()
```

```
24/04/08 23:20:05 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-55a6810a-f369-4839-8f93-93f067c3ce12. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 23:20:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/04/08 23:20:05 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
```

How does the record look after parsing? Copy the output.

### Answer 6: 

```
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|2019-10-01 01:41:39 |2019-10-01 01:45:06  |82          |173         |2.0            |0.72         |0.0       |
|2019-10-01 01:16:30 |2019-10-01 01:22:42  |82          |83          |1.0            |1.27         |1.56      |
|2019-10-01 01:03:24 |2019-10-01 01:19:34  |75          |244         |1.0            |4.05         |0.0       |
|2019-10-01 01:26:12 |2019-10-01 01:34:34  |260         |157         |2.0            |1.43         |0.0       |
|2019-10-01 01:25:27 |2019-10-01 01:41:16  |168         |250         |1.0            |6.16         |4.16      |
|2019-10-01 01:19:41 |2019-10-01 01:31:19  |7           |226         |5.0            |2.52         |0.0       |
|2019-10-01 01:00:56 |2019-10-01 01:19:12  |82          |179         |1.0            |4.94         |0.0       |
|2019-10-01 01:22:03 |2019-10-02 01:01:18  |188         |61          |5.0            |2.78         |0.0       |
|2019-10-01 06:58:06 |2019-10-01 07:18:48  |74          |74          |1.0            |0.48         |0.0       |
|2019-10-01 07:23:01 |2019-10-01 07:38:03  |74          |75          |1.0            |0.61         |0.0       |
|2019-10-01 01:02:21 |2019-10-01 01:09:46  |82          |129         |1.0            |1.01         |1.56      |
|2019-10-01 01:23:02 |2019-10-01 01:30:37  |129         |157         |1.0            |1.44         |0.0       |
|2019-10-01 01:22:48 |2019-10-01 01:28:32  |260         |226         |1.0            |0.88         |0.0       |
|2019-10-01 01:36:51 |2019-10-01 01:44:32  |260         |7           |1.0            |1.7          |2.64      |
|2019-10-01 01:11:00 |2019-10-01 02:14:45  |244         |187         |1.0            |28.21        |0.0       |
|2019-10-01 01:26:23 |2019-10-01 01:41:24  |80          |233         |1.0            |5.52         |0.0       |
|2019-10-01 01:03:32 |2019-10-01 01:05:56  |75          |74          |1.0            |1.04         |0.0       |
|2019-10-01 01:28:34 |2019-10-01 01:42:09  |75          |47          |1.0            |4.17         |0.0       |
|2019-10-01 01:38:26 |2019-10-01 01:47:35  |179         |223         |1.0            |1.07         |0.0       |
|2019-10-01 01:27:08 |2019-10-01 01:43:26  |256         |198         |1.0            |4.13         |0.0       |
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
only showing top 20 rows
```

## Question 7: Most popular destination

Now let's finally do some streaming analytics. We will find the most popular destination right now, based on our stream of data (which, ideally, we should have sent with delays, like we did in workshop 2).


This is how you can do it:

* Add a column "timestamp" using the `current_timestamp` function
* Group by:
  * a 5-minute window based on the timestamp column (`F.window(F.col("timestamp"), "5 minutes")`)
  * `"DOLocationID"`
* Order by count, descending
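To see what this grouping computes, here is a plain-Python model of a 5-minute tumbling-window count over (timestamp, `DOLocationID`) pairs. It assumes, as Spark does, that tumbling windows are aligned to the Unix epoch (1970-01-01 00:00 UTC) and that a window includes its start and excludes its end. `window_start` and `count_destinations` are hypothetical names; Spark itself maintains these counts incrementally across micro-batches. Because `current_timestamp` stamps records with processing time, data sent in one quick burst tends to land in a single window, which is why sending with delays gives a more interesting result.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=5)


def window_start(ts: datetime) -> datetime:
    """Start of the 5-minute tumbling window that contains ts (ts must be timezone-aware)."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def count_destinations(events):
    """events: iterable of (timestamp, DOLocationID).

    Returns [((window_start, window_end), DOLocationID, count), ...] sorted by count, descending.
    """
    counts = Counter((window_start(ts), loc) for ts, loc in events)
    return [((start, start + WINDOW), loc, n)
            for (start, loc), n in counts.most_common()]


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 4, 8, hour, minute, tzinfo=timezone.utc)


for row in count_destinations([(at(23, 11), 74), (at(23, 12), 74),
                               (at(23, 14), 42), (at(23, 16), 74)]):
    print(row)
```

The 23:16 trip falls into the next window (23:15 to 23:20), so location 74 appears twice in the result with separate counts.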

You can print the output to the console using this code:

```python
# green_stream was already parsed with ps_schema above, so aggregate it directly
popular_destinations = green_stream \
    .withColumn("timestamp", F.current_timestamp()) \
    .groupBy(F.window("timestamp", "5 minutes"), "DOLocationID") \
    .count() \
    .orderBy(F.col("count").desc())

query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Wait a bounded time instead of blocking the notebook forever, then clean up
query.awaitTermination(60)
query.stop()
spark.stop()
```

```
24/04/08 23:13:28 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9e31171e-24df-449d-a227-fc7350419599. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 23:13:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/04/08 23:13:28 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
```

### Answer 7: `DOLocationID=74`

```
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|74          |48214|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|42          |43011|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|41          |38226|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|75          |34857|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|129         |32427|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|7           |31456|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|166         |29782|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|236         |21615|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|223         |20704|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|238         |19997|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|82          |19684|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|181         |19394|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|95          |19241|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|244         |18130|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|116         |17139|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|138         |17009|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|61          |16565|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|97          |15894|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|151         |14041|
|{2024-04-08 23:10:00, 2024-04-08 23:15:00}|49          |13840|
+------------------------------------------+------------+-----+
only showing top 20 rows
```