In [None]:
import os 
import json
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
from urllib.request import urlopen
from pyspark.ml.regression import GBTRegressionModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import sum, col, month, year, dayofweek, dayofmonth, hour, unix_timestamp, median, mode, lit, when, from_json, expr
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType, LongType

In [None]:
# Obtain the notebook's Spark session (getOrCreate() returns the active session
# if one already exists).
spark = (
    SparkSession.builder
    .appName('Stream Processing')
    .getOrCreate()
)

# S3A connector settings. Both credentials are read from the environment, so a
# missing AWS_ACCESS_KEY or AWS_SECRET_ACCESS_KEY raises KeyError on its line.
# os.environ values are already strings, so no conversion is needed.
spark.conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY'])
spark.conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

In [None]:
# Pre-trained gradient-boosted-tree regressor, loaded from S3.
model = GBTRegressionModel.load('s3://data228/trained-models/gbt_model/')

# Incoming data stream schema, written as (field name, Spark type) pairs in
# record order. Every field is declared nullable.
_TRIP_FIELDS = [
    ("VendorID", LongType),
    ("tpep_pickup_datetime", StringType),
    ("tpep_dropoff_datetime", StringType),
    ("passenger_count", IntegerType),
    ("trip_distance", DoubleType),
    ("PULocationID", LongType),
    ("DOLocationID", LongType),
    ("RateCodeID", DoubleType),
    ("Store_and_fwd_flag", StringType),
    ("payment_type", LongType),
    ("Fare_amount", DoubleType),
    ("Extra", DoubleType),
    ("MTA_tax", DoubleType),
    ("Improvement_surcharge", DoubleType),
    ("tip_amount", DoubleType),
    ("Tolls_amount", DoubleType),
    ("total_amount", DoubleType),
    ("Congestion_Surcharge", DoubleType),
    ("Airport_fee", DoubleType),
]

schema = StructType([
    StructField(field_name, spark_type(), nullable=True)
    for field_name, spark_type in _TRIP_FIELDS
])

