In [1]:
import boto3
import os
import re
from tqdm import tqdm
import sys
from pyspark.sql.types import DateType, IntegerType, BooleanType, TimestampType, FloatType
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
import re

### Set Working Directory to Parent folder

In [2]:
os.getcwd()

'/home/jovyan/work/notebooks'

In [3]:
os.chdir('../')

# 1. Download Dataset

First step is to download the dataset from S3 - to do this, I used the boto3 library to access the public S3 data. I will be working with the 2017-18 data in this dataset.

I have developed a function that will check my data folder for the required CSVs and download missing CSVs.

In [4]:
s3 = boto3.client('s3')

In [5]:
def download_and_save_data(bucket = 'nyc-tlc',prefix = 'trip data',year_regex = '201[78]', data_folder = 'data'):
    """
    This function downloads and saves relevant data from the S3 bucket to my local machine.
    
    params:
    * s3 - a boto3.client('s3') object with relevant permissions
    * bucket - bucket of stored data
    * prefix - folder with relevant stored data
    * year_regex - the years for the project
    * data_folder - where I want to store the data
    """
    s3 = boto3.client('s3')
    print("Starting yellow cabs download")
    sys.stdout.write("[%s]" % (" " * 12))
    sys.stdout.flush()
    sys.stdout.write("\b" * (12+1)) # return to start of line, after '['
    contents = s3.list_objects(Bucket = bucket, Prefix = prefix)['Contents']
    #retrieve yellow cab keys and download file
    yellow_cab_keys = [i['Key'] for i in contents if ('yellow_tripdata' in i['Key'])&(bool(re.search(year_regex,i['Key'])))]
    for i in yellow_cab_keys:
        if (re.sub(".*/","",i) in os.listdir(data_folder+'/yellow_cabs/'))==False:
            s3.download_file('nyc-tlc',i,data_folder+'/yellow_cabs/'+re.sub(".*/","",i)) 
        sys.stdout.write("-")
        sys.stdout.flush()
    sys.stdout.write("]\n")
    print('Yellow Cabs completed')
    print("Starting green cabs download")
    sys.stdout.write("[%s]" % (" " * 12))
    sys.stdout.flush()
    sys.stdout.write("\b" * (12+1)) # return to start of line, after '['
    green_cab_keys = [i['Key'] for i in contents if ('green_tripdata' in i['Key'])&(bool(re.search(year_regex,i['Key'])))]
    for i in green_cab_keys:
        if (re.sub(".*/","",i) in os.listdir(data_folder+'/green_cabs/'))==False:
            s3.download_file('nyc-tlc',i,data_folder+'/green_cabs/'+re.sub(".*/","",i)) 
        sys.stdout.write("-")
        sys.stdout.flush()
    sys.stdout.write("]\n")
    print("Green Cabs completed")
    return "Data downloaded and saved"
        

In [6]:
download_and_save_data(bucket = 'nyc-tlc',prefix = 'trip data',year_regex = '201[78]', data_folder = 'data')

Starting yellow cabs download
[            ------------------------]
Yellow Cabs completed
Starting green cabs download
[            ------------------------]
Green Cabs completed


'Data downloaded and saved'

### Start Spark Session

In [None]:
spark = SparkSession.builder \
        .appName('assignment_1') \
        .getOrCreate() 

#### Check Versions

In [8]:
spark.version

'2.4.5'

In [9]:
spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()

'3.0.0'

### Load Data

In [11]:
g = ["data/green_cabs/" + i for i in  os.listdir('data/green_cabs')]
y = ["data/yellow_cabs/" + i for i in  os.listdir('data/yellow_cabs')]
paths = [i for y in [g,y] for i in y]

In [12]:
green = spark.read.format('csv').options(header ='true').load(g)

In [13]:
yellow = spark.read.format('csv').options(header ='true').load(y)

# 2. Data Cleaning & Feature Engineering

### i. Create a column for 'taxi_colour'

In [15]:
green = green.withColumn('taxi_colour', lit('green'))
yellow = yellow.withColumn('taxi_colour', lit('yellow'))

### ii. Create timestamp columns with same name in green and yellow datasets

In [16]:
green = green.withColumn('pickup_date', F.col('lpep_pickup_datetime').astype(TimestampType())).\
    withColumn("dropoff_date", F.col("lpep_dropoff_datetime").astype(TimestampType()))
