In [None]:
!pip install pandas numpy scikit-learn joblib tdqm mlflow

In [1]:
# -*- coding: utf-8 -*-
import os
import numpy as np
import pandas as pd
from datetime import datetime

from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score
import joblib
from tqdm import tqdm

# Input CSV with the per-server feature table, and the folder that receives trained models.
DATA_PATH = "minecraft_servidores_features.csv"
MODEL_DIR = "models"; os.makedirs(MODEL_DIR, exist_ok=True)

# ============================
# 1) Load the data and keep a single server
# ============================
df = pd.read_csv(DATA_PATH)

# value_counts() orders by frequency, so index[0] is the 'ip' with the most rows.
servidor_escolhido = df['ip'].value_counts().index[0]
df = df[df['ip'] == servidor_escolhido].copy()

# ============================
# 2) Select features/target
# ============================
features = ['hora', 'final_de_semana', 'media_movel_10', 'proporcao_rede', 'pct_var_jogadores']
target = 'playerCount'

# Build a dataframe with only the columns of interest
df_model = df[features + [target]].copy()

# ============================
# 3) Sanitise X and y
# ============================
# Numeric dtypes (anything unparseable is coerced to NaN)
for c in df_model.columns:
    df_model[c] = pd.to_numeric(df_model[c], errors='coerce')

# Replace Inf/-Inf with NaN in EVERY column
df_model = df_model.replace([np.inf, -np.inf], np.nan)

# Winsorise only pct_var_jogadores (cap at the 1st and 99th percentiles) to reduce extremes.
# NOTE(review): the caps are computed on the whole frame, i.e. before the train/test
# split performed later in this cell, so they also see rows that end up in the test set.
if 'pct_var_jogadores' in df_model.columns:
    p1, p99 = np.nanpercentile(df_model['pct_var_jogadores'], [1, 99])
    df_model['pct_var_jogadores'] = df_model['pct_var_jogadores'].clip(lower=p1, upper=p99)

# !! Drop rows whose y (target) is NaN
n_total = len(df_model)
df_model = df_model.dropna(subset=[target])
n_drop_y = n_total - len(df_model)

# Optionally, rows with ALL features missing could be dropped as well:
# df_model = df_model.dropna(how="all", subset=features)

print(f"Linhas removidas por y NaN: {n_drop_y}")

X = df_model[features].copy()
y = df_model[target].astype(float)

# ============================
# 4) Split
# ============================
# 80/20 hold-out split with a fixed seed.
# NOTE(review): train_test_split shuffles rows by default. If the rows are time-ordered
# and media_movel_10 is a rolling statistic of the target (presumably — confirm), then
# neighbouring samples land on both sides of the split and test scores may be optimistic.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# ============================
# 5) Pipelines (imputation + scaling)
# ============================
num_features = features

# Linear model: median imputation followed by standardisation.
preprocess_linear = ColumnTransformer(
    transformers=[
        ("num", Pipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]), num_features)
    ],
    remainder="drop"
)

# Random forest: median imputation only (no scaler in this branch).
preprocess_rf = ColumnTransformer(
    transformers=[
        ("num", SimpleImputer(strategy="median"), num_features)
    ],
    remainder="drop"
)

# Candidate estimators, keyed by the name used in logs and in the saved file name.
modelos = {
    "LinearRegression": Pipeline([
        ("prep", preprocess_linear),
        ("est", LinearRegression())
    ]),
    "RandomForest": Pipeline([
        ("prep", preprocess_rf),
        ("est", RandomForestRegressor(
            n_estimators=200,
            random_state=42,
            n_jobs=-1
        ))
    ])
}

# ============================
# 6) Train
# ============================
print("\nTreinando modelos...")
# Each pipeline is fitted in place on the training split; tqdm only shows progress.
for nome, modelo in tqdm(modelos.items()):
    modelo.fit(X_train, y_train)

# ============================
# 7) Avaliar
# ============================
def avaliar(nome, modelo, X, y):
    """Score a fitted model on (X, y), print one summary line and return the scores.

    Args:
        nome: Label printed in front of the metrics.
        modelo: Any object exposing ``predict(X)``.
        X: Feature matrix handed to ``modelo.predict``.
        y: Ground-truth targets aligned with ``X``.

    Returns:
        Tuple ``(mae, r2)`` as computed by ``mean_absolute_error`` and ``r2_score``.
    """
    previsoes = modelo.predict(X)
    # Same call order as before: MAE first, then R².
    mae, r2 = (metrica(y, previsoes) for metrica in (mean_absolute_error, r2_score))
    print(f"{nome} -> MAE: {mae:.2f} | R²: {r2:.4f}")
    return mae, r2

print("\nAvaliação (teste):")
# metricas maps model name -> (mae, r2), both measured on the held-out test split.
metricas = {nome: avaliar(nome, mdl, X_test, y_test) for nome, mdl in modelos.items()}

# ============================
# 8) Save the best model + log
# ============================
# "Best" means highest test R² (index 1 of each tuple); MAE is not part of the choice.
melhor = max(metricas, key=lambda k: metricas[k][1])
# Timestamp keeps successive runs from overwriting each other's files.
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path_model = os.path.join(MODEL_DIR, f"{melhor}_{ts}.joblib")
joblib.dump(modelos[melhor], path_model)

# Plain-text report written next to the model, sharing the same timestamp.
path_log = os.path.join(MODEL_DIR, f"report_{ts}.txt")
with open(path_log, "w", encoding="utf-8") as f:
    f.write("TREINO REGRESSÃO – Previsão de jogadores por horário\n")
    f.write(f"Servidor: {servidor_escolhido}\n")
    f.write(f"Linhas removidas por y NaN: {n_drop_y}\n")
    f.write("\nMétricas (teste):\n")
    for k,(mae,r2) in metricas.items():
        f.write(f"{k}: MAE={mae:.2f} | R2={r2:.4f}\n")
    f.write(f"\nMelhor modelo: {melhor}\nSalvo em: {path_model}\n")

