In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session on which every job in this notebook runs
_session_builder = SparkSession.builder.appName("MAST30034 Project 1")
# render DataFrames eagerly when they are the last expression of a cell
_session_builder = _session_builder.config("spark.sql.repl.eagerEval.enabled", True)
_session_builder = _session_builder.config("spark.sql.parquet.cacheMetadata", "true")
# use UTC as the session time zone
_session_builder = _session_builder.config("spark.sql.session.timeZone", "Etc/UTC")
spark = _session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 23:03:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/20 23:03:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Derive the reference schema from the 2023-02 taxi file (the most recently
# updated schema) so that it can be applied to the other monthly files.

from pyspark.sql import functions as F

# load the 2023-02 file and keep only the columns used in this notebook
sdf_feb = (
    spark.read.parquet('../data/landing/taxi_data/2023-02.parquet')
    .select(
        'tpep_pickup_datetime',
        'tpep_dropoff_datetime',
        'passenger_count',
        'trip_distance',
        'PULocationID',
        'DOLocationID',
        'fare_amount',
    )
)

# rename every column to its lower-case form so casing is consistent
consistent_col_casing = [
    F.col(original_name).alias(original_name.lower())
    for original_name in sdf_feb.columns
]
sdf_feb = sdf_feb.select(*consistent_col_casing)

# reference schema, reused by the following cells when reading/casting data
sdf_schema = sdf_feb.schema
sdf_schema

                                                                                

StructType([StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('pulocationid', IntegerType(), True), StructField('dolocationid', IntegerType(), True), StructField('fare_amount', DoubleType(), True)])

In [3]:
# Cast each 2022 monthly file to the reference schema (column types AND the
# lower-case column names of `sdf_schema`) and write it to the raw data directory.

for month in range(1, 13):
    print(f"starting month {month}")
    month_tag = f"2022-{month:02d}"  # zero-padded, e.g. 2022-03
    input_path = f'../data/landing/taxi_data/{month_tag}.parquet'
    output_path = f'../data/raw/{month_tag}.parquet'

    sdf_malformed = spark.read.parquet(input_path)
    sdf_malformed = sdf_malformed.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count'
, 'trip_distance', 'PULocationID', 'DOLocationID', 'fare_amount')

    # The selected columns are in the same order as the fields of sdf_schema,
    # so pair them positionally. Besides casting to the required type, rename
    # each column to the schema's field name so the raw files carry the same
    # lower-case names as the schema later used to read them back.
    sdf_malformed = sdf_malformed.select([
        F.col(source_name).cast(field.dataType).alias(field.name)
        for source_name, field in zip(sdf_malformed.columns, sdf_schema.fields)
    ])
    # a single output partition, i.e. one parquet part file per month
    sdf_malformed = sdf_malformed.coalesce(1)

    # write it to raw data directory
    sdf_malformed.write.mode('overwrite').parquet(output_path)
    print(f"month {month} done")

starting month 1


                                                                                

month 1 done
starting month 2


                                                                                

month 2 done
starting month 3


                                                                                

month 3 done
starting month 4


                                                                                

month 4 done
starting month 5


                                                                                

month 5 done
starting month 6


                                                                                

month 6 done
starting month 7


                                                                                

month 7 done
starting month 8


                                                                                

month 8 done
starting month 9


                                                                                

month 9 done
starting month 10


                                                                                

month 10 done
starting month 11


                                                                                

month 11 done
starting month 12


[Stage 24:>                                                         (0 + 1) / 1]

month 12 done


                                                                                

In [4]:
# read everything in the raw data directory, applying the reference schema
sdf = spark.read.schema(sdf_schema).parquet('../data/raw/*')
sdf

tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pulocationid,dolocationid,fare_amount
2022-10-01 00:03:41,2022-10-01 00:18:39,1,1.7,249,107,9.5
2022-10-01 00:14:30,2022-10-01 00:19:48,2,0.72,151,238,5.5
2022-10-01 00:27:13,2022-10-01 00:37:41,1,1.74,238,166,9.0
2022-10-01 00:32:53,2022-10-01 00:38:55,0,1.3,142,239,6.5
2022-10-01 00:44:55,2022-10-01 00:50:21,0,1.0,238,166,6.0
2022-10-01 00:22:52,2022-10-01 00:52:14,1,6.8,186,41,25.5
2022-10-01 00:33:19,2022-10-01 00:44:51,3,1.88,162,145,10.5
2022-10-01 00:02:42,2022-10-01 00:50:01,1,12.2,100,22,41.0
2022-10-01 00:06:35,2022-10-01 00:24:38,1,7.79,138,112,23.5
2022-10-01 00:29:25,2022-10-01 00:43:15,1,4.72,145,75,14.5


