In [81]:
from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F
from functools import reduce

spark = SparkSession.builder.master("local[1]") \
                    .appName('nyc-taxi.com') \
                    .getOrCreate()

In [82]:
df = spark.read.parquet('data/yellow_tripdata_2025-01.parquet', header=True, inferSchema=True)
sample_df = df.sample(withReplacement=False, fraction=0.01, seed=42)

print("Total original:", df.count())
print("Total amostrado:", sample_df.count())

Total original: 3475226
Total amostrado: 35082


In [115]:
sample_df.show()

+--------+--------------------+---------------------+---------------+------------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|     trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|year|month|day|hour|day_of_week|trip_duration|
+--------+--------------------+---------------------+---------------+------------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+----

In [84]:
sample_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [85]:
print(sample_df.count())
print(df.count())

35082
3475226


Limpeza, filtragem e transformação de dados.

In [86]:
from pyspark.sql.functions import count, when, col

sample_df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       0|                   0|                    0|           5483|            0|      5483|              5483|           0|    

In [87]:
sample_df = sample_df.dropDuplicates()
print('df.count após dropar duplicatas: ', sample_df.count())

df.count após dropar duplicatas:  35082


In [88]:
sample_df = sample_df.filter(col("tpep_pickup_datetime").isNotNull()) \
       .filter(col("tpep_dropoff_datetime").isNotNull()) \
       .filter(col("trip_distance") > 0) \
       .filter(col("fare_amount") >= 0) \
       .filter(col("total_amount") > 0) \
       .filter(col("passenger_count") > 0) \
       .filter(col("RatecodeID").isNotNull()) \
       .filter(col("passenger_count").isNotNull()) \
       .filter(col("store_and_fwd_flag").isNotNull()) \
       .filter(col("congestion_surcharge").isNotNull()) \
       .filter(col("Airport_fee").isNotNull())

print(sample_df.count())

28440


In [89]:
sample_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-01-01 10:48:53|  2025-01-01 10:52:59|              1|         2.61|         1|                 N|         132|    

In [90]:
from pyspark.sql.functions import count, when, col

sample_df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       0|                   0|                    0|              0|            0|         0|                 0|           0|    

In [91]:
sample_df.describe().show()

+-------+-------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+---------------------+------------------+--------------------+-------------------+-------------------+
|summary|           VendorID|   passenger_count|    trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|        tip_amount|      tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee| cbd_congestion_fee|
+-------+-------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+---------------------+-

In [92]:
df = df.filter(col("RatecodeID").isin([1, 2, 3, 4, 5, 6])) \
       .filter(col("payment_type").isin([0, 1, 2, 3, 4, 5, 6])) \
       .filter(col("store_and_fwd_flag").isin(["Y", "N"]))

print(sample_df.count())

28440


In [93]:
from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek

sample_df = sample_df.withColumn("year", year("tpep_pickup_datetime")) \
       .withColumn("month", month("tpep_pickup_datetime")) \
       .withColumn("day", dayofmonth("tpep_pickup_datetime")) \
       .withColumn("hour", hour("tpep_pickup_datetime")) \
       .withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))  # 1=domingo, 7=sábado

sample_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|year|month|day|hour|day_of_week|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-----------+
|       2| 2025-01-01 10:48:53|  2

In [95]:
from pyspark.sql.functions import unix_timestamp, round

