<a href="https://colab.research.google.com/github/sergiocostaifes/PPCOMP_DM/blob/main/03_window_5min_base.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =========================
# 03_window_5min_base.ipynb
# Janelas de 5 minutos (minute_bucket) + agregações básicas + série contínua
# =========================

# ===== Standard PPCOMP_DM bootstrap (Colab) =====
# Mounts Google Drive, makes sure a checkout of the repository exists there,
# puts that checkout first on sys.path and imports the project's path constants.
from google.colab import drive
# Mount Drive under /content/drive (explicitly without forcing a remount).
drive.mount("/content/drive", force_remount=False)

import sys, subprocess, importlib
from pathlib import Path
from importlib.machinery import PathFinder

# Where the repository checkout lives on Drive, and the remote it is cloned from.
REPO_DIR = Path("/content/drive/MyDrive/Mestrado/PPCOMP_DM")
GITHUB_REPO = "https://github.com/sergiocostaifes/PPCOMP_DM.git"

# Clone only when the checkout is missing; check=True makes a failing
# `git clone` raise instead of being silently ignored.
if not REPO_DIR.exists():
    REPO_DIR.parent.mkdir(parents=True, exist_ok=True)
    subprocess.run(["git", "clone", GITHUB_REPO, str(REPO_DIR)], check=True)

# Move the repository to the front of sys.path (dropping an existing entry
# first) so that `import src...` resolves to this checkout.
repo_str = str(REPO_DIR)
if repo_str in sys.path:
    sys.path.remove(repo_str)
sys.path.insert(0, repo_str)

# Clear the import system's finder caches after changing sys.path and make
# sure the path-based finder is registered on sys.meta_path.
importlib.invalidate_caches()
if PathFinder not in sys.meta_path:
    sys.meta_path.append(PathFinder)

# Re-execute src.paths even if it was already imported in this session.
# NOTE(review): presumably so edits to the module are picked up without
# restarting the runtime - confirm.
from importlib import reload
import src.paths as _paths
reload(_paths)

from src.paths import RAW_PATH, PROCESSED_PATH, FEATURES_PATH, REPORTS_PATH, ensure_dirs
# NOTE(review): ensure_dirs is defined in src.paths (not visible here);
# presumably it creates the project directories - confirm.
ensure_dirs()

def log(msg: str) -> None:
    """Print ``msg`` on stdout, prefixed with this notebook's tag."""
    tag = "[03_window_5min_base]"
    print(f"{tag} {msg}")


# =========================
# Parameters
# =========================
# Width of one aggregation window: 5 minutes, expressed in seconds and in
# microseconds (300_000_000 µs).
BUCKET_SEC = 300
BUCKET_US = BUCKET_SEC * 10**6

# Event types that get their own per-bucket count column.
TOP_EVENTS = "FAIL SCHEDULE FINISH ENABLE LOST EVICT KILL".split()


# =========================
# 1) Load the cleaned dataset
# =========================
CLEAN_PARQUET = PROCESSED_PATH / "google_trace_clean.parquet"
assert CLEAN_PARQUET.exists(), f"Arquivo não encontrado: {CLEAN_PARQUET}"

import pandas as pd
import numpy as np
import json

df = pd.read_parquet(CLEAN_PARQUET)
log(f"Lido: {CLEAN_PARQUET}")
log(f"Shape: {df.shape} (linhas, colunas)")

# Minimal sanity check: every column used further down must be present.
required_cols = ["t_rel_us", "machine_id", "collection_id", "event", "failed"]
missing = [name for name in required_cols if name not in df.columns]
assert not missing, f"Colunas obrigatórias ausentes: {missing}"

# Coerce the relative timestamp to a number (unparseable values become NaN),
# discard the rows that could not be parsed, then store it as int64.
df["t_rel_us"] = pd.to_numeric(df["t_rel_us"], errors="coerce")
df = df.dropna(subset=["t_rel_us"]).astype({"t_rel_us": "int64"})


# =========================
# 2) 5-minute bucket assignment
# =========================
# bucket_id is the index of the BUCKET_US-wide window an event falls into;
# bucket_start_us is that window's first timestamp.
_bucket_ids = (df["t_rel_us"] // BUCKET_US).astype("int64")
df["bucket_id"] = _bucket_ids
df["bucket_start_us"] = (_bucket_ids * BUCKET_US).astype("int64")

_first_bucket, _last_bucket = _bucket_ids.min(), _bucket_ids.max()
log(f"bucket_id min..max: {_first_bucket}..{_last_bucket}")
log(f"Buckets distintos: {_bucket_ids.nunique()}")


# =========================
# 3) Extract simple features (resource_request)
#    (no heavy feature engineering here; that belongs in notebook
#    05_feature_engineering)
# =========================
import ast


def _to_dict(x):
    """Best-effort parse of one ``resource_request`` cell.

    Args:
        x: ``None``, a ``dict``, or a string holding either JSON or a Python
            literal such as ``"{'cpus': ..., 'memory': ...}"``.

    Returns:
        ``x`` itself when it is already a dict; the parsed value when ``x`` is
        a non-blank string that parses (not necessarily a dict, so callers
        must check the type); ``None`` for everything else.
    """
    if isinstance(x, dict):
        return x
    if not isinstance(x, str):
        return None

    text = x.strip()
    if not text:
        return None

    # Try JSON first.
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        pass

    # Fallback for Python-literal strings. ast.literal_eval only accepts
    # literals, so arbitrary expressions embedded in the data are rejected
    # instead of being executed (eval with empty __builtins__ is not a sandbox).
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None

# Requested CPU and memory per event, taken from the parsed resource_request.
# Values are coerced to float64 (missing or non-numeric entries become NaN)
# so the per-bucket means computed later always operate on numeric columns.
_REQ_FIELDS = {"req_cpus": "cpus", "req_mem": "memory"}

if "resource_request" in df.columns:
    rr = df["resource_request"].map(_to_dict)
    for _col, _key in _REQ_FIELDS.items():
        # k=_key binds the current key at definition time (avoids the
        # late-binding closure pitfall).
        _raw = rr.map(lambda d, k=_key: d.get(k) if isinstance(d, dict) else None)
        df[_col] = pd.to_numeric(_raw, errors="coerce").astype("float64")
else:
    for _col in _REQ_FIELDS:
        df[_col] = np.nan


# =========================
# 4) Per-bucket aggregations (window_5min_base)
# =========================
# Named aggregations: output column -> (source column, reducer).
_agg_spec = {
    "bucket_start_us": ("bucket_start_us", "min"),
    "n_events": ("event", "size"),
    "n_failed": ("failed", "sum"),
    "n_machines": ("machine_id", "nunique"),
    "n_collections": ("collection_id", "nunique"),
    "mean_priority": ("priority", "mean"),
    "mean_req_cpus": ("req_cpus", "mean"),
    "mean_req_mem": ("req_mem", "mean"),
}
# Final column layout, fixed before the spec is possibly trimmed below.
_base_columns = ["bucket_id", *_agg_spec]

# `priority` is optional. When it is absent the column is still emitted, but
# as NaN; aggregating some other column in its place would store meaningless
# numbers under the name mean_priority.
_has_priority = "priority" in df.columns
if not _has_priority:
    del _agg_spec["mean_priority"]

base = df.groupby("bucket_id", as_index=False).agg(**_agg_spec)

if not _has_priority:
    base["mean_priority"] = np.nan
    base = base[_base_columns]

# Per-bucket counts of the top events -> columns event_<X>_count.
# reindex guarantees one column per TOP_EVENTS entry, in that order, with 0
# for event types that never occur in the data.
evt_counts = (
    df[df["event"].isin(TOP_EVENTS)]
      .groupby(["bucket_id", "event"])
      .size()
      .unstack(fill_value=0)
      .reindex(columns=TOP_EVENTS, fill_value=0)
      .reset_index()
      .rename(columns={ev: f"event_{ev}_count" for ev in TOP_EVENTS})
)

# Left merge: buckets without any top event get NaN here, zero-filled below.
base = base.merge(evt_counts, on="bucket_id", how="left")

# Count columns are always integers; a missing count means zero.
count_cols = ["n_events", "n_failed", "n_machines", "n_collections"] + [f"event_{ev}_count" for ev in TOP_EVENTS]
for c in count_cols:
    base[c] = base[c].fillna(0).astype("int64")

# Chronological order.
base = base.sort_values("bucket_id").reset_index(drop=True)

log(f"window_5min_base shape: {base.shape}")


# =========================
# 5) Continuous series (reindex) -> window_5min_series
# =========================
# Every bucket id between the first and the last observed one, including the
# buckets that had no events at all.
_observed_ids = base["bucket_id"]
bmin, bmax = int(_observed_ids.min()), int(_observed_ids.max())

_all_ids = np.arange(bmin, bmax + 1, dtype=np.int64)
full = pd.DataFrame({"bucket_id": _all_ids, "bucket_start_us": _all_ids * BUCKET_US})

series = full.merge(base, on=["bucket_id", "bucket_start_us"], how="left")

# Count columns: a bucket absent from the dataset simply has zero events.
series = series.fillna({col: 0 for col in count_cols})
series = series.astype({col: "int64" for col in count_cols})

# Continuous metrics stay NaN for empty buckets; how to fill them
# (interpolation, ffill, ...) is decided later, in feature engineering.
_float_cols = [
    col
    for col in ("mean_priority", "mean_req_cpus", "mean_req_mem")
    if col in series.columns
]
series = series.astype({col: "float64" for col in _float_cols})

log(f"window_5min_series shape: {series.shape} (contínua)")


# =========================
# 6) Persistence
# =========================
BASE_FILE = FEATURES_PATH / "window_5min_base.parquet"
SERIES_FILE = FEATURES_PATH / "window_5min_series.parquet"

# Write both tables first, then report them (same order as the writes).
_outputs = ((base, BASE_FILE), (series, SERIES_FILE))
for _frame, _target in _outputs:
    _frame.to_parquet(_target, index=False, compression="snappy")
for _frame, _target in _outputs:
    log(f"Salvo: {_target}")

# Run summary stored next to the other reports; values are cast to plain
# Python types so they serialise to JSON.
summary03 = dict(
    input_file=str(CLEAN_PARQUET),
    output_base=str(BASE_FILE),
    output_series=str(SERIES_FILE),
    bucket_sec=int(BUCKET_SEC),
    bucket_us=int(BUCKET_US),
    rows_in=int(len(df)),
    base_rows=int(len(base)),
    series_rows=int(len(series)),
    bucket_id_min=int(bmin),
    bucket_id_max=int(bmax),
    top_events=TOP_EVENTS,
    columns_base=list(base.columns),
)

summary_file = REPORTS_PATH / "03_window_5min_base_summary.json"
_payload = json.dumps(summary03, indent=2, ensure_ascii=False)
summary_file.write_text(_payload, encoding="utf-8")
log(f"Resumo salvo: {summary_file}")

# Last expression of the cell: the notebook displays the first rows of base.
base.head()