<a href="https://colab.research.google.com/github/vggd18/pyspark-etl-acidentes-recife/blob/main/etl_acidentes_recife.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Projeto de Data Lakehouse: Acidentes de Trânsito do Recife

**Objetivo:** Construir um pipeline de dados completo (ETL) utilizando PySpark e Delta Lake para processar dados abertos de acidentes de trânsito da cidade do Recife. O projeto segue a arquitetura Medalhão (Bronze, Silver, Gold) para criar um Data Lakehouse robusto, otimizado e pronto para análises.

**Ferramentas:** PySpark, Delta Lake, Python

---

## 1. Configuração do Ambiente (Environment Setup)

Nesta seção, preparamos nosso ambiente de desenvolvimento no Google Colab, instalando as bibliotecas necessárias e configurando a sessão Spark com suporte ao Delta Lake.

### 1.1. Download dos Dados de Origem (Source Data)

A primeira etapa do pipeline é a ingestão dos dados brutos. Aqui, fazemos o download dos arquivos CSV anuais (2019-2024) diretamente do [Portal de Dados Abertos da Prefeitura do Recife](http://dados.recife.pe.gov.br/dataset/acidentes-de-transito-com-e-sem-vitimas).

In [1]:
import os
import requests

urls = {
  "2019": "http://dados.recife.pe.gov.br/dataset/44087d2d-73b5-4ab3-9bd8-78da7436eed1/resource/3531bafe-d47d-415e-b154-a881081ac76c/download/acidentes-2019.csv",
  "2020": "http://dados.recife.pe.gov.br/dataset/44087d2d-73b5-4ab3-9bd8-78da7436eed1/resource/fc1c8460-0406-4fff-b51a-e79205d1f1ab/download/acidentes_2020-novo.csv",
  "2021": "http://dados.recife.pe.gov.br/dataset/44087d2d-73b5-4ab3-9bd8-78da7436eed1/resource/2caa8f41-ccd9-4ea5-906d-f66017d6e107/download/acidentes2021.csv",
  "2022": "http://dados.recife.pe.gov.br/dataset/44087d2d-73b5-4ab3-9bd8-78da7436eed1/resource/971e0228-fa9c-4a42-b360-c842b29d2f56/download/acidentes2022.csv",
  "2023": "http://dados.recife.pe.gov.br/dataset/44087d2d-73b5-4ab3-9bd8-78da7436eed1/resource/d26b864b-0f7b-403e-b142-fd9989acaaf5/download/acidentes2023.csv",
  "2024": "http://dados.recife.pe.gov.br/dataset/44087d2d-73b5-4ab3-9bd8-78da7436eed1/resource/29afbf42-a36c-475c-8b75-761e17e67679/download/acidentes2024.csv"
}

output_dir = 'data'
os.makedirs(output_dir, exist_ok=True)

for year, url in urls.items():
  file_name = f"acidentes_{year}.csv"
  file_path =  os.path.join(output_dir, file_name)

  response = requests.get(url)

  if response.status_code == 200:
    with open(file_path, 'wb') as file:
      file.write(response.content)
    print(f"Archive {file_name} downloaded successfully.")
  else:
    print(f"Failed to download {file_name}. Status code: {response.status_code}")

print("All files downloaded successfully.")



Archive acidentes_2019.csv downloaded successfully.
Archive acidentes_2020.csv downloaded successfully.
Archive acidentes_2021.csv downloaded successfully.
Archive acidentes_2022.csv downloaded successfully.
Archive acidentes_2023.csv downloaded successfully.
Archive acidentes_2024.csv downloaded successfully.
All files downloaded successfully.


### 1.2. Inicialização da Sessão Spark (Spark Session)

Configuramos uma sessão Spark habilitada para o Delta Lake. Utilizamos a função `configure_spark_with_delta_pip` que garante a correta configuração das dependências Java (JARs), resolvendo os desafios de compatibilidade do ambiente.

In [2]:
!pip install pyspark==3.5.1 delta-spark==3.2.0 -q

In [3]:
from pyspark.sql import SparkSession
from delta import *

builder = (
  SparkSession.builder.appName("EtlAcidentesRecife")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("SparkSession and Delta Lake configured successfully!")

SparkSession and Delta Lake configured successfully!


## 2. Camada Bronze: Ingestão e Armazenamento dos Dados Brutos

O objetivo da Camada Bronze é criar uma cópia fiel, histórica e imutável dos dados de origem. Nesta etapa, lemos todos os arquivos CSV, lidamos com as inconsistências de schema e salvamos os dados em uma única tabela Delta particionada.

### 2.1. Análise de Consistência do Schema (Schema Drift Analysis)

Antes de carregar todos os arquivos, é uma boa prática verificar se eles possuem a mesma estrutura. Nosso script de análise revelou um **Schema Drift** significativo: os nomes e o número de colunas mudam ao longo dos anos. Esta descoberta é crucial e justifica a necessidade de um schema unificado manual.

In [4]:
df_acidentes = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ";") \
  .csv('data/acidentes_2019.csv')

In [5]:
df_acidentes.printSchema()

root
 |-- DATA: date (nullable = true)
 |-- hora: string (nullable = true)
 |-- natureza_acidente: string (nullable = true)
 |-- situacao: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- endereco: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- detalhe_endereco_acidente: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- endereco_cruzamento: string (nullable = true)
 |-- numero_cruzamento: string (nullable = true)
 |-- referencia_cruzamento: string (nullable = true)
 |-- bairro_cruzamento: string (nullable = true)
 |-- num_semaforo: integer (nullable = true)
 |-- sentido_via: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- auto: integer (nullable = true)
 |-- moto: integer (nullable = true)
 |-- ciclom: integer (nullable = true)
 |-- ciclista: integer (nullable = true)
 |-- pedestre: integer (nullable = true)
 |-- onibus: integer (nullable = true)
 |-- caminhao: inte

In [6]:
df_acidentes.show(5, truncate=False)

+----------+--------+-----------------+----------+-----------+-------------------------------+------+---------------------------+-----------------------------------------------------------+-------------------------------+-----------------+-----------------------------------------------------------+-----------------+------------+-----------+------------------------+-----------------------------------------------------------------------------------+----+----+------+--------+--------+------+--------+-------+------+-------+-------------+-------------------+-----------+-----------------+---------------+------------+---------------+-----------------+--------------+------------------+-----------+----------------+------------+------------+
|DATA      |hora    |natureza_acidente|situacao  |bairro     |endereco                       |numero|detalhe_endereco_acidente  |complemento                                                |endereco_cruzamento            |numero_cruzamento|referencia_cruzamen

In [7]:
df_acidentes.describe().show()

+-------+--------+-----------------+---------+---------+-------------------+------------------+-------------------------+--------------------+-------------------+------------------+---------------------+-----------------+------------------+-----------------+--------------------+------------------+------------------+------------------+------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------+-------------------+-----------+-----------------+---------------+------------+---------------+--------------+--------------+------------------+-----------+------------+------------+------------+
|summary|    hora|natureza_acidente| situacao|   bairro|           endereco|            numero|detalhe_endereco_acidente|         complemento|endereco_cruzamento| numero_cruzamento|referencia_cruzamento|bairro_cruzamento|      num_semaforo|      sentido_via|                tipo|         descricao|          

In [8]:
import os
import csv

data_dir = 'data/'

csv_files = sorted([f for f in os.listdir(data_dir) if f.endswith('.csv')])

if not csv_files:
  print("No CSV files found in the directory 'data/'")
else:
  base_path = os.path.join(data_dir, csv_files[0])
  base_header = spark.read.option("delimiter", ";").option("header", "true").csv(base_path).columns

  equals = True

  for f in csv_files[1:]:
    current_path = os.path.join(data_dir, f)
    current_header = spark.read.option("delimiter", ";").option("header", "true").csv(current_path).columns
    if current_header != base_header:
      equals = False
      print(f"\n!!! ALERT: The header of '{f}' is DIFFERENT! Analysis:")
      base_set = set(base_header)
      current_set = set(current_header)
      removed_columns = base_set - current_set
      if removed_columns:
          print(f"  - Missing columns in this file: {list(removed_columns)}")
      added_columns = current_set - base_set
      if added_columns:
          print(f"  - Extra columns found in this file: {list(added_columns)}")
      if len(base_header) != len(current_header):
            print(f"  - Column count diverges: {len(base_header)} in reference vs. {len(current_header)} in this file.") # Translated
      print("-" * 30)

  if equals:
      print("\nGreat news! All CSV files have the same header.")


!!! ALERT: The header of 'acidentes_2020.csv' is DIFFERENT! Analysis:
  - Missing columns in this file: ['referencia_cruzamento', 'numero_cruzamento', 'endereco_cruzamento', 'DATA']
  - Extra columns found in this file: ['data']
  - Column count diverges: 41 in reference vs. 38 in this file.
------------------------------

!!! ALERT: The header of 'acidentes_2021.csv' is DIFFERENT! Analysis:
  - Missing columns in this file: ['endereco_cruzamento', 'referencia_cruzamento', 'DATA', 'numero_cruzamento', 'descricao']
  - Extra columns found in this file: ['data']
  - Column count diverges: 41 in reference vs. 37 in this file.
------------------------------

!!! ALERT: The header of 'acidentes_2022.csv' is DIFFERENT! Analysis:
  - Missing columns in this file: ['natureza_acidente', 'endereco_cruzamento', 'referencia_cruzamento', 'DATA', 'numero_cruzamento', 'descricao']
  - Extra columns found in this file: ['Protocolo', 'data', 'natureza']
  - Column count diverges: 41 in reference vs. 3

In [9]:
df_acidentes2024 = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ";") \
  .csv('data/acidentes_2024.csv')

