In [0]:
# ==============================================================================
# NOTEBOOK DE INFERÊNCIA - PREVISÃO DE PARTIDAS FUTURAS
# ==============================================================================
from pyspark.ml import PipelineModel
from pyspark.sql import functions as F
from pyspark.ml.feature import IndexToString, VectorAssembler
import mlflow
import os

print("="*80)
print("PIPELINE DE INFERÊNCIA - PREVISÃO BRASILEIRÃO")
print("="*80)

spark.sql("USE CATALOG previsao_brasileirao")

# ==============================================================================
# PASSO 1: CONFIGURAR E CARREGAR MODELOS
# ==============================================================================
print("\n[PASSO 1] Configurando ambiente e carregando modelos...")
print("-"*80)

schema_modelos = "diamond"

# CONFIGURAÇÃO CRÍTICA: Volume path para Databricks Serverless
volume_path = "/Volumes/previsao_brasileirao/diamond/mlflow_models"
os.environ['MLFLOW_DFS_TMP'] = volume_path

print(f"📁 Volume configurado: {volume_path}")

# Configurar MLflow
mlflow.set_registry_uri("databricks-uc")

# Carregar metadados dos modelos
df_modelos_info = spark.table(f"{schema_modelos}.modelos_registry") \
    .filter("ativo = true") \
    .orderBy(F.desc("versao"))

print("\n📊 Modelos disponíveis:")
display(df_modelos_info.select("nome_modelo", "tipo", "acuracia", "run_id"))

# Pegar o run_id mais recente
run_id = df_modelos_info.select("run_id").first()["run_id"]
print(f"\n✅ Usando Run ID: {run_id}")

# Carregar os modelos
print("\nCarregando modelos treinados...")

try:
    modelo_geral = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_geral",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Modelo Geral carregado")
    
    modelo_lr = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_lr",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Modelo LR carregado")
    
    modelo_rf = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_rf",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Modelo RF carregado")
    
    modelo_final = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_final",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Meta-Modelo carregado")
    
except Exception as e:
    print(f"\n❌ ERRO ao carregar modelos: {e}")
    print("\n🔧 Possíveis soluções:")
    print("1. Verifique se o notebook '04_Diamond_Model_Training' foi executado")
    print("2. Execute novamente o PASSO 5 do notebook de treinamento")
    raise

# Carregar mapa de labels
df_labels_map = spark.table(f"{schema_modelos}.label_mapping")
label_mapping_dict = {row['resultado']: row['label'] for row in df_labels_map.collect()}
labels_array = [k for k, v in sorted(label_mapping_dict.items(), key=lambda x: x[1])]

print(f"\n✅ Labels mapeados: {labels_array}")

# Recriar transformadores
assembler_meta = VectorAssembler(
    inputCols=["prob_geral", "prob_lr", "prob_rf"],
    outputCol="features"
)

converter = IndexToString(
    inputCol="prediction",
    outputCol="previsao_texto",
    labels=labels_array
)

print("  ✅ Transformadores criados")

# Carregar dados auxiliares
df_partidas_historico = spark.table("gold.feature_store")
df_clubes = spark.table("silver.clubes")

print(f"\n✅ Dados históricos: {df_partidas_historico.count()} partidas")
print(f"✅ Clubes cadastrados: {df_clubes.count()}")

print("\n" + "="*80)
print("✅ PASSO 1 CONCLUÍDO - Modelos carregados com sucesso!")
print("="*80)

# ==============================================================================
# PASSO 2: CARREGAR E PREPARAR PARTIDAS FUTURAS
# ==============================================================================
print("\n[PASSO 2] Carregando partidas futuras...")
print("-"*80)

df_futuro_raw = spark.table("bronze.partidas_raw") \
    .filter("placar_oficial_mandante IS NULL AND valida = true") \
    .orderBy("partida_data")

num_partidas_futuras = df_futuro_raw.count()
print(f"\n📋 Partidas futuras encontradas: {num_partidas_futuras}")

if num_partidas_futuras == 0:
    print("\n⚠️ ATENÇÃO: Nenhuma partida futura encontrada!")
    print("\n💡 Possíveis motivos:")
    print("  - A rodada atual já foi concluída")
    print("  - A próxima rodada ainda não foi disponibilizada pela API")
    print("\n🔧 Solução:")
    print("  Execute o notebook '01_Bronze_Ingestao' para atualizar os dados")
    
    # Ainda assim, continuar para mostrar o processo
    print("\n  (Continuando com o processo para demonstração...)")
else:
    print("\n✅ Preview das partidas futuras:")
    display(df_futuro_raw.select(
        "partida_id", "rodada", "partida_data",
        "clube_casa_id", "clube_visitante_id"
    ).limit(5))

# ==============================================================================
# PASSO 3: ENGENHARIA DE FEATURES PARA PARTIDAS FUTURAS
# ==============================================================================
print("\n[PASSO 3] Aplicando engenharia de features...")
print("-"*80)

