# Trip Record Data Analysis with PySpark

## [Trip Record Data](https://www.nyc.gov/site/tlc/about/)

`Yellow` and `green` taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data used in the attached datasets were collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP). The trip data was not created by the TLC, and TLC makes no representations as to the accuracy of these data.

For-Hire Vehicle (“FHV”) trip records include fields capturing the dispatching base license number and the pick-up date, time, and taxi zone location ID (shape file below). These records are generated from the FHV Trip Record submissions made by bases. Note: The TLC publishes base trip record data as submitted by the bases, and we cannot guarantee or confirm their accuracy or completeness. Therefore, this may not represent the total amount of trips dispatched by all TLC-licensed bases. The TLC performs routine reviews of the records and takes enforcement actions when necessary to ensure, to the extent possible, complete and accurate information.

<div class="admonition note alert alert-info">
<p class="first admonition-title" style="font-weight: bold;">Note</p>
<p class="last">This notebook assumes that Apache Spark and a Java JDK are already installed.</p>
</div>

## Create Spark Session

In [91]:
from tqdm import tqdm
import numpy as np
import pandas as pd

In [62]:
import os
from pyspark.sql import SparkSession
import findspark

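# Raw strings keep the backslashes in Windows paths from being read as escape sequences.
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk-11'  # Path to the Java JDK
os.environ['SPARK_HOME'] = r'C:\spark-3.4.3-bin-hadoop3'  # Path to the Spark installation
# os.environ['HADOOP_HOME'] = r'C:\hadoop'
# os.environ['PATH'] += os.pathsep + r'C:\hadoop\bin'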
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'

findspark.init()

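# Note: the package below is pinned to 3.5.3 while SPARK_HOME points to Spark 3.4.3.
# spark-sql already ships with the Spark distribution, so any extra package
# requested here should match the installed Spark version.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-sql_2.12:3.5.3")
    .master("local[*]")
    .getOrCreate()
)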

# Variable Descriptions

## Yellow Records
- **VendorID**: A code indicating the TPEP provider that provided the record.
- **tpep_pickup_datetime**: The date and time when the meter was engaged.
- **tpep_dropoff_datetime**: The date and time when the meter was disengaged.
- **Passenger_count**: The number of passengers in the vehicle. This is a driver-entered value.
- **Trip_distance**: The elapsed trip distance in miles reported by the taximeter.
- **PULocationID**: TLC Taxi Zone in which the taximeter was engaged.
- **DOLocationID**: TLC Taxi Zone in which the taximeter was disengaged.
- **RateCodeID**: The final rate code in effect at the end of the trip: 1 = Standard rate, 2 = JFK, 3 = Newark, 4 = Nassau or Westchester, 5 = Negotiated fare, 6 = Group ride.
- **Store_and_fwd_flag**: This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server: Y = store and forward trip, N = not a store and forward trip.
- **Payment_type**: A numeric code signifying how the passenger paid for the trip: 1 = Credit card, 2 = Cash, 3 = No charge, 4 = Dispute, 5 = Unknown, 6 = Voided trip.
- **Fare_amount**: The time-and-distance fare calculated by the meter.
- **Extra**: Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges.
- **MTA_tax**: $0.50 MTA tax that is automatically triggered based on the metered rate in use.
- **Improvement_surcharge**: $0.30 improvement surcharge assessed on trips at the flag drop. The surcharge has been levied since 2015.
- **Tip_amount**: Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.
- **Tolls_amount**: Total amount of all tolls paid during the trip.
- **Total_amount**: The total amount charged to passengers. Does not include cash tips.
- **Congestion_Surcharge**: Total amount collected during the trip for the NYS congestion surcharge.
- **Airport_fee**: $1.25, charged only for pick-ups at LaGuardia and John F. Kennedy Airports.
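
The coded fields above can be decoded with small lookup tables. The following is a minimal sketch in plain Python: the dictionary names and the `decode` helper are hypothetical and not part of the notebook, and the labels are copied from the list above.

```python
# Hypothetical lookup tables; labels copied from the field descriptions above.
RATE_CODES = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}

PAYMENT_TYPES = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}


def decode(code, table):
    """Return the label for a numeric code, or 'Undocumented' if it is not listed."""
    return table.get(code, "Undocumented")


print(decode(2, RATE_CODES))     # JFK
print(decode(1, PAYMENT_TYPES))  # Credit card
print(decode(99, RATE_CODES))    # Undocumented
```

Returning a fallback label instead of raising keeps the lookup usable on records whose codes are missing or not documented.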

## DataFrame Schema

In [63]:
# Create the schema (only the types that are actually used are imported)
from pyspark.sql.types import (
    StructType, StructField, IntegerType, LongType,
    DoubleType, StringType, TimestampNTZType,
)

schema = StructType([StructField('VendorID', IntegerType(), True), 
            StructField('tpep_pickup_datetime', TimestampNTZType(), True), 
            StructField('tpep_dropoff_datetime', TimestampNTZType(), True), 
            StructField('passenger_count', LongType(), True), 
            StructField('trip_distance', DoubleType(), True), 
            StructField('RatecodeID', LongType(), True), 
            StructField('store_and_fwd_flag', StringType(), True), 
            StructField('PULocationID', IntegerType(), True), 
            StructField('DOLocationID', IntegerType(), True), 
            StructField('payment_type', LongType(), True), 
            StructField('fare_amount', DoubleType(), True), 
            StructField('extra', DoubleType(), True), 
            StructField('mta_tax', DoubleType(), True), 
            StructField('tip_amount', DoubleType(), True), 
            StructField('tolls_amount', DoubleType(), True), 
            StructField('improvement_surcharge', DoubleType(), True), 
            StructField('total_amount', DoubleType(), True), 
            StructField('congestion_surcharge', DoubleType(), True), 
            StructField('Airport_fee', DoubleType(), True)])

## Load Data

In [64]:
# Paths of the Parquet files to read and union. Only Yellow trips are used.
files = [r'D:\github\PySpark\Databases\yellow_tripdata_2024-01.parquet', 
         r'D:\github\PySpark\Databases\yellow_tripdata_2024-02.parquet', 
         r'D:\github\PySpark\Databases\yellow_tripdata_2024-03.parquet']

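# Start from an empty DataFrame with the schema defined above.
df = spark.createDataFrame([], schema)

# Reads are lazy: this loop only builds the query plan, so the progress bar
# finishes almost instantly. The files are scanned on the first action.
for file in tqdm(files):
    temp = (
        spark.read.format("parquet")
        .schema(schema)  # Parquet has no header row, so no "header" option is needed
        .load(file)
    )
    # union() matches columns by position, which is safe here because
    # every file is read with the same schema.
    df = df.union(temp)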

100%|██████████| 3/3 [00:00<00:00, 32.61it/s]
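
The monthly files follow the naming pattern `yellow_tripdata_YYYY-MM.parquet`, so the list of paths can be generated rather than typed by hand. This is a minimal sketch that assumes the same directory as above; the `monthly_files` helper is hypothetical and not part of the notebook.

```python
from pathlib import PureWindowsPath


def monthly_files(base_dir, year, months):
    """Build the paths of the monthly Yellow trip files for the given months."""
    base = PureWindowsPath(base_dir)
    return [
        str(base / f"yellow_tripdata_{year}-{month:02d}.parquet")
        for month in months
    ]


# January to March 2024, matching the three files listed above.
files = monthly_files(r"D:\github\PySpark\Databases", 2024, range(1, 4))
for path in files:
    print(path)
```

`PureWindowsPath` only manipulates the path text, so the sketch does not check that the files exist.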


In [65]:
print(f'DataFrame has a total of {df.count()} rows.')

DataFrame has a total of 9554778 rows.


# Manage DataFrame

## Create new Columns
We'd like to add the day of the week, the pick-up hour, and the duration of the trip. For more information about Spark SQL functions, see [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html).

In [66]:
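# Day of the week

# Columns are referenced through the DataFrame, e.g. df["column"]
# Note: this import shadows Python's built-in round() for the rest of the notebook.
from pyspark.sql.functions import dayofweek, hour, unix_timestamp, col, round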

# Extract the day of the week of a given date/timestamp as integer. 
# Ranges from 1 for a Sunday through to 7 for a Saturday
df = df.withColumn("DAY_OF_WEEK", dayofweek(df["tpep_pickup_datetime"]))

# Get hour of a given date
df = df.withColumn("START_HOUR", hour(df["tpep_pickup_datetime"]))

# Trip duration in minutes: drop-off time minus pick-up time, rounded to 2 decimals
df = df.withColumn(
    "time_diff_minutes",
    round((unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60, 2),
)
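
To make the conventions of the three new columns concrete, here is a minimal plain-Python sketch of the same calculations on a single, made-up trip. The helper names are hypothetical; the sketch only mirrors the day numbering described in the comments above (1 for Sunday through 7 for Saturday) and does not call Spark.

```python
from datetime import datetime


def spark_day_of_week(ts):
    """Mimic the dayofweek convention: 1 = Sunday, ..., 7 = Saturday."""
    # isoweekday() returns 1 = Monday ... 7 = Sunday, so shift by one day.
    return ts.isoweekday() % 7 + 1


def duration_minutes(pickup, dropoff):
    """Trip duration in minutes, rounded to two decimals."""
    return round((dropoff - pickup).total_seconds() / 60, 2)


# Made-up example trip (not taken from the dataset).
pickup = datetime(2024, 1, 1, 8, 15, 0)   # a Monday
dropoff = datetime(2024, 1, 1, 8, 42, 30)

print(spark_day_of_week(pickup))          # 2
print(pickup.hour)                        # 8
print(duration_minutes(pickup, dropoff))  # 27.5
```

A drop-off earlier than the pick-up yields a negative duration, which is how negative values can end up in `time_diff_minutes`.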

In [87]:
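from pyspark.ml.feature import Bucketizer

# One-minute-wide bins covering 0 to 10,000 minutes
splits = list(np.linspace(0, 10000, 10001))

# Use Bucketizer to assign each trip duration to a bin.
# Note: Bucketizer treats values outside the splits (e.g. negative durations)
# as errors, so such rows must be filtered out before "bin" is evaluated.
bucketizer = Bucketizer(splits=splits, inputCol="time_diff_minutes", outputCol="bin")
df_binned = bucketizer.transform(df)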
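
The bucketing rule can be illustrated without Spark. The sketch below is a plain-Python approximation of how `Bucketizer` is documented to behave: with n+1 splits there are n buckets, bucket i covers `[splits[i], splits[i+1])`, the last bucket also includes its upper bound, and values outside the splits are errors. The `assign_bin` helper is hypothetical and is not the library's implementation.

```python
from bisect import bisect_right


def assign_bin(value, splits):
    """Return the index of the bucket that contains value.

    Bucket i covers [splits[i], splits[i + 1]); the last bucket also
    includes its upper bound. Values outside the splits raise an error.
    """
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside [{splits[0]}, {splits[-1]}]")
    if value == splits[-1]:
        return len(splits) - 2
    return bisect_right(splits, value) - 1


# One-minute edges from 0 to 10,000, as in the cell above.
splits = [float(i) for i in range(10001)]

print(assign_bin(0.5, splits))      # 0
print(assign_bin(1.0, splits))      # 1
print(assign_bin(10000.0, splits))  # 9999
```

Under this rule a negative duration such as -52.07 has no bucket, which is why negative values need to be excluded before the bins are used.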


In [99]:
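# Convert the longest trip duration (about 9455 minutes, the max_value of time_diff_minutes) to days
(9455 / 60) / 24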

6.565972222222222

In [73]:
df_binned.selectExpr("min(time_diff_minutes) as min_value", "max(time_diff_minutes) as max_value").show()

+---------+---------+
|min_value|max_value|
+---------+---------+
|   -52.07|   9455.4|
+---------+---------+
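
A duration expressed in minutes is easier to judge once it is broken into larger units. The following is a minimal sketch of that arithmetic applied to the maximum value shown above; the `split_minutes` helper is hypothetical.

```python
def split_minutes(minutes):
    """Break a duration in minutes into (days, hours, minutes, seconds)."""
    total_seconds = round(minutes * 60)
    days, rem = divmod(total_seconds, 86_400)  # seconds per day
    hours, rem = divmod(rem, 3_600)            # seconds per hour
    mins, secs = divmod(rem, 60)
    return days, hours, mins, secs


print(split_minutes(9455.4))  # (6, 13, 35, 24)
```

The longest recorded trip therefore spans more than six and a half days, which suggests that the upper tail of `time_diff_minutes` contains records worth inspecting before any analysis.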



In [92]:
# Count the trips in each bin (positive durations only)
df_binned.createOrReplaceTempView("bin_df")

query = """
    SELECT
        bin,
        COUNT(VendorID) AS trip_count
    FROM
        bin_df
    WHERE
        time_diff_minutes > 0
    GROUP BY
        bin
    ORDER BY
        bin
"""

histogram = spark.sql(query)
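
What the query computes can be sketched in plain Python on a handful of rows. The sample rows below are made up for illustration, and the sketch is a simplification: it uses the floor of the duration as the bin index, which matches one-minute bins for values below the last edge, and it omits bins whose count would be zero.

```python
from collections import Counter
from math import floor

# Hypothetical sample rows: (VendorID, time_diff_minutes)
rows = [(1, 0.5), (2, 0.9), (1, 3.2), (None, 3.7), (2, -52.07), (1, 12.0)]

counts = Counter(
    floor(minutes)              # one-minute bins: the bin index is the floor
    for vendor_id, minutes in rows
    if minutes > 0              # WHERE time_diff_minutes > 0
    and vendor_id is not None   # COUNT(VendorID) ignores NULLs
)

bin_counts = sorted(counts.items())  # ORDER BY bin
print(bin_counts)  # [(0, 2), (3, 1), (12, 1)]
```

Because `COUNT(VendorID)` skips rows where `VendorID` is null, `COUNT(*)` would be the choice if every trip in a bin should be counted regardless of vendor.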