In [5]:
# number of trip records before any filtering
sdf.count()

39656098

In [6]:
# Keep only trips whose pickup and dropoff both fall inside calendar year 2022,
# then report how many rows were removed.

from pyspark.sql.functions import year
from datetime import datetime

start_date = datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
end_date = datetime.strptime("2022-12-31 23:59:59", "%Y-%m-%d %H:%M:%S")

# A naive Python datetime used directly in a column comparison is converted to
# a TIMESTAMP literal using the driver's local time zone, which would make the
# bounds depend on the machine running the notebook. Build the bounds from
# their string form and cast them to the column's own type instead, so the
# comparison is made on wall-clock values regardless of the machine.
_bound_format = "%Y-%m-%d %H:%M:%S"
_timestamp_type = sdf.schema['tpep_pickup_datetime'].dataType
start_bound = F.lit(start_date.strftime(_bound_format)).cast(_timestamp_type)
end_bound = F.lit(end_date.strftime(_bound_format)).cast(_timestamp_type)

filtered_df = sdf.where(
    (year(F.col('tpep_pickup_datetime')) == 2022)
    & (year(F.col('tpep_dropoff_datetime')) == 2022)
    & (F.col('tpep_pickup_datetime') >= start_bound)
    & (F.col('tpep_pickup_datetime') <= end_bound)
    & (F.col('tpep_dropoff_datetime') >= start_bound)
    & (F.col('tpep_dropoff_datetime') <= end_bound)
)
# rows removed by the date filter
sdf.count() - filtered_df.count()


                                                                                

56955

In [7]:
# keep only trips whose dropoff is strictly later than the pickup,
# then report how many rows that removed
filtered_df2 = filtered_df.filter(
    F.col('tpep_pickup_datetime') < F.col('tpep_dropoff_datetime')
)
filtered_df.count() - filtered_df2.count()

                                                                                

32021

In [8]:
# calculate outlier cut-offs: approximate 1st and 99th percentiles of trip
# distance (relative error 0.001), both obtained with a single call instead of
# scanning the data once per percentile
# NOTE(review): the percentiles are computed on the unfiltered `sdf`, not on
# `filtered_df2` which they are later applied to — confirm this is intended
percentile_1_distance, percentile_99_distance = sdf.approxQuantile(
    "trip_distance", [0.01, 0.99], 0.001
)

                                                                                

In [9]:
# more data filtering: each condition below must hold for a trip to be kept
from pyspark.sql import functions as F

sdf_filtered = (
    filtered_df2
    # at least one passenger
    .where(F.col('passenger_count') > 0)
    # trip distance strictly between the 1st and 99th percentile cut-offs
    .where(F.col('trip_distance') < percentile_99_distance)
    .where(F.col('trip_distance') > percentile_1_distance)
    # fare above 3.0
    .where(F.col('fare_amount') > 3.0)
    # pickup and dropoff zone ids within 1..263 (inclusive)
    .where(F.col('pulocationid').between(1, 263))
    .where(F.col('dolocationid').between(1, 263))
)

In [10]:
# data count check: rows after the filter minus rows before it, so the value
# is negative (its magnitude is the number of rows removed)
sdf_filtered.count() - filtered_df2.count()

                                                                                

-3888647

In [11]:
# add `pickup_day`: the pickup datetime formatted with pattern 'EE'
# (abbreviated day-of-week name, e.g. "Sat")
from pyspark.sql.functions import date_format, unix_timestamp

sdf_filtered = sdf_filtered.withColumn('pickup_day', date_format('tpep_pickup_datetime', 'EE'))
sdf_filtered

tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pulocationid,dolocationid,fare_amount,pickup_day
2022-10-01 00:03:41,2022-10-01 00:18:39,1,1.7,249,107,9.5,Sat
2022-10-01 00:14:30,2022-10-01 00:19:48,2,0.72,151,238,5.5,Sat
2022-10-01 00:27:13,2022-10-01 00:37:41,1,1.74,238,166,9.0,Sat
2022-10-01 00:22:52,2022-10-01 00:52:14,1,6.8,186,41,25.5,Sat
2022-10-01 00:33:19,2022-10-01 00:44:51,3,1.88,162,145,10.5,Sat
2022-10-01 00:02:42,2022-10-01 00:50:01,1,12.2,100,22,41.0,Sat
2022-10-01 00:06:35,2022-10-01 00:24:38,1,7.79,138,112,23.5,Sat
2022-10-01 00:29:25,2022-10-01 00:43:15,1,4.72,145,75,14.5,Sat
2022-10-01 00:01:55,2022-10-01 00:20:16,1,8.8,138,236,26.0,Sat
2022-10-01 00:27:48,2022-10-01 00:59:50,1,8.6,140,36,29.5,Sat


