# Read the parquet data files and do engineering

## Setup

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("features_engineering").getOrCreate()

## Read data

In [3]:
df = spark.read.parquet("output/tripdata.parquet")

In [4]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- Lpep_dropoff_datetime: timestamp (nullable = true)
 |-- Store_and_fwd_flag: string (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- Pickup_longitude: double (nullable = true)
 |-- Pickup_latitude: double (nullable = true)
 |-- Dropoff_longitude: double (nullable = true)
 |-- Dropoff_latitude: double (nullable = true)
 |-- Passenger_count: integer (nullable = true)
 |-- Trip_distance: double (nullable = true)
 |-- Fare_amount: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- MTA_tax: double (nullable = true)
 |-- Tip_amount: double (nullable = true)
 |-- Tolls_amount: double (nullable = true)
 |-- Ehail_fee: double (nullable = true)
 |-- Total_amount: double (nullable = true)
 |-- Payment_type: integer (nullable = true)
 |-- Trip_type: integer (nullable = true)



In [5]:
df.select('VendorID').describe().show()

+-------+-------------------+
|summary|           VendorID|
+-------+-------------------+
|  count|              49647|
|   mean| 1.8926420528934276|
| stddev|0.30957091047222524|
|    min|                  1|
|    max|                  2|
+-------+-------------------+



Total 49647 records, equal to the previous dataframe, good to go!

## Using SQL

In [6]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("tripdata")

In [7]:
query_string = f'''
WITH tmp AS (
    SELECT
    40.6281871096                                   AS minLat,
    40.6654832604                                   AS maxLat,
    -73.8212879826                                  AS minLon,
    -73.7465043413                                  AS maxLon,
    extract(dayofweek from lpep_pickup_datetime)    AS lpep_pickup_dow,
    extract(dayofweek from lpep_dropoff_datetime)   AS lpep_dropoff_dow,
    extract(hour from lpep_pickup_datetime)         AS lpep_pickup_hour,
    extract(hour from lpep_dropoff_datetime)        AS lpep_dropoff_hour,
    *
FROM
    tripdata
ORDER BY
    pickup_latitude DESC
)

SELECT
    -- existing attribute
    *,
    -- pickup at JFK
    CASE
        WHEN pickup_latitude BETWEEN minLat AND maxLat AND pickup_longitude BETWEEN minLon AND maxLon THEN 1
        ELSE 0
    END AS pickup_at_jfk,
    CASE
        WHEN dropoff_latitude BETWEEN minLat AND maxLat AND dropoff_longitude BETWEEN minLon AND maxLon THEN 1
        ELSE 0
    END AS dropoff_at_jfk,
    -- duration in seconds of the trip
    unix_timestamp(lpep_dropoff_datetime) - unix_timestamp(lpep_pickup_datetime) AS trip_duration_seconds,
    -- lpep_pickup_dow
    CASE WHEN lpep_pickup_dow = 1 THEN 1 ELSE 0 END AS lpep_pickup_dow_1, -- SUNDAY
    CASE WHEN lpep_pickup_dow = 2 THEN 1 ELSE 0 END AS lpep_pickup_dow_2,
    CASE WHEN lpep_pickup_dow = 3 THEN 1 ELSE 0 END AS lpep_pickup_dow_3,
    CASE WHEN lpep_pickup_dow = 4 THEN 1 ELSE 0 END AS lpep_pickup_dow_4,
    CASE WHEN lpep_pickup_dow = 5 THEN 1 ELSE 0 END AS lpep_pickup_dow_5,
    CASE WHEN lpep_pickup_dow = 6 THEN 1 ELSE 0 END AS lpep_pickup_dow_6,
    CASE WHEN lpep_pickup_dow = 7 THEN 1 ELSE 0 END AS lpep_pickup_dow_7,
    -- lpep_dropoff_dow
    CASE WHEN lpep_dropoff_dow = 1 THEN 1 ELSE 0 END AS lpep_dropoff_dow_1, -- SUNDAY
    CASE WHEN lpep_dropoff_dow = 2 THEN 1 ELSE 0 END AS lpep_dropoff_dow_2,
    CASE WHEN lpep_dropoff_dow = 3 THEN 1 ELSE 0 END AS lpep_dropoff_dow_3,
    CASE WHEN lpep_dropoff_dow = 4 THEN 1 ELSE 0 END AS lpep_dropoff_dow_4,
    CASE WHEN lpep_dropoff_dow = 5 THEN 1 ELSE 0 END AS lpep_dropoff_dow_5,
    CASE WHEN lpep_dropoff_dow = 6 THEN 1 ELSE 0 END AS lpep_dropoff_dow_6,
    CASE WHEN lpep_dropoff_dow = 7 THEN 1 ELSE 0 END AS lpep_dropoff_dow_7,
    -- lpep_pickup_hour
    CASE WHEN lpep_pickup_hour = 0 THEN 1 ELSE 0 END AS lpep_pickup_hour_0,
    CASE WHEN lpep_pickup_hour = 1 THEN 1 ELSE 0 END AS lpep_pickup_hour_1,
    CASE WHEN lpep_pickup_hour = 2 THEN 1 ELSE 0 END AS lpep_pickup_hour_2,
    CASE WHEN lpep_pickup_hour = 3 THEN 1 ELSE 0 END AS lpep_pickup_hour_3,
    CASE WHEN lpep_pickup_hour = 4 THEN 1 ELSE 0 END AS lpep_pickup_hour_4,
    CASE WHEN lpep_pickup_hour = 5 THEN 1 ELSE 0 END AS lpep_pickup_hour_5,
    CASE WHEN lpep_pickup_hour = 6 THEN 1 ELSE 0 END AS lpep_pickup_hour_6,
    CASE WHEN lpep_pickup_hour = 7 THEN 1 ELSE 0 END AS lpep_pickup_hour_7,
    CASE WHEN lpep_pickup_hour = 8 THEN 1 ELSE 0 END AS lpep_pickup_hour_8,
    CASE WHEN lpep_pickup_hour = 9 THEN 1 ELSE 0 END AS lpep_pickup_hour_9,
    CASE WHEN lpep_pickup_hour = 10 THEN 1 ELSE 0 END AS lpep_pickup_hour_10,
    CASE WHEN lpep_pickup_hour = 11 THEN 1 ELSE 0 END AS lpep_pickup_hour_11,
    CASE WHEN lpep_pickup_hour = 12 THEN 1 ELSE 0 END AS lpep_pickup_hour_12,
    CASE WHEN lpep_pickup_hour = 13 THEN 1 ELSE 0 END AS lpep_pickup_hour_13,
    CASE WHEN lpep_pickup_hour = 14 THEN 1 ELSE 0 END AS lpep_pickup_hour_14,
    CASE WHEN lpep_pickup_hour = 15 THEN 1 ELSE 0 END AS lpep_pickup_hour_15,
    CASE WHEN lpep_pickup_hour = 16 THEN 1 ELSE 0 END AS lpep_pickup_hour_16,
    CASE WHEN lpep_pickup_hour = 17 THEN 1 ELSE 0 END AS lpep_pickup_hour_17,
    CASE WHEN lpep_pickup_hour = 18 THEN 1 ELSE 0 END AS lpep_pickup_hour_18,
    CASE WHEN lpep_pickup_hour = 19 THEN 1 ELSE 0 END AS lpep_pickup_hour_19,
    CASE WHEN lpep_pickup_hour = 20 THEN 1 ELSE 0 END AS lpep_pickup_hour_20,
    CASE WHEN lpep_pickup_hour = 21 THEN 1 ELSE 0 END AS lpep_pickup_hour_21,
    CASE WHEN lpep_pickup_hour = 22 THEN 1 ELSE 0 END AS lpep_pickup_hour_22,
    CASE WHEN lpep_pickup_hour = 23 THEN 1 ELSE 0 END AS lpep_pickup_hour_23,
    -- lpep_dropoff_hour
    CASE WHEN lpep_dropoff_hour = 0 THEN 1 ELSE 0 END AS lpep_dropoff_hour_0,
    CASE WHEN lpep_dropoff_hour = 1 THEN 1 ELSE 0 END AS lpep_dropoff_hour_1,
    CASE WHEN lpep_dropoff_hour = 2 THEN 1 ELSE 0 END AS lpep_dropoff_hour_2,
    CASE WHEN lpep_dropoff_hour = 3 THEN 1 ELSE 0 END AS lpep_dropoff_hour_3,
    CASE WHEN lpep_dropoff_hour = 4 THEN 1 ELSE 0 END AS lpep_dropoff_hour_4,
    CASE WHEN lpep_dropoff_hour = 5 THEN 1 ELSE 0 END AS lpep_dropoff_hour_5,
    CASE WHEN lpep_dropoff_hour = 6 THEN 1 ELSE 0 END AS lpep_dropoff_hour_6,
    CASE WHEN lpep_dropoff_hour = 7 THEN 1 ELSE 0 END AS lpep_dropoff_hour_7,
    CASE WHEN lpep_dropoff_hour = 8 THEN 1 ELSE 0 END AS lpep_dropoff_hour_8,
    CASE WHEN lpep_dropoff_hour = 9 THEN 1 ELSE 0 END AS lpep_dropoff_hour_9,
    CASE WHEN lpep_dropoff_hour = 10 THEN 1 ELSE 0 END AS lpep_dropoff_hour_10,
    CASE WHEN lpep_dropoff_hour = 11 THEN 1 ELSE 0 END AS lpep_dropoff_hour_11,
    CASE WHEN lpep_dropoff_hour = 12 THEN 1 ELSE 0 END AS lpep_dropoff_hour_12,
    CASE WHEN lpep_dropoff_hour = 13 THEN 1 ELSE 0 END AS lpep_dropoff_hour_13,
    CASE WHEN lpep_dropoff_hour = 14 THEN 1 ELSE 0 END AS lpep_dropoff_hour_14,
    CASE WHEN lpep_dropoff_hour = 15 THEN 1 ELSE 0 END AS lpep_dropoff_hour_15,
    CASE WHEN lpep_dropoff_hour = 16 THEN 1 ELSE 0 END AS lpep_dropoff_hour_16,
    CASE WHEN lpep_dropoff_hour = 17 THEN 1 ELSE 0 END AS lpep_dropoff_hour_17,
    CASE WHEN lpep_dropoff_hour = 18 THEN 1 ELSE 0 END AS lpep_dropoff_hour_18,
    CASE WHEN lpep_dropoff_hour = 19 THEN 1 ELSE 0 END AS lpep_dropoff_hour_19,
    CASE WHEN lpep_dropoff_hour = 20 THEN 1 ELSE 0 END AS lpep_dropoff_hour_20,
    CASE WHEN lpep_dropoff_hour = 21 THEN 1 ELSE 0 END AS lpep_dropoff_hour_21,
    CASE WHEN lpep_dropoff_hour = 22 THEN 1 ELSE 0 END AS lpep_dropoff_hour_22,
    CASE WHEN lpep_dropoff_hour = 23 THEN 1 ELSE 0 END AS lpep_dropoff_hour_23
FROM tmp
'''
sql_results = spark.sql(query_string)
sql_results.show()

+-------------+-------------+--------------+--------------+---------------+----------------+----------------+-----------------+--------+--------------------+---------------------+------------------+----------+------------------+------------------+------------------+------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+------------+------------+---------+-------------+--------------+---------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+---------

In [8]:
# Check if the field pickup_at_jfk and dropoff_at_jfk are correctly created
sql_results.filter('pickup_at_jfk=1 or dropoff_at_jfk=1').select('dropoff_at_jfk', 'pickup_at_jfk').describe().show()

+-------+-------------------+--------------------+
|summary|     dropoff_at_jfk|       pickup_at_jfk|
+-------+-------------------+--------------------+
|  count|                536|                 536|
|   mean| 0.9906716417910447|0.016791044776119403|
| stddev|0.09622168921133457| 0.12860779032678457|
|    min|                  0|                   0|
|    max|                  1|                   1|
+-------+-------------------+--------------------+



## Export to Parquet

In [9]:
sql_results.write.mode("overwrite").parquet("output/tripdata_features.parquet")