In [None]:
# Stop a SparkSession left over from a previous run of this notebook, so the
# builder in the next cell creates a new session instead of reusing this one.
# On a fresh kernel `spark` does not exist yet; skip instead of raising NameError.
if "spark" in globals():
    spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import from_json, col, count, desc

# Schema applied to each Kafka message's JSON payload: only the pickup and
# drop-off location IDs are extracted, both as nullable integers.
# NOTE(review): assumes the producer emits these IDs as JSON numbers — confirm.
schema = StructType([
    StructField(field_name, IntegerType(), nullable=True)
    for field_name in ("PULocationID", "DOLocationID")
])

# Local Spark session with the Kafka source jars (Spark 2.4.8 / Scala 2.11,
# per the jar file names) on the classpath, taken from the local Spark install.
# NOTE(review): the 2.4.8 spark-sql-kafka connector was built against an older
# kafka-clients release than 2.7.0 — confirm this pairing is intentional.
KAFKA_JARS = ",".join([
    "/usr/local/spark/jars/spark-sql-kafka-0-10_2.11-2.4.8.jar",
    "/usr/local/spark/jars/kafka-clients-2.7.0.jar",
])

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("TopTrendingPickupDrop")
    .config("spark.jars", KAFKA_JARS)
    .getOrCreate()
)

# Keep notebook output readable by hiding Spark's INFO-level log lines.
spark.sparkContext.setLogLevel("WARN")

# Streaming source: records from the "greenTaxi" topic on the local broker.
# startingOffsets="latest" means records already on the topic when the
# query starts are not replayed (see the Spark-Kafka integration guide).
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "greenTaxi",
        "startingOffsets": "latest",
    })
    .load()
)

# Interpret each record's binary `value` as a string, parse it as JSON with
# `schema`, and flatten the resulting struct into top-level columns.
json_df = kafka_df.select(col("value").cast("string").alias("json_str"))
parsed_df = (
    json_df
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
)

def _top_locations(df, location_col, count_alias, n=5):
    """Return a streaming DataFrame with the ``n`` most frequent ``location_col`` values.

    Rows whose ``location_col`` is null are dropped before grouping. Such rows
    presumably come from messages that ``from_json`` could not parse, or that
    lack the field. Without the filter, null would be counted as a "location"
    and could occupy a top-``n`` slot.

    Args:
        df: parsed streaming DataFrame containing ``location_col``.
        location_col: name of the location ID column to rank.
        count_alias: output name for the per-location row count.
        n: number of rows to keep after sorting.

    Returns:
        A DataFrame with columns ``location_col`` and ``count_alias``. It must
        be written in "complete" output mode.
    """
    return (
        df.filter(col(location_col).isNotNull())
        .groupBy(location_col)
        .agg(count("*").alias(count_alias))
        # Secondary sort on the ID makes the top-n deterministic when counts tie.
        .orderBy(desc(count_alias), col(location_col))
        .limit(n)
    )


# Top 5 pickup and drop-off locations by trip count.
top_pu_df = _top_locations(parsed_df, "PULocationID", "pickup_count")
top_do_df = _top_locations(parsed_df, "DOLocationID", "drop_count")

def _start_console_query(df, trigger_interval="10 seconds"):
    """Start a streaming query that prints ``df`` to the console.

    Uses "complete" output mode, so each trigger prints the whole aggregated
    result table, with column values left untruncated.

    Args:
        df: aggregated streaming DataFrame to display.
        trigger_interval: processing-time trigger interval, e.g. "10 seconds".

    Returns:
        The started ``StreamingQuery``.
    """
    return (
        df.writeStream
        .outputMode("complete")
        .format("console")
        .trigger(processingTime=trigger_interval)
        .option("truncate", False)
        .start()
    )


# One console query per ranking; both trigger every 10 seconds.
query_pickup = _start_console_query(top_pu_df)
query_dropoff = _start_console_query(top_do_df)

# Block until either query terminates. Waiting on query_pickup first would
# hide a failure of the drop-off query while the pickup query keeps running.
# awaitAnyTermination() returns as soon as any query in this session stops,
# re-raising that query's exception if it failed.
try:
    spark.streams.awaitAnyTermination()
finally:
    # Stop whichever query is still running (also on notebook interrupt) so
    # it does not keep printing in the background.
    for running_query in (query_pickup, query_dropoff):
        if running_query.isActive:
            running_query.stop()