# New York Taxi Data Analysis 
Group-9

## Setup

In [272]:
# generic modules
import itertools
import os
import re
import timeit
import gc

# specific module
#import wget

# common ds modules
import pandas as pd
#import plotly.express as px

# spark modules for session managment
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

# spark functions
from pyspark.sql.functions import lit
import pyspark.sql.functions as sparkle

# spark types
from pyspark.sql.types import *

# spark ml
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [273]:
# session starter named nyctaxi
spark=SparkSession.builder \
    .appName('nyctaxi') \
    .master('local[*]') \
    .config('spark.driver.memory','10G') \
    .getOrCreate()


#     .config("spark.sql.default.parallelism", "360") \ 
'''
.config("spark.driver.maxResultSize", "8g") \
    
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "150000") \
    .config("spark.sql.tungsten.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "360") \
    .config("spark.rdd.compress", "true") \
'''

'\n.config("spark.driver.maxResultSize", "8g")     \n    .config("spark.sql.execution.arrow.pyspark.enabled", "true")     .config("spark.sql.execution.arrow.maxRecordsPerBatch", "150000")     .config("spark.sql.tungsten.enabled", "true")     .config("spark.sql.shuffle.partitions", "360")     .config("spark.rdd.compress", "true") '

## Download Data from the website in to docker container
https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [3]:
#spark.read.csv("Dataset/yellow-2019-01.csv")

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string]

## Converting CSV to Initial Parquet

In [3]:
# reads directory, filters for csv's and feeds into loop to convert to parquet
files=[re.search(r"(.*)(\.csv)$", file).group(1) for file in os.listdir("./Dataset/") if file.endswith(".csv")]
for file in files:
    inpath = f"./Dataset/{file}.csv"
    readdf = spark.read.csv(inpath, header = "true")
    outpath = f"./Dataset/prq/{file}.parquet"
    readdf.write.parquet(outpath)

In [5]:
colours=['yellow']

## Combining Dirty Data with color of taxi

In [6]:
# combines month data into single file per colour
for colour in colours:
    # uses a sample of the first dataset to create and empty df with correct format to join to
    initpath = f"./Dataset/prq/{colour}-2019-01.parquet"
    outdf = spark.read.parquet(initpath)
    outdf = outdf.limit(0)
    # get files for loop
    files = [re.search(r"(.*)(\.parquet)$", file).group(1) for file in os.listdir("./Dataset/prq") if file.endswith(".parquet") and file.startswith(colour)]
    for file in files:
        inpath = f"./Dataset/prq/{file}.parquet"
        readdf = spark.read.parquet(inpath)
        # !! unionByName !! ensures columns match union method can result in incorrect mapping
        outdf = outdf.unionByName(readdf)
    outpath = f"./data/{colour}-all.parquet"
    outdf.write.parquet(outpath)

In [274]:
# read and check columns
yellowdf = spark.read.parquet("./data/yellow-all.parquet")
yellowdf.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [275]:
# pass data to next stage
data = yellowdf

## Uncleaned Full Dataset

In [9]:
# check record numbers match
data.count()

7667792

In [276]:
# transform dataframe 
# add new missing columns with releveant value
yellowdf = yellowdf.withColumn('trip_type', lit("1"))
yellowdf = yellowdf.withColumn('ehail_fee', lit("0"))
# create colour variable to track dataset
yellowdf = yellowdf.withColumn('colour', lit("yellow"))
yellowdf = yellowdf.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
yellowdf = yellowdf.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

In [277]:
# pass data to next stage
data = yellowdf

In [278]:
# intial schema not imputed as no cleaning done
data.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- trip_type: string (nullable = false)
 |-- ehail_fee: string (nullable = false)
 |-- colour: string (nullable = false)



In [279]:
# create view for spark.sql queries
data.createOrReplaceTempView("data_init_view")

## Grouping by Colour

In [14]:
# SQL query, group by relevent variable and create count to check splits
spark.sql("""
            SELECT colour, count(colour)
            FROM data_init_view
            GROUP by colour
        """).show()

+------+-------------+
|colour|count(colour)|
+------+-------------+
|yellow|      7667792|
+------+-------------+



## VendorID
Should be 1 or 2
   - 1-Creative Mobile Technologies
   - 2-Verifone INC.
    
- VendorId=4 contains 76823 records?
- Ratecodes include 99 for VendorId=4 which is invalid it should be in range of 1-6

In [15]:
spark.sql("""
            SELECT VendorID, count(VendorID)
            FROM data_init_view
            GROUP by VendorID
        """).show()

+--------+---------------+
|VendorID|count(VendorID)|
+--------+---------------+
|       1|        2938778|
|       4|          76823|
|       2|        4652191|
+--------+---------------+



In [16]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE VendorID == 4
        """).show()

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|trip_type|ehail_fee|colour|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|       4|2019-01-25 17:00:59|2019-01-25 17:04:53|              1|          .58|         1|                 N|         237|         262|           2|  

In [17]:
spark.sql("""
            SELECT colour, count(colour)
            FROM data_init_view
            WHERE VendorID == 4
            GROUP by colour
        """).show()

+------+-------------+
|colour|count(colour)|
+------+-------------+
|yellow|        76823|
+------+-------------+



In [18]:
spark.sql("""
            SELECT RatecodeID, count(RatecodeID)
            FROM data_init_view
            WHERE VendorID == 4
            GROUP by RatecodeID
        """).show()

+----------+-----------------+
|RatecodeID|count(RatecodeID)|
+----------+-----------------+
|         3|               78|
|        99|                5|
|         5|              225|
|         1|            75240|
|         4|               46|
|         2|             1229|
+----------+-----------------+



In [19]:
spark.sql("""
            SELECT passenger_count, count(passenger_count)
            FROM data_init_view
            WHERE VendorID == 4
            GROUP by passenger_count
        """).show()

+---------------+----------------------+
|passenger_count|count(passenger_count)|
+---------------+----------------------+
|              3|                   266|
|              5|                     9|
|              1|                 75449|
|              4|                   107|
|              2|                   992|
+---------------+----------------------+



In [20]:
spark.sql("""
            CREATE OR REPLACE TEMPORARY VIEW vid4months
            AS
            SELECT pickup_datetime,
                CASE
                    WHEN pickup_datetime LIKE '%2019-01%' THEN 'jan'
                    ELSE "unknown"
                END AS MonthGroup
            FROM data_init_view
            WHERE VendorID == 4
        """)

spark.sql("""
            SELECT MonthGroup, count(MonthGroup) as count
            FROM vid4months
            GROUP BY MonthGroup
            ORDER BY count
        """).show()


+----------+-----+
|MonthGroup|count|
+----------+-----+
|       jan|76823|
+----------+-----+



## Passenger Count
- 117381 records of passenger count 0
- For 7-9 passenger count
    - ~57 records( What are they Maxi Type?)

In [21]:
spark.sql("""
            SELECT passenger_count, count(passenger_count)
            FROM data_init_view
            GROUP by passenger_count
        """).show()

+---------------+----------------------+
|passenger_count|count(passenger_count)|
+---------------+----------------------+
|              7|                    19|
|              3|                314721|
|              8|                    29|
|              0|                117381|
|              5|                323842|
|              6|                200811|
|              9|                     9|
|              1|               5456121|
|              4|                140753|
|              2|               1114106|
+---------------+----------------------+



## RatecodeID should be in range(1-6)
- 252 with RatecodeID=99
- 145 with distance 0
- 84 PULocation 264

In [22]:
spark.sql("""
            SELECT RatecodeID, count(RatecodeID)
            FROM data_init_view
            GROUP by RatecodeID
        """).show()

+----------+-----------------+
|RatecodeID|count(RatecodeID)|
+----------+-----------------+
|         3|            11801|
|        99|              252|
|         5|            54569|
|         6|               46|
|         1|          7430139|
|         4|             4895|
|         2|           166090|
+----------+-----------------+



In [23]:
spark.sql("""
            SELECT colour, count(colour) as count
            FROM data_init_view
            WHERE NOT RatecodeID BETWEEN 1 AND 6
            GROUP BY colour
        """).show()

+------+-----+
|colour|count|
+------+-----+
|yellow|  252|
+------+-----+



In [24]:
spark.sql("""
            SELECT trip_distance, count(trip_distance) AS count
            FROM data_init_view
            WHERE NOT RatecodeID BETWEEN 1 AND 6
            GROUP by trip_distance
            ORDER BY count DESC
        """).show()

+-------------+-----+
|trip_distance|count|
+-------------+-----+
|          .00|  145|
|          .84|    4|
|         1.27|    4|
|         1.35|    3|
|          .66|    2|
|          .72|    2|
|          .74|    2|
|         1.50|    2|
|          .94|    2|
|          .73|    2|
|         3.84|    2|
|         2.57|    2|
|         2.19|    2|
|         1.18|    2|
|         1.52|    2|
|         2.78|    1|
|         1.30|    1|
|        23.89|    1|
|         1.68|    1|
|         1.74|    1|
+-------------+-----+
only showing top 20 rows



In [25]:
spark.sql("""
            SELECT PULocationID, count(PULocationID) AS count
            FROM data_init_view
            WHERE NOT RatecodeID BETWEEN 1 AND 6
            GROUP by PULocationID
            ORDER BY count DESC
        """).show()

+------------+-----+
|PULocationID|count|
+------------+-----+
|         264|   84|
|         265|   16|
|         142|    8|
|         239|    8|
|         170|    7|
|          43|    7|
|          79|    7|
|         231|    6|
|         162|    6|
|         230|    6|
|         138|    5|
|         193|    5|
|         132|    5|
|         107|    5|
|         161|    5|
|         234|    4|
|         141|    4|
|         237|    4|
|         145|    4|
|         238|    3|
+------------+-----+
only showing top 20 rows



## Payment Type should be in range(1-6)
- All valid within range
- No 6 
- Count descends as the payment type increases

In [26]:
spark.sql("""
            SELECT payment_type, count(payment_type) AS count
            FROM data_init_view
            GROUP by payment_type
            ORDER BY count DESC
        """).show()

+------------+-------+
|payment_type|  count|
+------------+-------+
|           1|5486027|
|           2|2137415|
|           3|  33186|
|           4|  11164|
+------------+-------+



## Extra, it should be 0.5 or 1
- If all the values is valid then we can change to two bools[]
- Out of range value including negatives, Overnight Charges?
- 37 unique values 
- 7 negative values
- Valid values
    - 1316580 records contains 1
    - 2116494 records contains 0.5

In [27]:
spark.sql("""
            SELECT extra, count(extra) AS count
            FROM data_init_view
            GROUP by extra
            ORDER BY count DESC
        """).show()

+-----+-------+
|extra|  count|
+-----+-------+
|    0|4199855|
|  0.5|2116494|
|    1|1316580|
|  4.5|  31241|
| -0.5|   2201|
|   -1|    863|
|  0.8|    229|
| -4.5|     79|
|  1.3|     74|
| 17.5|     63|
|  1.8|     34|
|  2.5|     21|
|  0.3|     10|
|   18|      9|
|    3|      7|
| 18.5|      6|
|  5.3|      4|
|  0.2|      3|
| 0.25|      1|
| 10.9|      1|
+-----+-------+
only showing top 20 rows



In [28]:
spark.sql("""
            SELECT extra, count(extra) AS count
            FROM data_init_view
            GROUP by extra
            ORDER BY count DESC
        """).count()

37

In [29]:
spark.sql("""
            SELECT extra, count(extra) AS count
            FROM data_init_view
            WHERE extra < 0
            GROUP by extra
            ORDER BY count DESC
        """).count()

7

## Mta_Tax should be 0.5
- If all values are valid change to bool[]
- 7625883 valid values
- 34984: 0 values
- 6819: -0.5 value(Refund?)
    - Check other out of range and negative values

In [30]:
spark.sql("""
            SELECT mta_tax, count(mta_tax) AS count
            FROM data_init_view
            GROUP by mta_tax
            ORDER BY count DESC
        """).show()

+-------+-------+
|mta_tax|  count|
+-------+-------+
|    0.5|7625883|
|      0|  34984|
|   -0.5|   6819|
|   0.25|     97|
|   0.35|      2|
|  32.53|      1|
|  37.51|      1|
|    0.9|      1|
|   2.42|      1|
|   60.8|      1|
|      1|      1|
|   18.3|      1|
+-------+-------+



### Out of range value
- 6925: Out of range value

In [31]:
spark.sql("""
            SELECT colour, count(colour) AS count
            FROM data_init_view
            WHERE mta_tax != "0"
            AND mta_tax != "0.5"
            GROUP by colour
            ORDER BY count DESC
        """).show()

+------+-----+
|colour|count|
+------+-----+
|yellow| 6925|
+------+-----+



## Improvement_Surcharge should be 0.3
- If all values are valid change to bool[]
- 7658005 valid 0.3 values
- 7129: -0.3 value(Refund?)
- 2657: 0 value
- 1: 0.6 value
    - All 0 trip_diatance
    - All PU/Do Id= 265

In [32]:
spark.sql("""
            SELECT improvement_surcharge, count(improvement_surcharge) AS count
            FROM data_init_view
            GROUP by improvement_surcharge
            ORDER BY count DESC
        """).show()

+---------------------+-------+
|improvement_surcharge|  count|
+---------------------+-------+
|                  0.3|7658005|
|                 -0.3|   7129|
|                    0|   2657|
|                  0.6|      1|
+---------------------+-------+



In [33]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE improvement_surcharge = "1"
        """).show()