In [12]:
# trip duration in seconds: dropoff epoch seconds minus pickup epoch seconds
sdf_day = sdf_filtered.withColumn(
    'trip_duration_in_sec',
    unix_timestamp(F.col('tpep_dropoff_datetime'))
    - unix_timestamp(F.col('tpep_pickup_datetime')),
)


In [13]:
# keep only trips that lasted more than one minute (60 seconds)
sdf_day = sdf_day.filter(F.col('trip_duration_in_sec') > 60)

In [14]:
# rows removed by the trip-duration filter
sdf_filtered.count() - sdf_day.count()

                                                                                

54262

In [15]:
# read the weather CSV (first line is the header; column types are inferred)
weather = spark.read.csv("../data/landing/weather/2022.csv", header=True, inferSchema=True)
weather

datetime,temp,precip,snow,windspeed,conditions
2021-12-31 00:00:00,9.4,0.0,0.0,3.5,Overcast
2021-12-31 01:00:00,9.0,0.0,0.0,7.4,Overcast
2021-12-31 02:00:00,9.4,0.0,0.0,3.0,Overcast
2021-12-31 03:00:00,9.4,0.0,0.0,6.5,Overcast
2021-12-31 04:00:00,9.0,0.0,0.0,3.5,Overcast
2021-12-31 05:00:00,9.2,0.0,0.0,2.5,Overcast
2021-12-31 06:00:00,9.4,0.0,0.0,0.0,Overcast
2021-12-31 07:00:00,10.0,0.0,0.0,0.0,Overcast
2021-12-31 08:00:00,10.2,0.0,0.0,0.0,Overcast
2021-12-31 09:00:00,10.7,0.0,0.0,0.0,Overcast


In [16]:
# change weather timezone to UTC timezone: `utc_datetime` is `datetime`
# interpreted as "EST" and converted to UTC
# NOTE(review): "EST" is presumably a fixed UTC-5 offset without daylight
# saving. The missing hour on 2022-03-13 and the duplicated hour on 2022-11-06
# found in later cells coincide with the US daylight-saving transitions —
# confirm whether "America/New_York" is the intended zone, and whether the taxi
# timestamps this is joined against are really in UTC.
from pyspark.sql.functions import to_utc_timestamp
# (withColumn already names the column, so no alias is needed on the expression)
weather = weather.withColumn('utc_datetime', to_utc_timestamp(F.col('datetime'), "EST"))

In [17]:
# check the data: `utc_datetime` should now appear next to the original `datetime`
weather

datetime,temp,precip,snow,windspeed,conditions,utc_datetime
2021-12-31 00:00:00,9.4,0.0,0.0,3.5,Overcast,2021-12-31 05:00:00
2021-12-31 01:00:00,9.0,0.0,0.0,7.4,Overcast,2021-12-31 06:00:00
2021-12-31 02:00:00,9.4,0.0,0.0,3.0,Overcast,2021-12-31 07:00:00
2021-12-31 03:00:00,9.4,0.0,0.0,6.5,Overcast,2021-12-31 08:00:00
2021-12-31 04:00:00,9.0,0.0,0.0,3.5,Overcast,2021-12-31 09:00:00
2021-12-31 05:00:00,9.2,0.0,0.0,2.5,Overcast,2021-12-31 10:00:00
2021-12-31 06:00:00,9.4,0.0,0.0,0.0,Overcast,2021-12-31 11:00:00
2021-12-31 07:00:00,10.0,0.0,0.0,0.0,Overcast,2021-12-31 12:00:00
2021-12-31 08:00:00,10.2,0.0,0.0,0.0,Overcast,2021-12-31 13:00:00
2021-12-31 09:00:00,10.7,0.0,0.0,0.0,Overcast,2021-12-31 14:00:00


In [18]:
# keep only the weather rows whose UTC timestamp falls in 2022
from pyspark.sql.functions import year
weather = weather.filter(year(F.col("utc_datetime")) == 2022)

In [19]:
# check duplicates: count the rows per UTC timestamp, then count how many
# timestamps occur more than once
duplicate_counts = weather.groupBy('utc_datetime').count()
duplicates_count = duplicate_counts.filter(F.col("count") > 1).count()
duplicates_count

                                                                                

