# Raw Layer to Curated Layer (filter and transform using business logic, feature engineering, aggregation)

In [17]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC") # fix timestamps loaded by spark
    .getOrCreate()
)

## Yellow Taxis

In [2]:
# make directories for curated data

import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/curated/')
    
if not os.path.exists('../data/curated/tlc'):
    os.makedirs('../data/curated/tlc')
    
if not os.path.exists('../data/curated/tlc/yellow'):
    os.makedirs('../data/curated/tlc/yellow')

### Load in data (this was used for testing / data exploration)

In [3]:
yellow_raw_dir = '../data/raw/tlc/yellow/'

yellow_curated_dir = '../data/curated/tlc/yellow/'

sdf_yellow_2022_05 = spark.read.parquet(f"{yellow_raw_dir}2022-05.parquet")





                                                                                

### Data Investigation (mostly was used for double checking business logic)

In [4]:
print(sdf_yellow_2022_05.where(
        (F.col('extra') == 0)  # check what happens if a trip begins before rush hour starts
        & (F.hour('tpep_pickup_datetime') < 16)
        & (F.hour('tpep_dropoff_datetime') >= 16)
).count())

print(sdf_yellow_2022_05.where(
        (F.col('extra') == 1)  # check what happens if a trip begins before rush hour starts
        & (F.hour('tpep_pickup_datetime') < 16)
        & (F.hour('tpep_dropoff_datetime') >= 16)
).count())

# appears that if a pickup occurs before rush hour begins, but the dropoff occurs after rush hour starts, no rush hour surcharge is applied
# so we shall assume that the extra surcharges are applied based on pickup time



                                                                                

50344
1848


In [5]:
print(sdf_yellow_2022_05.where(
        (F.col('extra') == 1)  # check what happens if a trip begins during rush hour but ends in overnight timeslot
        & (F.hour('tpep_pickup_datetime') < 20)
        & (F.hour('tpep_dropoff_datetime') >= 20)
).count())

print(sdf_yellow_2022_05.where(
        (F.col('extra') == 0.5)  # check what happens if a trip begins during rush hour but ends in overnight timeslot
        & (F.hour('tpep_pickup_datetime') < 20)
        & (F.hour('tpep_dropoff_datetime') >= 20)
).count())

21562
2486


In [6]:
print(sdf_yellow_2022_05.where(
        (F.col('extra') == 0.5)  # check what happens if a trip begins during overnight slot but ends after it 
        & (F.hour('tpep_pickup_datetime') < 6)
        & (F.hour('tpep_dropoff_datetime') >= 6)
).count())

print(sdf_yellow_2022_05.where(
        (F.col('extra') == 0)  # check what happens if a trip begins during overnight slot but ends after it 
        & (F.hour('tpep_pickup_datetime') < 6)
        & (F.hour('tpep_dropoff_datetime') >= 6)
).count())

2905
2446


In [7]:
sdf_yellow_2022_05.where(
    (F.col('extra') == 0)
    | (F.col('extra') == 0.5)
    | (F.col('extra') == 1)
).count()

2569447

In [8]:
sdf_yellow_2022_05.where(
    (F.col('extra') != 0)
    & (F.col('extra') != 0.5)
    & (F.col('extra') != 1)
    & (F.col('extra') != 1.5)
).count()

1018844

In [18]:
# retrieve location ids for JFK and LaGuardia Airports

taxi_zones = spark.read.parquet('../data/raw/tlc/taxi_zones/taxi_zone_lookup.parquet/')

jfk_id = int(taxi_zones.where(
    F.col('zone') == 'JFK Airport'
).select('locationid').first()['locationid'])

laguardia_id = int(taxi_zones.where(
    F.col('zone') == 'LaGuardia Airport'
).select('locationid').first()['locationid'])

print(jfk_id)

print(laguardia_id)


132
138


### Use business logic to filter and transform data. Also remove invalid entries and perform feature engineering

In [21]:
from pyspark.sql import functions as F

# columns which are supposed to add up to total amount
payment_cols = ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'airport_fee']

# number of records before filtering
before_filt_sum = 0

# number of records after filtering
after_filt_sum = 0

yellow_raw_dir = '../data/raw/tlc/yellow/'
    
yellow_curated_dir = '../data/curated/tlc/yellow/'

YEARS = ['2022']

MONTHS = range(5,12)

