In [0]:
# Camada Bronze - Cards - Magic: The Gathering 
# Objetivo: Processo EL (Extract & Load) com nomenclatura padronizada
# Características: Extract da staging (S3/Parquet) -> Load na bronze (Unity Catalog/Delta) 
# Melhorias: Nomenclatura padronizada, colunas padronizadas, merge incremental robusto

# =============================================================================
# BIBLIOTECAS UTILIZADAS
# =============================================================================
import logging
from datetime import datetime, timedelta
from pyspark.sql.functions import *

# =============================================================================
# CONFIGURAÇÃO DE SEGREDOS
# =============================================================================
def get_secret(secret_name, default_value=None):
    try:
        return dbutils.secrets.get(scope="mtg-pipeline", key=secret_name)
    except:
        if default_value is not None:
            print(f"Segredo '{secret_name}' não encontrado, usando valor padrão")
            return default_value
        else:
            print(f"Segredo obrigatório '{secret_name}' não configurado")
            raise Exception(f"Segredo '{secret_name}' não configurado")

# =============================================================================
# CONFIGURAÇÕES GLOBAIS
# =============================================================================
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

CATALOG_NAME = get_secret("catalog_name")
SCHEMA_NAME = "bronze"
TABLE_NAME = "TB_BRONZE_CARDS"

S3_BUCKET = get_secret("s3_bucket")
S3_STAGE_PREFIX = get_secret("s3_stage_prefix", "magic_the_gathering/stage")
S3_BRONZE_PREFIX = get_secret("s3_bronze_prefix", "magic_the_gathering/bronze")
S3_STAGE_PATH = f"s3://{S3_BUCKET}/{S3_STAGE_PREFIX}"
S3_BRONZE_PATH = f"{S3_BUCKET}/{S3_BRONZE_PREFIX}"

# =============================================================================
# CONFIGURAÇÃO DE PERÍODO TEMPORAL
# =============================================================================
def calculate_temporal_period(years_back=5):
    """Calcula período temporal baseado no ano atual"""
    current_year = datetime.now().year
    start_year = current_year - years_back + 1
    end_year = current_year
    
    cutoff_date = datetime(start_year, 1, 1)
    end_date = datetime(end_year, 12, 31)
    
    return {
        'years_back': years_back,
        'start_year': start_year,
        'end_year': end_year,
        'cutoff_date': cutoff_date,
        'end_date': end_date,
        'cutoff_date_str': cutoff_date.strftime("%Y-%m-%d"),
        'end_date_str': end_date.strftime("%Y-%m-%d")
    }

# Configurar período temporal
YEARS_BACK = 5
temporal_config = calculate_temporal_period(YEARS_BACK)

START_YEAR = temporal_config['start_year']
END_YEAR = temporal_config['end_year']
CUTOFF_DATE = temporal_config['cutoff_date']
END_DATE = temporal_config['end_date']
CUTOFF_DATE_STR = temporal_config['cutoff_date_str']
END_DATE_STR = temporal_config['end_date_str']

print("Iniciando pipeline EL Unity com Governança GOV - TB_BRONZE_CARDS")
print(f"Filtro temporal: {YEARS_BACK} anos completos ({START_YEAR} a {END_YEAR})")
print(f"Período: {CUTOFF_DATE_STR} até {END_DATE_STR}")

# =============================================================================
# MAPEAMENTO DE COLUNAS GOV
# =============================================================================
# Mapeamento das colunas originais para nomenclatura GOV (alinhado com Silver)
COLUMN_MAPPING = {
    # Identificação
    'id': 'ID_CARD',
    
    # Nomes e Descrições
    'name': 'NME_CARD',
    'artist': 'NME_ARTIST',
    'type': 'NME_CARD_TYPE',
    'rarity': 'NME_RARITY',
    'set': 'COD_SET',
    'setName': 'NME_SET',
    
    # Custo e Tipo
    'manaCost': 'DESC_MANA_COST',
    'cmc': 'MANA_COST',
    
    # Cores
    'colors': 'COD_COLORS',
    'colorIdentity': 'COD_COLOR_IDENTITY',
    
    # Tipos e Subtipos
    'types': 'DESC_TYPES',
    'subtypes': 'DESC_SUBTYPES',
    
    # Estatísticas
    'power': 'NME_POWER',
    'toughness': 'NME_TOUGHNESS',
    
    # Informações de Edição
    'number': 'COD_NUMBER',
    'layout': 'NME_LAYOUT',
    'multiverseid': 'ID_MULTIVERSE',
    'imageUrl': 'URL_IMAGE',
    'variations': 'DESC_VARIATIONS',
    
    # Texto
    'text': 'DESC_CARD',
    
    # Metadados
    'ingestion_timestamp': 'DT_INGESTION',
    'source': 'NME_SOURCE',
    'endpoint': 'NME_ENDPOINT'
}



