Question 1. Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command rpk help inside the container. The name of the container is redpanda-1.

Find out what you need to execute based on the help output.

What's the version, based on the output of the command you executed? (copy the entire version)

```
docker exec -it redpanda-1 bash -c "rpk version"
```

Output: v22.3.5 (rev 28b2443)

Question 2. Creating a topic

Before we can send data to the Redpanda server, we need to create a topic. We also do this with the rpk command that we used previously to find the Redpanda version.

Read the output of help and, based on it, create a topic named test-topic.

What's the output of the command for creating a topic? Include the entire output in your answer.

```
docker exec -it redpanda-1 bash -c "rpk topic create test-topic"
```

Question 3. Connecting to the Kafka server

We need to make sure we can connect to the server, so that we can later send some data to its topics.
Provided that you can connect to the server, what's the output of the last command?

In [1]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True
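
To see what the producer puts on the wire, here is a minimal round-trip sketch of the `json_serializer` defined above. The `json_deserializer` helper is an assumption added for illustration; it is not part of the original notebook.

```python
import json


def json_serializer(data):
    # Same serializer as in the cell above: dict -> JSON text -> UTF-8 bytes
    return json.dumps(data).encode('utf-8')


def json_deserializer(raw):
    # Hypothetical inverse helper (not in the original notebook)
    return json.loads(raw.decode('utf-8'))


message = {'number': 1}
payload = json_serializer(message)

print(payload)                     # b'{"number": 1}'
print(json_deserializer(payload))  # {'number': 1}
```

A consumer reading the topic would need to apply the same decoding to recover the dictionary.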

Question 4. Sending data to the stream

Now we're ready to send some test data.
Where did it spend most of the time? → Sending the messages

Question 5. Time to send data

How much time in seconds did it take? (You can round it to a whole number.)
Make sure you don't include sleeps in your code.

In [3]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

Sent: {'number': 0}
Sent: {'number': 1}
Sent: {'number': 2}
Sent: {'number': 3}
Sent: {'number': 4}
Sent: {'number': 5}
Sent: {'number': 6}
Sent: {'number': 7}
Sent: {'number': 8}
Sent: {'number': 9}
took 0.52 seconds
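
The total above lumps the send loop and the flush together. One way to see where the time goes is to time the two phases separately. The sketch below uses a hypothetical `FakeProducer` stand-in so that it runs without a broker; `timed_send` is likewise an illustrative helper, not part of the notebook. Passing the real `KafkaProducer` from the earlier cell to `timed_send` should work the same way, though the numbers will differ.

```python
import time


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer (assumption, for illustration only)."""

    def __init__(self):
        self.buffer = []
        self.flushed = []

    def send(self, topic, value):
        # Like the real client, send() only enqueues the message
        self.buffer.append((topic, value))

    def flush(self):
        # The real client blocks here until buffered messages are delivered
        self.flushed.extend(self.buffer)
        self.buffer.clear()


def timed_send(producer, topic_name, messages, delay=0.0):
    """Return (seconds spent in the send loop, seconds spent in flush)."""
    t0 = time.perf_counter()
    for message in messages:
        producer.send(topic_name, value=message)
        if delay:
            time.sleep(delay)
    t1 = time.perf_counter()
    producer.flush()
    t2 = time.perf_counter()
    return t1 - t0, t2 - t1


fake_producer = FakeProducer()
messages = [{'number': i} for i in range(10)]
send_s, flush_s = timed_send(fake_producer, 'test-topic', messages, delay=0.05)
print(f'send loop: {send_s:.2f} s, flush: {flush_s:.2f} s')
```

With `delay=0.05`, the ten sleeps alone account for about 0.5 s, which matches the `took 0.52 seconds` reported above.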


Question 6. Parsing the data

Create a topic green-trips and send the data there.
How does the record look after parsing? Copy the output.

In [4]:
import pandas as pd

# Only the columns needed for the homework
columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'PULocationID',
           'DOLocationID', 'passenger_count', 'trip_distance', 'tip_amount']
df_green = pd.read_csv('green_tripdata_2019-10.csv', index_col=False, usecols=columns)

t0 = time.time()
topic_name = 'green-trips'

# Send each row as one JSON message; no sleeps, so only send + flush are timed
for row in df_green.itertuples(index=False):
    row_dict = {field: getattr(row, field) for field in row._fields}
    producer.send(topic_name, value=row_dict)

producer.flush()

t1 = time.time()
print(f'took {(t1 - t0):.2f} seconds')

took 45.54 seconds
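
The loop above turns each row into a dictionary field by field. Since pandas `itertuples` yields namedtuples by default, the built-in `_asdict()` method should give the same result. The sketch below checks this on a hypothetical stand-in row (the `Row` type and its three sample values are assumptions for illustration), so it runs without pandas or the CSV file.

```python
from collections import namedtuple

# Hypothetical stand-in for one row yielded by df.itertuples(index=False)
Row = namedtuple('Row', ['PULocationID', 'DOLocationID', 'trip_distance'])
row = Row(PULocationID=112, DOLocationID=196, trip_distance=5.88)

# Field-by-field conversion, as in the cell above
via_getattr = {field: getattr(row, field) for field in row._fields}

# Equivalent conversion using the namedtuple API
via_asdict = row._asdict()

print(via_getattr)
print(via_asdict == via_getattr)
```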


In [5]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/sdkman/candidates/spark/3.5.0/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vscode/.ivy2/cache
The jars for the packages stored in: /home/vscode/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f7e7bee1-1a5f-454c-a391-5e3967f3e08a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 277ms :: artifacts dl 10ms
	::

In [6]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [7]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/19 15:45:52 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3ed3b518-9927-4484-a82d-cc8a44cb710b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 15:45:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 15:45:53 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 19, 12, 11, 24, 344000), timestampType=0)


In [8]:
query.stop()
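
The `value` field in the row printed above is still raw bytes. Before defining a Spark schema, it can help to decode one record in plain Python to see which fields and types it carries. This sketch copies the payload from the output above and uses only the standard library.

```python
import json

# Payload copied from the Row printed above
raw_value = bytearray(
    b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", '
    b'"lpep_dropoff_datetime": "2019-10-01 00:39:58", '
    b'"PULocationID": 112, "DOLocationID": 196, '
    b'"passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'
)

# Decode the bytes to text, then parse the JSON document
record = json.loads(raw_value.decode('utf-8'))

for field, value in record.items():
    print(f'{field}: {value!r} ({type(value).__name__})')
```

The two datetime fields arrive as strings, the location IDs as integers, and the remaining three fields as floats.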

Question 7. Most popular destination

Now let's finally do some streaming analytics. We will find the currently most popular destination based on our stream of data (which ideally we should have sent with delays, like we did in workshop 2).

This is how you can do it:

In [9]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

Add a column "timestamp" using the current_timestamp function.

In [10]:
from pyspark.sql import functions as F
green_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
    .withColumn('timestamp', F.current_timestamp())
)

Group by:→ 
5 minutes window based on the timestamp column (F.window(col("timestamp"), "5 minutes"))→ 
"DOLocationI
Order by count
Write the most popular destination, your answer should be either the zone ID or the zone name of this destination. (You will need to re-send the data for this to work)D"

In [11]:
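popular_destinations = (
    green_stream
    # Tumbling 5-minute windows on the timestamp column, per drop-off zone
    .groupBy(F.window("timestamp", "5 minutes"), "DOLocationID")
    .count()  # count() already names its output column "count"
    .sort("count", ascending=False)
)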
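
To make the grouping concrete, here is a plain-Python sketch of counting events per 5-minute tumbling window and destination. It illustrates the idea only and is not how Spark implements `F.window`. The events are made up, and the windows are assumed to be aligned to the Unix epoch, so every window starts on a multiple of five minutes.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts):
    # Floor the timestamp to the start of its 5-minute tumbling window
    return ts - ((ts - EPOCH) % WINDOW)


# Made-up (arrival time, DOLocationID) pairs, for illustration only
events = [
    (datetime(2024, 3, 19, 15, 46, 10, tzinfo=timezone.utc), 74),
    (datetime(2024, 3, 19, 15, 47, 30, tzinfo=timezone.utc), 74),
    (datetime(2024, 3, 19, 15, 49, 59, tzinfo=timezone.utc), 42),
    (datetime(2024, 3, 19, 15, 50, 0, tzinfo=timezone.utc), 74),
]

# Group by (window start, destination) and count
counts = Counter((window_start(ts), dest) for ts, dest in events)

# Highest counts first, like sort("count", ascending=False)
for (start, dest), n in counts.most_common():
    end = start + WINDOW
    print(f'{start:%H:%M} to {end:%H:%M}  DOLocationID={dest}  count={n}')
```

An event at exactly 15:50:00 falls into the next window, because each window includes its start and excludes its end.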

In [None]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

24/03/19 15:46:10 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fe9d35f3-d04a-486a-8f59-4aa2e9afef63. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 15:46:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 15:46:10 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|74          |35482|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|42          |31884|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|41          |28122|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|75          |25680|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|129         |23860|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|7           |23066|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|166         |21690|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|236         |15826|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|223         |15084|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|238         |14636|
|{2024-03-19 15:45:00, 2024-03-19 15:50:00}|82          |14584|
|{2024-