In [1]:
from pathlib import Path
import pandas as pd
import hashlib
from dataclasses import dataclass
import ast
import ipaddress

In [2]:
@dataclass(frozen=True)
class BuildConfig:
    golden_id: str = "episode_016"

    duplicate_prone_streams: tuple = ("cisco_ise.log", "cisco_asa.log")
    evidence_hash_len: int = 16

    episodes_dirname: str = "episodes"   
    dataset_dirname: str = "data"        
    unified_parquet_name: str = "episodes_all_baseline.parquet"

cfg = BuildConfig()

NOTEBOOK_DIR = Path.cwd()
PROJECT_DIR  = NOTEBOOK_DIR.parent

OUTPUT_DIR = PROJECT_DIR / cfg.episodes_dirname
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

DATASET_DIR = PROJECT_DIR / cfg.dataset_dirname
DATASET_DIR.mkdir(parents=True, exist_ok=True)

In [3]:
def short_hash(x) -> str:
    if x is None:
        return "none"
    try:
        if pd.isna(x):
            return "none"
    except Exception:
        pass

    s = str(x).strip()
    if not s or s.lower() in ("nan", "none"):
        return "none"
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12]


def _is_empty(x) -> bool:
    if x is None:
        return True
    try:
        if pd.isna(x):
            return True
    except Exception:
        pass
    s = str(x).strip().lower()
    return s in ("", "nan", "none")


def as_list(x) -> list[str]:
    if _is_empty(x):
        return []

    if isinstance(x, (list, tuple, set)):
        return [str(v) for v in x if not _is_empty(v)]

    if isinstance(x, str):
        s = x.strip()
        if s.startswith("[") and s.endswith("]"):
            try:
                parsed = ast.literal_eval(s)
                if isinstance(parsed, list):
                    return [str(v) for v in parsed if not _is_empty(v)]
            except Exception:
                pass

    return [str(x)]


def norm_token(x, default: str = "unknown") -> str:
    if _is_empty(x):
        return default
    return str(x).strip().lower()


def join_tokens(x, default: str = "unknown") -> str:
    vals = [str(v).strip().lower() for v in as_list(x)]
    vals = [v for v in vals if v and v not in ("nan", "none")]
    if not vals:
        return default
    vals = sorted(set(vals))
    return "+".join(vals)


def dport_bucket(dport_raw) -> str:
    d = str(dport_raw).split(".")[0].strip()
    if d in ("22", "2222"):
        return "ssh"
    if d.lower() in ("", "nan", "none"):
        return "unknown"
    return "*"


def safe_col(df: pd.DataFrame, col: str, default="") -> pd.Series:
    if col in df.columns:
        return df[col]
    return pd.Series([default] * len(df), index=df.index)