# Pegar Elo mais recente de cada time
latest_elos = df_partidas_historico \
    .groupBy("mandante_id") \
    .agg(F.max("rodada").alias("rodada")) \
    .join(df_partidas_historico, ["mandante_id", "rodada"]) \
    .select("mandante_id", F.col("elo_mandante_pre_jogo").alias("elo_atual"))

# Pegar médias móveis mais recentes
latest_mms_m = df_partidas_historico \
    .groupBy("mandante_id") \
    .agg(F.max("rodada").alias("rodada")) \
    .join(df_partidas_historico, ["mandante_id", "rodada"]) \
    .select("mandante_id", "mm_gols_m", "mm_fin_m", "mm_des_m")

latest_mms_v = df_partidas_historico \
    .groupBy("visitante_id") \
    .agg(F.max("rodada").alias("rodada")) \
    .join(df_partidas_historico, ["visitante_id", "rodada"]) \
    .select("visitante_id", "mm_gols_v", "mm_fin_v", "mm_des_v")

# Preparar partidas futuras
df_futuro_features = df_futuro_raw.select(
    F.col("partida_id"),
    F.col("rodada"),
    F.col("clube_casa_id").alias("mandante_id"),
    F.col("clube_visitante_id").alias("visitante_id"),
    F.to_timestamp("partida_data").alias("data_partida")
)

# Juntar Elo do mandante
df_com_elo_m = df_futuro_features.join(
    latest_elos,
    df_futuro_features["mandante_id"] == latest_elos["mandante_id"],
    "left"
).withColumnRenamed("elo_atual", "elo_mandante_pre_jogo") \
 .drop(latest_elos["mandante_id"])

# Juntar Elo do visitante
df_com_elo_mv = df_com_elo_m.join(
    latest_elos,
    df_com_elo_m["visitante_id"] == latest_elos["mandante_id"],
    "left"
).withColumnRenamed("elo_atual", "elo_visitante_pre_jogo") \
 .drop(latest_elos["mandante_id"])

# Juntar médias móveis do mandante
df_com_mm_m = df_com_elo_mv.join(
    latest_mms_m,
    df_com_elo_mv["mandante_id"] == latest_mms_m["mandante_id"],
    "left"
).drop(latest_mms_m["mandante_id"])

# Juntar médias móveis do visitante
df_com_tudo = df_com_mm_m.join(
    latest_mms_v,
    df_com_mm_m["visitante_id"] == latest_mms_v["visitante_id"],
    "left"
).drop(latest_mms_v["visitante_id"])

# Calcular diferenciais
df_futuro_features_final = df_com_tudo \
    .withColumn("elo_diff", F.col("elo_mandante_pre_jogo") - F.col("elo_visitante_pre_jogo")) \
    .withColumn("diff_mm_gols", F.col("mm_gols_m") - F.col("mm_gols_v")) \
    .withColumn("diff_mm_fin", F.col("mm_fin_m") - F.col("mm_fin_v")) \
    .withColumn("diff_mm_des", F.col("mm_des_m") - F.col("mm_des_v")) \
    .na.fill(0)

print("  ✅ Features calculadas")

print("\n" + "="*80)
print("✅ PASSO 3 CONCLUÍDO - Features prontas!")
print("="*80)

# ==============================================================================
# PASSO 4: FAZER PREVISÕES
# ==============================================================================
print("\n[PASSO 4] Gerando previsões...")
print("-"*80)

if num_partidas_futuras > 0:
    # Nível 1: Previsões dos especialistas
    print("\n  Nível 1: Modelos Especialistas...")
    pred_geral_futuro = modelo_geral.transform(df_futuro_features_final)
    pred_lr_futuro = modelo_lr.transform(df_futuro_features_final)
    pred_rf_futuro = modelo_rf.transform(df_futuro_features_final)
    print("    ✅ Previsões dos especialistas geradas")
    
    # Preparar meta-features
    meta_features_geral = pred_geral_futuro.select("partida_id", F.col("probability").alias("prob_geral"))
    meta_features_lr = pred_lr_futuro.select("partida_id", F.col("probability").alias("prob_lr"))
    meta_features_rf = pred_rf_futuro.select("partida_id", F.col("probability").alias("prob_rf"))
    
    df_inferencia_meta = df_futuro_features_final.select("partida_id") \
        .join(meta_features_geral, "partida_id") \
        .join(meta_features_lr, "partida_id") \
        .join(meta_features_rf, "partida_id")
    
    df_inferencia_meta = assembler_meta.transform(df_inferencia_meta)
    
    # Nível 2: Meta-modelo
    print("  Nível 2: Meta-Modelo...")
    previsoes_finais = modelo_final.transform(df_inferencia_meta)
    print("    ✅ Previsões finais geradas")
    
    # Converter índices para texto
    previsoes_texto = converter.transform(previsoes_finais)
    
    # Juntar com nomes dos clubes
    df_resultado_final = df_futuro_features_final \
        .join(previsoes_texto.select("partida_id", "previsao_texto", "probability"), "partida_id") \
        .join(df_clubes.alias("mandante"), F.col("mandante_id") == F.col("mandante.clube_id")) \
        .join(df_clubes.alias("visitante"), F.col("visitante_id") == F.col("visitante.clube_id")) \
        .withColumn("competicao", F.lit("Brasileirão Série A")) \
        .select(
            "competicao",
            F.date_format("data_partida", "dd/MM/yyyy HH:mm").alias("data_hora_partida"),
            F.col("mandante.nome").alias("time_mandante"),
            F.col("visitante.nome").alias("time_visitante"),
            F.col("previsao_texto").alias("previsao"),
            F.col("probability").alias("confianca_probabilidades")
        ).orderBy("data_hora_partida")
    
    # Salvar na camada Diamond
    df_resultado_final.write \
        .mode("overwrite") \
        .format("delta") \
        .saveAsTable("diamond.previsoes_proximas_partidas")
    
    print("\n  ✅ Previsões salvas em 'diamond.previsoes_proximas_partidas'")
    
    # Criar view temporária
    df_resultado_final.createOrReplaceTempView("visualizacao_previsoes")
    
    print("\n" + "="*80)
    print("✅ PASSO 4 CONCLUÍDO - Previsões geradas!")
    print("="*80)
    
    # ==============================================================================
    # VISUALIZAÇÃO FINAL
    # ==============================================================================
    print("\n" + "="*80)
    print("📊 PREVISÕES DAS PRÓXIMAS PARTIDAS")
    print("="*80)
    
    display(df_resultado_final)
    
    print("\n✅ Você pode consultar as previsões usando:")
    print("   SELECT * FROM diamond.previsoes_proximas_partidas")
    print("   ou")
    print("   SELECT * FROM visualizacao_previsoes")
    
