# MVP ENGENHARIA DE DADOS - PIPELINE ETL (BirdBase Springernature)

**Autor:** Thiago Maciel Barbosa  
**Descrição:** Pipeline robusto para tratar "Double Headers" e Legendas não estruturadas. Inclui download automático do Figshare/Springernature.

---

In [0]:
# Biblioteca para ler o XLSX
%pip install openpyxl
import requests
import pandas as pd
import io
import os
from pyspark.sql.functions import col, when, trim, upper, monotonically_increasing_id, expr
from pyspark.sql.types import FloatType, IntegerType, StringType

# ------------------------------------------------------------------------------
# 1. INGESTÃO DE DADOS
# ------------------------------------------------------------------------------

# Configurações
# URL fornecida: Link de download direto do Figshare
URL_FONTE = "https://springernature.figshare.com/ndownloader/files/55634729"
# Caminho onde o arquivo será salvo temporariamente no Driver do Cluster
CAMINHO_TEMPORARIO = "/tmp/BirdBase_Final.xlsx"

print("--- INICIANDO INGESTÃO DE DADOS ---")

try:
    print(f"Tentando baixar arquivo da fonte: {URL_FONTE}")
    
    # O parametro allow_redirects=True é crucial para links do Figshare
    response = requests.get(URL_FONTE, allow_redirects=True)
    response.raise_for_status()

    # Salvando o arquivo no disco local do driver para o Pandas ler
    with open(CAMINHO_TEMPORARIO, 'wb') as f:
        f.write(response.content)
    
    print(f"Download concluído! Arquivo salvo em: {CAMINHO_TEMPORARIO}")
    arquivo_para_ler = CAMINHO_TEMPORARIO

    # --- TRATAMENTO DA ABA 'DATA' (CABEÇALHO DUPLO) ---
    print(">> Lendo aba 'Data' e tratando cabeçalho duplo...")
    # header=1: Ignora a linha 0 (Categorias macro) e usa a linha 1 como cabeçalho
    # engine='openpyxl': Força o uso da biblioteca que acabamos de instalar
    pdf_data = pd.read_excel(arquivo_para_ler, sheet_name="Data", header=1, engine='openpyxl')
    
    # --- TRATAMENTO DA ABA 'LEGEND' (TEXTO NÃO ESTRUTURADO) ---
    print(">> Lendo aba 'Legend' e extraindo tabela de status...")
    pdf_legend_raw = pd.read_excel(arquivo_para_ler, sheet_name="Legend", header=None, engine='openpyxl')
    
    # Procuramos a linha que contém "IUCN Red List Status"
    try:
        # Busca dinâmica pelo texto-chave
        start_row_index = pdf_legend_raw[pdf_legend_raw[0].astype(str).str.contains("IUCN Red List Status", na=False)].index[0]
        
        # Recarrega a aba Legend pulando as linhas de texto inútil
        pdf_legend = pd.read_excel(arquivo_para_ler, sheet_name="Legend", header=start_row_index + 1, engine='openpyxl')
        
        # Seleciona apenas as colunas de Código e Descrição
        pdf_legend = pdf_legend.iloc[:, 0:2]
        pdf_legend.columns = ["Code", "Description"]
        pdf_legend = pdf_legend.dropna() # Remove linhas vazias
        print(f"   Legenda extraída com sucesso. {len(pdf_legend)} categorias encontradas.")
    except IndexError:
        print("   AVISO: Não foi possível encontrar a tabela de legenda automaticamente. Criando DataFrame vazio.")
        pdf_legend = pd.DataFrame(columns=["Code", "Description"])

    # --- CONVERSÃO PARA SPARK ---
    print(">> Convertendo Pandas para Spark DataFrames...")
    # Conversão para string para evitar erros. A tipagem correta (Float/Int) será feita na etapa de Transformação
    df_raw = spark.createDataFrame(pdf_data.astype(str))
    df_legend_spark = spark.createDataFrame(pdf_legend.astype(str))
    
    print("Ingestão concluída com sucesso!")

except Exception as e:
    print(f"ERRO FATAL NA INGESTÃO: {e}")
    raise e

# ------------------------------------------------------------------------------
# 2. TRANSFORMAÇÃO E LIMPEZA
# ------------------------------------------------------------------------------

print("\n--- INICIANDO TRANSFORMAÇÃO ---")