In [4]:
def build_masked_message_cl(row: dict) -> str:
    ds = (str(row.get("data_stream.dataset", "")).strip() or "unknown_stream").lower()

    # Generic semantic tokens
    act   = join_tokens(row.get("event.action"),   default="unknown")
    cat   = join_tokens(row.get("event.category"), default="unknown")
    out   = norm_token(row.get("event.outcome"),   default="unknown")
    code  = norm_token(row.get("event.code"),      default="none")

    # Semantic richness
    kind  = join_tokens(row.get("event.kind"), default="unknown")
    etype = join_tokens(row.get("event.type"), default="unknown")

    # Network-ish tokens
    tr    = norm_token(row.get("network.transport"), default="unknown")
    direction = norm_token(row.get("network.direction"), default="unknown")
    dp    = dport_bucket(row.get("destination.port", ""))

    msg = norm_token(row.get("message"), default="")

    # Base prefix
    base = f"[{ds}] kind={kind} type={etype} cat={cat} act={act} out={out} code={code} dport={dp} tr={tr} dir={direction}"

    # elastic_agent.filebeat 
    if ds == "elastic_agent.filebeat":
        lvl = norm_token(row.get("log.level"), default="unknown")
        return f"{base} lvl={lvl}"

    # system.auth 
    if ds == "system.auth":
        proc = norm_token(row.get("process.name"), default="unknown")
        ssh_method = norm_token(row.get("system.auth.ssh.method"), default="unknown")

        cat_list = [t.lower() for t in as_list(row.get("event.category"))]
        act_list = [t.lower() for t in as_list(row.get("event.action"))]

        ev = "other"
        if "connection closed" in msg or "connection reset" in msg or "broken pipe" in msg:
            ev = "conn_err"
        elif "authentication" in cat_list and out == "failure":
            ev = "auth_fail"
        elif "authentication" in cat_list and out == "success":
            ev = "auth_ok"

        if "sshd" in proc:
            if "authentication" in cat_list and any(a in ("ssh_login", "user_login") for a in act_list):
                if "password" in ssh_method:
                    return f"{base} proc=sshd ssh_method=password ev={ev} user=* from=*"
                return f"{base} proc=sshd ssh_method=other ev={ev} user=* from=*"

            return f"{base} proc=sshd ssh_method={ssh_method} ev={ev} user=* from=*"

        if "sudo" in proc:
            return f"{base} proc=sudo ev={ev} user=* cmd=*"
        if "systemd" in proc:
            return f"{base} proc=systemd ev={ev}"

        return f"{base} proc=other ev={ev}"

    # endpoint.events.* 
    if ds == "endpoint.events.network" or "endpoint.events" in ds:
        act_n = str(len([p for p in act.split("+") if p.strip()])) if act != "unknown" else "unknown"
        return f"{base} act_n={act_n}"

    # panw.panos 
    if ds == "panw.panos":
        pan_act = norm_token(row.get("panw.panos.action"), default="")
        if pan_act not in ("", "unknown", "none"):
            fw = pan_act
        else:
            fw = "allow" if act in ("flow_started", "flow-started", "flow-created", "flow_created", "allow", "permit") else "deny"

        pan_type = norm_token(row.get("panw.panos.type"), default="unknown")
        pan_sub  = norm_token(row.get("panw.panos.sub_type"), default="unknown")
        endr     = norm_token(row.get("panw.panos.endreason"), default="unknown")
        app      = norm_token(row.get("network.application"), default="unknown")

        return f"{base} fw={fw} ptype={pan_type} psub={pan_sub} endr={endr} app={app}"

    # cisco_asa.log 
    if ds == "cisco_asa.log":
        term = norm_token(row.get("cisco.asa.termination_initiator"), default="unknown")
        return f"{base} term={term}"

    # cisco_ise.log 
    if ds == "cisco_ise.log":
        pkt  = norm_token(row.get("cisco_ise.log.radius.packet.type"), default="unknown")
        step = norm_token(row.get("cisco_ise.log.step"), default="unknown")
        authm = norm_token(row.get("cisco_ise.log.authentication.method"), default="unknown")
        svc   = norm_token(row.get("service.type"), default="unknown")

        ise_ev = "generic"
        if "accounting" in msg:
            ise_ev = "accounting"
        elif "success" in out or "passed" in act:
            ise_ev = "auth_ok"
        elif "fail" in out or "failure" in out or "failed" in msg or "failure" in msg:
            ise_ev = "auth_fail"

        return f"{base} ise_ev={ise_ev} pkt={pkt} step={step} authm={authm} svc={svc} user=* nas=*"

    # system.security
    if ds == "system.security":
        logon_type = norm_token(row.get("winlog.logon.type"), default="unknown")
        return f"{base} logon={logon_type}"

    return base

In [5]:
def is_ip(s: str) -> bool:
    try:
        ipaddress.ip_address(str(s).strip())
        return True
    except Exception:
        return False

