## Generate playbooks from the incidents

In [1]:
# This is a dataset generator and validator for the MITRE ATT&CK framework using OpenAI's API models.
from distutils.command.clean import clean

import openai
import json

from scipy.io.arff.tests.test_arffread import missing
from tqdm.auto import tqdm
import uuid
import time
import os
import re
from datetime import datetime
import os
from dotenv import load_dotenv

load_dotenv()  # Load from .env file
openai_key = os.getenv("OPENAI_API_KEY")
openai.api_key = openai_key

default_model_name = "GPT-4o mini"  #"gpt-4-turbo"

client = openai.OpenAI(api_key=openai.api_key)

In [3]:
# Given an incident JSON, generate a CACAO 2.0-compliant playbook using OpenAI's API.
from typing import List
from pprint import pprint
def create_few_shot_examples() -> str:
    all_few_shots = []

    target_fewshot_examples = ["CACAO_examples/playbook-linear.json", "CACAO_examples/playbook-ifelse.json", "CACAO_examples/playbook-while.json", "CACAO_examples/playbook-parallel.json"]
    for target_shot in target_fewshot_examples:
        with open(target_shot, "r") as f:
            few_shot_ex = json.load(f)

        # Convert the JSON to a string
        few_shot_ex = json.dumps(few_shot_ex, indent=2)

        all_few_shots.append(few_shot_ex)


    # Add more examples as needed
    few_shot_section = "\n\n".join(f"Example {i+1}:\n{example}" for i, example in enumerate(all_few_shots))
    return few_shot_section


# Create a few-shot example section for the prompt
few_shot_section = create_few_shot_examples()
pprint(few_shot_section)