else:
    print("\n⚠️ Nenhuma previsão gerada pois não há partidas futuras")
    print("   Execute o notebook '01_Bronze_Ingestao' e tente novamente")

print("\n" + "="*80)
print("✅✅✅ PIPELINE DE INFERÊNCIA CONCLUÍDO! ✅✅✅")
print("="*80)

In [0]:
# ==============================================================================
# NOTEBOOK DE INFERÊNCIA - PREVISÃO DE REBAIXAMENTO
# ==============================================================================
from pyspark.ml import PipelineModel
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import IndexToString, VectorAssembler
import mlflow
import os
import pandas as pd
from itertools import combinations

print("="*80)
print("PIPELINE DE INFERÊNCIA - PREVISÃO DE REBAIXAMENTO BRASILEIRÃO")
print("="*80)

spark.sql("USE CATALOG previsao_brasileirao")

# ==============================================================================
# PASSO 1: CONFIGURAR E CARREGAR MODELOS
# ==============================================================================
print("\n[PASSO 1] Configurando ambiente e carregando modelos...")
print("-"*80)

schema_modelos = "diamond"

# CONFIGURAÇÃO CRÍTICA: Volume path para Databricks Serverless
volume_path = "/Volumes/previsao_brasileirao/diamond/mlflow_models"
os.environ['MLFLOW_DFS_TMP'] = volume_path

print(f"📁 Volume configurado: {volume_path}")

# Configurar MLflow
mlflow.set_registry_uri("databricks-uc")

# Carregar metadados dos modelos
df_modelos_info = spark.table(f"{schema_modelos}.modelos_registry") \
    .filter("ativo = true") \
    .orderBy(F.desc("versao"))

print("\n📊 Modelos disponíveis:")
display(df_modelos_info.select("nome_modelo", "tipo", "acuracia", "run_id"))

# Pegar o run_id mais recente
run_id = df_modelos_info.select("run_id").first()["run_id"]
print(f"\n✅ Usando Run ID: {run_id}")

# Carregar os modelos
print("\nCarregando modelos treinados...")

try:
    modelo_geral = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_geral",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Modelo Geral carregado")
    
    modelo_lr = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_lr",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Modelo LR carregado")
    
    modelo_rf = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_rf",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Modelo RF carregado")
    
    modelo_final = mlflow.spark.load_model(
        f"runs:/{run_id}/modelo_final",
        dfs_tmpdir=volume_path
    )
    print("  ✅ Meta-Modelo carregado")
    
except Exception as e:
    print(f"\n❌ ERRO ao carregar modelos: {e}")
    print("\n🔧 Possíveis soluções:")
    print("1. Verifique se o notebook '04_Diamond_Model_Training' foi executado")
    print("2. Execute novamente o PASSO 5 do notebook de treinamento")
    raise

# Carregar mapa de labels
df_labels_map = spark.table(f"{schema_modelos}.label_mapping")
label_mapping_dict = {row['resultado']: row['label'] for row in df_labels_map.collect()}
labels_array = [k for k, v in sorted(label_mapping_dict.items(), key=lambda x: x[1])]

print(f"\n✅ Labels mapeados: {labels_array}")

# Recriar transformadores
assembler_meta = VectorAssembler(
    inputCols=["prob_geral", "prob_lr", "prob_rf"],
    outputCol="features"
)

converter = IndexToString(
    inputCol="prediction",
    outputCol="previsao_texto",
    labels=labels_array
)

print("  ✅ Transformadores criados")

# Carregar dados auxiliares
df_partidas_historico = spark.table("gold.feature_store")
df_clubes = spark.table("silver.clubes")