print(f"\nMelhor modelo: {melhor}")
print(f"Modelo salvo em: {path_model}")
print(f"Relatório salvo em: {path_log}")

# ============================
# 9) Prediction examples
# ============================
print("\nExemplos de previsão (primeiros 5 do teste):")
best = modelos[melhor]
# First five rows of the test split, shown as actual vs predicted.
pred = best.predict(X_test.iloc[:5])
for i,(real,prev) in enumerate(zip(y_test.iloc[:5].values, pred), start=1):
    print(f"{i:02d}) Real={real:.0f} | Previsto={prev:.0f}")


Linhas removidas por y NaN: 128

Treinando modelos...


100%|██████████| 2/2 [00:02<00:00,  1.39s/it]


Avaliação (teste):
LinearRegression -> MAE: 227.97 | R²: 0.9973
RandomForest -> MAE: 41.72 | R²: 0.9928

Melhor modelo: LinearRegression
Modelo salvo em: models/LinearRegression_20250831_192911.joblib
Relatório salvo em: models/report_20250831_192911.txt

Exemplos de previsão (primeiros 5 do teste):
01) Real=94478 | Previsto=94393
02) Real=77697 | Previsto=77913
03) Real=97386 | Previsto=97507
04) Real=84283 | Previsto=84179
05) Real=70784 | Previsto=70536





In [5]:
# -*- coding: utf-8 -*-
import os
import joblib
import logging
from typing import Dict, Any
import mlflow
import mlflow.sklearn

logger = logging.getLogger(__name__)

def publish_to_mlflow(model, params: Dict[str, Any], uri: str) -> str:
    """
    Publish a model to MLflow as a plain run artifact (no Model Registry).

    Steps:
      1. Serialise the model with joblib into a temporary staging directory.
      2. Upload that file to the run under the artifact path ``model``.
      3. Log the scalar entries of ``params`` as run parameters.

    The staging directory is private to each call and removed afterwards, so
    repeated or concurrent calls no longer overwrite a shared
    ``mlruns_local/model.joblib`` and nothing is left behind in the working
    directory.

    Args:
        model: Any joblib-serialisable object.
        params: Settings dict. ``"mlflow_experiment"`` selects the experiment
            (default ``"default"``); every str/int/float/bool value is logged
            as a run parameter.
        uri: MLflow tracking URI.

    Returns:
        ``runs:/<run_id>/model`` — the artifact folder that holds ``model.joblib``.
    """
    import tempfile  # local import: only this function needs it

    # Tracking + experiment
    mlflow.set_tracking_uri(uri)
    mlflow.set_experiment(params.get("mlflow_experiment", "default"))

    with mlflow.start_run() as run:
        run_id = run.info.run_id

        # --- 1) + 2) stage the dump in a throw-away folder and upload it ---
        with tempfile.TemporaryDirectory() as staging_dir:
            local_model_path = os.path.join(staging_dir, "model.joblib")
            joblib.dump(model, local_model_path)
            mlflow.log_artifact(local_model_path, artifact_path="model")

        # --- 3) log simple (scalar) parameters only ---
        mlflow.log_params({
            k: v for k, v in params.items() if isinstance(v, (str, int, float, bool))
        })

        # Location inside MLflow
        model_uri = f"runs:/{run_id}/model"
        logger.info("✅ Modelo logado como artifact em: %s", model_uri)

        print(f"🚀 Modelo salvo no MLflow em: {model_uri}")
        # rstrip avoids "//#/" in the link when uri ends with a slash.
        print(f"🔗 Veja em: {uri.rstrip('/')}/#/experiments/{run.info.experiment_id}/runs/{run_id}")
        return model_uri


# ======================
# Exemplo de uso
# ======================
if __name__ == "__main__":
    # Load a trained model (example).
    # NOTE(review): the file name is hard-coded — confirm it matches a model that
    # actually exists under models/ before running.
    model = joblib.load("models/LinearRegression_20250831_190908.joblib")

    params = {"mlflow_experiment": "default"}
    # Tracking server: taken from MLFLOW_TRACKING_URI when set, otherwise the
    # previous hard-coded address. Changing server no longer needs a code edit.
    uri = os.environ.get("MLFLOW_TRACKING_URI", "http://192.168.0.43:3000")

    publish_to_mlflow(model, params, uri)


2025/08/31 19:34:49 INFO mlflow.tracking.fluent: Experiment with name 'default' does not exist. Creating a new experiment.


🚀 Modelo salvo no MLflow em: runs:/9195b7d246eb418b800f5164fbad3010/model
🔗 Veja em: http://192.168.0.43:3000/#/experiments/1/runs/9195b7d246eb418b800f5164fbad3010
🏃 View run orderly-cow-199 at: http://192.168.0.43:3000/#/experiments/1/runs/9195b7d246eb418b800f5164fbad3010
🧪 View experiment at: http://192.168.0.43:3000/#/experiments/1


In [6]:
# Same publication as the previous cell, pointed at a different tracking server.
# NOTE(review): the recorded output of this cell ends in
# "PermissionError: [Errno 13] Permission denied: '/home/serverlab'". Presumably the
# server's artifact location is a filesystem path this client cannot write to —
# confirm the tracking server's artifact configuration.
model = joblib.load("models/LinearRegression_20250831_190908.joblib")
params = {"mlflow_experiment": "default"}
uri = "http://192.168.0.122:3000"
publish_to_mlflow(model, params, uri)

🏃 View run defiant-toad-313 at: http://192.168.0.122:3000/#/experiments/1/runs/5dce07826a0241cd9732c6884000b657
🧪 View experiment at: http://192.168.0.122:3000/#/experiments/1


PermissionError: [Errno 13] Permission denied: '/home/serverlab'

In [None]:
# avaliar_modelo_saneado.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib
import os