yellow = yellow.withColumn('pickup_date', F.col('tpep_pickup_datetime').astype(TimestampType()) ).\
    withColumn("dropoff_date", F.col("tpep_dropoff_datetime").astype(TimestampType()))

### iii. Remove columns

Some columns have been removed from the dataset. Namely:
1. ehail_fee:  This is not in the data dictionary, but assume it is the fee for a cab to be 'hailed'. It does not exist in the yellow cabs dataset so therefore it should be excluded
2. Trip_type: only exists in the green taxi dataset - refers to whether a cab was dispatched or hailed
3. date/time stamps: these were transformed into new columns and had names aligned.
4. fair_amount: removed as per assignment brief

I have decided to leave 'ID' fields as strings for now, as they do not represent numbers but rather a categorical variable

In [17]:
cols =['VendorID',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'taxi_colour',
 'pickup_date',
 'dropoff_date']

In [18]:
green = green.select(cols)
yellow = yellow.select(cols)

### iv. Merge datasets
Merge the datasets into 1 dataframe, and view top 5 records and schema

In [19]:
dfs = green.union(yellow)

In [20]:
dfs.show(5)

+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+
|VendorID|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_colour|        pickup_date|       dropoff_date|
+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+
|       2|                 N|         1|         264|         264|              1|          .00|  0.2|      0|         0|           0|                    0|         1.7|           1|      green|2017-03-01 00:30:18|2017-03-01 00:30:47|
|       2|                 N|         1|          95|       

In [21]:
dfs.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- taxi_colour: string (nullable = false)
 |-- pickup_date: timestamp (nullable = true)
 |-- dropoff_date: timestamp (nullable = true)



### v. Convert DataTypes
Numeric datatypes need to be changed to a float or integer, categorical variables can remain as a string

In [10]:
dfs = spark.read.parquet('data/combined_cleaned_data.parquet')

In [11]:
dfs.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- taxi_colour: string (nullable = true)
 |-- pickup_date: timestamp (nullable = true)
 |-- dropoff_date: timestamp (nullable = true)



In [12]:
dfs= dfs.withColumn('passenger_count', F.col('passenger_count').astype(IntegerType())).\
    withColumn('extra', F.col('extra').astype(FloatType())).\
    withColumn('mta_tax', F.col('mta_tax').astype(FloatType())).\
    withColumn('improvement_surcharge', F.col('improvement_surcharge').astype(FloatType())).\
    withColumn('total_amount', F.col('total_amount').astype(FloatType())).\
    withColumn('trip_distance', F.col('trip_distance').astype(FloatType())).\
    withColumn('tip_amount', F.col('tip_amount').astype(FloatType())).\
    withColumn('tolls_amount', F.col('tolls_amount').astype(FloatType()))

### vi. Create Columns

In [13]:
dfs = dfs.withColumn('trip_duration',F.col("dropoff_date").cast("long") - F.col('pickup_date').cast("long"))

In [14]:
dfs.show(5)

+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+
|VendorID|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_colour|        pickup_date|       dropoff_date|trip_duration|
+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+
|       2|             false|         1|          66|          33|              5|         0.51|  0.5|    0.5|       0.7|         0.0|                  0.3|         6.0|           1|      green|2018-06-01 00:33:55|2018-06-01 00:36:13|          138|
|   

# 3. Save Data In Optimized Format

In [15]:
dfs.write.mode('overwrite').parquet('data/combined_data.parquet')

### 3a. Remove data that appears errornous
I decided to remove data that falls outside of the specified year ranges and where the trip duration is negative (i.e. drop off was before pick up).

In [None]:
dfs = spark.read.parquet('combined_data.parquet')

In [16]:
dfs.select("trip_duration").describe().show()

+-------+-----------------+
|summary|    trip_duration|
+-------+-----------------+
|  count|        236847300|
|   mean|1018.441442178146|
| stddev|206405.6298083972|
|    min|      -2911502804|
|    max|         45466304|
+-------+-----------------+



In [7]:
from pyspark.sql.functions import year

In [19]:
dfs.select(year("pickup_date")).describe().show()

