# Introduction

This notebook is a copy of `1_etl.ipynb` with the modification of increasing the number of unique values for the variables:
* [`VendorID`](#Validation:-VendorID)
* [`PULocationID`](#Validation:-PULocationID)
* [`DOLocationID`](#Validation:-DOLocationID)

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from dotenv import find_dotenv
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext

import pyspark.sql.functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import (
    IntegerType,
    DateType,
    FloatType,
    StringType,
    TimestampType 
)

# from src.data.utils import count_missing

In [3]:
project_dir = Path().cwd().parent
data_dir = project_dir / 'data'
raw_data_dir = data_dir / 'raw'
interim_data_dir = data_dir / 'interim'
processed_data_dir = data_dir / 'processed'
reports_dir = project_dir / 'reports'

In [4]:
spark = (
    SparkSession
    .builder
    .master('local[12]')
    .appName('new_york_taxis')
    .getOrCreate()
)

In [5]:
spark

In [6]:
spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()

'3.0.0'

In [7]:
spark.sparkContext._conf.getAll()

[('spark.driver.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.driver.memory', '22g'),
 ('spark.driver.port', '42577'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'd5681308c4ca'),
 ('spark.app.name', 'new_york_taxis'),
 ('spark.executor.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.master', 'local[12]'),
 ('spark.app.id', 'local-1619350536246'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.debug.maxToStringFields', '1000')]

In [8]:
conf = spark.sparkContext._conf.setAll([
    ('spark.driver.memory', '16g'),
    ('spark.executor.memory', '16g'),
    ('spark.app.name', 'new_york_taxis'),
])
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [9]:
spark.sparkContext._conf.getAll()

[('spark.driver.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.driver.port', '42577'),
 ('spark.driver.host', 'd5681308c4ca'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '16g'),
 ('spark.executor.memory', '16g'),
 ('spark.app.name', 'new_york_taxis'),
 ('spark.executor.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.master', 'local[12]'),
 ('spark.app.id', 'local-1619350536246'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.debug.maxToStringFields', '1000')]

# Load data

The green and yellow taxi data are loaded together, which means that the extra feature, `Trip_type` in the green data set is omitted.

In [10]:
df_dict = {}

for colour in ['green', 'yellow']:
    path = raw_data_dir.joinpath(f'{colour}_tripdata_20*.csv').as_posix()
    df = spark.read.csv(path, header=True)
    
    # Add the taxi colour
    df = df.withColumn('colour', F.lit(colour))
    df_dict[colour] = df

# Check column differences

The green taxi data has more columns than does the yellow taxi data. See the table below:

|green columns|yellow columns|comment|
|-------------|--------------|-------|
|`lpep_pickup_datetime`|`pickup_datetime`|Rename to `pickup_datetime`|
|`lpep_dropoff_datetime`|`dropoff_datetime`|Rename to `dropoff_datetime`|
|`trip_type`||The green taxis have two types, "Street-hail" and "Dispatch". Drop this column.|
|`ehail_fee`||Drop this column.|

In [11]:
set(df_dict['green'].columns) - set(df_dict['yellow'].columns)

{'ehail_fee', 'lpep_dropoff_datetime', 'lpep_pickup_datetime', 'trip_type'}

In [12]:
set(df_dict['yellow'].columns) - set(df_dict['green'].columns)

{'tpep_dropoff_datetime', 'tpep_pickup_datetime'}

# Combine the green and the yellow

This requires the columns to be the same; renaming and dropping of columns is required, see [table](#Check-column-differences).

In [22]:
df_green = (
    df_dict['green']
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
    .drop('trip_type')
    .drop('ehail_fee')
)

df_yellow = (
    df_dict['yellow']
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [15]:
for col in df_green.columns:
    print()

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'colour']

In [23]:
df_yellow.select('pickup_datetime')

DataFrame[pickup_datetime: string]

In [24]:
df_yellow_reordered = df_yellow.select(
    'VendorID',
    'pickup_datetime',
    'dropoff_datetime',
    'store_and_fwd_flag',
    'RatecodeID',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'payment_type',
    'congestion_surcharge',
    'colour'
)

In [33]:
df_yellow_reordered.count()

142234006

In [34]:
df_green.count()

7778101

In [35]:
df = df_yellow_reordered.unionByName(df_green)

## Load/save parquet

In [36]:
path = interim_data_dir.joinpath('df_combined').as_posix()

In [37]:
df.write.parquet(path, mode='overwrite')
df = spark.read.parquet(path)

In [38]:
df.count()

150012107

In [39]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- colour: string (nullable = true)



In [40]:
df.describe().show()

+-------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------------------+------------------+------------------+--------------------+------------------+-------------------+--------------------+-------------------+---------------------+--------------------+------------------+--------------------+---------+
|summary|            VendorID|     pickup_datetime|    dropoff_datetime|  store_and_fwd_flag|        RatecodeID|        PULocationID|       DOLocationID|   passenger_count|     trip_distance|         fare_amount|             extra|            mta_tax|          tip_amount|       tolls_amount|improvement_surcharge|        total_amount|      payment_type|congestion_surcharge|   colour|
+-------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------------------+------------------+------------------+--------------------+----------------

# Change datatype

The columns are all of type string. This section changes some of them into more appropriate types.

In [41]:
datatype_dict = {
    'VendorID': StringType(),
    'pickup_datetime': TimestampType(),
    'dropoff_datetime': TimestampType(),
    'passenger_count': IntegerType(),
    'trip_distance': FloatType(),
    'RatecodeID': StringType(),
    'store_and_fwd_flag': StringType(),
    'PULocationID': StringType(),
    'DOLocationID': StringType(),
    'payment_type': StringType(),
    'fare_amount': FloatType(),
    'extra': FloatType(),
    'mta_tax': FloatType(),
    'tip_amount': FloatType(),
    'tolls_amount': FloatType(),
    'improvement_surcharge': FloatType(),
    'total_amount': FloatType(),
    'congestion_surcharge': FloatType()
}

In [42]:
# For convenience so all the code doesn't have to be manually typed
for key, value in datatype_dict.items():
    print(f".withColumn('{key}', F.col('{key}').astype({value}()))")

.withColumn('VendorID', F.col('VendorID').astype(StringType()))
.withColumn('pickup_datetime', F.col('pickup_datetime').astype(TimestampType()))
.withColumn('dropoff_datetime', F.col('dropoff_datetime').astype(TimestampType()))
.withColumn('passenger_count', F.col('passenger_count').astype(IntegerType()))
.withColumn('trip_distance', F.col('trip_distance').astype(FloatType()))
.withColumn('RatecodeID', F.col('RatecodeID').astype(StringType()))
.withColumn('store_and_fwd_flag', F.col('store_and_fwd_flag').astype(StringType()))
.withColumn('PULocationID', F.col('PULocationID').astype(StringType()))
.withColumn('DOLocationID', F.col('DOLocationID').astype(StringType()))
.withColumn('payment_type', F.col('payment_type').astype(StringType()))
.withColumn('fare_amount', F.col('fare_amount').astype(FloatType()))
.withColumn('extra', F.col('extra').astype(FloatType()))
.withColumn('mta_tax', F.col('mta_tax').astype(FloatType()))
.withColumn('tip_amount', F.col('tip_amount').astype(FloatType())

In [43]:
df_typed = (
    df
    .withColumn('VendorID', F.col('VendorID').astype(StringType()))
    .withColumn('pickup_datetime', F.col('pickup_datetime').astype(TimestampType()))
    .withColumn('dropoff_datetime', F.col('dropoff_datetime').astype(TimestampType()))
    .withColumn('passenger_count', F.col('passenger_count').astype(IntegerType()))
    .withColumn('trip_distance', F.col('trip_distance').astype(FloatType()))
    .withColumn('RatecodeID', F.col('RatecodeID').astype(StringType()))
    .withColumn('store_and_fwd_flag', F.col('store_and_fwd_flag').astype(StringType()))
    .withColumn('PULocationID', F.col('PULocationID').astype(StringType()))
    .withColumn('DOLocationID', F.col('DOLocationID').astype(StringType()))
    .withColumn('payment_type', F.col('payment_type').astype(StringType()))
    .withColumn('fare_amount', F.col('fare_amount').astype(FloatType()))
    .withColumn('extra', F.col('extra').astype(FloatType()))
    .withColumn('mta_tax', F.col('mta_tax').astype(FloatType()))
    .withColumn('tip_amount', F.col('tip_amount').astype(FloatType()))
    .withColumn('tolls_amount', F.col('tolls_amount').astype(FloatType()))
    .withColumn('improvement_surcharge', F.col('improvement_surcharge').astype(FloatType()))
    .withColumn('total_amount', F.col('total_amount').astype(FloatType()))
    .withColumn('congestion_surcharge', F.col('congestion_surcharge').astype(FloatType()))
)

In [44]:
df_typed.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- congestion_surcharge: float (nullable = true)
 |-- colour: string (nullable = true)



# Extract dateparts

1. `pickup_year`
1. `pickup_month`
1. `pickup_dayofyear`
1. `pickup_dayofmonth`
1. `pickup_dayofweek`
1. `pickup_weekofyear`
1. `pickup_hour`

In [56]:
hour = F.udf(lambda x: x.hour if x is not None else None, IntegerType())

In [57]:
df_dateparts = (
    df_typed
    .withColumn('pickup_year', F.year('pickup_datetime'))
    .withColumn('pickup_month', F.month('pickup_datetime'))
    .withColumn('pickup_dayofyear', F.dayofyear('pickup_datetime'))
    .withColumn('pickup_dayofmonth', F.dayofmonth('pickup_datetime'))
    .withColumn('pickup_dayofweek', F.dayofweek('pickup_datetime'))
    .withColumn('pickup_weekofyear', F.weekofyear('pickup_datetime'))
    .withColumn('pickup_hourofday', hour('pickup_datetime'))
)

In [58]:
#
df_dateparts.rdd.getNumPartitions()

43

In [10]:
path = interim_data_dir.joinpath('df_dateparts').as_posix()
n_partitions = 12 * 10

In [11]:
df_dateparts = spark.read.parquet(path)

# Count distinct

There are categorical columns with defined levels, these are:
* `VendorID`
* `RatecodeID`
* `store_and_fwd_flag`
* `PULocationID`
* `DOLocationID`
* `payment_type`

There are numerical columns with limited distinct values, these are:
* `Extra`: \\$0.50 or \\$1
* `MTA_tax`: \\$0.50
* `Improvement_surcharge`: \\$0.30


In [12]:
string_cols = [col for (col, col_type) in df_dateparts.dtypes if col_type == 'string']
string_cols

['VendorID',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'colour']

In [13]:
lim_num_cols = ['Extra', 'MTA_tax', 'Improvement_surcharge']

# Data validation

## String type columns

* [VendorID](#Validation:-VendorID)
* [PULocationID](#Validation:-PULocationID)
* [DOLocationID](#Validation:-DOLocationID)
* [RatecodeID](#Validation:-RatecodeID)
* [Store_and_fwd_flag](#Validation:-Store_and_fwd_flag)
* [Payment_type](#Validation:-Payment_type)

### Validation: `VendorID`

In [14]:
top_n = (
    df_dateparts
    .groupBy('VendorID')
    .count()
    .sort(F.col('count').desc())
    .limit(100)
    .collect()
)

top_n

[Row(VendorID='2', count=75165983),
 Row(VendorID='1', count=39402240),
 Row(VendorID=None, count=2187903),
 Row(VendorID='4', count=270473),
 Row(VendorID='�', count=80113),
 Row(VendorID='\x0e�', count=73222),
 Row(VendorID='��', count=42438),
 Row(VendorID='\x12�', count=25063),
 Row(VendorID='N�', count=24192),
 Row(VendorID='�\x00', count=20156),
 Row(VendorID='F�', count=16438),
 Row(VendorID='J�\x00\x0e�', count=14632),
 Row(VendorID='\x0e', count=11320),
 Row(VendorID='�\x01', count=10002),
 Row(VendorID='R�', count=9706),
 Row(VendorID='!', count=9089),
 Row(VendorID='0', count=8070),
 Row(VendorID='@', count=7932),
 Row(VendorID='N', count=7653),
 Row(VendorID='J�\x01\x0e�', count=7404),
 Row(VendorID='\x16�', count=7104),
 Row(VendorID='J�\x00\x0e', count=6665),
 Row(VendorID='$', count=6463),
 Row(VendorID='(', count=6222),
 Row(VendorID='#', count=6155),
 Row(VendorID='J�', count=5815),
 Row(VendorID='�@', count=5528),
 Row(VendorID='`', count=5502),
 Row(VendorID='%', cou

In [15]:
valid_values = [e['VendorID'] for e in top_n if e['VendorID'] is not None]
valid_values

['2',
 '1',
 '4',
 '�',
 '\x0e�',
 '��',
 '\x12�',
 'N�',
 '�\x00',
 'F�',
 'J�\x00\x0e�',
 '\x0e',
 '�\x01',
 'R�',
 '!',
 '0',
 '@',
 'N',
 'J�\x01\x0e�',
 '\x16�',
 'J�\x00\x0e',
 '$',
 '(',
 '#',
 'J�',
 '�@',
 '`',
 '%',
 'F',
 '�\x02',
 '"',
 'Z�',
 "'",
 ')',
 '&',
 'J\x17\x00\x0e�',
 '-',
 'p',
 '\x0e>',
 '*',
 'P',
 'V�',
 "\x0e'",
 '\x12',
 '\x0el',
 '\x0eU',
 '+',
 '\x0e5',
 '\x0eL',
 '\x0ez',
 '\x0ec',
 'J�\x02\x0e�',
 'R',
 'J.\x00\x0e�',
 '\x16',
 '\x01',
 '�\x03',
 'A',
 'JE\x00\x0e�',
 '.',
 '�\x10',
 '� ',
 '/',
 'J�\x01\x0e',
 'J\\\x00\x0e�',
 '�p',
 '�`',
 '\x0b',
 '\x0c',
 '5',
 'V',
 'Js\x00\x0e�',
 '\x05',
 '�0',
 '\x10',
 '\x0b\x0e�',
 'N\x10',
 '8',
 '\x08',
 '\x04',
 '\t',
 '\x14',
 '3',
 '���',
 "N'",
 '\x02',
 '\x0c\x0e�',
 '\x7f',
 '�\x04',
 'N>',
 '\x06',
 'Nl',
 'NU',
 '\x1a',
 '6',
 '�\x0e�',
 '�P',
 '\x0f',
 '7']

In [16]:
col = 'VendorID'
df_vendor = (
    df_dateparts
    .withColumn(col, 
                F.when(~F.col(col).isin(valid_values), 'null')
                .otherwise(F.col(col)))
)

### Validation: `PULocationID`

In [17]:
top_n = (
    df_dateparts
    .groupBy('PULocationID')
    .count()
    .sort(F.col('count').desc())
    .limit(100)
    .collect()
)

top_n

[Row(PULocationID=None, count=33172537),
 Row(PULocationID='237', count=4788089),
 Row(PULocationID='161', count=4398323),
 Row(PULocationID='236', count=4398183),
 Row(PULocationID='186', count=3890491),
 Row(PULocationID='162', count=3879188),
 Row(PULocationID='230', count=3589107),
 Row(PULocationID='48', count=3409675),
 Row(PULocationID='132', count=3361237),
 Row(PULocationID='170', count=3360614),
 Row(PULocationID='142', count=3280291),
 Row(PULocationID='234', count=3216647),
 Row(PULocationID='239', count=2888382),
 Row(PULocationID='163', count=2867917),
 Row(PULocationID='79', count=2710559),
 Row(PULocationID='141', count=2649092),
 Row(PULocationID='68', count=2598115),
 Row(PULocationID='138', count=2567229),
 Row(PULocationID='107', count=2460272),
 Row(PULocationID='164', count=2458499),
 Row(PULocationID='238', count=2265321),
 Row(PULocationID='100', count=2191074),
 Row(PULocationID='263', count=2125817),
 Row(PULocationID='229', count=2098217),
 Row(PULocationID='

In [18]:
valid_values = [e['PULocationID'] for e in top_n]
valid_values

[None,
 '237',
 '161',
 '236',
 '186',
 '162',
 '230',
 '48',
 '132',
 '170',
 '142',
 '234',
 '239',
 '163',
 '79',
 '141',
 '68',
 '138',
 '107',
 '164',
 '238',
 '100',
 '263',
 '229',
 '249',
 '140',
 '90',
 '43',
 '231',
 '246',
 '113',
 '233',
 '75',
 '137',
 '262',
 '114',
 '143',
 '148',
 '74',
 '158',
 '151',
 '144',
 '264',
 '50',
 '166',
 '41',
 '13',
 '211',
 '87',
 '261',
 '125',
 '42',
 '7',
 '88',
 '24',
 '82',
 '244',
 '97',
 '65',
 '33',
 '116',
 '95',
 '129',
 '25',
 '181',
 '209',
 '226',
 '224',
 '45',
 '260',
 '145',
 '4',
 '152',
 '66',
 '255',
 '130',
 '232',
 '223',
 '146',
 '61',
 '52',
 '193',
 '265',
 '179',
 '256',
 '49',
 '76',
 '196',
 '40',
 '17',
 '188',
 '134',
 '80',
 '92',
 '112',
 '243',
 '89',
 '168',
 '225',
 '247']

In [19]:
col = 'PULocationID'
df_pulocationid = (
    df_vendor
    .withColumn(col, 
                F.when(F.col(col).isin(valid_values[1:]), F.col(col))
                .otherwise('null'))
)

### Validation: `DOLocationID`

In [20]:
top_n = (
    df_pulocationid
    .groupBy('DOLocationID')
    .count()
    .sort(F.col('count').desc())
    .limit(100)
    .collect()
)

top_n

[Row(DOLocationID=None, count=33177811),
 Row(DOLocationID='236', count=4678456),
 Row(DOLocationID='237', count=4325226),
 Row(DOLocationID='161', count=4135680),
 Row(DOLocationID='170', count=3409310),
 Row(DOLocationID='162', count=3248441),
 Row(DOLocationID='230', count=3188425),
 Row(DOLocationID='48', count=3014969),
 Row(DOLocationID='142', count=2997134),
 Row(DOLocationID='239', count=2863508),
 Row(DOLocationID='234', count=2792470),
 Row(DOLocationID='141', count=2787876),
 Row(DOLocationID='186', count=2755437),
 Row(DOLocationID='163', count=2557537),
 Row(DOLocationID='238', count=2497880),
 Row(DOLocationID='68', count=2482786),
 Row(DOLocationID='79', count=2345136),
 Row(DOLocationID='107', count=2251145),
 Row(DOLocationID='164', count=2233307),
 Row(DOLocationID='263', count=2197062),
 Row(DOLocationID='140', count=2127942),
 Row(DOLocationID='229', count=2082053),
 Row(DOLocationID='246', count=2081797),
 Row(DOLocationID='231', count=1750589),
 Row(DOLocationID='

In [21]:
valid_values = [e['DOLocationID'] for e in top_n]
valid_values

[None,
 '236',
 '237',
 '161',
 '170',
 '162',
 '230',
 '48',
 '142',
 '239',
 '234',
 '141',
 '186',
 '163',
 '238',
 '68',
 '79',
 '107',
 '164',
 '263',
 '140',
 '229',
 '246',
 '231',
 '249',
 '100',
 '75',
 '233',
 '90',
 '137',
 '143',
 '262',
 '43',
 '113',
 '50',
 '74',
 '138',
 '151',
 '148',
 '158',
 '114',
 '166',
 '41',
 '132',
 '13',
 '264',
 '144',
 '211',
 '87',
 '42',
 '7',
 '125',
 '244',
 '261',
 '232',
 '4',
 '181',
 '224',
 '116',
 '24',
 '88',
 '129',
 '145',
 '33',
 '226',
 '223',
 '209',
 '255',
 '265',
 '45',
 '61',
 '256',
 '97',
 '152',
 '25',
 '112',
 '95',
 '49',
 '243',
 '65',
 '82',
 '1',
 '17',
 '179',
 '260',
 '80',
 '66',
 '37',
 '146',
 '225',
 '168',
 '40',
 '89',
 '193',
 '188',
 '189',
 '52',
 '14',
 '76',
 '216']

In [22]:
col = 'DOLocationID'
df_dolocationid = (
    df_pulocationid
    .withColumn(col, 
                F.when(F.col(col).isin(valid_values[:]), F.col(col))
                .otherwise('null'))
)

### Validation: `Store_and_fwd_flag`

There are values that are neither `Y` nor `N`. These will be converted to nulls.

In [23]:
valid_values = ['N', 'Y']
col = 'Store_and_fwd_flag'
df_validated_store = (
    df_dolocationid
    .withColumn(col, 
                F.when(~F.col(col).isin(valid_values), 'null')
                .otherwise(F.col(col)))
)

### Validation: `Payment_type`

In [24]:
col = 'Payment_type'
valid_values = [1, 2, 3, 4, 5, 6]

In [25]:
df_validated_payment = (
    df_validated_store
    .withColumn(col, 
                F.when(~F.col(col).isin(valid_values), None)
                .otherwise(F.col(col)))
)

### Validation: `RatecodeID`

There are codes other than the defined 1, 2, 3, 4, 5, 6 in the data dictionary. Assume that values between 1 - 1.9999 are considered to be 1, similarly for the other integers.

In [26]:
col = 'RatecodeID'
valid_values = [1, 2, 3, 4, 5, 6]

In [27]:
df_validated_ratecode = (
    df_validated_payment
    .withColumn(col, when(F.col(col).isin(valid_values), F.col(col))
                     .otherwise('null'))
)

## Validate: `Total_amount`

`Total_amount` should be >= 0.

In [28]:
col = 'Total_amount'

In [29]:
df_validated_amount = (
    df_validated_ratecode
    .withColumn(col, F.abs(F.col(col)))
)

## Validate: `Fare_amount`

`Fare_amount` should be >= 0.

In [30]:
col = 'Fare_amount'

In [31]:
df_validated_fare = (
    df_validated_amount
    .withColumn(col, F.abs(F.col(col)))
)

## Validate: `pickup_datetime`

The data should only contain pickups that occur in years 2019 and 2020. Any rows where the year is not 2019 or 2020 will be dropped.

In [32]:
(
    df_validated_fare
    .withColumn('pickup_year', F.year('pickup_datetime'))
    .filter(~F.col('pickup_year').isin([2019, 2020]))
    .count()
)

2051

There are 1430 rows that are not in 2019 or 2020.

In [33]:
df_validated_pickup_year = (
    df_validated_fare
    .withColumn('pickup_year', F.year('pickup_datetime'))
    .filter(F.col('pickup_year').isin([2019, 2020]))
)

## Validate: `dropoff_datetime`

The data should only contain pickups that occur in years 2019 and 2020. Any rows where the year is not 2019 or 2020 will be dropped.

There are 197 rows that are not in 2019 or 2020.

In [34]:
df_validated_dropoff_year = (
    df_validated_pickup_year
    .withColumn('dropoff_year', F.year('pickup_datetime'))
    .filter(F.col('dropoff_year').isin([2019, 2020]))
)

## Validate: trip duration

The the trip duration is the difference between the `dropoff_datetime` and `pickup_datetime`. The `trip_duration` is in seconds and should be positive.

In [35]:
df_add_trip_duration = (
    df_validated_dropoff_year
    .withColumn('trip_duration', 
                F.col('dropoff_datetime').cast('long') - 
                F.col('pickup_datetime').cast('long'))
)

Assume that trips last at least for one minute, hence drop rows with `trip_duration` < 60.

In [36]:
df_validated_trip_duration = df_add_trip_duration.filter(F.col('trip_duration') > 60)

In [37]:
df_validated_trip_duration.count()

115511074

## Validate `trip_distance`

### Negative `trip_distance`

The `trip_distance` should be > 0. However, there are trips with distance < 0.

These values will be replaced by it's positive distances.

In [38]:
df_validated_trip_duration.filter(F.col('trip_distance') <= 0).count()

528237

In [39]:
df_validated_trip_duration.filter(F.col('trip_distance') == 0).count()

497347

In [40]:
col = 'trip_distance'
df_validated_trip_distance = (
    df_validated_trip_duration
    .withColumn(col, F.abs(F.col(col)))
 )

### Large `trip_distance`s

There are extremely large distances. Only distances up to three standard deviations larger than the mean will be included, any values higher will be regarded as outliers and will be dropped.

There are only 202 trips that are greater than three standard deviations larger than the mean.

In [41]:
mean = df_validated_trip_distance.select(F.mean(F.col('trip_distance'))).collect()[0][0]
mean

3.3766191870930244

In [42]:
stddev = df_validated_trip_distance.select(F.stddev(F.col('trip_distance'))).collect()[0][0]
stddev

209.15294383269

In [43]:
threshold = mean + 3 * stddev
threshold

630.8354506851631

In [44]:
(
    df_validated_trip_distance
    .filter(F.col('trip_distance') > threshold)
    .count() 
)

543

In [45]:
df_validated_exclude_large_trip = (
    df_validated_trip_distance
    .filter(F.col('trip_distance') <= threshold)
)

## Validate: `Passenger_count`

The domain for this column is [1, 4]. However, there may have been larger taxis that can take up to 9 passengers. Having 0 passengers is beyong my understanding at the moment.  This, along with the nulls could be subject to an imputation strategy in the modelling stage.

In [46]:
col = 'Passenger_count'
valid_values = [1, 2, 3, 4, 5]

In [47]:
df_passenger = (
    df_validated_exclude_large_trip
    .withColumn(col, when(~F.col(col).isin(valid_values), None)
                     .otherwise(F.col(col)))
)

# Feature engineering

## Convert `trip_distance` to km

In [48]:
conversion_factor = 1.60934

In [49]:
df_fe_km = df_passenger.withColumn('trip_distance_km', F.col('trip_distance') * conversion_factor)

## Calculate speed

The average speed (km/h) is calculated by dividing the `trip_distance_km` by the `trip_duration` (seconds) multiplied by 3,600.

There is an inconsistency:
1. either the given unit for `trip_distance` in the data dictionary is incorrect, or
1. the `trip_distance` values are all incorrect, or
1. the `pickup_datetime` and `dropoff_datetime` are incorrect

The `trip_distance`s are reported to be in the unit of miles, however, even with very short `trip_duration`s (less than one hour) the `trip_distance` is often in the hundreds; it's unlikely that a NYC taxi would be able to travel 170 miles in 30 minutes.There is an inconsistency:
1. either the given unit for `trip_distance` in the data dictionary is incorrect, or
1. the `trip_distance` values are all incorrect, or
1. the `pickup_datetime` and `dropoff_datetime` are incorrect

The `trip_distance`s are reported to be in the unit of miles, however, even with very short `trip_duration`s (less than one hour) the `trip_distance` is often in the hundreds; it's unlikely that a NYC taxi would be able to travel 170 miles in 30 minutes.

In [50]:
df_speed = (
    df_fe_km
    .withColumn('speed', F.col('trip_distance_km') / F.col('trip_duration') * 3600)
)

In [51]:
speed_limit = 80

df_speed.filter(F.col('speed') <= speed_limit).count()

115467997

In [52]:
df_speed.filter(F.col('speed') <= speed_limit).count() / df_speed.count()

0.9996317738336775

Only about 10 million rows (~9%) of the data have speeds under 80kph.

### Fixing `trip_distance_km`

According to [careertrend.com](https://careertrend.com/how-many-miles-does-an-average-taxi-cab-driver-drive-yearly-13658842.html), the average taxi trip in the US is 5 miles, which is consistent with the average `trip_duration` of 15 minutes and the speed limit of 25mph in NYC. The `trip_distance` in the data is likely to be incorrect.

1. The average speed for those in with `speed` <= 80kph (50mph) will be calculated
2. This average speed will be used to recalculate the distances based on the `trip_duration`

In [53]:
average_speed = (
    df_speed
    .filter(F.col('speed') <= speed_limit)
    .select(F.mean(F.col('speed')))
    .collect()[0][0]
)

average_speed

18.486844981069407

For rows with speeds less than 80kph, the average speed is 25kph. This is a more reasonable value considering the dense traffic in NYC.

In [54]:
df_fixed_distance = (
    df_speed
    .withColumn('speed', when(F.col('speed') > speed_limit, average_speed)
                                    .otherwise(F.col('speed')))
    .withColumn('trip_distance_km', F.col('speed') * F.col('trip_duration') / 3600)
)

# Cleanup

Drop unused columns.

In [55]:
df_cleaned = df_fixed_distance.drop('trip_distance')

In [56]:
df_cleaned.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- Store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- Passenger_count: integer (nullable = true)
 |-- Fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- Total_amount: float (nullable = true)
 |-- Payment_type: string (nullable = true)
 |-- congestion_surcharge: float (nullable = true)
 |-- colour: string (nullable = true)
 |-- pickup_year: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_dayofyear: integer (nullable = true)
 |-- pickup_dayofmonth: integer (nullable = true)
 |-- picku

# Save data

The data is saved in the parquet format because it is columnar. Columnar storage is preferred because the types of queries that are used are aggregations per column, such as average total amount per group etc.

In [None]:
path = processed_data_dir.joinpath('df_cleaned_100').as_posix()
df_cleaned.repartition(numPartitions=n_partitions).write.parquet(path, mode='overwrite')

In [219]:
spark.sparkContext.stop()