In [None]:
def enrichWithWeatherData(df):
    """Add time-derived features and hourly weather columns to a trip DataFrame.

    Columns derived from the pickup/dropoff timestamps:
      * ``trip_duration`` - dropoff minus pickup, in seconds
      * ``year``, ``month``, ``date`` (day of month), ``hour`` - from the pickup time
      * ``weekend`` - 1 when the pickup day-of-week is 1 (Sunday) or 7 (Saturday), else 0

    Weather columns (``temp``, ``dew``, ``humidity``, ``windspeed``, ``visibility``)
    come from the Visual Crossing timeline API for New York, NY on the pickup date,
    using the entry for the pickup hour.

    NOTE: only the FIRST row's pickup date/hour is used for the weather lookup, and
    the resulting values are attached as literals to every row. The function is
    therefore only accurate for DataFrames whose rows share one pickup hour (the
    notebook calls it with one-row DataFrames).

    Reads the module-level ``API_KEY`` at call time.

    Args:
        df: Spark DataFrame with ``tpep_pickup_datetime`` and
            ``tpep_dropoff_datetime`` columns.

    Returns:
        A new DataFrame with the derived time and weather columns appended.

    Raises:
        ValueError: if ``df`` has no rows, or the first row's pickup time yields
            null date parts (missing or unparseable timestamp).
    """
    df = (
        df
        .withColumn("trip_duration", (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")))
        .withColumn('year', F.year("tpep_pickup_datetime"))
        .withColumn('month', F.month("tpep_pickup_datetime"))
        .withColumn('weekend', F.when((F.dayofweek("tpep_pickup_datetime") == 1) | (F.dayofweek("tpep_pickup_datetime") == 7), lit(1)).otherwise(lit(0)))
        .withColumn('date', F.dayofmonth("tpep_pickup_datetime"))
        .withColumn('hour', F.hour("tpep_pickup_datetime"))
    )

    # Pull the pickup date parts of the first row back to the driver. The local
    # names are prefixed so they do not shadow the pyspark `year`/`month`/`hour`
    # functions imported at the top of the notebook.
    first_row = df.select('year', 'month', 'date', 'hour').first()
    if first_row is None:
        raise ValueError("enrichWithWeatherData requires a DataFrame with at least one row")
    pickup_year, pickup_month, pickup_day, pickup_hour = first_row
    if any(part is None for part in first_row):
        raise ValueError("tpep_pickup_datetime of the first row is missing or could not be parsed")

    # Zero-padded ISO date (yyyy-MM-dd); the same day is used as start and end.
    pickup_date = f"{pickup_year:04d}-{pickup_month:02d}-{pickup_day:02d}"
    weather_url = (
        "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/"
        f"new%20york%2C%20ny/{pickup_date}/{pickup_date}"
        f"?unitGroup=us&include=hours&key={API_KEY}&contentType=json"
    )

    # Fetch and parse the weather data; the context manager closes the HTTP
    # response so sockets are not leaked when this runs once per message.
    with urlopen(weather_url) as response:
        weather_json = json.load(response)

    # The hourly entries are indexed by position, using the pickup hour.
    weather = weather_json['days'][0]['hours'][pickup_hour]

    # Attach the weather readings as constant columns.
    df = (
        df
        .withColumn('temp', lit(weather['temp']))
        .withColumn('dew', lit(weather['dew']))
        .withColumn('humidity', lit(weather['humidity']))
        .withColumn('windspeed', lit(weather['windspeed']))
        .withColumn('visibility', lit(weather['visibility']))
    )

    return df

In [None]:
TOPIC = "taxi-trips-topic"
BROKER_IP = '34.209.9.254:9092'
# The Visual Crossing API key is a secret: read it from the environment (like the
# AWS credentials above) instead of committing it to the notebook. A missing
# VISUAL_CROSSING_API_KEY raises KeyError here. Any key that was previously
# committed in this notebook should be treated as leaked and rotated.
API_KEY = os.environ['VISUAL_CROSSING_API_KEY']

# Columns fed to the model. Presumably this list and its order must match what
# the model was trained with - verify against the training notebook.
FEATURE_COLUMNS = [
    'year', 'month', 'date', 'hour', 'passenger_count', 'trip_distance',
    'trip_duration', 'PULocationID', 'DOLocationID', 'weekend',
    'temp', 'dew', 'humidity', 'windspeed', 'visibility',
]

# The assembler does not depend on the message, so build it once rather than
# once per consumed record.
assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol='features')

consumer = KafkaConsumer(TOPIC, bootstrap_servers=[BROKER_IP])

# Score each trip as it arrives: decode -> enrich -> assemble -> predict -> archive.
for message in consumer:
    msg_value = json.loads(message.value.decode('utf-8'))
    # The schema declares passenger_count as IntegerType, so coerce it to int.
    msg_value['passenger_count'] = int(msg_value['passenger_count'])

    # Compare against the charged amount excluding the tip; rounded to 2 places
    # (same as the prediction) to hide float noise such as 44.050000000000004.
    actual_price = msg_value['total_amount'] - msg_value['tip_amount']
    print(f"Actual Price: {round(actual_price, 2)}", end=" | ")

    trip_df = spark.createDataFrame([msg_value], schema=schema)
    enriched_df = enrichWithWeatherData(trip_df)
    df = model.transform(assembler.transform(enriched_df))
    print(f"Predicted Price: {round(df.select('prediction').collect()[0]['prediction'], 2)}")

    # Append the scored record (features and prediction included) to the archive.
    df.coalesce(1).write.parquet("s3://data228/stream-data-archive/", mode='append')

Actual Price: 12.8 | Predicted Price: 13.52
Actual Price: 63.1 | Predicted Price: 49.3
Actual Price: 61.85 | Predicted Price: 50.97
Actual Price: 44.050000000000004 | Predicted Price: 48.74
Actual Price: 29.299999999999997 | Predicted Price: 33.13
Actual Price: 12.3 | Predicted Price: 12.08
Actual Price: 16.8 | Predicted Price: 17.62
Actual Price: 11.8 | Predicted Price: 13.29
Actual Price: 9.3 | Predicted Price: 10.86
Actual Price: 9.8 | Predicted Price: 10.4
Actual Price: 12.3 | Predicted Price: 13.5
Actual Price: 60.599999999999994 | Predicted Price: 62.77
Actual Price: 34.55 | Predicted Price: 39.27
Actual Price: 41.85 | Predicted Price: 38.32
Actual Price: 56.55 | Predicted Price: 58.86
Actual Price: 63.099999999999994 | Predicted Price: 60.92
Actual Price: 11.3 | Predicted Price: 10.95
Actual Price: 60.6 | Predicted Price: 58.85
Actual Price: 10.3 | Predicted Price: 10.5
Actual Price: 8.3 | Predicted Price: 7.96
Actual Price: 9.3 | Predicted Price: 10.14
Actual Price: 8.8 | Predi