# Exploratory Data Analysis (EDA) for NYC taxy trips

In [94]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, TimestampNTZType

# Init spark session
spark = SparkSession.builder \
    .appName("NYC_Taxi_Exploration") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .getOrCreate()

In [95]:
# After explore the dataframe created the schema for fast processing

schema_fields = StructType([
    StructField("VendorID",                 IntegerType(),      False),
    StructField("tpep_pickup_datetime",     TimestampNTZType(), True),
    StructField("tpep_dropoff_datetime",    TimestampNTZType(), True),
    StructField("passenger_count",          DoubleType(),       True),
    StructField("trip_distance",            DoubleType(),       True),
    StructField("RatecodeID",               DoubleType(),       True),
    StructField("store_and_fwd_flag",       StringType(),       True),
    StructField("PULocationID",             IntegerType(),      True),
    StructField("DOLocationID",             IntegerType(),      True),
    StructField("payment_type",             IntegerType(),      True),
    StructField("fare_amount",              DoubleType(),       True),
    StructField("extra",                    DoubleType(),       True),
    StructField("mta_tax",                  DoubleType(),       True),
    StructField("tip_amount",               DoubleType(),       True),
    StructField("tolls_amount",             DoubleType(),       True),
    StructField("improvement_surcharge",    DoubleType(),       True),
    StructField("total_amount",             DoubleType(),       True),
    StructField("congestion_surcharge",     DoubleType(),       True),
    StructField("airport_fee",              DoubleType(),       True),
    StructField("ingestion_timestamp",      TimestampType(),    True),
    StructField("source_file",              StringType(),       True)
])

In [99]:
# Load data from Bronze layer
#df_bronze = spark.read.parquet("../data/bronze/nyc_taxi/trip_data/year=2023/month=01/", schema=schema_fields)
df_bronze = spark.read.parquet("../data/bronze/nyc_taxi/trip_data/", schema_fields=schema_fields)
df_zones = spark.read.option("header", "true").csv("../data/bronze/nyc_taxi/taxi_zone/taxi_zone_lookup.csv")