+-------+-------------------+
|summary|  year(pickup_date)|
+-------+-------------------+
|  count|          236847300|
|   mean|  2017.471231265883|
| stddev|0.49917167113572786|
|    min|               2017|
|    max|               2018|
+-------+-------------------+



In [20]:
dfs_cleaned = dfs.where("YEAR(pickup_date) <2019 AND YEAR(pickup_date) >2016 AND trip_duration > 0")

In [21]:
dfs_cleaned.write.mode('overwrite').parquet('data/combined_clean_data.parquet')

#### check values after subset

In [8]:
dfs_cleaned.select(year("pickup_date")).describe().show()

+-------+-------------------+
|summary|  year(pickup_date)|
+-------+-------------------+
|  count|          236638069|
|   mean| 2017.4712503253227|
| stddev|0.49917276919436876|
|    min|               2017|
|    max|               2018|
+-------+-------------------+



In [9]:
spark.stop()

# 4. SQL Queries

In [1]:
import boto3
import os
import re
from tqdm import tqdm
import sys
from pyspark.sql.types import DateType, IntegerType, BooleanType, TimestampType, FloatType
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
import re

In [2]:
os.getcwd()


'/home/jovyan/work/notebooks'

In [3]:
os.chdir('../')

In [4]:
spark = SparkSession.builder \
        .appName('assignment_1') \
        .getOrCreate() 

In [5]:
dfs = spark.read.parquet('data/combined_clean_data.parquet')

In [6]:
dfs.show(5)

+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+
|VendorID|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_colour|        pickup_date|       dropoff_date|trip_duration|
+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+
|       2|             false|         1|         244|         116|              1|         1.77|  0.0|    0.5|       0.0|         0.0|                  0.3|         8.8|           2|     yellow|2017-03-13 09:09:07|2017-03-13 09:17:28|          501|
|   

## a. For each year and month:
### i. what was the total number of trips?

In [7]:
dfs.createOrReplaceTempView("combined_data")

In [5]:
query = open('sql/4a_i.sql','r').read()
print(query)

SELECT COUNT(*) as trips, MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY MONTH(pickup_date), YEAR(pickup_date)
ORDER BY YEAR(pickup_date) desc,  MONTH(pickup_date) desc


In [9]:
spark.sql(query).show(24)

+--------+-----+----+
|   trips|month|year|
+--------+-----+----+
| 8850453|   12|2018|
| 8794310|   11|2018|
| 9520702|   10|2018|
| 8699790|    9|2018|
| 8508476|    8|2018|
| 8527273|    7|2018|
| 9445404|    6|2018|
|10013372|    5|2018|
|10097508|    4|2018|
|10258208|    3|2018|
| 9254724|    2|2018|
| 9545547|    1|2018|
|10405588|   12|2017|
|10148489|   11|2017|
|10684481|   10|2017|
| 9819538|    9|2017|
| 9281644|    8|2017|
| 9495191|    7|2017|
|10623800|    6|2017|
|11151727|    5|2017|
|11116768|    4|2017|
|11442309|    3|2017|
|10182399|    2|2017|
|10770368|    1|2017|
+--------+-----+----+



### ii. Which day of week had the most trips?

In [7]:
dfs.createOrReplaceTempView("combined_data")

In [6]:
query = open('sql/4a_ii.sql','r').read()
print(query)

select b1.trips,b2.week_day, b1.month,b1.year from
(select max(b.trips) as trips, b.month, b.year  from 
    (SELECT COUNT(*) as trips, DAYOFWEEK(pickup_date) as week_day,MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY  DAYOFWEEK(pickup_date),MONTH(pickup_date), YEAR(pickup_date)) as b
Group by b.month, b.year) as b1
inner join (SELECT COUNT(*) as trips,  DAYOFWEEK(pickup_date) as week_day,MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY  DAYOFWEEK(pickup_date),MONTH(pickup_date), YEAR(pickup_date)) as b2 
on (b1.trips, b1.month,b1.year)=(b2.trips, b2.month,b2.year)
ORDER BY b2.year desc,  b2.month desc


In [9]:
spark.sql(query).show(24)

