## Module 6 Homework

#### Question 3. Connecting to the Kafka server

In [None]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    """Serialize ``data`` to JSON text and return that text as UTF-8 bytes."""
    json_text = json.dumps(data)
    return bytes(json_text, 'utf-8')

# Address of the Kafka broker used for bootstrapping.
server = 'localhost:9092'

# Producer whose message values are passed through json_serializer before sending.
producer_options = {
    'bootstrap_servers': [server],
    'value_serializer': json_serializer,
}
producer = KafkaProducer(**producer_options)

# Final expression of the cell, so its return value is rendered as the cell output.
producer.bootstrap_connected()

#### Question 4. Sending data to the stream

In [None]:
import time

# Time three phases separately: queuing the messages, flushing them, and the total.
t0 = time.time()

topic_name = 'test-topic'

# Ten small messages, with a 50 ms pause after each send.
for i in range(10):
    message = dict(number=i)
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

t_sent = time.time()
send_seconds = t_sent - t0
print(f'Sending messages took {send_seconds:.2f} seconds')

# Block until the producer has pushed out everything it still holds.
producer.flush()

t_flush = time.time()
flush_seconds = t_flush - t_sent
print(f'Flushing took {flush_seconds:.2f} seconds')

t1 = time.time()
total_seconds = t1 - t0
print(f'Total time took {total_seconds:.2f} seconds')

#### Question 5: Sending the Trip Data

In [None]:
import pandas as pd
import time
import json
from kafka import KafkaProducer

# Load the green-taxi trips and stream every row to the 'green-trips' topic as JSON,
# reporting how long the whole transfer took.
file_path = 'green_tripdata_2019-10.csv.gz'

df = pd.read_csv(file_path)

# Only these columns are sent; the streaming consumer's schema lists the same fields.
df_green = df[['lpep_pickup_datetime',
               'lpep_dropoff_datetime',
               'PULocationID',
               'DOLocationID',
               'passenger_count',
               'trip_distance',
               'tip_amount']]

# No value_serializer is configured here, so each message is encoded to bytes by hand below.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

t0 = time.time()
topic_name = 'green-trips'
for row in df_green.itertuples(index=False):
    # itertuples yields namedtuples; _asdict() maps column name -> value.
    # NOTE(review): json.dumps writes missing float values as the non-standard
    # token NaN -- confirm the consumer accepts it if any column contains gaps.
    value_bytes = json.dumps(row._asdict()).encode('utf-8')
    producer.send(topic_name, value=value_bytes)

# send() only enqueues records for background delivery. Flush before stopping the
# clock so the measurement covers actual delivery and no buffered record is left
# behind if the kernel is shut down right after this cell.
producer.flush()
t1 = time.time()

time_taken = round(t1 - t0)

print(f'It took {time_taken} seconds to send the data')


#### Question 6. Parsing the Data

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType

# Read the 'green-trips' Kafka topic as a stream, parse each JSON value into typed
# columns, and print the parsed rows to the console.

# Request the Kafka source connector in the same version as the installed PySpark.
# NOTE(review): the "_2.12" suffix is the Scala build of the connector -- confirm it
# matches the Scala version of the local Spark distribution.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

# "earliest" makes the query start from the beginning of the topic rather than
# only picking up records produced after the query starts.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

# Field names and types expected in every JSON message.
schema = (
    StructType()
    .add("lpep_pickup_datetime", StringType())
    .add("lpep_dropoff_datetime", StringType())
    .add("PULocationID", IntegerType())
    .add("DOLocationID", IntegerType())
    .add("passenger_count", DoubleType())
    .add("trip_distance", DoubleType())
    .add("tip_amount", DoubleType())
)

# The Kafka "value" column is cast to a string, parsed against the schema into a
# struct named "data", and then flattened into one column per field.
parsed_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

query = (
    parsed_stream.writeStream
    .format("console")
    .start()
)

# awaitTermination() blocks until the query ends; in a notebook that normally means
# the cell is interrupted. Stop the query on the way out so it does not keep running
# (and printing) in the background while later cells start their own queries.
try:
    query.awaitTermination()
finally:
    query.stop()

#### Question 7: Most Popular Destination

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType

# Read the 'green-trips' Kafka topic as a stream and continuously print the number of
# trips per drop-off location within 5-minute windows, most frequent first.

# Request the Kafka source connector in the same version as the installed PySpark.
# NOTE(review): the "_2.12" suffix is the Scala build of the connector -- confirm it
# matches the Scala version of the local Spark distribution.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

# Field names and types expected in every JSON message.
schema = (
    StructType()
    .add("lpep_pickup_datetime", StringType())
    .add("lpep_dropoff_datetime", StringType())
    .add("PULocationID", IntegerType())
    .add("DOLocationID", IntegerType())
    .add("passenger_count", DoubleType())
    .add("trip_distance", DoubleType())
    .add("tip_amount", DoubleType())
)

# Parenthesised chains replace backslash continuations; the original statement ended
# with a dangling "\" that only worked because a blank line happened to follow it.
parsed_stream = (
    green_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

# The window is keyed on current_timestamp(), not on the pickup/drop-off columns,
# so windows reflect when rows are processed rather than when trips happened.
parsed_stream_with_timestamp = parsed_stream.withColumn("timestamp", F.current_timestamp())

popular_destinations = (
    parsed_stream_with_timestamp
    .groupBy(
        F.window(F.col("timestamp"), "5 minutes"),
        F.col("DOLocationID"),
    )
    .count()
    .orderBy("count", ascending=False)
)

# truncate=false keeps the full window struct visible in the console output.
query = (
    popular_destinations.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# awaitTermination() blocks until the query ends; in a notebook that normally means
# the cell is interrupted. Stop the query on the way out so it does not keep running
# (and printing) in the background after the cell has been interrupted.
try:
    query.awaitTermination()
finally:
    query.stop()