In [0]:
# =============================================================================
# FUNÇÕES UTILITÁRIAS
# =============================================================================
def setup_unity_catalog():
    # Configura o Unity Catalog e schema
    try:
        spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}")
        spark.sql(f"USE CATALOG {CATALOG_NAME}")
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME}")
        spark.sql(f"USE SCHEMA {SCHEMA_NAME}")
        return True
    except Exception as e:
        print(f"Erro ao configurar Unity Catalog: {e}")
        return False

def extract_from_staging(table_name):
    # EXTRACT: Lê dados da camada staging
    try:
        # Para TB_BRONZE_CARDS, ler do arquivo cards da staging
        if table_name == "TB_BRONZE_CARDS":
            stage_path = f"{S3_STAGE_PATH}/*_cards.parquet"
        else:
            stage_path = f"{S3_STAGE_PATH}/*_{table_name}.parquet"
        df = spark.read.parquet(stage_path)
        print(f"Extraídos {df.count()} registros de staging para {table_name}")
        return df
    except Exception as e:
        print(f"Erro no EXTRACT de staging: {e}")
        return None

def apply_governance_naming(df):
    # Aplica nomenclatura GOV nas colunas
    try:
        # Obter colunas disponíveis no DataFrame
        available_columns = df.columns
        
        # Criar mapeamento apenas para colunas que existem
        column_mapping = {}
        for original_col, gov_col in COLUMN_MAPPING.items():
            if original_col in available_columns:
                column_mapping[original_col] = gov_col
        
        # Renomear colunas
        for original_col, gov_col in column_mapping.items():
            df = df.withColumnRenamed(original_col, gov_col)
        
        print(f"Aplicada nomenclatura GOV em {len(column_mapping)} colunas")
        print(f"Colunas renomeadas: {list(column_mapping.keys())}")
        
        return df
    except Exception as e:
        print(f"Erro ao aplicar nomenclatura GOV: {e}")
        return df

def apply_temporal_filter(df, table_name):
    # Aplica filtro temporal de 5 anos
    if table_name == "TB_BRONZE_CARDS":
        # Para TB_BRONZE_CARDS, aplicar filtro baseado no releaseDate dos sets
        # Filtro: cards de sets lançados nos últimos 5 anos
        from pyspark.sql.functions import col, current_date, date_sub
        
        try:
            # Usar período completo de 5 anos (2020 a 2024)
            from pyspark.sql.functions import to_date
            
            # Ler dados de sets da staging
            sets_stage_path = f"{S3_STAGE_PATH}/*_sets.parquet"
            print(f"Carregando dados de sets para filtro temporal: {sets_stage_path}")
            
            sets_df = spark.read.parquet(sets_stage_path)
            print(f"Sets carregados: {sets_df.count()} registros")
            
            # Filtrar sets do período completo (5 anos completos)
            sets_filtered = sets_df.filter(
                (col("releaseDate") >= CUTOFF_DATE_STR) & 
                (col("releaseDate") <= END_DATE_STR)
            )
            print(f"Sets filtrados ({START_YEAR} a {END_YEAR}): {sets_filtered.count()} registros")
            
            # Obter lista de códigos de sets válidos
            valid_sets = sets_filtered.select("code").distinct()
            valid_set_codes = [row.code for row in valid_sets.collect()]
            print(f"Sets válidos para filtro: {len(valid_set_codes)} códigos")
            
            # Aplicar filtro nos cards baseado nos sets válidos
            df_filtered = df.filter(col("set").isin(valid_set_codes))
            
            total_before = df.count()
            total_after = df_filtered.count()
            
            print(f"Filtro temporal aplicado para TB_BRONZE_CARDS:")
            print(f"  - Período: {CUTOFF_DATE_STR} até {END_DATE_STR}")
            print(f"  - Anos completos: {START_YEAR} a {END_YEAR}")
            print(f"  - Sets válidos: {len(valid_set_codes)}")
            print(f"  - Registros antes: {total_before}")
            print(f"  - Registros depois: {total_after}")
            print(f"  - Registros removidos: {total_before - total_after}")
            
            return df_filtered
            
        except Exception as e:
            print(f"Erro ao aplicar filtro temporal baseado em sets: {e}")
            print("Continuando sem filtro temporal...")
            return df
    
    return df

