# 01 - Análise exploratória básica

## Importações

In [4]:
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

## Constantes

In [3]:
# TODO

## Scripts

Como primeiro passo, vamos instanciar o objeto SparkSession. Para este caso, vamos fazer um teste local, então vamos passar `local` ao método master.

In [5]:
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.ui.port", "8080") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # Para evitar warnings desnecessários
spark

your 131072x1 screen size is bogus. expect trouble
25/12/10 17:30:05 WARN Utils: Your hostname, NB74484S resolves to a loopback address: 127.0.1.1; using 172.21.137.226 instead (on interface eth0)
25/12/10 17:30:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/10 17:30:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Download dos dados

Feito isso, vamos baixar os dados para este projeto. Por fins de praticidade, vamos usar o `curl`.

In [6]:
! curl -L -o /tmp/nyc-yellow-taxi-trip-data.zip https://www.kaggle.com/api/v1/datasets/download/elemento/nyc-yellow-taxi-trip-data
! unzip -o /tmp/nyc-yellow-taxi-trip-data.zip -d ../data/raw

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
 64 1826M   64 1178M    0     0  27.1M      0  0:01:07  0:00:43  0:00:24 29.0M^C
Archive:  /tmp/nyc-yellow-taxi-trip-data.zip
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a multi-part archive.  In the
  latter case the central directory and zipfile comment will be found on
  the last disk(s) of this archive.
unzip:  cannot find zipfile directory in one of /tmp/nyc-yellow-taxi-trip-data.zip or
        /tmp/nyc-yellow-taxi-trip-data.zip.zip, and cannot find /tmp/nyc-yellow-taxi-trip-data.zip.ZIP, period.


### Leitura dos dados

Com os dados em mãos, vamos listar os arquivos que temos disponível para esta análise.

In [7]:
! tree ../data

/bin/bash: line 1: tree: command not found


In [8]:
! head ../data/raw/yellow_tripdata_2015-01.csv

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896484375,40.750110626220703,1,N,-73.974784851074219,40.750617980957031,1,12,1,0.5,3.25,0,0.3,17.05
1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.30,-74.00164794921875,40.7242431640625,1,N,-73.994415283203125,40.759109497070313,1,14.5,0.5,0.5,2,0,0.3,17.8
1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.80,-73.963340759277344,40.802787780761719,1,N,-73.951820373535156,40.824413299560547,2,9.5,0.5,0.5,0,0,0.3,10.8
1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,.50,-74.009086608886719,40.713817596435547,1,N,-74.004325866699219,40.719985961914063,2,3.5,0.5,0.5,0,0,0.3,4.8
1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.00,-73.971176147460938,40.762428283691406,1,N,-74.

Desta forma, podemos verificar que os arquivos usam como separador o caracter "," e possuem header. Poderíamos passar o schema, mas tentar usar a opção `inferSchema` e, se necessário, atualizamos os tipos dos dados.

In [16]:
df_raw = spark \
    .read \
    .format("csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/raw/yellow_tripdata_2015-01.csv")

df_raw.show(5)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|  -73.993896484375|  40.7501106262207|         1|    

Com base na primeira visualização, vismos que o resultado ficou poluído. Podemos alterar isso para filtrar por um conjunto específico de dados, ou até mesmo imprimir os resultados linha a linha, conforme representado abaixo.

In [17]:
df_raw_cols = df_raw.columns
total_cols_to_display = 7

for i in range(0, len(df_raw_cols), total_cols_to_display):
    df_raw.select(df_raw_cols[i:i+total_cols_to_display]).show(5)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|  -73.993896484375|  40.7501106262207|
|       1| 2015-01-10 20:33:38|  2015-01-10 20:53:28|              1|          3.3|-74.00164794921875|  40.7242431640625|
|       1| 2015-01-10 20:33:38|  2015-01-10 20:43:41|              1|          1.8|-73.96334075927734| 40.80278778076172|
|       1| 2015-01-10 20:33:39|  2015-01-10 20:35:31|              1|          0.5|-74.00908660888672| 40.71381759643555|
|       1| 2015-01-10 20:33:39|  2015-01-10 20:52:58|              1|          3.0|-73.97117614746094|40.762428283691406|
+--------+--------------