+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|VendorID|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|trip_type|ehail_fee|colour|
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----

## Trip Type should be 1 for Yellow Taxi
- All are valid

In [34]:
spark.sql("""
            SELECT trip_type, count(trip_type) AS count
            FROM data_init_view
            GROUP by trip_type
            ORDER BY count DESC
        """).show()

+---------+-------+
|trip_type|  count|
+---------+-------+
|        1|7667792|
+---------+-------+



## Check Location Value
(Should be an integer from 1-265)- from Taxizone lookup Table:-
    https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
- PULocationID
- DULocationID
- Both
    - No non integer values 
    - No null

In [35]:
# look for non integer values
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE PULocationID BETWEEN 1 AND 265
            and mod(PULocationID, 1) != 0
        """).show()

+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|VendorID|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|trip_type|ehail_fee|colour|
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----

In [36]:
# count of location values, most first
spark.sql("""
            SELECT PULocationID, count(PULocationID) AS count
            FROM data_init_view
            GROUP by PULocationID
            ORDER BY count DESC
        """).show()

+------------+------+
|PULocationID| count|
+------------+------+
|         237|332473|
|         236|323008|
|         161|312392|
|         162|277166|
|         230|263646|
|         186|260712|
|          48|240903|
|         170|238978|
|         234|237648|
|         142|235144|
|         239|207883|
|         163|199682|
|         132|196612|
|          79|193955|
|         141|192380|
|         138|184334|
|         107|176786|
|         164|172647|
|          68|171971|
|         238|162192|
+------------+------+
only showing top 20 rows



In [37]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE NOT DOLocationID BETWEEN 1 AND 265
        """).show()

+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|VendorID|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|trip_type|ehail_fee|colour|
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----

In [38]:
spark.sql("""
            SELECT DOLocationID, count(DOLocationID) AS count
            FROM data_init_view
            GROUP by DOLocationID
            ORDER BY count DESC
        """).show()

+------------+------+
|DOLocationID| count|
+------------+------+
|         236|334323|
|         237|296185|
|         161|293782|
|         170|242037|
|         162|232451|
|         230|225336|
|         142|214164|
|          48|208624|
|         234|204386|
|         239|204350|
|         141|202184|
|         186|189486|
|         163|175754|
|         238|175310|
|          79|168608|
|          68|167144|
|         107|162697|
|         263|158297|
|         164|154200|
|         140|152042|
+------------+------+
only showing top 20 rows



In [39]:
spark.sql("""
            SELECT DOLocationID, count(DOLocationID) AS count
            FROM data_init_view
            WHERE DOLocationID >= 264
            GROUP by DOLocationID
            ORDER BY count DESC
        """).show()

+------------+------+
|DOLocationID| count|
+------------+------+
|         264|149094|
|         265| 16817|
+------------+------+



## Check dates are within range:-
(Should be 2019-01-01 00:00:00 to 2019-01-31 23:59:59)
- Pickup_datetime
    - min 2001-02-02 14:55:07
        - 441 low records
    - Max 2088-01-24 00:25:39
        - 96 high records
- Dropoff_datetime
    - min 2001-02-02 15:07:27
        - 316 low records
    - Max 2088-01-24 07:28:25
        - 3011 hign records

In [40]:
spark.sql("""
            SELECT MIN(pickup_datetime), MAX(pickup_datetime), MIN(dropoff_datetime),  MAX(dropoff_datetime)
            FROM data_init_view
        """).show()

+--------------------+--------------------+---------------------+---------------------+
|min(pickup_datetime)|max(pickup_datetime)|min(dropoff_datetime)|max(dropoff_datetime)|
+--------------------+--------------------+---------------------+---------------------+
| 2001-02-02 14:55:07| 2088-01-24 00:25:39|  2001-02-02 15:07:27|  2088-01-24 07:28:25|
+--------------------+--------------------+---------------------+---------------------+



In [41]:
# calculate tripdays using datediff (simpler)
spark.sql("""
            WITH tripdaysTable AS (
            SELECT *, datediff(dropoff_datetime, pickup_datetime) as tripdays
            FROM data_init_view
            )
            SELECT tripdays, count(tripdays) AS count
            FROM tripdaysTable
            GROUP by tripdays
            ORDER BY count DESC
        """).show()

+--------+-------+
|tripdays|  count|
+--------+-------+
|       0|7597276|
|       1|  70508|
|     -58|      1|
|      -2|      1|
|     -19|      1|
|      30|      1|
|      22|      1|
|      24|      1|
|       5|      1|
|       2|      1|
+--------+-------+



In [42]:
spark.sql("""
            WITH countsTable AS (
                WITH tripdaysTable AS (
                    SELECT *, datediff(dropoff_datetime, pickup_datetime) as tripdays
                    FROM data_init_view
                    )
                SELECT tripdays, count(tripdays) AS count
                FROM tripdaysTable
                GROUP by tripdays
                ORDER BY count DESC
            )
            SELECT sum(count)
            FROM countsTable
            WHERE tripdays != "0"
            AND tripdays != "1"
        """).show()

+----------+
|sum(count)|
+----------+
|         8|
+----------+



In [43]:
# inspect out of range values
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE pickup_datetime < "2019-01-01 00:00:00"
            ORDER BY pickup_datetime DESC
        """).show()

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|trip_type|ehail_fee|colour|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|       2|2018-12-31 23:59:58|2019-01-01 00:03:52|              1|          .66|         1|                 N|         162|         170|           2|  

In [280]:
# count high out of range values
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE pickup_datetime > "2019-01-31 23:59:59" 
        """).count()

96

In [45]:
# count low out of range values
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE pickup_datetime < "2019-01-01 00:00:00"
        """).count()

441

In [46]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE dropoff_datetime < "2019-01-01 00:00:00"
        """).count()

316

In [48]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE dropoff_datetime > "2019-01-31 23:59:59" 
            ORDER BY dropoff_datetime
        """).count()

3011

In [49]:
# investigate nye values
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE dropoff_datetime > "2019-01-31 23:59:59"
            AND pickup_datetime < "2019-01-31 23:59:59" 
            ORDER BY dropoff_datetime
        """).count()

2915

In [53]:
# show a candiate value to evaluate
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE dropoff_datetime > "2020-01-01 23:59:59"
            AND pickup_datetime < "2019-12-31 23:59:59" 
            ORDER BY dropoff_datetime
        """).show()

+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
|VendorID|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|trip_type|ehail_fee|colour|
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---------+---------+------+
+--------+---------------+----------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----

## PU and DO Location containing 264 and 265 Location
- PU
    - 163631 with PU Location 264 and 265
- DO
    - 165911 with DO Location 264 and 265
- 264
    - 138614 where both are 264
- 265
    - 3034 where both are 265

In [50]:
# count where pickup location was unknown
spark.sql("""
            SELECT PULocationID, count(PULocationID) AS count
            FROM data_init_view
            WHERE PULocationID = "264"
            OR PULocationID = "265"
            GROUP by PULocationID
            ORDER BY count DESC
        """).show()

+------------+------+
|PULocationID| count|
+------------+------+
|         264|159760|
|         265|  3871|
+------------+------+



In [51]:
# and for dropoff
spark.sql("""
            SELECT DOLocationID, count(DOLocationID) AS count
            FROM data_init_view
            WHERE DOLocationID = "264"
            OR DOLocationID = "265"
            GROUP by DOLocationID
            ORDER BY count DESC
        """).show()

+------------+------+
|DOLocationID| count|
+------------+------+
|         264|149094|
|         265| 16817|
+------------+------+



In [52]:
# count where pu location was 264 unknown as was dropoff
spark.sql("""
            SELECT DOLocationID, count(DOLocationID) AS count
            FROM data_init_view
            WHERE PULocationID = "264"
            AND DOLocationID = "264"
            GROUP by DOLocationID
            ORDER BY count DESC
        """).show()

+------------+------+
|DOLocationID| count|
+------------+------+
|         264|138614|
+------------+------+



In [281]:
# count where pu location was 264 unknown as was dropoff
spark.sql("""
            SELECT DOLocationID, count(DOLocationID) AS count
            FROM data_init_view
            WHERE PULocationID = "265"
            AND DOLocationID = "265"
            GROUP by DOLocationID
            ORDER BY count DESC
        """).show()

+------------+-----+
|DOLocationID|count|
+------------+-----+
|         265| 3034|
+------------+-----+



In [53]:
# where pickup was other unknown but dropoff was not
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE PULocationID = "265"
            AND DOLocationID != "264"
            AND DOLocationID != "265"
        """).count()