for year in YEARS:   
    for month in MONTHS:
        
        sdf = spark.read.parquet(f"{yellow_raw_dir}{year}-{str(month).zfill(2)}.parquet")  
        
        #################################### Remove invalid records by applying business logic ####################################
        
        print(f"=====================================================================")
        print(f"Number of records for month {month} before filtering: {sdf.count()}")
        before_filt_sum += sdf.count()
        
        sdf = \
        sdf.where(
            (F.year('tpep_pickup_datetime') == year) # ensure that this trip was initiated within the correct year (assuming a trip belongs to a year based on pickup time)
            & (F.month('tpep_pickup_datetime') == month)  # ensure that this trip was initiated within the correct month (assuming a trip belongs to a month based on pickup time)
            & (F.unix_timestamp('tpep_pickup_datetime') < F.unix_timestamp('tpep_dropoff_datetime') - 60) # 1 minute minimum trip time
            & (F.col('passenger_count') > 0) # ensure non-zero passenger count, but also within 6 people based on the law
            & (F.col('passenger_count') <= 6)
            & (F.col('trip_distance') > 0.25) # remove extremely short / negative trip distance based on assumed distance people are willing to walk before taking a taxi
            & (F.col('trip_distance') < 300) # remove extreme outliers for trip distance (trips around and above this distance generally appear invalid / erroneous)
            & (F.col('pulocationid') >= 1) # ensure trips are only within the specified range of locations
            & (F.col('pulocationid') <= 263)
            & (F.col('dolocationid') >= 1)
            & (F.col('dolocationid') <= 263)
            & (F.col('total_amount') >= 2.50) # ensure non-negative fees, minimum of $2.50 for standard fares
            & (F.col('fare_amount') >= 2.50) 
            & (F.col('tip_amount') >= 0) 
            & (F.col('tolls_amount') >= 0)
            & (F.col('payment_type') == 1) # include only credit card payments as cash tips are not included in this data
            & (F.col('total_amount') == sum(sdf[col] for col in payment_cols)) # check the total amount equals the sum of all other fees
        ).where(
            (F.col('vendorid') == 1) # check for valid vendor id
            | (F.col('vendorid') == 2)  
        ).where(
            (F.col('ratecodeid') == 1) # standard, JFK, and Newark rates suitable for this research based on the locations covered
            | (F.col('ratecodeid') == 2)  
            | (F.col('ratecodeid') == 3)
        ).where(
            (F.col('mta_tax') == 0.50) # $0.50 MTA tax for trips that end in NYC (but not Newark).
            & (F.col('dolocationid') != 1)  
            | (F.col('mta_tax') == 0)
            & (F.col('dolocationid') == 1)  
        ).where(
            (F.col('improvement_surcharge') == 0) # check improvement surcharge is either $0 or $0.30
            | (F.col('improvement_surcharge') == 0.30)
        ).where(
            (F.col('congestion_surcharge') == 0) # check congestion surcharge is either $0 or $2.50 for yellow taxis
            | (F.col('congestion_surcharge') == 2.50)
        ).where(
            (F.col('airport_fee') == 0) # ensure airport fee was only applied for pickup at JFK or LaGuardia
            & (F.col('pulocationid') != jfk_id)
            & (F.col('pulocationid') != laguardia_id)
            | (F.col('pulocationid') == jfk_id)
            & (F.col('airport_fee') == 1.25)
            | (F.col('pulocationid') == laguardia_id)
            & (F.col('airport_fee') == 1.25)  
        ).dropna(
            'any' # remove any records containing NULL values as it will be assumed these are invalid / erroneous entries 
        ).withColumn( # split datetime into separate units of time
            'hour',
            (F.hour('tpep_pickup_datetime'))
        ).withColumn(
            'day_of_month',
            (F.dayofmonth('tpep_pickup_datetime'))
        ).withColumn(
            'month',
            (F.month('tpep_pickup_datetime'))
        ).withColumn(
            'year',
            (F.year('tpep_pickup_datetime'))
        ).withColumn(
            'day_of_week',
            (F.dayofweek('tpep_pickup_datetime'))
        ).withColumn(
            'day_of_year',
            (F.dayofyear('tpep_pickup_datetime'))
        ).withColumn(
            'is_weekday',
            F.when(
                (F.col('day_of_week') != 1)
                & (F.col('day_of_week') != 7),
                True
            ).otherwise(False)
        ).withColumn(
            'is_public_holiday',
            F.when(
                (F.col('day_of_month') == 25) # Christmas Day
                & (F.col('month') == 12)
                | (F.col('day_of_month') == 1) # New Year's Day
                & (F.col('month') == 1)
                | (F.col('day_of_month') == 4) # Independence Day
                & (F.col('month') == 7)
                | (F.col('day_of_month') == 11) # Veteran's Day
                & (F.col('month') == 11)
                | (F.col('day_of_month') == 12) # Lincoln's Birthday
                & (F.col('month') == 2)
                | (F.col('day_of_month') == 19) # Juneteenth
                & (F.col('month') == 6)
                | (F.col('day_of_month') == 17) # Martin Luther King Jr. Day 2022
                & (F.col('month') == 1)
                & (F.col('year') == 2022)
                | (F.col('day_of_month') == 21) # Washington's Birthday 2022
                & (F.col('month') == 2)
                & (F.col('year') == 2022)
                | (F.col('day_of_month') == 30) # Memorial Day 2022
                & (F.col('month') == 5)
                & (F.col('year') == 2022)
                | (F.col('day_of_month') == 5) # Labor Day 2022
                & (F.col('month') == 9)
                & (F.col('year') == 2022)
                | (F.col('day_of_month') == 10) # Columbus Day 2022
                & (F.col('month') == 10)
                & (F.col('year') == 2022)
                | (F.col('day_of_month') == 24) # Thanksgiving 2022
                & (F.col('month') == 11)
                & (F.col('year') == 2022),
                True
            ).otherwise(False)
        ).where( # note that it is assumed that extra surcharges are applied based on pickup time, and rush hour surcharges are therefore applied if a trip
                 # begins in rush hour but ends in the overnight timeslot
            (F.col('extra') == 1.00) # ensure rush hour surcharge only applied for trips that begin during rush hour
            & (F.hour('tpep_pickup_datetime') >= 16)
            & (F.hour('tpep_pickup_datetime') < 20)
            | (F.col('extra') == 0)  # no surcharge for trips that don't fall within rush hour or overnight time slots
            & (F.hour('tpep_pickup_datetime') >= 6)
            & (F.hour('tpep_pickup_datetime') < 16)
            | (F.col('extra') == 0) # no rush hour surcharge for weekends
            & (F.col('is_weekday') == False) 
            & (F.hour('tpep_pickup_datetime') >= 16)
            & (F.hour('tpep_pickup_datetime') < 20)
            | (F.col('extra') == 0) # no rush hour surcharge for holidays
            & (F.col('is_public_holiday') == True) 
            & (F.hour('tpep_pickup_datetime') >= 16)
            & (F.hour('tpep_pickup_datetime') < 20)
            | (F.col('extra') == 0.50) # ensure overnight surcharge only applied for overnight trips
            & (F.hour('tpep_pickup_datetime') >= 20)
            | (F.col('extra') == 0.50)
            & (F.hour('tpep_pickup_datetime') < 6)
        )
    
        print(f"Number of records for month {month} after filtering: {sdf.count()}")
        after_filt_sum += sdf.count()
        print(f"=====================================================================")
        
        ################################## Feature Engineering ##################################
        sdf = sdf.withColumn(
            'total_minus_tolls', # work out how much money a taxi trip made after accounting for tolls paid
            F.col('total_amount') - F.col('tolls_amount')
        ).withColumn(
            'trip_time_minutes', # record time of a trip in minutes
            (F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) / 60
        ).groupBy(['year', 'month', 'day_of_month', 'day_of_week', 'hour', 'pulocationid', 'is_weekday', 'is_public_holiday']) \
            .agg(F.avg(F.col('total_minus_tolls') / F.col('trip_distance') / 5).alias('avg_usd_per_1/5_mile'), # record metrics for each taxi zone after grouping by hour and pickup location
                F.avg(F.col('total_minus_tolls') / F.col('trip_time_minutes')).alias('avg_usd_per_minute'),
                F.count('pulocationid').alias('number_of_trips_here')
        )
            
        # count the number of total trips across NYC within each hour
        num_trips_nyc = sdf.groupBy(['year', 'month', 'day_of_month', 'hour']).agg(F.sum(F.col('number_of_trips_here')).alias('number_of_trips_nyc'))
        
        sdf = sdf.join(num_trips_nyc, ['year', 'month', 'day_of_month', 'hour'], 'inner')
        
        # create a hourly zone profitability metric which combines the avg usd per 1/5 mile and the avg usd per minute of a location, and multiplies it
        # by the number of trips in that location for the hour
        sdf = sdf.withColumn(
            'zone_profitability',
            (F.col('avg_usd_per_1/5_mile') + F.col('avg_usd_per_minute')) * (F.col('number_of_trips_here'))
        ).orderBy(F.col('zone_profitability').desc())
        
        # save this month to curated directory
        sdf \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"{yellow_curated_dir}{year}-{month}.parquet")
    