+-------+--------+-----+----+
|  trips|week_day|month|year|
+-------+--------+-----+----+
|1507495|       7|   12|2018|
|1523623|       6|   11|2018|
|1574898|       4|   10|2018|
|1471635|       7|    9|2018|
|1487246|       4|    8|2018|
|1455733|       3|    7|2018|
|1643980|       6|    6|2018|
|1743709|       5|    5|2018|
|1522737|       2|    4|2018|
|1810353|       6|    3|2018|
|1463629|       6|    2|2018|
|1626790|       4|    1|2018|
|1829711|       6|   12|2017|
|1742118|       4|   11|2017|
|1675164|       3|   10|2017|
|1723433|       6|    9|2017|
|1605374|       5|    8|2017|
|1528712|       7|    7|2017|
|1854007|       5|    6|2017|
|1859731|       4|    5|2017|
|1967438|       7|    4|2017|
|2032711|       6|    3|2017|
|1615027|       7|    2|2017|
|1700391|       3|    1|2017|
+-------+--------+-----+----+



### iii. Which hour of the day had the most trips? 

In [10]:
dfs.createOrReplaceTempView("combined_data")

In [7]:
query = open('sql/4a_iii.sql','r').read()
print(query)

select b1.trips,b2.hour, b1.month,b1.year from
(select max(b.trips) as trips, b.month, b.year  from 
    (SELECT COUNT(*) as trips, HOUR(pickup_date) as hour,MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY HOUR(pickup_date),MONTH(pickup_date), YEAR(pickup_date)) as b
Group by b.month, b.year) as b1
inner join (SELECT COUNT(*) as trips, HOUR(pickup_date) as hour,MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY  HOUR(pickup_date),MONTH(pickup_date), YEAR(pickup_date)) as b2 
on (b1.trips, b1.month,b1.year)=(b2.trips, b2.month,b2.year)
ORDER BY b2.year desc,  b2.month desc


In [12]:
spark.sql(query).show(24)

+------+----+-----+----+
| trips|hour|month|year|
+------+----+-----+----+
|551064|  18|   12|2018|
|555083|  18|   11|2018|
|612201|  18|   10|2018|
|556887|  18|    9|2018|
|553112|  18|    8|2018|
|547322|  18|    7|2018|
|591334|  18|    6|2018|
|636495|  18|    5|2018|
|660835|  18|    4|2018|
|666998|  18|    3|2018|
|613828|  18|    2|2018|
|632430|  18|    1|2018|
|646656|  18|   12|2017|
|650151|  18|   11|2017|
|683472|  18|   10|2017|
|621732|  19|    9|2017|
|587847|  18|    8|2017|
|588692|  18|    7|2017|
|656305|  18|    6|2017|
|689546|  18|    5|2017|
|696864|  18|    4|2017|
|734461|  19|    3|2017|
|672653|  18|    2|2017|
|692103|  18|    1|2017|
+------+----+-----+----+



### iv. What was the average number of passengers?

In [7]:
dfs.createOrReplaceTempView("combined_data")

In [9]:
query = open('sql/4a_iv.sql','r').read()
print(query)

SELECT AVG(passenger_count) as passenger_count, MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY  MONTH(pickup_date), YEAR(pickup_date)
ORDER BY YEAR(pickup_date) desc, MONTH(pickup_date) desc


In [9]:
spark.sql(query).show(24)

+------------------+-----+----+
|   passenger_count|month|year|
+------------------+-----+----+
|1.5752000490822333|   12|2018|
|  1.55926013524654|   11|2018|
|1.5531575297703888|   10|2018|
| 1.570099852984957|    9|2018|
|1.5824966774308349|    8|2018|
|1.5861990111023772|    7|2018|
|1.5790617320339078|    6|2018|
|1.5780397452526482|    5|2018|
|   1.5819909229089|    4|2018|
|1.5820507831387314|    3|2018|
|1.5766915361279277|    2|2018|
|1.5868474588203274|    1|2018|
|1.6096649223474926|   12|2017|
|1.5907292208721908|   11|2017|
|1.5959072789777997|   10|2017|
|1.6035457065291667|    9|2017|
| 1.609642429724734|    8|2017|
|1.6154271146309749|    7|2017|
|1.5996005195880947|    6|2017|
|1.5955439906303301|    5|2017|
| 1.601958860704838|    4|2017|
|1.5927111389842732|    3|2017|
|1.5990340783149433|    2|2017|
|1.6034515255189052|    1|2017|
+------------------+-----+----+