1

In [20]:
# show the UTC timestamps that occur more than once
duplicate_counts.filter(F.col("count") > 1)

utc_datetime,count
2022-11-06 06:00:00,2


In [21]:
# drop duplicates: keep one row per `datetime` value
# NOTE(review): the duplicate check above was done on `utc_datetime`, while the
# de-duplication key here is `datetime`; which of the duplicated rows is kept
# is presumably arbitrary — confirm this is acceptable
weather = weather.dropDuplicates(['datetime'])

In [22]:
# count the weather rows left after de-duplication
weather.count()

8759

In [23]:
# outer join of the taxi data with the weather data, used to find pickup hours
# that have no weather record
from pyspark.sql.functions import hour, date_trunc

combined_df = sdf_day.join(
    weather,
    # a list of conditions is combined with AND: same calendar day and same hour
    on=[
        hour(sdf_day["tpep_pickup_datetime"]) == hour(weather["utc_datetime"]),
        date_trunc("day", sdf_day["tpep_pickup_datetime"]) == date_trunc("day", weather["utc_datetime"]),
    ],
    how="outer",
)

In [24]:
# check missing values in weather dataset: rows of the outer join whose `temp` is null
combined_df.where(F.col('temp').isNull())

                                                                                

tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pulocationid,dolocationid,fare_amount,pickup_day,trip_duration_in_sec,datetime,temp,precip,snow,windspeed,conditions,utc_datetime
2022-03-13 07:45:10,2022-03-13 07:47:22,5,0.99,263,141,4.5,Sun,132,,,,,,,
2022-03-13 07:10:52,2022-03-13 07:47:18,1,12.1,132,17,38.0,Sun,2186,,,,,,,
2022-03-13 07:02:48,2022-03-13 07:32:57,1,19.54,132,260,52.0,Sun,1809,,,,,,,
2022-03-13 07:00:55,2022-03-13 07:20:28,1,7.31,162,13,23.5,Sun,1173,,,,,,,
2022-03-13 07:00:47,2022-03-13 07:08:06,1,1.86,90,79,8.0,Sun,439,,,,,,,
2022-03-13 07:13:16,2022-03-13 07:17:48,1,0.96,260,260,5.5,Sun,272,,,,,,,
2022-03-13 07:20:12,2022-03-13 07:40:11,1,4.69,260,230,18.0,Sun,1199,,,,,,,
2022-03-13 07:48:27,2022-03-13 07:58:20,1,1.74,164,50,9.0,Sun,593,,,,,,,
2022-03-13 07:15:34,2022-03-13 07:25:09,1,1.52,236,239,8.0,Sun,575,,,,,,,
2022-03-13 07:42:22,2022-03-13 07:56:59,1,2.86,68,231,13.0,Sun,877,,,,,,,


In [25]:
# check the weather record for the hour before the missing one (06:00 UTC on 2022-03-13)
weather.where(F.col('utc_datetime') == '2022-03-13 06:00:00')

datetime,temp,precip,snow,windspeed,conditions,utc_datetime
2022-03-13 01:00:00,-3.9,0.0,0.0,15.6,Partially cloudy,2022-03-13 06:00:00


In [26]:
# check the weather record for the hour after the missing one (08:00 UTC on 2022-03-13)
weather.where(F.col('utc_datetime') == '2022-03-13 08:00:00')

datetime,temp,precip,snow,windspeed,conditions,utc_datetime
2022-03-13 03:00:00,-3.9,0.0,0.0,14.3,Partially cloudy,2022-03-13 08:00:00


In [27]:
# keep a reference to the weather schema (column names and types)
schema = weather.schema

In [28]:
# make new data (row) for the missing hour and add it to weather data.
# windspeed is the mean of the previous and next hour's readings; the other
# values are copied from those neighbouring rows.
missing_data = spark.createDataFrame([('2022-03-13 02:00:00', '-3.9', '0.0', '0.0', f'{(15.6 + 14.3) / 2}', 'Partially cloudy', '2022-03-13 07:00:00')], weather.columns)
# The row above is made of strings only. union() matches columns by position
# and would have to reconcile string columns with the typed weather columns,
# so cast the new row to the weather schema first; the union then keeps the
# original column types.
missing_data = missing_data.select(
    [F.col(field.name).cast(field.dataType) for field in weather.schema.fields]
)
weather_df = weather.union(missing_data)

In [29]:
# check the data count after adding the missing row
weather_df.count()

                                                                                

8760