Number of records for month 5 before filtering: 3588295


                                                                                

Number of records for month 5 after filtering: 1393990


                                                                                



                                                                                

Number of records for month 6 before filtering: 3558124


                                                                                

Number of records for month 6 after filtering: 1392921


                                                                                



                                                                                

Number of records for month 7 before filtering: 3174394


                                                                                

Number of records for month 7 after filtering: 1228419


                                                                                



                                                                                

Number of records for month 8 before filtering: 3152677


                                                                                

Number of records for month 8 after filtering: 1234481


                                                                                



                                                                                

Number of records for month 9 before filtering: 3183767


                                                                                

Number of records for month 9 after filtering: 1254802


                                                                                



                                                                                

Number of records for month 10 before filtering: 3675411


                                                                                

Number of records for month 10 after filtering: 1457803


                                                                                



                                                                                

Number of records for month 11 before filtering: 3252717


                                                                                

Number of records for month 11 after filtering: 1392964


                                                                                



                                                                                

In [22]:
print(f"Total number of records before filtering: {before_filt_sum}")
      
print(f"Total number of records after filtering: {after_filt_sum}")

Total number of records before filtering: 23585385
Total number of records after filtering: 9355380


### Checking out the curated data

In [6]:
sdf_yellow_all = spark.read.parquet(f"{yellow_curated_dir}*")

sdf_yellow_all.count()

332627

In [41]:
sdf_yellow_all.orderBy("year", "month", "day_of_month", "hour", "pulocationid").where(F.col('is_public_holiday') == True).show(10)

+----+-----+------------+----+-----------+------------+--------------------+------------------+--------------------+--------------+----------+-----------------+-------------------+--------------------+
|year|month|day_of_month|hour|day_of_week|pulocationid|avg_usd_per_1/5_mile|avg_usd_per_minute|number_of_trips_here|is_yellow_taxi|is_weekday|is_public_holiday|number_of_trips_nyc|  zone_profitability|
+----+-----+------------+----+-----------+------------+--------------------+------------------+--------------------+--------------+----------+-----------------+-------------------+--------------------+
|2022|    5|          30|   0|          2|           4|  1.5698860765527431|1.6812467412467413|                   3|          true|      true|             true|               1028|0.009487741686185266|
|2022|    5|          30|   0|          2|           7|  1.5025081969857867|1.5670594162569307|                   3|          true|      true|             true|               1028|0.0089578821

In [39]:
sdf_yellow_all.orderBy("year", "month", "day_of_month", "hour", "pulocationid").tail(5)
                        

