# Streaming with PySpark - Redpanda - Kafka
### Homework 6
#### Maria Fisher 

# Test Kafka + Redpanda
Start Redpanda

Let's start Redpanda in a Docker container.

There's a docker-compose.yml file in the homework folder (taken from here)

Copy this file to your homework directory and run

docker-compose up

(Add -d if you want to run in detached mode)

# Question 1: Redpanda version

Now let's find out the version of Redpanda.

For that, check the output of the command rpk help inside the container. The name of the container is redpanda-1.

Find out what you need to execute based on the help output.

What's the version, based on the output of the command you executed? (copy the entire version)

# Question 2. Creating a topic

Before we can send data to the Redpanda server, we need to create a topic. We also do this with the rpk command we used previously to find the Redpanda version.

Read the output of help and based on it, create a topic with name test-topic

What's the output of the command for creating a topic?
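
The two rpk steps so far can also be scripted from Python. The sketch below only builds and runs `docker exec` commands. It assumes the container is named redpanda-1, as stated above, and that `rpk version` and `rpk topic create` are the subcommands the help output points to. The helper names `rpk_command` and `run_rpk` are made up for this example.

```python
import subprocess

CONTAINER = "redpanda-1"  # container name given in the homework text


def rpk_command(*args, container=CONTAINER):
    """Build the argv list for running `rpk <args>` inside the container."""
    return ["docker", "exec", container, "rpk", *args]


def run_rpk(*args, container=CONTAINER):
    """Run rpk inside the container and return its standard output as text."""
    result = subprocess.run(
        rpk_command(*args, container=container),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if rpk exits with an error
    )
    return result.stdout


# Show the commands without executing them (no Docker needed for this part)
print(" ".join(rpk_command("version")))
print(" ".join(rpk_command("topic", "create", "test-topic")))
```

With the docker-compose stack running, `run_rpk("version")` and `run_rpk("topic", "create", "test-topic")` would return the text asked for in Questions 1 and 2.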

# Question 3. Connecting to the Kafka server

We need to make sure we can connect to the server, so later we can send some data to its topics

First, let's install the kafka connector (up to you if you want to have a separate virtual environment for that)

pip install kafka-python

You can start a jupyter notebook in your solution folder or create a script

### Let's try to connect to our server:

In [None]:
import json
from kafka import KafkaProducer

def json_serializer(data):
    # Encode a Python object as UTF-8 JSON bytes
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

try:
    topic = 'test-topic'
    message = {'key': 'value'}  # Your message here
    # send() is asynchronous; get() blocks until the broker acknowledges the
    # message and raises an exception if delivery failed
    producer.send(topic, value=message).get(timeout=10)
    print("Message sent successfully.")
except Exception as e:
    print(f"Failed to send message: {e}")
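
To make the `value_serializer` step concrete, the following sketch shows the bytes that would be sent for the sample message and how a consumer could turn them back into a dictionary. `json_deserializer` is not part of the original notebook; it is an assumed counterpart added for illustration.

```python
import json


def json_serializer(data):
    # Same serializer as in the cell above: dict -> JSON text -> UTF-8 bytes
    return json.dumps(data).encode('utf-8')


def json_deserializer(raw):
    # Assumed inverse for the consuming side: UTF-8 bytes -> JSON text -> dict
    return json.loads(raw.decode('utf-8'))


payload = json_serializer({'key': 'value'})
print(payload)                     # b'{"key": "value"}'
print(json_deserializer(payload))  # {'key': 'value'}
```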



# Question 4. Sending data to the stream

## Now we're ready to send some test data:

How much time did it take? Where did it spend most of the time?

    Sending the messages
    Flushing
    Both took approximately the same amount of time

(Don't remove time.sleep when answering this question)
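
One way to reason about the timings: the loop sleeps 0.05 s after each of its 10 messages, so the sending phase cannot finish in much less than 10 × 0.05 = 0.5 s, however fast the broker is. The sketch below checks that bound with a small timing helper and no Kafka connection at all. The `timed` context manager and the `results` dictionary are hypothetical helpers. The same pattern could wrap the send loop and the `flush()` call.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock duration of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


results = {}

# Stand-in for the send loop: only the sleeps, no producer involved
with timed("sleep_only", results):
    for _ in range(10):
        time.sleep(0.05)

print(f"Sleep-only loop: {results['sleep_only']:.2f} seconds")
```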

In [None]:
import time
from kafka import KafkaProducer
import json

# Initialize KafkaProducer
server = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=[server], value_serializer=lambda x: json.dumps(x).encode('utf-8'))

# Record the starting time for sending messages
start_sending_time = time.time()

# Define the Kafka topic
topic_name = 'test-topic'

# Send 10 messages with a delay of 0.05 seconds between each message
for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

# Record the ending time for sending messages
end_sending_time = time.time()

# Record the starting time for flushing
start_flushing_time = time.time()

# Flush the producer to ensure all messages are sent
producer.flush()

# Record the ending time for flushing
end_flushing_time = time.time()

# Calculate the time taken for sending messages and flushing
sending_time = end_sending_time - start_sending_time
flushing_time = end_flushing_time - start_flushing_time

# Print the time taken for each operation
print(f'Time taken for sending messages: {sending_time:.2f} seconds')
print(f'Time taken for flushing: {flushing_time:.2f} seconds')

# Close the producer
producer.close()


# Reading data with rpk

You can see the messages that you send to the topic with rpk:

rpk topic consume test-topic

Run the command above and send the messages one more time to see them
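
As an alternative to rpk, the same messages can be read from Python. This is a minimal sketch, assuming kafka-python is installed (as in Question 3) and the broker listens on localhost:9092. `read_topic` and `decode_value` are illustrative names, and `consumer_timeout_ms` is set only so that the loop ends once no new messages arrive.

```python
import json


def decode_value(raw):
    """Turn the UTF-8 JSON bytes written by the producer back into a dict."""
    return json.loads(raw.decode('utf-8'))


def read_topic(topic='test-topic', server='localhost:9092', idle_ms=5000):
    """Return the decoded messages currently available in the topic."""
    # Imported here so decode_value can be tried without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[server],
        auto_offset_reset='earliest',   # start from the oldest message
        consumer_timeout_ms=idle_ms,    # stop iterating after idle_ms of silence
        value_deserializer=decode_value,
    )
    try:
        return [record.value for record in consumer]
    finally:
        consumer.close()


# The decoding step on its own, without a broker
print(decode_value(b'{"number": 3}'))  # {'number': 3}
```

With the stack running, calling `read_topic()` should return the dictionaries sent in Question 4.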

# Sending the taxi data

Now let's send our actual data:

    Read the green csv.gz file
    We will only need these columns:
        'lpep_pickup_datetime',
        'lpep_dropoff_datetime',
        'PULocationID',
        'DOLocationID',
        'passenger_count',
        'trip_distance',
        'tip_amount'

Iterate over the records in the dataframe
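
The steps above can be sketched without pandas, using only the standard library: open the gzip file, keep the seven columns, and yield one dictionary per trip. The in-memory sample below stands in for the real file. Its single row reuses the record printed later in this notebook, and the extra `VendorID` value is made up to show that unused columns are dropped. `iter_trips` is a hypothetical helper name.

```python
import csv
import gzip
import io

COLUMNS = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]


def iter_trips(source, columns=COLUMNS):
    """Yield one dict per CSV row, restricted to the requested columns.

    source may be a path to a .csv.gz file or a binary file object.
    """
    with gzip.open(source, mode='rt', newline='') as text:
        for row in csv.DictReader(text):
            yield {col: row[col] for col in columns}


# Tiny gzip-compressed CSV built in memory (illustrative data, not the real file)
sample_csv = (
    "VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,"
    "DOLocationID,passenger_count,trip_distance,tip_amount\n"
    "2,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1,5.88,0\n"
)
sample_gz = io.BytesIO(gzip.compress(sample_csv.encode('utf-8')))

trips = list(iter_trips(sample_gz))
print(trips[0])
```

All values come back as strings here. Converting them to numbers, as pandas does automatically, is left out to keep the sketch short.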

# Question 5: Sending the Trip Data

    Create a topic green-trips and send the data there
    How much time in seconds did it take? (You can round it to a whole number)
    Make sure you don't include sleeps in your code



# NYC green-taxi


In [None]:
import pandas as pd

# Define the path to the CSV file
data_path = 'resources/data/green_tripdata_2019-10.csv'

# Define the columns to be extracted
columns_to_extract = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

# Read the CSV file and extract the required columns
df_green = pd.read_csv(data_path, usecols=columns_to_extract)

# Display the first few rows of the dataframe
print(df_green.head())


In [None]:
from confluent_kafka import Producer
import pandas as pd
import time

# Initialize Kafka producer
producer = Producer({'bootstrap.servers': 'localhost:9092', 'queue.buffering.max.ms': 1})

# Define the topic (this notebook uses 'taxi-green' instead of the suggested 'green-trips')
topic = 'taxi-green'

# Define the columns to be included
columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount'
]

# Start time to measure execution time
start_time = time.time()

# Asynchronously produce messages to the Kafka topic
for row in df_green[columns].itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in columns}
    # Convert to JSON string
    message = pd.Series(row_dict).to_json()
    try:
        producer.produce(topic, message.encode('utf-8'))
    except BufferError:
        # Local queue is full: wait for pending deliveries, then retry once
        producer.poll(1)
        producer.produce(topic, message.encode('utf-8'))
    # Serve delivery events so the local queue keeps draining
    producer.poll(0)

# Wait for all messages to be delivered
producer.flush()

# Calculate execution time
execution_time = round(time.time() - start_time)

print("Execution time (seconds):", execution_time)
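
The producer cell above does not inspect delivery results, so failed deliveries would go unnoticed. A possible refinement, sketched here, is a small callable that counts successes and failures. It relies only on the delivery callback shape `(err, msg)`, where `err` is `None` on success. The class name `DeliveryStats` is hypothetical, and the calls at the bottom simulate callbacks without a broker.

```python
class DeliveryStats:
    """Callable delivery report that tallies outcomes of produce() calls."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0
        self.last_error = None

    def __call__(self, err, msg):
        # The callback receives err=None when the message was delivered
        if err is None:
            self.delivered += 1
        else:
            self.failed += 1
            self.last_error = err


stats = DeliveryStats()

# Simulated callbacks (no broker needed): two successes and one failure
stats(None, object())
stats(None, object())
stats("simulated broker error", None)

print(f"delivered={stats.delivered} failed={stats.failed}")  # delivered=2 failed=1
```

With a real producer, the object could be passed as `producer.produce(topic, value, on_delivery=stats)`. The callbacks fire during `poll()` and `flush()`, so the counters are final only after `flush()` returns.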


# Creating the PySpark consumer

Now let's read the data with PySpark.

Spark needs a library (jar) to be able to connect to Kafka, so we need to tell PySpark that it needs to use it:
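
The connector is referenced by a Maven coordinate of the form `group:artifact:version`. In the sketch below, the artifact suffix `_2.12` is assumed to be the Scala version of the Spark build, matching the cell that follows, and the version part is expected to equal the installed PySpark version. `kafka_connector_coordinate` is a hypothetical helper.

```python
def kafka_connector_coordinate(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Spark Kafka connector."""
    artifact = f"spark-sql-kafka-0-10_{scala_version}"
    return f"org.apache.spark:{artifact}:{spark_version}"


# Version 3.5.1 is the one resolved in the log output of this notebook
print(kafka_connector_coordinate("3.5.1"))
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
```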



In [1]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

24/03/14 18:57:39 WARN Utils: Your hostname, river resolves to a loopback address: 127.0.1.1; using 192.168.1.252 instead (on interface wlp1s0)
24/03/14 18:57:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/malu/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/malu/.ivy2/cache
The jars for the packages stored in: /home/malu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-83052cdf-b00f-42d9-8a3d-5adfef1df3e4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 805ms :: artifacts dl 25ms
	:: mod

24/03/14 18:57:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Now we can connect to the stream:

In [2]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "taxi-green") \
    .option("startingOffsets", "earliest") \
    .load()

### To test that we can consume from the stream, let's look at its first record.

In Spark streaming, the stream is represented as a sequence of small batches, each batch being a small RDD (or a small dataframe).

So we can execute a function over each mini-batch. Let's run take(1) there to see what we have in the stream:

In [3]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/14 18:57:59 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-90827a4d-3ad0-4cd8-9797-180d44919345. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 18:57:59 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", "lpep_dropoff_datetime": "2019-10-01 00:39:58", "PULocationID": 112, "DOLocationID": 196, "passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'), topic='taxi-green', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 14, 18, 9, 13, 790000), timestampType=0)


### Now let's stop the query, so it doesn't keep consuming messages from the stream

In [4]:
query.stop()

# Question 6. Parsing the data

The data is JSON, but currently it's in binary format. We need to parse it and turn it into a streaming dataframe with proper columns.

As in batch PySpark, we define the schema:

In [5]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [6]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")
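
To see what `from_json` does for a single record, the same steps can be replayed in plain Python on the `value` bytes printed by `peek` earlier: cast the bytes to a string, parse the JSON, and coerce each field to the type declared in the schema. The `CASTS` table is a hand-written stand-in for the Spark schema, not a Spark API.

```python
import json

# Raw Kafka value copied from the Row printed by peek()
raw_value = bytearray(
    b'{"lpep_pickup_datetime": "2019-10-01 00:26:02", '
    b'"lpep_dropoff_datetime": "2019-10-01 00:39:58", '
    b'"PULocationID": 112, "DOLocationID": 196, '
    b'"passenger_count": 1.0, "trip_distance": 5.88, "tip_amount": 0.0}'
)

# Python counterparts of the Spark types used in the schema
CASTS = {
    "lpep_pickup_datetime": str,
    "lpep_dropoff_datetime": str,
    "PULocationID": int,
    "DOLocationID": int,
    "passenger_count": float,
    "trip_distance": float,
    "tip_amount": float,
}


def parse_value(raw):
    """Decode the bytes, parse the JSON, and apply the per-field casts."""
    record = json.loads(bytes(raw).decode("utf-8"))
    return {name: cast(record[name]) for name, cast in CASTS.items()}


trip = parse_value(raw_value)
print(trip["DOLocationID"], trip["trip_distance"])  # 196 5.88
```

One difference worth keeping in mind: Spark yields nulls for fields that are missing from the JSON, while this sketch would raise a `KeyError`.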



# Question 7: Most popular destination

Now let's finally do some streaming analytics. We will find the currently most popular destination based on our stream of data (which ideally we should have sent with delays, like we did in workshop 2).

This is how you can do it:

    Add a column "timestamp" using the current_timestamp function
    Group by:
        5 minutes window based on the timestamp column (F.window(col("timestamp"), "5 minutes"))
        "DOLocationID"
    Order by count
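
The grouping steps above can be illustrated in plain Python. A 5-minute tumbling window assigns each timestamp to the interval that starts at the previous multiple of five minutes. Counting `(window, DOLocationID)` pairs and sorting by count then gives the ranking. The sample events are invented for the illustration, and `window_bounds` is a hypothetical helper, not a Spark function. It assumes the window size divides a day evenly.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)


def window_bounds(ts, size=WINDOW):
    """Return (start, end) of the tumbling window containing ts."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    offset = (ts - midnight) // size * size  # round down to a window boundary
    start = midnight + offset
    return start, start + size


# Invented events: (processing timestamp, DOLocationID)
events = [
    (datetime(2024, 3, 14, 18, 58, 17), 74),
    (datetime(2024, 3, 14, 18, 58, 18), 74),
    (datetime(2024, 3, 14, 18, 58, 18), 42),
    (datetime(2024, 3, 14, 19, 1, 5), 74),
]

counts = Counter((window_bounds(ts), dest) for ts, dest in events)

# Order by count, largest first
for (bounds, dest), n in counts.most_common():
    start, end = bounds
    print(f"{start:%H:%M}-{end:%H:%M}  DOLocationID={dest}  count={n}")
```

Because the whole dataset was sent at once and the timestamp is the processing time, nearly all real records fall into a single window, which is why sending with delays would give a more interesting result.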

You can print the output to the console using this code

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, current_timestamp

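# getOrCreate() returns the session created earlier (with the Kafka package);
# for an existing session only runtime SQL configurations set here take effect
spark = SparkSession.builder \
    .appName("Most Popular Destination") \
    .getOrCreate()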

# Assuming green_stream is the parsed DataFrame from Question 6

# Add a column "timestamp" using the current_timestamp function
green_stream_with_timestamp = green_stream.withColumn("timestamp", current_timestamp())

# Group by window and DOLocationID, and order by count
popular_destinations = green_stream_with_timestamp \
    .groupBy(window("timestamp", "5 minutes"), "DOLocationID") \
    .count() \
    .orderBy("count", ascending=False)

# Write to console
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()


24/03/14 18:58:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/03/14 18:58:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-12564910-db08-4714-8499-a7c502017879. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 18:58:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|74          |43810|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|42          |38778|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|41          |35382|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|75          |31337|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|129         |28626|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|166         |28281|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|7           |28236|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|236         |19434|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|238         |18686|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|223         |18676|
|{2024-03-14 18:55:00, 2024-03-14 19:00:00}|82          |16711|
|{2024-