# Seleção e Renomeação (Mapeamento De -> Para)
# Usamos crases (backticks) para lidar com os nomes complexos do Excel original
df_selecionado = df_raw.select(
    col("`IOC 15.1`").cast(StringType()).alias("id_original"),
    col("`English Name (BirdLife > IOC > Clements>AviList)`").alias("nome_ingles"),
    col("`Latin (BirdLife > IOC > Clements>AviList)`").alias("nome_latin"),
    
    # Taxonomia
    col("`Order`").alias("ordem"),
    col("`Family IOC 15.1`").alias("familia"),
    
    # Conservação
    col("`2024 IUCN Red List category`").alias("cod_categoria_iucn"),
    
    # Habitat e Dieta
    col("`Primary Habitat`").alias("habitat_primario"),
    col("`Primary Diet`").alias("dieta_primaria"),
    
    # Métricas Físicas
    col("`Average Mass`").alias("massa_media"),
    col("`NormMin`").alias("altitude_min"),
    col("`NormMax`").alias("altitude_max"),
    
    # Reprodução
    col("`Clutch_Min`").alias("ninhada_min"),
    col("`Clutch_Max`").alias("ninhada_max"),
    
    # Comportamento
    col("`Mig`").alias("flag_migratorio"),
    col("`Sed`").alias("flag_sedentario"),
    
    # Detalhe da Dieta
    col("`IN-Wt`").alias("peso_dieta_inseto"),
    col("`FR-Wt`").alias("peso_dieta_fruta")
)

# Conversão Segura de Tipos (try_cast via expr)
# Isso garante que se houver texto onde deveria ter número, vira NULL em vez de erro
df_convertido = df_selecionado.select(
    col("id_original"), col("nome_ingles"), col("nome_latin"), 
    col("ordem"), col("familia"), col("cod_categoria_iucn"), 
    col("habitat_primario"), col("dieta_primaria"),
    
    expr("try_cast(massa_media AS FLOAT)").alias("massa_media"),
    expr("try_cast(altitude_min AS FLOAT)").alias("altitude_min"), 
    expr("try_cast(altitude_max AS FLOAT)").alias("altitude_max"),
    expr("try_cast(ninhada_min AS FLOAT)").alias("ninhada_min"),
    expr("try_cast(ninhada_max AS FLOAT)").alias("ninhada_max"),
    expr("try_cast(peso_dieta_inseto AS FLOAT)").alias("peso_dieta_inseto"), 
    expr("try_cast(peso_dieta_fruta AS FLOAT)").alias("peso_dieta_fruta"),
    
    col("flag_migratorio"), col("flag_sedentario")
)

# Tratamento de Nulos
df_tratado = df_convertido.fillna(0, subset=[
    "massa_media", "altitude_min", "altitude_max", 
    "ninhada_min", "ninhada_max", "peso_dieta_inseto", "peso_dieta_fruta"
])

# Lógica de Negócio (Flags Booleanas e Padronização)
# Convertendo '1'/'True' para booleano real e padronizando texto para maiúsculo
df_tratado = df_tratado.withColumn("eh_migratorio", when(col("flag_migratorio").isin("1", "1.0", "True"), True).otherwise(False)) \
                       .withColumn("eh_sedentario", when(col("flag_sedentario").isin("1", "1.0", "True"), True).otherwise(False)) \
                       .withColumn("habitat_primario", upper(trim(col("habitat_primario")))) \
                       .withColumn("dieta_primaria", upper(trim(col("dieta_primaria")))) \
                       .withColumn("cod_categoria_iucn", upper(trim(col("cod_categoria_iucn"))))

# ------------------------------------------------------------------------------
# 3. MODELAGEM (STAR SCHEMA)
# ------------------------------------------------------------------------------

print("\n--- INICIANDO MODELAGEM ---")

# DIM_CONSERVACAO
df_dim_conservacao = df_legend_spark.select(
    upper(trim(col("Code"))).alias("cod_categoria_iucn"),
    col("Description").alias("desc_status")
).distinct()
df_dim_conservacao = df_dim_conservacao.withColumn("sk_status_iucn", monotonically_increasing_id())

# DIM_HABITAT
df_dim_habitat = df_tratado.select("habitat_primario").distinct().filter(col("habitat_primario").isNotNull())
df_dim_habitat = df_dim_habitat.withColumn("sk_habitat", monotonically_increasing_id())

