## Task 1
### Create a program that produces a typed parquet file
The parquet file result was saved in directory 'data/green_tripdata_2013-09'

In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local') \
    .appName('Onemount Test') \
    .getOrCreate()
df = spark.read.csv('data/green_tripdata_2013-09.csv', header = True)

for column_name in df.columns:
    df = df.withColumnRenamed(column_name, column_name.replace(" ", "").lower())

df.printSchema()

df.repartition(1).write.mode('overwrite').parquet('data/green_tripdata_2013-09')

root
 |-- vendorid: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)



## Task 2
### Create a derived dataset, from the one created above
The parquet file result was saved in directory 'data/nyc_taxi_analysis'

#### One-Hot encoding for each hour of the day
Add 24 columns correspond to 24 hours of the day: **0_hour, 1_hour, 2_hour...**

In [16]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

df = spark.read.parquet('data/green_tripdata_2013-09')

hour_of_day = F.udf(lambda pickup_hour, dropoff_hour, i: 1 if pickup_hour == i 
                    or dropoff_hour == i else 0, IntegerType())
for i in range(24):
    df = df.withColumn(str(i)+'_hour', hour_of_day(F.hour('lpep_pickup_datetime'), 
                                                   F.hour('lpep_dropoff_datetime'), 
                                                   F.lit(i)))

df.createOrReplaceTempView('nyc_taxi')
df_sql = spark.sql('''
    SELECT sum(3_hour) total_3_hour,sum(18_hour) total_18_hour,sum(15_hour) total_15_hour,sum(0_hour) total_0_hour FROM nyc_taxi
''')

df_sql.show()

+------------+-------------+-------------+------------+
|total_3_hour|total_18_hour|total_15_hour|total_0_hour|
+------------+-------------+-------------+------------+
|        1116|         4381|         3216|        2463|
+------------+-------------+-------------+------------+



#### One-Hot encoding for each day	of the week
Add 7 columns correspond to 7 days of the week: **monday, tuesday, wednesday, thursday, friday, saturday, sunday**

In [17]:
week = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']

day_of_week = F.udf(lambda pickup_day, dropoff_day, d: 1 if pickup_day == d
                    or dropoff_day == d else 0, IntegerType())
for d in week:
    df = df.withColumn(d, day_of_week(F.lower(F.date_format('lpep_pickup_datetime', 'EEEE')), 
                                                   F.lower(F.date_format('lpep_dropoff_datetime', 'EEEE')), 
                                                   F.lit(d)))

df.createOrReplaceTempView('nyc_taxi')
df_sql = spark.sql('''
    SELECT sum(monday) total_monday, sum(sunday) total_sunday FROM nyc_taxi
''')

df_sql.show()

+------------+------------+
|total_monday|total_sunday|
+------------+------------+
|        7370|        7990|
+------------+------------+



#### Duration in seconds of the trip
Add a column **trip_duration** type long

In [18]:
timeDiff = F.unix_timestamp('lpep_dropoff_datetime')- F.unix_timestamp('lpep_pickup_datetime')
df = df.withColumn('trip_duration', timeDiff)

df.createOrReplaceTempView('nyc_taxi')
df_sql = spark.sql('''
    SELECT lpep_pickup_datetime, lpep_dropoff_datetime, trip_duration FROM nyc_taxi
''')

df_sql.show()

+--------------------+---------------------+-------------+
|lpep_pickup_datetime|lpep_dropoff_datetime|trip_duration|
+--------------------+---------------------+-------------+
| 2013-09-01 00:02:00|  2013-09-01 00:54:51|         3171|
| 2013-09-01 00:02:34|  2013-09-01 00:20:59|         1105|
| 2013-09-01 00:03:06|  2013-09-01 00:28:03|         1497|
| 2013-09-01 00:03:30|  2013-09-01 00:23:02|         1172|
| 2013-09-01 00:05:12|  2013-09-01 00:30:55|         1543|
| 2013-09-01 00:05:18|  2013-09-01 00:23:31|         1093|
| 2013-09-01 00:05:51|  2013-09-01 00:13:32|          461|
| 2013-09-01 00:07:01|  2013-09-01 00:13:56|          415|
| 2013-09-01 00:07:55|  2013-09-01 00:24:19|          984|
| 2013-09-01 00:07:58|  2013-09-01 00:38:32|         1834|
| 2013-09-01 00:08:30|  2013-09-01 00:13:58|          328|
| 2013-09-01 00:09:25|  2013-09-01 00:27:14|         1069|
| 2013-09-01 00:09:41|  2013-09-01 00:17:37|          476|
| 2013-09-01 00:11:14|  2013-09-01 00:16:05|          29

#### An int encoding to indicate if the pickup or dropoff locations were at JFK airport
Add a lolumn **relative_jfk_airport** type int: 1 is pick up or drop off at JFK airport else 0

In [19]:
# no time for researching, so i copied it on internet
import math
def cal_bounding_box(longitude:float, latitude:float):
    R = 6371  # earth radius in km
    radius = 5 # km
    min_lon = longitude - math.degrees(radius/R/math.cos(math.radians(latitude)))
    max_lon = longitude + math.degrees(radius/R/math.cos(math.radians(latitude)))
    min_lat = latitude - math.degrees(radius/R)
    max_lat = latitude + math.degrees(radius/R)
    
    return min_lon, max_lon, min_lat, max_lat
    

In [20]:
longitude_jfk_airport = -73.7787443
latitude_jfk_airport = 40.6398262

min_lon, max_lon, min_lat, max_lat = cal_bounding_box(longitude_jfk_airport, latitude_jfk_airport)

is_jfk_airport = F.udf(lambda p_lon, p_lat, d_lon, d_lat: 
                       1 if (min_lon<=p_lon and p_lon <= max_lon and min_lat <= p_lat and p_lat <= max_lat) 
                           or (min_lon<=d_lon and d_lon <= max_lon and min_lat <= d_lat and d_lat <= max_lat)
                       else 0, IntegerType())

df = df.withColumn('relative_jfk_airport', 
                   is_jfk_airport(df.pickup_longitude.cast(FloatType()), 
                                  df.pickup_latitude.cast(FloatType()), 
                                  df.dropoff_longitude.cast(FloatType()), 
                                  df.dropoff_latitude.cast(FloatType())))

df.createOrReplaceTempView('nyc_taxi')
df_sql = spark.sql('''
    SELECT count(*) no_jfk_airport FROM nyc_taxi WHERE relative_jfk_airport = 1
''')

df_sql.show()

+--------------+
|no_jfk_airport|
+--------------+
|           893|
+--------------+



#### Save the new parquet file

In [21]:
df.printSchema()
df.repartition(1).write.mode('overwrite').parquet('data/nyc_taxi_analysis')

root
 |-- vendorid: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- 0_hour: integer (nullable = true)
 |-- 1_hour: integer (nullable = true)
 |-- 2_hour: integer 