## Preprocess Data from Raw to Curated
This notebook preprocesses the raw data, filtering and adding derived columns and other transformations, such that the curated data is then ready for analysis
### Load constants and libraries
First of all we load any constant values for use within the notebook, and load any libraries required, and initiate a spark object. 

In [2]:
# import all constants used in the note books
from constants import *

# libraries required
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from typing import Callable
from pyspark.sql import functions as F
import datetime
import pandas as pd

import geopandas as gpd
from shapely.geometry import Point


In [201]:
# Create a spark session 
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
23/08/18 13:38:15 WARN Utils: Your hostname, DESKTOP-VP2PCTV resolves to a loopback address: 127.0.1.1; using 172.21.252.215 instead (on interface eth0)
23/08/18 13:38:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/18 13:38:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/18 13:38:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### TLC Data

In [203]:
def apply_filter(data: DataFrame, filter_function: Callable) -> DataFrame:
    '''
    Helper function to apply a filter to remove outliers (or for any other filtering purpose)
    Arguments:
        data = pySpark DataFrame
        filter_function = a series of predicates used to filter the data
    Ouput: returns a pySpark DataFrame
    '''
    
    # apply the filter and report how many records were removed
    rows_before_filter = data.count()
    data = data.where(filter_function)
    print(f'applied filter{str(filter_function)}, rows before:{rows_before_filter}, rows after:{data.count()}')
    return data


def prepare_curated_tlc_data(source: str, keep_cols: list[str]) -> None:
    '''
    Retrieves raw TLC data, combining separate monthly data into one dataframe, and renames 
    and filters columns of interest. Saves this curated data down accordingly
    Arguments:
        source = TLC data source (yellow, green, or fhvhv)
        keep_cols = columns of interest that are to be kept for analysis, all other columns discarded
    Ouput: None
    '''
    
    # get raw data
    all_data = spark.read.parquet(f'{RAW_DATA}{source}/*')
    print(f'Number of raw records for {source}: {str(all_data.count())}')

    # select columns of interest
    all_data = all_data.select(*keep_cols)
    
    # add derived columns
    # adding trip time in minutes
    all_data = all_data.withColumn('trip_time', 
        F.round((all_data.dropoff_datetime- all_data.pickup_datetime).cast('long')/60,2))
    
    # adding temporal derived features
    all_data = all_data.select('*',
        F.year("pickup_datetime").alias('year'), 
        F.month("pickup_datetime").alias('month'), 
        F.dayofmonth("pickup_datetime").alias('day'), 
        F.dayofweek("pickup_datetime").alias('day_of_week'),
        F.hour("pickup_datetime").alias('hour')        
    )
    
    # adding an identifier for data source, as curated data will combine all sources
    # into one dataframe
    all_data=all_data.withColumn('source',F.lit(source))
    
    # filter out records with suspect data, do this one by one so we can establish
    # impact of filters. Date range is hardcoded - could be made dynamic
    all_data = apply_filter(all_data, (F.col('fare') > 0) & (F.col('fare') < MAX_FARE))
    all_data = apply_filter(all_data, (F.col('trip_distance') > 0) & (F.col('trip_distance') < MAX_DISTANCE))
    all_data = apply_filter(all_data, (F.col('trip_time') > 0) & (F.col('trip_time') < MAX_TIME))
    all_data = apply_filter(all_data, (F.col('pickup_id') > 0) & (F.col('pickup_id') < MAX_LOCATION_ID))
    all_data = apply_filter(all_data, (F.col("pickup_datetime") >= datetime.datetime(2020, 5, 1, 0, 0, 0)) \
        & (F.col("pickup_datetime") < datetime.datetime(2023, 6, 1, 0, 0, 0)))
    
    # save the curated data
    all_data.write.mode('overwrite').parquet(CURATED_DATA+source)
    print(f'Number of curated records for {source}: {str(all_data.count())}')

