In [6]:
import pyspark
# Record the PySpark version this notebook was executed with.
pyspark_version = pyspark.__version__
print(pyspark_version)

3.5.3


In [7]:
from pyspark.sql import SparkSession

# The Kafka source/sink used by the later cells (`format("kafka")`) is not
# bundled with PySpark, so request the connector when the session starts.
# The connector version is tied to the running PySpark version.
# NOTE(review): assumes a Scala 2.12 build of Spark and that a Maven
# repository is reachable from this environment — confirm.
KAFKA_CONNECTOR = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

spark = (
    SparkSession.builder
    .appName("Query 0 - Data Cleansing")
    .config("spark.jars.packages", KAFKA_CONNECTOR)
    .getOrCreate()
)

In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
# Explicit schema for the CSV data, declared as (column name, Spark type)
# pairs. Every column is nullable.
_TRIP_COLUMNS = [
    ("medallion", StringType()),
    ("hack_license", StringType()),
    ("pickup_datetime", TimestampType()),
    ("dropoff_datetime", TimestampType()),
    ("trip_time_in_secs", IntegerType()),
    ("trip_distance", DoubleType()),
    ("pickup_longitude", DoubleType()),
    ("pickup_latitude", DoubleType()),
    ("dropoff_longitude", DoubleType()),
    ("dropoff_latitude", DoubleType()),
    ("payment_type", StringType()),
    ("fare_amount", DoubleType()),
    ("surcharge", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
]
schema = StructType(
    [StructField(name, dtype, True) for name, dtype in _TRIP_COLUMNS]
)

In [9]:
# Read the CSV file with the explicit schema, then inspect the result.
csv_path = "input/sorted_data.csv"
csv_reader = spark.read.schema(schema)
df = csv_reader.csv(csv_path)
df.printSchema()
df.show(5)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/jovyan/input/sorted_data.csv.

In [5]:
# Row count of the loaded data before any cleaning is applied.
initial_count = df.count()
initial_message = f"Initial row count: {initial_count}"
print(initial_message)

Initial row count: 173185091


In [6]:
# Build the cleaned frame in one chained expression.
df_clean = (
    # Drop rows containing any null (e.g. values that could not be read
    # into the types declared by the schema).
    df.dropna()
    # Keep only trips with a positive duration and distance.
    .filter("trip_time_in_secs > 0")
    .filter("trip_distance > 0")
    # Drop rows whose driver or vehicle identifier is the literal "unknown".
    .filter("hack_license != 'unknown'")
    .filter("medallion != 'unknown'")
    # Drop rows where any pickup/dropoff coordinate is exactly 0.0.
    .filter("pickup_latitude != 0.0")
    .filter("pickup_longitude != 0.0")
    .filter("dropoff_latitude != 0.0")
    .filter("dropoff_longitude != 0.0")
)

In [7]:
from pyspark.sql.functions import to_date
# Add the calendar date of the pickup timestamp as a column to partition by.
pickup_date = to_date("pickup_datetime")
df_clean = df_clean.withColumn("trip_date", pickup_date)

In [8]:
# Row count once all cleaning filters have been applied.
final_count = df_clean.count()
final_message = f"Row count after cleaning: {final_count}"
print(final_message)


Row count after cleaning: 169346433


In [9]:
# Memory constraints: continue with a seeded 10% sample of the cleaned data.
df_sample = df_clean.sample(fraction=0.1, seed=42)

# Write the sample as parquet, one sub-folder per trip_date, replacing any
# previous contents of the "output" folder.
partitioned_writer = df_sample.write.partitionBy("trip_date").mode("overwrite")
partitioned_writer.parquet("output")

Kafka osa on pooleli. Hetkel üritasin kasutada partitsioone, mille lõin output-kausta, et need ükshaaval Kafkasse sisse lugeda.

In [None]:
import os
# Walk the daily partitions written to the "output" folder one at a time.
OUTPUT_DIR = "output"

# os.listdir returns entries in arbitrary order; sorting the
# "trip_date=YYYY-MM-DD" folder names yields chronological order.
# Only real directories are kept, so stray files cannot be picked up.
partition_dirs = sorted(
    os.path.join(OUTPUT_DIR, entry)
    for entry in os.listdir(OUTPUT_DIR)
    if entry.startswith("trip_date=")
    and os.path.isdir(os.path.join(OUTPUT_DIR, entry))
)

for partition_path in partition_dirs:
    print(f"Writing data from: {partition_path}")
    # basePath keeps the trip_date partition column in the loaded frame;
    # reading the partition folder on its own would drop it.
    df_partition = (
        spark.read
        .option("basePath", OUTPUT_DIR)
        .parquet(partition_path)
    )
    # For now each partition is only previewed; nothing is sent to Kafka here.
    df_partition.show(5)

In [None]:
# Publish the sampled rows to Kafka for future use: the medallion becomes
# the message key and the whole row, serialised as JSON, becomes the value.
kafka_records = df_sample.selectExpr(
    "CAST(medallion AS STRING) AS key",
    "to_json(struct(*)) AS value",
)

kafka_writer = (
    kafka_records.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "cleaned_data")
)
kafka_writer.save()

