In [0]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, to_timestamp
# NOTE: `round` below shadows the Python builtin for the rest of the notebook.
from pyspark.sql.functions import unix_timestamp, from_unixtime, round

spark = SparkSession.builder \
    .appName("read_s3_parquet") \
    .getOrCreate()


# Azure Blob Storage access. The account key is a secret and must not be stored
# in the notebook. It is read from the AZURE_STORAGE_ACCOUNT_KEY environment
# variable. On Databricks, a secret scope via dbutils.secrets.get is the usual
# alternative.
# NOTE(review): a key was previously committed in this cell. Treat it as leaked
# and rotate it in the storage account.
storage_account_name = "bia678"
storage_account_access_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_access_key:
    raise RuntimeError(
        "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable to the access key "
        f"of storage account '{storage_account_name}'."
    )
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_access_key)
container_name = "taxidataset"

# Common prefix shared by every input path in the container.
base_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/pavan-bucket"

dem = spark.read.csv(f"{base_path}/dataset/demand_data/", header=True)
df = spark.read.parquet(f"{base_path}/dataset/taxi-data/")
weather = spark.read.csv(f"{base_path}/dataset/weather-data/weather.csv", header=True, inferSchema=True)
holiday = spark.read.csv(f"{base_path}/dataset/holiday-data/export.csv", header=True)
location_coordinates = spark.read.csv(f"{base_path}/location_data/location.csv", header=True)

Joining the weather dataset

In [0]:
# Attach the temperature to every trip. Both sides are keyed on their
# timestamp truncated to the hour: observation time for weather, pickup time
# for trips.
# NOTE(review): if weather.csv holds more than one row per hour, this join
# duplicates trips — confirm the file is strictly hourly.
weather_df = (
    weather
    .select(F.col("datetime"), F.col("temp"))
    .withColumn("hourly_interval", F.date_trunc("hour", F.col("datetime")))
)
df = df.withColumn("hourly_interval", F.date_trunc("hour", F.col("tpep_pickup_datetime")))
joined_df = (
    df.join(weather_df, "hourly_interval", "left_outer")
    .drop("hourly_interval", "datetime")
)

Joining the Holiday Dataset

In [0]:
# Flag trips whose pickup falls on a holiday.
# The pickup column is a full timestamp. Comparing it directly to the holiday
# date string only matched pickups at exactly midnight. Both sides are now
# reduced to a calendar date before comparing.
# Only the presence of a holiday is used (holidayName is tested for null).
# Duplicate dates are therefore collapsed so a trip cannot be joined to
# several holiday rows.
# NOTE(review): to_date uses the default yyyy-MM-dd parsing — confirm it
# matches the `date` column format in export.csv.
holiday_data = (
    holiday
    .select(F.to_date(F.col("date")).alias("date"), F.col("holidayName"))
    .dropDuplicates(["date"])
)
data = joined_df.join(
    holiday_data,
    F.to_date(joined_df.tpep_pickup_datetime) == holiday_data.date,
    "left_outer",
)
data = data.drop("date")

Joining Coordinates

In [0]:
# Attach pickup-zone coordinates to every trip.
# This is an inner join, so trips whose PULocationID has no row in location.csv
# are dropped. The coordinates are read from CSV as strings and cast to double.
location_coordinates = location_coordinates.select("LocationID", "Latitude", "Longitude")
data = data.join(
    location_coordinates,
    data.PULocationID == location_coordinates.LocationID,
    how="inner",
)
for coordinate_name in ("Latitude", "Longitude"):
    data = data.withColumn(coordinate_name, F.col(coordinate_name).cast(DoubleType()))

New feature "isHoliday"

In [0]:
# isHoliday: 1 when the trip was matched to a holiday (holidayName present), otherwise 0.
holiday_flag = F.when(F.col("holidayName").isNull(), F.lit(0)).otherwise(F.lit(1))
data = data.withColumn("isHoliday", holiday_flag).drop("holidayName")

Modifying "isHoliday" to include weekends as holidays

In [0]:
# Treat weekends as holidays. isHoliday becomes 1 for holiday pickups and for
# pickups on Saturday or Sunday. F.dayofweek numbers days 1 (Sunday) to 7 (Saturday).
# The weekend test is computed inline. The former temporary dayofweek/isWeekend
# columns were created only to be dropped again, and isWeekend was never read.
is_weekend = F.dayofweek(F.col('tpep_pickup_datetime')).isin([1, 7])
data = data.withColumn(
    'isHoliday',
    F.when((F.col('isHoliday') == 1) | is_weekend, 1).otherwise(0)
)

New feature: trip_duration (minutes)

In [0]:
# trip_duration: minutes between pickup and dropoff, rounded to 2 decimals.
elapsed_seconds = F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")
data = data.withColumn("trip_duration", F.round(elapsed_seconds / 60, 2))

New Feature speed_mph

In [0]:
# speed_mph: trip_distance divided by the trip duration in hours
# (trip_duration is in minutes), rounded to 2 decimals.
# trip_distance is presumably in miles, as the column name implies — confirm
# against the TLC data dictionary.
# Zero or negative durations yield null instead of dividing by zero. With
# spark.sql.ansi.enabled, division by zero raises rather than returning null.
# Such rows are removed later by the trip_duration > 2 outlier filter anyway.
duration_hours = F.col("trip_duration") / 60
data = data.withColumn(
    "speed_mph",
    F.when(duration_hours > 0, F.round(F.col("trip_distance") / duration_hours, 2))
)