In [30]:
# inner join of the taxi data with the completed weather data
from pyspark.sql.functions import hour, date_trunc

combined_df2 = sdf_day.join(
    weather_df,
    # a list of conditions is combined with AND: same calendar day and same hour
    on=[
        hour(sdf_day["tpep_pickup_datetime"]) == hour(weather_df["utc_datetime"]),
        date_trunc("day", sdf_day["tpep_pickup_datetime"]) == date_trunc("day", weather_df["utc_datetime"]),
    ],
    how="inner",
)

# row counts: original data, joined data, and taxi data before the join
sdf.count(), combined_df2.count(), sdf_day.count()

                                                                                

(39656098, 35624213, 35624213)

In [31]:
# check the column names of the joined data
combined_df2.columns

['tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'pulocationid',
 'dolocationid',
 'fare_amount',
 'pickup_day',
 'trip_duration_in_sec',
 'datetime',
 'temp',
 'precip',
 'snow',
 'windspeed',
 'conditions',
 'utc_datetime']

In [32]:
# drop unnecessary columns by selecting only the ones to keep
final_df = combined_df2.select(
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'pulocationid',
    'dolocationid',
    'pickup_day',
    'temp',
    'precip',
    'snow',
    'windspeed',
    'conditions',
)

In [33]:
# read the taxi zone shapefile (with geopandas) and the taxi zone lookup CSV (with pandas)
import pandas as pd 
import geopandas as gpd

# sf stands for shape file
sf = gpd.read_file("../data/landing/geopandas/taxi_zones.shp")
zones = pd.read_csv("../data/landing/geopandas/taxi_zones.csv")
sf

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((933100.918 192536.086, 933091.011 19..."
1,2,0.433470,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((1033269.244 172126.008, 103343..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((1026308.770 256767.698, 1026495.593 ..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((992073.467 203714.076, 992068.667 20..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((935843.310 144283.336, 936046.565 14..."
...,...,...,...,...,...,...,...
258,259,0.126750,0.000395,Woodlawn/Wakefield,259,Bronx,"POLYGON ((1025414.782 270986.139, 1025138.624 ..."
259,260,0.133514,0.000422,Woodside,260,Queens,"POLYGON ((1011466.966 216463.005, 1011545.889 ..."
260,261,0.027120,0.000034,World Trade Center,261,Manhattan,"POLYGON ((980555.204 196138.486, 980570.792 19..."
261,262,0.049064,0.000122,Yorkville East,262,Manhattan,"MULTIPOLYGON (((999804.795 224498.527, 999824...."


In [34]:
# Convert the geometry shapes to longitude/latitude coordinates (WGS84)
sf['geometry'] = sf['geometry'].to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
sf

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((-74.18445 40.69500, -74.18449 40.695..."
1,2,0.433470,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((-73.84793 40.87134, -73.84725 40.870..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((-73.97177 40.72582, -73.97179 40.725..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((-74.17422 40.56257, -74.17349 40.562..."
...,...,...,...,...,...,...,...
258,259,0.126750,0.000395,Woodlawn/Wakefield,259,Bronx,"POLYGON ((-73.85107 40.91037, -73.85207 40.909..."
259,260,0.133514,0.000422,Woodside,260,Queens,"POLYGON ((-73.90175 40.76078, -73.90147 40.759..."
260,261,0.027120,0.000034,World Trade Center,261,Manhattan,"POLYGON ((-74.01333 40.70503, -74.01327 40.704..."
261,262,0.049064,0.000122,Yorkville East,262,Manhattan,"MULTIPOLYGON (((-73.94383 40.78286, -73.94376 ..."


In [35]:
# merge the zone lookup table with the shapefile on LocationID (inner join)
# and wrap the result in a GeoDataFrame
gdf = gpd.GeoDataFrame(
    pd.merge(zones, sf, on='LocationID', how='inner')
)

In [36]:
# build the Spark borough dataframe from just the needed columns
borough_df = spark.createDataFrame(gdf[['LocationID', 'Borough', 'Zone']])

In [37]:
# attach borough and zone names to each trip by matching the pickup location id
taxi_data = final_df.join(
    borough_df,
    on=final_df.pulocationid == borough_df.LocationID,
    how="inner",
)

In [38]:
# save it to the curated directory as parquet, replacing any existing output
taxi_data.write.mode('overwrite').parquet('../data/curated/taxi_data.parquet')

23/08/20 23:08:45 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/20 23:08:45 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/20 23:09:06 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/20 23:09:06 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/20 23:09:07 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/20 23:09:07 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/20 23:09:07 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014