def extract_actor_ip(row: dict) -> str:
    ds = str(row.get("data_stream.dataset", "")).strip().lower()

    def _first_ip(*vals) -> str:
        for v in vals:
            for x in as_list(v):
                s = str(x).strip()
                if s and is_ip(s):
                    return s
        return "none"


    if ds != "cisco_ise.log":
        return _first_ip(row.get("source.ip"), row.get("source.address"))


    framed_ip = _first_ip(
        row.get("cisco_ise.log.framed.ip"),
    )
    if framed_ip != "none":
        return framed_ip

    client_ip = _first_ip(row.get("client.ip"))
    rel_ips = [str(x).strip() for x in as_list(row.get("related.ip")) if is_ip(str(x).strip())]

    if not rel_ips:
        return _first_ip(row.get("source.ip"), row.get("source.address"))

    if client_ip != "none":
        non_client = [ip for ip in rel_ips if ip != client_ip]
        if non_client:
            return non_client[-1]

    return rel_ips[-1]

In [6]:
def process_episode_folder(folder_path: Path, prefix: str, idx: int, output_dir: Path) -> None:
    episode_id = f"{prefix}_{idx:03d}"
    print(f"\nProcessing {folder_path.name} → {episode_id}")

    dfs = []
    for fp in sorted(folder_path.glob("*.json")):
        print(f"   Reading {fp.name}")
        df = pd.read_json(fp)
        if "_source" in df.columns:
            df = pd.json_normalize(df["_source"])
        else:
            df = pd.json_normalize(df.to_dict(orient="records"))
        dfs.append(df)

    if not dfs:
        print(f"   WARNING: no JSON files in {folder_path}")
        return

    df = pd.concat(dfs, ignore_index=True)

    df["episode_id"] = episode_id
    df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")

    df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)

    df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})

    src_ip = safe_col(df, "source.ip")
    src_addr = safe_col(df, "source.address")
    df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)

    df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)

    dst_ip = safe_col(df, "destination.ip")
    dst_addr = safe_col(df, "destination.address")
    df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)

    host_col = safe_col(df, "host.hostname")
    df["host_anon"] = host_col.astype(object).apply(short_hash)

    df["process_name"] = safe_col(df, "process.name").astype(str)
    df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
    df["dport"]        = safe_col(df, "destination.port").astype(str)

    df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
    df["event_action"]   = safe_col(df, "event.action").apply(lambda x: join_tokens(x, default="")).astype(str)
    df["event_outcome"]  = safe_col(df, "event.outcome").astype(str)
    df["event_code"]     = safe_col(df, "event.code").astype(str)

    df["asa_conn_id"] = safe_col(df, "cisco.asa.connection_id").astype(str).replace({"nan": "", "None": ""})

    for col in ("gt_core", "gt_extended"):
        if col in df.columns:
            df[col] = df[col].fillna(False).astype(bool)
        else:
            df[col] = False

    final = df[[
        "episode_id", "timestamp", "stream", "process_name", "masked_message_cl",
        "src_ip_anon", "actor_ip_anon", "dst_ip_anon", "host_anon",
        "event_category", "event_action", "event_outcome", "event_code",
        "dport", "ssh_method", "asa_conn_id",
        "gt_core", "gt_extended",
    ]].copy()

    output_dir.mkdir(parents=True, exist_ok=True)
    out_path = output_dir / f"{episode_id}.parquet"
    final.to_parquet(out_path, index=False)
    print(f"   SAVED {out_path.name} ({len(final):,} logs)")

In [7]:
# ALERT EPISODES
alert_folders = sorted([p for p in PROJECT_DIR.iterdir() if p.is_dir() and p.name.startswith("Alert ")])
for folder in alert_folders:
    episode_num = int(folder.name.split()[-1])
    process_episode_folder(folder, prefix="episode", idx=episode_num, output_dir=OUTPUT_DIR)

# BACKGROUND EPISODES
bg_folders = sorted([p for p in PROJECT_DIR.iterdir() if p.is_dir() and p.name.startswith("Background ")])
for i, folder in enumerate(bg_folders, start=1):
    process_episode_folder(folder, prefix="background", idx=i, output_dir=OUTPUT_DIR)