# -------------------------
# Configurações
# -------------------------
# Saved pipeline to evaluate, the feature table, and the folder for plots/report.
MODEL_PATH = "models/LinearRegression_20250828_150613.joblib"
DATA_PATH = "minecraft_servidores_features.csv"
OUTPUT_DIR = "avaliacao"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# -------------------------
# 1) Load model and data
# -------------------------
print("Carregando modelo e dados...")
model = joblib.load(MODEL_PATH)
df = pd.read_csv(DATA_PATH)

# Target server: the 'ip' with the most rows (value_counts() sorts by frequency).
servidor = df['ip'].value_counts().index[0]
df_server = df[df['ip'] == servidor].copy()

# -------------------------
# 2) Prepare features/target with sanitisation
# -------------------------
features = ['hora', 'final_de_semana', 'media_movel_10', 'proporcao_rede', 'pct_var_jogadores']
target = 'playerCount'

# force numeric dtypes (unparseable values become NaN)
for col in features + [target]:
    df_server[col] = pd.to_numeric(df_server[col], errors='coerce')

# turn ±inf into NaN
df_server.replace([np.inf, -np.inf], np.nan, inplace=True)

# Impute missing feature values with the column median.
# NOTE(review): these medians come from this frame. Presumably the saved pipeline has
# its own imputer fitted at training time, which this step pre-empts — confirm.
for col in features:
    med = df_server[col].median()
    df_server[col] = df_server[col].fillna(med)

# drop rows without y
df_server = df_server.dropna(subset=[target])

# keep the timestamp when present (used by the time-series plot)
if 'timestamp' in df_server.columns:
    # numeric timestamps are interpreted as epoch milliseconds
    if np.issubdtype(df_server['timestamp'].dtype, np.number):
        df_server['timestamp'] = pd.to_datetime(df_server['timestamp'], unit='ms', errors='coerce')

X = df_server[features]
y = df_server[target].astype(float)

# -------------------------
# 3) Prever e calcular métricas
# -------------------------
# Predict on every remaining row of the selected server (no train/test split here).
y_pred = model.predict(X)

mae  = mean_absolute_error(y, y_pred)
# RMSE computed by hand: square root of the mean squared residual.
rmse = np.sqrt(((y - y_pred) ** 2).mean())
r2   = r2_score(y, y_pred)

print("\n==== Avaliação do Modelo ====")
print(f"Servidor: {servidor}")
print(f"MAE : {mae:.2f}")
print(f"RMSE: {rmse:.2f}")
print(f"R²  : {r2:.4f}")

# -------------------------
# 4) Gráfico: Real vs Previsto
# -------------------------
# Scatter of actual vs predicted, with the y = x diagonal as the perfect-fit reference.
plt.figure(figsize=(10,6))
plt.scatter(y, y_pred, alpha=0.5)
# Diagonal spans the joint min/max of both series.
mn, mx = float(min(y.min(), y_pred.min())), float(max(y.max(), y_pred.max()))
plt.plot([mn, mx], [mn, mx], linestyle='--')
plt.xlabel("Real")
plt.ylabel("Previsto")
plt.title(f"Real vs Previsto – {servidor}")
plt.grid(True)
plt.savefig(os.path.join(OUTPUT_DIR, "01_real_vs_previsto.png"), dpi=300)
plt.close()

# -------------------------
# 5) Plot: distribution of the errors
# -------------------------
erro_abs = np.abs(y - y_pred)
plt.figure(figsize=(10,6))
plt.hist(erro_abs, bins=50)
plt.title("Distribuição do Erro Absoluto")
plt.xlabel("Erro Absoluto")
plt.ylabel("Frequência")
plt.grid(True)
plt.savefig(os.path.join(OUTPUT_DIR, "02_distribuicao_erro.png"), dpi=300)
plt.close()

# -------------------------
# 6) Plot: error by hour
# -------------------------
# If 'hora' is missing for some reason, recompute it from the timestamp.
# NOTE(review): 'hora' is listed in `features` and is indexed earlier in this cell,
# so the column must already exist here — this branch looks unreachable.
if 'hora' not in df_server.columns and 'timestamp' in df_server.columns:
    df_server['hora'] = df_server['timestamp'].dt.hour

# Attach predictions and absolute errors to the frame for grouping and plotting.
df_server['previsto'] = y_pred
df_server['erro_abs'] = erro_abs

erro_por_hora = df_server.groupby('hora')['erro_abs'].mean()
plt.figure(figsize=(10,6))
erro_por_hora.plot(kind='bar')
plt.title("Erro Absoluto Médio por Hora")
plt.xlabel("Hora do Dia")
plt.ylabel("Erro Absoluto Médio")
plt.grid(axis='y')
plt.savefig(os.path.join(OUTPUT_DIR, "03_erro_por_hora.png"), dpi=300)
plt.close()

# -------------------------
# 7) Plot: residual vs predicted (diagnostic)
# -------------------------
residuo = y - y_pred
plt.figure(figsize=(10,6))
plt.scatter(y_pred, residuo, alpha=0.4)
plt.axhline(0, linestyle='--')
plt.xlabel("Previsto")
plt.ylabel("Resíduo (Real - Previsto)")
plt.title("Resíduo vs. Previsto")
plt.grid(True)
plt.savefig(os.path.join(OUTPUT_DIR, "04_residuo_vs_previsto.png"), dpi=300)
plt.close()

# -------------------------
# 8) Time-series plot: actual vs predicted (only when a timestamp exists)
# -------------------------
if 'timestamp' in df_server.columns:
    # dropna() also removes rows whose timestamp failed to parse (NaT).
    df_plot = df_server[['timestamp', target, 'previsto']].dropna().sort_values('timestamp')
    plt.figure(figsize=(12,6))
    plt.plot(df_plot['timestamp'], df_plot[target], label='Real')
    plt.plot(df_plot['timestamp'], df_plot['previsto'], label='Previsto', alpha=0.8)
    plt.title(f"Série Temporal – Real vs Previsto ({servidor})")
    plt.xlabel("Tempo")
    plt.ylabel("Jogadores")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(OUTPUT_DIR, "05_temporal_real_vs_previsto.png"), dpi=300)
    plt.close()