df_acidentes2024.describe().show()

+-------+-----------+------------+----------+-------+--------------------+-----------------+-------------------------+--------------------+-----------------+------------+-----------+--------------------+----+----+------+--------+--------+------+--------+-------+------+-------+-------------+-------------------+-----------+-----------------+-----------+------------+---------------+--------------+--------------+------------------+-----------+------------+------------+------------+
|summary|  Protocolo|    natureza|  situacao| bairro|            endereco|           numero|detalhe_endereco_acidente|         complemento|bairro_cruzamento|num_semaforo|sentido_via|                tipo|auto|moto|ciclom|ciclista|pedestre|onibus|caminhao|viatura|outros|vitimas|vitimasfatais|acidente_verificado|tempo_clima|situacao_semaforo|sinalizacao|condicao_via|conservacao_via|ponto_controle|situacao_placa|velocidade_max_via|mao_direcao|divisao_via1|divisao_via2|divisao_via3|
+-------+-----------+------------+

### 2.2. Definição do Schema Unificado (Unified Schema Definition)

Com base na análise, definimos um **schema unificado e explícito** usando `StructType`. Este schema representa o "superconjunto" de todas as colunas encontradas em todos os arquivos, padronizando os nomes (ex: `DATA` para `data`) e definindo os tipos de dados corretos. Manter todas as colunas como `nullable=True` nesta etapa garante que a ingestão não falhe por dados faltantes na origem.

In [10]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType

unified_schema = StructType([
  StructField('data', DateType(), True),
  StructField('hora', StringType(), True),
  StructField('natureza_acidente', StringType(), True),
  StructField('situacao', StringType(), True),
  StructField('protocolo', StringType(), True),

  StructField('bairro', StringType(), True),
  StructField('endereco', StringType(), True),
  StructField('numero', StringType(), True),

  StructField('detalhe_endereco_acidente', StringType(), True),
  StructField('complemento', StringType(), True),
  StructField('endereco_cruzamento', StringType(), True),
  StructField('numero_cruzamento', StringType(), True),
  StructField('referencia_cruzamento', StringType(), True),
  StructField('bairro_cruzamento', StringType(), True),
  StructField('num_semaforo', IntegerType(), True),
  StructField('sentido_via', StringType(), True),
  StructField('tipo', StringType(), True),
  StructField('descricao', StringType(), True),

  StructField('auto', IntegerType(), True),
  StructField('moto', IntegerType(), True),
  StructField('ciclom', IntegerType(), True),
  StructField('ciclista', IntegerType(), True),
  StructField('pedestre', IntegerType(), True),
  StructField('onibus', IntegerType(), True),
  StructField('caminhao', IntegerType(), True),
  StructField('viatura', IntegerType(), True),
  StructField('outros', IntegerType(), True),
  StructField('vitimas', IntegerType(), True),
  StructField('vitimas_fatais', IntegerType(), True),

  StructField('acidente_verificado', StringType(), True),
  StructField('tempo_clima', StringType(), True),
  StructField('situacao_semaforo', StringType(), True),
  StructField('sinalizacao', StringType(), True),
  StructField('condicao_via', StringType(), True),
  StructField('conservacao_via', StringType(), True),
  StructField('ponto_controle', StringType(), True),
  StructField('situacao_placa', StringType(), True),
  StructField('velocidade_max_via', IntegerType(), True),
  StructField('mao_direcao', StringType(), True),

  StructField('divisao_via1', StringType(), True),
  StructField('divisao_via2', StringType(), True),
  StructField('divisao_via3', StringType(), True)

])