print("\nALERT + BACKGROUND EPISODES READY")


Processing Alert 1 → episode_001
   Reading asa_alert1.json
   Reading endpoint_events_network_alert1.json
   Reading filebeat_alert1.json
   Reading ise_alert1.json
   Reading panos_alert1.json
   Reading system_auth_sshd_alert1.json
   Reading system_auth_sudo_systemd_alert1.json
   Reading system_security_alert1.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_001.parquet (29,572 logs)

Processing Alert 10 → episode_010
   Reading asa_alert10.json
   Reading endpoint_events_network_alert10.json
   Reading filebeat_alert10.json
   Reading ise_alert10.json
   Reading panos_alert10.json
   Reading system_auth_sshd_alert10.json
   Reading system_auth_sudo_systemd_alert10.json
   Reading system_security_alert10.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_010.parquet (29,999 logs)

Processing Alert 11 → episode_011
   Reading asa_alert11.json
   Reading endpoint_events_network_alert11.json
   Reading filebeat_alert11.json
   Reading ise_alert11.json
   Reading panos_alert11.json
   Reading system_auth_sshd_alert11.json
   Reading system_auth_sudo_systemd_alert11.json
   Reading system_security_alert11.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_011.parquet (29,997 logs)

Processing Alert 12 → episode_012
   Reading asa_alert12.json
   Reading endpoint_events_network_alert12.json
   Reading filebeat_alert12.json
   Reading ise_alert12.json
   Reading panos_alert12.json
   Reading system_auth_sshd_alert12.json
   Reading system_auth_sudo_systemd_alert12.json
   Reading system_security_alert12.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_012.parquet (29,998 logs)

Processing Alert 13 → episode_013
   Reading asa_alert13.json
   Reading endpoint_events_network_alert13.json
   Reading filebeat_alert13.json
   Reading ise_alert13.json
   Reading panos_alert13.json
   Reading system_auth_sshd_alert13.json
   Reading system_auth_sudo_systemd_alert13.json
   Reading system_security_alert13.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_013.parquet (29,997 logs)

Processing Alert 14 → episode_014
   Reading asa_alert14.json
   Reading endpoint_events_network_alert14.json
   Reading filebeat_alert14.json
   Reading ise_alert14.json
   Reading panos_alert7.json
   Reading system_auth_sshd_alert14.json
   Reading system_auth_sudo_systemd_alert14.json
   Reading system_security_alert14.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_014.parquet (30,327 logs)

Processing Alert 15 → episode_015
   Reading asa_alert15.json
   Reading endpoint_events_network_alert15.json
   Reading filebeat_alert15.json
   Reading ise_alert15.json
   Reading panos_alert15.json
   Reading system_auth_sshd_alert15.json
   Reading system_auth_sudo_systemd_alert15.json
   Reading system_security_alert15.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_015.parquet (30,000 logs)

Processing Alert 2 → episode_002
   Reading asa_alert2.json
   Reading endpoint_events_network_alert2.json
   Reading filebeat_alert2.json
   Reading ise_alert2.json
   Reading panos_alert2.json
   Reading system_auth_sshd_alert2.json
   Reading system_auth_sudo_systemd_alert2.json
   Reading system_security_alert2.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_002.parquet (29,997 logs)

Processing Alert 3 → episode_003
   Reading asa_alert3.json
   Reading endpoint_events_network_alert3.json
   Reading filebeat_alert3.json
   Reading ise_alert3.json
   Reading panos_alert3.json
   Reading system_auth_sshd_alert3.json
   Reading system_auth_sudo_systemd_alert3.json
   Reading system_security_alert3.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_003.parquet (30,000 logs)

Processing Alert 4 → episode_004
   Reading asa_alert4.json
   Reading endpoint_events_network_alert4.json
   Reading filebeat_alert4.json
   Reading ise_alert4.json
   Reading panos_alert4.json
   Reading system_auth_sshd_alert4.json
   Reading system_auth_sudo_systemd_alert4.json
   Reading system_security_alert4.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_004.parquet (29,999 logs)