### v. What was the average amount paid per trip (total_amount)?

In [7]:
dfs.createOrReplaceTempView("combined_data")

In [10]:
query = open('sql/4a_v.sql','r').read()
print(query)

SELECT AVG(total_amount), MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY  MONTH(pickup_date), YEAR(pickup_date)
ORDER BY YEAR(pickup_date) desc, MONTH(pickup_date) desc


In [9]:
spark.sql(query).show(24)

+------------------+-----+----+
| avg(total_amount)|month|year|
+------------------+-----+----+
|16.436609998199245|   12|2018|
| 16.78761144761138|   11|2018|
| 16.90458035477482|   10|2018|
|16.805398872795774|    9|2018|
| 16.57291976715495|    8|2018|
|  16.5406384863263|    7|2018|
|  16.6268418697513|    6|2018|
|16.731315664714753|    5|2018|
|16.239193330053897|    4|2018|
|15.877975423904495|    3|2018|
|15.365707331375857|    2|2018|
|15.366687762922156|    1|2018|
| 16.01037110281087|   12|2017|
|16.304495679901407|   11|2017|
|16.554461719969563|   10|2017|
| 16.49286644970195|    9|2017|
| 16.28606673788002|    8|2017|
|16.189206986182032|    7|2017|
|16.449241718116834|    6|2017|
|16.537219583692274|    5|2017|
|16.084471557084566|    4|2017|
|15.980580262136435|    3|2017|
|15.447472305861448|    2|2017|
|15.281115687377275|    1|2017|
+------------------+-----+----+



### vi. What was the average amount paid per passenger (total_amount)?

In [10]:
dfs.createOrReplaceTempView("combined_data")

In [11]:
query = open('sql/4a_vi.sql','r').read()
print(query)

SELECT AVG(total_amount/passenger_count) as cost_per_passenger, MONTH(pickup_date) as month, YEAR(pickup_date) as year
FROM combined_data
GROUP BY   MONTH(pickup_date), YEAR(pickup_date)
ORDER BY YEAR(pickup_date) desc,  MONTH(pickup_date) desc


In [12]:
spark.sql(query).show(24)

+------------------+-----+----+
|cost_per_passenger|month|year|
+------------------+-----+----+
|13.513153185236861|   12|2018|
|13.918583215039412|   11|2018|
|14.042497138982277|   10|2018|
|13.911523668408037|    9|2018|
| 13.67558963521544|    8|2018|
|13.637959361794742|    7|2018|
| 13.73353095947967|    6|2018|
|13.833440853044157|    5|2018|
|13.408293335529835|    4|2018|
|13.118159156422765|    3|2018|
|12.744442427910286|    2|2018|
|12.704552575643287|    1|2018|
|13.084454042445142|   12|2017|
|13.451115974509026|   11|2017|
|13.656228678419502|   10|2017|
|13.568315565331623|    9|2017|
|13.376032087675778|    8|2017|
|13.265769164522586|    7|2017|
| 13.58484491714844|    6|2017|
| 13.63129327961534|    5|2017|
| 13.24615287436987|    4|2017|
|13.226031480897179|    3|2017|
|12.745868074725589|    2|2017|
|12.628996085969037|    1|2017|
+------------------+-----+----+



## b. For each taxi colour (yellow and green)
### i. What was the average, median, minimum and maximum trip duration in seconds?

In [13]:
dfs.createOrReplaceTempView("combined_data")

In [12]:
query = open('sql/4b_i.sql','r').read()
print(query)

SELECT AVG(trip_duration) as average, percentile_approx(trip_duration, 0.5) as median, MIN(trip_duration) as minimum,MAX(trip_duration) as maximum , taxi_colour
FROM combined_data
GROUP BY taxi_colour


In [15]:
spark.sql(query).show()

+------------------+------+-------+--------+-----------+
|           average|median|minimum| maximum|taxi_colour|
+------------------+------+-------+--------+-----------+
|1264.4590899553366|   626|      1|  202989|      green|
|1021.8073857437691|   670|      1|45466304|     yellow|
+------------------+------+-------+--------+-----------+



### ii. What was the average, median, minimum and maximum trip distance in km?

In [16]:
dfs.createOrReplaceTempView("combined_data")