('Example 1:\n'
 '{\n'
 '  "type": "playbook",\n'
 '  "spec_version": "cacao-2.0",\n'
 '  "id": "playbook--a5903e76-fe56-4ec6-acd7-3e1a2acf7fca",\n'
 '  "name": "Mitigation Playbook for Malicious Script Execution Incident",\n'
 '  "description": "This playbook uses only a linear connections of node. It '
 'addresses an incident where an attacker injected malicious scripts that '
 'compromised sensitive data.",\n'
 '  "created_by": "identity--9528bf3f-2fc6-4fda-9b20-26d5edd03672",\n'
 '  "created": "2025-04-09T08:55:35.498Z",\n'
 '  "modified": "2025-04-09T08:56:56.043Z",\n'
 '  "revoked": false,\n'
 '  "derived_from": [\n'
 '    "playbook--ab53ec08-c454-4bef-8b04-a68714690242",\n'
 '    "playbook--1b89cf29-48b1-400d-944b-ac9ab96ccdf6"\n'
 '  ],\n'
 '  "workflow_start": "start--f1a23a51-c12e-4a7f-aba0-dcc18ae12345",\n'
 '  "workflow": {\n'
 '    "start--f1a23a51-c12e-4a7f-aba0-dcc18ae12345": {\n'
 '      "on_completion": "action--5b6e4f91-48f9-4f53-86a6-d57804ca1caa",\n'
 '      "type":

In [10]:
# Check which playbooks from the dataset are created or missing
import glob
import os
import json
from tqdm.auto import tqdm
# Select folders containing incident data and models
input_folders = ["Main"]#["Samples/gpt-4o"] #["Samples/gpt-4o-mini"]#, "Samples/o3-mini", "Samples/gpt-4o", "Samples/gpt-4.5-preview"]
models_to_generate_playbooks = ["gpt-4o-mini"] #["gpt-4o-mini"]#["o3", "o3-mini", "gpt-4o", "gpt-4.5-preview"] #["gpt-4o-mini"] #["gpt-4o-mini", "o3-mini"]#["gpt-4o", "gpt-4.5-preview"] #"gpt-4o-mini", "gpt-4-turbo", o mini", "GPT-4o", "gpt-3.5-turbo", "o3-mini", "o3", "gpt-4"]

assert len(input_folders) == len(models_to_generate_playbooks), "The number of input folders must match the number of models to generate playbooks."

missing_files = {}


# Check if a playbook file already exists for the given incident ID in the specified folder path.
# Return True if any matching files are found, otherwise False
def check_if_playbook_exists(folder_path, incident_id) -> bool:
    """
    Check if a playbook file already exists for the given incident ID in the specified folder path.
    """
    # Construct the file name pattern
    file_to_check_pattern = f"{folder_path}/playbook_{incident_id}_*.json"
    matching_files = glob.glob(file_to_check_pattern)

    # Return True if any matching files are found, otherwise False
    return bool(matching_files)


# Iterate over the input folders and check for missing playbooks
for input_folder in input_folders:
    missing_files[input_folder] = []

    if not os.path.exists(input_folder):
        print(f"Input folder {input_folder} does not exist at all. ALL FILES ARE MISSING")
        missing_files[input_folder] = ["ALL FILES"]
        continue

    output_folder_path = f"{input_folder}/Playbooks"

    # List all filenames in output_folder_path
    already_built_files = os.listdir(output_folder_path)
    already_built_files = set(already_built_files)  # Convert to set for faster lookup

    # Read the incident JSON file
    with open(f"{input_folder}/dataset.json", "r") as f:
        incidents_data = json.load(f)
    # Check if the playbook already exists
    for incident in tqdm(incidents_data):
        # Extract the key ID from the incident
        incident_id = list(incident.keys())[0]

        if not check_if_playbook_exists(output_folder_path, incident_id):
            missing_files[input_folder].append(incident_id)

print("Analysis of the missing files - Num of files per folder")
for folder, files in missing_files.items():
    if files:
        print(f"In folder {folder} - missing - {len(files)}, the following playbooks are missing:\n {'\n'.join(files)}")
    else:
        print(f"In folder {folder}, all playbooks are present.")

  0%|          | 0/1374 [00:00<?, ?it/s]

Analysis of the missing files - Num of files per folder
In folder Main, all playbooks are present.


In [9]:
def generate_playbook(incident_json, few_shot_section, output_folder, model_name):
    # Define the prompt for generating a CACAO 2.0 playbook
    playbook_prompt_template = f"""
    You are a cybersecurity automation assistant.
    Given the following structured incident data (in JSON format), generate a CACAO 2.0-compliant playbook that includes:

    1. A `start` step to initiate the workflow.
    2. Action steps for each mitigation, referencing:
       - Agent IDs
       - Bash-style commands
    3. Use advanced CACAO constructs:
       - Parallel execution (multiple actions triggered simultaneously)
       - Conditional branches (if-else) using decision nodes
       - Iteration (e.g., loop until a variable condition is satisfied)
       - Variables to link outputs of one step to conditions of another

    Each workflow node must include:
    - `id` (e.g., start--uuid, action--uuid)
    - `type` (start, action, decision, loop, end)
    - `name`, `description`
    - `agent`, `commands` (with bash-type syntax)
    - `on_completion`, `on_true`, `on_false`, or `next_steps` depending on type

    Ensure:
    - The playbook is valid CACAO JSON.
    - The structure supports conditional logic and iterative flows.
    - The variable linking syntax is correctly handled.

    Here are a few valid examples of CACAO 2.0 playbooks for reference:

    {few_shot_section}

    Now generate a new CACAO 2.0 playbook for this input incident:

    ```json
    {incident_json_str}
    """

    response = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": playbook_prompt_template}],
        #temperature=0.6,
        #max_tokens=4096,
    )

    output_json_str = response.choices[0].message.content
    match = re.search(r'\{.*\}', output_json_str, re.DOTALL)

    clean_json_str = None # This is the string that will be parsed
    clean_json = None # This is the json that will be returned (dict format, not a string)
    if match:
        clean_json_str = match.group(0)
        clean_json = json.loads(clean_json_str)

        #pprint(clean_json)
    else:
        print("No JSON-like structure found in the response.")
        clean_json = output_json_str

    return clean_json


LIMIT_PLAYBOOKS = 1500
num_generated = 0

# Iterate over the models to generate playbooks
# Generate only for those that have not been generated yet.
# We take as input the dataset generated by the models previously
for model_iter in tqdm(models_to_generate_playbooks):
    # Call OpenAI's API to generate the playbook
    # For each model that produced data, iterate over the issues
    for input_folder in input_folders:
        # Set the output folder path
        output_folder_path = f"{input_folder}/Playbooks"
        os.makedirs(output_folder_path, exist_ok=True)

        # List all filenames in output_folder_path
        already_built_files = os.listdir(output_folder_path)
        already_built_files = set(already_built_files)  # Convert to set for faster lookup

        print("Checked files")

        # Read the incident JSON file
        with open(f"{input_folder}/dataset.json", "r") as f:
            incidents_data = json.load(f)

        # Iterate over the incidents and generate playbooks
        for incident in tqdm(incidents_data):
            # Extract the key ID from the incident
            incident_id = list(incident.keys())[0]

            timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
            output_file_path = f"{input_folder}/Playbooks/playbook_{incident_id}_{timestamp}.json"
            output_file_fixed_path = f"{output_file_path}_fixed.json"

            # Check if the playbook already exists
            if not check_if_playbook_exists(output_folder_path, incident_id):
                print(f"Generating playbook for incident {incident_id} using model {model_iter} since it does not exist.")
            else:
                #print(f"Playbook for incident {incident_id} already exists. Skipping generation.")
                continue

            # Format it as a string for insertion into prompt
            incident_json_str = json.dumps(incident, indent=2)
            for iter_attempt in range(10):
                try:
                    # Generate the playbook using OpenAI's API
                    response = generate_playbook(incident_json_str, few_shot_section, output_folder=output_folder_path, model_name=model_iter)
                    print(f"Generated playbook for incident {incident_id} using model {model_iter}.")

                    # Save the generated playbook to a JSON file
                    with open(output_file_path, "w") as f:
                        json.dump(response, f, indent=4)
                    break

                except Exception as e:
                    print(f"Error - attempt {iter_attempt} generating playbook for incident {incident}: {e}")

            num_generated += 1
            if num_generated >= LIMIT_PLAYBOOKS:
                break






  0%|          | 0/1 [00:00<?, ?it/s]