# -------------------------
# 9) Relatório txt rápido
# -------------------------
# "BOM" requires R² >= 0.95 AND MAE <= 5% of the mean target; anything else is "OK"
# (there is no failing status).
status = "BOM" if r2 >= 0.95 and mae <= 0.05 * y.mean() else "OK"
with open(os.path.join(OUTPUT_DIR, "report_metrics.txt"), "w", encoding="utf-8") as f:
    f.write("AVALIAÇÃO – Previsão de jogadores por horário\n")
    f.write(f"Servidor: {servidor}\n\n")
    f.write(f"MAE : {mae:.2f}\n")
    f.write(f"RMSE: {rmse:.2f}\n")
    f.write(f"R²  : {r2:.4f}\n")
    f.write(f"\nStatus: {status} (critério: R² ≥ 0.95 e MAE ≤ 5% da média de players)\n")

print(f"\nGráficos e relatório salvos em: {os.path.abspath(OUTPUT_DIR)}")


In [None]:
# Query the /health endpoint of the service listening on localhost:8000.
!curl http://localhost:8000/health


In [None]:
# Query the /models endpoint of the service listening on localhost:8000.
!curl http://localhost:8000/models


In [None]:
# POST to the /reload endpoint of the service listening on localhost:8000.
!curl -X POST http://localhost:8000/reload


In [None]:
# -*- coding: utf-8 -*-
import requests
import math
from copy import deepcopy
from typing import Dict, List, Any

# Prediction endpoint that call_api() posts to.
API_URL = "http://localhost:8000/predict"

# Configurable thresholds (tune to your own setup)
LOAD_THRESHOLDS = {
    "low": 30000,      # < 30k: low
    "medium": 60000,   # 30k-60k: medium
    "high": 90000      # 60k-90k: high ; >=90k: critical
}

# Feature names every instance sent to the API carries, in this order.
FEATURES = ["hora", "final_de_semana", "media_movel_10", "proporcao_rede", "pct_var_jogadores"]

def sanitize_instance(x: Dict[str, Any]) -> Dict[str, float]:
    """Coerce every feature to a finite float, filling gaps with prudent defaults.

    A value counts as missing when the key is absent, when it cannot be
    converted with ``float()``, or when the conversion yields NaN or ±inf.
    Earlier only ±inf in ``pct_var_jogadores`` was caught, so NaN (or inf in
    any other feature) reached the formatting helpers and the JSON payload.

    Args:
        x: Raw instance; may lack keys or hold strings/None.

    Returns:
        New dict with one entry per name in ``FEATURES``. The input is not modified.
    """
    # Minimal, non-intrusive fill-ins (same values as before).
    defaults = {
        "hora": 18.0,
        "final_de_semana": 0.0,
        "media_movel_10": 50000.0,
        "proporcao_rede": 0.30,
        "pct_var_jogadores": 0.0,
    }
    y = {}
    for f in FEATURES:
        try:
            v = float(x.get(f, None))
        except (TypeError, ValueError):
            v = None
        if v is None or not math.isfinite(v):
            # Features without a registered default stay None, as before.
            v = defaults.get(f)
        y[f] = v
    return y

def label_load(pred: float) -> str:
    """Map a predicted player count to a load label.

    Walks the thresholds in ascending order and returns the label of the first
    band whose upper bound is strictly above ``pred``; anything at or beyond
    the last bound is "crítico".
    """
    faixas = (("low", "baixo"), ("medium", "médio"), ("high", "alto"))
    for chave, rotulo in faixas:
        if pred < LOAD_THRESHOLDS[chave]:
            return rotulo
    return "crítico"

def action_for_load(level: str) -> str:
    """Return the suggested operational action for a load label.

    Any label other than "baixo", "médio" or "alto" gets the overload message.
    """
    acoes = {
        "baixo": "janela ok para manutenção leve; recursos podem ser reduzidos",
        "médio": "monitorar; ajustar autoscaling conforme tendência",
        "alto": "preparar autoscaling; adiar manutenção; reforçar capacidade",
    }
    for rotulo, acao in acoes.items():
        if level == rotulo:
            return acao
    return "alerta de sobrecarga; ativar mitigação, limitar eventos e reforçar capacidade"

def deltas_report(baseline: float, scenarios: List[Dict[str, Any]], preds: List[float]) -> List[str]:
    """Describe the scenario whose prediction moves furthest from the baseline.

    Args:
        baseline: Prediction for the unmodified instance.
        scenarios: Scenario instances, aligned with ``preds``.
        preds: One prediction per scenario.

    Returns:
        A one-element list with the report line, or ``[]`` when there is nothing
        to compare. On ties the first scenario wins.
    """
    # zip() stops at the shorter list, so a length mismatch can no longer
    # index past the end of `scenarios`.
    pairs = list(zip(scenarios, preds))
    if not pairs:
        return []
    s, p = max(pairs, key=lambda sp: abs(sp[1] - baseline))
    change = p - baseline
    # `is not None` instead of `!= None`; .get() tolerates a scenario lacking a key.
    driver = ", ".join([f"{k}→{s[k]}" for k in FEATURES if s.get(k) is not None])
    return [f"> Maior impacto do conjunto testado: Δ≈{change:.0f} jogadores com {driver}"]

