# Ingesting data into the bronze layer

The bronze layer is part of the multi-layer data lakehouse architecture (bronze, silver, gold). Its main purpose is to store the raw data in a format optimized for processing and reading, while preserving the integrity of what was received in the landing zone.

The landing zone is a temporary repository of files kept exactly as they were extracted from the source (here, the .parquet files published by the New York City Taxi and Limousine Commission, TLC). The bronze layer, by contrast, is the first structured and traceable stage of the data pipeline.
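
A minimal sketch of how the two layers map onto each other in this notebook, assuming the DBFS paths used in the cells below. Each monthly landing file follows `<landing>/<type>_taxi_data/<type>_tripdata_<year>_<month>.parquet`, and each taxi type is consolidated into one bronze Delta directory registered as `bronze_<type>_taxi_data`. The helper names `landing_file` and `bronze_target` are hypothetical.

```python
# Roots taken from the paths used later in this notebook
LANDING_ROOT = "dbfs:/FileStore/tables/landing_zone"
BRONZE_ROOT = "dbfs:/FileStore/tables/bronze"


def landing_file(taxi_type: str, year: str, month: str) -> str:
    """Path of one monthly raw file, kept exactly as extracted from the source."""
    return f"{LANDING_ROOT}/{taxi_type}_taxi_data/{taxi_type}_tripdata_{year}_{month}.parquet"


def bronze_target(taxi_type: str) -> tuple[str, str]:
    """Delta directory and catalog table name that consolidate all months of one taxi type."""
    return f"{BRONZE_ROOT}/{taxi_type}_taxi_data", f"bronze_{taxi_type}_taxi_data"


# Several landing files (one per month) feed a single bronze table per taxi type
for taxi_type in ["yellow", "green", "fhv", "fhvhv"]:
    print(landing_file(taxi_type, "2023", "01"), "->", *bronze_target(taxi_type))
```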

## Importing the required libraries

In [0]:
from pyspark.sql.functions import col

## Parameters for reading the files

We will not pass a predefined schema when reading the files, but the code below shows how to inspect a file's schema. This matters because data types can differ between files. When they do, the columns must be cast to a common type so that Spark can combine them into a single DataFrame without errors.
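
To check every monthly file rather than a single one, the schemas can be collected with `df.dtypes` (a list of `(column, type)` pairs) and compared. This is a minimal sketch, assuming the `spark` session provided by Databricks; the helper names `collect_dtypes` and `type_divergences` are hypothetical. Note that `dtypes` reports LongType as `bigint`.

```python
from collections import defaultdict


def collect_dtypes(spark, files):
    """Map each file to {column: type string}."""
    # Schema inference reads the Parquet metadata (footer), not the row data
    return {f: dict(spark.read.parquet(f).dtypes) for f in files}


def type_divergences(dtypes_by_file):
    """Return {column: sorted distinct types} for columns whose type differs between files."""
    seen = defaultdict(set)
    for dtypes in dtypes_by_file.values():
        for column, dtype in dtypes.items():
            seen[column].add(dtype)
    return {column: sorted(types) for column, types in seen.items() if len(types) > 1}


# Usage in the notebook (requires the Databricks `spark` session):
# files = [f"dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_{m}.parquet"
#          for m in ["01", "02", "03", "04", "05"]]
# print(type_divergences(collect_dtypes(spark, files)))
```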

In [0]:
df = spark.read.parquet('dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_01.parquet')
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



## Functions to process each taxi type

**Note:** Spark could not reconcile the differing schemas of the .parquet files. For example, in the yellow taxi data the passenger_count column is stored as INT64 (read by Spark as LongType) in one file and as DOUBLE (DoubleType) in another. This kind of divergence causes an error, because Spark does not automatically merge incompatible types when a single .read.parquet() call covers multiple files.
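
A simplified model of why the combined read fails: when schemas are merged, a column is accepted only if every file agrees on its type, so `bigint` vs `double` is rejected instead of being widened. This sketch covers flat schemas only and is not Spark's actual implementation, which also handles nested types and a few special cases; `merge_flat_schemas` is a hypothetical name.

```python
def merge_flat_schemas(left: dict, right: dict) -> dict:
    """Merge two {column: type} schemas, rejecting conflicting types (no implicit widening)."""
    merged = dict(left)
    for column, dtype in right.items():
        if column in merged and merged[column] != dtype:
            raise ValueError(
                f"incompatible types for column '{column}': {merged[column]} and {dtype}"
            )
        # Columns present in only one schema are added to the merged result
        merged.setdefault(column, dtype)
    return merged


# The passenger_count divergence described above is rejected
try:
    merge_flat_schemas({"passenger_count": "double"}, {"passenger_count": "bigint"})
except ValueError as err:
    print(err)
```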

To work around this, I manually cast the columns required by the assignment so that their types are standardized at read time. This made it possible to combine the data into a single DataFrame already in the bronze layer, keeping the raw records while ensuring a consistent schema. I deliberately did not write a single generic function or use any shortcut to reduce the code, so that the changes made to each dataset remain explicit.
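
For comparison only, the same logic could be condensed into one parameterized function driven by a per-type cast map. This is a hypothetical sketch (`CASTS`, `monthly_files` and `read_standardized` are not part of the notebook) that reuses the casts from the yellow and green cells; it takes the `spark` session as an argument and uses `df[name].cast(...)`, so it needs no PySpark import. The explicit per-type functions below remain the version actually used.

```python
from functools import reduce

# Casts copied from processar_yellow_taxi / processar_green_taxi; FHV and FHVHV are read as-is
CASTS = {
    "yellow": {
        "VendorID": "long",
        "passenger_count": "long",
        "total_amount": "double",
        "tpep_pickup_datetime": "timestamp",
        "tpep_dropoff_datetime": "timestamp",
    },
    "green": {
        "VendorID": "long",
        "passenger_count": "long",
        "total_amount": "double",
        "lpep_pickup_datetime": "timestamp",
        "lpep_dropoff_datetime": "timestamp",
    },
}


def monthly_files(base_path, taxi_type, year="2023", months=("01", "02", "03", "04", "05")):
    """Build the landing-zone file paths with the same naming as the notebook."""
    return [f"{base_path}{taxi_type}_tripdata_{year}_{m}.parquet" for m in months]


def read_standardized(spark, base_path, taxi_type):
    """Read each monthly file, apply the type's casts, and union the results by column name."""
    dfs = []
    for path in monthly_files(base_path, taxi_type):
        df = spark.read.parquet(path)
        for column, dtype in CASTS.get(taxi_type, {}).items():
            df = df.withColumn(column, df[column].cast(dtype))
        dfs.append(df)
    return reduce(lambda left, right: left.unionByName(right, allowMissingColumns=True), dfs)


# Usage in Databricks:
# df_yellow = read_standardized(spark, "dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/", "yellow")
```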

#### Yellow taxi data

In [0]:
def processar_yellow_taxi(yellow_path: str):
   
    ano = '2023'
    meses = ['01', '02', '03', '04', '05']

    dfs = []

    for mes in meses:

        arquivo = f"{yellow_path}yellow_tripdata_{ano}_{mes}.parquet"
        print(f"Lendo arquivo: {arquivo}")

        # Cast the columns whose types vary between the monthly files
        df = spark.read.parquet(arquivo) \
            .withColumn("VendorID", col("VendorID").cast("long")) \
            .withColumn("passenger_count", col("passenger_count").cast("long")) \
            .withColumn("total_amount", col("total_amount").cast("double")) \
            .withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast("timestamp")) \
            .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast("timestamp"))
        dfs.append(df)

    if not dfs:
        raise Exception("Nenhum arquivo foi lido com sucesso.")

    df_unificado = dfs[0]
    
    for df in dfs[1:]:
        df_unificado = df_unificado.unionByName(df, allowMissingColumns=True)

    return df_unificado

In [0]:
path = "dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/"
df_yellow = processar_yellow_taxi(yellow_path=path)
display(df_yellow.limit(10))

Lendo arquivo: dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_01.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_02.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_03.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_04.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/yellow_taxi_data/yellow_tripdata_2023_05.parquet


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
2,2023-01-01T00:32:10.000+0000,2023-01-01T00:40:36.000+0000,1,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
2,2023-01-01T00:55:08.000+0000,2023-01-01T01:01:27.000+0000,1,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2023-01-01T00:25:04.000+0000,2023-01-01T00:37:49.000+0000,1,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
1,2023-01-01T00:03:48.000+0000,2023-01-01T00:13:25.000+0000,0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
2,2023-01-01T00:10:29.000+0000,2023-01-01T00:21:19.000+0000,1,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0
2,2023-01-01T00:50:34.000+0000,2023-01-01T01:02:52.000+0000,1,1.84,1.0,N,161,137,1,12.8,1.0,0.5,10.0,0.0,1.0,27.8,2.5,0.0
2,2023-01-01T00:09:22.000+0000,2023-01-01T00:19:49.000+0000,1,1.66,1.0,N,239,143,1,12.1,1.0,0.5,3.42,0.0,1.0,20.52,2.5,0.0
2,2023-01-01T00:27:12.000+0000,2023-01-01T00:49:56.000+0000,1,11.7,1.0,N,142,200,1,45.7,1.0,0.5,10.74,3.0,1.0,64.44,2.5,0.0
2,2023-01-01T00:21:44.000+0000,2023-01-01T00:36:40.000+0000,1,2.95,1.0,N,164,236,1,17.7,1.0,0.5,5.68,0.0,1.0,28.38,2.5,0.0
2,2023-01-01T00:39:42.000+0000,2023-01-01T00:50:36.000+0000,1,3.01,1.0,N,141,107,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0


#### Green taxi data

In [0]:
def processar_green_taxi(green_path: str):
   
    ano = '2023'
    meses = ['01', '02', '03', '04', '05']

    dfs = []

    for mes in meses:

        arquivo = f"{green_path}green_tripdata_{ano}_{mes}.parquet"
        print(f"Lendo arquivo: {arquivo}")

        # Cast the columns whose types vary between the monthly files
        df = spark.read.parquet(arquivo) \
            .withColumn("VendorID", col("VendorID").cast("long")) \
            .withColumn("passenger_count", col("passenger_count").cast("long")) \
            .withColumn("total_amount", col("total_amount").cast("double")) \
            .withColumn("lpep_pickup_datetime", col("lpep_pickup_datetime").cast("timestamp")) \
            .withColumn("lpep_dropoff_datetime", col("lpep_dropoff_datetime").cast("timestamp"))
        dfs.append(df)

    if not dfs:
        raise Exception("Nenhum arquivo foi lido com sucesso.")

    df_unificado = dfs[0]
    
    for df in dfs[1:]:
        df_unificado = df_unificado.unionByName(df, allowMissingColumns=True)

    return df_unificado

In [0]:
path = "dbfs:/FileStore/tables/landing_zone/green_taxi_data/"
df_green = processar_green_taxi(green_path=path)
display(df_green.limit(10))

Lendo arquivo: dbfs:/FileStore/tables/landing_zone/green_taxi_data/green_tripdata_2023_01.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/green_taxi_data/green_tripdata_2023_02.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/green_taxi_data/green_tripdata_2023_03.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/green_taxi_data/green_tripdata_2023_04.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/green_taxi_data/green_tripdata_2023_05.parquet


VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
2,2023-01-01T00:26:10.000+0000,2023-01-01T00:37:11.000+0000,N,1.0,166,143,1,2.58,14.9,1.0,0.5,4.03,0.0,,1.0,24.18,1.0,1.0,2.75
2,2023-01-01T00:51:03.000+0000,2023-01-01T00:57:49.000+0000,N,1.0,24,43,1,1.81,10.7,1.0,0.5,2.64,0.0,,1.0,15.84,1.0,1.0,0.0
2,2023-01-01T00:35:12.000+0000,2023-01-01T00:41:32.000+0000,N,1.0,223,179,1,0.0,7.2,1.0,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0
1,2023-01-01T00:13:14.000+0000,2023-01-01T00:19:03.000+0000,N,1.0,41,238,1,1.3,6.5,0.5,1.5,1.7,0.0,,1.0,10.2,1.0,1.0,0.0
1,2023-01-01T00:33:04.000+0000,2023-01-01T00:39:02.000+0000,N,1.0,41,74,1,1.1,6.0,0.5,1.5,0.0,0.0,,1.0,8.0,1.0,1.0,0.0
2,2023-01-01T00:53:31.000+0000,2023-01-01T01:11:04.000+0000,N,1.0,41,262,1,2.78,17.7,1.0,0.5,0.0,0.0,,1.0,22.95,2.0,1.0,2.75
1,2023-01-01T00:09:14.000+0000,2023-01-01T00:26:39.000+0000,N,1.0,181,45,2,3.8,19.1,3.75,1.5,4.85,0.0,,1.0,29.2,1.0,1.0,2.75
2,2023-01-01T00:11:58.000+0000,2023-01-01T00:24:55.000+0000,N,1.0,24,75,1,1.88,14.2,1.0,0.5,0.0,0.0,,1.0,16.7,2.0,1.0,0.0
2,2023-01-01T00:41:29.000+0000,2023-01-01T00:46:26.000+0000,N,1.0,41,166,2,1.11,7.2,1.0,0.5,1.0,0.0,,1.0,10.7,1.0,1.0,0.0
2,2023-01-01T00:50:32.000+0000,2023-01-01T01:13:42.000+0000,N,1.0,24,140,1,4.22,24.7,1.0,0.5,3.0,0.0,,1.0,32.95,1.0,1.0,2.75


#### FHV and FHVHV taxi data

In [0]:
def processar_dados_taxi(taxi_path: str, tipo: str):
   
    ano = '2023'
    meses = ['01', '02', '03', '04', '05']

    dfs = []

    for mes in meses:

        arquivo = f"{taxi_path}{tipo}_tripdata_{ano}_{mes}.parquet"
        print(f"Lendo arquivo: {arquivo}")

        df = spark.read.parquet(arquivo)
        dfs.append(df)

    if not dfs:
        raise Exception("Nenhum arquivo foi lido com sucesso.")

    df_unificado = dfs[0]
    
    for df in dfs[1:]:
        df_unificado = df_unificado.unionByName(df, allowMissingColumns=True)

    return df_unificado

In [0]:
path = "dbfs:/FileStore/tables/landing_zone/fhv_taxi_data/"
df_fhv = processar_dados_taxi(taxi_path=path, tipo='fhv')
display(df_fhv.limit(10))

Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhv_taxi_data/fhv_tripdata_2023_01.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhv_taxi_data/fhv_tripdata_2023_02.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhv_taxi_data/fhv_tripdata_2023_03.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhv_taxi_data/fhv_tripdata_2023_04.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhv_taxi_data/fhv_tripdata_2023_05.parquet


dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
B00008,2023-01-01T00:30:00.000+0000,2023-01-01T01:00:00.000+0000,,,,B00008
B00078,2023-01-01T00:01:00.000+0000,2023-01-01T03:15:00.000+0000,,,,B00078
B00111,2023-01-01T00:30:00.000+0000,2023-01-01T01:05:00.000+0000,,,,B03406
B00112,2023-01-01T00:34:45.000+0000,2023-01-01T00:52:03.000+0000,,14.0,,B00112
B00112,2023-01-01T00:11:20.000+0000,2023-01-01T00:22:03.000+0000,,14.0,,B00112
B00112,2023-01-01T00:33:28.000+0000,2023-01-01T00:53:46.000+0000,,29.0,,B00112
B00112,2023-01-01T00:33:11.000+0000,2023-01-01T00:48:45.000+0000,,14.0,,B00112
B00112,2023-01-01T00:55:24.000+0000,2023-01-01T01:02:55.000+0000,,14.0,,B00112
B00112,2023-01-01T00:39:16.000+0000,2023-01-01T00:39:23.000+0000,,14.0,,B00112
B00112,2023-01-01T00:50:10.000+0000,2023-01-01T00:50:17.000+0000,,14.0,,B00112


In [0]:
path = "dbfs:/FileStore/tables/landing_zone/fhvhv_taxi_data/"
df_fhvhv = processar_dados_taxi(taxi_path=path, tipo='fhvhv')
display(df_fhvhv.limit(10))

Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhvhv_taxi_data/fhvhv_tripdata_2023_01.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhvhv_taxi_data/fhvhv_tripdata_2023_02.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhvhv_taxi_data/fhvhv_tripdata_2023_03.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhvhv_taxi_data/fhvhv_tripdata_2023_04.parquet
Lendo arquivo: dbfs:/FileStore/tables/landing_zone/fhvhv_taxi_data/fhvhv_tripdata_2023_05.parquet


hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
HV0003,B03404,B03404,2023-01-01T00:18:06.000+0000,2023-01-01T00:19:24.000+0000,2023-01-01T00:19:38.000+0000,2023-01-01T00:48:07.000+0000,48,68,0.94,1709,25.95,0.0,0.78,2.3,2.75,0.0,5.22,27.83,N,N,,N,N
HV0003,B03404,B03404,2023-01-01T00:48:42.000+0000,2023-01-01T00:56:20.000+0000,2023-01-01T00:58:39.000+0000,2023-01-01T01:33:08.000+0000,246,163,2.78,2069,60.14,0.0,1.8,5.34,2.75,0.0,0.0,50.15,N,N,,N,N
HV0003,B03404,B03404,2023-01-01T00:15:35.000+0000,2023-01-01T00:20:14.000+0000,2023-01-01T00:20:27.000+0000,2023-01-01T00:37:54.000+0000,9,129,8.81,1047,24.37,0.0,0.73,2.16,0.0,0.0,0.0,20.22,N,N,,N,N
HV0003,B03404,B03404,2023-01-01T00:35:24.000+0000,2023-01-01T00:39:30.000+0000,2023-01-01T00:41:05.000+0000,2023-01-01T00:48:16.000+0000,129,129,0.67,431,13.8,0.0,0.41,1.22,0.0,0.0,0.0,7.9,N,N,,N,N
HV0003,B03404,B03404,2023-01-01T00:43:15.000+0000,2023-01-01T00:51:10.000+0000,2023-01-01T00:52:47.000+0000,2023-01-01T01:04:51.000+0000,129,92,4.38,724,20.49,0.0,0.61,1.82,0.0,0.0,0.0,16.48,N,N,,N,N
HV0005,B03406,,2023-01-01T00:21:34.000+0000,,2023-01-01T00:29:05.000+0000,2023-01-01T00:49:54.000+0000,130,38,4.921,1249,18.29,0.0,0.43,1.27,0.0,0.0,0.0,16.81,N,N,N,N,N
HV0005,B03406,,2023-01-01T00:47:17.000+0000,,2023-01-01T00:55:29.000+0000,2023-01-01T01:16:07.000+0000,38,10,5.517,1238,25.76,0.0,0.77,2.29,0.0,0.0,0.0,23.65,N,N,N,N,N
HV0003,B03404,B03404,2023-01-01T00:06:54.000+0000,2023-01-01T00:08:59.000+0000,2023-01-01T00:10:29.000+0000,2023-01-01T00:18:22.000+0000,90,231,1.89,473,14.51,0.0,0.44,1.29,2.75,0.0,0.0,13.73,N,N,,N,Y
HV0003,B03404,B03404,2023-01-01T00:15:22.000+0000,2023-01-01T00:21:39.000+0000,2023-01-01T00:22:10.000+0000,2023-01-01T00:33:14.000+0000,125,246,2.65,664,13.0,0.0,0.39,1.15,2.75,0.0,0.0,13.61,N,N,,N,Y
HV0003,B03404,B03404,2023-01-01T00:26:02.000+0000,2023-01-01T00:39:09.000+0000,2023-01-01T00:39:09.000+0000,2023-01-01T01:03:50.000+0000,68,231,3.26,1481,30.38,0.0,0.91,2.7,2.75,0.0,0.0,28.25,N,N,,N,Y


## Loading all files into the bronze layer

In [0]:
def escrever_bronze(df, tipo: str, bronze_path: str):
    
    destino = f"{bronze_path}/{tipo}_taxi_data"
    
    print(f"Escrevendo dados para o tipo '{tipo}' na camada bronze em: {destino}")
    
    # Write the DataFrame in Delta format to the destination path

    df.write.format('delta').mode('overwrite').save(destino)
    
    # Register the table in the catalog (Hive metastore), pointing at the Delta location

    spark.sql(f"DROP TABLE IF EXISTS bronze_{tipo}_taxi_data")
    
    spark.sql(f"""
        CREATE TABLE bronze_{tipo}_taxi_data
        USING DELTA
        LOCATION '{destino}'
    """)
    
    print(f"Tabela 'bronze_{tipo}_taxi_data' criada com sucesso.")

In [0]:
bronze_path = 'dbfs:/FileStore/tables/bronze'

escrever_bronze(df_yellow, 'yellow', bronze_path)
escrever_bronze(df_green, 'green', bronze_path)
escrever_bronze(df_fhv, 'fhv', bronze_path)
escrever_bronze(df_fhvhv, 'fhvhv', bronze_path)

Escrevendo dados para o tipo 'yellow' na camada bronze em: dbfs:/FileStore/tables/bronze/yellow_taxi_data
Tabela 'bronze_yellow_taxi_data' criada com sucesso.
Escrevendo dados para o tipo 'green' na camada bronze em: dbfs:/FileStore/tables/bronze/green_taxi_data
Tabela 'bronze_green_taxi_data' criada com sucesso.
Escrevendo dados para o tipo 'fhv' na camada bronze em: dbfs:/FileStore/tables/bronze/fhv_taxi_data
Tabela 'bronze_fhv_taxi_data' criada com sucesso.
Escrevendo dados para o tipo 'fhvhv' na camada bronze em: dbfs:/FileStore/tables/bronze/fhvhv_taxi_data
Tabela 'bronze_fhvhv_taxi_data' criada com sucesso.


From this point on, there is no need to read the '.parquet' files directly. We can use the registered Delta tables instead, which bring several important advantages for organizing this assessment's solution and making it efficient. First, they abstract away the physical file paths: the data can be accessed by table name alone, which simplifies queries and transformations. In addition, Delta Lake keeps a transaction log with the table's metadata and file-level statistics, which enables optimizations such as data skipping and provides better read performance and reliable (ACID) writes.
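
A minimal sketch of what the registered tables enable, assuming the Databricks `spark` session and the table names created above: reading by name with `spark.table`, inspecting the transaction log with `DESCRIBE HISTORY`, and querying an earlier version with Delta time travel (`VERSION AS OF`). The helper names `history_sql`, `time_travel_sql` and `inspect_bronze_table` are hypothetical.

```python
def history_sql(table: str) -> str:
    """SQL that lists the commits recorded in a Delta table's transaction log."""
    return f"DESCRIBE HISTORY {table}"


def time_travel_sql(table: str, version: int) -> str:
    """SQL that reads a Delta table as it was at a given commit version."""
    return f"SELECT * FROM {table} VERSION AS OF {int(version)}"


def inspect_bronze_table(spark, taxi_type: str):
    """Read a bronze table by name (no physical path needed) and show its commit history."""
    table = f"bronze_{taxi_type}_taxi_data"
    df = spark.table(table)
    spark.sql(history_sql(table)).select("version", "timestamp", "operation").show(truncate=False)
    return df


# Usage in Databricks:
# df_yellow = inspect_bronze_table(spark, "yellow")
# first_version = spark.sql(time_travel_sql("bronze_yellow_taxi_data", 0))
```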

## Example query

In [0]:
%sql
SELECT *
FROM bronze_yellow_taxi_data
LIMIT 10

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
2,2023-01-01T00:32:10.000+0000,2023-01-01T00:40:36.000+0000,1,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
2,2023-01-01T00:55:08.000+0000,2023-01-01T01:01:27.000+0000,1,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2023-01-01T00:25:04.000+0000,2023-01-01T00:37:49.000+0000,1,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
1,2023-01-01T00:03:48.000+0000,2023-01-01T00:13:25.000+0000,0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
2,2023-01-01T00:10:29.000+0000,2023-01-01T00:21:19.000+0000,1,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0
2,2023-01-01T00:50:34.000+0000,2023-01-01T01:02:52.000+0000,1,1.84,1.0,N,161,137,1,12.8,1.0,0.5,10.0,0.0,1.0,27.8,2.5,0.0
2,2023-01-01T00:09:22.000+0000,2023-01-01T00:19:49.000+0000,1,1.66,1.0,N,239,143,1,12.1,1.0,0.5,3.42,0.0,1.0,20.52,2.5,0.0
2,2023-01-01T00:27:12.000+0000,2023-01-01T00:49:56.000+0000,1,11.7,1.0,N,142,200,1,45.7,1.0,0.5,10.74,3.0,1.0,64.44,2.5,0.0
2,2023-01-01T00:21:44.000+0000,2023-01-01T00:36:40.000+0000,1,2.95,1.0,N,164,236,1,17.7,1.0,0.5,5.68,0.0,1.0,28.38,2.5,0.0
2,2023-01-01T00:39:42.000+0000,2023-01-01T00:50:36.000+0000,1,3.01,1.0,N,141,107,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0