In [13]:
query = open('sql/4b_ii.sql','r').read()
print(query)

SELECT AVG(trip_distance*1.60934) as average, percentile_approx(trip_distance*1.60934, 0.5) as median, MIN(trip_distance*1.60934) as minimum,MAX(trip_distance*1.60934) as maximum , taxi_colour
FROM combined_data
GROUP BY taxi_colour


In [18]:
spark.sql(query).show()

+-----------------+------------------+-------+------------------+-----------+
|          average|            median|minimum|           maximum|taxi_colour|
+-----------------+------------------+-------+------------------+-----------+
|4.666716746319932|2.9129053079128266|    0.0|12883.861334091796|      green|
|4.722008193182318| 2.591037423021793|    0.0|  304943.929100625|     yellow|
+-----------------+------------------+-------+------------------+-----------+



### iii. What was the average, median, minimum and maximum speed in km per hour?

In [19]:
dfs.createOrReplaceTempView("combined_data")

In [14]:
query = open('sql/4b_iii.sql','r').read()
print(query)

SELECT AVG(trip_distance*1.60934*3600/trip_duration) as average, percentile_approx(trip_distance*1.60934*3600/trip_duration, 0.5) as median, MIN(trip_distance*1.60934*3600/trip_duration) as minimum,MAX(trip_distance*1.60934*3600/trip_duration) as maximum , taxi_colour
FROM combined_data
GROUP BY taxi_colour


In [21]:
spark.sql(query).show()

+------------------+------------------+-------+--------------------+-----------+
|           average|            median|minimum|             maximum|taxi_colour|
+------------------+------------------+-------+--------------------+-----------+
| 22.55420227846247|17.746235862338864|    0.0|  194955.45644036864|      green|
|21.121994010781364|15.872942045264047|    0.0|1.6385046936749998E7|     yellow|
+------------------+------------------+-------+--------------------+-----------+



## c. What was the percentage of trips where the driver received tips?

In [22]:
dfs.createOrReplaceTempView("combined_data")

In [15]:
query = open('sql/4c.sql','r').read()
print(query)

(SELECT 'tip_received' as tip_received, count(*) as tip,
count(*) * 100.0 / (select count(*) from combined_data) as tip_percent
FROM combined_data
WHERE tip_amount > 0)
UNION
(SELECT 'no_tip' as tip_received, count(*) as tip,
count(*) * 100.0 / (select count(*) from combined_data) as tip_percent
FROM combined_data
WHERE tip_amount = 0)



In [24]:
spark.sql(query).show()

+------------+---------+-----------------+
|tip_received|      tip|      tip_percent|
+------------+---------+-----------------+
|      no_tip| 87592694|37.01547023695498|
|tip_received|149043497|62.98373614602137|
+------------+---------+-----------------+



## d. For trips where the driver received tips, What was the percentage where the driver received tips of at least $10.

In [25]:
dfs.createOrReplaceTempView("combined_data")

In [28]:
query = open('sql/4d.sql','r').read()
print(query)

(SELECT 'tip_over10' as tip_received, count(*) as tip,
count(*) * 100.0 / (select count(*) from combined_data where tip_amount>0) as tip_percent
FROM combined_data
WHERE tip_amount >= 10)
UNION
(SELECT 'tip_under10' as tip_received, count(*) as tip,
count(*) * 100.0 / (select count(*) from combined_data where tip_amount>0) as tip_percent
FROM combined_data
WHERE tip_amount < 10
AND tip_amount>0)



In [29]:
spark.sql(query).show()

+------------+---------+-----------------+
|tip_received|      tip|      tip_percent|
+------------+---------+-----------------+
|  tip_over10|  4979240| 3.34079654612506|
| tip_under10|144064257|96.65920345387494|
+------------+---------+-----------------+



## e. Classify each trip into bins of durations:
* Under 5 Mins
* From 5 mins to 10 mins
* From 10 mins to 20 mins
* From 20 mins to 30 mins
* At least 30 mins
### Then for each bins, calculate: 
* Average speed (km per hour)
* Average distance per dollar (km per $)


#### Average Speed

In [30]:
dfs.createOrReplaceTempView("combined_data")

In [33]:
query = open('sql/4e_i.sql','r').read()
print(query)