def check_delta_table_exists(delta_path):
    # Verifica se a tabela Delta existe
    try:
        # Verificar se _delta_log existe
        delta_log_path = f"{delta_path}/_delta_log"
        try:
            files = dbutils.fs.ls(delta_log_path)
            if files:
                # Tentar ler a tabela para verificar se tem dados
                df = spark.read.format("delta").load(delta_path)
                count = df.count()
                print(f"Tabela Delta existe com {count} registros")
                return True, count > 0
            else:
                return False, False
        except:
            return False, False
    except Exception as e:
        print(f"Erro ao verificar tabela Delta: {e}")
        return False, False

def get_delta_schema_for_merge(delta_path):
    # Obtém o schema da tabela Delta existente, excluindo colunas de particionamento
    try:
        df = spark.read.format("delta").load(delta_path)
        schema_fields = [field.name for field in df.schema.fields]
        
        # Excluir colunas de particionamento do schema para merge
        partition_columns = ['RELEASE_YEAR', 'RELEASE_MONTH']
        merge_schema = [col for col in schema_fields if col not in partition_columns]
        
        print(f"Schema completo da tabela Delta: {schema_fields}")
        print(f"Schema para merge (sem particionamento): {merge_schema}")
        return merge_schema
    except Exception as e:
        print(f"Erro ao obter schema Delta: {e}")
        return []

def prepare_dataframe_for_merge(df, table_name, target_schema=None):
    # Prepara DataFrame para merge, garantindo compatibilidade de schema
    if not df:
        return None
    
    try:
        # Aplicar nomenclatura GOV
        df = apply_governance_naming(df)
        
        # Elimina duplicidade de ID_CARD no DataFrame de origem
        total_before = df.count()
        df = df.dropDuplicates(['ID_CARD'])
        total_after = df.count()
        print(f"Removidas {total_before - total_after} duplicatas baseadas em 'ID_CARD'")
        
        # Aplicar filtro temporal
        df = apply_temporal_filter(df, table_name)
        
        # Se temos schema de destino, filtrar colunas compatíveis
        if target_schema:
            available_columns = df.columns
            compatible_columns = [col for col in available_columns if col in target_schema]
            
            if not compatible_columns:
                print("Nenhuma coluna compatível encontrada para merge")
                return None
            
            print(f"Colunas compatíveis para merge: {compatible_columns}")
            df = df.select(compatible_columns)
        
        return df
    except Exception as e:
        print(f"Erro ao preparar DataFrame para merge: {e}")
        return None

def execute_merge_with_specific_columns(delta_table, merge_df, merge_columns):
    # Executa merge especificando exatamente quais colunas atualizar
    try:
        # Construir a condição de merge (cards usa 'ID_CARD' como chave)
        merge_condition = "bronze.ID_CARD = novo.ID_CARD"
        
        # Construir as ações de UPDATE especificando colunas
        update_actions = {}
        for col_name in merge_columns:
            if col_name != 'ID_CARD':  # Não atualizar a chave de merge
                update_actions[col_name] = f"novo.{col_name}"
        
        # Construir as ações de INSERT especificando colunas
        insert_actions = {}
        for col_name in merge_columns:
            insert_actions[col_name] = f"novo.{col_name}"
        
        print(f"Executando merge com {len(update_actions)} colunas para UPDATE")
        print(f"Colunas para UPDATE: {list(update_actions.keys())}")
        
        # Executar merge com ações específicas
        delta_table.alias("bronze").merge(
            merge_df.alias("novo"),
            merge_condition
        ).whenMatchedUpdate(set=update_actions) \
         .whenNotMatchedInsert(values=insert_actions) \
         .execute()
        
        print("Merge Delta executado com sucesso")
        return True
    except Exception as e:
        print(f"Erro no merge Delta: {e}")
        return False

