In [0]:
#importando bibliotecas
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import sum as spark_sum


In [0]:
# Load the raw-layer NYC taxi tables (yellow and green) from the catalog.
df_y, df_g = (
    spark.table("workspace.nyc_taxi.yellow_taxi"),
    spark.table("workspace.nyc_taxi.green_taxi"),
)

In [0]:
# Tag every row with the taxi type it came from, stored in a constant
# "taxi_color" column on each frame.
df_y = df_y.withColumn("taxi_color", functions.lit("yellow_taxi"))
df_g = df_g.withColumn("taxi_color", functions.lit("green_taxi"))


## Explorando e filtrando os dados

In [0]:
# Earliest pickup timestamp present in the yellow-taxi data.
df_y.select(functions.min(col("tpep_pickup_datetime"))).show()

+-------------------------+
|min(tpep_pickup_datetime)|
+-------------------------+
|      2001-01-01 00:06:49|
+-------------------------+



In [0]:
# Earliest pickup timestamp present in the green-taxi data.
df_g.select(functions.min(col("lpep_pickup_datetime"))).show()

+-------------------------+
|min(lpep_pickup_datetime)|
+-------------------------+
|      2008-12-31 22:41:41|
+-------------------------+



In [0]:
# Keep only trips picked up from 2023-01-01 (inclusive) up to 2023-06-01
# (exclusive), i.e. January through May 2023.
# The bounds are declared once so both frames always use the same window, and
# the literals carry no stray whitespace (the previous upper bound ended in a
# space and only worked because Spark trims strings when casting to timestamp).
pickup_window_start = "2023-01-01"
pickup_window_end = "2023-06-01"

df_y = df_y.filter(
    (col("tpep_pickup_datetime") >= pickup_window_start)
    & (col("tpep_pickup_datetime") < pickup_window_end)
)

df_g = df_g.filter(
    (col("lpep_pickup_datetime") >= pickup_window_start)
    & (col("lpep_pickup_datetime") < pickup_window_end)
)


In [0]:
# Number of green-taxi rows remaining after the pickup-date filter.
df_g.count()

339621

In [0]:
# Sanity check: earliest green-taxi pickup after the date filter.
df_g.select(functions.min(col("lpep_pickup_datetime"))).show()

+-------------------------+
|min(lpep_pickup_datetime)|
+-------------------------+
|      2023-01-01 00:01:31|
+-------------------------+



In [0]:
# Sanity check: latest green-taxi pickup after the date filter.
df_g.select(functions.max(col("lpep_pickup_datetime"))).show()

+-------------------------+
|max(lpep_pickup_datetime)|
+-------------------------+
|      2023-05-31 23:59:24|
+-------------------------+



In [0]:
# Count missing VendorID / passenger_count values in the yellow-taxi frame.
# isNull() yields a boolean; casting it to int and summing gives the null count.
df_y.select([
    spark_sum(col("VendorID").isNull().cast("int")).alias("null_vendorid"),
    spark_sum(col("passenger_count").isNull().cast("int")).alias("null_passenger_count"),
]).show()

+-------------+--------------------+
|null_vendorid|null_passenger_count|
+-------------+--------------------+
|     13119554|            13191297|
+-------------+--------------------+



In [0]:
# Count missing VendorID / passenger_count values in the green-taxi frame.
# Each (output label, source column) pair becomes one summed null indicator.
df_g.select([
    spark_sum(col(source).isNull().cast("int")).alias(label)
    for label, source in (
        ("null_vendorid", "VendorID"),
        ("null_passenger_count", "passenger_count"),
    )
]).show()

+-------------+--------------------+
|null_vendorid|null_passenger_count|
+-------------+--------------------+
|       271413|              275737|
+-------------+--------------------+



## Tratando os dados da coluna em formato json e preenchendo os valores null