811

In [54]:
# where dropoff was  unknown but pickup was not
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE DOLocationID = "265"
            AND PULocationID != "264"
            AND PULocationID != "265"
        """).count()

13316

In [55]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE DOLocationID = "264"
            AND PULocationID != "264"
            AND PULocationID != "265"
        """).count()

10454

In [56]:
spark.sql("""
            SELECT *
            FROM data_init_view
            WHERE DOLocationID = "265"
            AND PULocationID = "265"
        """).count()

3034

In [57]:
data.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- trip_type: string (nullable = false)
 |-- ehail_fee: string (nullable = false)
 |-- colour: string (nullable = false)



In [58]:
spark.catalog.dropTempView("data_init_view")

## Remove duplicate rows

In [59]:
dataunique = data.distinct()

In [60]:
dataU = dataunique
dataU.count()

7667792

## Initial Cleaning
Duplicates?
    - Run Distinct
### Outcomes from Initial EDA
1. Drop ehail_fee
2. Drop ratecode=99
3. Convert store_and_fwd_flag to bool
4. Drop bad dates
    - Low pickup_datetime<2019-01-01 00:00:00
    - High pickup_datetime>2019-01-31 23:59:59
    - Low dropoff_datetime<2019-01-01 00:00:00
    - High dropoff_datetime>2019-01-31 23:59:59
### Datatypes 
  - Initially all Strings 

In [61]:
dataU.createOrReplaceTempView("data_u_view")

In [62]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW clean AS (
                SELECT *
                FROM data_u_view
                WHERE RatecodeID != "99"
                AND trip_type IS NOT NULL
                AND PULocationID != "265"
                AND pickup_datetime BETWEEN "2019-01-01 00:00:00" AND "2019-01-31 23:59:59"
                AND dropoff_datetime BETWEEN "2019-01-01 00:00:00" AND "2019-01-02 23:59:59"
            )
        """)

DataFrame[]

In [63]:
spark.sql("""       
            CREATE OR REPLACE TEMP VIEW clean2 AS (
            SELECT VendorID, 
                pickup_datetime,
                dropoff_datetime,
                passenger_count,
                trip_distance,
                RatecodeID,
                PULocationID,
                DOLocationID,
                payment_type,
                fare_amount,
                extra,
                mta_tax,
                tip_amount,
                tolls_amount,
                improvement_surcharge,
                total_amount,
            CASE WHEN store_and_fwd_flag = "Y" THEN "1"
            ELSE "0"
            END AS store_and_fwd_flag,
            CASE WHEN trip_type = "1" THEN "0"
            ELSE "0"
            END AS dispatched,
            CASE WHEN colour = "yellow" THEN "0"
            ELSE "0"
            END AS colour
            FROM clean
            )
        """)

DataFrame[]

In [64]:
dataC = spark.sql("SELECT * FROM clean2")

In [65]:
dataC.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'store_and_fwd_flag',
 'dispatched',
 'colour']

## Cleck all columns for NA/Null
- NA
- N/A
- NAN
- NULL
- NIL
- " "

In [66]:
# common na strings as list
commonNA = ["NA", "N/A", "NAN", "NIL", "NULL", " "]

# columns from dataset
columns = dataC.columns

# blank output list
anil = []

# write a sql query string that converts any in the common na list in all columns in columns list to null using SQL NULLIF
# first by na string
for nval in commonNA:
    scol = []
    for col in columns:
        # constrcuts string for each column
        nif = f"NULLIF(UPPER({col}), UPPER('{nval}')) AS {col}"
        scol.append(nif)
    scol = ", ".join(scol)
    # puts all null strings into select statement
    scol = f"SELECT {scol} FROM clean2"
    anil.append(scol)

# check output
anil

["SELECT NULLIF(UPPER(VendorID), UPPER('NA')) AS VendorID, NULLIF(UPPER(pickup_datetime), UPPER('NA')) AS pickup_datetime, NULLIF(UPPER(dropoff_datetime), UPPER('NA')) AS dropoff_datetime, NULLIF(UPPER(passenger_count), UPPER('NA')) AS passenger_count, NULLIF(UPPER(trip_distance), UPPER('NA')) AS trip_distance, NULLIF(UPPER(RatecodeID), UPPER('NA')) AS RatecodeID, NULLIF(UPPER(PULocationID), UPPER('NA')) AS PULocationID, NULLIF(UPPER(DOLocationID), UPPER('NA')) AS DOLocationID, NULLIF(UPPER(payment_type), UPPER('NA')) AS payment_type, NULLIF(UPPER(fare_amount), UPPER('NA')) AS fare_amount, NULLIF(UPPER(extra), UPPER('NA')) AS extra, NULLIF(UPPER(mta_tax), UPPER('NA')) AS mta_tax, NULLIF(UPPER(tip_amount), UPPER('NA')) AS tip_amount, NULLIF(UPPER(tolls_amount), UPPER('NA')) AS tolls_amount, NULLIF(UPPER(improvement_surcharge), UPPER('NA')) AS improvement_surcharge, NULLIF(UPPER(total_amount), UPPER('NA')) AS total_amount, NULLIF(UPPER(store_and_fwd_flag), UPPER('NA')) AS store_and_fwd_f

In [67]:
# create a query for each na string, replaceing view each update
for query in anil:
    fq = f"CREATE OR REPLACE TEMP VIEW clean2 AS ({query})"
    spark.sql(fq)

In [68]:
# write to df
dataC = spark.sql("SELECT * FROM clean2")

In [69]:
dataC.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'store_and_fwd_flag',
 'dispatched',
 'colour']

In [70]:
dataC3 = dataC

In [71]:
# dataC3 = spark.read.parquet("./data/clean2.parquet")
dataC3.createOrReplaceTempView("data_c3_view")

## Trip Length

In [72]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW trip_view AS (
            SELECT *, datediff(dropoff_datetime, pickup_datetime) as tripdays
            FROM data_c3_view
            )
        """)

DataFrame[]

In [73]:
tripClean = spark.sql("""
            SELECT *
            FROM trip_view
            WHERE tripdays <= 1
        """)

## Refunds
- Initially used just pickup datetime, dropoff datetime and fare_amount needs to include pickup and dropoff location and use total amount
    - no more triples
    - 420
- refund flag and remove duplicate record

In [74]:
dataTC = tripClean

In [75]:
#dataTC = dataTC.repartition(180) # increase from 36 due to shuffle spill
dataTC.createOrReplaceTempView("data_TC_view")

In [76]:
# count number of potential refunds
# use absolute to match negative totals with positives
spark.sql("""
            SELECT pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, ABS(total_amount) AS fare, count(*) AS count
            FROM data_TC_view
            GROUP BY pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, fare
            HAVING count > 1
            ORDER BY count DESC
            """).count()

420

In [77]:
# show examples
spark.sql("""
            SELECT pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, ABS(total_amount) AS fare, count(*) AS count
            FROM data_TC_view
            GROUP BY pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, fare
            HAVING count > 1
            ORDER BY count DESC
            """).show()

+-------------------+-------------------+------------+------------+----+-----+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|fare|count|
+-------------------+-------------------+------------+------------+----+-----+
|2019-01-02 12:51:51|2019-01-02 12:52:05|          93|          93| 3.8|    2|
|2019-01-01 18:38:10|2019-01-01 18:39:35|          75|          75|52.8|    2|
|2019-01-02 13:21:02|2019-01-02 13:24:38|         236|         239| 5.3|    2|
|2019-01-02 15:49:13|2019-01-02 15:55:43|         237|         238| 7.8|    2|
|2019-01-01 00:33:02|2019-01-01 00:36:38|         166|         151| 6.3|    2|
|2019-01-01 16:18:12|2019-01-01 16:30:03|         144|         158|10.8|    2|
|2019-01-01 14:48:32|2019-01-01 14:52:57|         209|         261| 5.3|    2|
|2019-01-01 04:50:18|2019-01-01 04:50:25|         264|         235|80.3|    2|
|2019-01-01 18:25:16|2019-01-01 18:25:24|         142|          43|52.8|    2|
|2019-01-02 17:32:52|2019-01-02 17:38:31|         23

In [78]:
# create view redistributing data to reduce later shuffle
spark.sql("""
            CREATE OR REPLACE TEMP VIEW data_TC_redis AS
            SELECT *
            FROM data_TC_view
            DISTRIBUTE BY PULocationID
            SORT BY DOLocationID, pickup_datetime, dropoff_datetime
            """)

DataFrame[]

In [79]:
# create view of duplicates / refunds
spark.sql("""
            CREATE OR REPLACE TEMP VIEW refunds AS
            SELECT pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, ABS(total_amount) AS fare, count(*) AS count
            FROM data_TC_view
            GROUP BY PULocationID, DOLocationID, pickup_datetime, dropoff_datetime, fare
            HAVING count > 1
            DISTRIBUTE BY PULocationID
            SORT BY DOLocationID, pickup_datetime, dropoff_datetime
            """)

DataFrame[]

In [80]:
# create refund flag, when multiple flag is 1
refunds = spark.sql("""
            SELECT data_tc_redis.*, CASE WHEN refunds.count = "2" THEN "1"
                                    ELSE "0"
                                    END AS refunded_flag
            FROM data_tc_redis
            LEFT JOIN refunds
            ON data_tc_redis.PULocationID = refunds.PULocationID
            AND data_tc_redis.DOLocationID = refunds.DOLocationID
            AND data_tc_redis.pickup_datetime = refunds.pickup_datetime
            AND data_tc_redis.dropoff_datetime = refunds.dropoff_datetime
            """)