print(f"\n✅ Dados históricos: {df_partidas_historico.count()} partidas")
print(f"✅ Clubes cadastrados: {df_clubes.count()}")

print("\n" + "="*80)
print("✅ PASSO 1 CONCLUÍDO - Modelos carregados com sucesso!")
print("="*80)

# ==============================================================================
# DIAGNÓSTICO: VERIFICAR DADOS DA API
# ==============================================================================
print("\n[DIAGNÓSTICO] Verificando integridade dos dados...")
print("-"*80)

# Verificar quantos clubes únicos temos
df_clubes_bronze = spark.table("bronze.clubes_info_raw")
num_clubes = df_clubes_bronze.count()
print(f"\n📊 Total de clubes cadastrados: {num_clubes}")

# Verificar se temos 20 times (Série A)
if num_clubes != 20:
    print(f"⚠️ ATENÇÃO: Esperado 20 clubes da Série A, encontrado {num_clubes}")

# Mostrar todos os clubes por clube_id
print("\n📋 Lista de clubes por ID:")
df_clubes_diagnostico = df_clubes_bronze.select(
    F.col("id").alias("clube_id"),
    "nome",
    "nome_fantasia",
    "abreviacao",
    "slug"
).orderBy("clube_id")

display(df_clubes_diagnostico)

print("\n✅ Diagnóstico concluído")
print("-"*80)

# ==============================================================================
# PASSO 2: CARREGAR PARTIDAS REALIZADAS E CALCULAR TABELA ATUAL
# ==============================================================================
print("\n[PASSO 2] Calculando classificação atual...")
print("-"*80)

# Partidas já realizadas
df_partidas_realizadas = spark.table("bronze.partidas_raw") \
    .filter("placar_oficial_mandante IS NOT NULL AND valida = true")

# Calcular pontos de cada time
df_pontos_mandante = df_partidas_realizadas.select(
    F.col("clube_casa_id").alias("clube_id"),
    F.when(F.col("placar_oficial_mandante") > F.col("placar_oficial_visitante"), 3)
     .when(F.col("placar_oficial_mandante") == F.col("placar_oficial_visitante"), 1)
     .otherwise(0).alias("pontos"),
    F.col("placar_oficial_mandante").alias("gols_marcados"),
    F.col("placar_oficial_visitante").alias("gols_sofridos")
)

df_pontos_visitante = df_partidas_realizadas.select(
    F.col("clube_visitante_id").alias("clube_id"),
    F.when(F.col("placar_oficial_visitante") > F.col("placar_oficial_mandante"), 3)
     .when(F.col("placar_oficial_visitante") == F.col("placar_oficial_mandante"), 1)
     .otherwise(0).alias("pontos"),
    F.col("placar_oficial_visitante").alias("gols_marcados"),
    F.col("placar_oficial_mandante").alias("gols_sofridos")
)

df_classificacao_atual = df_pontos_mandante.union(df_pontos_visitante) \
    .groupBy("clube_id") \
    .agg(
        F.sum("pontos").alias("pontos_atuais"),
        F.count("*").alias("jogos_realizados"),
        F.sum("gols_marcados").alias("gols_pro"),
        F.sum("gols_sofridos").alias("gols_contra")
    ) \
    .withColumn("saldo_gols", F.col("gols_pro") - F.col("gols_contra")) \
    .join(df_clubes, "clube_id") \
    .select("clube_id", "nome", "pontos_atuais", "jogos_realizados", "saldo_gols")

print("\n📊 Classificação Atual:")
display(df_classificacao_atual.orderBy(F.desc("pontos_atuais"), F.desc("saldo_gols")))

# ==============================================================================
# PASSO 3: CARREGAR E PREPARAR PARTIDAS FUTURAS
# ==============================================================================
print("\n[PASSO 3] Carregando partidas futuras...")
print("-"*80)

df_futuro_raw = spark.table("bronze.partidas_raw") \
    .filter("placar_oficial_mandante IS NULL AND valida = true") \
    .orderBy("partida_data")

num_partidas_futuras = df_futuro_raw.count()
print(f"\n📋 Partidas futuras encontradas: {num_partidas_futuras}")

if num_partidas_futuras == 0:
    print("\n⚠️ ATENÇÃO: Nenhuma partida futura encontrada!")
    print("\n💡 Execute o notebook '01_Bronze_Ingestao' para atualizar os dados")
    raise Exception("Sem partidas futuras para análise")

print("\n✅ Preview das partidas futuras:")
display(df_futuro_raw.select(
    "partida_id", "rodada", "partida_data",
    "clube_casa_id", "clube_visitante_id"
).limit(5))

# ==============================================================================
# PASSO 4: ENGENHARIA DE FEATURES PARA PARTIDAS FUTURAS
# ==============================================================================
print("\n[PASSO 4] Aplicando engenharia de features...")
print("-"*80)

# Pegar Elo mais recente de cada time
latest_elos = df_partidas_historico \
    .groupBy("mandante_id") \
    .agg(F.max("rodada").alias("rodada")) \
    .join(df_partidas_historico, ["mandante_id", "rodada"]) \
    .select("mandante_id", F.col("elo_mandante_pre_jogo").alias("elo_atual"))