Splitting the pickup datetime into separate columns

In [0]:
# Break the pickup timestamp into separate calendar/time columns.
pickup_ts = F.col("tpep_pickup_datetime")
pickup_parts = [
    ("year", F.year),
    ("month", F.month),
    ("day", F.dayofmonth),
    ("hour", F.hour),
    ("minute", F.minute),
]
for part_name, part_fn in pickup_parts:
    data = data.withColumn(part_name, part_fn(pickup_ts))

Removing the tip from total_amount

In [0]:
# Subtract the tip so that total_amount reflects charges only.
amount_without_tip = F.col("total_amount") - F.col("tip_amount")
data = data.withColumn("total_amount", amount_without_tip)

Recomputing total_amount from its components (fare, extra, MTA tax, tolls, improvement surcharge and congestion surcharge)

In [0]:
def adjust_total_amount(data):
    """Replace total_amount with the sum of its individual charge columns.

    The charges summed are fare_amount, extra, mta_tax, tolls_amount,
    improvement_surcharge and congestion_surcharge. When the recorded
    total_amount differs from that sum, the sum is used. When they are equal,
    or the comparison is null because a value is missing, the recorded
    total_amount is kept.

    Args:
        data: Spark DataFrame containing total_amount and the charge columns above.

    Returns:
        A new DataFrame with total_amount adjusted.
    """
    charge_names = ['fare_amount', 'extra', 'mta_tax', 'tolls_amount',
                    'improvement_surcharge', 'congestion_surcharge']
    charges = [col(name) for name in charge_names]
    # Left fold keeps the original evaluation order: ((fare + extra) + mta_tax) + ...
    sum_of_charges = sum(charges[1:], charges[0])

    recorded_total = F.col('total_amount')
    adjusted_total = F.when(recorded_total != sum_of_charges, sum_of_charges).otherwise(col('total_amount'))
    return data.withColumn('total_amount', adjusted_total)
data = adjust_total_amount(data)

Adding airport fee to the total amount

In [0]:
# Add the airport fee and round to cents. A null airport_fee makes total_amount null.
amount_with_airport_fee = F.col("total_amount") + F.col("airport_fee")
data = data.withColumn("total_amount", F.round(amount_with_airport_fee, 2))

Removing outliers

In [0]:
# Keep only plausible trips.
# Each (low, high) pair below is an exclusive range: low < value < high.
exclusive_ranges = {
    'total_amount': (5, 500),
    'trip_duration': (2, 300),        # minutes
    'trip_distance': (0.25, 50),
    'passenger_count': (0, 7),
}
# Pickups in 2022 or 2023, with an average speed of at most 55 mph.
keep_row = (F.col('year') > 2021) & (F.col('year') <= 2023) & (F.col('speed_mph') <= 55)
for column_name, (low, high) in exclusive_ranges.items():
    keep_row = keep_row & (F.col(column_name) > low) & (F.col(column_name) < high)
data = data.filter(keep_row)

Dropping unnecessary columns from the data

In [0]:
# Drop the individual charge columns (already folded into total_amount),
# unused flag columns, and the LocationID key left by the coordinates join.
unneeded_columns = [
    "tip_amount", "fare_amount", "extra", "mta_tax", "tolls_amount",
    "improvement_surcharge", "congestion_surcharge", "airport_fee",
    "RatecodeID", "store_and_fwd_flag", "LocationID",
]
data = data.drop(*unneeded_columns)

In [0]:
# Remove every row that still has a null in any column.
data = data.dropna()

In [0]:
# Preview the cleaned dataset. `display` is not imported here; it is provided by
# the notebook environment (presumably Databricks) — confirm when running elsewhere.
display(data)

Loading final dataset to Azure

In [0]:
# NOTE(review): the write is disabled, so this notebook does not persist the
# final dataset. Uncomment both lines to write it. coalesce(1) reduces the
# DataFrame to a single partition, i.e. a single output file, written by one task.
# data=data.coalesce(1)
# data.write.parquet(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/pavan-bucket/final_dataset_1/")

Creating demand dataset

In [0]:
# Hourly demand: number of pickups per pickup zone (PULocationID) per hour.
# Each pickup is assigned to the hour it started in. The hour is rendered as a
# "yyyy-MM-dd HH:mm:ss" string, e.g. "2022-01-01 13:00:00".
# date_trunc truncates in the session time zone. The previous epoch-seconds
# arithmetic ((secs / 3600) cast to int, then * 3600) used 32-bit integers,
# which overflow after 2038. It also binned on UTC hours, which differ from
# local hours for zones with non-whole-hour offsets.
# The previous `fin_df.drop(...)` discarded its result and had no effect. The
# helper column is simply no longer created.
# NOTE(review): this counts pickups in the raw `df`, not the cleaned `data`,
# so outlier trips are included — confirm that is intended.
fin_df = (
    df.select(F.col('tpep_pickup_datetime'), F.col('PULocationID'))
    .withColumn(
        "time_bins",
        F.date_format(F.date_trunc("hour", F.col("tpep_pickup_datetime")), "yyyy-MM-dd HH:mm:ss"),
    )
)
demand_df = (
    fin_df
    .groupBy(F.col('time_bins'), F.col('PULocationID'))
    .agg(F.count('time_bins').alias('no_of_pickups'))
    .orderBy(F.col('time_bins'))
)

The demand analysis continues in the "demand" notebook.