In [81]:
dataR = refunds

In [82]:
# dataR = spark.read.parquet("./data/refundFlagged.parquet")
dataR.createOrReplaceTempView("data_R_view")

In [83]:
# check number of "refunds"
spark.sql("""
            SELECT refunded_flag, count(refunded_flag)
            FROM data_R_view
            GROUP by refunded_flag
        """).show()

+-------------+--------------------+
|refunded_flag|count(refunded_flag)|
+-------------+--------------------+
|            0|              384221|
|            1|                 840|
+-------------+--------------------+



In [84]:
# inspect examples
spark.sql("""
            SELECT *
            FROM data_R_view
            WHERE refunded_flag = 1
            """).show()

+--------+-------------------+-------------------+---------------+-------------+----------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+----------+------+--------+-------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|store_and_fwd_flag|dispatched|colour|tripdays|refunded_flag|
+--------+-------------------+-------------------+---------------+-------------+----------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+----------+------+--------+-------------+
|       2|2019-01-02 19:40:58|2019-01-02 19:44:44|              1|          .51|         1|         237|         141|           2|        4.5|    1|    0.5|         0|     

In [85]:
# count where total amount is 0
spark.sql("""
            SELECT *
            FROM data_R_view
            WHERE refunded_flag = 1
            AND NOT total_amount < "0"
            AND NOT total_amount > "0"           
            """).count()

0

In [88]:
# some equal fares so need to investigate
equalfares = spark.sql("""
                        SELECT pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, total_amount, count(*) AS count
                        FROM data_R_view
                        GROUP BY PULocationID, DOLocationID, pickup_datetime, dropoff_datetime, total_amount
                        HAVING count > 1
                        """)
# equalfares.cache()
equalfares.createOrReplaceTempView("dups_view")

In [89]:
equalfares.count()

0

In [90]:
# use left join exlusive to remove furhter duplicates
removedups = spark.sql("""
            SELECT data_R_view.*
            FROM data_R_view
            LEFT JOIN dups_view
            ON data_R_view.PULocationID = dups_view.PULocationID
            AND data_R_view.DOLocationID = dups_view.DOLocationID
            AND data_R_view.pickup_datetime = dups_view.pickup_datetime
            AND data_R_view.dropoff_datetime = dups_view.dropoff_datetime
            WHERE dups_view.PULocationID IS NULL
            AND dups_view.DOLocationID IS NULL
            AND dups_view.pickup_datetime IS NULL
            AND dups_view.dropoff_datetime IS NULL
            """)

In [91]:
datarfd = removedups

In [92]:
datarfd.createOrReplaceTempView("rem_dups_view")

In [93]:
spark.sql("""
            SELECT pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, total_amount, count(*) AS count
            FROM rem_dups_view
            GROUP BY PULocationID, DOLocationID, pickup_datetime, dropoff_datetime, total_amount
            HAVING count > 1
            """).show()

+---------------+----------------+------------+------------+------------+-----+
|pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|total_amount|count|
+---------------+----------------+------------+------------+------------+-----+
+---------------+----------------+------------+------------+------------+-----+



In [94]:
removeRefund = spark.sql("""
            SELECT *
            FROM rem_dups_view
            WHERE NOT (refunded_flag = "1" AND total_amount < "0")
            """)

In [95]:
# check impact
removeRefund.count()

384641

## Check negative values again

In [96]:
# datarfd = spark.read.parquet("./data/refunds-cleaned.parquet")
datarfd = removeRefund
datarfd.createOrReplaceTempView("ref_clean_view")

In [97]:
# extra
spark.sql("""
            SELECT extra, count(extra) AS count
            FROM ref_clean_view
            GROUP by extra
            ORDER BY count DESC
        """).show()

+-----+------+
|extra| count|
+-----+------+
|    0|204848|
|  0.5|126463|
|    1| 50785|
|  4.5|  2486|
| -0.5|    12|
|  2.5|    11|
|  0.8|    10|
| 17.5|     9|
|    3|     5|
|  1.3|     3|
|  1.8|     2|
| 18.5|     2|
|   -1|     1|
|   18|     1|
|  3.5|     1|
|  5.3|     1|
| -4.5|     1|
+-----+------+



In [98]:
# mta_tax 

spark.sql("""
            SELECT mta_tax, count(mta_tax) AS count
            FROM ref_clean_view
            GROUP by mta_tax
            ORDER BY count DESC
        """).show()

+-------+------+
|mta_tax| count|
+-------+------+
|    0.5|382200|
|      0|  2417|
|   -0.5|    24|
+-------+------+



In [99]:
spark.sql("""
            SELECT improvement_surcharge, count(improvement_surcharge) AS count
            FROM ref_clean_view
            GROUP by improvement_surcharge
            ORDER BY count DESC
        """).show()

+---------------------+------+
|improvement_surcharge| count|
+---------------------+------+
|                  0.3|384455|
|                    0|   160|
|                 -0.3|    26|
+---------------------+------+



## Drop low qty out of range values

In [100]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW rem_one AS
            SELECT *
            FROM ref_clean_view
            WHERE mta_tax = "0.5"
            OR mta_tax = "0"
        """)

DataFrame[]

In [108]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW rem_two AS
            SELECT *
            FROM rem_one
            WHERE improvement_surcharge = "0.3"
            OR improvement_surcharge = "0"
        """)

DataFrame[]

In [102]:
cols = spark.sql("""
            SELECT *
            FROM rem_two
        """)

In [103]:
cols.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'store_and_fwd_flag',
 'dispatched',
 'colour',
 'tripdays',
 'refunded_flag']

In [104]:
cols.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dispatched: string (nullable = true)
 |-- colour: string (nullable = true)
 |-- tripdays: integer (nullable = true)
 |-- refunded_flag: string (nullable = false)



## convert mta_tax and improvement_surchage to bools

In [111]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW clean_one AS
            SELECT VendorID,
                    pickup_datetime,
                    dropoff_datetime,
                    passenger_count,
                    trip_distance,
                    RatecodeID,
                    PULocationID,
                    DOLocationID,
                    payment_type,
                    fare_amount,
                    extra,
                    tip_amount,
                    tolls_amount,
                    total_amount,
                    store_and_fwd_flag,
                    dispatched,
                    colour,
                    tripdays,
                    refunded_flag,
                    CASE WHEN mta_tax = "0.5" THEN "1"
                    ELSE "0"
                    END AS mta_tax,
                    CASE WHEN improvement_surcharge = "0.3" THEN "1"
                    ELSE "0"
                    END AS improvement_surcharge
            FROM rem_two
        """)

DataFrame[]

## Convert Datatypes

In [119]:
# cleaner data enable use of smaller impact datatypes
spark.sql("""
            CREATE OR REPLACE TEMP VIEW clean_two AS
            SELECT TINYINT(VendorID),
                        TIMESTAMP(pickup_datetime),
                        TIMESTAMP(dropoff_datetime),
                        TINYINT(passenger_count),
                        FLOAT(trip_distance),
                        TINYINT(RatecodeID),
                        SMALLINT(PULocationID),
                        SMALLINT(DOLocationID),
                        TINYINT(payment_type),
                        FLOAT(fare_amount),
                        FLOAT(extra),
                        FLOAT(tip_amount),
                        FLOAT(tolls_amount),
                        FLOAT(total_amount),
                        BOOLEAN(store_and_fwd_flag),
                        BOOLEAN(dispatched),
                        BOOLEAN(colour) AS yellow,
                        tripdays,
                        BOOLEAN(refunded_flag),
                        BOOLEAN(mta_tax),
                        BOOLEAN(improvement_surcharge)
            FROM clean_one
        """)

DataFrame[]

In [120]:
out_clean = spark.sql("""
                            SELECT *
                            FROM clean_two
                        """)

In [121]:
# datainitclean = spark.read.parquet("./data/out_init_clean.parquet")
datainitclean = out_clean
datainitclean.createOrReplaceTempView("init_clean_view")

In [122]:
# need to convert floats to decimals for arithmetic

spark.sql("""
            CREATE OR REPLACE TEMP VIEW ic_two_view AS
            SELECT VendorID,
                    pickup_datetime,
                    dropoff_datetime,
                    passenger_count,
                    CAST(trip_distance AS DECIMAL(10,3)),
                    RatecodeID,
                    PULocationID,
                    DOLocationID,
                    payment_type,
                    CAST(fare_amount AS DECIMAL(10,3)),
                    CAST(extra AS DECIMAL(10,3)),
                    CAST(tip_amount AS DECIMAL(10,3)),
                    CAST(tolls_amount AS DECIMAL(10,3)),
                    CAST(total_amount AS DECIMAL(10,3)),
                    store_and_fwd_flag,
                    dispatched,
                    yellow,
                    tripdays,
                    refunded_flag,
                    mta_tax,
                    improvement_surcharge
            FROM init_clean_view
        """)

DataFrame[]

In [123]:
spark.sql("""
            SELECT yellow, count(yellow) AS count
            FROM ic_two_view
            GROUP BY yellow
            ORDER BY count DESC
            """).show()

+------+------+
|yellow| count|
+------+------+
| false|384615|
+------+------+



## Total_amount investigation

In [124]:
spark.sql("""
            WITH calc AS (
                WITH conv AS (
                    SELECT fare_amount, extra, tip_amount, tolls_amount, total_amount,
                        CASE WHEN mta_tax = true THEN 0.5 ELSE 0 END AS tax,
                        CASE WHEN improvement_surcharge = true THEN 0.3 ELSE 0 END AS sur
                    FROM ic_two_view
                )
                SELECT CASE WHEN (fare_amount +
                    extra +
                    tax +
                    tip_amount +
                    tolls_amount +
                    sur ) = total_amount THEN 1
                ELSE 0
                END AS totals_equal
                FROM conv
            )
            SELECT totals_equal, count(totals_equal) AS count
            FROM calc
            GROUP BY totals_equal
            ORDER BY count DESC
            """).show()

+------------+------+
|totals_equal| count|
+------------+------+
|           1|383613|
|           0|  1002|
+------------+------+



In [125]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW summed AS
                WITH conv AS (
                    SELECT fare_amount, extra, tip_amount, tolls_amount, total_amount,
                        CASE WHEN mta_tax = true THEN 0.5 ELSE 0 END AS tax,
                        CASE WHEN improvement_surcharge = true THEN 0.3 ELSE 0 END AS sur
                    FROM ic_two_view
                )
                SELECT *, (fare_amount +
                    extra +
                    tax +
                    tip_amount +
                    tolls_amount +
                    sur ) AS sum_costs
                FROM conv
            """)