# Pegar médias móveis mais recentes
latest_mms_m = df_partidas_historico \
    .groupBy("mandante_id") \
    .agg(F.max("rodada").alias("rodada")) \
    .join(df_partidas_historico, ["mandante_id", "rodada"]) \
    .select("mandante_id", "mm_gols_m", "mm_fin_m", "mm_des_m")

latest_mms_v = df_partidas_historico \
    .groupBy("visitante_id") \
    .agg(F.max("rodada").alias("rodada")) \
    .join(df_partidas_historico, ["visitante_id", "rodada"]) \
    .select("visitante_id", "mm_gols_v", "mm_fin_v", "mm_des_v")

# Preparar partidas futuras
df_futuro_features = df_futuro_raw.select(
    F.col("partida_id"),
    F.col("rodada"),
    F.col("clube_casa_id").alias("mandante_id"),
    F.col("clube_visitante_id").alias("visitante_id"),
    F.to_timestamp("partida_data").alias("data_partida")
)

# Juntar Elo do mandante
df_com_elo_m = df_futuro_features.join(
    latest_elos,
    df_futuro_features["mandante_id"] == latest_elos["mandante_id"],
    "left"
).withColumnRenamed("elo_atual", "elo_mandante_pre_jogo") \
 .drop(latest_elos["mandante_id"])

# Juntar Elo do visitante
df_com_elo_mv = df_com_elo_m.join(
    latest_elos,
    df_com_elo_m["visitante_id"] == latest_elos["mandante_id"],
    "left"
).withColumnRenamed("elo_atual", "elo_visitante_pre_jogo") \
 .drop(latest_elos["mandante_id"])

# Juntar médias móveis do mandante
df_com_mm_m = df_com_elo_mv.join(
    latest_mms_m,
    df_com_elo_mv["mandante_id"] == latest_mms_m["mandante_id"],
    "left"
).drop(latest_mms_m["mandante_id"])

# Juntar médias móveis do visitante
df_com_tudo = df_com_mm_m.join(
    latest_mms_v,
    df_com_mm_m["visitante_id"] == latest_mms_v["visitante_id"],
    "left"
).drop(latest_mms_v["visitante_id"])

# Calcular diferenciais
df_futuro_features_final = df_com_tudo \
    .withColumn("elo_diff", F.col("elo_mandante_pre_jogo") - F.col("elo_visitante_pre_jogo")) \
    .withColumn("diff_mm_gols", F.col("mm_gols_m") - F.col("mm_gols_v")) \
    .withColumn("diff_mm_fin", F.col("mm_fin_m") - F.col("mm_fin_v")) \
    .withColumn("diff_mm_des", F.col("mm_des_m") - F.col("mm_des_v")) \
    .na.fill(0)

print("  ✅ Features calculadas")

print("\n" + "="*80)
print("✅ PASSO 4 CONCLUÍDO - Features prontas!")
print("="*80)

# ==============================================================================
# PASSO 5: FAZER PREVISÕES E SIMULAR RESULTADOS
# ==============================================================================
print("\n[PASSO 5] Gerando previsões e simulando resultados...")
print("-"*80)

# Nível 1: Previsões dos especialistas
print("\n  Nível 1: Modelos Especialistas...")
pred_geral_futuro = modelo_geral.transform(df_futuro_features_final)
pred_lr_futuro = modelo_lr.transform(df_futuro_features_final)
pred_rf_futuro = modelo_rf.transform(df_futuro_features_final)
print("    ✅ Previsões dos especialistas geradas")

# Preparar meta-features
meta_features_geral = pred_geral_futuro.select("partida_id", F.col("probability").alias("prob_geral"))
meta_features_lr = pred_lr_futuro.select("partida_id", F.col("probability").alias("prob_lr"))
meta_features_rf = pred_rf_futuro.select("partida_id", F.col("probability").alias("prob_rf"))

df_inferencia_meta = df_futuro_features_final.select("partida_id") \
    .join(meta_features_geral, "partida_id") \
    .join(meta_features_lr, "partida_id") \
    .join(meta_features_rf, "partida_id")

df_inferencia_meta = assembler_meta.transform(df_inferencia_meta)

# Nível 2: Meta-modelo
print("  Nível 2: Meta-Modelo...")
previsoes_finais = modelo_final.transform(df_inferencia_meta)
print("    ✅ Previsões finais geradas")

# Converter índices para texto
previsoes_texto = converter.transform(previsoes_finais)

# ==============================================================================
# PASSO 6: ANÁLISE AVANÇADA DE REBAIXAMENTO
# ==============================================================================
print("\n[PASSO 6] Análise de risco de rebaixamento...")
print("-"*80)

# Juntar previsões com dados das partidas
df_previsoes_completas = df_futuro_features_final \
    .join(previsoes_texto.select("partida_id", "previsao_texto", "probability"), "partida_id")

# Importar função para trabalhar com vetores
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, ArrayType