Para validar que os dados foram carregados com o tipo correto, podemos usar o método `printSchema()`. 

In [18]:
df_raw.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



### Correção dos tipos de dados

Com isso, verificamos que todas as colunas foram tratadas como string, o que não é o caso. Com isso, vamos atualizar os tipos dos dados para o tipo correto (cast):

In [19]:
df_raw = df_raw.withColumns({
    "VendorID": F.col("VendorID").cast(T.IntegerType()),
    "tpep_pickup_datetime": F.to_timestamp(F.col("tpep_pickup_datetime"), format="yyyy-MM-dd H:m:s"),
    "tpep_pickup_datetime": F.to_timestamp(F.col("tpep_pickup_datetime"), format="yyyy-MM-dd H:m:s"),
    "tpep_dropoff_datetime": F.to_timestamp(F.col("tpep_dropoff_datetime"), format="yyyy-MM-dd H:m:s"),
    "passenger_count": F.col("passenger_count").cast(T.IntegerType()),
    "trip_distance": F.col("trip_distance").cast(T.FloatType()),
    "pickup_longitude": F.col("pickup_longitude").cast(T.FloatType()),
    "pickup_latitude": F.col("pickup_latitude").cast(T.FloatType()),
    "RateCodeID": F.col("RateCodeID").cast(T.IntegerType()),
    "store_and_fwd_flag": F.col("store_and_fwd_flag").cast(T.StringType()),  # Nesse caso é desnecessário, visto que o tipo já está correto 
    "dropoff_longitude": F.col("dropoff_longitude").cast(T.FloatType()),
    "dropoff_latitude": F.col("dropoff_latitude").cast(T.FloatType()),
    "payment_type": F.col("payment_type").cast(T.IntegerType()),
    "fare_amount": F.col("fare_amount").cast(T.DecimalType(scale=2)),
    "extra": F.col("extra").cast(T.DecimalType(scale=2)),
    "mta_tax": F.col("mta_tax").cast(T.DecimalType(scale=2)),
    "tip_amount": F.col("tip_amount").cast(T.DecimalType(scale=2)),
    "tolls_amount": F.col("tolls_amount").cast(T.DecimalType(scale=2)),
    "improvement_surcharge": F.col("improvement_surcharge").cast(T.DecimalType(scale=2)),
    "total_amount": F.col("total_amount").cast(T.DecimalType(scale=2))
})

df_raw.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: decimal(10,2) (nullable = true)
 |-- extra: decimal(10,2) (nullable = true)
 |-- mta_tax: decimal(10,2) (nullable = true)
 |-- tip_amount: decimal(10,2) (nullable = true)
 |-- tolls_amount: decimal(10,2) (nullable = true)
 |-- improvement_surcharge: decimal(10,2) (nullable = true)
 |-- total_amount: decimal(10,2) (nullable = true)



### Análise de quebra de restrições de regras de negócio

Feito isso, podemos aplicar algumas filtragens para garantir que as regras de negócio foram respeitadas (i.e., não há valores indesejados/errados). Para tal, vamos adotar as seguintes restrições:
- `tpep_pickup_datetime`: Entre 2015 e 2016.
- `passenger_count`: Maior que zero.
- `trip_distance`: Maior que zero.
- `store_and_fwd_flag`: Igual a Y ou N.
- `payment_type`: Entre 1 e 6.
- `fare_amount`: Maior que zero.
- `mta_tax`: Maior que zero.
- `tip_amount`: Maior que zero.
- `tolls_amount`: Maior que zero.
- `improvement_surcharge`: Maior que zero.
- `total_amount`: Maior que zero.

Vamos verificar se há alguma situação que isto é descumprido.

In [14]:
df_raw.where(
    (F.year(F.col("tpep_pickup_datetime")) < 2015) | (F.year(F.col("tpep_pickup_datetime")) > 2016)
).select("tpep_pickup_datetime").show()



+--------------------+
|tpep_pickup_datetime|
+--------------------+
+--------------------+



                                                                                

In [15]:
df_raw.where(
    F.col("passenger_count") < 0
).select("passenger_count").show()



+---------------+
|passenger_count|
+---------------+
+---------------+



                                                                                