DataFrame[]

In [126]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW diffs AS
            SELECT *, total_amount - sum_costs AS totals_diff
            FROM summed
            """)

DataFrame[]

In [127]:
# check schema
spark.sql("""
            SELECT *
            FROM diffs
            WHERE totals_diff > "0"
            OR totals_diff < "0"
            """).printSchema()

root
 |-- fare_amount: decimal(10,3) (nullable = true)
 |-- extra: decimal(10,3) (nullable = true)
 |-- tip_amount: decimal(10,3) (nullable = true)
 |-- tolls_amount: decimal(10,3) (nullable = true)
 |-- total_amount: decimal(10,3) (nullable = true)
 |-- tax: decimal(11,1) (nullable = false)
 |-- sur: decimal(11,1) (nullable = false)
 |-- sum_costs: decimal(17,3) (nullable = true)
 |-- totals_diff: decimal(18,3) (nullable = true)



## Further EDA Driven Cleaning
- Total sum error flag, remove values where not equal to; total_amount or, +1.95
- drop where extra != 0, 0.5 or 1
- drop distances > 60
    - drop distances <= 0
- drop times > 120mins
    - drop times <= 0
- drop fare amount > 260
    - drop fare_amount <= 0
- split out location 264 + 265
- split out RatecodeID = 2 (airport)
- drop RatecodeID = 6
- drop tip > 40
    - drop tip < 0

In [128]:
# datainitclean = spark.read.parquet("./data/out_init_clean.parquet")
datainitclean.createOrReplaceTempView("init_clean_view")

In [137]:
# need to convert floats to decimals for arithmetic

spark.sql("""
            CREATE OR REPLACE TEMP VIEW ic_two_view AS
            SELECT VendorID,
                    pickup_datetime,
                    dropoff_datetime,
                    passenger_count,
                    CAST(trip_distance AS DECIMAL(10,3)),
                    RatecodeID,
                    PULocationID,
                    DOLocationID,
                    payment_type,
                    CAST(fare_amount AS DECIMAL(10,3)),
                    CAST(extra AS DECIMAL(10,3)),
                    CAST(tip_amount AS DECIMAL(10,3)),
                    CAST(tolls_amount AS DECIMAL(10,3)),
                    CAST(total_amount AS DECIMAL(10,3)),
                    store_and_fwd_flag,
                    dispatched,
                    yellow,
                    tripdays,
                    refunded_flag,
                    mta_tax,
                    improvement_surcharge
            FROM init_clean_view
        """)

DataFrame[]

In [138]:
# add fee
spark.sql("""
            CREATE OR REPLACE TEMP VIEW conv AS
                    SELECT *,
                        CASE WHEN mta_tax = true THEN 0.5 ELSE 0 END AS mta_tax_fee,
                        CASE WHEN improvement_surcharge = true THEN 0.3 ELSE 0 END AS surchage_fee
                    FROM ic_two_view
            """)

DataFrame[]

In [139]:
# add sum
spark.sql("""
            CREATE OR REPLACE TEMP VIEW summed AS
                SELECT *, (fare_amount +
                    extra +
                    mta_tax_fee +
                    tip_amount +
                    tolls_amount +
                    surchage_fee ) AS sum_costs
                FROM conv
            """)

DataFrame[]

In [140]:
# add diffs
spark.sql("""
            CREATE OR REPLACE TEMP VIEW diffs AS
            SELECT *, total_amount - sum_costs AS totals_diff
            FROM summed
            """)

DataFrame[]

In [141]:
#add unix timestamp
spark.sql("""
            CREATE OR REPLACE TEMP VIEW trip_sec_view AS
            SELECT *, (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) AS trip_time_sec
            FROM diffs
            """)

DataFrame[]

In [142]:
# check impact of where difference is out of range
spark.sql("""
            SELECT totals_diff
            FROM trip_sec_view
            WHERE NOT (totals_diff = 1.95 OR totals_diff = 0)
            """).count()

116

In [143]:
# relativley low impact, filter as planned

spark.sql("""
            CREATE OR REPLACE TEMP VIEW clean_2_1_view AS
            SELECT *
            FROM trip_sec_view
            WHERE (totals_diff = 1.95 OR totals_diff = 0)
            AND (extra = 0 OR extra = 0.5 OR extra = 1)
            AND (trip_distance > 0 AND trip_distance <= 60)
            AND (trip_time_sec > 0 AND trip_time_sec <= (120*60))
            AND (fare_amount > 0 AND fare_amount <= 260)
            AND PULocationID != 264
            AND PULocationID != 265 
            AND DOLocationID != 264 
            AND DOLocationID != 265
            AND RatecodeID != 2
            AND RatecodeID != 6
            AND (tip_amount >= 0 AND tip_amount <= 40)
            """)

DataFrame[]

In [144]:
# create object from view
cleancontinuous = spark.sql("""
            SELECT *
            FROM clean_2_1_view
            """)

## Feature Engineering
- maxi taxi flag, passenger_count = 7-9
- med taxi flag, passenger_count = 4-6
- passengerless, passenger_count = 0
- short trip flag (same pickup and dropoff)
- total error flag (=1.95 out)
- trip_time_sec
- trip_time_min
- trip_time_hour
- avg_speed (mph)
- AUC trip_meta = distance x time
    - https://en.wikipedia.org/wiki/Absement
    - https://physics.stackexchange.com/questions/158425/which-physical-entities-equal-distance-times-time
- Time of dataset
    - year
    - month of dataset
    - week of dataset,
    - day of dataset
- of year
    - month
    - week
    - day
- day of month
- day of week
- hour of day
- min of hour
- min of day

- Add borough's

- add service zones

In [145]:
cleancontinuous.printSchema()

root
 |-- VendorID: byte (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: byte (nullable = true)
 |-- trip_distance: decimal(10,3) (nullable = true)
 |-- RatecodeID: byte (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: byte (nullable = true)
 |-- fare_amount: decimal(10,3) (nullable = true)
 |-- extra: decimal(10,3) (nullable = true)
 |-- tip_amount: decimal(10,3) (nullable = true)
 |-- tolls_amount: decimal(10,3) (nullable = true)
 |-- total_amount: decimal(10,3) (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- dispatched: boolean (nullable = true)
 |-- yellow: boolean (nullable = true)
 |-- tripdays: integer (nullable = true)
 |-- refunded_flag: boolean (nullable = true)
 |-- mta_tax: boolean (nullable = true)
 |-- improvement_surcharge: boolean (nullable = true)
 |-- mta_tax_fee: decimal(11,1) 

In [146]:
cleancontinuous.count()

360628

In [147]:
cleancontinuous.createOrReplaceTempView("clean_cont_view")

In [148]:
# create new flags for taxi size, no passneger, short trip and near two error
# calculate time related variables

spark.sql("""
            CREATE OR REPLACE TEMP VIEW eng_1_view AS
            SELECT *,
                BOOLEAN(CASE WHEN passenger_count BETWEEN 7 AND 9 THEN true
                ELSE false END) AS flag_maxi_taxi,
                BOOLEAN(CASE WHEN passenger_count BETWEEN 4 AND 6 THEN true
                ELSE false END) AS flag_lg_taxi,
                BOOLEAN(CASE WHEN passenger_count = 0 THEN true
                ELSE false END) AS flag_no_passenger,
                BOOLEAN(CASE WHEN PUlocationID = DOlocationID THEN true
                ELSE false END) AS flag_short_trip,
                BOOLEAN(CASE WHEN totals_diff = 1.95 THEN true
                ELSE false END) AS flag_near_two_error,
                CAST((trip_time_sec / 60) AS DECIMAL(20, 6)) AS trip_time_min,
                ((trip_time_sec * trip_distance)/2) AS trip_meta,
                DATEDIFF(pickup_datetime, "2017-01-01") AS PU_day_of_data,
                YEAR(pickup_datetime) AS PU_year,
                MONTH(pickup_datetime) AS PU_month_of_year,
                WEEKOFYEAR(pickup_datetime) AS PU_week_of_year,
                DAYOFMONTH(pickup_datetime) AS PU_day_of_month,
                DAYOFWEEK(pickup_datetime) AS PU_day_of_week,
                HOUR(pickup_datetime) AS PU_hour_of_day,
                MINUTE(pickup_datetime) AS PU_min_of_hour
            FROM clean_cont_view
            """)

DataFrame[]

In [149]:
# check structure of time output (0 or 24)
spark.sql("""
            SELECT DISTINCT PU_hour_of_day
            FROM eng_1_view
            ORDER BY PU_hour_of_day
            """).show(26)

+--------------+
|PU_hour_of_day|
+--------------+
|             0|
|             1|
|             2|
|             3|
|             4|
|             5|
|             6|
|             7|
|             8|
|             9|
|            10|
|            11|
|            12|
|            13|
|            14|
|            15|
|            16|
|            17|
|            18|
|            19|
|            20|
|            21|
|            22|
|            23|
+--------------+



In [150]:
# calculate derived time variables
spark.sql("""
            CREATE OR REPLACE TEMP VIEW eng_2_view AS
            SELECT *,
              (trip_time_min / 60) AS trip_time_hour,
              CASE WHEN PU_year = 2019 THEN 0
              ELSE 1 END AS PU_year_of_data,
              (PU_min_of_hour + (PU_hour_of_day * 60)) AS PU_min_of_day
            FROM eng_1_view
            """)

DataFrame[]

In [151]:
# further derived time variables and speed
spark.sql("""
            CREATE OR REPLACE TEMP VIEW eng_3_view AS
            SELECT *,
                (trip_distance / trip_time_hour) AS trip_avg_speed,
                (PU_month_of_year + (PU_year_of_data * 12)) AS PU_month_of_data,
                (PU_week_of_year + (PU_year_of_data * 52)) AS PU_week_of_data,
                (PU_day_of_data - (PU_year_of_data * 365)) AS PU_day_of_year
            FROM eng_2_view
            """)

DataFrame[]

In [152]:
firstVarEng = spark.sql("""
                SELECT *
                FROM eng_3_view
                """)

In [153]:
firstVarEng.printSchema()

root
 |-- VendorID: byte (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: byte (nullable = true)
 |-- trip_distance: decimal(10,3) (nullable = true)
 |-- RatecodeID: byte (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: byte (nullable = true)
 |-- fare_amount: decimal(10,3) (nullable = true)
 |-- extra: decimal(10,3) (nullable = true)
 |-- tip_amount: decimal(10,3) (nullable = true)
 |-- tolls_amount: decimal(10,3) (nullable = true)
 |-- total_amount: decimal(10,3) (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- dispatched: boolean (nullable = true)
 |-- yellow: boolean (nullable = true)
 |-- tripdays: integer (nullable = true)
 |-- refunded_flag: boolean (nullable = true)
 |-- mta_tax: boolean (nullable = true)
 |-- improvement_surcharge: boolean (nullable = true)
 |-- mta_tax_fee: decimal(11,1) 

## Add location groupings

In [154]:
firstVarEng.createOrReplaceTempView("eng_3_view")

In [155]:
firstVarEngCluster = spark.sql("""
                                SELECT *
                                FROM eng_3_view
                                CLUSTER BY PUlocationID, DOlocationID
                                """)

In [156]:
firstVarEngCluster.createOrReplaceTempView("eng_3_view_cluster")

In [157]:
taxizone = spark.read.csv("Dataset/taxi+_zone_lookup.csv", header = True)
# minimze partitons of small dataset to reduce shuffle (similar to broadcast)
taxizone.repartition(1)

DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]

In [158]:
taxizone.createOrReplaceTempView("taxizone")

In [159]:
# sort to reduce shuffle
spark.sql("""
        SELECT *
        FROM taxizone
        WHERE LocationID != 264
        AND LocationID != 265
        SORT BY LocationID
        """)

DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]

In [160]:
spark.sql("""
        CREATE OR REPLACE TEMP VIEW PUtaxizone AS
        SELECT LocationID AS PULocationID, Borough AS PUBorough, Zone AS PUZone, service_zone AS PUServiceZone
        FROM taxizone
        SORT BY PULocationID
        """)

DataFrame[]

In [161]:
spark.sql("""
        CREATE OR REPLACE TEMP VIEW DOtaxizone AS
        SELECT LocationID AS DOLocationID, Borough AS DOBorough, Zone AS DOZone, service_zone AS DOServiceZone
        FROM taxizone
        SORT BY DOLocationID
        """)

DataFrame[]

In [162]:
spark.sql("""
        CREATE OR REPLACE TEMP VIEW PUjoin AS
        SELECT eng_3_view_cluster.*, PUtaxizone.PUBorough, PUtaxizone.PUZone, PUtaxizone.PUServiceZone
        FROM eng_3_view_cluster
        LEFT JOIN PUtaxizone
        ON eng_3_view_cluster.PULocationID = PUtaxizone.PULocationID
        """)

DataFrame[]

In [163]:
spark.sql("""
        CREATE OR REPLACE TEMP VIEW DOjoin AS
        SELECT PUjoin.*, DOtaxizone.DOBorough, DOtaxizone.DOZone, DOtaxizone.DOServiceZone
        FROM PUjoin
        LEFT JOIN DOtaxizone
        ON PUjoin.DOLocationID = DOtaxizone.DOLocationID
        """)

DataFrame[]

In [164]:
zonejoined = spark.sql("""
                        SELECT *
                        FROM DOjoin
                        """)

In [166]:
zonejoined.printSchema()

root
 |-- VendorID: byte (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: byte (nullable = true)
 |-- trip_distance: decimal(10,3) (nullable = true)
 |-- RatecodeID: byte (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: byte (nullable = true)
 |-- fare_amount: decimal(10,3) (nullable = true)
 |-- extra: decimal(10,3) (nullable = true)
 |-- tip_amount: decimal(10,3) (nullable = true)
 |-- tolls_amount: decimal(10,3) (nullable = true)
 |-- total_amount: decimal(10,3) (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- dispatched: boolean (nullable = true)
 |-- yellow: boolean (nullable = true)
 |-- tripdays: integer (nullable = true)
 |-- refunded_flag: boolean (nullable = true)
 |-- mta_tax: boolean (nullable = true)
 |-- improvement_surcharge: boolean (nullable = true)
 |-- mta_tax_fee: decimal(11,1) 

In [167]:
zonejoined = zonejoined.drop('trip_avg_speed')
zonejoined.createOrReplaceTempView("zonejoined_view")

In [168]:
# recast speed
spark.sql("""
        CREATE OR REPLACE TEMP VIEW checknew_view AS
        SELECT *, CAST(ROUND((trip_distance / trip_time_hour), 2) AS DECIMAL(10,2)) AS trip_avg_speed
        FROM zonejoined_view
        """)

DataFrame[]

In [169]:
spark.sql("""
        SELECT trip_avg_speed
        FROM checknew_view
        """).summary().show()

+-------+----------------+
|summary|  trip_avg_speed|
+-------+----------------+
|  count|          360628|
|   mean|       14.783688|
| stddev|125.787304080096|
|    min|            0.02|
|    25%|            9.06|
|    50%|           11.75|
|    75%|           15.75|
|    max|        32040.32|
+-------+----------------+



In [170]:
avgspeed = spark.sql("""
                    SELECT trip_avg_speed, fare_amount, pickup_datetime, PU_week_of_data
                    FROM checknew_view
                    """).sample(fraction = 0.001).toPandas()

In [319]:
#import plotly.express as px
#avgspeedFilter = avgspeed[avgspeed['trip_avg_speed'] < 60]
#px.histogram(avgspeedFilter, x = "trip_avg_speed")

In [318]:
#px.scatter(avgspeedFilter, x = "trip_avg_speed", y = "fare_amount")

In [317]:
#avgspeedFilterWeek = avgspeedFilter[avgspeedFilter['PU_week_of_data'] == 48]
#px.scatter(avgspeedFilterWeek, x = "pickup_datetime", y = "trip_avg_speed")

## Check trip_meta
Intended to capture relationship between time and distance

In [178]:
checkmeta = spark.sql("""
                        SELECT *
                        FROM checknew_view
                        """).drop("trip_meta")

In [179]:
checkmeta.createOrReplaceTempView("checkmeta_view")

In [180]:
spark.sql("""
        CREATE OR REPLACE TEMP VIEW tripmeta AS
        SELECT *,
        ((trip_time_min * trip_distance)/2) AS trip_meta
        FROM checkmeta_view
        """)

DataFrame[]

In [182]:
tripmeta = spark.sql("""
                    SELECT trip_meta, fare_amount, pickup_datetime, PU_week_of_data, RatecodeID, PUBorough, yellow
                    FROM tripmeta
                    """).sample(fraction = 0.001).toPandas()

In [284]:
tripmeta.count()

trip_meta          347
fare_amount        347
pickup_datetime    347
PU_week_of_data    347
RatecodeID         347
PUBorough          347
yellow             347
dtype: int64

## Export the combined one month worth of data into a single file with an optimised file format.
- structure

    - yellow (colour) - RatecodeID
          \- Year of data
              \- Month of data
                  \- (bucket) day of data
- clean using new metrics
    - 0 < speed < 60
    - fare > 2.5 https://www1.nyc.gov/site/tlc/passengers/taxi-fare.page

In [187]:
featureout = spark.sql("""
                    SELECT *
                    FROM tripmeta
                    WHERE fare_amount >= 2.5
                    AND trip_avg_speed > 0
                    AND trip_avg_speed <= 60
                    CLUSTER BY yellow, RatecodeID, PU_year_of_data, PU_month_of_data, pickup_datetime
                    """)

In [188]:
featureout.write.partitionBy("yellow","RatecodeID","PU_year_of_data", "PU_month_of_data").parquet("./data/featureout.parquet")

In [189]:
featureout = spark.read.parquet("./data/featureout.parquet")

In [190]:
featureout.printSchema()

root
 |-- VendorID: byte (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: byte (nullable = true)
 |-- trip_distance: decimal(10,3) (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: byte (nullable = true)
 |-- fare_amount: decimal(10,3) (nullable = true)
 |-- extra: decimal(10,3) (nullable = true)
 |-- tip_amount: decimal(10,3) (nullable = true)
 |-- tolls_amount: decimal(10,3) (nullable = true)
 |-- total_amount: decimal(10,3) (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- dispatched: boolean (nullable = true)
 |-- tripdays: integer (nullable = true)
 |-- refunded_flag: boolean (nullable = true)
 |-- mta_tax: boolean (nullable = true)
 |-- improvement_surcharge: boolean (nullable = true)
 |-- mta_tax_fee: decimal(11,1) (nullable = true)
 |-- surchage_fee: decimal(11,1) (nullable = true)
 |-- sum_c

In [191]:
featureout.count()

360343

In [192]:
featureout.createOrReplaceTempView("start_view")

Day 1 of week = Sunday
## Answer the list of provided business questions
### a. For each year and month
#### i. What was the total number of trips?
##### Year (note:-Currently it is for only one month of the year 2019)

In [193]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW year_i AS
            SELECT PU_year, count(*) AS total_trips
            FROM start_view
            GROUP BY PU_year
            ORDER BY total_trips
            """)