Checked files


  0%|          | 0/1374 [00:00<?, ?it/s]

Playbook for incident technique_id_20250405T162835 already exists. Skipping generation.
Playbook for incident technique_id_20250405T162840 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074200 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074210 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074219 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074231 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074240 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074249 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074258 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074308 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074320 already exists. Skipping generation.
Playbook for incident technique_

  timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%S")


Playbook for incident technique_id_20250410T074651 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074707 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074721 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074732 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074743 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074750 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074803 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074812 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074823 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074831 already exists. Skipping generation.
Playbook for incident technique_id_20250410T074843 already exists. Skipping generation.
Playbook for incident technique_

## Explainable

In [None]:
import json

from pprint import pprint
def explain_playbook(example_file_path, model_name):

    with open(example_path, "r") as f:
        example_playbook = json.load(f)

    # Convert the JSON to a string
    example_playbook_str = json.dumps(example_playbook, indent=2)


    technique_id, technique_desc = technique_tuple

    incident_prompt_template = f"""
    You are a cybersecurity simulation assistant trained to generate structured synthetic incident data for automation workflows.

    Your task is to explain to the user a CACAO playbook in JSON format representing a cybersecurity incident that is aligned with a given MITRE ATT&CK technique.
    You can find the description of the technique in the following link: https://attack.mitre.org/techniques/{technique_id}/

    The JSON must include the following fields:

    1. `incident_id` – a unique identifier (UUID format).
    2. `technique_id` – e.g., T1059
    3. `technique_desc` – short description of the MITRE technique (e.g., Command and Scripting Interpreter).
    4. `incident_description` – a 2-sentence narrative of how the attack unfolded.
    5. `attack_logs` – a list of exactly 3 logs, each containing:
       - `timestamp` (ISO 8601 format)
       - `host` (e.g., host-22)
       - `action` (e.g., “File Dropped”)
       - `details` (e.g., suspicious behavior)
    6. `ground_truth_mitigations` – a list of 3 to 6 mitigation steps. Each mitigation should include:
       - `step`: description of the action (e.g., “Kill malicious process”)
       - `uuid`: a unique UUID
       - `agent`: organization ID responsible (e.g., organization--abc)
       - `command`: executable bash-style command to perform the step
       - optionally:
         - `condition`: for if/else branching
         - `loop`: for iterative steps
         - `variables`: any variable set or used

    You are encouraged to add structural diversity:
    - Some mitigations should include loop or conditional logic (e.g., “repeat until scan is clean”).
    - Some should be parallelizable or dependent on others using variable links.

    The final output must be valid JSON.
    Only return the JSON object.

    Target MITRE Technique:
    {technique_id} - {technique_desc}
    """

    response = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": incident_prompt_template}],
        #temperature=0.6,
        #max_tokens=4096,
    )

    output_json_str = response.choices[0].message.content
    match = re.search(r'\{.*\}', output_json_str, re.DOTALL)

    clean_json_str = None # This is the string that will be parsed
    clean_json = None # This is the json that will be returned (dict format, not a string)
    if match:
        clean_json_str = match.group(0)
        clean_json = json.loads(clean_json_str)

        #pprint(clean_json)
    else:
        print("No JSON-like structure found in the response.")
        clean_json = output_json_str

    timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%S")

    key = f"{technique_id}_{timestamp}"
    return {key: clean_json}

example_path = "Samples/gpt-4o/Playbooks/playbook_technique_id_20250410T074210_20250410T081539_fixed.json"
with open(example_path, "r") as f:
    example_playbook = json.load(f)

# Convert the JSON to a string
example_playbook_str = json.dumps(example_playbook, indent=2)

