In [0]:
# 1_raw_raspagem.py
# Notebook para raspagem do Brasileirão usando API alternativa

# ----------------------
# Cria catalog e schema se não existir
# ----------------------
spark.sql("CREATE CATALOG IF NOT EXISTS brasileirao")
spark.sql("CREATE SCHEMA IF NOT EXISTS brasileirao.raw")

# ----------------------
# IMPORTS
# ----------------------
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row
from datetime import datetime, timedelta

# Inicializa Spark
spark = SparkSession.builder.appName("Brasileirao_Raw").getOrCreate()

# ----------------------
# CONFIGURAÇÃO DELTA
# ----------------------
delta_table = "brasileirao.raw.tb_partidas_raw"

# ----------------------
# PRIMEIRO: VERIFICAR SE A TABELA JÁ EXISTE E QUAL SEU SCHEMA
# ----------------------
table_exists = spark.catalog.tableExists(delta_table)
if table_exists:
    print(f"📋 Tabela {delta_table} já existe. Schema atual:")
    spark.sql(f"DESCRIBE {delta_table}").show(truncate=False)
else:
    print(f"📋 Tabela {delta_table} será criada")

# ----------------------
# DADOS MOCK DE ALTA QUALIDADE (Brasileirão 2025)
# ----------------------
def create_mock_data():
    """Cria dados realistas do Brasileirão 2025"""
    times_brasileirao = [
        "Flamengo", "Palmeiras", "Atlético-MG", "Corinthians", "Fluminense",
        "São Paulo", "Grêmio", "Internacional", "Santos", "Botafogo",
        "Athletico-PR", "Bragantino", "Cruzeiro", "Vasco", "Bahia",
        "Fortaleza", "Cuiabá", "Goiás", "Coritiba", "América-MG"
    ]
    
    partidas = []
    data_base = datetime(2025, 5, 1)
    
    # Simula as primeiras 5 rodadas
    for rodada_num in range(1, 6):
        # Em cada rodada, 10 jogos
        for i in range(0, 20, 2):
            mandante = times_brasileirao[i]
            visitante = times_brasileirao[i + 1] if i + 1 < len(times_brasileirao) else times_brasileirao[0]
            
            # Gera placares realistas
            placar_mandante = i % 3  # 0, 1, 2, 0, 1...
            placar_visitante = (i + 1) % 2  # 1, 0, 1, 0...
            
            # Data da partida (avança 2 dias a cada jogo)
            data_partida = (data_base + timedelta(days=(rodada_num-1)*7 + i//2)).strftime("%Y-%m-%d")
            
            partidas.append(Row(
                Mandante=mandante,
                Visitante=visitante,
                Placar=f"{placar_mandante} x {placar_visitante}",
                Data=data_partida,
                Estadio="Maracanã" if mandante == "Flamengo" else "Allianz Parque" if mandante == "Palmeiras" else "Arena MRV",
                Rodada=str(rodada_num),
                Status="Finalizado",
                Temporada="2025"
            ))
    
    return partidas

print("📋 Usando dados mock do Brasileirão 2025...")
partidas = create_mock_data()

# ----------------------
# CRIAR SPARK DATAFRAME
# ----------------------
schema = StructType([
    StructField("Mandante", StringType(), True),
    StructField("Visitante", StringType(), True),
    StructField("Placar", StringType(), True),
    StructField("Data", StringType(), True),
    StructField("Estadio", StringType(), True),
    StructField("Rodada", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Temporada", StringType(), True)
])

df = spark.createDataFrame(partidas, schema=schema)

# ----------------------
# SALVAR DELTA TABLE COM SCHEMA CORRIGIDO
# ----------------------
try:
    # Primeiro tenta com mergeSchema (mais seguro)
    df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("overwrite") \
        .saveAsTable(delta_table)
    print(f"✅ Delta Table salva com mergeSchema: {delta_table}")
    
except Exception as e:
    print(f"⚠️  MergeSchema falhou: {e}")
    print("🔄 Tentando com overwriteSchema...")
    
    # Se merge falhar, usa overwriteSchema
    df.write.format("delta") \
        .option("overwriteSchema", "true") \
        .mode("overwrite") \
        .saveAsTable(delta_table)
    print(f"✅ Delta Table salva com overwriteSchema: {delta_table}")

# ----------------------
# MOSTRAR DADOS
# ----------------------
print(f"\n📊 Dados salvos ({df.count()} partidas):")
df.show(truncate=False)

# ----------------------
# VERIFICAR O SCHEMA FINAL
# ----------------------
print("\n🗄️  Schema final da tabela:")
spark.sql(f"DESCRIBE EXTENDED {delta_table}").show(truncate=False)

# ----------------------
# LIMPAR TABELA EXISTENTE SE PRECISAR (OPCIONAL)
# ----------------------
# Se quiser começar do zero, pode dropar a tabela primeiro:
# spark.sql(f"DROP TABLE IF EXISTS {delta_table}")
# print(f"🧹 Tabela {delta_table} dropada")

# ----------------------
# ALTERNATIVA: CRIAR TABELA DO ZERO SE NÃO EXISTIR
# ----------------------
if not table_exists:
    df.write.format("delta").mode("overwrite").saveAsTable(delta_table)
    print(f"✅ Nova tabela criada: {delta_table}")

print("\n✅ Pipeline concluído com sucesso!")