sample_df = sample_df.withColumn("trip_duration", round((unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60, 2))

sample_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|year|month|day|hour|day_of_week|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-----------+-------

In [97]:
sample_df = sample_df.withColumn("trip_distance", col("trip_distance") * 1.60934)

sample_df.show(5)

+--------+--------------------+---------------------+---------------+-----------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|    trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|year|month|day|hour|day_of_week|trip_duration|
+--------+--------------------+---------------------+---------------+-----------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----+-----+---+----+-------

In [98]:
sample_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 

In [117]:
# from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek, monotonically_increasing_id
# 
# dim_tempo = sample_df.select("tpep_pickup_datetime") \
#     .withColumn("year", year("tpep_pickup_datetime")) \
#     .withColumn("month", month("tpep_pickup_datetime")) \
#     .withColumn("day", dayofmonth("tpep_pickup_datetime")) \
#     .withColumn("hour", hour("tpep_pickup_datetime")) \
#     .withColumn("day_of_week", dayofweek("tpep_pickup_datetime")) \
#     .dropDuplicates()
# 
# dim_tempo = dim_tempo.withColumn("dim_tempo_id", monotonically_increasing_id())
# 
# print("Tabela dim_tempo:")
# dim_tempo.show(5, truncate=False)

from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek, monotonically_increasing_id

dim_tempo = sample_df.select(
    year("tpep_pickup_datetime").alias("year"),
    month("tpep_pickup_datetime").alias("month"),
    dayofmonth("tpep_pickup_datetime").alias("day"),
    hour("tpep_pickup_datetime").alias("hour"),
    dayofweek("tpep_pickup_datetime").alias("day_of_week")
)
print("dim_tempo sem remover duplicata: ", dim_tempo.count())
dim_tempo = dim_tempo.dropDuplicates(["year", "month", "day", "hour", "day_of_week"])
print("dim_tempo removendo duplicata: ", dim_tempo.count())


dim_tempo = dim_tempo.withColumn("dim_tempo_id", monotonically_increasing_id())

# print("Tabela dim_tempo (corrigida):")
# dim_tempo.show(5)

dim_tempo sem remover duplicata:  28440
dim_tempo removendo duplicata:  737


In [118]:
from pyspark.sql.functions import col, when

dim_pagamento = sample_df.select("payment_type").dropDuplicates()

dim_pagamento = dim_pagamento.withColumn("descricao",
    when(col("payment_type") == 1, "Cartão")
    .when(col("payment_type") == 2, "Dinheiro")
    .when(col("payment_type") == 3, "Sem cobrança")
    .when(col("payment_type") == 4, "Contestação")
    .when(col("payment_type") == 5, "Desconhecido")
    .when(col("payment_type") == 6, "Corrida anulada")
    .otherwise("Outro")
)

dim_pagamento = dim_pagamento.withColumn("dim_pagamento_id", monotonically_increasing_id())

print("Tabela dim_pagamento:")
dim_pagamento.show()

Tabela dim_pagamento:
+------------+------------+----------------+
|payment_type|   descricao|dim_pagamento_id|
+------------+------------+----------------+
|           1|      Cartão|               0|
|           3|Sem cobrança|               1|
|           2|    Dinheiro|               2|
|           4| Contestação|               3|
+------------+------------+----------------+



In [119]:
dim_tarifa = sample_df.select("RatecodeID").dropDuplicates()

dim_tarifa = dim_tarifa.withColumn("descricao",
    when(col("RatecodeID") == 1, "Padrão")
    .when(col("RatecodeID") == 2, "JFK")
    .when(col("RatecodeID") == 3, "Newark")
    .when(col("RatecodeID") == 4, "Nassau/Westchester")
    .when(col("RatecodeID") == 5, "Tarifa negociada")
    .when(col("RatecodeID") == 6, "Corrida em grupo")
    .otherwise("Desconhecido")
)

dim_tarifa = dim_tarifa.withColumn("dim_tarifa_id", monotonically_increasing_id())

print("Tabela dim_tarifa:")
dim_tarifa.show()

Tabela dim_tarifa:
+----------+------------------+-------------+
|RatecodeID|         descricao|dim_tarifa_id|
+----------+------------------+-------------+
|         6|  Corrida em grupo|            0|
|         5|  Tarifa negociada|            1|
|         1|            Padrão|            2|
|         3|            Newark|            3|
|         2|               JFK|            4|
|         4|Nassau/Westchester|            5|
|        99|      Desconhecido|            6|
+----------+------------------+-------------+



In [120]:
df_com_tempo = sample_df.withColumn("year", year("tpep_pickup_datetime")) \
       .withColumn("month", month("tpep_pickup_datetime")) \
       .withColumn("day", dayofmonth("tpep_pickup_datetime")) \
       .withColumn("hour", hour("tpep_pickup_datetime")) \
       .withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))


df_joined = df_com_tempo.join(dim_tempo, on=["year", "month", "day", "hour", "day_of_week"], how="left") \
                        .join(dim_pagamento, on="payment_type", how="left") \
                        .join(dim_tarifa, on="RatecodeID", how="left")

fato_corrida = df_joined.select(
    "passenger_count",
    "trip_distance",
    "trip_duration",
    "fare_amount",
    "tip_amount",
    "tolls_amount",
    "total_amount",
    "dim_tempo_id",
    "dim_pagamento_id",
    "dim_tarifa_id"
)

fato_corrida = fato_corrida.withColumn("corrida_id", monotonically_increasing_id())

# print("Tabela fato_corrida:")
# fato_corrida.printSchema()
fato_corrida.show()

+---------------+------------------+-------------+-----------+----------+------------+------------+------------+----------------+-------------+----------+
|passenger_count|     trip_distance|trip_duration|fare_amount|tip_amount|tolls_amount|total_amount|dim_tempo_id|dim_pagamento_id|dim_tarifa_id|corrida_id|
+---------------+------------------+-------------+-----------+----------+------------+------------+------------+----------------+-------------+----------+
|              1|         4.2003774|          4.1|       12.1|       0.0|         0.0|       15.35|         669|               2|            2|         0|
|              1|         1.3357522|          4.8|        7.2|       1.0|         0.0|         9.7|         277|               0|            2|         1|
|              4|         3.0094658|        10.03|       12.1|       0.0|         0.0|        16.1|         160|               2|            2|         2|
|              2|         7.3707772|        19.58|       23.3|      5.