def generate_scenarios(baseline: Dict[str, float]) -> List[Dict[str, float]]:
    """
    Build one-at-a-time scenarios around the baseline to probe sensitivity.

    Produced in this order (11 scenarios):
    - Hour: -3, -1, +1, +3, wrapped around the 24-hour clock
    - Weekend flag: 0 <-> 1
    - 10-window moving average: x0.9, x1.1 (floored at 0)
    - Network share: -0.05, +0.05 (clamped to 0..1)
    - pct_var: -1.0, +1.0

    The hour used to be clamped to 0..23, so at 21h the "+3h" scenario was
    really +2h, and at 23h both "+1h" and "+3h" collapsed onto the baseline.
    Wrapping modulo 24 keeps every shift a real shift.

    The baseline dict is never mutated; each scenario is an independent copy.
    """
    b = baseline
    sc = []

    def variant(feature: str, value: float) -> None:
        """Append a copy of the baseline with a single feature replaced."""
        s = deepcopy(b)
        s[feature] = value
        sc.append(s)

    # hour (cyclic)
    for dh in [-3, -1, 1, 3]:
        variant("hora", (b["hora"] + dh) % 24)

    # weekend toggle
    variant("final_de_semana", 1.0 - b["final_de_semana"])

    # media_movel_10 ±10%
    for mult in [0.9, 1.1]:
        variant("media_movel_10", max(0.0, b["media_movel_10"] * mult))

    # proporcao_rede ±0.05 (clamp 0..1)
    for dv in [-0.05, 0.05]:
        variant("proporcao_rede", max(0.0, min(1.0, b["proporcao_rede"] + dv)))

    # pct_var ±1.0
    for dv in [-1.0, 1.0]:
        variant("pct_var_jogadores", b["pct_var_jogadores"] + dv)

    return sc

def call_api(instances: List[Dict[str, float]]) -> List[float]:
    """POST the instances to the prediction API and return its predictions.

    Args:
        instances: Feature dicts, sent as ``{"instances": [...]}``.

    Returns:
        The ``"predictions"`` list from the JSON reply, one entry per instance.

    Raises:
        RuntimeError: On a non-200 status, or when the reply does not carry
            exactly one prediction per instance. Callers index and zip the
            result positionally, so a short reply would otherwise surface as
            an IndexError or silently misaligned rows.
    """
    payload = {"instances": instances}
    r = requests.post(API_URL, json=payload, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"API error {r.status_code}: {r.text}")
    data = r.json()
    preds = data.get("predictions", []) if isinstance(data, dict) else None
    if not isinstance(preds, list) or len(preds) != len(instances):
        got = len(preds) if isinstance(preds, list) else "no"
        raise RuntimeError(f"API returned {got} predictions for {len(instances)} instances")
    return preds

def pretty_instance(x: Dict[str, float]) -> str:
    """Render an instance as a compact single-line summary.

    Hour and weekend flag are truncated to int; the moving average is shown
    with no decimals and the two ratios with two.
    """
    hora = int(x['hora'])
    fimsem = int(x['final_de_semana'])
    mm10 = x['media_movel_10']
    prop_rede = x['proporcao_rede']
    pct_var = x['pct_var_jogadores']
    return f"hora={hora}, fimsem={fimsem}, mm10={mm10:.0f}, propRede={prop_rede:.2f}, pctVar={pct_var:.2f}"

def analyze_baseline(name: str, baseline_raw: Dict[str, Any]) -> None:
    """Print a what-if analysis around one baseline instance.

    Sanitises the baseline, builds the one-at-a-time scenarios, sends baseline
    and scenarios to the API in a single batch, then prints the baseline
    prediction with its load level, every scenario with its delta, the single
    largest impact, and the mean |Δ| per feature.

    API failures are reported on stdout and end the analysis early instead of
    raising. A reply with the wrong number of predictions is handled the same
    way; it used to crash on ``preds[0]`` or silently drop scenarios.

    Args:
        name: Title printed for this baseline.
        baseline_raw: Raw feature dict; passed through ``sanitize_instance``.
    """
    print("\n" + "="*70)
    print(f"Baseline: {name}")
    base = sanitize_instance(baseline_raw)
    print("Entrada:", pretty_instance(base))

    # batch: baseline + scenarios
    scenarios = generate_scenarios(base)
    batch = [base] + scenarios

    try:
        preds = call_api(batch)
    except Exception as e:
        # Deliberate best-effort: report and move on to the next baseline.
        print("Falha ao chamar API:", e)
        return

    # Results are consumed positionally, so the count must match the batch.
    if len(preds) != len(batch):
        print("Falha ao chamar API:", f"esperadas {len(batch)} previsões, recebidas {len(preds)}")
        return

    base_pred = preds[0]
    base_level = label_load(base_pred)
    print(f"Previsão baseline: {base_pred:.0f} jogadores → carga {base_level.upper()} → {action_for_load(base_level)}")

    # what-if analysis
    scen_preds = preds[1:]
    print("\nCenários (what-if):")
    for s, p in zip(scenarios, scen_preds):
        lvl = label_load(p)
        delta = p - base_pred
        sign = "+" if delta >= 0 else ""
        print(f"- {pretty_instance(s)} => {p:.0f} jog. ({sign}{delta:.0f}) → {lvl.upper()}")

    # largest impact
    lines = deltas_report(base_pred, scenarios, scen_preds)
    for line in lines:
        print(line)

    # Rough sensitivity: mean |Δ| per feature over the scenarios that changed
    # exactly that feature.
    buckets = {f: [] for f in FEATURES}
    for s, p in zip(scenarios, scen_preds):
        # detect which feature moved (one-at-a-time design)
        changed = [f for f in FEATURES if abs(s[f] - base[f]) > 1e-9]
        if len(changed) == 1:
            buckets[changed[0]].append(abs(p - base_pred))

    print("\nSensibilidade média (|Δ jogadores| por feature):")
    for k, vals in buckets.items():
        if vals:
            print(f"- {k}: ~{sum(vals)/len(vals):.0f}")
        else:
            print(f"- {k}: n/d")

if __name__ == "__main__":
    # Two baselines (the ones previously exercised with curl).
    baseline_1 = {
        "hora": 10,
        "final_de_semana": 0,
        "media_movel_10": 60000,
        "proporcao_rede": 0.30,
        "pct_var_jogadores": -0.8,
    }
    baseline_2 = {
        "hora": 21,
        "final_de_semana": 1,
        "media_movel_10": 90000,
        "proporcao_rede": 0.38,
        "pct_var_jogadores": 2.1,
    }

    # Analyse them in the same order as before.
    for titulo, entrada in (
        ("Cenário A (dia útil, manhã)", baseline_1),
        ("Cenário B (fim de semana, noite)", baseline_2),
    ):
        analyze_baseline(titulo, entrada)