(SELECT 'Under 5 Mins' as duration, AVG(trip_distance*1.60934*3600/trip_duration) as average
FROM combined_data
WHERE trip_duration < 60*5)
UNION
(SELECT '5-10 mins' as duration, AVG(trip_distance*1.60934*3600/trip_duration) as average
FROM combined_data
WHERE trip_duration >= 60*5
AND trip_duration < 60*10)
UNION
(SELECT '10 to 20 mins' as duration, AVG(trip_distance*1.60934*3600/trip_duration) as average
FROM combined_data
WHERE trip_duration >= 60*10
AND trip_duration < 60*20)
UNION
(SELECT '20 to 30 mins' as duration, AVG(trip_distance*1.60934*3600/trip_duration) as average
FROM combined_data
WHERE trip_duration >= 60*20
AND trip_duration < 60*30)
UNION
(SELECT 'At least 30 mins' as duration, AVG(trip_distance*1.60934*3600/trip_duration) as average
FROM combined_data
WHERE trip_duration >= 60*30)



In [34]:
spark.sql(query).show()

+----------------+------------------+
|        duration|           average|
+----------------+------------------+
|       5-10 mins|16.498508623452828|
|   20 to 30 mins| 20.46148926681209|
|    Under 5 Mins| 38.65416235197025|
|   10 to 20 mins|16.994521521610125|
|At least 30 mins| 24.05022920905049|
+----------------+------------------+



#### Average distance per dollar (km per $)

In [7]:
dfs.createOrReplaceTempView("combined_data")

In [8]:
query = open('sql/4e_ii.sql','r').read()
print(query)

(SELECT 'Under 5 Mins' as duration, AVG(trip_distance*1.60934/total_amount) as km_per_dollar
FROM combined_data
WHERE trip_duration < 60*5)
UNION
(SELECT '5-10 mins' as duration, AVG(trip_distance*1.60934/total_amount) as  km_per_dollar
FROM combined_data
WHERE trip_duration >= 60*5
AND trip_duration < 60*10)
UNION
(SELECT '10 to 20 mins' as duration, AVG(trip_distance*1.60934/total_amount) as  km_per_dollar
FROM combined_data
WHERE trip_duration >= 60*10
AND trip_duration < 60*20)
UNION
(SELECT '20 to 30 mins' as duration, AVG(trip_distance*1.60934/total_amount) as  km_per_dollar
FROM combined_data
WHERE trip_duration >= 60*20
AND trip_duration < 60*30)
UNION
(SELECT 'At least 30 mins' as duration, AVG(trip_distance*1.60934/total_amount) as  km_per_dollar
FROM combined_data
WHERE trip_duration >= 60*30)



In [9]:
spark.sql(query).show()

+----------------+-------------------+
|        duration|      km_per_dollar|
+----------------+-------------------+
|   10 to 20 mins|0.26022949571501425|
|       5-10 mins| 0.2152663771114019|
|At least 30 mins|0.38057057660315985|
|   20 to 30 mins| 0.3074613587352144|
|    Under 5 Mins|  0.170903336953278|
+----------------+-------------------+



## f. Which duration bin will you advise a taxi driver to target to maximise his income?

Based on the above query, shorter trips have a income to distance ratio. However, this does not take into account that taxi's have wait time. A longer trips may be more profitable if wait times are typically long (i.e. time between fares), however, if times between fares are short - then shorter trips are more profitable.

Assuming the wait times are approximately the same, to maximise the income, the driver should look at the fare with the highest hourly rate (total amount/hour). 

In [6]:
dfs.createOrReplaceTempView("combined_data")

In [7]:
query = open('sql/4f.sql','r').read()
print(query)

(SELECT 'Under 5 Mins' as duration, AVG(total_amount*3600/trip_duration) as hourly_rate
FROM combined_data
WHERE trip_duration < 60*5)
UNION
(SELECT '5-10 mins' as duration, AVG(total_amount*3600/trip_duration) as hourly_rate
FROM combined_data
WHERE trip_duration >= 60*5
AND trip_duration < 60*10)
UNION
(SELECT '10 to 20 mins' as duration, AVG(total_amount*3600/trip_duration) as hourly_rate
FROM combined_data
WHERE trip_duration >= 60*10
AND trip_duration < 60*20)
UNION
(SELECT '20 to 30 mins' as duration, AVG(total_amount*3600/trip_duration) as hourly_rate
FROM combined_data
WHERE trip_duration >= 60*20
AND trip_duration < 60*30)
UNION
(SELECT 'At least 30 mins' as duration, AVG(total_amount*3600/trip_duration) as hourly_rate
FROM combined_data
WHERE trip_duration >= 60*30)



