## Question 3. Connecting to the Kafka server

In [1]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

True

## Question 4. Sending data to the stream

In [2]:
t0 = time.time()

topic_name = 'test-topic'
total_message_time=0
for i in range(10):
    message = {'number': i}
    t1 = time.time()
    producer.send(topic_name, value=message)
    t2 = time.time()
    print(f'Sending the messages took {(t2 - t1):.4f} seconds')
    total_message_time+=t2-t1
    print(f"Sent: {message}")
    time.sleep(0.05)

t3 = time.time()
producer.flush()
t4 = time.time()
print(f'Average Sending the messages took {total_message_time/10:.4f} seconds')
print(f'Flushing took {(t4 - t3):.4f} seconds')
print(f'took {(t4 - t0):.2f} seconds')

Sending the messages took 0.1570 seconds
Sent: {'number': 0}
Sending the messages took 0.0003 seconds
Sent: {'number': 1}
Sending the messages took 0.0010 seconds
Sent: {'number': 2}
Sending the messages took 0.0003 seconds
Sent: {'number': 3}
Sending the messages took 0.0005 seconds
Sent: {'number': 4}
Sending the messages took 0.0003 seconds
Sent: {'number': 5}
Sending the messages took 0.0003 seconds
Sent: {'number': 6}
Sending the messages took 0.0003 seconds
Sent: {'number': 7}
Sending the messages took 0.0002 seconds
Sent: {'number': 8}
Sending the messages took 0.0003 seconds
Sent: {'number': 9}
Average Sending the messages took 0.0161 seconds
Flushing took 0.0001 seconds
took 0.66 seconds


> Answer:
```
Sending the messages (0.0161 sec) and Flushing is (0.0001) sec. More time on sending the messages.
```

## Question 5: Sending the Trip Data

In [3]:
import pandas as pd
from kafka import KafkaProducer
import json
import time


# Kafka producer configuration
topic = 'green-trips'

# Read the CSV file with selected columns
selected_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 
                    'PULocationID', 'DOLocationID', 'passenger_count', 
                    'trip_distance', 'tip_amount']
data_path = 'data/green_tripdata_2019-10.csv.gz'

# Read the CSV file into a DataFrame
start_time = time.time()  # Start time
df_green = pd.read_csv(data_path, usecols=selected_columns, compression='gzip')

# Iterate over the records in the DataFrame and send data to Kafka
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    # Send the row data to Kafka
    producer.send(topic, value=row_dict)

# Close the Kafka producer
producer.close()

# Calculate the time taken in seconds
end_time = time.time()
time_taken = round(end_time - start_time)
print("Time taken (seconds):", time_taken)


Time taken (seconds): 55


## Creating the PySpark consumer

In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType,StructField

spark = SparkSession.builder \
    .appName("GreenTripsConsumer") \
    .master("local[*]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

:: loading settings :: url = jar:file:/home/murat/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/murat/.ivy2/cache
The jars for the packages stored in: /home/murat/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-59098c47-c8ba-4f13-8ec1-d5417ad9217c;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 504ms :: artifacts dl 15ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central i

24/03/19 03:07:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark

### Now we can connect to the stream:

In [6]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

### We can execute a function over each mini-batch.

In [7]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

24/03/19 03:07:50 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6b85f105-2308-4067-9821-fce266638d70. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:07:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

Row(key=None, value=bytearray(b'{"lpep_pickup_datetime": "2019-10-01 00:00:48", "lpep_dropoff_datetime": "2019-10-01 00:05:42", "PULocationID": 210, "DOLocationID": 108, "passenger_count": 2.0, "trip_distance": 1.03, "tip_amount": 2.19}'), topic='green-trips', partition=0, offset=0, timestamp=datetime.datetime(2024, 3, 19, 3, 6, 2, 201000), timestampType=0)


In [8]:
query.stop()

## Question 6. Parsing the data

In [9]:
# Define the schema for parsing JSON data
schema = StructType([
    StructField("lpep_pickup_datetime", StringType()),
    StructField("lpep_dropoff_datetime", StringType()),
    StructField("PULocationID", IntegerType()),
    StructField("DOLocationID", IntegerType()),
    StructField("passenger_count", DoubleType()),
    StructField("trip_distance", DoubleType()),
    StructField("tip_amount", DoubleType())
])

In [10]:
parsed_stream = green_stream \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

In [11]:
parsed_stream

DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]

In [12]:
# Show the parsed data
query = parsed_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Wait for the termination of the query
query.awaitTermination()

# Stop the SparkSession
spark.stop()

24/03/19 03:08:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2a0dab94-1e2e-4ac1-a7a7-b2b22e76376f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:08:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|passenger_count|trip_distance|tip_amount|
+--------------------+---------------------+------------+------------+---------------+-------------+----------+
| 2019-10-01 00:00:48|  2019-10-01 00:05:42|         210|         108|            2.0|         1.03|      2.19|
| 2019-10-01 00:45:08|  2019-10-01 01:04:28|          83|          36|            2.0|         5.75|       0.0|
| 2019-10-01 00:32:44|  2019-10-01 00:46:53|          92|         260|            2.0|         6.01|      4.16|
| 2019-10-01 00:05:09|  2019-10-01 00:18:34|          75|         119|            1.0|         1.08|      2.36|
| 2019-10-01 00:18:16|  2019-10-01 00:53:16|          29|          92|            1.0|        21.39|       0.0|
| 2019-

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/murat/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/murat/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/murat/anaconda3/envs/myenv/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

## Question 7: Most popular destination

In [13]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, StructField

spark = SparkSession.builder \
    .appName("GreenTripsConsumer") \
    .master("local[*]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

# Define the schema for parsing JSON data
schema = StructType([
    StructField("lpep_pickup_datetime", StringType()),
    StructField("lpep_dropoff_datetime", StringType()),
    StructField("PULocationID", IntegerType()),
    StructField("DOLocationID", IntegerType()),
    StructField("passenger_count", DoubleType()),
    StructField("trip_distance", DoubleType()),
    StructField("tip_amount", DoubleType())
])

parsed_stream = green_stream \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("timestamp", F.current_timestamp())  # Add a column "timestamp" using current_timestamp function

# Find the most popular destination
popular_destinations = parsed_stream \
    .groupBy(F.window(F.col("timestamp"), "5 minutes"), F.col("DOLocationID")) \
    .count() \
    .orderBy(F.desc("count"))  # Order by count in descending order

# Write the output to the console
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Wait for the termination of the query
query.awaitTermination()


# Stop the SparkSession
spark.stop()


24/03/19 03:11:01 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-94445a9f-5788-4cb4-a135-7ece896fb510. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:11:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|74          |17733|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|42          |15935|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|41          |14058|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|75          |12837|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|129         |11922|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|7           |11527|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|166         |10843|
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|236         |7913 |
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|223         |7540 |
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|238         |7318 |
|{2024-03-19 03:10:00, 2024-03-19 03:15:00}|82          |7285 |
|{2024-

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/murat/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/murat/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/murat/anaconda3/envs/myenv/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 