In [None]:
# -*- coding: utf-8 -*-
import os
import math
import requests
from copy import deepcopy
from typing import Dict, List, Any, Tuple

from tqdm import tqdm
from rich.console import Console, Group
from rich.table import Table
from rich.panel import Panel
from rich.columns import Columns
from rich.text import Text
from rich import box

# Prediction endpoint; overridable through the API_URL environment variable.
API_URL = os.environ.get("API_URL", "http://localhost:8000/predict")

# Load thresholds (tune to your context): upper bounds of the low/medium/high bands.
LOAD_THRESHOLDS = {"low": 30000, "medium": 60000, "high": 90000}

# Feature names every instance sent to the API carries, in this order.
FEATURES = ["hora", "final_de_semana", "media_movel_10", "proporcao_rede", "pct_var_jogadores"]

# Human-readable description of each feature, shown in the legend panel.
FEATURE_LABELS = {
    "hora": "Hora do dia (0–23)",
    "final_de_semana": "É fim de semana? (0=Não, 1=Sim)",
    "media_movel_10": "Média móvel (10 janelas)",
    "proporcao_rede": "Proporção na rede (0–1)",
    "pct_var_jogadores": "% variação de jogadores",
}

# Cluster name -> baseline instance for the what-if analysis plus the server
# names listed in that cluster's panel.
CLUSTERS = {
    "Cluster A – AM dias úteis": {
        "baseline": {"hora": 10, "final_de_semana": 0, "media_movel_10": 60000, "proporcao_rede": 0.30, "pct_var_jogadores": -0.8},
        "servers": ["srv-A1", "srv-A2", "srv-A3", "srv-A4"],
    },
    "Cluster B – PM dias úteis": {
        "baseline": {"hora": 18, "final_de_semana": 0, "media_movel_10": 70000, "proporcao_rede": 0.33, "pct_var_jogadores": 0.5},
        "servers": ["srv-B1", "srv-B2", "srv-B3", "srv-B4"],
    },
    "Cluster C – Noite fim semana": {
        "baseline": {"hora": 21, "final_de_semana": 1, "media_movel_10": 90000, "proporcao_rede": 0.38, "pct_var_jogadores": 2.1},
        "servers": ["srv-C1", "srv-C2", "srv-C3", "srv-C4"],
    },
    "Cluster D – Madrugada global": {
        "baseline": {"hora": 3, "final_de_semana": 0, "media_movel_10": 40000, "proporcao_rede": 0.25, "pct_var_jogadores": -0.4},
        "servers": ["srv-D1", "srv-D2", "srv-D3", "srv-D4"],
    },
}

# Shared Rich console used for all output of this cell.
console = Console()

# ---------- Helpers ----------
def fmt_int(x: float) -> str:
    """Round to the nearest integer and group thousands with spaces (12 345).

    Values that cannot be rounded to an int (NaN, inf, non-numbers) are
    returned as ``str(x)``.
    """
    try:
        arredondado = int(round(x))
    except Exception:
        return str(x)
    return format(arredondado, ",").replace(",", " ")

def fmt_delta(x: float) -> str:
    """Format a difference with an explicit sign and space-grouped thousands.

    Falls back to the plain ``+.0f`` rendering when the value cannot be
    rounded to an int (NaN, inf).
    """
    # Computed up front, outside the try: a non-numeric x raises here.
    simples = format(x, "+.0f")
    try:
        arredondado = int(round(x))
    except Exception:
        return simples
    # Thousands separator for large deltas.
    return format(arredondado, "+,").replace(",", " ")

def fmt_ratio(x: float) -> str:
    """Format a ratio with exactly two decimal places."""
    return format(x, ".2f")

def sanitize_instance(x: Dict[str, Any]) -> Dict[str, float]:
    """Coerce an instance to finite floats, fill defaults and enforce valid ranges.

    A value counts as missing when the key is absent, when ``float()`` rejects
    it, or when it is NaN/±inf. Earlier a NaN hour silently became 23.0 through
    the min/max clamp, and a NaN or infinite moving average went to the API
    untouched.

    After the defaults are filled in:
    - ``hora`` is clamped to 0..23
    - ``final_de_semana`` becomes 1.0 when >= 0.5, else 0.0
    - ``proporcao_rede`` is clamped to 0..1

    Args:
        x: Raw instance; may lack keys or hold strings/None.

    Returns:
        New dict with one entry per name in ``FEATURES``. The input is not modified.
    """
    defaults = {
        "hora": 18.0,
        "final_de_semana": 0.0,
        "media_movel_10": 50_000.0,
        "proporcao_rede": 0.30,
        "pct_var_jogadores": 0.0,
    }
    y = {}
    for f in FEATURES:
        try:
            v = float(x.get(f, None))
        except (TypeError, ValueError):
            v = None
        if v is None or not math.isfinite(v):
            v = defaults.get(f)
        y[f] = v
    y["hora"] = max(0.0, min(23.0, y["hora"]))
    y["final_de_semana"] = 1.0 if y["final_de_semana"] >= 0.5 else 0.0
    y["proporcao_rede"] = max(0.0, min(1.0, y["proporcao_rede"]))
    return y

def generate_scenarios(b: Dict[str, float]) -> List[Dict[str, float]]:
    """Build 11 one-at-a-time scenarios around baseline ``b``.

    Order: hour -3/-1/+1/+3 (wrapped around the 24-hour clock), weekend flag
    toggled, moving average x0.9/x1.1 (floored at 0), network share ±0.05
    (clamped to 0..1), pct_var ±1.0.

    The hour used to be clamped to 0..23, so at 21h the "+3h" scenario was
    really +2h, and at 23h both "+1h" and "+3h" were identical to the baseline
    and dropped out of the sensitivity average. Wrapping modulo 24 keeps every
    shift a real shift.

    ``b`` itself is never mutated.
    """
    sc: List[Dict[str, float]] = []

    def variant(feature: str, value: float) -> None:
        """Append a copy of the baseline with a single feature replaced."""
        s = deepcopy(b)
        s[feature] = value
        sc.append(s)

    for dh in [-3, -1, 1, 3]:
        variant("hora", (b["hora"] + dh) % 24)
    variant("final_de_semana", 1.0 - b["final_de_semana"])
    for mult in [0.9, 1.1]:
        variant("media_movel_10", max(0.0, b["media_movel_10"] * mult))
    for dv in [-0.05, 0.05]:
        variant("proporcao_rede", max(0.0, min(1.0, b["proporcao_rede"] + dv)))
    for dv in [-1.0, 1.0]:
        variant("pct_var_jogadores", b["pct_var_jogadores"] + dv)
    return sc