In [16]:
df_raw.where(
    F.col("trip_distance") < 0
).select("trip_distance").show()



+-------------+
|trip_distance|
+-------------+
|   -3390583.8|
+-------------+



                                                                                

In [17]:
df_raw.where(
    ~ F.col("store_and_fwd_flag").isin(["Y", "N"])
).select("store_and_fwd_flag").show()



+------------------+
|store_and_fwd_flag|
+------------------+
+------------------+



                                                                                

In [18]:
df_raw.where(
    ~ F.col("payment_type").isin([1, 2, 3, 4, 5, 6])
).select("payment_type").show()



+------------+
|payment_type|
+------------+
+------------+



                                                                                

In [19]:
df_raw.where(
    F.col("fare_amount") < 0
).select("fare_amount").show()

+-----------+
|fare_amount|
+-----------+
|      -5.50|
|      -2.50|
|      -7.00|
|      -2.50|
|      -2.50|
|      -2.50|
|      -2.50|
|      -4.00|
|      -7.50|
|      -3.00|
|      -3.50|
|      -2.50|
|      -3.00|
|      -3.50|
|      -4.00|
|      -6.00|
|      -2.50|
|      -2.50|
|      -5.50|
|      -4.00|
+-----------+
only showing top 20 rows



In [20]:
df_raw.where(
    F.col("mta_tax") < 0
).select("mta_tax").show()

+-------+
|mta_tax|
+-------+
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
|  -0.50|
+-------+
only showing top 20 rows



In [21]:
df_raw.where(
    F.col("tip_amount") < 0
).select("tip_amount").show()

[Stage 30:>                                                         (0 + 3) / 3]

+----------+
|tip_amount|
+----------+
|     -2.34|
|     -0.66|
|     -0.66|
|     -2.70|
|     -1.08|
|     -0.99|
|     -2.04|
|     -1.16|
|     -0.99|
|     -0.82|
|     -4.70|
|     -0.86|
|     -8.50|
|     -7.43|
|     -0.66|
|     -0.66|
|     -0.66|
|     -0.99|
|     -1.36|
|     -1.36|
+----------+
only showing top 20 rows



                                                                                

In [22]:
df_raw.where(
    F.col("improvement_surcharge") < 0
).select("improvement_surcharge").show()

+---------------------+
|improvement_surcharge|
+---------------------+
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
|                -0.30|
+---------------------+
only showing top 20 rows



In [23]:
df_raw.where(
    F.col("total_amount") < 0
).select("total_amount").show()

+------------+
|total_amount|
+------------+
|       -6.30|
|       -3.30|
|      -10.14|
|       -3.30|
|       -3.30|
|       -3.96|
|       -3.96|
|       -4.80|
|       -8.30|
|       -3.80|
|       -4.30|
|       -3.30|
|       -3.80|
|       -4.30|
|       -4.80|
|       -6.80|
|       -3.30|
|       -3.30|
|       -6.30|
|       -4.80|
+------------+
only showing top 20 rows



Além destes casos, vamos verificar se há valores de longitude e latitude que destoam muito.

In [24]:
df_raw.select(
    F.min("pickup_longitude").alias("min_pickup_longitude"),
    F.median("pickup_longitude").alias("median_pickup_longitude"),
    F.avg("pickup_longitude").alias("avg_pickup_longitude"),
    F.max("pickup_longitude").alias("max_pickup_longitude"),
    
    F.min("pickup_latitude").alias("min_pickup_latitude"),
    F.median("pickup_latitude").alias("median_pickup_latitude"),
    F.avg("pickup_latitude").alias("avg_pickup_latitude"),
    F.max("pickup_latitude").alias("max_pickup_latitude")
).show(vertical=True)

[Stage 35:>                                                         (0 + 1) / 1]

-RECORD 0-------------------------------------
 min_pickup_longitude    | -161.69867         
 median_pickup_longitude | -73.98155212402344 
 avg_pickup_longitude    | -72.7645182845853  
 max_pickup_longitude    | 94.64387           
 min_pickup_latitude     | -77.03949          
 median_pickup_latitude  | 40.75334930419922  
 avg_pickup_latitude     | 40.084704988038204 
 max_pickup_latitude     | 404.7              



                                                                                

