In [17]:
# !pyenv local 3.10.16  # PyFlink doesn't work with Python 3.12


## Question 1: Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command `rpk help` _inside the container_. The name of the container is `redpanda-1`.

Find out what you need to execute based on the `help` output.

What's the version, based on the output of the command you executed? (copy the entire version)

Answer:
```bash
docker exec -it redpanda-1 bash

redpanda@9e8c317dcbb1:/$ rpk --version

# rpk version v24.2.18 (rev f9a22d4430)
```

## Question 2: Creating a topic

Before we can send data to the Redpanda server, we
need to create a topic. We do this with the same `rpk`
command we used to find the Redpanda version.

Read the output of `help` and, based on it, create a topic named `green-trips`.

What's the output of the command for creating a topic? Include the entire output in your answer.

Answer:

```bash
redpanda@9e8c317dcbb1:/$ rpk help topic create

redpanda@9e8c317dcbb1:/$ rpk topic create green-trips
```

Output:

```text
TOPIC        STATUS
green-trips  OK
```

## Question 3: Connecting to the Kafka server

We need to make sure we can connect to the server, so
that later we can send data to its topics.

First, let's install the Kafka client library (whether you
use a separate virtual environment for it is up to you).


In [3]:
!pip install kafka-python
!pip freeze > requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


You can start a Jupyter notebook in your solution folder or
create a script.

Let's try to connect to our server:

In [4]:
import json

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

Provided that you can connect to the server, what's the output
of the last command?

```text
True
```
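If `bootstrap_connected()` returns `False`, a quick way to narrow things down is to check whether anything is listening on the broker port at all. The helper below is a minimal sketch using only the standard library; `port_is_open` is a hypothetical name. A successful TCP connection only shows that the port is open, not that the listener speaks the Kafka protocol or advertises an address the client can reach.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`.

    This does not perform a Kafka handshake the way KafkaProducer does;
    it only checks that something accepts connections on the port.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, or timeout (all OSError subclasses).
        return False
```

Usage against the address used in this notebook: `port_is_open("localhost", 9092)`.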

## Question 4: Sending the Trip Data

Now we need to send the data to the `green-trips` topic.

Read the data, and keep only these columns:

* `lpep_pickup_datetime`
* `lpep_dropoff_datetime`
* `PULocationID`
* `DOLocationID`
* `passenger_count`
* `trip_distance`
* `tip_amount`
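Since only these seven columns are needed, pandas can skip parsing the rest at read time with `usecols`. This is a minimal sketch, assuming the CSV has the same column names as the file read further down; `load_trips` and the in-memory sample row are illustrative only. `usecols` keeps the file's column order, so the result is re-indexed to match the list above. The datetime columns are left as strings, which matters later because `json.dumps` cannot serialize pandas `Timestamp` objects.

```python
import io

import pandas as pd

# Columns to keep, as listed in the question.
SELECTED_COLUMNS = [
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "tip_amount",
]


def load_trips(source) -> pd.DataFrame:
    """Read only SELECTED_COLUMNS from a CSV path or file-like object."""
    # usecols avoids parsing unneeded columns; datetimes stay as strings.
    df = pd.read_csv(source, usecols=SELECTED_COLUMNS, low_memory=False)
    # Reorder to the list above (usecols returns columns in file order).
    return df[SELECTED_COLUMNS]


# Illustrative in-memory CSV with two extra columns that should be dropped.
sample = io.StringIO(
    "VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,"
    "PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount\n"
    "2,2019-10-01 00:26:02,2019-10-01 00:39:58,N,112,196,1.0,5.88,0.0\n"
)
trips = load_trips(sample)
print(list(trips.columns))
```

For the real file, `load_trips('../../../week_5/codes/data/raw/green/2019/10/green_tripdata_2019-10.csv.gz')` should work as well, since `read_csv` infers gzip compression from the `.gz` extension.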

Now send all the data. The cells below load the file with pandas, keep the selected columns, and send each record:

In [5]:
!pip install pandas
!pip freeze > requirements.txt


Collecting pandas
  Downloading pandas-2.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.1/13.1 MB[0m [31m38.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting tzdata>=2022.7
  Downloading tzdata-2025.1-py2.py3-none-any.whl (346 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m346.8/346.8 kB[0m [31m57.3 MB/s[0m eta [36m0:00:00[0m
Collecting numpy>=1.22.4
  Downloading numpy-2.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.4/16.4 MB[0m [31m51.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting pytz>=2020.1
  Downloading pytz-2025.1-py2.py3-none-any.whl (507 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m507.9/507.9 kB[0m [31m34.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pytz, tzdata, numpy, pandas
Successfully installed 

In [6]:
import pandas as pd

In [7]:
topic_name = 'green-trips'

In [8]:
!ls -lh ../../../week_5/codes/data/raw/green/2019/10

total 7.9M
-rw-r--r-- 1 root root 7.9M Jul 14  2022 green_tripdata_2019-10.csv.gz


In [9]:
df = pd.read_csv('../../../week_5/codes/data/raw/green/2019/10/green_tripdata_2019-10.csv.gz', low_memory=False) 

In [10]:
selected_columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]
df = df[selected_columns]
df.head()

| | lpep_pickup_datetime | lpep_dropoff_datetime | PULocationID | DOLocationID | passenger_count | trip_distance | tip_amount |
|---|---|---|---|---|---|---|---|
| 0 | 2019-10-01 00:26:02 | 2019-10-01 00:39:58 | 112 | 196 | 1.0 | 5.88 | 0.0 |
| 1 | 2019-10-01 00:18:11 | 2019-10-01 00:22:38 | 43 | 263 | 1.0 | 0.8 | 0.0 |
| 2 | 2019-10-01 00:09:31 | 2019-10-01 00:24:47 | 255 | 228 | 2.0 | 7.5 | 0.0 |
| 3 | 2019-10-01 00:37:40 | 2019-10-01 00:41:49 | 181 | 181 | 1.0 | 0.9 | 0.0 |
| 4 | 2019-10-01 00:08:13 | 2019-10-01 00:17:56 | 97 | 188 | 1.0 | 2.52 | 2.26 |


In [11]:
messages = df.to_dict(orient='records')

# messages
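By default, `json.dumps` writes a float `NaN` as the bare token `NaN`, which is not valid JSON. If any of the selected columns have missing values in this file (pandas represents them as `NaN`, and they carry over into the records), a consumer with a strict JSON parser may reject those messages. Below is a minimal sketch of a stricter serializer, hypothetically named `json_serializer_strict`, that turns `NaN` into `null` and refuses other non-finite values. It could be passed as `value_serializer` in place of `json_serializer`.

```python
import json
import math


def _nan_to_none(value):
    """Replace a float NaN (pandas' missing-value marker) with None."""
    # numpy.float64 subclasses float, so this also covers numpy scalars.
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def json_serializer_strict(data: dict) -> bytes:
    """Serialize a flat record to UTF-8 JSON, writing missing values as null.

    allow_nan=False makes json.dumps raise ValueError for any remaining
    non-finite float (e.g. infinity) instead of emitting invalid JSON.
    """
    cleaned = {key: _nan_to_none(value) for key, value in data.items()}
    return json.dumps(cleaned, allow_nan=False).encode("utf-8")


print(json_serializer_strict({"passenger_count": float("nan"), "trip_distance": 5.88}))
```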

In [12]:
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)
producer

<kafka.producer.kafka.KafkaProducer at 0x7fca723620b0>

Send each row (`message`) of the dataset to the topic. In this case, `message`
is a dictionary.

After sending all the messages, flush the producer so that any buffered records are delivered.

Use `from time import time` to measure the total time.

How much time did it take to send the entire dataset and flush? 
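The measurement can also be wrapped in a small helper. This is a sketch, and `send_all` is a hypothetical name. It uses `time.perf_counter`, a monotonic clock meant for measuring intervals, instead of `time.time`, which follows the wall clock and can jump if the system time is adjusted. The stand-in producer exists only so the helper can be exercised without a broker.

```python
from time import perf_counter


def send_all(producer, topic: str, messages) -> float:
    """Send every message to `topic`, flush, and return elapsed seconds.

    `producer` is anything with send(topic, value=...) and flush(),
    such as a kafka-python KafkaProducer.
    """
    t0 = perf_counter()
    for message in messages:
        producer.send(topic, value=message)
    # flush() blocks until buffered records have been sent, so the
    # measured time covers delivery, not just enqueueing.
    producer.flush()
    return perf_counter() - t0


class _FakeProducer:
    """Stand-in used only to exercise send_all without a broker."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def send(self, topic, value=None):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed = True


fake = _FakeProducer()
elapsed = send_all(fake, "green-trips", [{"a": 1}, {"a": 2}])
print(len(fake.sent), fake.flushed)
```

With the real producer from this notebook, the call would be `took = send_all(producer, topic_name, messages)`.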

In [13]:
from time import time

In [14]:
t0 = time()

for message in messages:
    producer.send(topic_name, value=message)

producer.flush()

t1 = time()
took = t1 - t0
print(f"Time taken to send and flush: {took:.2f} seconds")

Time taken to send and flush: 45.66 seconds


## Question 5: Build a Sessionization Window (2 points)
Now we have the data in the Kafka stream. It's time to process it.

* Copy `aggregation_job.py` and rename it to `session_job.py`
* Have it read from `green-trips`, fixing the schema to match the fields sent in Question 4
* Use a [session window](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/) with a gap of 5 minutes
* Use `lpep_dropoff_datetime` as the event-time column for your watermark, with a 5-second tolerance
* Which pickup and drop-off locations have the longest unbroken streak of taxi trips?
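Before writing the Flink job, it can help to pin down what a session window with a 5-minute gap computes. The sketch below does the same grouping in plain Python over a finite list of trips: per `(PULocationID, DOLocationID)` pair, consecutive drop-offs at most 5 minutes apart belong to one session. It is a batch illustration only (no watermarks, no late data), `longest_session` and the sample trips are hypothetical, and while Flink reports a session's end as the last event time plus the gap, this returns the last drop-off time itself.

```python
from collections import defaultdict
from datetime import datetime, timedelta

GAP = timedelta(minutes=5)  # session gap from the question
FMT = "%Y-%m-%d %H:%M:%S"   # format of lpep_dropoff_datetime in the records


def longest_session(trips, gap=GAP):
    """Return (key, trip_count, first_dropoff, last_dropoff) of the longest session.

    Trips are grouped by (PULocationID, DOLocationID); within a group, a new
    session starts when the next drop-off is more than `gap` after the
    previous one. Returns None for an empty input.
    """
    dropoffs = defaultdict(list)
    for trip in trips:
        key = (trip["PULocationID"], trip["DOLocationID"])
        dropoffs[key].append(datetime.strptime(trip["lpep_dropoff_datetime"], FMT))

    sessions = []
    for key, stamps in dropoffs.items():
        stamps.sort()
        start = prev = stamps[0]
        count = 1
        for ts in stamps[1:]:
            if ts - prev > gap:
                # Gap exceeded: close the current session and open a new one.
                sessions.append((key, count, start, prev))
                start, count = ts, 1
            else:
                count += 1
            prev = ts
        sessions.append((key, count, start, prev))

    return max(sessions, key=lambda s: s[1], default=None)


# Illustrative records shaped like the Kafka messages from Question 4.
sample_trips = [
    {"PULocationID": 1, "DOLocationID": 2, "lpep_dropoff_datetime": "2019-10-01 00:00:00"},
    {"PULocationID": 1, "DOLocationID": 2, "lpep_dropoff_datetime": "2019-10-01 00:04:00"},
    {"PULocationID": 1, "DOLocationID": 2, "lpep_dropoff_datetime": "2019-10-01 00:08:00"},
    {"PULocationID": 1, "DOLocationID": 2, "lpep_dropoff_datetime": "2019-10-01 00:20:00"},
    {"PULocationID": 3, "DOLocationID": 4, "lpep_dropoff_datetime": "2019-10-01 00:01:00"},
]
print(longest_session(sample_trips))
```

Because the records in `messages` have the same keys, `longest_session(messages)` gives an offline cross-check for the streaming result.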

In [15]:
!pip install apache-flink
!pip freeze > requirements.txt

Collecting apache-flink
  Downloading apache-flink-1.20.1.tar.gz (1.5 MB)
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting ruamel.yaml>=0.18.4
  Downloading ruamel.yaml-0.18.10-py3-none-any.whl (117 kB)
Collecting protobuf>=3.19.0
  Downloading protobuf-6.30.0-cp39-abi3-manylinux2014_x86_64.whl (316 kB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Collecting apache-beam

In [22]:
!python session_job.py

Traceback (most recent call last):
  File "/root/app/practice/Zoomcamp-data-engineering/week_6/pyflink_by_zack/homework/session_job.py", line 77, in <module>
    sessionize_trips()
  File "/root/app/practice/Zoomcamp-data-engineering/week_6/pyflink_by_zack/homework/session_job.py", line 62, in sessionize_trips
    t_env.execute_sql(f"""
  File "/root/app/practice/Zoomcamp-data-engineering/week_6/pyflink_by_zack/streaming-venv/lib/python3.10/site-packages/pyflink/table/table_environment.py", line 837, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/root/spark/spark-3.5.5-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/root/app/practice/Zoomcamp-data-engineering/week_6/pyflink_by_zack/streaming-venv/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/root/spark/spark-3.5.5-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get
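
The traceback is cut off before the underlying Java error, so the cause of the failure at `execute_sql` is not visible here. For comparison, here is a minimal sketch of the two statements a session job could run, under these assumptions: the table name `green_trips`, the in-network bootstrap address `redpanda-1:29092`, and column types inferred from the JSON produced in Question 4. It uses Flink SQL's group-window functions `SESSION`, `SESSION_START` and `SESSION_END`; the windowing table-valued function syntax is an alternative. `run` takes an existing `TableEnvironment` so the module itself does not import PyFlink.

```python
# Assumption: the address Flink uses to reach Redpanda inside the Docker network.
KAFKA_BOOTSTRAP = "redpanda-1:29092"

# dropoff_ts is the event-time column parsed from the drop-off string;
# the watermark tolerates events up to 5 seconds out of order.
SOURCE_DDL = f"""
CREATE TABLE green_trips (
    lpep_pickup_datetime VARCHAR,
    lpep_dropoff_datetime VARCHAR,
    PULocationID INTEGER,
    DOLocationID INTEGER,
    passenger_count DOUBLE,
    trip_distance DOUBLE,
    tip_amount DOUBLE,
    dropoff_ts AS TO_TIMESTAMP(lpep_dropoff_datetime, 'yyyy-MM-dd HH:mm:ss'),
    WATERMARK FOR dropoff_ts AS dropoff_ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '{KAFKA_BOOTSTRAP}',
    'topic' = 'green-trips',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
"""

# One row per (pickup, drop-off) session with a 5-minute inactivity gap.
SESSION_QUERY = """
SELECT
    PULocationID,
    DOLocationID,
    SESSION_START(dropoff_ts, INTERVAL '5' MINUTE) AS session_start,
    SESSION_END(dropoff_ts, INTERVAL '5' MINUTE) AS session_end,
    COUNT(*) AS num_trips
FROM green_trips
GROUP BY
    PULocationID,
    DOLocationID,
    SESSION(dropoff_ts, INTERVAL '5' MINUTE)
"""


def run(t_env):
    """Register the Kafka source and print the session results.

    `t_env` is a PyFlink TableEnvironment created by the caller, with the
    Kafka connector JAR available on its classpath.
    """
    t_env.execute_sql(SOURCE_DDL)
    t_env.execute_sql(SESSION_QUERY).print()
```

Finding the longest streak is then a matter of picking the row with the largest `num_trips`, for example by writing the results to a sink table and querying it.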