def check_unity_table_exists(full_table_name):
    # Verifica se a tabela Unity Catalog existe
    try:
        # Listar tabelas no schema
        tables_df = spark.sql(f"SHOW TABLES IN {CATALOG_NAME}.{SCHEMA_NAME}")
        tables_list = [row.tableName for row in tables_df.collect()]
        exists = TABLE_NAME in tables_list
        print(f"Tabela Unity Catalog '{full_table_name}' existe: {exists}")
        return exists
    except Exception as e:
        print(f"Erro ao verificar tabela Unity Catalog: {e}")
        return False

def create_or_update_unity_catalog(full_table_name, delta_path):
    # Cria ou atualiza tabela Unity Catalog preservando modificações existentes
    try:
        table_exists = check_unity_table_exists(full_table_name)
        
        if not table_exists:
            # Criar tabela pela primeira vez
            print(f"Criando tabela Unity Catalog: {full_table_name}")
            create_table_sql = f"""
            CREATE TABLE {full_table_name}
            USING DELTA
            LOCATION '{delta_path}'
            COMMENT 'Tabela bronze de fato de cards do Magic: The Gathering com nomenclatura GOV'
            """
            spark.sql(create_table_sql)
            print(f"Tabela Unity Catalog criada: {full_table_name}")
        else:
            # Tabela já existe, apenas atualizar propriedades de pipeline (sem perder modificações)
            print(f"Tabela Unity Catalog já existe: {full_table_name}")
            print("Atualizando apenas propriedades de pipeline (preservando modificações existentes)")
        
        # Sempre atualizar propriedades de pipeline (não afeta modificações customizadas)
        try:
            spark.sql(f"""
            ALTER TABLE {full_table_name} SET TBLPROPERTIES (
                'bronze_layer' = 'true',
                'data_source' = 'mtg_api',
                'last_processing_date' = '{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
                'table_type' = 'bronze',
                'load_mode' = 'incremental_merge_gov',
                'governance_applied' = 'true',
                'naming_convention' = 'GOV',
                'temporal_window_years' = '{YEARS_BACK}',
                'temporal_filter_source' = 'sets_releaseDate',
                'temporal_filter_column' = 'set',
                'partitioning' = 'RELEASE_YEAR_MONTH'
            )
            """)
            print("Propriedades de pipeline atualizadas")
        except Exception as e:
            print(f"Aviso: Não foi possível atualizar propriedades de pipeline: {e}")
            
    except Exception as e:
        print(f"Erro ao criar/atualizar tabela Unity Catalog: {e}")