In [25]:
df_raw.approxQuantile("pickup_longitude", [0.125, 0.25, 0.5, 0.75, 0.875], 0.05)

                                                                                

[-74.00151062011719,
 -73.99201965332031,
 -73.98197174072266,
 -73.96885681152344,
 -73.9552230834961]

In [26]:
df_raw.approxQuantile("pickup_latitude", [0.125, 0.25, 0.5, 0.75, 0.875], 0.05)

                                                                                

[40.71966552734375,
 40.73529815673828,
 40.75261688232422,
 40.765907287597656,
 40.775489807128906]

In [27]:
df_raw.approxQuantile("pickup_latitude", [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], 0.05)

                                                                                

[40.7192268371582,
 40.72956848144531,
 40.73906707763672,
 40.746517181396484,
 40.75261688232422,
 40.758270263671875,
 40.76383972167969,
 40.77016067504883,
 40.7799072265625]

In [28]:
### CONTINUAR DAQUI!!

Podemos concatenar os resultados e ver os registros que rompem pelo menos uma dessas restrições por:

In [29]:
df_raw.where(
    (F.year(F.col("tpep_pickup_datetime")) < 2015) | (F.year(F.col("tpep_pickup_datetime")) > 2016) |
    (F.col("passenger_count") < 0) |
    (F.col("trip_distance") < 0) |
    ~ (F.col("store_and_fwd_flag").isin(["Y", "N"])) |
    ~ (F.col("payment_type").isin([1, 2, 3, 4, 5, 6])) |
    (F.col("fare_amount") < 0) |
    (F.col("mta_tax") < 0) |
    (F.col("tip_amount") < 0) |
    (F.col("improvement_surcharge") < 0) |
    (F.col("total_amount") < 0)
).show(n=1, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 2                   
 tpep_pickup_datetime  | 2016-03-10 07:08:46 
 tpep_dropoff_datetime | 2016-03-10 07:14:05 
 passenger_count       | 1                   
 trip_distance         | 0.69                
 pickup_longitude      | -73.95706           
 pickup_latitude       | 40.80213            
 RateCodeID            | 1                   
 store_and_fwd_flag    | N                   
 dropoff_longitude     | -73.94838           
 dropoff_latitude      | 40.80315            
 payment_type          | 3                   
 fare_amount           | -5.50               
 extra                 | 0.00                
 mta_tax               | -0.50               
 tip_amount            | 0.00                
 tolls_amount          | 0.00                
 improvement_surcharge | -0.30               
 total_amount          | -6.30               
only showing top 1 row



In [30]:
all_data_count = df_raw.count()

nc_data_count = df_raw.where(
    (F.year(F.col("tpep_pickup_datetime")) < 2015) | (F.year(F.col("tpep_pickup_datetime")) > 2016) |
    (F.col("passenger_count") < 0) |
    (F.col("trip_distance") < 0) |
    ~ (F.col("store_and_fwd_flag").isin(["Y", "N"])) |
    ~ (F.col("payment_type").isin([1, 2, 3, 4, 5, 6])) |
    (F.col("fare_amount") < 0) |
    (F.col("mta_tax") < 0) |
    (F.col("tip_amount") < 0) |
    (F.col("improvement_surcharge") < 0) |
    (F.col("total_amount") < 0)
).count()

print(f'Total de registros que rompem as restrições: {nc_data_count} ({(nc_data_count/all_data_count * 100):.2f}% do total)')



Total de registros que rompem as restrições: 17215 (0.04% do total)


                                                                                

_Obs: Vale destacar que isto também poderia ser feito por SQL puro por meio de_:

In [31]:
spark.sql("""
    WITH nc_data_count AS (
        SELECT
            COUNT(*) AS nc_count
        FROM
            {df_raw}
        WHERE
            YEAR(tpep_pickup_datetime) NOT BETWEEN 2015 AND 2016
            OR passenger_count < 0
            OR trip_distance < 0
            OR store_and_fwd_flag NOT IN ("Y", "N")
            OR payment_type NOT IN (1, 2, 3, 4, 5, 6)
            OR fare_amount < 0
            OR mta_tax < 0
            OR tip_amount < 0
            OR improvement_surcharge < 0
            OR total_amount < 0
    ),

    data_count AS (
        SELECT
            COUNT(*) as total_count
        FROM
            {df_raw}
    )

    SELECT
        nc_count,
        total_count,
        ROUND(nc_count/total_count * 100, 2) AS percentage 
    FROM
        nc_data_count
    CROSS JOIN
        data_count
""", df_raw=df_raw).show()



+--------+-----------+----------+
|nc_count|total_count|percentage|
+--------+-----------+----------+
|   17215|   47248845|      0.04|
+--------+-----------+----------+



                                                                                

Ademais, podemos verificar dados nulos também:

In [32]:
df_raw.select(
    [F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(f"nulls_{c}") for c in df_raw.columns]
).show(vertical=True)



-RECORD 0--------------------------
 nulls_VendorID              | 0   
 nulls_tpep_pickup_datetime  | 0   
 nulls_tpep_dropoff_datetime | 0   
 nulls_passenger_count       | 0   
 nulls_trip_distance         | 0   
 nulls_pickup_longitude      | 0   
 nulls_pickup_latitude       | 0   
 nulls_RateCodeID            | 0   
 nulls_store_and_fwd_flag    | 0   
 nulls_dropoff_longitude     | 0   
 nulls_dropoff_latitude      | 0   
 nulls_payment_type          | 0   
 nulls_fare_amount           | 0   
 nulls_extra                 | 0   
 nulls_mta_tax               | 0   
 nulls_tip_amount            | 0   
 nulls_tolls_amount          | 0   
 nulls_improvement_surcharge | 3   
 nulls_total_amount          | 0   



                                                                                

Feito isso, vamos verificar os casos em que o campo é vazio.

In [33]:
df_raw.where(
    F.col("improvement_surcharge").isNull() 
).show(vertical=True)



-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2015-01-02 08:46:23 
 tpep_dropoff_datetime | 2015-01-02 09:09:26 
 passenger_count       | 1                   
 trip_distance         | 3.6                 
 pickup_longitude      | 0.0                 
 pickup_latitude       | 0.0                 
 RateCodeID            | 1                   
 store_and_fwd_flag    | N                   
 dropoff_longitude     | 0.0                 
 dropoff_latitude      | 0.0                 
 payment_type          | 1                   
 fare_amount           | 17.00               
 extra                 | 0.00                
 mta_tax               | 0.50                
 tip_amount            | 3.50                
 tolls_amount          | 0.00                
 improvement_surcharge | NULL                
 total_amount          | 21.00               
-RECORD 1------------------------------------
 VendorID              | 1        

                                                                                

In [34]:
# Forma com eval
# df_raw.where(
#     eval(" | ".join([f"F.col('{c}').isNull()" for c in df_raw.columns]))
# ).show(vertical=True)

# Forma com reduce
df_raw.where(
    reduce(lambda x, y: x | y, [F.col(c).isNull() for c in df_raw.columns])
).show(vertical=True)



-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2015-01-02 08:46:23 
 tpep_dropoff_datetime | 2015-01-02 09:09:26 
 passenger_count       | 1                   
 trip_distance         | 3.6                 
 pickup_longitude      | 0.0                 
 pickup_latitude       | 0.0                 
 RateCodeID            | 1                   
 store_and_fwd_flag    | N                   
 dropoff_longitude     | 0.0                 
 dropoff_latitude      | 0.0                 
 payment_type          | 1                   
 fare_amount           | 17.00               
 extra                 | 0.00                
 mta_tax               | 0.50                
 tip_amount            | 3.50                
 tolls_amount          | 0.00                
 improvement_surcharge | NULL                
 total_amount          | 21.00               
-RECORD 1------------------------------------
 VendorID              | 1        

                                                                                

### Tratamento de inconsistências

Finalmente, agora podemos definir as estratégias para tratar estas inconsistências. Definiremos o seguinte:
- `tpep_pickup_datetime`:
    - Valor fora do intervalo 2015 e 2016 -> Remoção
    - Valor nulo -> Remoção
- `passenger_count`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor igual a zero -> Remoção
    - Valor nulo -> Remoção
- `trip_distance`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor igual a zero -> Remoção
    - Valor nulo -> Remoção
- `store_and_fwd_flag`: Igual a Y ou N.
    - Valor diferente de Y ou N -> Remoção
    - Valor nulo -> Remoção
- `payment_type`:
    - Valor fora do intervalo de 1 a 6 -> Remoção
    - Valor nulo -> Remoção
- `fare_amount`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor nulo -> Remoção
- `mta_tax`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor nulo -> Remoção
- `tip_amount`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor nulo -> Substituição por 0
- `tolls_amount`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor nulo -> Substituição por 0
- `improvement_surcharge`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor nulo -> Substituição por 0
- `total_amount`:
    - Valor menor que zero -> Troca (multiplicação por -1)
    - Valor nulo -> Remoção

Ademais, valores em string serão padronizados para garantir consistência.

Antes de mais nada, vamos criar um novo DataFrame a partir do antigo.

In [35]:
df_stg = df_raw.select("*")  # Faz uma deep copy
df_stg is df_raw

False

Feito isso, agora podemos começar o tratamento.

#### Tratamento dos valores nulos e/ou remoção

In [36]:
cols_to_remove_nulls = [
    "tpep_pickup_datetime",
    "passenger_count",
    "trip_distance",
    "store_and_fwd_flag",
    "payment_type",
    "fare_amount",
    "mta_tax",
    "total_amount"
]

df_stg = df_stg.where(
    ~ reduce(lambda x, y: x | y, [F.col(c).isNull() for c in cols_to_remove_nulls])
)
df_stg.count()

                                                                                

47248845

In [37]:
cols_to_replace_0 = [
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge"
]

df_stg = df_stg.fillna(0, subset=cols_to_replace_0)
df_stg.count()

                                                                                

47248845

#### Correção de valores inconsistentes

In [38]:
cols_to_revert = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount"
]