# UDF para converter SparseVector em array denso
@udf(returnType=ArrayType(DoubleType()))
def vector_to_array(v):
    """Converte SparseVector/DenseVector para array Python"""
    if v is None:
        return [0.0, 0.0, 0.0]
    return v.toArray().tolist()

# Função para extrair pontos esperados baseado na probabilidade
def calcular_pontos_esperados(df):
    """
    Calcula pontos esperados baseado nas probabilidades do modelo
    
    Ordem dos labels (definida no notebook 04):
    - Índice 0: EMPATE
    - Índice 1: VITORIA_MANDANTE  
    - Índice 2: VITORIA_VISITANTE
    
    Pontuação:
    - VITORIA = 3 pontos
    - EMPATE = 1 ponto
    - DERROTA = 0 pontos
    """
    # Converter probability (SparseVector) para array
    df_com_array = df.withColumn("prob_array", vector_to_array(F.col("probability")))
    
    return df_com_array.select(
        "partida_id",
        "mandante_id",
        "visitante_id",
        "previsao_texto",
        "probability",
        
        # Extrair probabilidades individuais do array
        F.col("prob_array")[0].alias("prob_empate"),
        F.col("prob_array")[1].alias("prob_vitoria_mandante"),
        F.col("prob_array")[2].alias("prob_vitoria_visitante"),
        
        # Calcular pontos esperados
        # Mandante: (prob_vitoria * 3) + (prob_empate * 1) + (prob_derrota * 0)
        (F.col("prob_array")[1] * 3 + F.col("prob_array")[0] * 1).alias("pontos_esperados_mandante"),
        # Visitante: (prob_vitoria * 3) + (prob_empate * 1) + (prob_derrota * 0)
        (F.col("prob_array")[2] * 3 + F.col("prob_array")[0] * 1).alias("pontos_esperados_visitante")
    )

df_pontos_esperados = calcular_pontos_esperados(df_previsoes_completas)

print("\n📊 Pontos esperados por partida:")
display(df_pontos_esperados.limit(10))

# Agregar pontos esperados por time (com aliases para evitar ambiguidade)
df_pontos_esperados_mandante = df_pontos_esperados.groupBy("mandante_id") \
    .agg(
        F.sum("pontos_esperados_mandante").alias("pontos_esperados_m"),
        F.count("*").alias("jogos_restantes_m")
    ) \
    .withColumnRenamed("mandante_id", "clube_id")

df_pontos_esperados_visitante = df_pontos_esperados.groupBy("visitante_id") \
    .agg(
        F.sum("pontos_esperados_visitante").alias("pontos_esperados_v"),
        F.count("*").alias("jogos_restantes_v")
    ) \
    .withColumnRenamed("visitante_id", "clube_id")

# Combinar pontos de casa e fora (usando aliases para evitar conflito)
df_mandante_aliased = df_pontos_esperados_mandante.alias("m")
df_visitante_aliased = df_pontos_esperados_visitante.alias("v")

df_pontos_totais_esperados = df_mandante_aliased \
    .join(df_visitante_aliased, F.col("m.clube_id") == F.col("v.clube_id"), "full") \
    .select(
        F.coalesce(F.col("m.clube_id"), F.col("v.clube_id")).alias("clube_id"),
        (F.coalesce(F.col("m.pontos_esperados_m"), F.lit(0)) + 
         F.coalesce(F.col("v.pontos_esperados_v"), F.lit(0))).alias("pontos_esperados_total"),
        (F.coalesce(F.col("m.jogos_restantes_m"), F.lit(0)) + 
         F.coalesce(F.col("v.jogos_restantes_v"), F.lit(0))).alias("jogos_restantes_total")
    )

# Projeção final: pontos atuais + pontos esperados
# A tabela df_classificacao_atual já tem a coluna "nome", então não precisamos fazer join com clubes novamente
df_projecao_final = df_classificacao_atual \
    .join(df_pontos_totais_esperados, "clube_id", "left") \
    .fillna(0, subset=["pontos_esperados_total", "jogos_restantes_total"]) \
    .withColumn("pontos_projetados", F.col("pontos_atuais") + F.col("pontos_esperados_total")) \
    .withColumn("total_jogos_campeonato", F.col("jogos_realizados") + F.col("jogos_restantes_total"))

# Calcular força do calendário (média do Elo dos adversários)
df_adversarios_mandante = df_futuro_features_final \
    .groupBy("mandante_id") \
    .agg(F.avg("elo_visitante_pre_jogo").alias("forca_adversarios")) \
    .withColumnRenamed("mandante_id", "clube_id")

df_adversarios_visitante = df_futuro_features_final \
    .groupBy("visitante_id") \
    .agg(F.avg("elo_mandante_pre_jogo").alias("forca_adversarios")) \
    .withColumnRenamed("visitante_id", "clube_id")

df_forca_calendario = df_adversarios_mandante \
    .join(df_adversarios_visitante, "clube_id", "full") \
    .select(
        F.coalesce(df_adversarios_mandante["clube_id"], df_adversarios_visitante["clube_id"]).alias("clube_id"),
        ((F.coalesce(df_adversarios_mandante["forca_adversarios"], F.lit(0)) + 
          F.coalesce(df_adversarios_visitante["forca_adversarios"], F.lit(0))) / 2).alias("forca_media_adversarios")
    )