def call_api(instances: List[Dict[str, float]]) -> List[float]:
    """POST the instances to the prediction API and return its predictions.

    Args:
        instances: Feature dicts, sent as ``{"instances": [...]}``.

    Returns:
        The ``"predictions"`` list from the JSON reply, one entry per instance.

    Raises:
        RuntimeError: On a non-200 status, or when the reply does not carry
            exactly one prediction per instance. Callers unpack the result
            positionally, so a short reply would otherwise surface as an
            IndexError or silently misaligned rows.
    """
    r = requests.post(API_URL, json={"instances": instances}, timeout=30)
    if r.status_code != 200:
        raise RuntimeError(f"API error {r.status_code}: {r.text}")
    data = r.json()
    preds = data.get("predictions", []) if isinstance(data, dict) else None
    if not isinstance(preds, list) or len(preds) != len(instances):
        got = len(preds) if isinstance(preds, list) else "no"
        raise RuntimeError(f"API returned {got} predictions for {len(instances)} instances")
    return preds

def label_load(pred: float) -> str:
    """Classify a predicted player count as baixo / médio / alto / crítico.

    Each threshold is an exclusive upper bound, checked in ascending order;
    values at or above the "high" threshold are "crítico".
    """
    if pred < LOAD_THRESHOLDS["low"]:
        nivel = "baixo"
    elif pred < LOAD_THRESHOLDS["medium"]:
        nivel = "médio"
    elif pred < LOAD_THRESHOLDS["high"]:
        nivel = "alto"
    else:
        nivel = "crítico"
    return nivel

def level_colors(level: str) -> Tuple[str, str]:
    """Return the pair of Rich colour names associated with a load level.

    Unknown levels share the colours of the critical level.
    """
    if level == "baixo":
        cores = ("green", "white")
    elif level == "médio":
        cores = ("yellow3", "black")
    elif level == "alto":
        cores = ("orange1", "white")
    else:
        cores = ("red", "white")
    return cores

def action_for_load(level: str) -> str:
    """Look up the suggested action for a load level.

    Raises:
        KeyError: If ``level`` is not one of the four known labels.
    """
    acoes = dict([
        ("baixo", "Janela boa p/ manutenção; reduzir recursos."),
        ("médio", "Monitorar; ajustar autoscaling conforme tendência."),
        ("alto", "Preparar autoscaling; adiar manutenção; reforçar capacidade."),
        ("crítico", "Alerta de sobrecarga; ativar mitigação; limitar eventos."),
    ])
    return acoes[level]

def sensitivity_bucket(base_pred: float, base: Dict[str, float],
                       scenarios: List[Dict[str, float]], scen_preds: List[float]) -> Dict[str, float]:
    """
    Mean |Δ players| per feature over the scenarios that changed only that feature.

    A scenario counts towards a feature when exactly one feature differs from
    ``base`` by more than 1e-9. Scenarios that differ in zero or several
    features are ignored. The unit is players (the prediction's unit), not hours.

    Returns:
        Dict with one entry per name in ``FEATURES``; 0.0 where no scenario
        isolated that feature.
    """
    deltas_por_feature = {nome: [] for nome in FEATURES}
    for cenario, previsao in zip(scenarios, scen_preds):
        alteradas = [nome for nome in FEATURES if abs(cenario[nome] - base[nome]) > 1e-9]
        if len(alteradas) != 1:
            continue
        deltas_por_feature[alteradas[0]].append(abs(previsao - base_pred))

    medias = {}
    for nome, valores in deltas_por_feature.items():
        medias[nome] = sum(valores) / len(valores) if valores else 0.0
    return medias

# ---------- Render ----------
def legend_panel() -> Panel:
    """Build the legend panel: one row per feature plus a note on the sensitivity metric."""
    linhas = (
        ("Hora do dia", "hora"),
        ("Fim de semana", "final_de_semana"),
        ("Média móvel (10)", "media_movel_10"),
        ("Proporção na rede", "proporcao_rede"),
        ("% var. jogadores", "pct_var_jogadores"),
    )

    tbl = Table(box=box.SIMPLE)
    tbl.add_column("Coluna", style="bold")
    tbl.add_column("Descrição")
    for rotulo, chave in linhas:
        tbl.add_row(rotulo, FEATURE_LABELS[chave])

    note = Text(
        "Obs.: Valores de sensibilidade são a variação média ABSOLUTA de jogadores (|Δ|) ao alterar cada feature isoladamente.",
        style="italic",
    )
    conteudo = Group(tbl, note)
    return Panel(conteudo, title="Legenda das features", border_style="cyan")