df_stg = df_stg.withColumns(
    {c: F.abs(F.col(c)) for c in cols_to_revert}
)
df_stg.count()

                                                                                

47248845

In [39]:
df_stg = df_stg.withColumn("store_and_fwd_flag", F.trim(F.col("store_and_fwd_flag")))
df_stg.count()

                                                                                

47248845

#### Remoção dos valores fora dos valores estabelecidos

In [40]:
df_stg = df_stg.where(
    (F.year(F.col("tpep_pickup_datetime")) >= 2015) & (F.year(F.col("tpep_pickup_datetime")) <= 2016) &
    (F.col("passenger_count") > 0) &
    (F.col("trip_distance") > 0) &
    (F.col("store_and_fwd_flag").isin(["Y", "N"])) &
    (F.col("payment_type").isin([1, 2, 3, 4, 5, 6])) &
    (F.col("fare_amount") >= 0) &
    (F.col("mta_tax") >= 0) &
    (F.col("tip_amount") >= 0) &
    (F.col("improvement_surcharge") >= 0) &
    (F.col("total_amount") >= 0)
)
df_stg.count()

                                                                                

46959802

E vamos remover duplicatas caso elas existam.

In [8]:
df_stg = df_stg.dropDuplicates()
df_stg.count()

NameError: name 'df_stg' is not defined