# Juntar força do calendário
df_projecao_completa = df_projecao_final \
    .join(df_forca_calendario, "clube_id", "left") \
    .fillna(0, subset=["forca_media_adversarios"])

# Calcular ranking e zona de rebaixamento
window_rank = Window.orderBy(F.desc("pontos_projetados"), F.desc("saldo_gols"))

df_analise_rebaixamento = df_projecao_completa \
    .withColumn("posicao_projetada", F.row_number().over(window_rank)) \
    .withColumn("zona_rebaixamento", F.when(F.col("posicao_projetada") >= 17, "SIM").otherwise("NÃO")) \
    .withColumn("distancia_z4", 
        F.col("pontos_projetados") - F.nth_value("pontos_projetados", 17).over(Window.orderBy(F.desc("pontos_projetados")))
    ) \
    .withColumn("risco_rebaixamento",
        F.when(F.col("posicao_projetada") >= 17, "ALTO")
         .when(F.col("posicao_projetada") >= 14, "MÉDIO")
         .when(F.col("posicao_projetada") >= 11, "BAIXO")
         .otherwise("MÍNIMO")
    )

# Adicionar métrica de dificuldade do calendário
df_analise_final = df_analise_rebaixamento \
    .withColumn("dificuldade_calendario",
        F.when(F.col("forca_media_adversarios") > 1520, "MUITO DIFÍCIL")
         .when(F.col("forca_media_adversarios") > 1480, "DIFÍCIL")
         .when(F.col("forca_media_adversarios") > 1450, "MÉDIO")
         .otherwise("FAVORÁVEL")
    )

# Salvar tabela completa de análise
df_analise_final.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("diamond.analise_rebaixamento")

print("\n✅ Análise de rebaixamento salva em 'diamond.analise_rebaixamento'")

# ==============================================================================
# VISUALIZAÇÃO FINAL - ANÁLISE DE REBAIXAMENTO (USANDO CLUBE_ID)
# ==============================================================================
print("\n" + "="*80)
print("⚠️ ANÁLISE DE RISCO DE REBAIXAMENTO")
print("="*80)

# IMPORTANTE: Trabalhar APENAS com clube_id para evitar confusão
df_visualizacao = df_analise_final.select(
    "posicao_projetada",
    "clube_id",
    "nome",
    "pontos_atuais",
    "jogos_realizados",
    "jogos_restantes_total",
    F.round("pontos_esperados_total", 2).alias("pontos_esperados"),
    F.round("pontos_projetados", 2).alias("pontos_projetados"),
    "saldo_gols",
    "zona_rebaixamento",
    "risco_rebaixamento",
    "dificuldade_calendario",
    F.round("distancia_z4", 2).alias("distancia_z4")
).orderBy("posicao_projetada")

print("\n📊 TABELA DE PROJEÇÃO FINAL COM ANÁLISE DE REBAIXAMENTO:")
print("IMPORTANTE: Use 'clube_id' como referência principal, não o nome!")
display(df_visualizacao)

# Times em risco
print("\n" + "-"*80)
print("🚨 TIMES EM ZONA DE REBAIXAMENTO (Posições 17-20):")
print("-"*80)
df_zona_rebaixamento = df_visualizacao.filter("zona_rebaixamento = 'SIM'")
display(df_zona_rebaixamento)

print("\n" + "-"*80)
print("⚠️ TIMES EM SITUAÇÃO DE ALERTA (Posições 14-16):")
print("-"*80)
df_alerta = df_visualizacao.filter("posicao_projetada BETWEEN 14 AND 16")
display(df_alerta)

# ==============================================================================
# VERIFICAÇÃO: MAPA DE CLUBE_ID → NOME CORRETO
# ==============================================================================
print("\n" + "="*80)
print("🔍 VERIFICAÇÃO: MAPEAMENTO CLUBE_ID → INFORMAÇÕES")
print("="*80)

df_mapa_clubes = spark.table("silver.clubes").select(
    "clube_id",
    "nome",
    "abreviacao",
    "nome_completo"
).orderBy("clube_id")

print("\n📋 Todos os clubes cadastrados no sistema:")
display(df_mapa_clubes)

print("\n" + "-"*80)
print("🎯 CLUBES NA ANÁLISE DE REBAIXAMENTO (clube_id):")
print("-"*80)

# Usar aliases para evitar ambiguidade no join
df_visualizacao_aliased = df_visualizacao.alias("viz")
df_mapa_clubes_aliased = df_mapa_clubes.alias("clubes")

df_ids_analise = df_visualizacao_aliased \
    .join(df_mapa_clubes_aliased, F.col("viz.clube_id") == F.col("clubes.clube_id"), "left") \
    .select(
        F.col("viz.clube_id"),
        F.col("viz.nome").alias("nome_api"),
        F.col("clubes.nome_completo"),
        F.col("clubes.abreviacao"),
        F.col("viz.posicao_projetada"),
        F.round(F.col("viz.pontos_projetados"), 2).alias("pontos_projetados")
    ).orderBy("posicao_projetada")