def load_to_bronze_unity_incremental_gov(df, table_name):
    # LOAD: Carrega dados na camada bronze (Unity + Incremental + GOV)
    if not df:
        return None
    try:
        delta_path = f"s3://{S3_BRONZE_PATH}/{table_name}"
        full_table_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{table_name}"
        from delta.tables import DeltaTable
        
        # Verificar se tabela Delta existe
        delta_exists, has_data = check_delta_table_exists(delta_path)
        
        if not delta_exists or not has_data:
            # Primeira criação com particionamento
            print("Criando tabela Delta pela primeira vez com nomenclatura GOV...")
            df_processed = prepare_dataframe_for_merge(df, table_name)
            
            if table_name == "TB_BRONZE_CARDS":
                # Para TB_BRONZE_CARDS, particionamento por ano de lançamento do set
                # JOIN com sets para pegar releaseDate real
                from pyspark.sql.functions import current_timestamp, lit, year, month
                
                # Carregar dados de sets
                sets_stage_path = f"{S3_STAGE_PATH}/*_sets.parquet"
                print(f"Carregando sets para particionamento: {sets_stage_path}")
                sets_df = spark.read.parquet(sets_stage_path)
                
                # Selecionar apenas colunas necessárias dos sets
                sets_for_join = sets_df.select("code", "releaseDate")
                print(f"Sets carregados para JOIN: {sets_for_join.count()} registros")
                
                # JOIN com sets para pegar releaseDate
                df_with_sets = df_processed.join(sets_for_join, 
                                               df_processed.COD_SET == sets_for_join.code, 
                                               "left")
                
                # Criar colunas de particionamento baseadas no releaseDate real
                df_with_partition = df_with_sets.withColumn("RELEASE_YEAR", 
                                  year(col("releaseDate"))) \
                       .withColumn("RELEASE_MONTH", 
                                  month(col("releaseDate")))
                
                # Remover colunas de JOIN desnecessárias
                df_with_partition = df_with_partition.drop("code", "releaseDate")
                
                print(f"Particionamento criado com releaseDate real dos sets")
                
                df_with_partition.write.format("delta") \
                       .mode("overwrite") \
                       .option("overwriteSchema", "true") \
                       .partitionBy("RELEASE_YEAR", "RELEASE_MONTH") \
                       .save(delta_path)
            else:
                df_processed.write.format("delta") \
                       .mode("overwrite") \
                       .option("overwriteSchema", "true") \
                       .save(delta_path)
            print("Tabela Delta criada com sucesso")
        else:
            # Para merge, obter schema da tabela existente (excluindo particionamento) e preparar dados
            print("Tabela Delta existe, preparando para merge...")
            target_schema = get_delta_schema_for_merge(delta_path)
            
            # Preparar DataFrame para merge
            merge_df = prepare_dataframe_for_merge(df, table_name, target_schema)
            if not merge_df:
                print("Nenhum DataFrame compatível para merge")
                return None
            
            # Executar merge com colunas específicas (sem particionamento)
            print("Executando merge Delta...")
            delta_table = DeltaTable.forPath(spark, delta_path)
            
            # Usar função específica para merge com colunas definidas
            merge_success = execute_merge_with_specific_columns(delta_table, merge_df, target_schema)
            if not merge_success:
                print("Falha no merge Delta")
                return None

        # Criar ou atualizar tabela Unity Catalog (PRESERVANDO MODIFICAÇÕES)
        create_or_update_unity_catalog(full_table_name, delta_path)
        
        return df
    except Exception as e:
        print(f"Erro no LOAD para bronze {table_name}: {e}")
        return None

def process_el_unity_incremental_gov(table_name):
    # Executa o pipeline EL Unity + Incremental + GOV
    stage_df = extract_from_staging(table_name)
    if not stage_df:
        print(f"Falha no EXTRACT de staging para {table_name}")
        return None
    
    result_df = load_to_bronze_unity_incremental_gov(stage_df, table_name)
    return result_df

def query_bronze_unity(table_name):
    # Consulta na tabela bronze Unity Catalog
    try:
        full_table_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{table_name}"
        count_query = f"SELECT COUNT(*) as total FROM {full_table_name}"
        count_result = spark.sql(count_query)
        count_result.show()
        
    except Exception as e:
        print(f"Erro ao consultar tabela bronze: {e}")

def show_delta_info(table_name):
    # Mostra informações da tabela Delta
    try:
        delta_path = f"s3://{S3_BRONZE_PATH}/{table_name}"
        from delta.tables import DeltaTable
        delta_table = DeltaTable.forPath(spark, delta_path)
        history = delta_table.history()
        print(f"Versões Delta: {history.count()}")
            
    except Exception as e:
        print(f"Erro ao mostrar informações Delta: {e}")

def show_sample_data(table_name):
    # Mostra dados de exemplo da tabela
    try:
        full_table_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{table_name}"
        sample_query = f"SELECT * FROM {full_table_name} LIMIT 5"
        print("Dados de exemplo:")
        spark.sql(sample_query).show(truncate=False)
        
    except Exception as e:
        print(f"Erro ao mostrar dados de exemplo: {e}")

