In [13]:
#!/usr/bin/env python3
"""
convert_analytic_plans.py

Utility to transform legacy analytic plan JSON files into the new, JSON-friendly schema.

Old (legacy) structure examples:

1. Dictionary form (single IR):
{
    "Has the adversary gained initial access using valid accounts? (TA0001 - Initial Access)": {
        "Indicators": {
            "T1078 - Valid Accounts": {
                "Evidence description": {
                    "Data": "Windows Event ID 4624; Windows Event ID 4625",
                    "Data Platform": "Servers",
                    "NAI": "Insert site-specific NAI here",
                    "Action": "..."
                }
            }
        }
    },
    "version": "2.0",
    "last_updated": "2025-05-11"
}

2. List of per-IR objects (each object contains one IR key plus version metadata):
[
  {
    "Has the adversary gained initial access using valid accounts? (TA0001 - Initial Access)": { ... },
    "version": "2.0",
    "last_updated": "2025-05-11"
  },
  {
     ...
  }
]

New target structure (list of normalized objects):
[
  {
    "information_requirement": "Has the adversary gained initial access using valid accounts?",
    "tactic_id": "TA0001",
    "tactic_name": "Initial Access",
    "indicators": [
        {
            "technique_id": "T1078",
            "name": "Valid Accounts",
            "evidence": [
                {
                    "description": "Evidence description",
                    "data_sources": ["Windows Event ID 4624", "Windows Event ID 4625"],
                    "data_platforms": ["Servers"],
                    "nai": "Insert site-specific NAI here",
                    "action": "..."
                }
            ]
        }
    ],
    "version": "2.1",
    "date_created": "2025-05-04",
    "last_updated": "2025-07-20",
    "contributors": ["Zachary Szewczyk"]
  }
]

Usage:
    python convert_analytic_plans.py <input_path> [--output OUTPUT_FILE] [--outdir OUTPUT_DIR] [--indent 2]

* <input_path> may be a single JSON file or a directory containing multiple JSON files.
* If <input_path> is a directory, each *.json file will be transformed and written to the output directory.
* By default the transformed filename appends `_v2` before the extension.

Example:
    python convert_analytic_plans.py old_plans/ --outdir new_plans/

Notes:
    - The script is intentionally conservative: it does not attempt to infer or validate MITRE IDs beyond basic regex patterns.
    - Evidence items are generated one-to-one from the legacy evidence description keys under each technique.
    - Data sources / platforms are split on semicolons or commas, trimmed, and empty segments removed.
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List

# Constants per specification
NEW_VERSION = "2.1"
DATE_CREATED = "2025-05-04"  # fixed value for all converted objects
LAST_UPDATED = "2025-07-20"   # fixed value for all converted objects
CONTRIBUTORS = ["Zachary Szewczyk"]

# ── Regexes ──────────────────────────────────────────────────────────────
# e.g. "Has the adversary…? (TA0001 - Initial Access)"
TACTIC_PATTERN = re.compile(
    r"^(?P<ir>.+?)\s*"
    r"\("
    r"(?P<tactic_id>TA\d{4})\s*-\s*(?P<tactic_name>[^)]+)"
    r"\)$"
)

# e.g. "T1078 - Valid Accounts"  OR  "T1055.009 - Proc Injection-X"
TECHNIQUE_PATTERN = re.compile(
    r"^(?P<technique_id>T\d{4}(?:\.\d{3})?)\s*-\s*(?P<technique_name>.+)$"
)

LEGACY_METADATA_KEYS = {"version", "last_updated"}


def split_list(value: str) -> List[str]:
    """Split a semicolon- or comma-delimited string into a list of trimmed tokens.

    Empty tokens are discarded. Returns an empty list if input is falsy.
    """
    if not value or not isinstance(value, str):
        return []
    # First replace semicolons with commas for uniform splitting
    unified = value.replace(";", ",")
    parts = [p.strip() for p in unified.split(",")]
    return [p for p in parts if p]


def parse_ir_key(ir_key: str) -> Dict[str, str]:
    """Extract information requirement text and tactic metadata from the legacy IR key.

    Raises ValueError if the pattern does not match.
    """
    match = TACTIC_PATTERN.match(ir_key)
    if not match:
        raise ValueError(f"IR key does not match expected pattern with tactic parentheses: {ir_key}")
    return {
        "information_requirement": match.group("ir").strip(),
        "tactic_id": match.group("tactic_id").strip(),
        "tactic_name": match.group("tactic_name").strip(),
    }


def parse_technique_key(tech_key: str) -> Dict[str, str]:
    match = TECHNIQUE_PATTERN.match(tech_key)
    if not match:
        raise ValueError(f"Technique key does not match 'T#### - Name' pattern: {tech_key}")
    return {
        "technique_id": match.group("technique_id").strip(),
        "name": match.group("technique_name").strip(),
    }


def transform_legacy_entry(ir_key: str, ir_obj: Dict[str, Any]) -> Dict[str, Any]:
    """Transform a single legacy IR entry (IR key + nested object) into the new schema object."""
    meta = parse_ir_key(ir_key)

    indicators_block = ir_obj.get("Indicators") or {}

    new_indicators: List[Dict[str, Any]] = []
    for technique_key, technique_value in indicators_block.items():
        # Each technique_value is a dict mapping evidence description -> evidence attributes
        tech_meta = parse_technique_key(technique_key)
        evidence_items: List[Dict[str, Any]] = []

        if not isinstance(technique_value, dict):
            raise ValueError(f"Technique value for {technique_key} is not a dict as expected.")

        for evidence_description, evidence_attributes in technique_value.items():
            if not isinstance(evidence_attributes, dict):
                raise ValueError(f"Evidence attributes for '{evidence_description}' under {technique_key} must be a dict.")
            data_sources_raw = evidence_attributes.get("Data", "")
            data_platforms_raw = evidence_attributes.get("Data Platform", "")
            nai = evidence_attributes.get("NAI", "")
            action = evidence_attributes.get("Action", "")
            evidence_items.append({
                "description": evidence_description.strip(),
                "data_sources": split_list(data_sources_raw),
                "data_platforms": split_list(data_platforms_raw),
                "nai": nai,
                "action": action,
            })

        new_indicators.append({
            **tech_meta,
            "evidence": evidence_items,
        })

    transformed = {
        **meta,
        "indicators": new_indicators,
        "version": NEW_VERSION,
        "date_created": DATE_CREATED,
        "last_updated": LAST_UPDATED,
        "contributors": CONTRIBUTORS,
    }
    return transformed


def extract_ir_entries(data: Any) -> List[Dict[str, Any]]:
    """Locate all IR entries in the loaded legacy JSON structure.

    Handles both list-of-dicts form and single-dict form.
    """
    entries: List[Dict[str, Any]] = []

    if isinstance(data, list):
        for obj in data:
            if not isinstance(obj, dict):
                continue
            # Each dict may contain one IR key plus 'version' and 'last_updated'
            ir_keys = [k for k in obj.keys() if k not in LEGACY_METADATA_KEYS]
            for ir_key in ir_keys:
                entries.append((ir_key, obj[ir_key]))
    elif isinstance(data, dict):
        ir_keys = [k for k in data.keys() if k not in LEGACY_METADATA_KEYS]
        for ir_key in ir_keys:
            entries.append((ir_key, data[ir_key]))
    else:
        raise ValueError("Unsupported root JSON structure; expected list or dict.")

    # Convert tuples to dicts in expected shape for downstream transform
    transformed_entries = []
    for ir_key, ir_obj in entries:
        transformed_entries.append({"ir_key": ir_key, "ir_obj": ir_obj})
    return transformed_entries


def transform_file(input_path: Path) -> List[Dict[str, Any]]:
    with input_path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    entries = extract_ir_entries(data)
    result: List[Dict[str, Any]] = []
    for entry in entries:
        transformed = transform_legacy_entry(entry["ir_key"], entry["ir_obj"])
        result.append(transformed)
    return result


def write_output(output_path: Path, data: Any, indent: int = 2) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=indent, ensure_ascii=False)
        f.write("\n")


def derive_output_path(input_file: Path, outdir: Path | None, explicit_output: Path | None) -> Path:
    if explicit_output:
        return explicit_output
    outdir = outdir or input_file.parent
    stem = input_file.stem
    return outdir / f"{stem}.json"


def process_path(input_path: Path, outdir: Path | None, explicit_output: Path | None, indent: int) -> None:
    if input_path.is_dir():
        for file in sorted(input_path.glob("*.json")):
            try:
                transformed = transform_file(file)
                output_path = derive_output_path(file, outdir, None)
                write_output(output_path, transformed, indent=indent)
                print(f"[OK] {file} -> {output_path}")
            except Exception as e:
                print(f"[ERROR] {file}: {e}", file=sys.stderr)
    else:
        try:
            transformed = transform_file(input_path)
            output_path = derive_output_path(input_path, outdir, explicit_output)
            write_output(output_path, transformed, indent=indent)
            print(f"[OK] {input_path} -> {output_path}")
        except Exception as e:
            print(f"[ERROR] {input_path}: {e}", file=sys.stderr)
            raise SystemExit(1)


def build_arg_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="Convert legacy analytic plan JSON to new schema.")
    p.add_argument("input", help="Input JSON file or directory of JSON files.")
    p.add_argument("--output", "-o", help="Explicit output JSON file (only valid when a single input file is provided).")
    p.add_argument("--outdir", help="Directory to place converted files (used for directory input or when omitting --output).")
    p.add_argument("--indent", type=int, default=2, help="Indent level for pretty-printed JSON output (default: 2).")
    return p


input_path = Path("./")

if not input_path.exists():
    print(f"Input path does not exist: {input_path}", file=sys.stderr)

explicit_output = None
outdir = Path("./")

if input_path.is_dir() and explicit_output:
    print("--output cannot be used when input is a directory. Use --outdir instead.", file=sys.stderr)

process_path(input_path, outdir, explicit_output, indent=4)

[ERROR] D3-D - Detect.json: IR key does not match expected pattern with tactic parentheses: What data is available for threat detection and modeling? (D3-D - Detect)
[ERROR] enterprise-attack.json: IR key does not match expected pattern with tactic parentheses: type


[OK] T1001 - Data Obfuscation.json -> T1001 - Data Obfuscation.json
[OK] T1001.001 - Junk Data.json -> T1001.001 - Junk Data.json
[OK] T1001.002 - Steganography.json -> T1001.002 - Steganography.json
[OK] T1001.003 - Protocol or Service Impersonation.json -> T1001.003 - Protocol or Service Impersonation.json
[OK] T1003 - OS Credential Dumping.json -> T1003 - OS Credential Dumping.json
[OK] T1003.001 - LSASS Memory.json -> T1003.001 - LSASS Memory.json
[OK] T1003.002 - Security Account Manager.json -> T1003.002 - Security Account Manager.json
[OK] T1003.003 - NTDS.json -> T1003.003 - NTDS.json
[OK] T1003.004 - LSA Secrets.json -> T1003.004 - LSA Secrets.json
[OK] T1003.005 - Cached Domain Credentials.json -> T1003.005 - Cached Domain Credentials.json
[OK] T1003.006 - DCSync.json -> T1003.006 - DCSync.json
[OK] T1003.007 - Proc Filesystem.json -> T1003.007 - Proc Filesystem.json
[OK] T1003.008 - -etc-passwd and -etc-shadow.json -> T1003.008 - -etc-passwd and -etc-shadow.json
[OK] T1005 -