def render_cluster_panel(name: str, base: Dict[str, float], base_pred: float,
                         scen_rows: List[Tuple[Dict[str, float], float, float, str]],
                         sensitivity: Dict[str, float], servers: List[str]) -> Panel:
    """Assemble the full Rich panel for one cluster.

    Layout, top to bottom: a header with the baseline prediction and load
    level; three side-by-side panels (what-if table, per-feature sensitivity,
    server list); and the suggested action. Every border uses the colour of
    the baseline load level.

    Args:
        name: Cluster title.
        base: Sanitised baseline instance (first table row).
        base_pred: Prediction for ``base``.
        scen_rows: ``(scenario, prediction, delta_vs_base, level)`` per scenario.
        sensitivity: Mean |Δ| per feature name; missing names show as 0.
        servers: Server names listed in the cluster.
    """
    level = label_load(base_pred)
    color = level_colors(level)[0]

    header_text = Text(
        f"{name}\nBaseline: {fmt_int(base_pred)} jogadores  ({level.upper()})",
        justify="center",
        style=f"bold {color}",
    )
    header = Panel(header_text, border_style=color)

    # Readable table: each feature gets its own column.
    table = Table(box=box.SIMPLE_HEAVY)
    for titulo, alinhamento in (
        ("Hora", "center"),
        ("Fim de semana", "center"),
        ("Média móvel (10)", "right"),
        ("Proporção rede", "right"),
        ("% var. jog.", "right"),
        ("Prev.", "right"),
        ("Δ vs base", "right"),
        ("Carga", "center"),
    ):
        table.add_column(titulo, justify=alinhamento)

    def row_from_scenario(s: Dict[str, float], p: float, delta: float, lvl: str, baseline=False):
        """Append one table row; the baseline row shows a dash instead of a delta."""
        badge = Text(lvl.upper(), style=f"bold {level_colors(lvl)[0]}")
        celulas = [
            f"{int(s['hora'])}",
            "Sim" if int(s["final_de_semana"]) == 1 else "Não",
            fmt_int(s['media_movel_10']),
            fmt_ratio(s['proporcao_rede']),
            f"{s['pct_var_jogadores']:.2f}",
            fmt_int(p),
            "—" if baseline else fmt_delta(delta),
            badge,
        ]
        table.add_row(*celulas)

    # BASELINE row first, then every scenario in the order received.
    row_from_scenario(base, base_pred, 0.0, level, baseline=True)
    for linha in scen_rows:
        row_from_scenario(*linha)

    table_panel = Panel(table, title="Cenários (what-if)", border_style=color)

    # Sensitivity (mean |Δ| per feature)
    sens_tbl = Table(box=box.MINIMAL_DOUBLE_HEAD, title="Sensibilidade média (|Δ jogadores|)")
    sens_tbl.add_column("Feature")
    sens_tbl.add_column("|Δ|", justify="right")
    for rotulo, chave in (
        ("Hora do dia", 'hora'),
        ("Fim de semana", 'final_de_semana'),
        ("Média móvel (10)", 'media_movel_10'),
        ("Proporção na rede", 'proporcao_rede'),
        ("% var. jogadores", 'pct_var_jogadores'),
    ):
        sens_tbl.add_row(rotulo, fmt_int(sensitivity.get(chave, 0.0)))
    sens_note = Text(
        "Interpretação: quanto maior o |Δ|, maior o impacto daquela feature na previsão (em jogadores).",
        style="italic",
    )
    sens_panel = Panel(Group(sens_tbl, sens_note), border_style=color)

    # Servers
    sv_tbl = Table(box=box.MINIMAL, show_edge=False, title="Servidores no cluster")
    for srv in servers:
        sv_tbl.add_row(f"• {srv}")
    sv_panel = Panel(sv_tbl, border_style=color)

    columns = Columns([table_panel, sens_panel, sv_panel], equal=True, expand=True)
    action_panel = Panel(
        Text(action_for_load(level), style="bold"),
        title="Ação sugerida",
        border_style=color,
    )

    return Panel(Group(header, columns, action_panel), border_style=color)

# ---------- Main ----------
def main():
    """Render the per-cluster dashboard and the final risk ranking.

    For each entry in ``CLUSTERS``: sanitise the baseline, build the what-if
    scenarios, send baseline and scenarios to the API in one batch, and build
    the cluster panel. All panels are printed after the loop, followed by a
    ranking of clusters by baseline prediction, highest first.

    Raises:
        RuntimeError: When the API does not return exactly one prediction per
            submitted instance. Predictions are matched to scenarios by
            position, so a short reply used to end in an IndexError or in
            silently missing rows.
    """
    console.rule("[bold cyan]BI – Previsões por Cluster (Rich + tqdm)")
    console.print(legend_panel())
    console.rule()

    cluster_panels = []
    risk_rank = []

    for cluster_name, cfg in tqdm(CLUSTERS.items(), desc="Processando clusters"):
        base_raw = cfg["baseline"]; servers = cfg.get("servers", [])
        base = sanitize_instance(base_raw)
        scenarios = generate_scenarios(base)

        batch = [base] + scenarios
        preds = call_api(batch)
        if len(preds) != len(batch):
            raise RuntimeError(
                f"{cluster_name}: API returned {len(preds)} predictions for {len(batch)} instances"
            )
        base_pred, scen_preds = preds[0], preds[1:]

        # One (scenario, prediction, delta, level) tuple per what-if row.
        scen_rows = [(sc, p, p - base_pred, label_load(p)) for sc, p in zip(scenarios, scen_preds)]

        sens = sensitivity_bucket(base_pred, base, scenarios, scen_preds)
        cluster_panels.append(render_cluster_panel(cluster_name, base, base_pred, scen_rows, sens, servers))
        risk_rank.append((cluster_name, base_pred, label_load(base_pred)))

    for p in cluster_panels:
        console.print(p)
        console.rule()

    # ranking: highest baseline prediction first
    rank = sorted(risk_rank, key=lambda x: x[1], reverse=True)
    rank_table = Table(title="Ranking de risco (baseline)", box=box.SIMPLE_HEAVY)
    rank_table.add_column("#", style="bold")
    rank_table.add_column("Cluster")
    rank_table.add_column("Prev. (jogadores)", justify="right")
    rank_table.add_column("Carga", justify="center")
    for i, (name, pred, lvl) in enumerate(rank, 1):
        lvl_color, _ = level_colors(lvl)
        rank_table.add_row(str(i), name, fmt_int(pred), Text(lvl.upper(), style=f"bold {lvl_color}"))
    console.print(rank_table)
    console.rule("[bold cyan]FIM")

# Run the dashboard when this cell/script is executed directly.
if __name__ == "__main__":
    main()