Processing Alert 5 → episode_005
   Reading asa_alert5.json
   Reading endpoint_events_network_alert5.json
   Reading filebeat_alert5.json
   Reading ise_alert5.json
   Reading panos_alert5.json
   Reading system_auth_sshd_alert5.json
   Reading system_auth_sudo_systemd_alert5.json
   Reading system_security_alert5.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_005.parquet (29,998 logs)

Processing Alert 6 → episode_006
   Reading asa_alert6.json
   Reading endpoint_events_network_alert6.json
   Reading filebeat_alert6.json
   Reading ise_alert6.json
   Reading panos_alert6.json
   Reading system_auth_sshd_alert6.json
   Reading system_auth_sudo_systemd_alert6.json
   Reading system_security_alert6.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_006.parquet (29,997 logs)

Processing Alert 7 → episode_007
   Reading asa_alert7.json
   Reading endpoint_events_network_alert7.json
   Reading filebeat_alert7.json
   Reading ise_alert7.json
   Reading panos_alert7.json
   Reading system_auth_sshd_alert7.json
   Reading system_auth_sudo_systemd_alert7.json
   Reading system_security_alert7.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_007.parquet (29,999 logs)

Processing Alert 8 → episode_008
   Reading asa_alert8.json
   Reading endpoint_events_network_alert8.json
   Reading filebeat_alert8.json
   Reading ise_alert8.json
   Reading panos_alert8.json
   Reading system_auth_sshd_alert8.json
   Reading system_auth_sudo_systemd_alert8.json
   Reading system_security_alert8.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_008.parquet (29,998 logs)

Processing Alert 9 → episode_009
   Reading asa_alert9.json
   Reading endpoint_events_network_alert9.json
   Reading filebeat_alert9.json
   Reading ise_alert9.json
   Reading panos_alert9.json
   Reading system_auth_sshd_alert9.json
   Reading system_auth_sudo_systemd_alert9.json
   Reading system_security_alert9.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_009.parquet (30,000 logs)

Processing Background 1 → background_001
   Reading asa_alert16.json
   Reading endpoint_events_network_alert16.json
   Reading filebeat_alert16.json
   Reading ise_alert16.json
   Reading panos_alert16.json
   Reading system_auth_sshd_alert16.json
   Reading system_auth_sudo_systemd_alert16.json
   Reading system_security_alert16.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED background_001.parquet (30,000 logs)

Processing Background 2 → background_002
   Reading asa_alert17.json
   Reading endpoint_events_network_alert17.json
   Reading filebeat_alert17.json
   Reading ise_alert17.json
   Reading panos_alert17.json
   Reading system_auth_sshd_alert17.json
   Reading system_auth_sudo_systemd_alert17.json
   Reading system_security_alert17.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED background_002.parquet (30,000 logs)

Processing Background 3 → background_003
   Reading asa_alert18.json
   Reading endpoint_events_network_alert18.json
   Reading filebeat_alert18.json
   Reading ise_alert18.json
   Reading panos_alert18.json
   Reading system_auth_sshd_alert18.json
   Reading system_auth_sudo_systemd_alert18.json
   Reading system_security_alert18.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED background_003.parquet (29,927 logs)

ALERT + BACKGROUND EPISODES READY


In [8]:
# GOLDEN EPISODE
golden_folder = sorted([p for p in PROJECT_DIR.iterdir() if p.is_dir() and p.name.startswith("Golden")])
for folder in golden_folder:
    episode_num = 16
    process_episode_folder(folder, prefix="episode", idx=episode_num, output_dir=OUTPUT_DIR)