spark.sql("""
            SELECT *
            FROM year_i
            """).show()

+-------+-----------+
|PU_year|total_trips|
+-------+-----------+
|   2019|     360343|
+-------+-----------+



##### Month

In [194]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW month_i AS
            SELECT PU_month_of_data, count(*) AS total_trips
            FROM start_view
            GROUP BY PU_month_of_data
            ORDER BY PU_month_of_data
            """)

spark.sql("""
            SELECT *
            FROM month_i
            """).show(26)

+----------------+-----------+
|PU_month_of_data|total_trips|
+----------------+-----------+
|               1|     360343|
+----------------+-----------+



#### ii. Which weekday had the most trips?
##### Year

In [195]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW year_ii AS
            SELECT PU_year,
            CASE WHEN PU_day_of_week = 1 THEN "Sun"
            WHEN PU_day_of_week = 2 THEN "Mon"
            WHEN PU_day_of_week = 3 THEN "Tue"
            WHEN PU_day_of_week = 4 THEN "Wed"
            WHEN PU_day_of_week = 5 THEN "Thu"
            WHEN PU_day_of_week = 6 THEN "Fri"
            WHEN PU_day_of_week = 7 THEN "Sat"
            ELSE "unknown" END AS PU_day_most_trips,
            COUNT(*) AS PU_day_most_trips_total
            FROM start_view
            GROUP BY PU_year, PU_day_of_week
            HAVING PU_day_most_trips_total IN (
                SELECT MAX(count)
                    FROM (
                        SELECT PU_year, PU_day_of_week, COUNT(*) as count
                        FROM start_view
                        GROUP BY PU_year, PU_day_of_week
                    )
                GROUP BY PU_year
            )
            ORDER BY PU_year
            """)


