In [1]:
from pyspark.sql import *
from pyspark.sql.types import StructField, IntegerType, StructType, StringType, DateType, DoubleType, TimestampType

spark = SparkSession.builder.appName('OneMountGroupTest').getOrCreate()


data_file = "green_tripdata_2013-09.csv"

In [2]:
def get_schema():
    schema=[StructField('VendorID',IntegerType(),True),
           StructField('lpep_pickup_datetime',TimestampType(),True),
           StructField('Lpep_dropoff_datetime',TimestampType(),True),
           StructField('Store_and_fwd_flag',StringType(),True),
           StructField('RateCodeID',IntegerType(),True),
           StructField('Pickup_longitude',DoubleType(),True),
           StructField('Pickup_latitude',DoubleType(),True),
           StructField('Dropoff_longitude',DoubleType(),True),
           StructField('Dropoff_latitude',DoubleType(),True),
           StructField('Passenger_count',IntegerType(),True),
           StructField('Trip_distance',DoubleType(),True),
           StructField('Fare_amount',DoubleType(),True),
           StructField('Extra',DoubleType(),True),
           StructField('MTA_tax',DoubleType(),True),
           StructField('Tip_amount',DoubleType(),True),
           StructField('Tolls_amount',DoubleType(),True),
           StructField('Ehail_fee',DoubleType(),True),
           StructField('Total_amount',DoubleType(),True),
           StructField('Payment_type',IntegerType(),True),
           StructField('Trip_type',IntegerType(),True)
    ]
    return StructType(fields=schema)



# Read data from file and convert the schema
df =  spark.read.format("csv").option("header", "true").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").schema(get_schema()).load(data_file)


In [3]:
# Task 1. Create a program that produces a typed parquet file (https://parquet.apache.org/) from this file
stored_file = '_data/converted_data.parquet'
df.write.save(stored_file)


# Read data from Parquet and check type 
df_from_parquet = spark.read.parquet(stored_file)

df_from_parquet.printSchema

<bound method DataFrame.printSchema of DataFrame[VendorID: int, lpep_pickup_datetime: timestamp, Lpep_dropoff_datetime: timestamp, Store_and_fwd_flag: string, RateCodeID: int, Pickup_longitude: double, Pickup_latitude: double, Dropoff_longitude: double, Dropoff_latitude: double, Passenger_count: int, Trip_distance: double, Fare_amount: double, Extra: double, MTA_tax: double, Tip_amount: double, Tolls_amount: double, Ehail_fee: double, Total_amount: double, Payment_type: int, Trip_type: int]>

In [4]:
# Task 2.Create a derived dataset, from the one created above, using a SQL statement that selects all existing columns and adds these new columns:
#* One-Hot encoding for each hour of the day
#* One-Hot encoding for each day	of the week
#* Duration in seconds of the trip
#* An int encoding to indicate if the pickup or dropoff locations were at JFK airport. (Use a bounding box from the GPS coordinates to determine this). Provide pseudo code if out of time. This column is optional.

# We assume the following values are GPS coordinates of JSK Airport, please change it in case you need
# JSK Airport Bounding BOX: 10KM from the JSK Airport LAT,LONG (40.5544017, 40.73440170000001, -73.8781194, -73.6981194)
jsk_gps = {
    "max_lat": 40.73440170000001,
    "min_lat": 40.5544017,
    "max_long": -73.6981194,
    "min_long": -73.8781194
}


def is_at_JSK(lat , long):
    return  1 if lat <= jsk_gps['max_lat'] and lat >= jsk_gps['min_lat'] and long <= jsk_gps['max_long'] and long >= jsk_gps['min_long'] else 0

spark.udf.register("is_at_JSK", is_at_JSK)


# INIT SQL 

sql_table = "Trip_Data"

df_from_parquet.createOrReplaceTempView(sql_table)


### To generate new cols: 
### - en_h : One-Hot encoding for each hour of the day
### - en_d : One-Hot encoding for each day	of the week
### - duration : Duration in seconds of the trip
### - is_at_JFK: An int encoding to indicate if the pickup or dropoff locations were at JFK airport


sql_query = """
select *, 
       hour(lpep_pickup_datetime) as en_h, 
       dayofweek(lpep_pickup_datetime) as en_d,
       unix_timestamp(Lpep_dropoff_datetime) - unix_timestamp(lpep_pickup_datetime) as duration,
       CASE WHEN is_at_JSK(Pickup_latitude, Pickup_longitude) = 1 
                 OR is_at_JSK(Dropoff_latitude, Dropoff_longitude) = 1 
                     THEN 1
                 ELSE 0
                 END AS is_JSK
from {0}
""".format(sql_table)


df_sql = spark.sql(sql_query)

# Save new data to Parquet
new_stored_file = '_data/new_converted_data.parquet'
df_sql.write.save(new_stored_file)



In [5]:
# Test new data
test_df_from_parquet = spark.read.parquet(new_stored_file)

test_sql_table = "Trip_Data_New"

test_df_from_parquet.createOrReplaceTempView(test_sql_table)

test_sql_query = """
select  * 
from {0}
""".format(test_sql_table)

# Sample datas
spark.sql(test_sql_query).head(100)



[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2013, 9, 1, 0, 2), Lpep_dropoff_datetime=datetime.datetime(2013, 9, 1, 0, 54, 51), Store_and_fwd_flag='N', RateCodeID=1, Pickup_longitude=-73.95240783691406, Pickup_latitude=40.810726165771484, Dropoff_longitude=-73.98394012451172, Dropoff_latitude=40.67628479003906, Passenger_count=5, Trip_distance=14.35, Fare_amount=50.5, Extra=0.5, MTA_tax=0.5, Tip_amount=10.3, Tolls_amount=0.0, Ehail_fee=None, Total_amount=61.8, Payment_type=1, Trip_type=None, en_h=0, en_d=1, duration=3171, is_JSK=0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2013, 9, 1, 0, 2, 34), Lpep_dropoff_datetime=datetime.datetime(2013, 9, 1, 0, 20, 59), Store_and_fwd_flag='N', RateCodeID=1, Pickup_longitude=-73.96302032470703, Pickup_latitude=40.71183395385742, Dropoff_longitude=-73.96664428710938, Dropoff_latitude=40.68169021606445, Passenger_count=1, Trip_distance=3.24, Fare_amount=15.0, Extra=0.5, MTA_tax=0.5, Tip_amount=0.0, Tolls_amount=0.0, Ehail_fe

In [7]:

# Data for is_JSK
test_jsk_sql_query = """
select is_JSK, count(*)
from {0}
GROUP BY is_JSK
""".format(test_sql_table)
# Sample datas
spark.sql(test_jsk_sql_query).head(100)

[Row(is_JSK=1, count(1)=8496), Row(is_JSK=0, count(1)=41151)]