In [8]:
spark.sql(query).show()

+----------------+-----------------+
|        duration|      hourly_rate|
+----------------+-----------------+
|At least 30 mins|64.56451775422099|
|       5-10 mins|76.67600165536875|
|   20 to 30 mins|63.14220842975798|
|   10 to 20 mins|63.13422234928635|
|    Under 5 Mins|811.7968481049274|
+----------------+-----------------+



In [9]:
spark.stop()

# 5. Train an ML Model to predict 'total_amount

In [10]:
import boto3
import os
import re
from tqdm import tqdm
import sys
from pyspark.sql.types import DateType, IntegerType, BooleanType, TimestampType, FloatType
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
import re

In [11]:
os.getcwd()

'/home/jovyan/work'

In [12]:
os.chdir('../')

In [13]:
spark = SparkSession.builder \
        .appName('assignment_1') \
        .getOrCreate() 

In [14]:
dfs = spark.read.parquet('data/combined_clean_data.parquet')

In [15]:
dfs.show(5)

+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+
|VendorID|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_colour|        pickup_date|       dropoff_date|trip_duration|
+--------+------------------+----------+------------+------------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+
|       2|             false|         1|         244|         116|              1|         1.77|  0.0|    0.5|       0.0|         0.0|                  0.3|         8.8|           2|     yellow|2017-03-13 09:09:07|2017-03-13 09:17:28|          501|
|   

## Data Cleaning

In [16]:
cat_cols = ['RatecodeID','payment_type','taxi_colour','VendorID']
cols = ["VendorID",
 "RatecodeID",
 "passenger_count",
 "trip_distance",
 "extra",
 "mta_tax",
 "tip_amount",
 "tolls_amount",
 "improvement_surcharge",
 "total_amount",
 "payment_type",
 "taxi_colour",
 "pickup_date",
 "dropoff_date",
"trip_duration"]

In [17]:
df = dfs.select(cols)

In [18]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

In [19]:
stages = []
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoderEstimator(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]

In [20]:
num_cols = [
 "passenger_count",
 "trip_distance",
 "extra",
 "mta_tax",
 "tip_amount",
 "tolls_amount",
 "improvement_surcharge",
 "trip_duration"]

In [21]:
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

In [22]:
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")

In [23]:
stages += [assembler]

In [24]:
from pyspark.ml import Pipeline

In [25]:
pipeline = Pipeline(stages=stages)

## Split into training and test

In [26]:
import pyspark.sql.functions as F

In [27]:
pipeline_model = pipeline.fit(df)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38155)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py

Py4JError: An error occurred while calling o113.transform

In [19]:
df = pipeline_model.transform(df)

In [20]:
df_train = df.filter(F.col("pickup_date")<"2018-10-01")

In [21]:
df_train.show(5)

+--------+----------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+--------------+--------------+----------------+----------------+---------------+---------------+------------+-------------+--------------------+
|VendorID|RatecodeID|passenger_count|trip_distance|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_colour|        pickup_date|       dropoff_date|trip_duration|RatecodeID_ind|RatecodeID_ohe|payment_type_ind|payment_type_ohe|taxi_colour_ind|taxi_colour_ohe|VendorID_ind| VendorID_ohe|            features|
+--------+----------+---------------+-------------+-----+-------+----------+------------+---------------------+------------+------------+-----------+-------------------+-------------------+-------------+--------------+--------------+----------------+----------------+---------------+---------------+-----

In [22]:
df_test = df.filter(F.col("pickup_date")>="2018-10-01")

### Train Model

In [23]:
from pyspark.ml.regression import RandomForestRegressor

In [24]:
rf = RandomForestRegressor(featuresCol='features', labelCol='total_amount', numTrees=1, maxDepth=10, minInstancesPerNode=100)

In [None]:
rf_model = rf.fit(df_train)
rf_model.save("models/rf_model")