spark.sql("""
            SELECT *
            FROM year_ii
            """).show()

+-------+-----------------+-----------------------+
|PU_year|PU_day_most_trips|PU_day_most_trips_total|
+-------+-----------------+-----------------------+
|   2019|              Wed|                 183833|
+-------+-----------------+-----------------------+



##### Month

In [196]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW month_ii AS
            SELECT PU_month_of_data,
            CASE WHEN PU_day_of_week = 1 THEN "Sun"
            WHEN PU_day_of_week = 2 THEN "Mon"
            WHEN PU_day_of_week = 3 THEN "Tue"
            WHEN PU_day_of_week = 4 THEN "Wed"
            WHEN PU_day_of_week = 5 THEN "Thu"
            WHEN PU_day_of_week = 6 THEN "Fri"
            WHEN PU_day_of_week = 7 THEN "Sat"
            ELSE "unknown" END AS PU_day_most_trips,
            COUNT(*) AS PU_day_most_trips_total
            FROM start_view
            GROUP BY PU_month_of_data, PU_day_of_week
            HAVING PU_day_most_trips_total IN (
                SELECT MAX(count)
                FROM (
                        SELECT PU_month_of_data, PU_day_of_week, COUNT(1) as count
                        FROM start_view
                        GROUP BY PU_month_of_data, PU_day_of_week
                     )
                GROUP BY PU_month_of_data
            )
            ORDER BY PU_month_of_data
            """)

spark.sql("""
            SELECT *
            FROM month_ii
            """).show(26)

+----------------+-----------------+-----------------------+
|PU_month_of_data|PU_day_most_trips|PU_day_most_trips_total|
+----------------+-----------------+-----------------------+
|               1|              Wed|                 183833|
+----------------+-----------------+-----------------------+



#### iii. Which hour of the day had the most trips?
##### Year

In [197]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW year_iii AS
            SELECT PU_year,
            PU_hour_of_day AS PU_hour_most_trips,
            COUNT(*) AS PU_hour_most_trips_total
            FROM start_view
            GROUP BY PU_year, PU_hour_of_day
            HAVING PU_hour_most_trips_total IN (
                SELECT MAX(count)
                    FROM (
                        SELECT PU_year, PU_hour_of_day, COUNT(*) as count
                        FROM start_view
                        GROUP BY PU_year, PU_hour_of_day
                    )
                GROUP BY PU_year
            )
            ORDER BY PU_year
            """)


spark.sql("""
            SELECT *
            FROM year_iii
            """).show()

+-------+------------------+------------------------+
|PU_year|PU_hour_most_trips|PU_hour_most_trips_total|
+-------+------------------+------------------------+
|   2019|                18|                   22239|
+-------+------------------+------------------------+



##### Month

In [198]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW month_iii AS
            SELECT PU_month_of_data,
            PU_hour_of_day AS PU_hour_most_trips,
            COUNT(*) AS PU_hour_most_trips_total
            FROM start_view
            GROUP BY PU_month_of_data, PU_hour_of_day
            HAVING PU_hour_most_trips_total IN (
                SELECT MAX(count)
                    FROM (
                        SELECT PU_month_of_data, PU_hour_of_day, COUNT(*) as count
                        FROM start_view
                        GROUP BY PU_month_of_data, PU_hour_of_day
                    )
                GROUP BY PU_month_of_data
            )
            ORDER BY PU_month_of_data
            """)

spark.sql("""
            SELECT *
            FROM month_iii
            """).show(26)

+----------------+------------------+------------------------+
|PU_month_of_data|PU_hour_most_trips|PU_hour_most_trips_total|
+----------------+------------------+------------------------+
|               1|                18|                   22239|
+----------------+------------------+------------------------+



#### iv. What was the average number of passengers?
##### Year

In [199]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW year_iv AS
            SELECT PU_year, AVG(passenger_count) AS average_passenger_count
            FROM start_view
            GROUP BY PU_year
            ORDER BY PU_year
            """)


spark.sql("""
            SELECT *
            FROM year_iv
            """).show()

+-------+-----------------------+
|PU_year|average_passenger_count|
+-------+-----------------------+
|   2019|     1.6472860580058444|
+-------+-----------------------+



##### Month

In [200]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW month_iv AS
            SELECT PU_month_of_data, AVG(passenger_count) AS average_passenger_count
            FROM start_view
            GROUP BY PU_month_of_data
            ORDER BY PU_month_of_data
            """)

spark.sql("""
            SELECT *
            FROM month_iv
            """).show(26)

+----------------+-----------------------+
|PU_month_of_data|average_passenger_count|
+----------------+-----------------------+
|               1|     1.6472860580058444|
+----------------+-----------------------+



#### v. What was the average amount paid per trip (total_amount)?
##### Year

In [201]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW year_v AS
            SELECT PU_year, AVG(total_amount) AS average_total_amount
            FROM start_view
            GROUP BY PU_year
            ORDER BY PU_year
            """)


spark.sql("""
            SELECT *
            FROM year_v
            """).show()

+-------+--------------------+
|PU_year|average_total_amount|
+-------+--------------------+
|   2019|          14.6329136|
+-------+--------------------+



##### Month

In [202]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW month_v AS
            SELECT PU_month_of_data, AVG(total_amount) AS average_total_amount
            FROM start_view
            GROUP BY PU_month_of_data
            ORDER BY PU_month_of_data
            """)

spark.sql("""
            SELECT *
            FROM month_v
            """).show(26)

+----------------+--------------------+
|PU_month_of_data|average_total_amount|
+----------------+--------------------+
|               1|          14.6329136|
+----------------+--------------------+



#### vi. What was the average amount paid per passenger (total_amount)?
##### Year

In [207]:
spark.sql("""
             CREATE OR REPLACE TEMP VIEW year_vi AS
             SELECT PU_year,
             CAST(AVG(total_amount / passenger_count) AS DECIMAL(6,3)) AS avg_total_per_passenger
             FROM (
                SELECT PU_year,
                total_amount,
                CASE WHEN passenger_count = 0 THEN 1
                ELSE passenger_count
                END AS passenger_count
                FROM start_view
             )
             GROUP BY PU_year
             ORDER BY PU_year
            """)

spark.sql("""
            SELECT *
            FROM year_vi
            """).show()

+-------+-----------------------+
|PU_year|avg_total_per_passenger|
+-------+-----------------------+
|   2019|                 11.748|
+-------+-----------------------+



##### Month

In [204]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW month_vi AS
             SELECT PU_month_of_data,
             CAST(AVG(total_amount / passenger_count) AS DECIMAL(6,3)) AS avg_total_per_passenger
             FROM (
                SELECT PU_month_of_data,
                total_amount,
                CASE WHEN passenger_count = 0 THEN 1
                ELSE passenger_count
                END AS passenger_count
                FROM start_view
             )
             GROUP BY PU_month_of_data
             ORDER BY PU_month_of_data
            """)

spark.sql("""
            SELECT *
            FROM month_vi
            """).show(26)

+----------------+-----------------------+
|PU_month_of_data|avg_total_per_passenger|
+----------------+-----------------------+
|               1|                 11.748|
+----------------+-----------------------+



#### b. For each taxi colour
##### i. What was the average, median, minimum and maximum trip duration in seconds?

In [210]:
spark.sql("""
            SELECT yellow,
            AVG(trip_time_sec) AS average_trip_sec,
            percentile_approx(trip_time_sec, 0.5) AS median_trip_sec,
            MIN(trip_time_sec) AS min_trip_sec,
            MAX(trip_time_sec) AS max_trip_sec
            FROM start_view
            GROUP BY yellow
            """).show()

+------+-----------------+---------------+------------+------------+
|yellow| average_trip_sec|median_trip_sec|min_trip_sec|max_trip_sec|
+------+-----------------+---------------+------------+------------+
| false|705.6616501499959|            574|           2|        7136|
+------+-----------------+---------------+------------+------------+



##### ii. What was the average, median, minimum and maximum trip distance in km?

In [212]:
spark.sql("""
            SELECT yellow,
            AVG(trip_distance * 1.609344) AS average_trip_distanc_km,
            percentile_approx(trip_distance * 1.609344, 0.5) AS median_trip_distance_km,
            MIN(trip_distance * 1.609344) AS min_trip_distance_km,
            MAX(trip_distance * 1.609344) AS max_trip_distance_km
            FROM start_view
            GROUP BY yellow
            """).show()

+------+-----------------------+-----------------------+--------------------+--------------------+
|yellow|average_trip_distanc_km|median_trip_distance_km|min_trip_distance_km|max_trip_distance_km|
+------+-----------------------+-----------------------+--------------------+--------------------+
| false|        4.5988111399017|              2.7358848|         0.016093440|        80.064864000|
+------+-----------------------+-----------------------+--------------------+--------------------+



#### iii. What was the average, median, minimum and maximum speed in km per hour?

In [213]:
spark.sql("""
            SELECT yellow,
            AVG((trip_distance * 1.609344) / trip_time_hour) AS average_trip_kmph,
            percentile_approx((trip_distance * 1.609344) / trip_time_hour, 0.5) AS median_trip_kmph,
            MIN((trip_distance * 1.609344) / trip_time_hour) AS min_trip_kmph,
            MAX((trip_distance * 1.609344) / trip_time_hour) AS max_trip_kmph
            FROM start_view
            GROUP BY yellow
            """).show()

+------+--------------------+------------------+--------------------+--------------------+
|yellow|   average_trip_kmph|  median_trip_kmph|       min_trip_kmph|       max_trip_kmph|
+------+--------------------+------------------+--------------------+--------------------+
| false|21.40461549665005...|18.892300095584897|0.029491669286134...|96.56067862427144...|
+------+--------------------+------------------+--------------------+--------------------+



#### c. What was the percentage of trips where the driver received tips?
    - ignores cash tips

In [214]:
spark.sql("""
            SELECT FORMAT_STRING("%s%%", 
            FORMAT_NUMBER(COUNT(CASE WHEN tip_amount > 0 THEN 1 ELSE null END) / COUNT(*) * 100, 2)
            ) AS tipped_trips_of_all_trips
            FROM start_view
            """).show()