def check_partitioning_working(table_name):
    # Verifica se o particionamento está funcionando
    try:
        delta_path = f"s3://{S3_BRONZE_PATH}/{table_name}"
        
        # Verificar se existem partições
        try:
            partitions = dbutils.fs.ls(delta_path)
            has_partitions = any("RELEASE_YEAR=" in p.path for p in partitions)
            
            if has_partitions:
                print("Particionamento funcionando: Encontradas partições RELEASE_YEAR")
                return True
            else:
                print("Particionamento não encontrado: Nenhuma partição RELEASE_YEAR")
                return False
                
        except Exception as e:
            print(f"Erro ao verificar particionamento: {e}")
            return False
            
    except Exception as e:
        print(f"Erro ao verificar particionamento: {e}")
        return False

def show_governance_info():
    # Mostra informações sobre a governança aplicada
    print("\n=== INFORMAÇÕES DE GOVERNAÇA GOV ===")
    print(f"Total de colunas mapeadas: {len(COLUMN_MAPPING)}")
    print("Padrões aplicados:")
    print("- NME_: Nomes e descrições")
    print("- COD_: Códigos e identificadores")
    print("- ID_: Identificadores únicos")
    print("- FLG_: Flags booleanos")
    print("- DT_: Datas e timestamps")
    print("- DESC_: Descrições e textos")
    print("- MANA_COST: Custo de mana convertido")
    print("- CMC: Converted Mana Cost")
    print("- NME_POWER/NME_TOUGHNESS: Estatísticas de criatura")
    
    # Mostrar mapeamento específico das colunas principais
    print("\n=== MAPEAMENTO PRINCIPAL DE COLUNAS ===")
    print("Identificação:")
    print(f"  id → ID_CARD")
    
    print("\nNomes e Descrições:")
    print(f"  name → NME_CARD")
    print(f"  artist → NME_ARTIST")
    print(f"  type → NME_CARD_TYPE")
    print(f"  rarity → NME_RARITY")
    
    print("\nCusto e Tipo:")
    print(f"  manaCost → DESC_MANA_COST")
    print(f"  cmc → MANA_COST")
    
    print("\nCores:")
    print(f"  colors → COD_COLORS")
    print(f"  color_category → NME_COLOR_CATEGORY")
    
    print("\nEstatísticas:")
    print(f"  power → NME_POWER")
    print(f"  toughness → NME_TOUGHNESS")
    
    print("\nFlags:")
    print(f"  is_creature → FLG_CREATURE")
    print(f"  is_legendary → FLG_LEGENDARY")
    
    print("\nTexto:")
    print(f"  text → DESC_CARD")
    print(f"  flavorText → DESC_FLAVOR")
    
    print("\nDatas:")
    print(f"  created_at → DT_CREATED")
    print(f"  updated_at → DT_UPDATED")
    print(f"  ingestion_timestamp → DT_INGESTION")
    
    print("\n=== FILTRO TEMPORAL ===")
    print("Filtro baseado em releaseDate dos sets")
    print(f"{YEARS_BACK} anos completos ({START_YEAR} a {END_YEAR})")
    print("Cards filtrados por sets válidos")
    
    print("\n=== PARTICIONAMENTO ===")
    print("Particionamento por RELEASE_YEAR e RELEASE_MONTH")
    print("Baseado no releaseDate real dos sets")
    print("JOIN com dados de sets para particionamento correto")
    print("Tabela: TB_BRONZE_CARDS")



In [0]:
# =============================================================================
# EXECUÇÃO PRINCIPAL
# =============================================================================

try:
    spark
except NameError:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

setup_success = setup_unity_catalog()
if not setup_success:
    raise Exception("Falha ao configurar Unity Catalog")

# Mostrar informações de governança
show_governance_info()

cards_bronze_df = process_el_unity_incremental_gov("TB_BRONZE_CARDS")

if cards_bronze_df:
    print("Pipeline executado com sucesso")
    query_bronze_unity("TB_BRONZE_CARDS")
    show_delta_info("TB_BRONZE_CARDS")
    check_partitioning_working("TB_BRONZE_CARDS")
    #show_sample_data("TB_BRONZE_CARDS")
else:
    print("Falha no pipeline") 