In [None]:
## Step 1: Read the Kafka Stream

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

# Subscribe to the topic as a stream, starting from the earliest offset.
kafka_source_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "cleaned_data",
    "startingOffsets": "earliest",
}
stream_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

# Keep only the message payload, cast to a string.
json_df = stream_df.selectExpr("CAST(value AS STRING)")

# Decode the JSON payload with the trip schema and flatten the resulting
# struct back into top-level columns.
decoded = from_json(col("value"), schema).alias("data")
parsed_df = json_df.select(decoded).select("data.*")


In [None]:
##  Step 2: Define Grid Cell Mapping Function

from pyspark.sql.functions import udf
from math import floor

# `cos` was used below without being imported anywhere in the notebook.
from math import cos, radians

# Constants for grid (origin and resolution)
# NOTE(review): the origin is treated as the top-left corner of cell 1.1.
# If the grid specification defines this coordinate as the *centre* of
# cell 1.1, a half-cell offset is required — confirm against the spec.
GRID_ORIGIN_LAT = 41.474937
GRID_ORIGIN_LON = -74.913585
GRID_CELL_SIZE_METERS = 500

# Approximate conversion from degrees to metres. The longitude factor is
# the equatorial circumference scaled by cos(latitude) at the grid origin.
METER_PER_DEGREE_LAT = 111320
METER_PER_DEGREE_LON = 40075000 * cos(radians(GRID_ORIGIN_LAT)) / 360


def get_cell_id(lat, lon):
    """Map a coordinate to a grid cell id of the form "<column>.<row>".

    Columns grow eastwards and rows grow southwards from the grid origin;
    both are 1-based, so the origin itself falls into cell "1.1".

    Args:
        lat: Latitude in degrees, or None.
        lon: Longitude in degrees, or None.

    Returns:
        The cell id string, or None when either coordinate is None (so a
        null value yields a null cell instead of a TypeError inside the UDF).
    """
    if lat is None or lon is None:
        return None

    # Offsets from the origin in metres: east is positive x, south is positive y.
    dx = (lon - GRID_ORIGIN_LON) * METER_PER_DEGREE_LON
    dy = (GRID_ORIGIN_LAT - lat) * METER_PER_DEGREE_LAT

    # Floor division selects the cell; +1 makes the indices 1-based.
    col_idx = int(dx // GRID_CELL_SIZE_METERS) + 1
    row_idx = int(dy // GRID_CELL_SIZE_METERS) + 1
    return f"{col_idx}.{row_idx}"

# Expose the Python mapping function to Spark as a string-returning UDF.
grid_cell_udf = udf(get_cell_id, StringType())

# Compute the grid cell for the pickup and the dropoff location and attach
# both to the parsed stream.
pickup_cell = grid_cell_udf(col("pickup_latitude"), col("pickup_longitude"))
dropoff_cell = grid_cell_udf(col("dropoff_latitude"), col("dropoff_longitude"))
parsed_df = parsed_df.withColumn("start_cell", pickup_cell)
parsed_df = parsed_df.withColumn("end_cell", dropoff_cell)


In [None]:
## Step 3: Apply Windowing for Last 30 Minutes

from pyspark.sql.functions import window

# 30-minute windows over the dropoff time, sliding forward every 5 minutes.
sliding_window = window("dropoff_datetime", "30 minutes", "5 minutes")

# Count trips per (window, start cell, end cell), most frequent first.
routes_30min = (
    parsed_df
    .withWatermark("dropoff_datetime", "30 minutes")
    .groupBy(sliding_window, col("start_cell"), col("end_cell"))
    .count()
    .orderBy("count", ascending=False)
)


In [None]:
##  Step 4: Show Top 10 Routes to Console (or File)
# Complete mode re-emits the whole ordered result table on every trigger.
# No trigger is configured, so the default trigger is used rather than a
# fixed 5-minute cadence; the 5 minutes is the slide of the 30-minute windows.
# The console sink prints 20 rows by default, so numRows limits the display
# to the 10 most frequent routes.
# NOTE(review): the ordering spans every window in the result table, not
# only the most recent one — confirm this is the intended "top 10".
query = (
    routes_30min
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .option("numRows", 10)
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()