schema_columns = unified_schema.names
print(f"Your Schema have {len(schema_columns)} columns.")
print(unified_schema)

Your Schema have 42 columns.
StructType([StructField('data', DateType(), True), StructField('hora', StringType(), True), StructField('natureza_acidente', StringType(), True), StructField('situacao', StringType(), True), StructField('protocolo', StringType(), True), StructField('bairro', StringType(), True), StructField('endereco', StringType(), True), StructField('numero', StringType(), True), StructField('detalhe_endereco_acidente', StringType(), True), StructField('complemento', StringType(), True), StructField('endereco_cruzamento', StringType(), True), StructField('numero_cruzamento', StringType(), True), StructField('referencia_cruzamento', StringType(), True), StructField('bairro_cruzamento', StringType(), True), StructField('num_semaforo', IntegerType(), True), StructField('sentido_via', StringType(), True), StructField('tipo', StringType(), True), StructField('descricao', StringType(), True), StructField('auto', IntegerType(), True), StructField('moto', IntegerType(), True), St

In [11]:
import os

data_dir = 'data'
all_files = sorted([os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.csv')])

all_headers = set()

for file_path in all_files:
  header = spark.read.option("delimiter", ";").option("header", "true").csv(file_path).columns
  all_headers.update(header)

unique_column_list = sorted(list(all_headers))

print(f"Found {len(unique_column_list)} unique columns across all files:")
print(unique_column_list)

Found 44 unique columns across all files:
['DATA', 'Protocolo', 'acidente_verificado', 'auto', 'bairro', 'bairro_cruzamento', 'caminhao', 'ciclista', 'ciclom', 'complemento', 'condicao_via', 'conservacao_via', 'data', 'descricao', 'detalhe_endereco_acidente', 'divisao_via1', 'divisao_via2', 'divisao_via3', 'endereco', 'endereco_cruzamento', 'hora', 'mao_direcao', 'moto', 'natureza', 'natureza_acidente', 'num_semaforo', 'numero', 'numero_cruzamento', 'onibus', 'outros', 'pedestre', 'ponto_controle', 'referencia_cruzamento', 'sentido_via', 'sinalizacao', 'situacao', 'situacao_placa', 'situacao_semaforo', 'tempo_clima', 'tipo', 'velocidade_max_via', 'viatura', 'vitimas', 'vitimasfatais']


In [12]:
source_columns_standardized = [header.lower() for header in unique_column_list]
print(f"The source files have {len(source_columns_standardized)} unique standardized columns.")

extra_columns = set(schema_columns) - set(source_columns_standardized)
if extra_columns:
  print(f"The extra columns are: {extra_columns}")
else:
  print("No extra columns found.")

over_columns = set(source_columns_standardized) - set(schema_columns)
if over_columns:
  print(f"The missing columns are: {over_columns}")
else:
  print("No missing columns found.")

The source files have 44 unique standardized columns.
The extra columns are: {'vitimas_fatais'}
The missing columns are: {'vitimasfatais', 'natureza'}


### 2.3. Carga, Mapeamento e Enriquecimento (Ingestion, Mapping & Enrichment)

Implementamos um loop para processar cada arquivo CSV. Para lidar com o Schema Drift, utilizamos o padrão **"Read, Rename, and UnionByName"**:
1.  **Read:** Lemos cada arquivo individualmente, deixando o Spark inferir os nomes originais do cabeçalho.
2.  **Rename:** Renomeamos as colunas inconsistentes para se alinharem ao nosso schema padrão.
3.  **UnionByName:** Unimos o DataFrame tratado ao DataFrame final, alinhando as colunas pelo nome.

Além disso, enriquecemos os dados com duas colunas de metadados essenciais:
- `source_file`: Para rastreabilidade da origem de cada registro.
- `year`: Para permitir o particionamento da tabela.

In [13]:
df_bronze_final = spark.createDataFrame([], unified_schema)

In [14]:
data_dir = 'data'
all_files = sorted([os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.csv')])

for file_path in all_files:
    df_raw = spark.read.option("header", "true").option("delimiter", ";").option('inferSchema', True).csv(file_path)

    df_renamed = df_raw.withColumnRenamed('DATA','data') \
                       .withColumnRenamed('natureza', 'natureza_acidente') \
                       .withColumnRenamed('vitimasfatais', 'vitimas_fatais') \
                       .withColumnRenamed('Protocolo', 'protocolo')

    df_bronze_final = df_bronze_final.unionByName(df_renamed, allowMissingColumns=True)

In [15]:
final_df_columns = df_bronze_final.columns

expected_schema_columns = unified_schema.names

missing_columns = set(expected_schema_columns) - set(final_df_columns)
extra_columns = set(final_df_columns) - set(expected_schema_columns)

print("="*50)
print("Final Schema Validation Report")
print("="*50)

if not missing_columns and not extra_columns:
    print(f"✅ SUCCESS! The final DataFrame schema perfectly matches the expected unified schema.")
    print(f"Total columns loaded: {len(final_df_columns)}")
else:
    print("⚠️ ATTENTION: Discrepancy found!")
    if missing_columns:
        print(f"  - Columns defined in schema but MISSING from final DataFrame: {list(missing_columns)}")
    if extra_columns:
        print(f"  - EXTRA columns found in final DataFrame that were not in schema: {list(extra_columns)}")

Final Schema Validation Report
✅ SUCCESS! The final DataFrame schema perfectly matches the expected unified schema.
Total columns loaded: 42


In [16]:
from pyspark.sql.functions import input_file_name, year, col


df_bronze_with_metadata = df_bronze_final \
  .withColumn('source_file', input_file_name()) \
  .withColumn('year', year(col('data')))

print("DataFrame with source_file metadata column:")
df_bronze_with_metadata.select("data", "bairro", "source_file", "year").show(5, truncate=False)

DataFrame with source_file metadata column:
+----------+-----------+---------------------------------------+----+
|data      |bairro     |source_file                            |year|
+----------+-----------+---------------------------------------+----+
|2019-01-01|IPSEP      |file:///content/data/acidentes_2019.csv|2019|
|2019-01-01|BOA VIAGEM |file:///content/data/acidentes_2019.csv|2019|
|2019-01-01|BOA VIAGEM |file:///content/data/acidentes_2019.csv|2019|
|2019-01-01|IMBIRIBEIRA|file:///content/data/acidentes_2019.csv|2019|
|2019-01-01|JAQUEIRA   |file:///content/data/acidentes_2019.csv|2019|
+----------+-----------+---------------------------------------+----+
only showing top 5 rows



### 2.4. Salvando a Tabela Bronze (Saving the Bronze Table)

Finalmente, salvamos o DataFrame unificado e enriquecido como uma tabela Delta, que é a base do nosso Lakehouse. A tabela é salva no modo `overwrite` e **particionada por ano** (`partitionBy("year")`) para otimizar drasticamente a performance de futuras consultas que filtrem por data.*texto em itálico*

In [17]:
bronze_table_path = "delta_lake/bronze/acidentes"

print("Saving Bronze table...")

df_bronze_with_metadata.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year") \
    .save(bronze_table_path)

print(f"SUCCESS: Bronze table saved successfully to '{bronze_table_path}'")

Saving Bronze table...
SUCCESS: Bronze table saved successfully to 'delta_lake/bronze/acidentes'


## Conclusão da Camada Bronze

A Camada Bronze está completa! Nossos dados brutos de múltiplos arquivos inconsistentes estão agora armazenados em uma única tabela Delta, com schema unificado, metadados de linhagem e otimizada para leitura com particionamento.

**Próximo Passo:** Iniciar a **Camada Silver**, onde vamos limpar, validar e enriquecer estes dados.

## 3. Camada Silver: Limpeza e Enriquecimento de Dados

Iniciamos a Camada Silver, o coração do nosso processo de ETL. O objetivo aqui é transformar os dados brutos e não confiáveis da Camada Bronze em uma fonte de dados limpa, consistente e enriquecida, pronta para análises mais complexas. Aplicaremos regras de negócio e técnicas de limpeza para garantir a **Qualidade e Governança dos Dados**, como defendido por **Reis & Housley** e **Kimball**.

### 3.1. Leitura da Tabela Bronze

O primeiro passo é carregar nossa tabela Delta da Camada Bronze. Esta ação representa o início do pipeline que move os dados entre as camadas Bronze e Silver.

In [18]:
df_silver = spark.read.format("delta").load(bronze_table_path)

print("Reading from Bronze Delta table to start the Silver process...")
df_silver.printSchema()
print(f"\nTotal records read from Bronze: {df_silver.count()}")
df_silver.show(5, truncate=False)

Reading from Bronze Delta table to start the Silver process...
root
 |-- data: date (nullable = true)
 |-- hora: string (nullable = true)
 |-- natureza_acidente: string (nullable = true)
 |-- situacao: string (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- endereco: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- detalhe_endereco_acidente: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- endereco_cruzamento: string (nullable = true)
 |-- numero_cruzamento: string (nullable = true)
 |-- referencia_cruzamento: string (nullable = true)
 |-- bairro_cruzamento: string (nullable = true)
 |-- num_semaforo: string (nullable = true)
 |-- sentido_via: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- auto: string (nullable = true)
 |-- moto: string (nullable = true)
 |-- ciclom: string (nullable = true)
 |-- ciclista: string (nullable = true)


### 3.2. Limpeza e Padronização de Dados (Data Cleaning)

Esta etapa foca em melhorar a qualidade e a consistência dos dados, aplicando várias técnicas essenciais de **Limpeza de Dados (McKinney)**.


#### 3.2.1. Padronização de Case e Tratamento de Nulos

Primeiro, padronizamos colunas categóricas para maiúsculas para garantir consistência em agrupamentos e filtros. Em seguida, tratamos valores ausentes em colunas categóricas chave (como `bairro`), substituindo `NULL` por um valor explícito (`'NAO INFORMADO'`), melhorando a usabilidade dos dados conforme as práticas de **Qualidade de Dados (Kimball)**.

In [19]:
from pyspark.sql.functions import upper, col

columns_to_upper = ['natureza_acidente', 'situacao', 'bairro', 'endereco', 'bairro_cruzamento', 'sentido_via', 'tipo', 'tempo_clima', 'mao_direcao']

df_silver_casing = df_silver
for col_name in columns_to_upper:
  df_silver_casing = df_silver_casing.withColumn(col_name, upper(col(col_name)))

print("Verification of the loop transformation:")
df_silver_casing.select(columns_to_upper).show(5, truncate=False)

Verification of the loop transformation:
+-----------------+----------+-----------+---------------------------------+-----------------+-----------+---------------+-----------+-----------+
|natureza_acidente|situacao  |bairro     |endereco                         |bairro_cruzamento|sentido_via|tipo           |tempo_clima|mao_direcao|
+-----------------+----------+-----------+---------------------------------+-----------------+-----------+---------------+-----------+-----------+
|COM VÍTIMA       |FINALIZADA|CASA FORTE |EST DO ENCANAMENTO               |CASA FORTE       |NULL       |COLISÃO        |NULL       |NULL       |
|VÍTIMA FATAL     |FINALIZADA|BOA VIAGEM |AV ENGENHEIRO DOMINGOS FERREIRA  |BOA VIAGEM       |NULL       |COLISÃO        |NULL       |NULL       |
|COM VÍTIMA       |FINALIZADA|IMBIRIBEIRA|AV MARECHAL MASCARENHAS DE MORAES|IMBIRIBEIRA      |NULL       |COLISÃO LATERAL|NULL       |NULL       |
|COM VÍTIMA       |FINALIZADA|DERBY      |AV GOVERNADOR AGAMENON MAGALHAES |D

In [20]:
subset = ['natureza_acidente', 'situacao', 'bairro', 'sentido_via', 'tipo']

df_silver_nulls_handled = df_silver_casing.na.fill('NÃO INFORMADO', subset)

print("\nVerification of null handling:")
df_silver_nulls_handled.filter(col('situacao') == 'NÃO INFORMADO').select(subset).show(5, truncate=False)


Verification of null handling:
+-----------------+-------------+------------------+-------------+-------------------------+
|natureza_acidente|situacao     |bairro            |sentido_via  |tipo                     |
+-----------------+-------------+------------------+-------------+-------------------------+
|NÃO INFORMADO    |NÃO INFORMADO|BOMBA DO HEMETÉRIO|CIDADE       |CHOQUE OBJETO FIXO       |
|NÃO INFORMADO    |NÃO INFORMADO|SANTO ANTÔNIO     |SUBURBIO     |ABALROAMENTO LONGITUDINAL|
|NÃO INFORMADO    |NÃO INFORMADO|IPUTINGA          |CIDADE       |ABALROAMENTO LONGITUDINAL|
|NÃO INFORMADO    |NÃO INFORMADO|IPUTINGA          |CIDADE       |ABALROAMENTO LONGITUDINAL|
|NÃO INFORMADO    |NÃO INFORMADO|AFOGADOS          |NÃO INFORMADO|COLISÃO                  |
+-----------------+-------------+------------------+-------------+-------------------------+



#### 3.2.2. Correção de Tipos de Dados Numéricos

Durante a implementação das validações, notamos um comportamento inesperado: linhas com contagens válidas (ex: 1 ou 2) estavam sendo removidas. A investigação revelou que as colunas numéricas de contagem foram lidas como `StringType` na Camada Bronze, devido a valores formatados com vírgula (ex: `'1,0'`) em alguns dos arquivos de origem.

Para corrigir isso, aplicamos uma transformação em duas etapas:
1.  **Substituímos** a vírgula por ponto (`regexp_replace`).
2.  **Convertemos** a coluna para o tipo `IntegerType` (`.cast()`).

Este passo é um exemplo prático de depuração e correção de tipos de dados em um pipeline real.

In [21]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType

df_to_fix = df_silver_nulls_handled
counter_columns = ["auto", "moto", "ciclom", "ciclista", "pedestre", "onibus","caminhao", "viatura", "outros", "vitimas", "vitimas_fatais"]

print("--- Correcting data types for counter columns ---")

for col_name in counter_columns:
  df_to_fix = df_to_fix.withColumn(col_name, regexp_replace(col(col_name),',','.').cast(IntegerType()))

df_silver_types_fixed = df_to_fix

print("\nSchema after data type correction:")
df_silver_types_fixed.printSchema()

--- Correcting data types for counter columns ---

Schema after data type correction:
root
 |-- data: date (nullable = true)
 |-- hora: string (nullable = true)
 |-- natureza_acidente: string (nullable = false)
 |-- situacao: string (nullable = false)
 |-- protocolo: string (nullable = true)
 |-- bairro: string (nullable = false)
 |-- endereco: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- detalhe_endereco_acidente: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- endereco_cruzamento: string (nullable = true)
 |-- numero_cruzamento: string (nullable = true)
 |-- referencia_cruzamento: string (nullable = true)
 |-- bairro_cruzamento: string (nullable = true)
 |-- num_semaforo: string (nullable = true)
 |-- sentido_via: string (nullable = false)
 |-- tipo: string (nullable = false)
 |-- descricao: string (nullable = true)
 |-- auto: integer (nullable = true)
 |-- moto: integer (nullable = true)
 |-- ciclom: integer (nullable = true)
 |-- cicl

### 3.3. Enriquecimento de Dados (Feature Engineering)

Nesta fase de **Transformação e Mapeamento (McKinney)**, criamos novas colunas (*features*) que agregam valor analítico e facilitam as consultas.

* **Atributos de Data:** A partir da coluna `data`, derivamos `month`, `day_of_month` e `day_of_week`. Este é um passo fundamental na criação de uma futura **Dimensão Calendário (Kimball)**.
* **Atributos de Tempo:** Usando lógica condicional (`when/otherwise`), transformamos a coluna `hora` em uma categoria `periodo_do_dia`, o que simplifica análises baseadas em períodos.

In [22]:
from pyspark.sql.functions import month, col, date_format, day

df_silver_enriched = df_silver_types_fixed.withColumn('day_of_month', day(col('data'))) \
  .withColumn('month', month(col('data'))) \
  .withColumn('day_of_week', date_format(col('data'), 'E'))

print("Verification of the new 'columns' column:")
df_silver_enriched.select("data", "month", "day_of_week", "day_of_month").show(5, truncate=False)

Verification of the new 'columns' column:
+----------+-----+-----------+------------+
|data      |month|day_of_week|day_of_month|
+----------+-----+-----------+------------+
|2025-12-29|12   |Mon        |29          |
|2025-12-29|12   |Mon        |29          |
|2025-12-29|12   |Mon        |29          |
|2025-12-29|12   |Mon        |29          |
|2025-12-29|12   |Mon        |29          |
+----------+-----+-----------+------------+
only showing top 5 rows



In [23]:
from pyspark.sql.functions import col, hour, when

df_silver_final = df_silver_enriched.withColumn("periodo_do_dia",
  when((hour(col("hora")) >= 6) & (hour(col("hora")) < 12), "MANHA") \
  .when((hour(col("hora")) >= 12) & (hour(col("hora")) < 18), "TARDE") \
  .when((hour(col("hora")) >= 18) & (hour(col("hora")) <= 23), "NOITE") \
  .otherwise("MADRUGADA")
)

print("Verification of the new 'periodo_do_dia' column:")
df_silver_final.groupBy("periodo_do_dia").count().orderBy("count", ascending=False).show(truncate=False)

Verification of the new 'periodo_do_dia' column:
+--------------+-----+
|periodo_do_dia|count|
+--------------+-----+
|MANHA         |11889|
|TARDE         |8046 |
|MADRUGADA     |6565 |
|NOITE         |4050 |
+--------------+-----+



### 3.4. Validação e Garantia de Qualidade (Data Quality Assurance)

Com os dados limpos e enriquecidos, realizamos as validações finais para garantir a integridade da tabela Silver antes de salvá-la.

#### 3.4.1. Remoção de Duplicatas

Removemos quaisquer linhas que sejam 100% idênticas em todas as colunas usando `.dropDuplicates()`. Este passo garante a unicidade dos registros e evita métricas infladas.

In [24]:
initial_count = df_silver_final.count()
print(f"Row count before deduplication: {initial_count}")

df_silver_deduplicated = df_silver_final.dropDuplicates()

final_count = df_silver_deduplicated.count()
print(f"Row count after deduplication: {final_count}")
print(f"Number of duplicate rows removed: {initial_count - final_count}")

if initial_count == final_count:
  print("\nResult: No duplicate rows were found.")
else:
  print("\nResult: Duplicate rows were successfully removed.")

Row count before deduplication: 30550
Row count after deduplication: 30549
Number of duplicate rows removed: 1

Result: Duplicate rows were successfully removed.


#### 3.4.2. Validação de Regras de Negócio

Finalmente, após corrigir os tipos de dados, aplicamos a regra de negócio de que nenhuma coluna de contagem pode ser negativa. O filtro confirmou que não haviam registros com valores negativos, validando a qualidade dos dados neste quesito e mostrando que a origem, apesar dos problemas de formato, não continha dados logicamente inválidos.

In [25]:
from pyspark.sql.functions import col

print("--- Running the filtering logic ---")
df_filtered = df_silver_deduplicated
for c in counter_columns:
  df_filtered = df_filtered.filter((col(c) >= 0) | (col(c).isNull()))

removed_rows_df = df_silver_deduplicated.subtract(df_filtered)

print(f"\nTotal rows removed: {removed_rows_df.count()}")
print("\n--- Inspecting a sample of the REMOVED rows ---")
print("Showing the values in the counter columns for the rows that were filtered out:")

removed_rows_df.select(counter_columns).show(20, truncate=False)

df_silver_final_validated = df_filtered

--- Running the filtering logic ---

Total rows removed: 0

--- Inspecting a sample of the REMOVED rows ---
Showing the values in the counter columns for the rows that were filtered out:
+----+----+------+--------+--------+------+--------+-------+------+-------+--------------+
|auto|moto|ciclom|ciclista|pedestre|onibus|caminhao|viatura|outros|vitimas|vitimas_fatais|
+----+----+------+--------+--------+------+--------+-------+------+-------+--------------+
+----+----+------+--------+--------+------+--------+-------+------+-------+--------------+



### 3.5. Salvando a Tabela Silver (Saving the Silver Table)

Como passo final da Etapa 2, persistimos nosso DataFrame limpo, enriquecido e validado. Ao salvá-lo como uma nova tabela Delta, materializamos o resultado do nosso pipeline Bronze-to-Silver.

Esta tabela Silver servirá como uma fonte da verdade confiável e performática para as próximas etapas de análise e agregação na Camada Gold. Mantemos o particionamento por `year` para garantir que as consultas continuem otimizadas.

In [26]:
df_to_save = df_silver_final_validated

silver_table_path = "delta_lake/silver/acidentes_limpos"

print("Saving Silver table...")

df_to_save.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year") \
    .save(silver_table_path)

print(f"SUCCESS: Silver table saved successfully to '{silver_table_path}'")

Saving Silver table...
SUCCESS: Silver table saved successfully to 'delta_lake/silver/acidentes_limpos'


## Conclusão da Camada Silver

A Camada Silver está completa! Transformamos os dados brutos e inconsistentes da Camada Bronze em um conjunto de dados de alta qualidade. Durante este processo, nós:

* **Limpamos e Padronizamos** os dados, tratando valores nulos e garantindo a consistência de colunas categóricas.
* **Corrigimos Tipos de Dados**, resolvendo um problema sutil de formato nos dados de origem.
* **Enriquecemos** o dataset com novos atributos derivados de data e hora, agregando valor analítico.
* **Validamos** as regras de negócio, removendo duplicatas e garantindo a integridade lógica dos dados.

O resultado é uma tabela Delta (`acidentes_limpos`) que representa a "fonte única da verdade" para os acidentes de trânsito do Recife, pronta para ser consumida pela Camada Gold.

**Próximo Passo:** Iniciar a **Camada Gold**, onde construiremos agregações e Data Marts específicos para responder a perguntas de negócio.

## 4. Camada Gold: Modelagem Dimensional para Análise

Finalmente, na Camada Gold, transformamos os dados limpos da Silver em modelos otimizados para consumo por analistas e ferramentas de BI. O objetivo é criar "Produtos de Dados" que respondam a perguntas de negócio de forma rápida e intuitiva. Para isso, implementaremos um **Modelo Dimensional** no estilo **Star Schema (Kimball)**.

In [27]:
df_gold = spark.read.format("delta").load(silver_table_path)

### 4.1. Construção das Tabelas de Dimensão

O primeiro passo na criação de um Star Schema é construir as Tabelas de Dimensão. Elas fornecem o contexto descritivo ("quem, o quê, onde, quando") para as nossas métricas. Para cada dimensão, selecionamos os atributos únicos da tabela Silver e geramos uma **chave substituta** (*surrogate key*) numérica para garantir a integridade e a performance dos `JOIN`s.

#### 4.1.1. Dim_TipoAcidente
Esta dimensão cataloga as combinações únicas de `natureza_acidente` e `tipo` de acidente.

In [28]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

acidente_columns = ['tipo','natureza_acidente']
dim_tipo_acidente_base = df_gold.select(acidente_columns).distinct()
windowSpec = Window.orderBy(acidente_columns)
dim_tipo_acidente = dim_tipo_acidente_base.withColumn('tipo_acidente_key', row_number().over(windowSpec))

In [29]:
dim_tipo_acidente.select("tipo_acidente_key", "natureza_acidente", "tipo").show(truncate=False)

+-----------------+-----------------+-------------------------+
|tipo_acidente_key|natureza_acidente|tipo                     |
+-----------------+-----------------+-------------------------+
|1                |COM VÍTIMA       |ABALROAMENTO LONGITUDINAL|
|2                |NÃO INFORMADO    |ABALROAMENTO LONGITUDINAL|
|3                |SEM VÍTIMA       |ABALROAMENTO LONGITUDINAL|
|4                |VÍTIMA FATAL     |ABALROAMENTO LONGITUDINAL|
|5                |COM VÍTIMA       |ABALROAMENTO TRANSVERSAL |
|6                |SEM VÍTIMA       |ABALROAMENTO TRANSVERSAL |
|7                |VÍTIMA FATAL     |ABALROAMENTO TRANSVERSAL |
|8                |COM VÍTIMA       |ACID. DE PERCURSO        |
|9                |SEM VÍTIMA       |ACID. DE PERCURSO        |
|10               |SEM VÍTIMA       |ALAGAMENTO               |
|11               |SEM VÍTIMA       |APOIO CELPE              |
|12               |SEM VÍTIMA       |APOIO COMPESA            |
|13               |COM VÍTIMA       |APO

#### 4.1.2. Dim_Localizacao
Esta dimensão cataloga as combinações únicas de `bairro`, `endereco` e `sentido_via`, representando as diferentes localidades dos acidentes.

In [30]:
localizacao_columns = ['bairro','endereco','sentido_via']

dim_localizacao_base = df_gold.select(localizacao_columns).distinct()
windowSpec = Window.orderBy(localizacao_columns)
dim_localizacao = dim_localizacao_base.withColumn('localizacao_key', row_number().over(windowSpec))

In [31]:
dim_localizacao.select('*').show()

+-------+--------------------+-------------+---------------+
| bairro|            endereco|  sentido_via|localizacao_key|
+-------+--------------------+-------------+---------------+
|      0|AV NORTE MIGUEL A...|NÃO INFORMADO|              1|
|AFLITOS|AV CONS ROSA E SILVA|NÃO INFORMADO|              2|
|AFLITOS|AV CONS ROSA E SILVA|     SUBURBIO|              3|
|AFLITOS|            AV NORTE|NÃO INFORMADO|              4|
|AFLITOS|      AV RUI BARBOSA|NÃO INFORMADO|              5|
|AFLITOS|    AV SANTOS DUMONT|       CIDADE|              6|
|AFLITOS|    AV SANTOS DUMONT|NÃO INFORMADO|              7|
|AFLITOS|    AV SANTOS DUMONT|     SUBURBIO|              8|
|AFLITOS|       CAIS DO APOLO|     SUBURBIO|              9|
|AFLITOS|       RUA ANGUSTURA|NÃO INFORMADO|             10|
|AFLITOS|RUA CAPITAO SAMPA...|     SUBURBIO|             11|
|AFLITOS| RUA CARNEIRO VILELA|NÃO INFORMADO|             12|
|AFLITOS| RUA CARNEIRO VILELA|     SUBURBIO|             13|
|AFLITOS|    RUA CONS PO

#### 4.1.3. Dim_Tempo
Esta é uma dimensão calendário clássica, contendo uma linha para cada dia único em que ocorreu um acidente. Criamos uma chave `data_key` no formato `YYYYMMDD` para otimização de `JOIN`s.

In [32]:
tempo_columns = ['data', 'year', 'month', 'day_of_month', 'day_of_week']
dim_tempo = df_gold.select(
  date_format(col('data'), 'yyyyMMdd').cast(IntegerType()).alias('tempo_key'),
  *tempo_columns
 ).distinct().orderBy('tempo_key')

In [33]:
dim_tempo.printSchema()
dim_tempo.show(10)

root
 |-- tempo_key: integer (nullable = true)
 |-- data: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: string (nullable = true)

+---------+----------+----+-----+------------+-----------+
|tempo_key|      data|year|month|day_of_month|day_of_week|
+---------+----------+----+-----+------------+-----------+
| 20190101|2019-01-01|2019|    1|           1|        Tue|
| 20190102|2019-01-02|2019|    1|           2|        Wed|
| 20190103|2019-01-03|2019|    1|           3|        Thu|
| 20190104|2019-01-04|2019|    1|           4|        Fri|
| 20190105|2019-01-05|2019|    1|           5|        Sat|
| 20190106|2019-01-06|2019|    1|           6|        Sun|
| 20190107|2019-01-07|2019|    1|           7|        Mon|
| 20190108|2019-01-08|2019|    1|           8|        Tue|
| 20190109|2019-01-09|2019|    1|           9|        Wed|
| 20190110|2019-01-10|2019|    1|          10| 

### 4.2. Construção da Tabela Fato

Com as dimensões prontas, construímos a Tabela Fato (`fato_acidentes`). Esta é a tabela central do nosso modelo, contendo:
- **Chaves Estrangeiras:** As chaves (`data_key`, `localizacao_key`, `tipo_acidente_key`) que se conectam às nossas dimensões.
- **Métricas (Fatos):** Os valores numéricos que queremos analisar (`quantidade_vitimas`, `total_veiculos_envolvidos`, etc.).

Construímos a tabela juntando (`JOIN`) a tabela Silver com cada uma das dimensões para buscar as chaves e, em seguida, selecionamos e calculamos as colunas de fatos.

In [34]:
from pyspark.sql.functions import col, lit, coalesce, expr

vehicle_cols = ["auto", "moto", "ciclom", "ciclista", "pedestre", "onibus","caminhao", "viatura", "outros"]

sum_expr_str = " + ".join([f"coalesce({c}, 0)" for c in vehicle_cols])

df_fact_acidentes_joined = df_gold.join(dim_tempo, on='data', how='left') \
  .join(dim_localizacao, on=localizacao_columns, how='left') \
  .join(dim_tipo_acidente, on=acidente_columns, how='left')
fact_acidentes = df_fact_acidentes_joined.withColumn('contagem_acidentes', lit(1)) \
  .withColumn('total_veiculos_envolvidos', expr(sum_expr_str)) \
  .select('tempo_key', 'localizacao_key', 'tipo_acidente_key',
          coalesce(col('vitimas'), lit(0)).alias('quantidade_vitimas'),
          coalesce(col('vitimas_fatais'), lit(0)).alias('quantidade_vitimas_fatais'),
          'contagem_acidentes',
          'total_veiculos_envolvidos'
          )

print(f"Count before join: {df_gold.count()}")
print(f"Count after join:  {fact_acidentes.count()}")

fact_acidentes.printSchema()
print('='*100)
fact_acidentes.show(10)

Count before join: 30549
Count after join:  30549
root
 |-- tempo_key: integer (nullable = true)
 |-- localizacao_key: integer (nullable = true)
 |-- tipo_acidente_key: integer (nullable = true)
 |-- quantidade_vitimas: integer (nullable = false)
 |-- quantidade_vitimas_fatais: integer (nullable = false)
 |-- contagem_acidentes: integer (nullable = false)
 |-- total_veiculos_envolvidos: integer (nullable = false)

+---------+---------------+-----------------+------------------+-------------------------+------------------+-------------------------+
|tempo_key|localizacao_key|tipo_acidente_key|quantidade_vitimas|quantidade_vitimas_fatais|contagem_acidentes|total_veiculos_envolvidos|
+---------+---------------+-----------------+------------------+-------------------------+------------------+-------------------------+
| 20200115|           2437|               34|                 0|                        0|                 1|                        2|
| 20200120|           2097|           

### 4.3. Salvando as Tabelas do Data Mart

Como passo final, persistimos todas as tabelas do nosso Star Schema (3 Dimensões e 1 Fato) como tabelas Delta na camada Gold. Isso as materializa em disco, prontas para serem lidas por qualquer ferramenta de análise.

In [52]:
gold_tables_to_save = {
    "dim_tipo_acidente": dim_tipo_acidente,
    "dim_localizacao": dim_localizacao,
    "dim_tempo": dim_tempo,
    "fact_acidentes": fact_acidentes
}
for table_name, df_object in gold_tables_to_save.items():
  gold_table_path = f"delta_lake/gold/{table_name}"
  df_object.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_table_path)

  print(f"Saving '{table_name}' table to '{gold_table_path}'...")

print(f"SUCCESS: All Gold tables saved successfully!")

Saving 'dim_tipo_acidente' table to 'delta_lake/gold/dim_tipo_acidente'...
Saving 'dim_localizacao' table to 'delta_lake/gold/dim_localizacao'...
Saving 'dim_tempo' table to 'delta_lake/gold/dim_tempo'...
Saving 'fact_acidentes' table to 'delta_lake/gold/fact_acidentes'...
SUCCESS: All Gold tables saved successfully!


### 4.4. Data Mart 2: Análise de Veículos Envolvidos (Modelo Unpivot)

Para responder a perguntas mais específicas sobre os veículos, construímos um segundo Data Mart com uma abordagem de modelagem diferente. O objetivo aqui é analisar os dados pela ótica dos veículos, e não apenas dos acidentes.

Para isso, aplicamos uma transformação avançada de **Unpivot** (usando a função `stack`) para mudar a granularidade dos dados de "um registro por acidente" para "um registro por tipo de veículo por acidente".

Este modelo, que resolve uma relação muitos-para-muitos através de uma tabela fato de granularidade fina, nos permite analisar diretamente quais veículos estão mais envolvidos e em que contextos, demonstrando uma técnica de ETL mais complexa e poderosa. O processo envolveu:
1.  Criação de uma nova dimensão, `Dim_Vehicle`.
2.  Aplicação da transformação `unpivot` na tabela Silver.
3.  Criação de uma nova tabela Fato (`Fato_Acidentes_Veiculos`) juntando o resultado do unpivot com todas as dimensões.

In [47]:
vehicle_cols = ['auto', 'moto', 'ciclom', 'ciclista', 'pedestre', 'onibus', 'caminhao', 'viatura', 'outros']
vehicle_rows = [(vehicle, ) for vehicle in vehicle_cols]
dim_vehicle_base = spark.createDataFrame(vehicle_rows, ['vehicle_type'],)

windowSpec = Window.orderBy('vehicle_type')
dim_vehicle = dim_vehicle_base.withColumn('vehicle_key', row_number().over(windowSpec))

print("\n Dimension Vehicle")
dim_vehicle.show()

n_cols = len(vehicle_cols)
args_str = ", ".join([f"'{c}', {c}" for c in vehicle_cols])
formula_pivot = f"stack({n_cols}, {args_str}) as (vehicle_type, quantity)"

df_unpivoted = df_gold.select(
    "data",
    "bairro",
    "endereco",
    "sentido_via",
    "natureza_acidente",
    "tipo",
    expr(formula_pivot)
).filter(col("quantity") > 0)

print("\n Unpivoted Table")
df_unpivoted.show()


 Dimension Vehicle
+------------+-----------+
|vehicle_type|vehicle_key|
+------------+-----------+
|        auto|          1|
|    caminhao|          2|
|    ciclista|          3|
|      ciclom|          4|
|        moto|          5|
|      onibus|          6|
|      outros|          7|
|    pedestre|          8|
|     viatura|          9|
+------------+-----------+


 Unpivoted Table
+----------+-----------+--------------------+-------------+-----------------+--------------------+------------+--------+
|      data|     bairro|            endereco|  sentido_via|natureza_acidente|                tipo|vehicle_type|quantity|
+----------+-----------+--------------------+-------------+-----------------+--------------------+------------+--------+
|2019-02-02| CASA FORTE|AV DEZESSETE DE A...|     SUBURBIO|       COM VÍTIMA|ABALROAMENTO TRAN...|        auto|       1|
|2019-02-02| CASA FORTE|AV DEZESSETE DE A...|     SUBURBIO|       COM VÍTIMA|ABALROAMENTO TRAN...|        moto|       1|
|2019

In [51]:
df_acidente_veiculo = df_unpivoted.join(dim_tempo, on='data', how='left') \
  .join(dim_localizacao, on=localizacao_columns, how='left') \
  .join(dim_tipo_acidente, on=acidente_columns, how='left') \
  .join(dim_vehicle, on='vehicle_type', how='left') \
  .select('tempo_key', 'localizacao_key', 'tipo_acidente_key', 'vehicle_key', 'quantity')

windowSpec = Window.orderBy('tempo_key')

fact_acidente_veiculo = df_acidente_veiculo.withColumn('acidente_veiculo_key', row_number().over(windowSpec))

fact_acidente_veiculo.show()

+---------+---------------+-----------------+-----------+--------+--------------------+
|tempo_key|localizacao_key|tipo_acidente_key|vehicle_key|quantity|acidente_veiculo_key|
+---------+---------------+-----------------+-----------+--------+--------------------+
| 20190101|            661|               39|          1|       2|                   1|
| 20190101|            917|                6|          1|       2|                   2|
| 20190101|           4233|               34|          1|       1|                   3|
| 20190101|           4233|               34|          4|       1|                   4|
| 20190101|           2784|               44|          1|       1|                   5|
| 20190101|           2784|               44|          5|       1|                   6|
| 20190101|           3130|               39|          1|       2|                   7|
| 20190101|           2542|               44|          1|       3|                   8|
| 20190101|           2394|     

#### Salvando o Data Mart de Veículos
Finalmente, persistimos as novas tabelas na camada Gold.

In [53]:
gold_tables_to_save = {
    "dim_veiculo": dim_vehicle,
    "intermediate_veiculos_por_acidente": df_unpivoted,
    "fact_acidente_veiculo": fact_acidente_veiculo
}
for table_name, df_object in gold_tables_to_save.items():
  gold_table_path = f"delta_lake/gold/{table_name}"
  df_object.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_table_path)

  print(f"Saving '{table_name}' table to '{gold_table_path}'...")

print(f"SUCCESS: All Gold tables saved successfully!")

Saving 'dim_veiculo' table to 'delta_lake/gold/dim_veiculo'...
Saving 'intermediate_veiculos_por_acidente' table to 'delta_lake/gold/intermediate_veiculos_por_acidente'...
Saving 'fact_acidente_veiculo' table to 'delta_lake/gold/fact_acidente_veiculo'...
SUCCESS: All Gold tables saved successfully!