[Row(year=2022, month=11, day_of_month=30, hour=23, day_of_week=4, pulocationid=249, avg_usd_per_1/5_mile=1.496743223535352, avg_usd_per_minute=1.5574090991149905, number_of_trips_here=70, is_yellow_taxi=True, is_weekday=True, is_public_holiday=False, number_of_trips_nyc=1914, zone_profitability=0.11169836080748377),
 Row(year=2022, month=11, day_of_month=30, hour=23, day_of_week=4, pulocationid=255, avg_usd_per_1/5_mile=1.0586806332408358, avg_usd_per_minute=1.2296842789922902, number_of_trips_here=3, is_yellow_taxi=True, is_weekday=True, is_public_holiday=False, number_of_trips_nyc=1914, zone_profitability=0.0035867788592995704),
 Row(year=2022, month=11, day_of_month=30, hour=23, day_of_week=4, pulocationid=261, avg_usd_per_1/5_mile=1.4467818263681513, avg_usd_per_minute=1.872403795657228, number_of_trips_here=10, is_yellow_taxi=True, is_weekday=True, is_public_holiday=False, number_of_trips_nyc=1914, zone_profitability=0.01734161766993406),
 Row(year=2022, month=11, day_of_month=30

## Create geometry dataframe
This will be used for merging geometry with location IDs

In [23]:
import pandas as pd
import geopandas as gpd

In [24]:
# sf stands for shape file
sf = gpd.read_file("../data/raw/tlc/taxi_zones/taxi_zones.shp")
zones = pd.read_parquet("../data/raw/tlc/taxi_zones/taxi_zone_lookup.parquet")

sf.head()

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((933100.918 192536.086, 933091.011 19..."
1,2,0.43347,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((1033269.244 172126.008, 103343..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((1026308.770 256767.698, 1026495.593 ..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((992073.467 203714.076, 992068.667 20..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((935843.310 144283.336, 936046.565 14..."


In [25]:
# Convert the geometry shape to to latitude and longitude
# Taken from tute2 for MAST300034 Semester 2 2023
sf['geometry'] = sf['geometry'].to_crs(crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs") #crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"


sf.columns= sf.columns.str.lower()

sf.head()


Unnamed: 0,objectid,shape_leng,shape_area,zone,locationid,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((-74.18445 40.69500, -74.18449 40.695..."
1,2,0.43347,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((-73.84793 40.87134, -73.84725 40.870..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((-73.97177 40.72582, -73.97179 40.725..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((-74.17422 40.56257, -74.17349 40.562..."


In [26]:
# ensure correct data types
zones['locationid'] = zones['locationid'].astype(int)

zones['borough'] = zones['borough'].astype(str)

zones['zone'] = zones['zone'].astype(str)

zones['service_zone'] = zones['service_zone'].astype(str)

zones.dtypes


locationid       int64
borough         object
zone            object
service_zone    object
dtype: object

In [27]:
# create and clean up geometry dataframe with locationid and geometry combined
gdf = gpd.GeoDataFrame(
    pd.merge(zones, sf, on='locationid', how='inner')
)

gdf = gdf.rename(columns={'borough_x': 'borough', 'zone_x':'zone'}).drop(['objectid', 'shape_leng', 'shape_area', 'zone_y', 'borough_y'], axis=1)

geoJSON = gdf[['locationid', 'geometry']].drop_duplicates('locationid').to_json()




In [28]:
gdf.head()

Unnamed: 0,locationid,borough,zone,service_zone,geometry
0,1,EWR,Newark Airport,EWR,"POLYGON ((-74.18445 40.69500, -74.18449 40.695..."
1,2,Queens,Jamaica Bay,Boro Zone,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ..."
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone,"POLYGON ((-73.84793 40.87134, -73.84725 40.870..."
3,4,Manhattan,Alphabet City,Yellow Zone,"POLYGON ((-73.97177 40.72582, -73.97179 40.725..."
4,5,Staten Island,Arden Heights,Boro Zone,"POLYGON ((-74.17422 40.56257, -74.17349 40.562..."


In [29]:
import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/curated/')
    
if not os.path.exists('../data/curated/tlc'):
    os.makedirs('../data/curated/tlc')
    
if not os.path.exists('../data/curated/tlc/taxi_zones'):
    os.makedirs('../data/curated/tlc/taxi_zones')
    
# save shapefile for merged geometry dataframe

gdf.to_file("../data/curated/tlc/taxi_zones/gdf.shp")

  gdf.to_file("../data/curated/tlc/taxi_zones/gdf.shp")


## External Datasets

### Subway Stations

In [16]:
# make directories for curated layer

import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/curated/')
    
if not os.path.exists('../data/curated/subway_stations'):
    os.makedirs('../data/curated/subway_stations')



In [12]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType

def extract_long(loc):
    """
    Extracts longitude from location feature for subway data

    """    
    return float(loc.split(',')[1][:-1])

def extract_lat(loc):
    """
    Extracts latitude from location feature for subway data
    """    
    return float(loc.split(',')[0][1:])


extract_long_UDF = F.udf(lambda z: extract_long(z), DoubleType())

extract_lat_UDF = F.udf(lambda z: extract_lat(z), DoubleType())

subway_entr_exit = spark.read.parquet('../data/raw/subway_entr_exit/subway_entr_exit.parquet')

print(f"Number of rows before preprocessing: {subway_entr_exit.count()}")

# Create separate longitude and latitude features
subway_entr_exit = subway_entr_exit.withColumn(
    'longitude',
    extract_long_UDF(F.col('station_location'))
    ).withColumn(
    'latitude',
    extract_lat_UDF(F.col('station_location'))
    ).drop('station_location').toPandas()
    
# Convert to geopandas dataframe
subway_entr_exit = gpd.GeoDataFrame( 
    subway_entr_exit, geometry=gpd.points_from_xy(subway_entr_exit.longitude, subway_entr_exit.latitude, crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"), 
)

subway_entr_exit = subway_entr_exit.drop(['latitude', 'longitude'], axis=1)



Number of rows before preprocessing: 472


In [27]:
# Spatial join subway locations with locationid from geometry dataframe shapefile
subway_entr_exit = gpd.sjoin(subway_entr_exit, sf, how='inner')

subway_entr_exit = subway_entr_exit[['locationid']]

subway_entr_exit =spark.createDataFrame(subway_entr_exit) 

In [32]:
subway_entr_exit.head(20)


[Row(locationid=246),
 Row(locationid=90),
 Row(locationid=90),
 Row(locationid=90),
 Row(locationid=90),
 Row(locationid=90),
 Row(locationid=68),
 Row(locationid=249),
 Row(locationid=249),
 Row(locationid=249),
 Row(locationid=125),
 Row(locationid=125),
 Row(locationid=211),
 Row(locationid=211),
 Row(locationid=231),
 Row(locationid=231),
 Row(locationid=231),
 Row(locationid=231),
 Row(locationid=231),
 Row(locationid=231)]

In [33]:
# group by number of stations in each taxi zone
subway_entr_exit = subway_entr_exit.groupBy(['locationid']) \
            .agg(
                F.count('locationid').alias('number_of_subway_stations_here')
            )
            


In [125]:
# check that the total number of stations still adds up to 472

def sum_col(df, col):
    """_summary_

    Args:
        df (_type_): _description_
        col (_type_): _description_

    Returns:
        _type_: _description_
    """    
    return df.select(F.sum(col)).collect()[0][0]

# check that total number of subway stations remained same
sum_col(subway_entr_exit, 'number_of_subway_stations_here')
            
        

472

In [None]:
print(f"Number of rows after preprocessing: {subway_entr_exit.count()}")

In [34]:


subway_entr_exit \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"../data/curated/subway_stations/subway_stations.parquet")

Shape after preprocessing: 153


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 53780)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 257, in accum_updates
    n

Py4JError: An error occurred while calling o7153.parquet

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


### Hotel Properties

In [9]:
# make directories for curated layer

import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/curated/')
    
if not os.path.exists('../data/curated/hotels'):
    os.makedirs('../data/curated/hotels')


In [12]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType

hotels = spark.read.parquet('../data/raw/hotels/hotels.parquet')

print(f"Number of rows before preprocessing: {hotels.count()}")

hotels = hotels.where(
    (F.col('taxyear') == 2022) # only include hotels counted under the 2022 financial year
).drop('taxyear').toPandas()

# convert to geopandas dataframe
hotels = gpd.GeoDataFrame( 
    hotels, geometry=gpd.points_from_xy(hotels.longitude, hotels.latitude, crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"),
)

hotels = hotels.drop(['latitude', 'longitude'], axis=1)




Number of rows before preprocessing: 5519


In [12]:
hotels.head()

Unnamed: 0,borough,geometry
0,1,POINT (-74.01242 40.70323)
1,1,POINT (-74.01220 40.70274)
2,1,POINT (-74.01264 40.70403)
3,1,POINT (-74.01232 40.70404)
4,1,POINT (-74.01615 40.71481)


In [13]:
# spatial join hotels with shapefile
hotels = gpd.sjoin(hotels, sf, how='inner')

hotels = hotels[['locationid']]

hotels =spark.createDataFrame(hotels) 

hotels.head(20)

In [15]:
# count number of hotels per taxi zone

hotels = hotels.groupBy(['locationid']) \
            .agg(
                F.count('locationid').alias('number_of_hotels_here')
            )
            


In [19]:
hotels.show(20)

+----------+---------------------+
|locationid|number_of_hotels_here|
+----------+---------------------+
|       144|                   15|
|       232|                    1|
|        87|                   23|
|       209|                    9|
|        79|                   15|
|        88|                    5|
|        13|                    3|
|       125|                  396|
|       231|                   12|
|       261|                    8|
|        45|                    8|
|       148|                   30|
|       211|                    4|
|       113|                   18|
|        68|                   32|
|       158|                    9|
|       246|                    4|
|       114|                    7|
|        48|                   42|
|       247|                    2|
+----------+---------------------+
only showing top 20 rows



In [16]:
print(f"Number of rows after preprocessing: {hotels.count()}")

Number of rows after preprocessing: 171


In [51]:


hotels \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"../data/curated/hotels/hotels.parquet")

### Airbnb

In [52]:
# make directories for curated layer

import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/curated/')
    
if not os.path.exists('../data/curated/airbnb'):
    os.makedirs('../data/curated/airbnb')


In [13]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType

airbnb = spark.read.parquet('../data/raw/airbnb/airbnb.parquet')

print(f"Number of rows before preprocessing: {airbnb.count()}")

airbnb = airbnb.toPandas()

airbnb = gpd.GeoDataFrame(
    airbnb, geometry=gpd.points_from_xy(airbnb.longitude, airbnb.latitude, crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"), 
)

airbnb = airbnb.drop(['latitude', 'longitude'], axis=1)




Number of rows before preprocessing: 41533


In [24]:
airbnb.head()

Unnamed: 0,neighbourhood_group,room_type,daily_price_usd,minimum_nights,reviews_per_month,geometry
0,Brooklyn,Entire home/apt,275,21,0.03,POINT (-73.99454 40.66265)
1,Manhattan,Private room,75,2,0.73,POINT (-73.96751 40.80380)
2,Brooklyn,Private room,60,30,0.3,POINT (-73.95512 40.68535)
3,Manhattan,Private room,68,2,3.38,POINT (-73.98317 40.76457)
4,Manhattan,Entire home/apt,175,30,0.31,POINT (-73.98559 40.75356)


In [14]:
# spatial join with geometry shapefile
airbnb = gpd.sjoin(airbnb, sf, how='inner')

In [26]:
airbnb.head()

Unnamed: 0,neighbourhood_group,room_type,daily_price_usd,minimum_nights,reviews_per_month,geometry,index_right,objectid,shape_leng,shape_area,zone,locationid,borough
0,Brooklyn,Entire home/apt,275,21,0.03,POINT (-73.99454 40.66265),227,228,0.177685,0.000993,Sunset Park West,228,Brooklyn
502,Brooklyn,Entire home/apt,70,18,0.15,POINT (-74.02102 40.64368),227,228,0.177685,0.000993,Sunset Park West,228,Brooklyn
732,Brooklyn,Entire home/apt,88,30,0.07,POINT (-73.99919 40.65724),227,228,0.177685,0.000993,Sunset Park West,228,Brooklyn
776,Brooklyn,Entire home/apt,100,10,0.33,POINT (-73.99279 40.66044),227,228,0.177685,0.000993,Sunset Park West,228,Brooklyn
1083,Brooklyn,Private room,75,7,0.42,POINT (-74.00793 40.64850),227,228,0.177685,0.000993,Sunset Park West,228,Brooklyn


In [16]:
airbnb = airbnb[['daily_price_usd', 'locationid']]
airbnb =spark.createDataFrame(airbnb) 
airbnb.show(20)




In [17]:
# count number of airbnbs per taxi zone and average daily airbnb price for each taxi zone
airbnb = airbnb.groupBy(['locationid']) \
            .agg(
                F.count('locationid').alias('number_of_airbnb_here'),
                F.avg('daily_price_usd').alias('avg_daily_price_usd')
            )
            


In [33]:
airbnb.show(20)

+----------+---------------------+-------------------+
|locationid|number_of_airbnb_here|avg_daily_price_usd|
+----------+---------------------+-------------------+
|       228|                  179| 126.11173184357541|
|       181|                  580| 210.89655172413794|
|       100|                   62|  639.6129032258065|
|        74|                  390| 174.07435897435897|
|        49|                  577| 188.28769497400347|
|       163|                  289|  616.9653979238755|
|       256|                  802|  182.9501246882793|
|        24|                  267| 140.79026217228466|
|       255|                  678| 220.86725663716814|
|        48|                  882|  270.2596371882086|
|       225|                 1209| 136.13316790736147|
|        75|                  382|  198.7198952879581|
|        97|                  315| 192.57777777777778|
|       229|                  283|  305.9257950530035|
|       145|                  195|              267.2|
|        7

In [18]:
print(f"Number of rows after preprocessing: {airbnb.count()}")

Number of rows after preprocessing: 244


                                                                                

In [53]:


airbnb \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"../data/curated/airbnb/airbnb.parquet")

### Census Data

In [30]:
# make directories for curated layer

import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/raw/')
    
if not os.path.exists('../data/curated/census'):
    os.makedirs('../data/curated/census')


In [31]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, LongType

def remove_last_4(number):
    """
    Removes the last 4 digits from the census block codes to obtain the census tract code 
    """    
    return int(str(number)[:-4])


remove_last_4_UDF = F.udf(lambda z: remove_last_4(z), LongType())

census_block_loc = spark.read.parquet('../data/raw/census/census_block_loc.parquet')

census_tracts = spark.read.parquet('../data/raw/census/census_tracts.parquet')

print(f"Number of rows before preprocessing: {census_tracts.count()}")

# Estimate the central longitude and latitude of census tracts by averaging the longitude and latitude of the blocks contained within each tract
census = census_block_loc.withColumn(
    'censustract',
    remove_last_4_UDF(F.col('blockcode'))
).drop(
    'blockcode'
).groupBy("censustract") \
                    .agg(
                        F.avg("latitude").alias("latitude"),
                        F.avg("longitude").alias("longitude")
                    ).join(census_tracts, "censustract", "inner").toPandas()

# convert to geopandas dataframe                    
census = gpd.GeoDataFrame(
    census, geometry=gpd.points_from_xy(census.longitude, census.latitude, crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
)

census = census.drop(['latitude', 'longitude'], axis=1)




Number of rows before preprocessing: 2167


                                                                                

In [32]:
census.head()

Unnamed: 0,censustract,borough,totalpop,incomepercap,poverty,professional,service,office,construction,production,...,carpool,transit,walk,workathome,meancommute,privatework,publicwork,selfemployed,unemployment,geometry
0,36047016400,Brooklyn,654,31448.0,8.6,41.0,40.4,9.3,0.0,9.3,...,16.3,15.2,12.5,9.9,21.4,63.9,26.8,9.3,3.2,POINT (-74.03004 40.60602)
1,36047037700,Brooklyn,4240,25055.0,25.7,40.7,18.4,27.9,3.0,9.9,...,5.4,66.2,2.7,6.2,43.3,77.6,16.0,6.4,11.4,POINT (-73.92015 40.68427)
2,36081027000,Queens,2080,17577.0,29.3,21.2,49.2,11.0,11.0,7.4,...,3.5,50.6,3.8,1.2,47.1,67.6,28.9,3.5,16.1,POINT (-73.79035 40.68879)
3,36081021200,Queens,2362,17468.0,22.7,10.4,36.4,20.6,20.2,12.4,...,3.9,68.8,9.8,1.0,46.2,84.6,7.9,7.5,15.0,POINT (-73.81082 40.70116)
4,36081024000,Queens,6087,18458.0,24.8,20.5,33.2,22.3,14.4,9.6,...,0.8,73.2,6.3,0.7,45.2,81.7,10.3,8.0,18.5,POINT (-73.80513 40.70387)


In [33]:
# spatial join with geometry shapefile
census = gpd.sjoin(census, sf, how='inner')

census.head()

Unnamed: 0,censustract,borough_left,totalpop,incomepercap,poverty,professional,service,office,construction,production,...,selfemployed,unemployment,geometry,index_right,objectid,shape_leng,shape_area,zone,locationid,borough_right
0,36047016400,Brooklyn,654,31448.0,8.6,41.0,40.4,9.3,0.0,9.3,...,9.3,3.2,POINT (-74.03004 40.60602),13,14,0.175214,0.001382,Bay Ridge,14,Brooklyn
13,36047016200,Brooklyn,2560,39457.0,11.0,54.3,13.5,23.3,6.6,2.3,...,4.0,5.9,POINT (-74.03053 40.61387),13,14,0.175214,0.001382,Bay Ridge,14,Brooklyn
29,36047006200,Brooklyn,2818,30471.0,17.7,56.0,16.5,17.6,3.1,7.0,...,5.2,12.6,POINT (-74.03148 40.62472),13,14,0.175214,0.001382,Bay Ridge,14,Brooklyn
191,36047016000,Brooklyn,4475,32414.0,20.7,44.7,21.8,21.6,3.4,8.5,...,3.0,9.5,POINT (-74.02673 40.61907),13,14,0.175214,0.001382,Bay Ridge,14,Brooklyn
288,36047005201,Brooklyn,1722,50110.0,26.2,67.6,15.6,13.1,2.0,1.7,...,4.8,7.0,POINT (-74.03940 40.62171),13,14,0.175214,0.001382,Bay Ridge,14,Brooklyn


In [34]:
# drop useless features
census = census.drop(['censustract', 'borough_left', 'geometry', 'index_right', 'objectid', 'shape_leng', 'shape_area', 'zone', 'borough_right'], axis=1)
census =spark.createDataFrame(census) 
census.show(20)

+--------+------------+-------+------------+-------+------+------------+----------+-----+-------+-------+----+----------+-----------+-----------+----------+------------+------------+----------+
|totalpop|incomepercap|poverty|professional|service|office|construction|production|drive|carpool|transit|walk|workathome|meancommute|privatework|publicwork|selfemployed|unemployment|locationid|
+--------+------------+-------+------------+-------+------+------------+----------+-----+-------+-------+----+----------+-----------+-----------+----------+------------+------------+----------+
|     654|     31448.0|    8.6|        41.0|   40.4|   9.3|         0.0|       9.3| 46.0|   16.3|   15.2|12.5|       9.9|       21.4|       63.9|      26.8|         9.3|         3.2|        14|
|    2560|     39457.0|   11.0|        54.3|   13.5|  23.3|         6.6|       2.3| 29.2|    4.5|   54.4| 6.8|       4.4|       40.4|       74.9|      21.1|         4.0|         5.9|        14|
|    2818|     30471.0|   17.7

In [35]:
# retain total pop for each taxi zone, and averages of employment and transportation statistics
census = census.groupBy(['locationid']) \
            .agg(
                F.sum('totalpop').alias('total_pop'),
                F.avg('incomepercap').alias('avg_income_per_cap'),
                F.avg('poverty').alias('poverty_%'),
                F.avg('professional').alias('professional_%'),
                F.avg('service').alias('service_%'),
                F.avg('office').alias('office_%'),
                F.avg('construction').alias('construction_%'),
                F.avg('production').alias('production_%'),
                F.avg('drive').alias('drive_%'),
                F.avg('carpool').alias('carpool_%'),
                F.avg('transit').alias('transit_%'),
                F.avg('walk').alias('walk_%'),
                F.avg('workathome').alias('work_at_home_%'),
                F.avg('meancommute').alias('avg_commute_mins'),
                F.avg('privatework').alias('private_work_%'),
                F.avg('publicwork').alias('public_work_%'),
                F.avg('selfemployed').alias('self_employed_%'),
                F.avg('unemployment').alias('unemployment_%'),
            )
            


In [36]:
census.show(20)

23/08/21 04:27:09 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|locationid|total_pop|avg_income_per_cap|         poverty_%|    professional_%|         service_%|          office_%|    construction_%|      production_%|           drive_%|         carpool_%|         transit_%|            walk_%|    work_at_home_%|  avg_commute_mins|   private_work_%|     public_work_%|   self_employed_%|    unemployment_%|
+----------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+----

In [37]:
print(f"Number of rows after preprocessing: {census.count()}")    


Number of rows after preprocessing: 252


In [38]:

census \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"../data/curated/census/census.parquet")


### Parking Munimeters

In [110]:
# make directories for curated layer

import os
from pyspark.sql import functions as F

if not os.path.exists('../data/curated/'):
    os.makedirs('../data/parking/')
    
if not os.path.exists('../data/curated/parking'):
    os.makedirs('../data/curated/parking')


In [39]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType

parking = spark.read.parquet('../data/raw/parking/parking.parquet')

print(f"Number of rows before preprocessing: {parking.count()}")

# retain parking meters which are currently listed as 'active'
parking = parking.where(F.col('status') == "Active").drop('status').toPandas()

# convert to geopandas dataframe
parking = gpd.GeoDataFrame( 
    parking, geometry=gpd.points_from_xy(parking.longitude, parking.latitude, crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"),  
)

parking = parking.drop(['latitude', 'longitude'], axis=1)



Number of rows before preprocessing: 13415


In [112]:
parking.head()

Unnamed: 0,borough,geometry
0,Manhattan,POINT (-73.97660 40.78751)
1,Manhattan,POINT (-73.98694 40.74620)
2,Manhattan,POINT (-73.95643 40.77849)
3,Queens,POINT (-73.81484 40.73544)
4,Manhattan,POINT (-73.98251 40.73990)


In [27]:
# spatial join with shapefile
parking = gpd.sjoin(parking, sf, how='inner')

In [114]:
parking.head()

Unnamed: 0,borough_left,geometry,index_right,objectid,shape_leng,shape_area,zone,locationid,borough_right
0,Manhattan,POINT (-73.97660 40.78751),238,239,0.063626,0.000205,Upper West Side South,239,Manhattan
720,Manhattan,POINT (-73.97979 40.78439),238,239,0.063626,0.000205,Upper West Side South,239,Manhattan
856,Manhattan,POINT (-73.97808 40.78668),238,239,0.063626,0.000205,Upper West Side South,239,Manhattan
1037,Manhattan,POINT (-73.97805 40.78616),238,239,0.063626,0.000205,Upper West Side South,239,Manhattan
1135,Manhattan,POINT (-73.97671 40.78782),238,239,0.063626,0.000205,Upper West Side South,239,Manhattan


In [28]:
parking = parking[['locationid']]

parking =spark.createDataFrame(parking) 

parking.show(20)

+----------+
|locationid|
+----------+
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
|       239|
+----------+
only showing top 20 rows



In [29]:
# count number of parking munimeters in each taxi zone
parking = parking.groupBy(['locationid']) \
            .agg(
                F.count('locationid').alias('number_of_parking_munimeters_here'),
            )
            


In [119]:
parking.show(20)

+----------+---------------------------------+
|locationid|number_of_parking_munimeters_here|
+----------+---------------------------------+
|        94|                               45|
|       107|                              164|
|       239|                               95|
|       170|                              148|
|       236|                              107|
|        48|                              143|
|        61|                               83|
|       164|                              123|
|        75|                               77|
|       135|                               62|
|       157|                               50|
|        95|                              208|
|        41|                               79|
|       252|                               28|
|        89|                              196|
|       182|                               74|
|       129|                              213|
|        97|                               34|
|       159| 

In [31]:
print(f"Number of rows after preprocessing: {parking.count()}")

Number of rows after preprocessing: 222


In [120]:


parking \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"../data/curated/parking/parking.parquet")

### Combining External Datasets with Taxi Data

In [40]:
sdf_yellow_all = spark.read.parquet('../data/curated/tlc/yellow/*')

airbnb = spark.read.parquet('../data/curated/airbnb/airbnb.parquet')

census = spark.read.parquet('../data/curated/census/census.parquet')

hotels = spark.read.parquet('../data/curated/hotels/hotels.parquet')

parking = spark.read.parquet('../data/curated/parking/parking.parquet')

subway_stations = spark.read.parquet('../data/curated/subway_stations/subway_stations.parquet')

In [41]:
combined = sdf_yellow_all.withColumnRenamed(
    'pulocationid', 'locationid'
    ).join(
    airbnb, 'locationid', 'left'
    ).join(
        hotels, 'locationid', 'left'
    ).join(
        parking, 'locationid', 'left'
    ).join(
        subway_stations, 'locationid', 'left'
    ).join(
    census, 'locationid', 'left'
    ).withColumnRenamed(
        'avg_daily_price_usd', 'avg_daily_airbnb_price_usd'
    )
    
combined.count()
    





    
 

332627

In [42]:
combined.limit(5)

locationid,year,month,day_of_month,hour,day_of_week,is_weekday,is_public_holiday,avg_usd_per_1/5_mile,avg_usd_per_minute,number_of_trips_here,number_of_trips_nyc,zone_profitability,number_of_airbnb_here,avg_daily_airbnb_price_usd,number_of_hotels_here,number_of_parking_munimeters_here,number_of_subway_stations_here,total_pop,avg_income_per_cap,poverty_%,professional_%,service_%,office_%,construction_%,production_%,drive_%,carpool_%,transit_%,walk_%,work_at_home_%,avg_commute_mins,private_work_%,public_work_%,self_employed_%,unemployment_%
237,2022,5,12,18,5,True,False,2.2375015498102404,1.596395406518293,258,3995,989.1454147327617,82,585.4146341463414,17,181,5,19408,170575.66666666666,4.733333333333333,76.81666666666668,3.35,18.33333333333333,0.75,0.75,12.183333333333335,2.25,30.75,30.7,12.916666666666666,23.149999999999995,82.26666666666667,5.333333333333333,11.45,3.4500000000000006
237,2022,5,4,18,4,True,False,2.303214582246571,1.6567343527728342,249,3983,986.027284819832,82,585.4146341463414,17,181,5,19408,170575.66666666666,4.733333333333333,76.81666666666668,3.35,18.33333333333333,0.75,0.75,12.183333333333335,2.25,30.75,30.7,12.916666666666666,23.149999999999995,82.26666666666667,5.333333333333333,11.45,3.4500000000000006
237,2022,5,17,17,3,True,False,2.749651557725555,1.522720939792324,229,3094,978.3733019315944,82,585.4146341463414,17,181,5,19408,170575.66666666666,4.733333333333333,76.81666666666668,3.35,18.33333333333333,0.75,0.75,12.183333333333335,2.25,30.75,30.7,12.916666666666666,23.149999999999995,82.26666666666667,5.333333333333333,11.45,3.4500000000000006
161,2022,5,4,19,4,True,False,1.9862360280996771,1.5318345925392174,278,3452,978.0236325376128,150,592.8,82,145,2,1404,95716.25,6.025,64.925,18.075,10.275,5.4,1.3,4.25,0.0,21.975,52.85,12.875,,87.65,2.925,9.425,2.575
161,2022,5,10,19,3,True,False,1.9896948754319328,1.4385266658933749,285,3386,977.0431392777126,150,592.8,82,145,2,1404,95716.25,6.025,64.925,18.075,10.275,5.4,1.3,4.25,0.0,21.975,52.85,12.875,,87.65,2.925,9.425,2.575


In [43]:
sdf_yellow_all.count()


332627

In [44]:
combined \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(f"../data/curated/combined.parquet")

                                                                                

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 40062)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 257, in accum_updates
    n