
# Transformação para Bronze – Normalização de Trades DEX

Este notebook realiza a transformação dos dados brutos da camada `raw` em uma tabela estruturada e otimizada na **camada Bronze**. A fonte dos dados é a tabela `workspace.ethereum.dex_trades_raw`, que contém transações extraídas da API da Bitquery sobre exchanges descentralizadas (DEX).

---

## Objetivo

Construir a **camada Bronze (`workspace.ethereum.dex_trades_bronze`)** com os seguintes propósitos:

- Estruturar os dados para facilitar análises e joins posteriores  
- Eliminar registros nulos e duplicados  
- Padronizar nomes e tipos de colunas  
- Aplicar **particionamento** por ano e mês para otimização de leitura  

---

## Etapas do Pipeline de Transformação

### 1. Seleção e Normalização dos Campos

Nesta etapa, os dados brutos com campos aninhados (como `baseCurrency`, `quoteCurrency`, `exchange` e `smartContract`) são **explodidos e renomeados** com nomes claros e consistentes. Também é realizada a **padronização de tipos de dados**, como datas, valores numéricos e contadores, garantindo um esquema coerente e auditável para análises futuras.

---

### 2. Limpeza e Deduplicação

São removidos todos os registros com valores ausentes em colunas críticas (como `data_transacao`, `quantia_trocada_usd` e `num_trades`). Após isso, aplica-se uma deduplicação total das linhas para garantir que não haja múltiplos registros com a mesma granularidade lógica.

---

### 3. Enriquecimento com Metadados

O DataFrame é enriquecido com colunas auxiliares que suportam versionamento, rastreabilidade e particionamento:

- **Ano e mês**: extraídos da `data_transacao`, permitem particionamento eficiente.
- **Fonte**: identifica a origem dos dados como `"bitquery"`.
- **Data de ingestão**: marca o momento de entrada do dado no pipeline.

---

### 4. Escrita da Tabela Bronze

Os dados tratados e normalizados são armazenados em **formato Delta**, utilizando **particionamento físico por ano e mês**. Isso habilita o *partition pruning* durante as consultas, melhora o desempenho e reduz o custo computacional. A escrita é feita no modo `overwrite`, garantindo que os dados sejam atualizados de forma íntegra a cada execução.

---

## Comparação de Performance – Raw vs Bronze

Foi realizada uma análise de desempenho entre as camadas Raw e Bronze, medindo:

- Tempo de execução da leitura completa de cada tabela  
- Quantidade de linhas retornadas  

---

## Interpretação

A camada Bronze apresentou **ganhos expressivos de performance**, mesmo mantendo volume semelhante de dados. Esses ganhos se devem a:

- Particionamento físico por `ano` e `mês`  
- Estrutura mais enxuta e normalizada  
- Tipagem explícita e ausência de campos aninhados  
- Remoção de duplicações e dados inválidos  
- Uso eficiente do Delta Lake com compactação e controle de versões  

> Em pipelines analíticos e arquiteturas escaláveis, esse tipo de otimização é fundamental para desempenho, governança e manutenção futura.

---

## Boas Práticas Aplicadas

- Conversão explícita de tipos durante a transformação  
- Padronização semântica de nomes de colunas  
- Validação de dados antes da persistência  
- Escrita em Delta com controle de particionamento e versionamento  
- Organização da lógica de ingestão em etapas modulares e reutilizáveis  

---

## Próximos Passos

- Construção da **camada Silver**, com joins e enriquecimentos  
- Criação das dimensões analíticas:
  - `dim_calendario`
  - `dim_token`
  - `dim_exchange`
- Modelagem da tabela **fato_transacao_dex**
- Desenvolvimento de dashboards com insights como:
  - Volume diário transacionado por protocolo
  - Gasto médio de gas por rede
  - Tokens mais negociados em DEXs

---



## Explodindo e renomeando campos aninhados

In [0]:
from pyspark.sql.functions import col, to_date, current_timestamp, expr
from pyspark.sql.types import DoubleType, LongType

df_raw = spark.table("workspace.ethereum.dex_trades_raw")

df_bronze = df_raw.select(
    to_date(col("date.date")).alias("data_transacao"),

    col("baseCurrency.symbol").alias("moeda_base"),
    col("BaseCurrency.name").alias("nome_base"),
    col("baseCurrency.address").alias("endereco_base"),

    col("quoteCurrency.symbol").alias("moeda_quote"),
    col("quoteCurrency.name").alias("nome_quote"),
    col("quoteCurrency.address").alias("endereco_quote"),

    col("exchange.name").alias("exchange_nome"),
    col("exchange.FullName").alias("exchange_nome_completo"),
    
    col("tradeAmount").cast(DoubleType()).alias("quantia_trocada_usd"),
    col("buyAmount").cast(DoubleType()).alias("quantia_compra"),
    col("sellAmount").cast(DoubleType()).alias("quantia_venda"),
    col("baseAmount").cast(DoubleType()).alias("quantia_base"),
    col("quotePrice").cast(DoubleType()).alias("preco_cotacao"),
    col("price").cast(DoubleType()).alias("preco_medio"),
    col("trades").cast(LongType()).alias("num_trades"),
    col("protocol").alias("protocolo"),
    col("side").alias("direcao"),
    col("gas").cast(LongType()).alias("gas"),
    col("gasPrice").cast(DoubleType()).alias("preco_gas"),
    col("gasValue").cast(DoubleType()).alias("valor_gas"),

    col("smartContract.currency.symbol").alias("simbolo_token_contrato"),
    col("smartContract.currency.name").alias("nome_token_contrato"),
    col("smartContract.contractType").alias("tipo_contrato"),

    col("network").alias("rede"),
    current_timestamp().alias("data_ingestao")
)



## Limpeza

In [0]:
df_bronze_clean = df_bronze.dropna(subset=["data_transacao", "quantia_trocada_usd", "num_trades"])
df_bronze_clean = df_bronze.dropDuplicates()

## Adicionando colunas de partição

In [0]:
df_bronze_final = df_bronze_clean.withColumn("ano", year("data_transacao")) \
                                 .withColumn("mes", month("data_transacao")) \
                                 .withColumn("fonte", lit("bitquery"))

## Criando tabela na camada Bronze

In [0]:
df_bronze_final.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("ano", "mes") \
    .saveAsTable("workspace.ethereum.dex_trades_bronze")

## Verificação de tempo de execução para monitoramento

In [0]:
from time import time

def medir_execucao(sql):
    inicio = time()
    df = spark.sql(sql)
    count = df.count()
    duracao = round(time() - inicio, 2)
    return count, duracao

raw_count, raw_duracao = medir_execucao("SELECT * FROM workspace.ethereum.dex_trades_raw")
bronze_count, bronze_duracao = medir_execucao("SELECT * FROM workspace.ethereum.dex_trades_bronze")

import pandas as pd

dados_comparacao = pd.DataFrame({
    "Camada": ["Raw", "Bronze"],
    "Linhas": [raw_count, bronze_count],
    "Tempo de Execução (s)": [raw_duracao, bronze_duracao]
})

display(spark.createDataFrame(dados_comparacao))

Databricks visualization. Run in Databricks to view.