In [0]:
def extract_fields_from_json(
    df,
    json_column,
    fields_and_types,  # e.g. [("VendorID", IntegerType()), ("passenger_count", IntegerType())]
    output_aliases      # e.g. ["VendorID_rescued", "passenger_count_rescued"]
):
    """Parse a JSON string column and expose selected fields as new columns.

    Args:
        df: Input DataFrame.
        json_column: Name of the column holding the JSON text.
        fields_and_types: Sequence of ``(json_field_name, spark_type)`` pairs;
            only these fields are included in the parsing schema.
        output_aliases: Output column names, matched positionally with
            ``fields_and_types`` (one alias per field).

    Returns:
        A DataFrame with one extra column per alias. A scratch column named
        ``_temp_rescued_struct`` is used during parsing and dropped before
        returning.

    Raises:
        ValueError: If ``fields_and_types`` and ``output_aliases`` differ in
            length. Previously a shorter alias list silently skipped fields
            and a longer one failed with an IndexError mid-way.
    """
    # Validate before touching Spark so a mismatch fails fast and clearly.
    if len(fields_and_types) != len(output_aliases):
        raise ValueError(
            "fields_and_types and output_aliases must have the same length "
            f"(got {len(fields_and_types)} and {len(output_aliases)})"
        )

    temp_struct = "_temp_rescued_struct"

    # Schema restricted to the requested fields; every field is nullable.
    rescued_schema = StructType([
        StructField(field_name, field_type, True)
        for field_name, field_type in fields_and_types
    ])

    # Parse the JSON text into a struct column.
    parsed = df.withColumn(temp_struct, from_json(col(json_column), rescued_schema))

    # Expose each struct field under its requested alias. getField avoids
    # building a dotted path string from the field name.
    for (field_name, _), alias in zip(fields_and_types, output_aliases):
        parsed = parsed.withColumn(alias, col(temp_struct).getField(field_name))

    # Remove the scratch struct column.
    return parsed.drop(temp_struct)

In [0]:
# (JSON field name, Spark type) pairs to pull out of the rescued-data JSON.
fields_and_types = [("VendorID", IntegerType()), ("passenger_count", IntegerType())]
# Output column names, matched positionally with fields_and_types.
output_aliases = ["VendorID_resc_data", "passenger_count_resc_data"]

# Apply the same extraction to the yellow and green frames.
dfy_out, dfg_out = (
    extract_fields_from_json(frame, "_rescued_data", fields_and_types, output_aliases)
    for frame in (df_y, df_g)
)

In [0]:
# Compare null counts of the original columns with the columns extracted
# from _rescued_data, for the yellow-taxi frame.
dfy_out.select([
    spark_sum(col("VendorID").isNull().cast("int")).alias("null_vendorid"),
    spark_sum(col("VendorID_resc_data").isNull().cast("int")).alias("null_vendor_id_resc"),
    spark_sum(col("passenger_count").isNull().cast("int")).alias("null_passenger_count"),
    spark_sum(col("passenger_count_resc_data").isNull().cast("int")).alias("null_passenger_count_resc"),
]).show()

+-------------+-------------------+--------------------+-------------------------+
|null_vendorid|null_vendor_id_resc|null_passenger_count|null_passenger_count_resc|
+-------------+-------------------+--------------------+-------------------------+
|     13119554|            3066728|            13191297|                  3423650|
+-------------+-------------------+--------------------+-------------------------+



In [0]:
# Compare null counts of the original columns with the columns extracted
# from _rescued_data, for the green-taxi frame.
dfg_out.select([
    spark_sum(col(source).isNull().cast("int")).alias(label)
    for label, source in (
        ("null_vendorid", "VendorID"),
        ("null_vendor_id_resc", "VendorID_resc_data"),
        ("null_passenger_count", "passenger_count"),
        ("null_passenger_count_resc", "passenger_count_resc_data"),
    )
]).show()

+-------------+-------------------+--------------------+-------------------------+
|null_vendorid|null_vendor_id_resc|null_passenger_count|null_passenger_count_resc|
+-------------+-------------------+--------------------+-------------------------+
|       271413|              68208|              275737|                    86780|
+-------------+-------------------+--------------------+-------------------------+