Feito isso, podemos exportar os nossos dados para o que chamaremos de staging. Para que seja possível particionar os nossos dados, vamos criar uma coluna de ano e uma de mês do início da corrida.

In [None]:
df_stg = df_stg.withColumns({
    "pickup_year": F.year(F.col("tpep_pickup_datetime")),
    "pickup_month": F.month(F.col("tpep_pickup_datetime"))
})
df_stg.show(1, vertical=True)

[Stage 82:>                                                         (0 + 1) / 1]

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2015-01-10 20:33:39 
 tpep_dropoff_datetime | 2015-01-10 20:42:20 
 passenger_count       | 3                   
 trip_distance         | 0.8                 
 pickup_longitude      | -74.00266           
 pickup_latitude       | 40.734142           
 RateCodeID            | 1                   
 store_and_fwd_flag    | N                   
 dropoff_longitude     | -73.99501           
 dropoff_latitude      | 40.726326           
 payment_type          | 1                   
 fare_amount           | 7.00                
 extra                 | 0.50                
 mta_tax               | 0.50                
 tip_amount            | 1.66                
 tolls_amount          | 0.00                
 improvement_surcharge | 0.30                
 total_amount          | 9.96                
 pickup_year           | 2015                
 pickup_month          | 1        

                                                                                