In [206]:
# curate the data for all TLC sources
keep_cols = ['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'pickup_id', 'fare']
prepare_curated_tlc_data('yellow', keep_cols)
prepare_curated_tlc_data('green', keep_cols)
prepare_curated_tlc_data('fhvhv', keep_cols)

Number of raw records for yellow: 95445749


                                                                                

applied filterColumn<'((fare > 0) AND (fare < 500))'>, rows before:95445749, rows after:94829572


                                                                                

applied filterColumn<'((trip_distance > 0) AND (trip_distance < 100))'>, rows before:94829572, rows after:93551616


                                                                                

applied filterColumn<'((trip_time > 0) AND (trip_time < 300))'>, rows before:93551616, rows after:93340310


                                                                                

applied filterColumn<'((pickup_id > 0) AND (pickup_id < 264))'>, rows before:93340310, rows after:92284232


                                                                                

applied filterColumn<'((pickup_datetime >= TIMESTAMP '2020-04-30 14:00:00') AND (pickup_datetime < TIMESTAMP '2023-05-31 14:00:00'))'>, rows before:92284232, rows after:92215295


                                                                                

Number of curated records for yellow: 92215295
Number of raw records for green: 2877421
applied filterColumn<'((fare > 0) AND (fare < 500))'>, rows before:2877421, rows after:2864204
applied filterColumn<'((trip_distance > 0) AND (trip_distance < 100))'>, rows before:2864204, rows after:2734280
applied filterColumn<'((trip_time > 0) AND (trip_time < 300))'>, rows before:2734280, rows after:2724395


                                                                                

applied filterColumn<'((pickup_id > 0) AND (pickup_id < 264))'>, rows before:2724395, rows after:2719252


                                                                                

applied filterColumn<'((pickup_datetime >= TIMESTAMP '2020-04-30 14:00:00') AND (pickup_datetime < TIMESTAMP '2023-05-31 14:00:00'))'>, rows before:2719252, rows after:2718001


                                                                                

Number of curated records for green: 2718001
Number of raw records for fhvhv: 566168421


                                                                                

applied filterColumn<'((fare > 0) AND (fare < 500))'>, rows before:566168421, rows after:565300188


                                                                                

applied filterColumn<'((trip_distance > 0) AND (trip_distance < 100))'>, rows before:565300188, rows after:565131616


                                                                                

applied filterColumn<'((trip_time > 0) AND (trip_time < 300))'>, rows before:565131616, rows after:565104793


                                                                                

applied filterColumn<'((pickup_id > 0) AND (pickup_id < 264))'>, rows before:565104793, rows after:565071103


                                                                                

applied filterColumn<'((pickup_datetime >= TIMESTAMP '2020-04-30 14:00:00') AND (pickup_datetime < TIMESTAMP '2023-05-31 14:00:00'))'>, rows before:565071103, rows after:564757294




Number of curated records for fhvhv: 564757294


                                                                                

### Weather Data

In [5]:
def temporal_decomp(data: pd.DataFrame, datetime_col: str) -> None:
     '''
    Generates derived temporal features from a provided datetime column
    Arguments:
        data = pandas DataFrame
        datetime_col = the name of the column derive temporal features from
    Ouput: None
    '''
        
    data['year'] = data[datetime_col].dt.year
    data['month'] = data[datetime_col].dt.month
    data['day'] = data[datetime_col].dt.day
    data['hour'] = data[datetime_col].dt.hour

In [7]:
weather_data = pd.read_pickle(f'{RAW_DATA}weather_data.pkl')

# check distribution of variables
weather_data.describe()

# weather data just requires the addition of derived temporal features
temporal_decomp(weather_data, 'valid')
weather_data.to_pickle(f'{CURATED_DATA}weather_data.pkl')

Unnamed: 0,valid,tmpc,sped,p01m
count,18229,18229.0,18229.0,18229.0
mean,2022-05-15 23:29:02.758242048,13.556579,11.404487,0.137378
min,2021-05-01 00:51:00,-15.0,0.0,0.0
25%,2021-11-06 22:51:00,6.72,6.9,0.0
50%,2022-05-15 20:51:00,13.28,10.35,0.0
75%,2022-11-22 00:51:00,21.11,14.95,0.0
max,2023-05-30 23:51:00,35.0,40.25,21.84
std,,9.077536,5.935191,0.788282


### Shapefile Data

In [None]:
# this code is taken from MAST30034 Tutorial 2 (https://github.com/VoLKyyyOG/MAST30034_Python/blob/main/tutorials/tute_2/Tute2_Python.ipynb)

# code to build a geopandas DataFrame linking TLC locationIDs with a polygon of longitude, latitude coordinates enclosing that zone
sf = gpd.read_file(f'{TAXI_ZONE_DATA}taxi_zones.shp')
zones = pd.read_csv("../data/taxi_zones/taxi+_zone_lookup.csv")
sf['geometry'] = sf['geometry'].to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
gdf = gpd.GeoDataFrame(
    pd.merge(zones, sf, on='LocationID', how='inner')
)
gdf.columns = map(str.lower, gdf.columns)
gdf=gpd.GeoDataFrame(gdf.T.drop_duplicates().T)

# save the gdf file for use in the initial_analysis notebook
gdf.to_file(f'{TAXI_ZONE_DATA}gdf.shp')

### Events Data
In this section the provided longitude, latitude coordinates for the events data are mapped back to TLC locationIDs. Note that the events data is for the greater New York area, so any that fall outside the TLC taxi zones are filtered out of the data set. In addition some of the provided data is inaccurate, with coordinates just sitting outside dry land (in bodies of water). <br>
Further to this the events data provides a 'create_time' and a 'close_time'. After examining the dataset it appears that the create_time for full or recent events is the open time of the event in question (and no close_time is given), whereas for past events the close_time appears to be the end time for the event but the create_time may actually be the time that the event was input into the database. For this reason the events data is split up into past and full events, in order to best estimate when the event actually occurred.


In [181]:
def find_zone(location: tuple[float, float], gdf: gpd.GeoDataFrame) -> str:    
     '''
    For a given coordinate tuple (longitude, latitude) this function returns the TLC locationID,
    if the coordinates are within the TLC zones. 
    Arguments:
        location = coordinate tuple (longitude, latitude)
        gdf = geopandas DataFrame containing individual zones in a 'geometry' column
    Ouput: the TLC locationID, if the coordinates are within the TLC zones
    '''    
    
    # locations should be longitude, latitude
    check_point = Point(location)
    
    # search each zone looking for a match
    for row, area in enumerate(gdf['geometry']):
        if gpd.GeoSeries(area).contains(check_point).sum():
            return gdf.loc[row, 'locationid']
    return 'None'


In [209]:
events_data = pd.read_pickle(f'{RAW_DATA}events_data.pkl')

# get locationid - note many of the events are outside the TLC taxi zones so will return None, in 
# addition some of the coordinates provided are just outside zones and in the river etc. 
events_data["locationid"] = events_data.apply(lambda x: find_zone((x['longitude'], x['latitude']), gdf), axis=1)
events_data = events_data[events_data['locationid']!='None']
events_data['locationid'] = events_data['locationid'].apply(pd.to_numeric)

# split events into those that have an end time (full_records), 
# and those for which an end time is not yet recorded (future_records)
last_full_record = events_data['close_time'].max()
full_records = events_data[events_data['close_time']<=last_full_record]
future_records = events_data[events_data['create_time']>last_full_record]

# generate the event end time for full_records on close_time, and for future_records on create_time
temporal_decomp(full_records, 'close_time')
temporal_decomp(future_records, 'create_time')
# records with a start time of midnight are assumed to not have had a correct time input
future_records = future_records[future_records['hour']!=0]

# assume peak demand for taxis is around the end of an event +/- 1 hour
full_records['start_hour'] = full_records['hour']-1
full_records['end_hour'] = full_records['hour']+1

# for future events we assume the create_time is the start of the event, so assume peak taxi demand
# is 2-4 hours after this
future_records['start_hour'] = future_records['hour']+2
future_records['end_hour'] = future_records['hour']+4
future_records['hour'] = future_records['hour']+3

events_data = pd.concat([full_records, future_records])
# set the taxi 
events_data['start_hour'] = events_data['hour']-1
events_data['end_hour'] = events_data['hour']+1

events_data.to_pickle(f'{CURATED_DATA}events_data.pkl')


POINT (-73.961164 40.742956)
POINT (-73.986571 40.742808)
POINT (-73.976593 40.683358)
POINT (-73.976593 40.683358)
POINT (-73.976593 40.683358)
POINT (-74.167145 40.740331)
POINT (-74.033553 40.737904)
POINT (-73.961164 40.742956)
POINT (-73.91974 40.7662)
POINT (-73.976593 40.683358)
POINT (-73.976593 40.683358)
POINT (-73.976593 40.683358)
POINT (-73.980393 40.759902)
POINT (-74.034204 40.609907)
POINT (-73.976593 40.683358)
POINT (-74.01509 40.70091)
POINT (-73.99219 40.75002)
POINT (-74.01509 40.70091)
POINT (-73.976593 40.683358)
POINT (-74.068414 40.81033)
POINT (-74.034204 40.609907)
POINT (-73.99219 40.75002)
POINT (-74.01509 40.70091)
POINT (-73.99219 40.75002)
POINT (-73.980393 40.759902)
POINT (-73.975109 40.724856)
POINT (-73.981478 40.576873)
POINT (-73.941709 40.798657)
POINT (-74.00194 40.706015)
POINT (-74.00194 40.706015)
POINT (-74.00194 40.706015)
POINT (-73.726249 40.711673)
POINT (-73.976593 40.683358)
POINT (-74.00194 40.706015)
POINT (-73.926124 40.827674)
POINT

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['year'] = data[datetime_col].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['month'] = data[datetime_col].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['day'] = data[datetime_col].dt.day
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .l

In [9]:
events_data = pd.read_pickle(f'{RAW_DATA}events_data.pkl')
print(events_data.shape)
events_data = pd.read_pickle(f'{CURATED_DATA}events_data.pkl')
print(events_data.shape)

(3311, 6)
(1855, 13)