In [0]:
# Fill missing VendorID and passenger_count values with the values extracted
# from _rescued_data: coalesce keeps the original value when present and
# falls back to the *_resc_data column otherwise.
dfy_out = dfy_out.withColumn(
    "VendorID",
    functions.coalesce(functions.col("VendorID"), functions.col("VendorID_resc_data")),
)
dfy_out = dfy_out.withColumn(
    "passenger_count",
    functions.coalesce(
        functions.col("passenger_count"), functions.col("passenger_count_resc_data")
    ),
)

dfg_out = dfg_out.withColumn(
    "VendorID",
    functions.coalesce(functions.col("VendorID"), functions.col("VendorID_resc_data")),
)
dfg_out = dfg_out.withColumn(
    "passenger_count",
    functions.coalesce(
        functions.col("passenger_count"), functions.col("passenger_count_resc_data")
    ),
)

In [0]:
# I think it would be worth renaming back to the original column names here.
# NOTE(review): no `*_final` columns are created in the cells above (the coalesce
# step writes straight into VendorID / passenger_count), so the renames below
# look like leftovers from an earlier version; confirm before re-enabling.
#dfy_out = dfy_out.withColumnRenamed("VendorID_final","VendorID").withColumnRenamed("passenger_count_final","passenger_count")
#dfg_out = dfg_out.withColumnRenamed("VendorID_final","VendorID").withColumnRenamed("passenger_count_final","passenger_count")

In [0]:
# Keep only the columns requested in the case brief.
df_y_filt = dfy_out.select([
    "VendorID",
    "passenger_count",
    "total_amount",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "taxi_color",
])
df_g_filt = dfg_out.select([
    "VendorID",
    "passenger_count",
    "total_amount",
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "taxi_color",
])

In [0]:
# Remaining nulls in the green-taxi frame after the coalesce step.
df_g_filt.select([
    spark_sum(col(source).isNull().cast("int")).alias(label)
    for label, source in (
        ("null_vendorid", "VendorID"),
        ("null_passenger_count", "passenger_count"),
    )
]).show()

+-------------+--------------------+
|null_vendorid|null_passenger_count|
+-------------+--------------------+
|            0|               22896|
+-------------+--------------------+



In [0]:
# Remaining nulls in the yellow-taxi frame after the coalesce step.
df_y_filt.select([
    spark_sum(col(source).isNull().cast("int")).alias(label)
    for label, source in (
        ("null_vendorid", "VendorID"),
        ("null_passenger_count", "passenger_count"),
    )
]).show()

+-------------+--------------------+
|null_vendorid|null_passenger_count|
+-------------+--------------------+
|            0|              428665|
+-------------+--------------------+



## Enviando os dados tratados para a camada de consumo

In [0]:
# Persist the treated yellow-taxi frame as Delta, replacing any previous
# contents at the target path.
path_consumo_y = "s3://datalake-joao/consumo/yellow_taxi_curated"
df_y_filt.write.save(path_consumo_y, format="delta", mode="overwrite")


In [0]:
# Persist the treated green-taxi frame as Delta, replacing any previous
# contents at the target path.
path_consumo_g = "s3://datalake-joao/consumo/green_taxi_curated"
df_g_filt.write.save(path_consumo_g, format="delta", mode="overwrite")

In [0]:
%sql
-- Make sure the schema used by the curated tables below exists.
CREATE SCHEMA IF NOT EXISTS curated_nyc_taxi;

In [0]:
%sql
-- Re-register the green-taxi curated table: drop any existing definition,
-- then create the table over the Delta files at the given S3 location
-- (the same path the green-taxi frame is written to earlier in this notebook).
DROP TABLE IF EXISTS curated_nyc_taxi.green_taxi_curated;

CREATE TABLE curated_nyc_taxi.green_taxi_curated
USING DELTA
LOCATION 's3://datalake-joao/consumo/green_taxi_curated'

In [0]:
%sql
-- Re-register the yellow-taxi curated table: drop any existing definition,
-- then create the table over the Delta files at the given S3 location
-- (the same path the yellow-taxi frame is written to earlier in this notebook).
DROP TABLE IF EXISTS curated_nyc_taxi.yellow_taxi_curated;

CREATE TABLE curated_nyc_taxi.yellow_taxi_curated
USING DELTA
LOCATION 's3://datalake-joao/consumo/yellow_taxi_curated'