#df_bronze.describe().show()
df_bronze.printSchema()
df_bronze.show(5)
df_bronze.groupBy("month").count().show()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)
 |-- year: integer (nullable = tru

In [12]:
df_bronze.show(10, truncate=False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------------+-------------------------------------------------------------------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|ingestion_timestamp       |source_file                                                                    |
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------

In [None]:
# Checking basic statistics on values ​​and distances
df_bronze.select("trip_distance", "fare_amount", "total_amount").describe().show()

# Filter for inconsistent records
df_cleaned = df_bronze.filter(
    (F.col("trip_distance") > 0) & # Validate positive distance
    (F.col("total_amount") > 0) & # Validate positive total amount
    (F.col("passenger_count") > 0) # Validate positive passenger count
)

print(f"Records after cleaning: {df_cleaned.count()}")

+-------+------------------+------------------+------------------+
|summary|     trip_distance|       fare_amount|      total_amount|
+-------+------------------+------------------+------------------+
|  count|           3066766|           3066766|           3066766|
|   mean|3.8473420306601414| 18.36706861234247| 27.02038310708492|
| stddev|249.58375606858166|17.807821939337924|22.163588952492184|
|    min|               0.0|            -900.0|            -751.0|
|    max|         258928.15|            1160.1|            1169.4|
+-------+------------------+------------------+------------------+

Records after cleaning: 2884460


In [18]:
df_cleaned.groupBy("RatecodeID").count().show()

+----------+-------+
|RatecodeID|  count|
+----------+-------+
|       1.0|2746151|
|       4.0|   4204|
|       3.0|   7975|
|       2.0| 108465|
|      99.0|  11213|
|       6.0|      1|
|       5.0|   6451|
+----------+-------+



In [19]:
df_cleaned.groupBy("store_and_fwd_flag").count().show()

+------------------+-------+
|store_and_fwd_flag|  count|
+------------------+-------+
|                 Y|  18140|
|                 N|2866320|
+------------------+-------+



In [21]:
df_cleaned.groupBy("payment_type").count().show()

+------------+-------+
|payment_type|  count|
+------------+-------+
|           1|2350785|
|           3|   9266|
|           2| 508070|
|           4|  16339|
+------------+-------+



In [20]:
# Create dimension (Normalized) for location
dim_location = df_zones.select(
    F.col("LocationID").alias("location_id"),
    F.col("Borough").alias("borough"),
    F.col("Zone").alias("zone"),
    F.col("service_zone").alias("service_zone")
)

dim_location.show(5)

+-----------+-------------+--------------------+------------+
|location_id|      borough|                zone|service_zone|
+-----------+-------------+--------------------+------------+
|          1|          EWR|      Newark Airport|         EWR|
|          2|       Queens|         Jamaica Bay|   Boro Zone|
|          3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|          4|    Manhattan|       Alphabet City| Yellow Zone|
|          5|Staten Island|       Arden Heights|   Boro Zone|
+-----------+-------------+--------------------+------------+
only showing top 5 rows


In [None]:
# Defining schema structure for the Fact table

df_cleaned = df_cleaned.withColumn("store_and_fwd_flag",
    F.when(F.col("store_and_fwd_flag") == "Y", True)
     .when(F.col("store_and_fwd_flag") == "N", False)
     .otherwise(None) # For any other value, set as null
)

# Creating the Fact table with clean IDs and metrics
fact_trips = df_cleaned.select(
    F.monotonically_increasing_id().alias("trip_id"), # Surrogate key
    F.col("VendorID").alias("vendor_id"),
    F.col("tpep_pickup_datetime").alias("pickup_time"),
    F.col("tpep_dropoff_datetime").alias("dropoff_time"),
    F.regexp_extract((F.col("tpep_dropoff_datetime") - F.col("tpep_pickup_datetime")).cast("string"), r"(\d{2}:\d{2}:\d{2})", 1).alias("total_time_travel"), # New column for total travel time in seconds
    F.col("passenger_count"),
    F.col("trip_distance"),
    F.col("RatecodeID").alias("rate_code_id"),
    F.col("store_and_fwd_flag").alias("store_fwd_bool"),
    F.col("PULocationID").alias("pickup_location_id"),
    F.col("DOLocationID").alias("dropoff_location_id"),
    F.col("payment_type"),
    F.col("fare_amount"),
    F.col("extra"),
    F.col("mta_tax"),
    F.col("tip_amount"),
    F.col("tolls_amount"),
    F.col("improvement_surcharge"),
    F.col("total_amount"),
    F.col("congestion_surcharge"),
    F.col("airport_fee")
)

fact_trips.show(5)

+-----------+---------+-------------------+-------------------+-----------------+---------------+-------------+------------+--------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|    trip_id|vendor_id|        pickup_time|       dropoff_time|total_time_travel|passenger_count|trip_distance|rate_code_id|store_fwd_bool|pickup_location_id|dropoff_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+-----------+---------+-------------------+-------------------+-----------------+---------------+-------------+------------+--------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|51539607552|        2|2023-01-01 00:32:10|2023-01-01 00:40:36|       

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("Load Gold to DB") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.7.2") \
        .getOrCreate()

:: loading settings :: url = jar:file:/Users/leonnardo.pereira/Desktop/personal/NYC%20Taxi%20Mini%20Pipeline/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/leonnardo.pereira/.ivy2.5.2/cache
The jars for the packages stored in: /Users/leonnardo.pereira/.ivy2.5.2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-aae75f6d-fd20-4719-a87a-7b6af156736f;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.2 in central
	found org.checkerframework#checker-qual;3.42.0 in central
:: resolution report :: resolve 51ms :: artifacts dl 2ms
	:: modules in use:
	org.checkerframework#checker-qual;3.42.0 from central in [default]
	org.postgresql#postgresql;42.7.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       

In [2]:
db_url = f"jdbc:postgresql://localhost:5432/nyc_taxi_warehouse"

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", db_url) \
    .option("dbtable", "public.dim_location") \
    .option("user", "test") \
    .option("password", "test_pass") \
    .option("driver", "org.postgresql.Driver") \
    .load()

jdbcDF.show(50)

+-----------+-------------+--------------------+------------+
|location_id|      borough|                zone|service_zone|
+-----------+-------------+--------------------+------------+
|          1|          EWR|      Newark Airport|         EWR|
|          2|       Queens|         Jamaica Bay|   Boro Zone|
|          3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|          4|    Manhattan|       Alphabet City| Yellow Zone|
|          5|Staten Island|       Arden Heights|   Boro Zone|
|          6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|          7|       Queens|             Astoria|   Boro Zone|
|          8|       Queens|        Astoria Park|   Boro Zone|
|          9|       Queens|          Auburndale|   Boro Zone|
|         10|       Queens|        Baisley Park|   Boro Zone|
|         11|     Brooklyn|          Bath Beach|   Boro Zone|
|         12|    Manhattan|        Battery Park| Yellow Zone|
|         13|    Manhattan|   Battery Park City| Yellow Zone|
|       

In [2]:
db_url = f"jdbc:postgresql://localhost:5432/nyc_taxi_warehouse"

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", db_url) \
    .option("dbtable", "public.fact_trip") \
    .option("user", "test") \
    .option("password", "test_pass") \
    .option("driver", "org.postgresql.Driver") \
    .option("fetchsize", "10000") \
    .load()

jdbcDF.show(10)

+-----------+---------+-------------------+-------------------+-----------------+---------------+-------------+------------+--------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----+-----+
|    trip_id|vendor_id|        pickup_time|       dropoff_time|total_time_travel|passenger_count|trip_distance|rate_code_id|store_fwd_bool|pickup_location_id|dropoff_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|year|month|
+-----------+---------+-------------------+-------------------+-----------------+---------------+-------------+------------+--------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----+-----+
|17179869184|        1|2023-05-01 00: