<a href="https://colab.research.google.com/github/nazalan/cyber-blotto-rl/blob/main/Blotto.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

EPSS download

In [None]:
import os, gzip, shutil, calendar, csv
from pathlib import Path
from urllib.request import urlopen, Request

# --- Configuration ---
# Month to download; passed to download_epss_month() and used in the
# aggregate file name written by build_month_aggregate().
YEAR  = 2025
MONTH = 8   # 1, 5, and 8 are used in the thesis experiments

# NOTE: BASE_DIR is a fixed path and is not derived from MONTH; when MONTH is
# changed, this path has to be changed by hand as well.
BASE_DIR   = Path("/content/august")
EPSS_DIR   = BASE_DIR / "epss"         # downloaded .csv.gz files and their decompressed CSVs
EPSS_CLEAN = BASE_DIR / "epss_clean"   # normalized (cve, epss) CSVs written by clean_epss_month()

# Default request headers used by http_download() when none are supplied.
UA = {"User-Agent": "Mozilla/5.0 (Colab TDK downloader)"}


# --- Directory setup ---
def ensure_dirs():
    """Create the raw and cleaned EPSS directories (and parents) if missing."""
    EPSS_DIR.mkdir(parents=True, exist_ok=True)
    EPSS_CLEAN.mkdir(parents=True, exist_ok=True)


# --- HTTP utilities ---
def http_download(url: str, dest: Path, headers: dict | None = None) -> None:
    """Download a file from a URL with optional headers."""
    req = Request(url, headers=headers or UA)
    with urlopen(req, timeout=120) as resp, open(dest, "wb") as f:
        shutil.copyfileobj(resp, f)


def is_gzip(path: Path) -> bool:
    """Report whether ``path`` begins with the two-byte gzip magic number.

    Only the first two bytes are inspected; the rest of the stream is not
    validated. Any error while opening or reading the file yields ``False``.
    """
    magic = b"\x1f\x8b"
    try:
        with open(path, "rb") as stream:
            header = stream.read(2)
    except Exception:
        return False
    return header == magic


# --- EPSS download ---
def download_epss_month(year: int, month: int):
    """Download and decompress the daily EPSS CSV for every day of a month.

    For each calendar day the ``.csv.gz`` file is fetched into ``EPSS_DIR``
    and decompressed next to it as ``epss_YYYY-MM-DD.csv``; the archive is
    removed after a successful decompression. Failures are reported with
    ``print`` and the day is skipped, so one bad day does not stop the run.

    Args:
        year: Four-digit year.
        month: Month number (1-12).
    """
    days = calendar.monthrange(year, month)[1]
    for day in range(1, days + 1):
        dstr = f"{year:04d}-{month:02d}-{day:02d}"
        url = f"https://epss.empiricalsecurity.com/epss_scores-{dstr}.csv.gz"
        gz_path = EPSS_DIR / f"epss_{dstr}.csv.gz"
        csv_path = EPSS_DIR / f"epss_{dstr}.csv"

        print(f"[EPSS] {url}")
        try:
            http_download(url, gz_path, headers=UA)
        except Exception as e:
            print(f"  ! Download error: {e}  (skipping)")
            continue

        if not is_gzip(gz_path):
            # Keep the unexpected payload for inspection under a name that the
            # "epss_*.csv" glob used by the cleaning step does not match.
            bad = gz_path.with_suffix(".html")
            gz_path.replace(bad)
            print(f"  ! Not a gzip (possibly 404/403). Saved as: {bad.name}")
            continue

        decompressed = False
        try:
            with gzip.open(gz_path, "rb") as f_in, open(csv_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            decompressed = True
            gz_path.unlink(missing_ok=True)
            print(f"  ✓ Done: {csv_path.name}")
        except Exception as e:
            if not decompressed:
                # A truncated or corrupt archive leaves a partial CSV behind;
                # remove it so it is not later cleaned and aggregated as if it
                # were a complete day.
                csv_path.unlink(missing_ok=True)
            print(f"  ! Gzip error: {e}  (skipping)")


# --- Normalization helpers ---
def _norm_cols(cols):
    """Return normalized column names.

    Each name has any BOM character removed, is stripped and lowercased, and
    has hyphens and spaces replaced by underscores. Falsy names become "".
    """
    return [
        (name or "").replace("\ufeff", "").strip().lower().replace("-", "_").replace(" ", "_")
        for name in cols
    ]


def _alias(name):
    """Map a known alternative column name to 'cve', 'epss' or 'percentile'.

    Names that match none of the alias sets are returned unchanged.
    """
    canonical_aliases = (
        ("cve", {"cve", "cve_id", "cveid", "cve_identifier"}),
        ("epss", {"epss", "epss_score", "score", "prob", "probability"}),
        ("percentile", {"percentile", "epss_percentile"}),
    )
    for canonical, aliases in canonical_aliases:
        if name in aliases:
            return canonical
    return name


def _read_epss_any(path: Path):
    """
    Read one EPSS CSV and return it with normalized column names.

    - Rejects files whose first non-blank character is "<" (HTML error pages)
    - Sniffs the delimiter among "," ";" and tab, defaulting to ","
    - Reads with comment="#", so text from a "#" to the end of a line is ignored
    - If 'cve'/'epss' are not found in the header, re-reads without a header
      and uses the first data row as column names
    - Returns a DataFrame with 'cve', 'epss' (numeric, NaN where unparsable)
      and 'percentile' when that column is present

    Raises ValueError for HTML content or when the required columns are missing.
    """
    import pandas as pd

    required = ("cve", "epss")

    def lacks_required(frame) -> bool:
        return any(col not in frame.columns for col in required)

    with open(path, "r", encoding="utf-8-sig", errors="replace") as handle:
        head = handle.read(8192)
    if head.lstrip().startswith("<"):
        raise ValueError(f"File appears to be HTML (bad download?): {path.name}")

    try:
        dialect = csv.Sniffer().sniff(head, delimiters=",;\t")
        sep = dialect.delimiter
    except Exception:
        sep = ","

    read_opts = dict(sep=sep, encoding="utf-8-sig", dtype=str, comment="#")

    table = pd.read_csv(path, **read_opts)
    table.columns = _norm_cols(table.columns)
    table = table.rename(columns={col: _alias(col) for col in table.columns})

    if lacks_required(table):
        # Second chance: treat the first row as the header.
        raw = pd.read_csv(path, header=None, **read_opts)
        if len(raw) > 0:
            first_row = [str(cell) for cell in raw.iloc[0].tolist()]
            names = [_alias(h) for h in _norm_cols(first_row)]
            raw = raw.drop(index=0).reset_index(drop=True)
            raw.columns = names
            table = raw

    if lacks_required(table):
        raise ValueError(f"Missing required columns: {path.name} | columns: {table.columns.tolist()}")

    selected = ["cve", "epss"]
    if "percentile" in table.columns:
        selected.append("percentile")
    table = table[selected].copy()
    table["epss"] = pd.to_numeric(table["epss"], errors="coerce")
    return table


# --- Cleaning ---
def clean_epss_month():
    """Normalize every ``epss_*.csv`` in EPSS_DIR into EPSS_CLEAN.

    Each file is read with ``_read_epss_any`` and written under the same name
    with only the 'cve' and 'epss' columns. A file that fails is reported and
    skipped; a success count is printed at the end.
    """
    sources = sorted(EPSS_DIR.glob("epss_*.csv"))
    if len(sources) == 0:
        print("[EPSS] Nothing to clean (no CSV files found).")
        return

    succeeded = 0
    for src in sources:
        try:
            frame = _read_epss_any(src)
            target = EPSS_CLEAN / src.name
            frame[["cve", "epss"]].to_csv(target, index=False, encoding="utf-8")
            succeeded += 1
            print(f"[EPSS] CLEAN → {target.name} | rows: {len(frame)} | NaN epss: {frame['epss'].isna().sum()}")
        except Exception as err:
            print(f"[EPSS] CLEAN ERROR → {src.name} | {err}")
    print(f"[EPSS] Cleaning complete | success: {succeeded} / {len(sources)}")


# --- Aggregation ---
def build_month_aggregate():
    """Concatenate the cleaned daily files into one monthly table.

    A 'day' column (taken from the file name after "epss_") is inserted as the
    first column of each daily frame. The result is written to BASE_DIR as CSV
    and, when the parquet writer is usable, also as Parquet. File names use the
    module-level YEAR and MONTH.
    """
    import pandas as pd

    daily_frames = []
    for src in sorted(EPSS_CLEAN.glob("epss_*.csv")):
        day = src.stem.partition("_")[2]
        try:
            frame = pd.read_csv(src)
            frame.insert(0, "day", day)
            daily_frames.append(frame)
        except Exception as err:
            print(f"[EPSS] Skipped merging: {src.name} ({err})")

    if not daily_frames:
        print("[EPSS] No cleaned files found to merge.")
        return

    merged = pd.concat(daily_frames, ignore_index=True)
    stem = f"epss_{YEAR}-{MONTH:02d}_clean_all"
    out_csv = BASE_DIR / f"{stem}.csv"
    merged.to_csv(out_csv, index=False, encoding="utf-8")
    try:
        out_parq = BASE_DIR / f"{stem}.parquet"
        merged.to_parquet(out_parq, index=False)
        print(f"[EPSS] Monthly aggregate saved: {out_csv.name} and {out_parq.name} | rows: {len(merged)}")
    except Exception as err:
        # The CSV has already been written at this point; only Parquet is skipped.
        print(f"[EPSS] Parquet skipped (pyarrow missing?): {err}")
        print(f"[EPSS] Monthly CSV saved: {out_csv} | rows: {len(merged)}")


# --- Quick preview ---
def preview_clean_sample(n=5):
    """Print the first ``n`` rows of the first cleaned EPSS file (by sorted path)."""
    first = min(EPSS_CLEAN.glob("epss_*.csv"), default=None)
    if first is None:
        print("[EPSS] No cleaned files available for preview.")
        return

    import pandas as pd

    head = pd.read_csv(first, nrows=n)
    print(f"[PREVIEW] {first.name}")
    print(head.to_string(index=False))


# --- Main ---
def main():
    """Run the EPSS pipeline: create dirs, download, clean, aggregate, preview."""
    ensure_dirs()
    workdir = BASE_DIR.resolve()
    print(f"==> Working directory: {workdir}")

    download_epss_month(YEAR, MONTH)
    clean_epss_month()
    build_month_aggregate()
    preview_clean_sample(5)

    print("✅ Done (EPSS downloaded, cleaned, and aggregated by month)")


if __name__ == "__main__":
    main()


Mount Google Drive and copy the generated battlefield files to it

In [None]:
# Mount Google Drive so that /content/drive/MyDrive becomes available.
from google.colab import drive
drive.mount("/content/drive")

from pathlib import Path
import shutil

# Local directory containing the generated results
# NOTE(review): this matches OUT_DIR (BASE_DIR / "battlefields") of the
# "Combine EPSS and CVSS" cell when BASE_DIR is /content/august; that cell
# presumably has to be run before this one — confirm the intended order.
LOCAL_DIR = Path("/content/august/battlefields")

# Target directory on Google Drive
TARGET_DIR = Path("/content/drive/MyDrive/TDK/battlefields/august2")
TARGET_DIR.mkdir(parents=True, exist_ok=True)

# Recursively copy the entire directory (Python 3.8+)
# dirs_exist_ok=True lets the copy proceed into the already-created TARGET_DIR.
shutil.copytree(LOCAL_DIR, TARGET_DIR, dirs_exist_ok=True)

print("✅ Files successfully copied to:", TARGET_DIR)


CVSS download:

In [None]:
# Set your API key as an ENVIRONMENT variable (do NOT hardcode it!)
# Replace YOUR_API_KEY before running: the next cell reads this variable with
# os.getenv("NVD_API_KEY"), so the literal placeholder would be used as the key.
%env NVD_API_KEY=YOUR_API_KEY

# (Optional) target directory: Google Drive or Colab local filesystem
# Later cells read this value through os.getenv("BASE_DIR", ...).
import os
os.environ["BASE_DIR"] = "/content/august"


In [None]:
import os, json, time, math, urllib.parse
from pathlib import Path
from datetime import datetime, timezone
import requests

# ---- CONFIGURATION ----
# Working directory comes from the BASE_DIR environment variable when set.
BASE_DIR = Path(os.getenv("BASE_DIR", "/content/august"))
NVD_DIR  = BASE_DIR / "nvd"   # raw JSON pages written by fetch_nvd_month()
NVD_DIR.mkdir(parents=True, exist_ok=True)

# Endpoint queried by _do_get().
NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"

# Primary and fallback page sizes
RESULTS_PER_PAGE_PRIMARY = 1000   # usually more stable than 2000
RESULTS_PER_PAGE_FALLBACK = 200   # used for a retry after a non-200 response
SLEEP_BETWEEN_CALLS_SEC = 1.2     # pause before each pagination request

# Optional API key (from environment)
# An unset or whitespace-only variable yields "", which disables key usage.
API_KEY = os.getenv("NVD_API_KEY", "").strip()

# Headers shared by every request.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Colab TDK downloader)",
    "Accept": "application/json",
    # The API key is intentionally NOT part of these shared headers; _do_get attaches it per request.
}


# ---- UTILITIES ----
def _iso_utc_ms(dt: datetime) -> str:
    """Return an ISO-8601 UTC timestamp with milliseconds and trailing 'Z'.

    Timezone-aware datetimes are converted to UTC first, so the 'Z' suffix is
    always truthful. Naive datetimes are formatted as-is (treated as UTC).
    """
    if dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    # %f yields microseconds (6 digits); drop the last three to get milliseconds.
    return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


def to_iso_date_only(ts: str | None) -> str | None:
    """Convert a timestamp to a YYYY-MM-DD string (UTC)."""
    if not ts:
        return None
    s = ts.replace("Z", "+00:00")
    try:
        d = datetime.fromisoformat(s)
        return d.date().isoformat()
    except Exception:
        return ts[:10] if len(ts) >= 10 and ts[4] == "-" and ts[7] == "-" else None


def extract_vendors_from_config(configs) -> list[str]:
    """
    Collect the vendor field of every CPE 2.3 string found under ``configs``.

    ``configs`` may be a single dict or a list of dicts, each with a "nodes"
    list. Within a node, "cpeMatch" entries are used unless their "vulnerable"
    flag is falsy, and "matches" entries are always used; "children" and
    "nodes" are traversed as nested nodes. Returns the sorted unique vendors,
    or [] for empty or unsupported input.
    """

    def vendor_of(cpe):
        # Example: cpe:2.3:a:microsoft:windows_10:...  -> "microsoft"
        if not isinstance(cpe, str) or not cpe.startswith("cpe:2.3:"):
            return None
        fields = cpe.split(":")
        if len(fields) < 5:
            return None
        name = fields[3].strip()
        return name if name and name != "*" else None

    if not configs:
        return []
    if isinstance(configs, dict):
        config_items = [configs]
    elif isinstance(configs, list):
        config_items = [c for c in configs if isinstance(c, dict)]
    else:
        return []

    vendors = set()

    # Explicit stack instead of recursion; visiting order is irrelevant because
    # results go into a set that is sorted at the end.
    pending = []
    for cfg in config_items:
        pending.extend(cfg.get("nodes") or [])

    while pending:
        node = pending.pop()
        candidates = [
            m.get("criteria") or m.get("cpe23Uri") or ""
            for m in (node.get("cpeMatch") or [])
            if m.get("vulnerable", True)
        ]
        candidates += [
            m.get("cpeName") or m.get("cpe23Uri") or m.get("criteria") or ""
            for m in (node.get("matches") or [])
        ]
        vendors.update(v for v in map(vendor_of, candidates) if v)
        pending.extend(node.get("children") or [])
        pending.extend(node.get("nodes") or [])

    return sorted(vendors)


def _mask_url(url: str) -> str:
    """Return ``url`` with the value of any ``apiKey`` query parameter replaced.

    The query string is parsed and re-encoded, so the result may be encoded
    differently from the input. If anything fails, ``url`` is returned as-is.
    """
    try:
        scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
        params = urllib.parse.parse_qs(query, keep_blank_values=True)
        if "apiKey" in params:
            params["apiKey"] = ["***"]
        masked_query = urllib.parse.urlencode(params, doseq=True)
        return urllib.parse.urlunsplit((scheme, netloc, path, masked_query, fragment))
    except Exception:
        return url


# ---- HTTP HELPERS ----
def _do_get(params: dict, use_key: bool, start_index: int, results_per_page: int | None = None):
    """Issue one GET against NVD_BASE and return ``(response, loggable_url)``.

    Args:
        params: Base query parameters (copied, never mutated).
        use_key: Send the API key with this request, if one is configured.
        start_index: Value for the ``startIndex`` query parameter.
        results_per_page: Value for ``resultsPerPage``; omitted when falsy.

    Returns:
        The ``requests`` response and its URL passed through ``_mask_url``.
    """
    query = dict(params)
    query["startIndex"] = start_index
    if results_per_page:
        query["resultsPerPage"] = results_per_page

    headers = dict(HEADERS)
    if use_key and API_KEY:
        # NVD API 2.0 takes the key in an "apiKey" request header. Sending it
        # in the header also keeps it out of URLs, logs and r.url.
        headers["apiKey"] = API_KEY

    r = requests.get(NVD_BASE, params=query, headers=headers, timeout=120)
    safe_url = _mask_url(r.url)
    return r, safe_url


# ---- NVD FETCH ----
def fetch_nvd_month(year: int, month: int) -> list[Path]:
    """Download the NVD CVE records of one month as JSON pages.

    The first request is tried with the API key and the primary page size,
    then retried without the key, with ``lastMod*`` parameters (on 404) and
    with the fallback page size. Every page is saved to NVD_DIR as
    ``nvd_YYYY-MM_p<N>.json``.

    Args:
        year: Four-digit year.
        month: Month number (1-12).

    Returns:
        Paths of the saved page files, in download order. If a later page
        cannot be fetched the list is partial and a warning is printed.

    Raises:
        requests.HTTPError: If the first page cannot be fetched at all.
    """
    # Define publication date range [pubStartDate, pubEndDate)
    if month == 12:
        start = datetime(year, 12, 1, tzinfo=timezone.utc)
        end   = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    else:
        start = datetime(year, month, 1, tzinfo=timezone.utc)
        end   = datetime(year, month + 1, 1, tzinfo=timezone.utc)

    base_pub = {
        "pubStartDate": _iso_utc_ms(start),
        "pubEndDate":   _iso_utc_ms(end),
    }
    print(f"[NVD] Publication range: {base_pub['pubStartDate']} … {base_pub['pubEndDate']}")

    # (1) First attempt: with API key and large page size
    r, dbg_url = _do_get(base_pub, use_key=True, start_index=0, results_per_page=RESULTS_PER_PAGE_PRIMARY)
    print(f"  -> GET {dbg_url} | HTTP {r.status_code}")

    # Keep sending the key on later pages only if the keyed request itself
    # succeeded; every fallback below runs without the key.
    use_key = r.status_code == 200

    # (2) If 404/403/429/503: retry without key
    if r.status_code in (404, 403, 429, 503):
        print("  ! Error or 404. Retrying WITHOUT API key…")
        r, dbg_url = _do_get(base_pub, use_key=False, start_index=0, results_per_page=RESULTS_PER_PAGE_PRIMARY)
        print(f"  -> GET {dbg_url} | HTTP {r.status_code}")

    # (3) If still 404: fallback to lastMod* parameters
    # NOTE(review): this selects by last-modified date instead of publication
    # date, so the resulting record set can differ — confirm this is intended.
    if r.status_code == 404:
        base_mod = {
            "lastModStartDate": base_pub["pubStartDate"],
            "lastModEndDate":   base_pub["pubEndDate"],
        }
        print("  ! Still 404. Falling back to lastMod* parameters…")
        r, dbg_url = _do_get(base_mod, use_key=False, start_index=0, results_per_page=RESULTS_PER_PAGE_PRIMARY)
        print(f"  -> GET {dbg_url} | HTTP {r.status_code}")
        base_params_in_use = base_mod
    else:
        base_params_in_use = base_pub

    # (4) If not 200: smaller page size and retry once
    if r.status_code != 200:
        print("  ! Still failing. Retrying with smaller resultsPerPage…")
        time.sleep(1.5)
        r, dbg_url = _do_get(base_params_in_use, use_key=False, start_index=0, results_per_page=RESULTS_PER_PAGE_FALLBACK)
        print(f"  -> GET {dbg_url} | HTTP {r.status_code}")
        if r.status_code != 200:
            print(f"  ! HTTP {r.status_code}: {r.text[:400]}")
            r.raise_for_status()

    data = r.json()
    total = int(data.get("totalResults", 0))
    got   = len(data.get("vulnerabilities", []) or [])
    page_size_used = int(data.get("resultsPerPage", RESULTS_PER_PAGE_PRIMARY))
    pages = math.ceil(total / page_size_used) if page_size_used else 0
    print(f"  ✓ Total results: {total} | first page: {got} | expected pages: {pages}")

    out_files = []
    out0 = NVD_DIR / f"nvd_{year:04d}-{month:02d}_p0.json"
    out0.write_text(r.text, encoding="utf-8")
    out_files.append(out0)

    # Pagination – advance by the number of records actually received
    start_index = got
    page = 1
    while start_index < total:
        time.sleep(SLEEP_BETWEEN_CALLS_SEC)
        rr, dbg_url = _do_get(base_params_in_use, use_key=use_key, start_index=start_index, results_per_page=RESULTS_PER_PAGE_PRIMARY)
        if rr.status_code != 200:
            print(f"  ! Page #{page} HTTP {rr.status_code} – retrying with smaller page size…")
            time.sleep(1.5)
            rr, dbg_url = _do_get(base_params_in_use, use_key=use_key, start_index=start_index, results_per_page=RESULTS_PER_PAGE_FALLBACK)
            if rr.status_code != 200:
                print(f"    !! HTTP {rr.status_code}: {rr.text[:200]}")
                break

        outp = NVD_DIR / f"nvd_{year:04d}-{month:02d}_p{page}.json"
        outp.write_text(rr.text, encoding="utf-8")
        got = len((rr.json().get("vulnerabilities") or []))
        print(f"  ✓ Page #{page}: {got} records → {outp.name}")
        if got == 0:
            # An empty page would never advance start_index; stop here.
            break
        out_files.append(outp)
        start_index += got
        page += 1

    if start_index < total:
        # Make a partial download visible instead of returning it silently.
        print(f"  ! WARNING: downloaded {start_index} of {total} records; the result is incomplete.")

    return out_files


# ---- CVSS EXTRACTION ----
def build_cvss_with_published_and_vendor(nvd_json_files: list[Path], out_csv: Path):
    """Extract (cve, published, vendor, cvss) rows from NVD JSON pages into a CSV.

    For each CVE the score comes from the most preferred CVSS version that has
    one (v3.1, then v3.0, then v2); within that version the highest baseScore
    is used. CVEs without any of these scores are omitted. Vendors are joined
    with ";". Rows are de-duplicated by CVE id (first occurrence kept) and
    sorted by published date and CVE id.

    Args:
        nvd_json_files: Page files as returned by ``fetch_nvd_month``.
        out_csv: Destination CSV path.
    """
    import pandas as pd

    recs = []
    for jf in sorted(nvd_json_files):
        data = json.loads(jf.read_text(encoding="utf-8"))
        for v in (data.get("vulnerabilities") or []):
            cve = v["cve"]
            cve_id = cve["id"]
            published = to_iso_date_only(cve.get("published"))
            vendors = extract_vendors_from_config(cve.get("configurations", {}))
            vendor_str = ";".join(vendors) if vendors else None

            # CVSS selection: prefer v3.1 > v3.0 > v2; stop at the first
            # version that provides a score so versions are never mixed.
            metrics = cve.get("metrics") or {}
            best = None
            for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
                scores = [
                    float(bs)
                    for mm in (metrics.get(key) or [])
                    if (bs := mm.get("cvssData", {}).get("baseScore")) is not None
                ]
                if scores:
                    best = max(scores)
                    break
            if best is not None:
                recs.append((cve_id, published, vendor_str, best))

    (pd.DataFrame(recs, columns=["cve", "published", "vendor", "cvss"])
       .drop_duplicates("cve")
       .sort_values(["published", "cve"], na_position="last")
       .to_csv(out_csv, index=False, encoding="utf-8"))
    print(f"[OK] CSV saved: {out_csv}  (columns: cve, published, vendor, cvss)")


# ---- MAIN ----
def main():
    """Fetch the NVD pages for 2025-08 and build the combined CVSS CSV."""
    year, month = 2025, 8
    csv_name = f"nvd_cvss_{year:04d}-{month:02d}.csv"

    files = fetch_nvd_month(year, month)
    build_cvss_with_published_and_vendor(files, BASE_DIR / csv_name)
    print(f"✅ Done (NVD JSONs saved in 'nvd/' and combined output: {csv_name})")


if __name__ == "__main__":
    main()


Combine EPSS and CVSS in one file per day:

In [None]:
# Switches for the next cell, which reads them with os.getenv():
#   END_DATE                - last day to process (inclusive)
#   RUN_WHOLE_MONTH         - process every day from the 1st of the month up to END_DATE
#   INCLUDE_PREVIOUS_MONTHS - also load earlier nvd_cvss_YYYY-MM.csv files
%env END_DATE=2025-08-31
%env RUN_WHOLE_MONTH=True
%env INCLUDE_PREVIOUS_MONTHS=False

In [None]:
# build_top10_cumulative_until.py
from pathlib import Path
import os, re, calendar
import pandas as pd

# --- SETTINGS / SWITCHES ---
BASE_DIR = Path(os.getenv("BASE_DIR", "/content/august"))
# NOTE: in this cell EPSS_DIR is the *cleaned* directory ("epss_clean"), not
# the raw "epss" directory that the EPSS download cell binds to the same name.
EPSS_DIR = BASE_DIR / "epss_clean"
OUT_DIR  = BASE_DIR / "battlefields"   # one top10_vendors_until_<date>.csv per day
OUT_DIR.mkdir(parents=True, exist_ok=True)

# End date (inclusive)
END_DATE_STR = os.getenv("END_DATE", "2025-08-31")
END_DATE = pd.to_datetime(END_DATE_STR)

# If True: automatically load ALL available nvd_cvss_YYYY-MM.csv files up to END_DATE’s month
# Accepted truthy spellings (case-insensitive): 1, true, y, yes.
INCLUDE_PREVIOUS_MONTHS = os.getenv("INCLUDE_PREVIOUS_MONTHS", "False").lower() in ("1", "true", "y", "yes")

# NEW SWITCH: if True, generate a cumulative report for each day from the 1st of the month up to END_DATE
RUN_WHOLE_MONTH = os.getenv("RUN_WHOLE_MONTH", "False").lower() in ("1", "true", "y", "yes")


# --- Helpers ---
def _month_key_from_name(path: Path):
    """Return the first day of the month encoded in ``nvd_cvss_YYYY-MM.csv``.

    Returns a pandas Timestamp, or None when the file name does not match the
    pattern or the date cannot be parsed.
    """
    found = re.match(r"^nvd_cvss_(\d{4}-\d{2})\.csv$", path.name)
    if found is None:
        return None
    try:
        month_start = pd.to_datetime(found.group(1) + "-01")
    except Exception:
        return None
    return month_start


def _load_nvd_all_until(end_date: pd.Timestamp) -> pd.DataFrame:
    """Load the NVD monthly CSV(s) and return one row per (CVE, vendor).

    With INCLUDE_PREVIOUS_MONTHS, every ``nvd_cvss_YYYY-MM.csv`` in BASE_DIR
    whose month is not after ``end_date``'s month is loaded (oldest first);
    otherwise only the file of ``end_date``'s month, which must exist.

    Rows without a parsable 'published' date or numeric 'cvss' are dropped,
    CVEs are de-duplicated (first occurrence kept), rows without a vendor are
    dropped, and the ";"-separated vendor list is exploded into one
    lower-cased vendor per row.
    """
    month_start = end_date.replace(day=1)

    if INCLUDE_PREVIOUS_MONTHS:
        keyed = ((_month_key_from_name(p), p) for p in BASE_DIR.glob("nvd_cvss_*.csv"))
        nvd_files = sorted((mk, p) for mk, p in keyed if mk is not None and mk <= month_start)
    else:
        ym = end_date.strftime("%Y-%m")
        one = BASE_DIR / f"nvd_cvss_{ym}.csv"
        assert one.exists(), f"Missing NVD monthly CSV: {one}"
        nvd_files = [(month_start, one)]

    assert nvd_files, "No nvd_cvss_YYYY-MM.csv files found for the given period."

    # Read everything as text first; types are coerced explicitly below.
    text_columns = {"cve": str, "published": str, "vendor": str, "cvss": str}
    catalog = pd.concat(
        [pd.read_csv(fp, dtype=text_columns) for _, fp in nvd_files],
        ignore_index=True,
    )

    catalog["cve"] = catalog["cve"].str.strip()
    catalog["published"] = pd.to_datetime(catalog["published"], errors="coerce")
    catalog["cvss"] = pd.to_numeric(catalog["cvss"], errors="coerce")
    catalog = catalog.dropna(subset=["published", "cvss"])

    # Deduplicate CVEs (some appear in multiple monthly dumps)
    catalog = catalog.drop_duplicates(subset=["cve"], keep="first")

    # Split and normalize vendor names
    catalog["vendor"] = catalog["vendor"].fillna("")
    catalog = catalog[catalog["vendor"] != ""].copy()
    catalog["vendor"] = catalog["vendor"].str.split(";")
    catalog = catalog.explode("vendor", ignore_index=True)
    catalog["vendor"] = catalog["vendor"].str.strip().str.lower()

    return catalog


def _compute_and_save_for_date(date_str: str, nvd_all: pd.DataFrame) -> None:
    """Compute and save the cumulative Top-10 vendors up to ``date_str``.

    NVD rows published on or before the date are joined (on 'cve') with the
    EPSS scores of that same day. Each row is valued as ``cvss * epss`` and
    the values are summed per vendor; the ten vendors with the largest sum
    are written to ``OUT_DIR / top10_vendors_until_<date>.csv``.

    Nothing is written when the day's EPSS file is missing or has no numeric
    scores. When there are no NVD rows or no matching CVEs, a header-only CSV
    with the same columns as a regular result is written.

    Args:
        date_str: Day to process, formatted YYYY-MM-DD.
        nvd_all: Table with 'cve', 'published', 'vendor' and 'cvss' columns.
    """
    # One schema for every file written here, so downstream readers see the
    # same header whether or not the day produced any rows.
    result_columns = [
        "date", "vendor", "battlefield_value", "cvss_sum", "epss_sum",
        "cvss_avg", "epss_avg", "n_cves", "bf_value_x1000",
    ]
    out_path = OUT_DIR / f"top10_vendors_until_{date_str}.csv"

    def save_empty() -> None:
        pd.DataFrame(columns=result_columns).to_csv(out_path, index=False)
        print(f"[OK] Empty result saved: {out_path}")

    cutoff = pd.to_datetime(date_str)

    # EPSS daily file
    epss_path = EPSS_DIR / f"epss_{date_str}.csv"
    if not epss_path.exists():
        print(f"[SKIP] No EPSS file found for this day: {epss_path.name}")
        return

    epss = pd.read_csv(epss_path, dtype={"cve": str, "epss": str})
    epss["cve"] = epss["cve"].str.strip()
    epss["epss"] = pd.to_numeric(epss["epss"], errors="coerce")
    epss = epss.dropna(subset=["epss"])
    if epss.empty:
        print(f"[SKIP] Empty EPSS file on this day: {date_str}")
        return

    # NVD filtered up to cutoff
    nvd_cut = nvd_all[(nvd_all["published"].notna()) & (nvd_all["published"] <= cutoff)].copy()
    if nvd_cut.empty:
        print(f"[INFO] Empty NVD slice <= {date_str}")
        save_empty()
        return

    # Merge + compute combined value
    df = nvd_cut.merge(epss[["cve", "epss"]], on="cve", how="inner")
    if df.empty:
        print(f"[INFO] No matching CVEs for this day: {date_str}")
        save_empty()
        return

    # Value calculation
    df["cvss"] = pd.to_numeric(df["cvss"], errors="coerce")
    df["epss"] = pd.to_numeric(df["epss"], errors="coerce")
    df = df.dropna(subset=["cvss", "epss"])
    df["value"] = df["cvss"] * df["epss"]

    # Aggregate per vendor
    agg = (
        df.groupby("vendor", as_index=False)
          .agg(
              battlefield_value=("value", "sum"),
              cvss_sum=("cvss", "sum"),
              epss_sum=("epss", "sum"),
              cvss_avg=("cvss", "mean"),
              epss_avg=("epss", "mean"),
              n_cves=("cve", "nunique"),
          )
          .sort_values("battlefield_value", ascending=False)
          .head(10)
    )

    # Optional scaling for readability
    SCALE = 1000.0
    agg["bf_value_x1000"] = agg["battlefield_value"] * SCALE
    agg.insert(0, "date", date_str)

    agg.to_csv(out_path, index=False, encoding="utf-8")
    print(f"[OK] Saved: {out_path}  | rows: {len(agg)}")


# --- MAIN EXECUTION ---
# Entry point of this cell: load the NVD table once, then write one Top-10
# file per requested day.
if __name__ == "__main__":
    # Load all relevant NVD monthly data once
    nvd_all = _load_nvd_all_until(END_DATE)
    print(f"[NVD] Total records loaded: {len(nvd_all)}")

    if RUN_WHOLE_MONTH:
        # Iterate from the 1st of the month until END_DATE
        # (days without an EPSS file are skipped inside the helper)
        month_start = END_DATE.replace(day=1)
        for d in pd.date_range(month_start, END_DATE, freq="D"):
            _compute_and_save_for_date(d.strftime("%Y-%m-%d"), nvd_all)
    else:
        # Generate report only for END_DATE
        _compute_and_save_for_date(END_DATE_STR, nvd_all)


Simulation:

In [None]:
# Mount Google Drive; the simulation's BASE_DIR set in the next cell lives
# under /content/drive.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
# Inputs for the simulation cell, which reads them via env_str()/env_int():
#   BASE_DIR         - folder searched for top10_vendors*.csv files
#   START_DATE       - first day (START_DATE_STR in the simulation cell)
#   END_DATE         - last day (END_DATE_STR in the simulation cell)
#   EPISODES_PER_DAY - number of episodes per day
%env BASE_DIR=/content/drive/MyDrive/TDK/battlefields/january
%env START_DATE=2025-01-01
%env END_DATE=2025-01-31
%env EPISODES_PER_DAY=1000


In [None]:
# %%writefile /content/rl_value_calendar_fixed_per_day.py
# rl_value_calendar_fixed_per_day.py
import os, re
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math

# -------------------- ENV helpers (comment-robust) --------------------
def env_str(name, default=""):
    """Return environment variable ``name`` (or ``default``) as a clean string.

    Everything from the first "#" onward is discarded and surrounding
    whitespace is stripped, so values with trailing comments are tolerated.
    """
    raw = os.getenv(name, default)
    value, _, _ = raw.partition("#")
    return value.strip()

def env_int(name, default):
    """Return environment variable ``name`` as an int, using ``default`` if unset.

    Raises ValueError when the (comment-stripped) value is not a valid integer.
    """
    return int(env_str(name, str(default)))

def env_bool(name, default=False):
    """Return True when variable ``name`` (or ``default``) spells a truthy value.

    Truthy spellings, case-insensitive: 1, true, yes, y, on.
    """
    truthy = ("1", "true", "yes", "y", "on")
    text = env_str(name, str(default))
    return text.lower() in truthy

def env_float(name, default):
    """Return environment variable ``name`` as a float.

    Falls back to ``float(default)`` when the variable is unset or its
    (comment-stripped) value is not a valid number.
    """
    s = env_str(name, str(default))
    try:
        return float(s)
    except ValueError:
        # Only a malformed number triggers the fallback; a bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        return float(default)


# -------------------- Parameters --------------------
BASE_DIR = Path(env_str("BASE_DIR", "/content/august"))
# NOTE: the battlefield files are looked up directly in BASE_DIR; no
# "battlefields" sub-folder is appended here, so BASE_DIR must point at the
# folder that contains the top10_vendors*.csv files.
BATTLEFIELD_DIR = BASE_DIR
PREFER_CUMULATIVE = env_bool("PREFER_CUMULATIVE", True)  # True → prefer top10_vendors_until_*.csv first
START_DATE_STR = env_str("START_DATE", "")               # empty → start from earliest available
END_DATE_STR   = env_str("END_DATE", "2025-08-31")
EPISODES_PER_DAY = env_int("EPISODES_PER_DAY", 500)      # number of episodes per day

# Resource budgets of the two players.
ATTACKER_RESOURCES = 20
DEFENDER_RESOURCES = 40

# RL (epsilon-greedy bandit)
EPSILON_ATTACKER = 0.1
EPSILON_DEFENDER = 0.1
ALPHA_ATTACKER = 0.1
ALPHA_DEFENDER = 0.1

# ---------- Stabilization metrics ----------
STAB_WINDOW = 1000       # evaluation window (episodes)
DOMINANCE_P = 0.70       # dominance threshold (e.g., 70%)
Q_TOL = 1e-2             # max allowed |ΔQ| variation
CHECK_EVERY = 100        # check every N episodes
REQUIRED_STREAK = 2      # need this many consecutive “OK” intervals

# Module-level stabilization state, initialised to "nothing detected yet".
stabilization_episode_att = None
stabilization_episode_def = None
streak_ok_att = 0
streak_ok_def = 0
Q_att_prev_check = None
Q_def_prev_check = None

# Controlled randomness / outcome noise
ATTEMPT_FAIL_PROB = 0.30
USE_ADAPTIVE_TARGETING = False
TARGET_ATTEMPT_SUCCESS = 0.70
ADAPT_EVERY = 200
ADAPT_ETA = 0.01

# -------------------- Normalization settings --------------------
# NORMALIZE_MODE: "minmax" | "sum1" | "none"
NORMALIZE_MODE = env_str("NORMALIZE_MODE", "minmax").lower()
NORMALIZE_EPS = 1e-9     # threshold below which a sum or range is treated as zero
WRITE_NORMALIZED_CSV = env_bool("WRITE_NORMALIZED_CSV", True)

# The normalized weights are scaled so that their sum equals this target value (default: 10).
# If 0 → zero vector; if negative → no constraint.
NORMALIZE_SUM_TARGET = env_float("NORMALIZE_SUM_TARGET", 10.0)

# -------------------- Scenarios / information visibility --------------------
# R0 (baseline): A sees EPSS ✓, D sees EPSS ✓, A sees D strat ✗, D sees A strat ✗
# R1 (blue intel edge): A ✗, D ✓, A ✗, D ✓
# R2 (red intel edge):  A ✓, D ✗, A ✗, D ✗
# R3 (asymmetric):      A ✓, D ✓, A ✗, D ✓
# NOTE: when SCENARIO is not set, "R1" is used (not the R0 baseline).
SCENARIO = env_str("SCENARIO", "R1").upper()

# Defaults based on scenario
if SCENARIO == "R1":
    A_SEES_EPSS_DEFAULT = False
    D_SEES_EPSS_DEFAULT = True
    A_SEES_DEF_STRAT_DEFAULT = False
    D_SEES_ATT_STRAT_DEFAULT = True
elif SCENARIO == "R2":
    A_SEES_EPSS_DEFAULT = True
    D_SEES_EPSS_DEFAULT = False
    A_SEES_DEF_STRAT_DEFAULT = False
    D_SEES_ATT_STRAT_DEFAULT = False
elif SCENARIO == "R3":
    A_SEES_EPSS_DEFAULT = True
    D_SEES_EPSS_DEFAULT = True
    A_SEES_DEF_STRAT_DEFAULT = False
    D_SEES_ATT_STRAT_DEFAULT = True
else:  # R0 — also reached for any unrecognized SCENARIO value
    A_SEES_EPSS_DEFAULT = True
    D_SEES_EPSS_DEFAULT = True
    A_SEES_DEF_STRAT_DEFAULT = False
    D_SEES_ATT_STRAT_DEFAULT = False

# Overridable via ENV
# Each flag falls back to the scenario default chosen above.
ATTACKER_SEES_EPSS = env_bool("ATTACKER_SEES_EPSS", A_SEES_EPSS_DEFAULT)
DEFENDER_SEES_EPSS = env_bool("DEFENDER_SEES_EPSS", D_SEES_EPSS_DEFAULT)
ATTACKER_SEES_DEF_STRAT = env_bool("ATTACKER_SEES_DEF_STRAT", A_SEES_DEF_STRAT_DEFAULT)
DEFENDER_SEES_ATTACK_STRAT = env_bool("DEFENDER_SEES_ATTACK_STRAT", D_SEES_ATT_STRAT_DEFAULT)

# CVSS-proxy flattening parameter (0<p<=1, smaller → flatter → information disadvantage)
CVSS_FLATTEN_P = env_float("CVSS_FLATTEN_P", 0.5)


# -------------------- Top-k coverage --------------------
TOP_K = 3  # can be adjusted as needed

def compute_topk_coverage(true_vals, att_all, def_all, k=TOP_K):
    """Return the value shares (attacker, defender) over the k most valuable fields.

    A field counts for the attacker when its attacker allocation is strictly
    greater than the defender's; ties and all other cases count for the
    defender. Returns (nan, nan) when any input is None or when the top-k
    value sum is not positive. Inputs are indexed with an index array, so
    they are expected to be NumPy arrays.
    """
    if any(arr is None for arr in (true_vals, att_all, def_all)):
        return np.nan, np.nan

    k = max(1, min(k, len(true_vals)))
    top_idx = np.argsort(true_vals)[-k:]
    top_vals = true_vals[top_idx]

    total_top_value = float(np.sum(top_vals))
    if total_top_value <= 0:
        return np.nan, np.nan

    attacker_wins = att_all[top_idx] > def_all[top_idx]
    att_value = float(np.sum(top_vals * attacker_wins))
    def_value = float(np.sum(top_vals * ~attacker_wins))
    return att_value / total_top_value, def_value / total_top_value


# ---- Dynamic entropy (mixed strategy diversity) ----
ENT_WINDOW = 50
entropy_att_hist = []
entropy_def_hist = []

def calc_entropy(counts):
    """Return the Shannon entropy (in bits) of the distribution given by ``counts``.

    ``counts`` may be any array-like of non-negative numbers. Zero entries
    contribute nothing. When the counts do not sum to a positive number (for
    example an empty or all-zero window) the entropy is 0.0 rather than NaN.
    """
    counts = np.asarray(counts, dtype=float)
    total = counts.sum()
    if not total > 0:
        # Avoids 0/0 (NaN plus a RuntimeWarning) when no action was counted.
        return 0.0
    probs = counts[counts > 0] / total
    # "+ 0.0" turns a possible -0.0 into 0.0.
    return float(-np.sum(probs * np.log2(probs))) + 0.0


# -------------------- Battlefield file handling --------------------
def _parse_date_from_name(name: str):
    """Return the YYYY-MM-DD part of a battlefield file name, or None.

    Accepted names end with ``top10_vendors_until_YYYY-MM-DD.csv`` or
    ``top10_vendors_YYYY-MM-DD.csv``.
    """
    found = re.search(r"top10_vendors_(?:until_)?(\d{4}-\d{2}-\d{2})\.csv$", name)
    if found is None:
        return None
    return found.group(1)

def _list_available_dates(bf_dir: Path) -> list[str]:
    """Return the sorted, unique dates of the battlefield files in ``bf_dir``.

    Files matching ``top10_vendors*.csv`` whose name carries no parsable date
    are ignored. A missing directory yields an empty list.
    """
    if not bf_dir.exists():
        return []
    parsed = (_parse_date_from_name(p.name) for p in bf_dir.glob("top10_vendors*.csv"))
    return sorted({d for d in parsed if d})

def _load_values_for_date(date_str: str, prefer_cumulative: bool = True):
    """
    Load the battlefield values of one day.

    Looks in BATTLEFIELD_DIR for top10_vendors_until_YYYY-MM-DD.csv and
    top10_vendors_YYYY-MM-DD.csv (order set by ``prefer_cumulative``) and uses
    the first that exists. Column names are matched case-insensitively; the
    ten rows with the largest battlefield_value are kept, in descending order.

    Returns: (values: np.ndarray [N], vendors: list[str], path: Path)
    Non-numeric values become 0.0. Raises FileNotFoundError when neither file
    exists.
    """
    cumulative = BATTLEFIELD_DIR / f"top10_vendors_until_{date_str}.csv"
    single_day = BATTLEFIELD_DIR / f"top10_vendors_{date_str}.csv"
    cand = [cumulative, single_day] if prefer_cumulative else [single_day, cumulative]

    for path in cand:
        if path.exists():
            break
    else:
        raise FileNotFoundError(f"No battlefield file for {date_str} ({cand[0].name} / {cand[1].name})")

    table = pd.read_csv(path)
    cols_lower = {c.lower(): c for c in table.columns}
    assert "vendor" in cols_lower and "battlefield_value" in cols_lower, \
        f"Missing columns in file: {path.name} (required: vendor, battlefield_value)"

    table = table.rename(columns={
        cols_lower["vendor"]: "vendor",
        cols_lower["battlefield_value"]: "battlefield_value",
    })
    top = table.sort_values("battlefield_value", ascending=False).head(10).reset_index(drop=True)

    vendors = top["vendor"].astype(str).tolist()
    numeric = pd.to_numeric(top["battlefield_value"], errors="coerce")
    vals = numeric.fillna(0.0).to_numpy(dtype=float)
    return vals, vendors, path

# -------------------- Normalization functions --------------------
def _normalize_values(raw: np.ndarray, mode: str, sum_target: float) -> np.ndarray:
    """Normalize raw battlefield values and optionally rescale them to a fixed sum.

    mode:
      "none"  - keep the values as they are,
      "sum1"  - divide by the total (all zeros when the total is below NORMALIZE_EPS),
      other   - min-max scaling (also used when mode is empty/None; all zeros
                when max - min is below NORMALIZE_EPS).
    sum_target:
      >= 0 - rescale so the result sums to sum_target (all zeros when the
             current sum is below NORMALIZE_EPS or the target is 0),
      otherwise the base normalization is returned unchanged.

    Empty input is returned as an empty float array.
    """
    scheme = (mode or "minmax").lower()
    data = np.asarray(raw, dtype=float)

    if data.size == 0:
        return data.copy()

    # Step 1: base normalization
    if scheme == "none":
        scaled = data.copy()
    elif scheme == "sum1":
        total = data.sum()
        scaled = (
            np.zeros_like(data, dtype=float)
            if total < NORMALIZE_EPS
            else data / (total + NORMALIZE_EPS)
        )
    else:
        lowest, highest = float(np.min(data)), float(np.max(data))
        span = highest - lowest
        scaled = (
            np.zeros_like(data, dtype=float)
            if span < NORMALIZE_EPS
            else (data - lowest) / (span + NORMALIZE_EPS)
        )

    # Step 2: optional total-sum constraint
    if not (sum_target >= 0):
        return scaled

    current = float(np.sum(scaled))
    if current < NORMALIZE_EPS or sum_target == 0:
        return np.zeros_like(scaled)
    return scaled * (sum_target / current)


def _cvss_proxy_from_raw(raw: np.ndarray, sum_target: float, p: float) -> np.ndarray:
    """
    Build the flatter, less informative "CVSS proxy" weights from raw values.

    Pipeline:
      1) min-max scale to 0..1 (all ones when max - min is below NORMALIZE_EPS)
      2) raise to an exponent clamped to [NORMALIZE_EPS, 1], which flattens
         the differences between fields
      3) divide by the sum (all zeros when that sum is below NORMALIZE_EPS)
      4) multiply by sum_target when sum_target >= 0

    Empty input is returned as an empty float array.
    """
    data = np.asarray(raw, dtype=float)
    if data.size == 0:
        return data.copy()

    lowest, highest = float(np.min(data)), float(np.max(data))
    span = highest - lowest
    if span < NORMALIZE_EPS:
        unit = np.ones_like(data, dtype=float)
    else:
        unit = (data - lowest) / (span + NORMALIZE_EPS)

    exponent = max(NORMALIZE_EPS, min(1.0, p))  # 0<p<=1
    flat = np.power(np.clip(unit, 0.0, 1.0), exponent)

    mass = flat.sum()
    if mass < NORMALIZE_EPS:
        flat[:] = 0.0
    else:
        flat = flat / mass

    if sum_target >= 0:
        flat = flat * sum_target
    return flat


# -------------------- Assemble daily battlefield list --------------------
# Every date that has a battlefield CSV on disk.
all_dates = _list_available_dates(BATTLEFIELD_DIR)
assert all_dates, f"No battlefield CSVs found in: {BATTLEFIELD_DIR}"

# Clip to the configured window. ISO dates compare correctly as strings;
# an empty START_DATE_STR means "no lower bound".
all_dates = [
    day for day in all_dates
    if (not START_DATE_STR or day >= START_DATE_STR) and day <= END_DATE_STR
]
assert all_dates, f"No available dates within range (START_DATE={START_DATE_STR or 'AUTO'}, END_DATE={END_DATE_STR})."

# Echo the effective configuration so each run log is self-describing.
print(f"[DEBUG] Available days in range: {len(all_dates)} ({all_dates[0]} … {all_dates[-1]})")
print(f"[CFG] EPISODES_PER_DAY={EPISODES_PER_DAY}, PREFER_CUMULATIVE={PREFER_CUMULATIVE}")
print(f"[CFG] FIXED mode | ATTEMPT_FAIL_PROB={ATTEMPT_FAIL_PROB}, USE_ADAPTIVE_TARGETING={USE_ADAPTIVE_TARGETING}")
print(f"[CFG] NORMALIZE_MODE={NORMALIZE_MODE}, WRITE_NORMALIZED_CSV={WRITE_NORMALIZED_CSV}, NORMALIZE_SUM_TARGET={NORMALIZE_SUM_TARGET}")
print(f"[CFG] SCENARIO={SCENARIO} | A_SEES_EPSS={ATTACKER_SEES_EPSS}, D_SEES_EPSS={DEFENDER_SEES_EPSS}, "
      f"A_SEES_DEF_STRAT={ATTACKER_SEES_DEF_STRAT}, D_SEES_ATT_STRAT={DEFENDER_SEES_ATTACK_STRAT}, "
      f"CVSS_FLATTEN_P={CVSS_FLATTEN_P}")


# -------------------- Strategy helpers (value-based) --------------------
def normalize_and_allocate(weights, total):
    """Split `total` integer units across fields in proportion to `weights`.

    Weights are floored at 1e-9 (so zero or negative weights still count as a
    tiny positive share) and scaled to sum to 1. Each field first receives the
    floor of its ideal share; the units still missing are then handed out one
    each to the fields with the largest fractional remainders.

    Returns an integer numpy array with one entry per weight.
    """
    shares = np.maximum(np.array(weights, dtype=float), 1e-9)
    ideal = (shares / shares.sum()) * total
    alloc = np.floor(ideal).astype(int)

    leftover = total - alloc.sum()
    if leftover > 0:
        # argsort is ascending, so the last `leftover` indices are the
        # fields with the biggest fractional parts.
        alloc[np.argsort(ideal - alloc)[-leftover:]] += 1
    return alloc

def top_k_targeting(vals, total, k):
    """Spread `total` units as evenly as possible over the `k` highest-valued fields.

    `k` is clamped to the range 1..len(vals). Every selected field receives
    total // k units; the total % k remaining units go, one each, to the
    highest-valued of the selected fields. All other fields get 0.

    Returns an integer numpy array of length len(vals).
    """
    k = max(1, min(k, len(vals)))
    alloc = np.zeros(len(vals), dtype=int)

    # np.argsort is ascending; reverse the top-k slice so idx[0] is the most
    # valuable field and the remainder below favours the best targets instead
    # of the weakest of the top-k.
    idx = np.argsort(vals)[-k:][::-1]

    base, rem = divmod(total, k)
    alloc[idx] = base
    if rem > 0:
        alloc[idx[:rem]] += 1
    return alloc

def all_in_top1(vals, total):
    """Put the whole budget on the single highest-valued field (first one on ties)."""
    positions = np.arange(len(vals))
    winner = int(np.argmax(vals))
    return np.where(positions == winner, total, 0).astype(int)

def strat_even(total, n):
    """Allocate `total` units uniformly over `n` fields."""
    uniform = np.ones(n)
    return normalize_and_allocate(uniform, total)
def strat_vals(total, vals):
    """Allocate `total` units in proportion to the field values `vals`."""
    return normalize_and_allocate(weights=vals, total=total)
def strat_top3(total, vals):
    """Allocate `total` units over the three highest-valued fields of `vals`."""
    return top_k_targeting(vals, total, 3)
def strat_random(total, n):
    """Allocate `total` units over `n` fields using fresh uniform random weights."""
    random_weights = np.random.rand(n)
    return normalize_and_allocate(random_weights, total)

def attacker_counter(total, last_def_alloc, fallback_alloc):
    """Attacker reaction to the defender's previous allocation.

    Returns `fallback_alloc` when there is no previous defender allocation or
    when ATTACKER_SEES_DEF_STRAT is off. Otherwise weights each field by
    (max defence - its defence + 1), i.e. weakly defended fields get more,
    and distributes `total` accordingly.
    """
    # The None test comes first so the visibility flag is only consulted
    # once a previous allocation actually exists.
    if last_def_alloc is None or not ATTACKER_SEES_DEF_STRAT:
        return fallback_alloc

    strongest = last_def_alloc.max()
    inverted = strongest - last_def_alloc + 1
    return normalize_and_allocate(inverted, total)

def defender_counter(total, last_att_alloc, fallback_alloc):
    """Defender reaction to the attacker's previous allocation.

    Returns `fallback_alloc` when there is no previous attacker allocation or
    when DEFENDER_SEES_ATTACK_STRAT is off. Otherwise mirrors the last attack:
    each field is weighted by (its attack tokens + 1) and `total` is
    distributed in that proportion.
    """
    # The None test comes first so the visibility flag is only consulted
    # once a previous allocation actually exists.
    if last_att_alloc is None or not DEFENDER_SEES_ATTACK_STRAT:
        return fallback_alloc

    mirrored = last_att_alloc + 1
    return normalize_and_allocate(mirrored, total)
# -------------------- Daily switching and visibility handling --------------------
# Module-level state describing the currently active day; set_day() fills it in.
vendors_names = []
NUM_BATTLEFIELDS = 0

# raw_values        : values as loaded from the battlefield file
# values_epss       : EPSS-like (informative) scale — primary “values”
# values_cvss_proxy : CVSS-proxy (flatter)
raw_values = values_epss = values_cvss_proxy = None

# Allocations played in the previous episode (None until one has been played).
last_att_alloc = last_def_alloc = None

# Day-specific planning vectors (visibility-dependent)
att_planning_values = def_planning_values = None

def set_day(date_str: str):
    """Switch the module-level simulation state to the battlefield of `date_str`.

    Loads the day's values, derives the EPSS-scale and the CVSS-proxy vectors,
    clears the per-day memory (previous allocations, attack history) and picks
    each side's planning vector according to its EPSS visibility flag.

    Returns True when the day has at least one battlefield, False otherwise
    (in that case the planning vectors are left as they were).
    """
    global values_epss, values_cvss_proxy, NUM_BATTLEFIELDS
    global last_att_alloc, last_def_alloc, vendors_names, raw_values
    global att_planning_values, def_planning_values
    global attack_history_sum

    loaded_vals, loaded_vendors, source_path = _load_values_for_date(
        date_str, prefer_cumulative=PREFER_CUMULATIVE
    )
    vendors_names = loaded_vendors
    raw_values = loaded_vals.copy()

    # "True" battlefield values (EPSS-informed) and their flattened CVSS proxy.
    values_epss = _normalize_values(raw_values, NORMALIZE_MODE, NORMALIZE_SUM_TARGET)
    values_cvss_proxy = _cvss_proxy_from_raw(raw_values, NORMALIZE_SUM_TARGET, CVSS_FLATTEN_P)
    NUM_BATTLEFIELDS = len(values_epss)

    # Nothing observed on the previous day carries over into this one.
    last_att_alloc = last_def_alloc = None
    attack_history_sum = np.zeros(NUM_BATTLEFIELDS, dtype=float)

    print(f"[DAY] {date_str} | battlefields: {NUM_BATTLEFIELDS} | source: {source_path.name}")
    if not NUM_BATTLEFIELDS:
        print(f"[SKIP] No battlefields on {date_str} — skipping day.")
        return False

    # Each side plans on the EPSS scale only if it is allowed to see it.
    att_planning_values = values_epss if ATTACKER_SEES_EPSS else values_cvss_proxy
    def_planning_values = values_epss if DEFENDER_SEES_EPSS else values_cvss_proxy
    return True


# -------------------- Strategy pools (value-based) --------------------
# Strategies are built upon visibility-dependent planning vectors.

# --- Attacker strategies ---
# Attacker strategy pool: each entry is (display name, zero-argument callable
# that returns an allocation). The lambdas look up ATTACKER_RESOURCES,
# NUM_BATTLEFIELDS and att_planning_values when they are called, not when this
# list is built, so they always work on the values of the currently active day.
attacker_strategies = [
    ("A1 Even",            lambda: strat_even(ATTACKER_RESOURCES, NUM_BATTLEFIELDS)),
    ("A2 Value-weighted",  lambda: strat_vals(ATTACKER_RESOURCES, att_planning_values)),
    ("A3 Top-3 targeting", lambda: strat_top3(ATTACKER_RESOURCES, att_planning_values)),
    ("A4 Randomized",      lambda: strat_random(ATTACKER_RESOURCES, NUM_BATTLEFIELDS)),
    ("A5 All-in (Top-1)",  lambda: all_in_top1(att_planning_values, ATTACKER_RESOURCES)),
]

# --- Attack pattern memory ---
# Multiplier applied to the accumulated history on every update (exponential decay).
attack_history_decay = 0.9
# Placeholder of length 10; set_day() replaces it with a zero vector sized
# to the active day's number of battlefields.
attack_history_sum = np.zeros((10,), dtype=float)

def defender_predictive_counter(total_resources, last_att_alloc, fallback_alloc):
    """
    Predictive defense strategy.

    Steps:
      - fold the attacker's previous allocation into an exponentially decayed
        history (module-level ``attack_history_sum``, updated in place),
      - blend the normalized history (70%) with the defender's own value
        prior (30%),
      - add a little uniform noise so the allocation does not freeze,
      - sharpen with a power of 1.5 so frequently attacked fields get more,
      - convert the result into an integer allocation.

    The prior is ``def_planning_values``: the EPSS-scale values when the
    defender may see EPSS, the flatter CVSS proxy otherwise. Reading
    ``values_epss`` directly would hand the true values to a defender that is
    configured not to see them.

    Parameters:
      total_resources: number of units to distribute.
      last_att_alloc:  attacker allocation of the previous episode, or None.
      fallback_alloc:  allocation returned unchanged when there is no
                       previous attacker allocation.

    Returns an integer allocation array, or ``fallback_alloc``.
    """
    global attack_history_sum

    if last_att_alloc is None:
        return fallback_alloc

    # (1) Update attack history with decay
    attack_history_sum = attack_history_decay * attack_history_sum + last_att_alloc

    # (2) Normalize historical frequency (epsilon guards an all-zero history)
    hist_norm = attack_history_sum / (np.sum(attack_history_sum) + 1e-9)

    # (3) Combine with the value prior the defender is allowed to see
    prior = def_planning_values
    prior_norm = prior / (np.sum(prior) + 1e-9)
    combined_pred = 0.7 * hist_norm + 0.3 * prior_norm  # ratio adjustable

    # (4) Add small random noise
    noise = np.random.rand(len(combined_pred)) * 0.02
    combined_pred = np.clip(combined_pred + noise, 0, None)

    # (5) Emphasize strong activity zones (non-linear weighting)
    combined_pred = np.power(combined_pred, 1.5)

    # (6) Normalize and allocate
    return normalize_and_allocate(combined_pred, total_resources)


# --- Defender strategies ---
# Defender strategy pool: (display name, zero-argument callable returning an
# allocation). The lambdas read their globals at call time, so they always
# use the currently active day's values.
defender_strategies = [
    ("D1 Even",             lambda: strat_even(DEFENDER_RESOURCES, NUM_BATTLEFIELDS)),
    ("D2 Value-weighted",   lambda: strat_vals(DEFENDER_RESOURCES, def_planning_values)),
    ("D3 Top-3 defense",    lambda: strat_top3(DEFENDER_RESOURCES, def_planning_values)),
    ("D4 Randomized",       lambda: strat_random(DEFENDER_RESOURCES, NUM_BATTLEFIELDS)),
    ("D6 All-in (Top-1)",   lambda: all_in_top1(def_planning_values, DEFENDER_RESOURCES)),
]

# D5 reacts to the attacker's previous allocation, so it is part of the pool
# (between D4 and D6) only when the defender can observe the attack strategy.
if DEFENDER_SEES_ATTACK_STRAT:
    defender_strategies.insert(
        4,
        (
            "D5 Predictive",
            lambda: defender_predictive_counter(
                DEFENDER_RESOURCES,
                last_att_alloc,
                strat_vals(DEFENDER_RESOURCES, def_planning_values)
            ),
        ),
    )


# -------------------- Initialize Q-tables and metrics --------------------
# Number of available strategies per side.
NA, ND = len(attacker_strategies), len(defender_strategies)

# One Q-value per strategy, all starting at zero.
Q_att, Q_def = np.zeros(NA), np.zeros(ND)

# Per-episode logs: chosen strategy index, reward, and breakthrough flag.
att_used, def_used = [], []
att_rewards, def_rewards = [], []
win_flags = []

# Allocations of the previous episode (none yet).
last_att_alloc = last_def_alloc = None

# Running counters of attack attempts and of those that succeeded.
total_attempts = total_successes = 0

def attack_attempt_succeeds(att_tokens, def_tokens):
    """Draw whether a single attack attempt succeeds.

    The success probability is 1 - ATTEMPT_FAIL_PROB, clipped to [0, 1];
    the token counts are accepted but do not influence the draw.
    """
    success_threshold = float(np.clip(1.0 - ATTEMPT_FAIL_PROB, 0.0, 1.0))
    draw = np.random.rand()
    return draw < success_threshold


# -------------------- Main loop: run EPISODES_PER_DAY for each day --------------------
# Global episode counter across all days; incremented at the end of every episode.
total_episodes = 0

# For each day in range: load that day's battlefield, then play
# EPISODES_PER_DAY episodes. The Q-tables and the usage/reward logs are created
# once before this loop and are not reset per day, so learning carries over.
for day_str in all_dates:
    ok = set_day(day_str)
    if not ok or NUM_BATTLEFIELDS == 0:
        continue  # skip day if no battlefields available

    for ep in range(EPISODES_PER_DAY):
        # epsilon-greedy selection
        # (explore a uniformly random strategy with probability EPSILON_*,
        #  otherwise exploit the strategy with the highest Q-value)
        a_idx = np.random.randint(NA) if np.random.rand() < EPSILON_ATTACKER else int(np.argmax(Q_att))
        d_idx = np.random.randint(ND) if np.random.rand() < EPSILON_DEFENDER else int(np.argmax(Q_def))

        # allocations
        att_alloc = attacker_strategies[a_idx][1]()
        # NOTE(review): the defender pool built in this file only contains names
        # D1–D6, so this "D7" branch looks unreachable — confirm whether a D7
        # reactive strategy was removed or is defined elsewhere.
        if defender_strategies[d_idx][0].startswith("D7"):
            fallback = strat_vals(DEFENDER_RESOURCES, def_planning_values)
            def_alloc = defender_counter(DEFENDER_RESOURCES, last_att_alloc, fallback)
        else:
            def_alloc = defender_strategies[d_idx][1]()

        # compute payoffs using EPSS-scale as “true” battlefield value
        true_values = values_epss
        att_reward_real = 0.0      # best value among fields actually breached
        def_reward = 0.0           # summed value of every field that was not breached
        any_breakthrough = False
        att_access_reward = 0.0    # best value among fields where the attacker out-allocated the defender

        for i in range(len(true_values)):
            # The attacker only gets an attempt with strictly more tokens; ties favour the defender.
            if att_alloc[i] > def_alloc[i]:
                att_access_reward = max(att_access_reward, true_values[i])
                total_attempts += 1
                if attack_attempt_succeeds(att_alloc[i], def_alloc[i]):
                    total_successes += 1
                    any_breakthrough = True
                    att_reward_real = max(att_reward_real, true_values[i])
                else:
                    # A failed attempt leaves the field with the defender.
                    def_reward += true_values[i]
            else:
                def_reward += true_values[i]

        # Q-learning updates
        # (exponential moving average towards the episode reward; the attacker
        #  learns from att_access_reward, not from the realized att_reward_real)
        Q_att[a_idx] += ALPHA_ATTACKER * (att_access_reward - Q_att[a_idx])
        Q_def[d_idx] += ALPHA_DEFENDER * (def_reward - Q_def[d_idx])

        # record history and metrics
        last_att_alloc = att_alloc
        last_def_alloc = def_alloc
        att_used.append(a_idx); def_used.append(d_idx)
        att_rewards.append(att_reward_real)
        def_rewards.append(def_reward)
        win_flags.append(1 if any_breakthrough else 0)

        # optional adaptive calibration
        # Every ADAPT_EVERY episodes the failure probability is nudged up when the
        # observed success rate is above TARGET_ATTEMPT_SUCCESS and down when below.
        # The attempt/success counters are reset afterwards, so they only cover the
        # episodes since the last adaptation.
        if USE_ADAPTIVE_TARGETING and (total_episodes + 1) % ADAPT_EVERY == 0 and total_attempts > 0:
            observed = total_successes / total_attempts
            ATTEMPT_FAIL_PROB = float(np.clip(
                ATTEMPT_FAIL_PROB + ADAPT_ETA * (observed - TARGET_ATTEMPT_SUCCESS),
                0.0, 1.0
            ))
            total_attempts = 0
            total_successes = 0
            print(f"[ADAPT] New ATTEMPT_FAIL_PROB={ATTEMPT_FAIL_PROB:.3f} (observed={observed:.3f})")

        # --- STABILIZATION CHECKS (attacker & defender) ---
        # Runs whenever the (not yet incremented) episode counter is a multiple
        # of CHECK_EVERY. A side counts as stabilized once, for REQUIRED_STREAK
        # consecutive checks, one strategy both dominates the last STAB_WINDOW
        # choices (share >= DOMINANCE_P) and is the argmax of the Q-table, while
        # the Q-values moved by less than Q_TOL since the previous check.
        if (total_episodes) % CHECK_EVERY == 0:
            # --- Attacker ---
            if len(att_used) >= STAB_WINDOW and stabilization_episode_att is None:
                last_actions_att = np.array(att_used[-STAB_WINDOW:])
                counts_att = np.bincount(last_actions_att, minlength=NA)
                dom_idx_att = counts_att.argmax()
                dom_share_att = counts_att.max() / STAB_WINDOW
                dom_ok_att = (dom_share_att >= DOMINANCE_P) and (np.argmax(Q_att) == dom_idx_att)

                # The first check has no reference Q-table, so it can never pass.
                if Q_att_prev_check is None:
                    q_ok_att = False
                else:
                    q_drift_att = np.max(np.abs(Q_att - Q_att_prev_check))
                    q_ok_att = (q_drift_att < Q_TOL)
                Q_att_prev_check = Q_att.copy()

                if dom_ok_att and q_ok_att:
                    streak_ok_att += 1
                else:
                    streak_ok_att = 0
                if streak_ok_att >= REQUIRED_STREAK:
                    stabilization_episode_att = total_episodes

            # --- Defender ---
            if len(def_used) >= STAB_WINDOW and stabilization_episode_def is None:
                last_actions_def = np.array(def_used[-STAB_WINDOW:])
                counts_def = np.bincount(last_actions_def, minlength=ND)
                dom_idx_def = counts_def.argmax()
                dom_share_def = counts_def.max() / STAB_WINDOW
                dom_ok_def = (dom_share_def >= DOMINANCE_P) and (np.argmax(Q_def) == dom_idx_def)

                # The first check has no reference Q-table, so it can never pass.
                if Q_def_prev_check is None:
                    q_ok_def = False
                else:
                    q_drift_def = np.max(np.abs(Q_def - Q_def_prev_check))
                    q_ok_def = (q_drift_def < Q_TOL)
                Q_def_prev_check = Q_def.copy()

                if dom_ok_def and q_ok_def:
                    streak_ok_def += 1
                else:
                    streak_ok_def = 0
                if streak_ok_def >= REQUIRED_STREAK:
                    stabilization_episode_def = total_episodes

            # --- Dynamic entropy (measure of mixed strategy diversity) ---
            # One entropy sample per check, computed from the strategy counts
            # of the last ENT_WINDOW episodes of each side.
            if len(att_used) >= ENT_WINDOW:
                last_actions_att = np.array(att_used[-ENT_WINDOW:])
                last_actions_def = np.array(def_used[-ENT_WINDOW:])
                counts_att = np.bincount(last_actions_att, minlength=NA)
                counts_def = np.bincount(last_actions_def, minlength=ND)
                entropy_att_hist.append(calc_entropy(counts_att))
                entropy_def_hist.append(calc_entropy(counts_def))

        total_episodes += 1


# -------------------- Summary metrics --------------------
# Headline numbers for the whole run.
print("\n Key performance metrics (averaged over all episodes):")
print(f"  Total episodes: {total_episodes}")
print(f"  Mean attacker reward (realized): {np.mean(att_rewards):.3f}")
print(f"  Mean defender retained value:    {np.mean(def_rewards):.3f}")
print(f"  Attacker breakthrough rate:      {np.mean(win_flags)*100:.1f}%")

# max(1, ...) avoids a division by zero when no attempt was recorded.
attempt_rate = 100.0 * (total_successes / max(1, total_attempts))
print(f"  Attacker per-attempt success rate: {attempt_rate:.1f}%")

# Strategy with the highest learned Q-value on each side.
best_att_name = attacker_strategies[int(np.argmax(Q_att))][0]
print(f"  Learned best attacker strategy: {best_att_name} (Q={Q_att.max():.3f})")
best_def_name = defender_strategies[int(np.argmax(Q_def))][0]
print(f"  Learned best defender strategy: {best_def_name} (Q={Q_def.max():.3f})")


# -------------------- Plots --------------------
def movavg(x, w=200):
    """Return the simple moving average of `x` over windows of length `w`.

    Only full windows are produced, so the result has len(x) - w + 1 entries.
    When `x` is shorter than the window it is returned unchanged as a float array.
    """
    if len(x) >= w:
        kernel = np.ones(w) / w
        return np.convolve(x, kernel, mode='valid')
    return np.array(x, dtype=float)

# Figure 1: moving-average reward curves of both agents.
plt.figure(figsize=(10, 4))
plt.plot(movavg(att_rewards), label='Attacker reward (MA)', alpha=0.9)
plt.plot(movavg(def_rewards), label='Defender reward (MA)', alpha=0.9)
plt.xlabel("Episode")
plt.ylabel("Reward (moving average)")
plt.title("Reward convergence – value-based dynamic fields + visibility scenarios")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Figure 2: how often each strategy was chosen, one panel per agent.
plt.figure(figsize=(10, 3.6))
# Bin edges at k - 0.5 centre each bar on its integer strategy index.
bins_a = np.arange(len(attacker_strategies) + 1) - 0.5
bins_d = np.arange(len(defender_strategies) + 1) - 0.5

hist_panels = (
    (att_used, bins_a, attacker_strategies, "Attacker strategy frequencies"),
    (def_used, bins_d, defender_strategies, "Defender strategy frequencies"),
)
for hist_panel_no, (hist_used, hist_bins, hist_pool, hist_title) in enumerate(hist_panels, start=1):
    plt.subplot(1, 2, hist_panel_no)
    plt.hist(hist_used, bins=hist_bins, edgecolor='black')
    # Tick labels are the short ids ("A1", "D3", ...) in front of each name.
    hist_short_ids = [entry[0].split()[0] for entry in hist_pool]
    plt.xticks(range(len(hist_pool)), hist_short_ids, rotation=45)
    plt.title(hist_title)
    plt.grid(True, axis='y')
plt.tight_layout()
plt.show()


# -------------------- Q-values & stability summary --------------------
# Learned Q-value of every strategy, per agent.
print("\n Attacker Q-values:")
for (name, _), q_value in zip(attacker_strategies, Q_att):
    print(f"  {name:22s} Q={q_value:.3f}")

print("\n Defender Q-values:")
for (name, _), q_value in zip(defender_strategies, Q_def):
    print(f"  {name:22s} Q={q_value:.3f}")

# Episode at which each side was first judged stable (if ever).
att_stab = "N/A (not stabilized)" if stabilization_episode_att is None else stabilization_episode_att
def_stab = "N/A (not stabilized)" if stabilization_episode_def is None else stabilization_episode_def

print("\n Stabilization episode counts:")
print(f"  Attacker: {att_stab}")
print(f"  Defender: {def_stab}")

# Entropy of the recent strategy mix, one sample per stabilization check.
plt.figure(figsize=(10,4))
plt.plot(entropy_att_hist, label='Attacker entropy (H)', alpha=0.9)
plt.plot(entropy_def_hist, label='Defender entropy (H)', alpha=0.9)
plt.xlabel("Episode")
plt.ylabel("Entropy (bits)")
plt.title("Strategy distribution entropy – dynamic equilibrium indicator")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

print(f"\nAverage attacker entropy: {np.mean(entropy_att_hist):.3f}")
print(f"Average defender entropy: {np.mean(entropy_def_hist):.3f}")

# --- Final-day Top-K coverage ---
# Uses the values of the last loaded day and the last allocations played.
topk_att_cov_final, topk_def_cov_final = compute_topk_coverage(
    values_epss, last_att_alloc, last_def_alloc, k=TOP_K
)
print(f"\nTop-{TOP_K} coverage on final day:")
print(f"  Attacker coverage of top-{TOP_K} values: {topk_att_cov_final*100:.1f}%")
print(f"  Defender retention of top-{TOP_K} values: {topk_def_cov_final*100:.1f}%")


# -------------------- Summary export function --------------------
def main_run():
    """
    Collect the summary metrics of the finished simulation into one flat dict.

    The function only reads module-level state left behind by the main loop
    (reward logs, Q-tables, entropy histories, stabilization episodes, last
    allocations, configuration); it plays no episodes and assigns no globals,
    so no ``global`` declarations are needed.

    Returns:
        dict with the keys
          scenario, num_days, total_episodes,
          att_reward_mean, def_reward_mean, win_rate,
          stab_episode_att, stab_episode_def (NaN when not stabilized),
          entropy_att_mean, entropy_def_mean, entropy_gap (NaN when no
          entropy samples exist),
          q_att_max, q_att_min, q_def_max, q_def_min,
          best_att_strat, best_def_strat (display names),
          topk_att_cov_final, topk_def_cov_final,
          att_def_reward_ratio.
    """
    # Top-K coverage on final day
    topk_att_cov_final, topk_def_cov_final = compute_topk_coverage(
        values_epss, last_att_alloc, last_def_alloc, k=TOP_K
    )

    stab_att = stabilization_episode_att if stabilization_episode_att is not None else np.nan
    stab_def = stabilization_episode_def if stabilization_episode_def is not None else np.nan

    q_att_max, q_def_max = float(np.max(Q_att)), float(np.max(Q_def))
    q_att_min, q_def_min = float(np.min(Q_att)), float(np.min(Q_def))

    ent_att_mean = float(np.mean(entropy_att_hist)) if len(entropy_att_hist) else np.nan
    ent_def_mean = float(np.mean(entropy_def_hist)) if len(entropy_def_hist) else np.nan

    best_att_idx = int(np.argmax(Q_att))
    best_def_idx = int(np.argmax(Q_def))

    att_reward_mean = float(np.mean(att_rewards))
    def_reward_mean = float(np.mean(def_rewards))

    results = {
        # --- core simulation identifiers ---
        "scenario": SCENARIO,
        "num_days": len(all_dates),
        "total_episodes": total_episodes,

        # --- performance metrics ---
        "att_reward_mean": att_reward_mean,
        "def_reward_mean": def_reward_mean,
        "win_rate": float(np.mean(win_flags)),

        # --- strategic stability and mixing ---
        "stab_episode_att": stab_att,
        "stab_episode_def": stab_def,
        "entropy_att_mean": ent_att_mean,
        "entropy_def_mean": ent_def_mean,
        "entropy_gap": (
            ent_att_mean - ent_def_mean
            if not np.isnan(ent_att_mean) and not np.isnan(ent_def_mean)
            else np.nan
        ),

        # --- learning outcomes ---
        "q_att_max": q_att_max,
        "q_att_min": q_att_min,
        "q_def_max": q_def_max,
        "q_def_min": q_def_min,
        "best_att_strat": attacker_strategies[best_att_idx][0],
        "best_def_strat": defender_strategies[best_def_idx][0],

        # --- final-day top-k coverage ---
        "topk_att_cov_final": float(topk_att_cov_final) if topk_att_cov_final is not None else np.nan,
        "topk_def_cov_final": float(topk_def_cov_final) if topk_def_cov_final is not None else np.nan,

        # --- attacker/defender efficiency ratio ---
        # The denominator is floored at 1e-9 so a zero defender mean cannot
        # cause a division by zero.
        "att_def_reward_ratio": att_reward_mean / max(1e-9, def_reward_mean),
    }

    return results


if __name__ == "__main__":
    # Print the summary dict, one aligned "key: value" line per metric.
    results = main_run()
    print("\n=== main_run() summary ===")
    for key, value in results.items():
        print(f"{key:28s}: {value}")

    # optional: save reward history for plotting
    out_dir = Path("results")
    out_dir.mkdir(parents=True, exist_ok=True)
    reward_history = pd.DataFrame({
        "episode": np.arange(len(att_rewards)),
        "att_reward": att_rewards,
        "def_reward": def_rewards
    })
    reward_history.to_csv(out_dir / f"reward_history_{SCENARIO}.csv", index=False)


Run for every month:

In [None]:
# --- Multiple deterministic runs for each month and scenario ---
import importlib
import os
import random
import zlib

import numpy as np
import pandas as pd

# Simulation module; it is re-executed with importlib.reload() for every run.
# NOTE(review): presumably this first import already executes the simulation
# once with whatever configuration is active at that point — confirm.
import rl_value_calendar_fixed_per_day as sim


# Reproducibility
def set_global_seed(seed):
    """Seed Python's and NumPy's global random generators."""
    random.seed(seed)
    np.random.seed(seed)
    # Only affects interpreters started after this point; the hash seed of the
    # running process is fixed at start-up.
    os.environ["PYTHONHASHSEED"] = str(seed)


def derive_seed(month_name, scenario, run_id):
    """Return a seed that is identical in every session for the same run.

    The built-in hash() of a str is randomised per interpreter process, so it
    cannot be used for reproducible seeds; CRC-32 of the run identifier is
    stable and already lies in the 0 .. 2**32 - 1 range NumPy accepts.
    """
    return zlib.crc32(f"{month_name}|{scenario}|{run_id}".encode("utf-8"))


# Scenarios and months
scenarios = ["R0", "R1", "R2", "R3"]
months = {
    "january": ("/content/drive/MyDrive/TDK/battlefields/january", "2025-01-01", "2025-01-31"),
    "may":     ("/content/drive/MyDrive/TDK/battlefields/may",     "2025-05-01", "2025-05-31"),
    "august":  ("/content/drive/MyDrive/TDK/battlefields/august",  "2025-08-01", "2025-08-31"),
}

n_runs_per_scenario = 10
all_results = []

# Main loop
for month_name, (base_dir, start, end) in months.items():
    for scenario in scenarios:
        for run_id in range(1, n_runs_per_scenario + 1):
            seed = derive_seed(month_name, scenario, run_id)
            set_global_seed(seed)

            # --- Set environment variables for the current run ---
            os.environ["BASE_DIR"] = base_dir
            os.environ["START_DATE"] = start
            os.environ["END_DATE"] = end
            os.environ["SCENARIO"] = scenario

            print(f"\n=== Run: {month_name} | {scenario} | repetition {run_id}/{n_runs_per_scenario} | seed={seed} ===")

            # Reload simulation (fresh import for each run)
            importlib.reload(sim)
            result = sim.main_run()

            # Add identification metadata
            result.update({
                "month": month_name,
                "run_id": run_id,
                "seed": seed,
            })
            all_results.append(result)

# --- Save all results to CSV ---
df = pd.DataFrame(all_results)
csv_path = "/content/drive/MyDrive/TDK/results/tdk_batch_results.csv"
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False)
print(f"\n✅ All runs completed successfully! Results saved to: {csv_path}")
display(df.head())

Visualization:

In [None]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load CSV
df = pd.read_csv("/content/drive/MyDrive/TDK/results/tdk_batch_results.csv")

# Keep only R0–R3 scenarios
df = df[df["scenario"].isin(["R0", "R1", "R2", "R3"])]

# Average entropy per scenario
grouped = df.groupby("scenario", as_index=False)[["entropy_att_mean", "entropy_def_mean"]].mean()
grouped = grouped.rename(columns={
    "entropy_att_mean": "Attacker entropy",
    "entropy_def_mean": "Defender entropy"
})

# Plot
sns.set(style="whitegrid", font_scale=1.1)
plt.figure(figsize=(6.5, 4))

# Long format: one row per (scenario, agent), so the agents become the hue.
sns.barplot(
    data=grouped.melt(id_vars="scenario", var_name="Agent", value_name="Entropy (H)"),
    x="scenario", y="Entropy (H)", hue="Agent",
    palette=["tab:red", "tab:blue"], edgecolor="black"
)

plt.xlabel("Scenario")
plt.ylabel("Average strategic entropy (bits)")
plt.legend(title="", bbox_to_anchor=(1.02, 1), loc="upper left", borderaxespad=0)
plt.tight_layout()

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/5.2.pdf", dpi=300, bbox_inches="tight")  # PDF output
plt.show()


In [None]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load CSV
df = pd.read_csv("/content/drive/MyDrive/TDK/results/tdk_batch_results.csv")

# Keep only R0–R3 scenarios (in case others are present)
df = df[df["scenario"].isin(["R0", "R1", "R2", "R3"])]

# Share of runs in which each strategy ended up with the highest Q-value
att_freq = df["best_att_strat"].value_counts(normalize=True).sort_index()
def_freq = df["best_def_strat"].value_counts(normalize=True).sort_index()

# DataFrame for plotting
plot_df = pd.DataFrame({
    "Strategy": list(att_freq.index) + list(def_freq.index),
    "Frequency": list(att_freq.values) + list(def_freq.values),
    "Agent": ["Attacker"] * len(att_freq) + ["Defender"] * len(def_freq)
})

# Plot
sns.set(style="whitegrid", font_scale=1.1)
plt.figure(figsize=(7, 4))

sns.barplot(
    data=plot_df,
    x="Strategy", y="Frequency", hue="Agent",
    palette=["tab:red", "tab:blue"], edgecolor="black"
)

plt.ylabel("Relative frequency")
plt.xlabel("Strategy")
plt.legend(title="", bbox_to_anchor=(1.02, 1), loc="upper left")
plt.xticks(rotation=30, ha="right")
plt.tight_layout()

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/5.3.pdf", dpi=300, bbox_inches="tight")  # PDF output
plt.show()


In [None]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# === 1. Load CSV ===
df = pd.read_csv("/content/drive/MyDrive/TDK/results/tdk_batch_results.csv")

# Keep only R0–R3 scenarios
df = df[df["scenario"].isin(["R0", "R1", "R2", "R3"])]

# === 2. Strategy name mapping (keyed by the short strategy id) ===
attacker_names = {
    "A1": "Even allocation",
    "A2": "Value-proportional",
    "A3": "Top-3 targeting",
    "A4": "Random allocation",
    "A5": "All-in (Top-1)"
}
defender_names = {
    "D1": "Even allocation",
    "D2": "Value-proportional",
    "D3": "Top-3 defense",
    "D4": "Random allocation",
    "D5": "Predictive counter",
    "D6": "All-in (Top-1)"
}


def strategy_label(stored_name, names):
    """Build the "<id> – <descriptive name>" axis label for one strategy.

    The results file stores full display names such as "A1 Even", while the
    mapping is keyed by the leading id ("A1"); the id is therefore extracted
    before the lookup. Unknown ids fall back to the stored name.
    """
    text = str(stored_name)
    parts = text.split()
    strategy_id = parts[0] if parts else text
    return f"{strategy_id} – {names.get(strategy_id, text)}"


# === 3. Compute relative frequencies ===
att_freq = df["best_att_strat"].value_counts(normalize=True)
def_freq = df["best_def_strat"].value_counts(normalize=True)

# Keep only the three most frequent strategies for each agent
# (value_counts is already sorted by descending frequency)
att_top3 = att_freq.head(3)
def_top3 = def_freq.head(3)

# === 4. Prepare DataFrame for plotting (ID + name) ===
plot_df = pd.DataFrame({
    "Strategy": (
        [strategy_label(k, attacker_names) for k in att_top3.index] +
        [strategy_label(k, defender_names) for k in def_top3.index]
    ),
    "Frequency": list(att_top3.values) + list(def_top3.values),
    "Agent": ["Attacker"] * len(att_top3) + ["Defender"] * len(def_top3)
})

# === 5. Plot ===
sns.set(style="whitegrid", font_scale=1.1)
plt.figure(figsize=(8, 4))

sns.barplot(
    data=plot_df,
    x="Strategy", y="Frequency", hue="Agent",
    palette=["tab:red", "tab:blue"], edgecolor="black"
)

plt.ylabel("Relative frequency")
plt.xlabel("Strategy")
plt.legend(title="", bbox_to_anchor=(1.02, 1), loc="upper left")
plt.xticks(rotation=30, ha="right")
plt.tight_layout()

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/5.3.pdf", dpi=300, bbox_inches="tight")  # PDF output
plt.show()


In [None]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load data
df = pd.read_csv("/content/drive/MyDrive/TDK/results/tdk_batch_results.csv")

# Keep only relevant metrics
plot_df = df[["scenario", "topk_att_cov_final", "topk_def_cov_final"]]

# Convert to long format (one row per scenario/agent pair)
melted = plot_df.melt(
    id_vars="scenario",
    var_name="Agent",
    value_name="Coverage"
)

# Replace the metric column names by readable agent labels
melted["Agent"] = melted["Agent"].map({
    "topk_att_cov_final": "Attacker",
    "topk_def_cov_final": "Defender"
})

# Plot settings
sns.set(style="whitegrid", font_scale=1.1)
plt.figure(figsize=(7, 4))

sns.barplot(
    data=melted,
    x="scenario", y="Coverage", hue="Agent",
    palette=["tab:red", "tab:blue"], edgecolor="black"
)

plt.ylabel("Top-3 coverage")
plt.xlabel("Information regime")
plt.ylim(0, 1)  # 0–1 scale (represents 0–100%)
plt.legend(title="", bbox_to_anchor=(1.02, 1), loc="upper left")
plt.tight_layout()

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/5.4.pdf", dpi=300, bbox_inches="tight")  # PDF output
plt.show()


In [None]:
import os

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Batch results plus the per-run attacker/defender reward ratio (rho).
df = pd.read_csv("/content/drive/MyDrive/TDK/results/tdk_batch_results.csv")
df = df.assign(rho=df["att_reward_mean"] / df["def_reward_mean"])

# One clearly distinguishable colour per scenario (especially R1 vs. R3):
# blue, bright red, deep purple, green.
palette = dict(zip(
    ("R0", "R1", "R2", "R3"),
    ("#1f77b4", "#e41a1c", "#984ea3", "#4daf4a"),
))

sns.set(style="whitegrid", font_scale=1.1)
plt.figure(figsize=(7, 4))

# Mean rho per month and scenario, with standard-deviation error bars.
sns.pointplot(
    data=df,
    x="month",
    y="rho",
    hue="scenario",
    palette=palette,
    dodge=0.3,
    markers="o",
    linestyles="-",
    errorbar="sd",
)

plt.xlabel("Month")
plt.ylabel(r"Reward ratio ($\rho$)")
plt.ylim(0, 0.25)
plt.legend(title="Scenario", bbox_to_anchor=(1.02, 1), loc="upper left")
plt.tight_layout()

# Write the figure as PNG and as PDF.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/5.6.png", dpi=300)
plt.savefig("figures/5.6.pdf", dpi=300, bbox_inches="tight")
plt.show()


In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np

# --- Data ---
# Hard-coded per-scenario values; ratio is attacker / defender rounded to two decimals.
scenarios = ["R0", "R1", "R2", "R3"]
attacker = [1.24, 0.71, 1.41, 0.70]
defender = [8.40, 8.98, 8.16, 9.00]
ratio = [0.15, 0.08, 0.17, 0.08]

x = np.arange(len(scenarios))
width = 0.35

# --- Dark, print-friendly colors ---
color_red = "tab:red"    # attacker
color_blue = "tab:blue"  # defender
color_green = "#1a9850"  # ratio line

# --- Figure setup ---
fig, ax1 = plt.subplots(figsize=(7.5, 4))

# The 'edgecolor' and 'linewidth' parameters add visible outlines
bar_att = ax1.bar(
    x - width / 2, attacker, width,
    color=color_red, edgecolor="black", linewidth=0.8,
    label="Attacker ($\\bar{R}_A$)"
)
bar_def = ax1.bar(
    x + width / 2, defender, width,
    color=color_blue, edgecolor="black", linewidth=0.8,
    label="Defender ($\\bar{R}_D$)"
)

ax1.set_ylabel(r"Reward ($\bar{R}_A$, $\bar{R}_D$)", fontsize=11)
ax1.set_xlabel("Scenario", fontsize=11)
ax1.set_xticks(x)
ax1.set_xticklabels(scenarios, fontsize=10)

# Secondary axis (green line for ratio)
ax2 = ax1.twinx()
line_ratio, = ax2.plot(
    x, ratio, marker="o", color=color_green, linewidth=2.2,
    label=r"Reward ratio ($\rho$)"
)
ax2.set_ylabel(r"Reward ratio ($\rho$)", fontsize=11)
ax2.set_ylim(0, 0.25)

# Legend: the artists live on two different axes, so collect them by hand.
handles = [bar_att, bar_def, line_ratio]
labels = [h.get_label() for h in handles]
ax1.legend(
    handles, labels,
    title="Metric",
    bbox_to_anchor=(1.15, 0.9),
    loc="upper left",
    fontsize=10,
    title_fontsize=10,
    frameon=False
)

ax1.grid(True, axis="y", linestyle="--", alpha=0.6)
plt.subplots_adjust(right=0.83, left=0.1, bottom=0.12, top=0.95)

# --- Save for thesis ---
# savefig does not create missing directories, so make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/5.5.pdf", bbox_inches="tight")   # vector graphic
plt.savefig("figures/5.5.png", dpi=300, bbox_inches="tight")  # raster backup
plt.show()