Processing Golden → episode_016
   Reading asa_golden.json
   Reading endpoint_events_network_golden.json
   Reading filebeat_golden.json
   Reading ise_attacker_golden.json
   Reading ise_rest_golden.json
   Reading panos_attacker_golden.json
   Reading panos_rest_golden.json
   Reading system_auth_sshd_attacker_golden.json
   Reading system_auth_sshd_rest_golden.json
   Reading system_auth_sudo_systemd_golden.json
   Reading system_security_attacker_golden.json
   Reading system_security_rest_golden.json


  df["episode_id"] = episode_id
  df["timestamp"] = pd.to_datetime(df["@timestamp"], utc=True, errors="coerce")
  df["masked_message_cl"] = df.apply(lambda r: build_masked_message_cl(r.to_dict()), axis=1)
  df["stream"] = safe_col(df, "data_stream.dataset").astype(str).replace({"nan": "unknown_stream", "None": "unknown_stream"})
  df["src_ip_anon"] = src_ip.fillna(src_addr).astype(object).apply(short_hash)
  df["actor_ip_anon"] = df.apply(lambda r: extract_actor_ip(r.to_dict()), axis=1).apply(short_hash)
  df["dst_ip_anon"] = dst_ip.fillna(dst_addr).astype(object).apply(short_hash)
  df["host_anon"] = host_col.astype(object).apply(short_hash)
  df["process_name"] = safe_col(df, "process.name").astype(str)
  df["ssh_method"]   = safe_col(df, "system.auth.ssh.method").astype(str)
  df["dport"]        = safe_col(df, "destination.port").astype(str)
  df["event_category"] = safe_col(df, "event.category").apply(lambda x: join_tokens(x, default="")).astype(str)
  df["event_action"]   = safe_c

   SAVED episode_016.parquet (29,998 logs)


In [9]:
all_files = sorted(OUTPUT_DIR.glob("*.parquet"))
dfs = [pd.read_parquet(f) for f in all_files]
episodes_df = pd.concat(dfs, ignore_index=True)

def episode_type(eid: str, golden_id: str) -> str:
    if eid == golden_id:
        return "golden"
    if eid.startswith("episode_"):
        return "alert"
    if eid.startswith("background_"):
        return "background"
    return "unknown"

def bucket_dport(val: str) -> str:
    val = str(val).split(".")[0]
    if val in ("22", "2222"):
        return "ssh"
    if val in ("", "nan", "None"):
        return "unknown"
    return "other"

episodes_df["episode_type"] = episodes_df["episode_id"].astype(str).apply(lambda x: episode_type(x, cfg.golden_id))

episodes_df["dport_bucket"] = episodes_df.get("dport", "").astype(str).apply(bucket_dport)
episodes_df["event_outcome_norm"] = (
    episodes_df.get("event_outcome", "")
    .astype(str)
    .str.lower()
    .replace({"nan": "", "none": ""})
)
episodes_df["stream"] = episodes_df["stream"].astype(str).replace({"nan": "unknown_stream"})

In [10]:
def make_evidence_id(df: pd.DataFrame, cfg) -> pd.Series:
    df = df.copy()

    evidence = pd.Series("row_" + df.index.astype(str), index=df.index)

    mask = df["stream"].isin(cfg.duplicate_prone_streams)

    asa_mask = mask & (df["stream"] == "cisco_asa.log")
    if asa_mask.any():
        cols = [
            "timestamp", "stream",
            "asa_conn_id",              
            "src_ip_anon", "dst_ip_anon", "host_anon",
            "event_action", "event_outcome", "event_code",
            "dport",
        ]
        use = [c for c in cols if c in df.columns]
        key = df.loc[asa_mask, use].fillna("").astype(str).agg("|".join, axis=1)
        sig = key.map(lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest()[: cfg.evidence_hash_len])
        evidence.loc[asa_mask] = sig

    ise_mask = mask & (df["stream"] == "cisco_ise.log")
    if ise_mask.any():
        cols = [
            "timestamp", "stream", "masked_message_cl",
            "src_ip_anon", "dst_ip_anon", "host_anon",
            "event_action", "event_outcome", "event_code",
            "dport",
        ]
        use = [c for c in cols if c in df.columns]
        key = df.loc[ise_mask, use].fillna("").astype(str).agg("|".join, axis=1)
        sig = key.map(lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest()[: cfg.evidence_hash_len])
        evidence.loc[ise_mask] = sig

    return evidence