display(df_ids_analise)

# Salvar previsões das próximas partidas também
df_resultado_final = df_futuro_features_final \
    .join(previsoes_texto.select("partida_id", "previsao_texto", "probability"), "partida_id") \
    .join(df_clubes.alias("mandante"), F.col("mandante_id") == F.col("mandante.clube_id")) \
    .join(df_clubes.alias("visitante"), F.col("visitante_id") == F.col("visitante.clube_id")) \
    .withColumn("competicao", F.lit("Brasileirão Série A")) \
    .select(
        "competicao",
        F.date_format("data_partida", "dd/MM/yyyy HH:mm").alias("data_hora_partida"),
        F.col("mandante.nome").alias("time_mandante"),
        F.col("visitante.nome").alias("time_visitante"),
        F.col("previsao_texto").alias("previsao"),
        F.col("probability").alias("confianca_probabilidades")
    ).orderBy("data_hora_partida")

df_resultado_final.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("diamond.previsoes_proximas_partidas")

print("\n✅ Previsões das próximas partidas salvas em 'diamond.previsoes_proximas_partidas'")

print("\n" + "="*80)
print("📈 MÉTRICAS CHAVE DA ANÁLISE (POR CLUBE_ID):")
print("="*80)

# Calcular algumas estatísticas
stats = df_analise_final.select(
    F.avg("pontos_projetados").alias("media_pontos"),
    F.min("pontos_projetados").alias("min_pontos"),
    F.max("pontos_projetados").alias("max_pontos")
).collect()[0]

linha_corte = df_analise_final.filter("posicao_projetada = 17").select("clube_id", "pontos_projetados").collect()
if linha_corte:
    clube_id_z4 = linha_corte[0]["clube_id"]
    pontos_z4 = linha_corte[0]["pontos_projetados"]
    print(f"""
📊 Estatísticas do Campeonato:
   • Média de pontos projetados: {stats['media_pontos']:.2f}
   • Menor pontuação projetada: {stats['min_pontos']:.2f}
   • Maior pontuação projetada: {stats['max_pontos']:.2f}
   • Pontos projetados do 17º colocado (Z4): {pontos_z4:.2f}
   • Clube ID do 17º colocado: {clube_id_z4}
    """)

# Análise detalhada dos times em risco (com clube_id)
print("\n" + "="*80)
print("🔍 ANÁLISE DETALHADA - TIMES EM RISCO (clube_id):")
print("="*80)

df_analise_risco = df_analise_final.filter("posicao_projetada >= 14") \
    .select(
        "posicao_projetada",
        "clube_id",
        "nome",
        "pontos_atuais",
        "jogos_restantes_total",
        F.round("pontos_esperados_total", 2).alias("pts_esperados"),
        F.round("pontos_projetados", 2).alias("pts_projetados"),
        F.round("distancia_z4", 2).alias("dist_z4"),
        "risco_rebaixamento",
        "dificuldade_calendario"
    ).orderBy("posicao_projetada")

display(df_analise_risco)

# Analisar próximos jogos dos times em risco
print("\n" + "="*80)
print("⚽ PRÓXIMOS JOGOS DOS TIMES EM RISCO:")
print("="*80)

# Pegar IDs dos times em risco
ids_risco = [row["clube_id"] for row in df_analise_risco.collect()]

df_jogos_risco = df_futuro_features_final \
    .filter(F.col("mandante_id").isin(ids_risco) | F.col("visitante_id").isin(ids_risco)) \
    .select(
        "rodada",
        "mandante_id",
        "visitante_id",
        F.date_format("data_partida", "dd/MM/yyyy").alias("data")
    ).orderBy("rodada")

print("\nPróximas partidas envolvendo times em risco de rebaixamento:")
display(df_jogos_risco.limit(20))

print("\n💡 COMO INTERPRETAR:")
print("""
1. POSIÇÃO PROJETADA: Classificação final estimada baseada nas probabilidades
2. PONTOS ESPERADOS: Soma ponderada das probabilidades x pontos possíveis
3. DISTÂNCIA Z4: Diferença de pontos para a zona de rebaixamento (negativo = abaixo da linha)
4. RISCO DE REBAIXAMENTO: 
   • ALTO: Posições 17-20 (zona de rebaixamento)
   • MÉDIO: Posições 14-16 (próximos à zona)
   • BAIXO: Posições 11-13 (ainda com risco matemático)
   • MÍNIMO: Posições 1-10 (praticamente livres)
5. DIFICULDADE DO CALENDÁRIO: Força média dos adversários restantes
""")

print("\n" + "="*80)
print("✅✅✅ PIPELINE DE ANÁLISE DE REBAIXAMENTO CONCLUÍDO! ✅✅✅")
print("="*80)

print("\n📋 CONSULTAS DISPONÍVEIS:")
print("   • SELECT * FROM diamond.analise_rebaixamento")
print("   • SELECT * FROM diamond.previsoes_proximas_partidas")
print("\n" + "="*80)