In [None]:
df_stg.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .partitionBy("pickup_year", "pickup_month") \
    .save("../data/staging/yellow_tripdata")

                                                                                

Podemos validar se tudo deu certo por:

In [None]:
! tree ../data/staging 

[01;34m../data/staging[0m
└── [01;34myellow_tripdata[0m
    ├── [00m_SUCCESS[0m
    ├── [01;34mpickup_year=2015[0m
    │   └── [01;34mpickup_month=1[0m
    │       ├── [00mpart-00000-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00001-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00002-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00003-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00004-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00005-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00006-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00007-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00008-3652ae2f-7b9e-4387-b651-562550fbf9fb.c000.gz.parquet[0m
    │       ├── [00mpart-00009-3652a

### Enriquecimento dos dados

Por fim, iremos fazer algumas transformações e enriquecimento nos dados para facilitar na análise posterior. Para este caso, o que iremos fazer é alterar a coluna `trip_distance` para que fique em kilometros para, então, criar as colunas:

- `trip_duration_sec`: Duração da corrida (subtração entre `tpep_dropoff_datetime` e `tpep_pickup_datetime`).
- `trip_avg_speed`: Velocidade média da corrida (razão entre `trip_distance` e `trip_duration`).
- `pickup_longitude_bin`, `pickup_latitude_bin`, `dropoff_longitude_bin` e `dropoff_latitude_bin`: Faixas geográficas criadas a partir das coordenadas de embarque e desembarque, obtidas por discretização das variáveis de latitude e longitude. Essas faixas permitem agrupar regiões próximas e analisar padrões espaciais das corridas.
- `tip_rate`: Taxa de gorjeta em relação ao valor da corrida, mostrando quanto o passageiro deu de gorjeta proporcionalmente ao valor da corrida (razão entre `tip_amount` e `fare_amount`).

Antes de mais nada, vamos criar o dataframe curated.

In [None]:
df_cur = df_stg.select("*")
df_cur is df_stg

False

Feito isso, podemos seguir. Vamos começar pela alteração da coluna `trip_distance` para que fique em kms.

In [None]:
df_cur = df_cur.withColumn("trip_distance", F.col("trip_distance") * 1.609344)  # 1 milha ~ 1.609344 km
df_cur.show(1, vertical=True)

[Stage 88:>                                                         (0 + 1) / 1]

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2015-01-10 20:33:39 
 tpep_dropoff_datetime | 2015-01-10 20:42:20 
 passenger_count       | 3                   
 trip_distance         | 1.2874752191848755  
 pickup_longitude      | -74.00266           
 pickup_latitude       | 40.734142           
 RateCodeID            | 1                   
 store_and_fwd_flag    | N                   
 dropoff_longitude     | -73.99501           
 dropoff_latitude      | 40.726326           
 payment_type          | 1                   
 fare_amount           | 7.00                
 extra                 | 0.50                
 mta_tax               | 0.50                
 tip_amount            | 1.66                
 tolls_amount          | 0.00                
 improvement_surcharge | 0.30                
 total_amount          | 9.96                
 pickup_year           | 2015                
 pickup_month          | 1        

                                                                                

Em seguida, vamos criar as colunas `trip_duration`, `tip_avg_speed` e `trip_rate`. Vamos deixar as colunas de bins por se tratar de um caso um pouco mais complexo.

In [None]:
df_cur = df_cur.withColumns({
    "trip_duration_sec": F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime"),  # Já retorna em segundos
    "tip_rate": F.col("tip_amount")/F.col("fare_amount")
})
df_cur = df_cur.withColumn("trip_avg_speed", F.col("trip_distance")/(F.col("trip_duration_sec")/60/60))  # Para ficar em km/h
df_cur.show(1, vertical=True)



-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2015-01-10 20:33:39 
 tpep_dropoff_datetime | 2015-01-10 20:42:20 
 passenger_count       | 3                   
 trip_distance         | 1.2874752191848755  
 pickup_longitude      | -74.00266           
 pickup_latitude       | 40.734142           
 RateCodeID            | 1                   
 store_and_fwd_flag    | N                   
 dropoff_longitude     | -73.99501           
 dropoff_latitude      | 40.726326           
 payment_type          | 1                   
 fare_amount           | 7.00                
 extra                 | 0.50                
 mta_tax               | 0.50                
 tip_amount            | 1.66                
 tolls_amount          | 0.00                
 improvement_surcharge | 0.30                
 total_amount          | 9.96                
 pickup_year           | 2015                
 pickup_month          | 1        

                                                                                

Agora, podemos calcular os bins. Para tal, vamos utilizar o `QuantileDiscretizer` do módulo de ML.

In [None]:
columns_to_discretize = [
    "pickup_longitude", 
    "pickup_latitude", 
    "dropoff_longitude", 
    "dropoff_latitude"
]

df_cur = df_cur.drop(*[f"{c}_bin" for c in columns_to_discretize])

for c in columns_to_discretize:
    discretizer = QuantileDiscretizer(
        numBuckets=10,
        inputCol=c,
        outputCol=f"{c}_bin"
    )

    qd_model = discretizer.fit(df_cur)
    df_cur = qd_model.transform(df_cur)
    
    print(f"Intervalos para {c}: {qd_model.getSplits()}")

                                                                                

Intervalos para pickup_longitude: [-inf, -74.00230407714844, -73.99411010742188, -73.98981475830078, -73.98558807373047, -73.9815902709961, -73.9770278930664, -73.970947265625, -73.96157836914062, -73.95040130615234, inf]


                                                                                

Intervalos para pickup_latitude: [-inf, 40.718929290771484, 40.73113250732422, 40.74054718017578, 40.74776840209961, 40.753395080566406, 40.75917434692383, 40.76435089111328, 40.77155685424805, 40.78007888793945, inf]


                                                                                

Intervalos para dropoff_longitude: [-inf, -74.0025634765625, -73.9937515258789, -73.98895263671875, -73.98403930664062, -73.97968292236328, -73.97490692138672, -73.96775817871094, -73.95744323730469, -73.9458999633789, inf]




Intervalos para dropoff_latitude: [-inf, 40.71405029296875, 40.72899627685547, 40.739959716796875, 40.74785614013672, 40.75387954711914, 40.75968933105469, 40.7652473449707, 40.773799896240234, 40.7841911315918, inf]


                                                                                

In [None]:
df_cur.select(["pickup_longitude", "pickup_longitude_bin", "pickup_latitude", "pickup_latitude_bin"]).show(5)

[Stage 110:>                                                        (0 + 1) / 1]

+----------------+--------------------+---------------+-------------------+
|pickup_longitude|pickup_longitude_bin|pickup_latitude|pickup_latitude_bin|
+----------------+--------------------+---------------+-------------------+
|       -74.00266|                 0.0|      40.734142|                2.0|
|       -73.99113|                 2.0|       40.75008|                4.0|
|       -74.01024|                 0.0|      40.710724|                0.0|
|       -73.91871|                 9.0|      40.701317|                0.0|
|      -73.978745|                 5.0|      40.763767|                6.0|
+----------------+--------------------+---------------+-------------------+
only showing top 5 rows



                                                                                

In [None]:
df_cur.select(["dropoff_longitude", "dropoff_longitude_bin", "dropoff_latitude", "dropoff_latitude_bin"]).show(5)



+-----------------+---------------------+----------------+--------------------+
|dropoff_longitude|dropoff_longitude_bin|dropoff_latitude|dropoff_latitude_bin|
+-----------------+---------------------+----------------+--------------------+
|        -73.99501|                  1.0|       40.726326|                 1.0|
|        -73.98861|                  3.0|        40.73489|                 2.0|
|        -73.82712|                  9.0|       40.879898|                 9.0|
|        -73.86155|                  9.0|       40.768215|                 7.0|
|        -73.98117|                  4.0|       40.781273|                 8.0|
+-----------------+---------------------+----------------+--------------------+
only showing top 5 rows



                                                                                

Finalmente, podemos salvar os dados para realizar as análises no próximo notebook.

In [None]:
df_cur.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .partitionBy("pickup_year", "pickup_month") \
    .save("../data/curated/yellow_tripdata")

                                                                                