In [11]:
episodes_df["evidence_id"] = make_evidence_id(episodes_df, cfg)

In [12]:
episodes_df

Unnamed: 0,episode_id,timestamp,stream,process_name,masked_message_cl,src_ip_anon,actor_ip_anon,dst_ip_anon,host_anon,event_category,...,event_code,dport,ssh_method,asa_conn_id,gt_core,gt_extended,episode_type,dport_bucket,event_outcome_norm,evidence_id
0,background_001,2025-09-25 22:00:00.494000+00:00,cisco_asa.log,,[cisco_asa.log] kind=event type=connection+end...,5644b8637bd0,5644b8637bd0,1cd7d76ee854,485661e4763f,network,...,302014,22.0,,4964540,False,False,background,ssh,,922626fa84d0f2d0
1,background_001,2025-09-25 22:00:02.065000+00:00,cisco_asa.log,,[cisco_asa.log] kind=event type=connection+sta...,3ae69698411f,3ae69698411f,613457f360a8,d25837bc88b6,network,...,302013,22.0,,2793740973,False,False,background,ssh,success,e152cd0486f94fee
2,background_001,2025-09-25 22:00:02.576000+00:00,cisco_asa.log,,[cisco_asa.log] kind=event type=connection+sta...,3ae69698411f,3ae69698411f,613457f360a8,d25837bc88b6,network,...,302013,22.0,,2793741050,False,False,background,ssh,success,c3ebb20a3c729dde
3,background_001,2025-09-25 22:00:02.577000+00:00,cisco_asa.log,,[cisco_asa.log] kind=event type=connection+sta...,3ae69698411f,3ae69698411f,3037eb8cffb1,d25837bc88b6,network,...,302013,22.0,,2793741055,False,False,background,ssh,success,68aaafc72c7cce9c
4,background_001,2025-09-25 22:00:02.861000+00:00,cisco_asa.log,,[cisco_asa.log] kind=event type=connection+end...,b79cefbb974c,b79cefbb974c,293c291ae824,485661e4763f,network,...,302014,22.0,,4965436,False,False,background,ssh,,a63103a021a3828a
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
569798,episode_016,2025-10-16 07:53:00.013000+00:00,system.security,,[system.security] kind=event type=admin cat=ia...,none,none,none,cf9c2f259a30,iam,...,4672,,,,False,False,golden,unknown,success,row_569798
569799,episode_016,2025-10-16 07:53:00.013000+00:00,system.security,,[system.security] kind=event type=admin cat=ia...,none,none,none,cf9c2f259a30,iam,...,4672,,,,False,False,golden,unknown,success,row_569799
569800,episode_016,2025-10-16 07:53:00.013000+00:00,system.security,,[system.security] kind=event type=admin cat=ia...,none,none,none,cf9c2f259a30,iam,...,4672,,,,False,False,golden,unknown,success,row_569800
569801,episode_016,2025-10-16 07:53:00.013000+00:00,system.security,,[system.security] kind=event type=admin cat=ia...,none,none,none,cf9c2f259a30,iam,...,4672,,,,False,False,golden,unknown,success,row_569801


In [15]:
unified_path = DATASET_DIR / cfg.unified_parquet_name
episodes_df.to_parquet(unified_path, index=False)
print(f"\nSAVED unified episodes_df → {unified_path} ({len(episodes_df):,} rows)")


SAVED unified episodes_df → C:\Users\patri\OneDrive\Documentos\MASTER THESIS\FRAMEWORK\2025-10-16T07_27Z_ssh_alert_01\Data Extraction\data\episodes_all_baseline.parquet (569,803 rows)