# DIM_DIETA
df_dim_dieta = df_tratado.select("dieta_primaria").distinct().filter(col("dieta_primaria").isNotNull())
df_dim_dieta = df_dim_dieta.withColumn("sk_dieta", monotonically_increasing_id())

# DIM_TAXONOMIA
df_dim_taxonomia = df_tratado.select("familia", "ordem").distinct()
df_dim_taxonomia = df_dim_taxonomia.withColumn("sk_familia", monotonically_increasing_id())

# DIM_ESPECIE
df_dim_especie = df_tratado.select("id_original", "nome_ingles", "nome_latin").distinct()
df_dim_especie = df_dim_especie.withColumn("sk_especie", monotonically_increasing_id())

print("Gerando Tabela Fato...")

# Joins para substituir valores originais pelas Chaves (SKs)
df_fato = df_tratado.alias("t") \
    .join(df_dim_especie.alias("de"), col("t.id_original") == col("de.id_original"), "left") \
    .join(df_dim_habitat.alias("dh"), col("t.habitat_primario") == col("dh.habitat_primario"), "left") \
    .join(df_dim_dieta.alias("dd"), col("t.dieta_primaria") == col("dd.dieta_primaria"), "left") \
    .join(df_dim_taxonomia.alias("dt"), (col("t.familia") == col("dt.familia")) & (col("t.ordem") == col("dt.ordem")), "left") \
    .join(df_dim_conservacao.alias("dc"), col("t.cod_categoria_iucn") == col("dc.cod_categoria_iucn"), "left")

# Seleção final das colunas da Fato
df_fato_final = df_fato.select(
    col("de.sk_especie"),
    col("dh.sk_habitat"),
    col("dd.sk_dieta"),
    col("dt.sk_familia"),
    col("dc.sk_status_iucn"),
    col("t.massa_media"),
    col("t.ninhada_min"),
    col("t.ninhada_max"),
    col("t.altitude_min"),
    col("t.altitude_max"),
    col("t.eh_migratorio"),
    col("t.eh_sedentario"),
    col("t.peso_dieta_inseto"),
    col("t.peso_dieta_fruta")
)

# ------------------------------------------------------------------------------
# 4. CARGA (SALVANDO NO DELTA LAKE)
# ------------------------------------------------------------------------------

print("\n--- SALVANDO TABELAS DELTA ---")

# 1. CRIAR O BANCO DE DADOS (SCHEMA)
print("Criando/Verificando banco de dados 'birdbase'...")
spark.sql("CREATE DATABASE IF NOT EXISTS birdbase")

# 2. SALVAR AS TABELAS DENTRO DO BANCO

print("Salvando dim_habitat...")
df_dim_habitat.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("birdbase.dim_habitat")
print("Salvando dim_dieta...")
df_dim_dieta.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("birdbase.dim_dieta")
print("Salvando dim_taxonomia...")
df_dim_taxonomia.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("birdbase.dim_taxonomia")
print("Salvando dim_conservacao...")
df_dim_conservacao.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("birdbase.dim_conservacao")
print("Salvando dim_especie...")
df_dim_especie.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("birdbase.dim_especie")
print("Salvando fato_metricas_aves...")
df_fato_final.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("birdbase.fato_metricas_aves")
print("Pipeline BirdBase concluído com sucesso! Tabelas salvas no banco 'birdbase'.")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
--- INICIANDO INGESTÃO DE DADOS ---
Tentando baixar arquivo da fonte: https://springernature.figshare.com/ndownloader/files/55634729
Download concluído! Arquivo salvo em: /tmp/BirdBase_Final.xlsx
>> Lendo aba 'Data' e tratando cabeçalho duplo...
>> Lendo aba 'Legend' e extraindo tabela de status...
   Legenda extraída com sucesso. 137 categorias encontradas.
>> Convertendo Pandas para Spark DataFrames...
Ingestão concluída com sucesso!

--- INICIANDO TRANSFORMAÇÃO ---

--- INICIANDO MODELAGEM ---
Gerando Tabela Fato...

--- SALVANDO TABELAS DELTA ---
Criando/Verificando banco de dados 'birdbase'...
Salvando dim_habitat...
Salvando dim_dieta...
Salvando dim_taxonomia...
Salvando dim_conservacao...
Salvando dim_especie...
Salvando fato_metricas_aves...
Pipeline BirdBase concluído com sucesso! Tabelas salvas no banco 'birdbase'.