+-------------------------+
|tipped_trips_of_all_trips|
+-------------------------+
|                   59.38%|
+-------------------------+



#### d. For trips where the driver received tips, What was the percentage where the driver received tips of at least $10.

In [215]:
spark.sql("""
            SELECT FORMAT_STRING("%s%%", 
            FORMAT_NUMBER(COUNT(
            CASE WHEN tip_amount > 10 THEN 1 ELSE null END) / COUNT(
            CASE WHEN tip_amount > 0 THEN 1 ELSE null END) * 100, 2)
            ) AS tip_greater_than_10_of_tipped_trips
            FROM start_view
            """).show()

+-----------------------------------+
|tip_greater_than_10_of_tipped_trips|
+-----------------------------------+
|                              1.21%|
+-----------------------------------+



#### e. Classify each trip into bins of durations:
- Under 5 Mins
- From 5 mins to 10 mins
- From 10 mins to 20 mins
- From 20 mins to 30 mins
- At least 30 mins

In [216]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW binned_trips_view AS
            SELECT *, CASE WHEN trip_time_min < 5 THEN "a-under-five"
                WHEN (trip_time_min >= 5 AND trip_time_min < 10) THEN "b-five-ten"
                WHEN (trip_time_min >= 10 AND trip_time_min < 20) THEN "c-ten-twenty"
                WHEN (trip_time_min >= 20 AND trip_time_min < 30) THEN "d-twenty-thirty"
                WHEN trip_time_min >= 30 THEN "e-over-thirty"
                ELSE "unknown" END AS trip_time_bins
            FROM start_view
            """)

DataFrame[]

#### e (cont). Then for each bins, calculate
- Average speed (km per hour)
- Average distance per dollar (km per $)
- extra:
    - Average mins per dollar
    - Volume

In [217]:
spark.sql("""
            SELECT trip_time_bins, ROUND(AVG((trip_distance * 1.609344) / trip_time_hour), 3) AS average_trip_kmph,
            ROUND(AVG((trip_distance * 1.609344) / total_amount), 3) AS average_trip_kmpdollar,
            ROUND(AVG(trip_time_min / total_amount), 3) AS average_trip_mins_per_dollar,
            COUNT(*) AS trip_volume
            FROM binned_trips_view
            GROUP BY trip_time_bins
            ORDER BY trip_time_bins
            """).show()

+---------------+-----------------+----------------------+----------------------------+-----------+
| trip_time_bins|average_trip_kmph|average_trip_kmpdollar|average_trip_mins_per_dollar|trip_volume|
+---------------+-----------------+----------------------+----------------------------+-----------+
|   a-under-five|           20.926|                 0.176|                       0.528|      69139|
|     b-five-ten|           18.603|                 0.236|                       0.790|     119764|
|   c-ten-twenty|           21.340|                 0.300|                       0.931|     120530|
|d-twenty-thirty|           28.058|                 0.366|                       0.911|      36813|
|  e-over-thirty|           30.726|                 0.404|                       0.909|      14097|
+---------------+-----------------+----------------------+----------------------------+-----------+



In [218]:
featureout.printSchema()

root
 |-- VendorID: byte (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: byte (nullable = true)
 |-- trip_distance: decimal(10,3) (nullable = true)
 |-- PULocationID: short (nullable = true)
 |-- DOLocationID: short (nullable = true)
 |-- payment_type: byte (nullable = true)
 |-- fare_amount: decimal(10,3) (nullable = true)
 |-- extra: decimal(10,3) (nullable = true)
 |-- tip_amount: decimal(10,3) (nullable = true)
 |-- tolls_amount: decimal(10,3) (nullable = true)
 |-- total_amount: decimal(10,3) (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- dispatched: boolean (nullable = true)
 |-- tripdays: integer (nullable = true)
 |-- refunded_flag: boolean (nullable = true)
 |-- mta_tax: boolean (nullable = true)
 |-- improvement_surcharge: boolean (nullable = true)
 |-- mta_tax_fee: decimal(11,1) (nullable = true)
 |-- surchage_fee: decimal(11,1) (nullable = true)
 |-- sum_c

## END

In [283]:
featureout.createOrReplaceTempView("start_view")

In [220]:
sample = featureout.sample(fraction = 0.001, seed = 55)

In [221]:
sample.createOrReplaceTempView("sample_view")

In [222]:
# drop some uneeded cols
proc = sample.drop("PU_year", "tripdays")

In [223]:
procRateOne = proc.filter(proc.RatecodeID == 1)
procRateOne = procRateOne.withColumn("yellow", sparkle.col("yellow").astype(BooleanType()))

In [224]:
procRateOne.createOrReplaceTempView("procRateOne_view")

In [225]:
procRateOne = spark.sql("""
                        SELECT *, UNIX_TIMESTAMP(pickup_datetime) AS pickup_datetime_unix,
                        UNIX_TIMESTAMP(dropoff_datetime) AS dropoff_datetime_unix
                        FROM procRateOne_view
                        """)

In [249]:
cats = ["PULocationID",
        "DOLocationID",
        "PUBorough",
        "PUZone",
        "PUServiceZone",
        "DOBorough",
        "DOZone",
        "DOServiceZone"]

In [250]:
stages = []

In [251]:
# create stages to encode all categoricals through string indexer then to one hot encoder
for cat in cats:
    col_indexer = StringIndexer(inputCol=cat, outputCol=f"{cat}_ind")
    col_encoder = OneHotEncoderEstimator(inputCols=[f"{cat}_ind"], outputCols=[f"{cat}_ohe"])
    stages += [col_indexer, col_encoder]

In [252]:
cats_ohe = [f"{cat}_ohe" for cat in cats]

In [253]:
nums = ["VendorID",
        "pickup_datetime_unix",
        "dropoff_datetime_unix",
        "passenger_count",
        "trip_distance",
        "payment_type",
        "extra",
        "tip_amount",
        "tolls_amount",
        "store_and_fwd_flag",
        "dispatched",
        "refunded_flag",
        "mta_tax",
        "improvement_surcharge",
        "trip_time_sec",
        "flag_maxi_taxi",
        "flag_lg_taxi",
        "flag_no_passenger",
        "flag_short_trip",
        "flag_near_two_error",
        "trip_time_min",
        "PU_day_of_data",
        "trip_time_hour",
        "PU_week_of_data",
        "trip_avg_speed",
        "trip_meta",
        "yellow",
        "PU_year_of_data"]

In [254]:
# assemble using vector assembler into feature column
assembler = VectorAssembler(inputCols=cats_ohe + nums, outputCol="features")

In [255]:
# stages += [label, assembler]
stages += [assembler]

In [256]:
# create pipeline
pipeline = Pipeline(stages=stages)

In [257]:
# fit pipeline to  data
pipeline_model = pipeline.fit(procRateOne)

In [258]:
# transform dataset with pipeline (applying transformatons by column name)
pipeOut = pipeline_model.transform(procRateOne)

In [259]:
# create view so spark sql unix timestamp can be used to prevent missmatch timezones
pipeOut.createOrReplaceTempView("pipeout_view")

In [260]:
# change outcome to label as required by pyspark ml
train_data = spark.sql("""
                        SELECT features, total_amount AS label
                        FROM pipeout_view
                        WHERE pickup_datetime_unix < unix_timestamp("2019-01-31 00:00:00")
                        """)

test_data = spark.sql("""
                        SELECT features, total_amount AS label
                        FROM pipeout_view
                        WHERE pickup_datetime_unix >= unix_timestamp("2019-01-31 00:00:00")
                        """)

In [261]:
# show minimised dataset with features vector column and label output column
train_data.show()

+--------------------+------+
|            features| label|
+--------------------+------+
|(317,[10,100,140,...| 6.960|
|(317,[9,64,140,14...| 8.300|
|(317,[50,132,192,...|18.800|
|(317,[5,64,140,14...| 6.300|
|(317,[11,66,140,1...| 5.760|
|(317,[1,80,140,14...|42.580|
|(317,[9,84,140,14...|10.560|
|(317,[24,83,140,1...| 9.350|
|(317,[8,67,140,14...| 7.880|
|(317,[5,65,140,14...| 7.300|
|(317,[47,63,140,1...| 9.300|
|(317,[30,106,140,...|10.700|
|(317,[15,120,141,...|37.850|
|(317,[11,67,140,1...| 8.760|
|(317,[15,86,141,1...|51.060|
|(317,[41,81,140,1...| 8.400|
|(317,[40,67,140,1...|11.750|
|(317,[36,100,140,...| 8.750|
|(317,[12,79,140,1...|17.760|
|(317,[26,85,140,1...|15.950|
+--------------------+------+
only showing top 20 rows



In [262]:
glm = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=5, regParam=0.3)

In [263]:
model = glm.fit(train_data)

In [264]:
# create predictions
train_preds = model.transform(train_data)
test_preds = model.transform(test_data)

In [265]:
evaluator = RegressionEvaluator(metricName='rmse')

In [268]:
# inpsect
train_preds.show()

+--------------------+------+------------------+
|            features| label|        prediction|
+--------------------+------+------------------+
|(317,[10,100,140,...| 6.960|7.1537970440240315|
|(317,[9,64,140,14...| 8.300|  8.34966574691316|
|(317,[50,132,192,...|18.800|18.808501453742792|
|(317,[5,64,140,14...| 6.300| 6.545561579327114|
|(317,[11,66,140,1...| 5.760| 5.342554414551159|
|(317,[1,80,140,14...|42.580| 43.27442648695251|
|(317,[9,84,140,14...|10.560|10.835104541624787|
|(317,[24,83,140,1...| 9.350| 9.341703361595137|
|(317,[8,67,140,14...| 7.880|  7.30642526538395|
|(317,[5,65,140,14...| 7.300| 7.772162316757203|
|(317,[47,63,140,1...| 9.300| 9.315792287501608|
|(317,[30,106,140,...|10.700|10.548232054940854|
|(317,[15,120,141,...|37.850|37.851803495009335|
|(317,[11,67,140,1...| 8.760| 6.967902820775635|
|(317,[15,86,141,1...|51.060| 49.47836953049682|
|(317,[41,81,140,1...| 8.400| 8.319851729401307|
|(317,[40,67,140,1...|11.750|10.988011618248493|
|(317,[36,100,140,..

In [269]:
test_vis = test_preds.toPandas()