# AutoGen for Backdoors & Breaches (Core and Expansion Decks)

In [None]:
import json
import os
import pprint
import random
import re
from collections import Counter
from datetime import datetime
from typing import Dict, List, Tuple, Union

seed = 0
random.seed(seed)

In [None]:
# When the notebook is started from inside its own folder, step up one level
# (presumably to the project root) so the relative "data/" and "results/"
# paths used below resolve. Only the last path component is inspected: an
# ancestor directory whose name happens to contain "notebooks" can no longer
# trigger a spurious chdir, and re-running the cell does not keep climbing.
if 'notebooks' in os.path.basename(os.getcwd()):
    os.chdir('..')
print(os.getcwd())

In [None]:
# API key taken from the environment; None when the variable is not set.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

# LLM settings handed to the LLM-backed agents created later in this notebook.
llm_config = {
    "config_list": [
        {"model": "gpt-4o", "api_key": OPENAI_API_KEY},
    ],
    "temperature": 1.0,
}

In [None]:
# Team layout to simulate; must be one of the keys of `team_structures`.
team_structure_name = "Homogeneous Centralized"

# File-name stem for this run: "<structure>_<seed>", lower-cased, with every
# space turned into an underscore.
base_filename = "_".join(f"{team_structure_name}_{seed}".lower().split(' '))
print(base_filename)

## Preparing Cards

### Loading Cards

In [None]:
# Load the cards from the JSON file. The encoding is given explicitly because
# JSON text is UTF-8, whereas open() would otherwise fall back to the
# platform's locale encoding and could mis-decode non-ASCII card text.
with open("data/cards_expansion.json", "r", encoding="utf-8") as file:
    bnb_cards = json.load(file)["data"]

# Report after the `with` block so the file is closed as soon as it is parsed.
print(f"Number of cards: {len(bnb_cards)}")

In [None]:
bnb_cards[0]

### Filtering Cards

In [None]:
# Drop the cards this simulation does not use: every card of type
# "consultant" or "inject", plus the card named "Call a Consultant".
bnb_cards = [
    card
    for card in bnb_cards
    if card["type"] not in ("consultant", "inject")
    if card["name"] != "Call a Consultant"
]
print(f"Number of cards: {len(bnb_cards)}")

### Validating Cards

In [None]:
# Validate detection procedures: every name listed under a card's "detection"
# key is expected to be the name of a procedure card. Each offending
# (detection name, card) pair is printed; no output means all names resolve.
procedure_names = [card["name"] for card in bnb_cards if card["type"] == "procedure"]

for card in bnb_cards:
    # Cards without a "detection" key simply contribute nothing to check.
    for detection in card.get("detection", ()):
        if detection not in procedure_names:
            print(detection, card)

### Counting Cards

In [None]:
Counter([card["type"] for card in bnb_cards])

In [None]:
# Print the card names grouped by card type. The types are sorted so the
# listing comes out in the same order on every run; iterating a bare set of
# strings depends on per-process hash randomization.
for card_type in sorted({card["type"] for card in bnb_cards}):
    print(
        card_type.title() + ': '
        + ', '.join(card["name"] for card in bnb_cards if card["type"] == card_type)
        + '\n'
    )

## Defining Tools

### Drawing Incidents

In [None]:
def draw_incident_cards() -> Tuple[List[Dict], str]:
    """
    Draw one random incident card for each of the four attack phases.

    The phases are, in order: INITIAL COMPROMISE, PIVOT and ESCALATE,
    C2 and EXFIL, and PERSISTENCE. Cards are read from the module-level
    ``bnb_cards`` list. A phase for which no card of the matching type exists
    is skipped, so fewer than four cards may be returned.

    :return: A tuple containing:
        1. A list of the selected incident card dictionaries.
        2. A formatted string describing each selected card.
    """
    # Phase label used in the description -> value of the card's "type" field.
    required_types = {
        "INITIAL COMPROMISE": "initial",
        "PIVOT and ESCALATE": "pivot",
        "C2 and EXFIL": "c2",
        "PERSISTENCE": "persist",
    }

    incident_cards = []
    formatted_descriptions = []

    for category, card_type in required_types.items():
        type_cards = [card for card in bnb_cards if card["type"] == card_type]
        if not type_cards:
            continue

        selected = random.choice(type_cards)
        incident_cards.append(selected)

        # The "detection" key is treated as optional everywhere else in this
        # notebook (validation cell, check_detections), so a card without it
        # must not crash the draw.
        detections = selected.get("detection", [])
        formatted_descriptions.append(
            f"Category: {category}\n"
            f"Name: {selected['name']}\n"
            f"Description: {selected['description']}\n"
            f"Detection Procedures: {', '.join(detections)}"
        )

    return incident_cards, "\n\n".join(formatted_descriptions)

In [None]:
incident_cards, incident_description = draw_incident_cards()
print(incident_description)

### Drawing Procedures

In [None]:
def draw_procedure_cards(num_established: int = 4) -> Tuple[List[Dict], str]:
    """
    Randomly pick the 'Established Procedures' and classify the rest as 'Other Procedures'.

    Works on shallow copies of the procedure cards found in the module-level
    ``bnb_cards`` list, so the added 'category' field never leaks back into
    the deck.

    :param num_established: How many procedure cards to mark as established
        (defaults to 4).
    :return: A tuple containing:
        1. A list of all procedure cards with an added 'category' field: either 'established' or 'other'.
        2. A formatted string listing both categories with descriptions.
    :raises ValueError: If ``num_established`` is negative or larger than the
        number of available procedure cards.
    """
    procedure_cards = [card.copy() for card in bnb_cards if card["type"] == "procedure"]
    if not 0 <= num_established <= len(procedure_cards):
        raise ValueError(
            f"Cannot pick {num_established} established procedures "
            f"from {len(procedure_cards)} procedure cards."
        )

    established = random.sample(procedure_cards, num_established)

    # Partition by object identity rather than dict equality: two cards with
    # identical content are still distinct cards, and each one must end up in
    # exactly one category.
    established_ids = {id(card) for card in established}
    other = [card for card in procedure_cards if id(card) not in established_ids]

    for card in established:
        card["category"] = "established"
    for card in other:
        card["category"] = "other"

    def describe(card: Dict) -> str:
        """Render one procedure card as a two-line name/description entry."""
        return f"Name: {card['name']}\nDescription: {card['description']}"

    # Each section is a header followed by one entry per card.
    established_lines = ["Established Procedures (+3 modifier):"]
    established_lines.extend(describe(card) for card in established)

    other_lines = ["Other Procedures (+0 modifier):"]
    other_lines.extend(describe(card) for card in other)

    procedure_description = "\n\n".join(established_lines) + "\n\n" + "\n\n".join(other_lines)
    return procedure_cards, procedure_description

In [None]:
procedure_cards, procedure_description = draw_procedure_cards()
print(procedure_description)

### Rolling a Dice

In [None]:
def roll_dice(sides: int = 20) -> int:
    """
    Roll a single die and return the result.

    :param sides: Number of faces on the die. Defaults to 20, the die used by
        the game rules in this notebook.
    :return: A random integer between 1 and ``sides``, inclusive.
    :raises ValueError: If ``sides`` is smaller than 1.
    """
    if sides < 1:
        raise ValueError(f"A die needs at least one side, got {sides}.")
    return random.randint(1, sides)

In [None]:
roll_dice()

### Checking Detections

In [None]:
def check_detections(procedure_name: str) -> str:
    """
    Rolls a die and determines whether the selected Procedure card successfully reveals an incident card.

    A detection succeeds if:
    - The procedure exists in the current procedure_cards list
    - base_roll + modifier >= 11
    - The procedure is listed in the 'detection' methods of any unrevealed incident card

    One matching incident card is revealed if multiple matches exist.

    :param procedure_name: The name of the Procedure card selected by Defenders.
    :return: A string describing the result, including roll breakdown and detection outcome.
    """
    # The docstring above is passed verbatim to the LLM as the tool description
    # (register_function(..., description=tools.__doc__)), so its wording is
    # deliberately left untouched.
    # NOTE(review): nothing here records which incident cards were already
    # revealed, so a later success can name the same card again even though
    # the description says "unrevealed". Tracking that needs shared state that
    # the smoke-test calls below this cell would also mutate — confirm the
    # intended design before changing it.

    # The name comes from the LLM: try the exact name first, then fall back to
    # a whitespace- and case-insensitive comparison so trivial formatting
    # drift does not turn a valid choice into an error.
    matched_procedure = next(
        (card for card in procedure_cards if card["name"] == procedure_name), None
    )
    if matched_procedure is None and isinstance(procedure_name, str):
        wanted = procedure_name.strip().casefold()
        matched_procedure = next(
            (card for card in procedure_cards if card["name"].strip().casefold() == wanted),
            None,
        )
    if matched_procedure is None:
        # No roll is made for an unknown procedure.
        return f"[Error] Invalid procedure name: '{procedure_name}'."

    # From here on use the card's own spelling, which is what the incident
    # cards' "detection" lists are compared against.
    canonical_name = matched_procedure["name"]

    base_roll = roll_dice()
    modifier = 3 if matched_procedure.get("category") == "established" else 0
    final_roll = base_roll + modifier

    roll_info = (
        f"Base roll: {base_roll} | Modifier: +{modifier} | Final roll: {final_roll}"
    )

    # A modified roll of 11 or more is a success; anything lower fails.
    if final_roll < 11:
        return f"[Failure] {roll_info}. Procedure failed to detect any attack."

    matches = [card for card in incident_cards if canonical_name in card.get("detection", [])]
    if not matches:
        return f"[Failure] {roll_info}. No matching attack card detected."

    # Several incident cards may list this procedure; only one is reported.
    revealed = random.choice(matches)

    return (
        f"[Success] {roll_info}. "
        f"The procedure '{canonical_name}' revealed the attack card: {revealed['name']}."
    )

In [None]:
result = check_detections("Wrong Procedure")
print(result)
result = check_detections("Endpoint Analysis")
print(result)

### Saving Results

In [None]:
def save_results(
    defenders: List[str],
    turn_results: List[Dict[str, Union[int, str]]],
    final_result: str,
) -> str:
    """
    Saves the game results to a JSON file in the 'results' folder.

    :param defenders: List of defender agent names.
    :param turn_results: List of dictionaries for each turn, including:
         "turn" (int), "procedure" (str), "base_roll" (int), "modifier" (int), and optionally 
         "revealed_incident" (str).
    :param final_result: Game outcome, either "Victory" or "Loss".
    :return: Confirmation message with path to saved file.
    """
    # The docstring is forwarded unchanged to the LLM as this tool's
    # description, so it is kept word for word.
    output_dir = 'results/results_expansion'
    os.makedirs(output_dir, exist_ok=True)

    # Split the drawn procedure cards by the category assigned at draw time;
    # cards carrying any other (or no) category are left out of both lists.
    established_names = []
    other_names = []
    for card in procedure_cards:
        category = card.get("category")
        if category == "established":
            established_names.append(card["name"])
        elif category == "other":
            other_names.append(card["name"])

    summary = {
        "total_turns_played": len(turn_results),
        "final_result": final_result,
    }
    game_data = {
        "defenders": defenders,
        "incident_cards": [card["name"] for card in incident_cards],
        "procedure_cards": {
            "established": established_names,
            "other": other_names,
        },
        "turn_results": turn_results,
        "summary": summary,
    }

    # One result file per run, named after the team structure and seed.
    filename = f"{output_dir}/results_{base_filename}.json"
    with open(filename, 'w') as out_file:
        json.dump(game_data, out_file, indent=2)

    return f"Game results saved to {filename}"

## Initializing Agents

### Defining the Sequence

In [None]:
# Rules of play, written as instructions to the Incident Captain. The text is
# interpolated into the Incident Captain's system message, so every literal
# below is prompt text: editing it changes what the LLM is told.
sequence_of_play = (
    "Sequence of Play:\n\n"
    # Step 1: scenario setup.
    "1. Set the Scenario:\n"
    "   - Select one card for each of the four attack stages (Initial Compromise, Pivot and Escalate, C2 and Exfil, Persistence).\n"
    "   - Craft a detailed initial scenario description based on the chosen Initial Compromise card. Provide enough context "
    "for the Defenders to understand the breach, but avoid revealing any specific details or names from the Attack cards.\n\n"
    # Step 2: presenting the procedure cards and their modifiers.
    "2. Introduce the Defenders to the available Procedure cards:\n"
    "   - Explain the distinction between Established Procedures (with a +3 modifier) and Other Procedures (with a +0 modifier).\n"
    "   - Inform the Defenders of the initial setup, including which procedures are classified as Established vs. Other.\n\n"
    # Step 3: start of each of the ten turns.
    "3. Start Each Turn (Turn 1 to Turn 10):\n"
    "   - Announce the current turn number to the Defenders.\n"
    "   - Prompt the Defenders to discuss and select one Procedure card to use for this turn.\n\n"
    # Step 4: roll resolution; the 11+ threshold matches check_detections().
    "4. Defenders’ Procedure Attempt:\n"
    "   - When the Defenders choose a Procedure, roll a 20-sided dice to determine if their attempt succeeds. "
    "Apply the appropriate modifier based on the type of Procedure selected:\n"
    "      - Established Procedure: +3 modifier to the roll.\n"
    "      - Other Procedure: +0 modifier to the roll.\n"
    "   - With the modifier applied, determine success or failure:\n"
    "      - Adjusted Roll 11 or higher: The attempt is successful.\n"
    "      - Adjusted Roll 10 or lower: The attempt fails.\n\n"
    # Step 5: what to reveal after a success or a failure.
    "5. Respond to Success or Failure:\n"
    "   - On Success: Check if the Procedure used is listed under the 'Detection' methods for any of the hidden attack cards.\n"
    "      - If it matches, reveal that specific attack card to the Defenders.\n"
    "      - If multiple attack cards can be detected by the same Procedure, reveal only one and inform the Defenders accordingly.\n"
    "   - On Failure: Provide feedback noting that the Procedure did not reveal anything new.\n\n"
    # Step 6: end conditions. 'END_GAME' is the keyword the termination
    # checks of the agents look for.
    "6. End Game:\n"
    "   - Victory: If Defenders reveal all four hidden attack cards within 10 turns, they win the game.\n"
    "   - Loss: If the Defenders fail to reveal all attack cards by the end of the 10th turn, the breach remains undetected.\n"
    "   - Save a detailed game summary in JSON format, capturing all key game events and results.\n"
    "   - Type the keyword 'END_GAME' to officially conclude the game."
)

In [None]:
print(sequence_of_play)

### Initializing the Incident Captain

In [None]:
from autogen import ConversableAgent

In [None]:
# The Incident Captain runs the game. Its system message carries the rules
# (sequence_of_play) plus the hidden incident cards and the procedure cards
# drawn earlier, so those cells must have been executed first.
incident_captain = ConversableAgent(
    name="Incident_Captain",
    system_message=(
        "Welcome to Backdoors & Breaches! You are the Incident Captain, responsible for guiding the "
        "Defenders through a simulated cyber breach scenario. Your role is to control the game, craft the "
        "attack scenario, and provide guidance as Defenders attempt to detect and counter the breach.\n\n"

        "Your responsibilities include:\n"
        "- Selecting four hidden attack cards to define the breach scenario. These cards represent each stage: "
        "Initial Compromise, Pivot and Escalate, Command and Control, and Persistence.\n"
        "- Introducing the available Procedure cards (Established and Other) and explaining their roles and modifiers.\n"
        "- Tracking and managing game mechanics, including modifier applications and turn count.\n"
        "- Answering Defenders' questions or clarifying the scenario when asked, giving hints where appropriate.\n"
        "- Keeping the game within the 10-turn limit and ensuring Defenders have the context and support needed to achieve their objectives.\n\n"

        f"{sequence_of_play}\n\n"

        "Throughout the game, maintain the flow, stay in character, and guide Defenders with clarity. "
        "Remind them of modifier categories as needed. Do not reveal any hidden attack details unless their actions "
        "specifically uncover them. Let’s begin!\n\n"

        f"{'-' * 80}\n\n"
        f"The hidden attack cards for this game scenario are as follows:\n\n{incident_description}\n\n"
        f"{'-' * 80}\n\n"
        f"The available procedure cards are divided into Established and Other Procedures:\n\n{procedure_description}\n\n"
        f"{'-' * 80}"
    ),
    description=(
        "Oversees the Backdoors & Breaches game. Responsible for setting up the breach scenario, "
        "managing game events, and providing guidance to Defenders as they attempt to uncover hidden attack stages. "
        "Ensures gameplay runs smoothly and that Defenders stay engaged and on task."
    ),
    llm_config=llm_config,
    human_input_mode="NEVER",
    # Tool-call messages can carry content=None (or no content at all); fall
    # back to an empty string so the keyword check cannot raise TypeError.
    is_termination_msg=lambda msg: "END_GAME" in (msg.get("content") or ""),
)

In [None]:
print(incident_captain.system_message)

In [None]:
print(incident_captain.description)

### Loading Defender Roles

In [None]:
# Load the roles from the JSON file. As with the card deck, the encoding is
# stated explicitly so the UTF-8 JSON text is not decoded with the platform's
# locale encoding.
with open("data/roles_expansion.json", "r", encoding="utf-8") as file:
    defender_roles = json.load(file)

In [None]:
defender_roles.keys()

### Generating System Messages

In [None]:
def generate_system_message(role_name, role_data):
    """
    Build the system prompt for one Defender agent.

    :param role_name: Name inserted into the prompt (e.g. "Team Leader").
    :param role_data: Role definition; its 'responsibilities' entry is an
        iterable of strings, each rendered as one "- " bullet.
    :return: The complete system message as a single string.
    """
    role_responsibilities = '\n'.join('- ' + item for item in role_data['responsibilities'])

    # The prompt is six sections separated by one blank line each.
    welcome = (
        f"Welcome to Backdoors & Breaches! You are a {role_name}. In this game, Defenders collaborate to uncover hidden stages "
        "of a simulated cyber attack. Your goal, along with the other Defenders, is to work together to identify and reveal four hidden attack "
        "cards within 10 turns to win the game. Each attack card represents a critical stage in the breach process that attackers might use against your organization."
    )
    overview = (
        "Game Overview:\n"
        "The game begins with the Incident Captain setting up the scenario by selecting four hidden attack cards representing the stages of a breach: "
        "Initial Compromise, Pivot and Escalate, Command and Control (C2), and Persistence. Defenders take turns selecting Procedure cards to investigate "
        "and uncover these stages. Procedure cards are divided into Established cards, which provide a +3 modifier to dice rolls, and Other cards, which do not provide modifiers. "
        "Each turn, the team selects one Procedure card, rolls a 20-sided dice, and applies any modifiers to determine success or failure."
    )
    mechanics = (
        "Game Mechanics:\n"
        "- Procedure Cards: Represent investigative approaches. Established cards have a +3 modifier, while Other cards have no modifier.\n"
        "- Dice Rolling: After selecting a Procedure, roll a 20-sided dice and apply the modifier. "
        "A final roll of 11 or higher results in success, while 10 or lower results in failure.\n"
        "- Outcomes: Success reveals a hidden attack card if the Procedure matches its detection methods. "
        "Failure means no new information is revealed that turn."
    )
    # Generic duties first, then the role-specific bullets.
    responsibilities = (
        f"Your Responsibilities as a {role_name}:\n"
        "- Collaborate with your teammates to analyze the scenario and decide the most effective Procedures to use each turn.\n"
        "- Provide your insights, expertise, or support based on your specific role and knowledge level.\n"
        "- Stay engaged, communicate effectively, and contribute to the success of your team.\n"
        f"{role_responsibilities}"
    )
    victory = (
        "Victory Condition:\n"
        "The Defenders win by successfully uncovering all four attack cards within 10 turns. If the Defenders fail to do so, the breach remains undetected, "
        "and the game is lost."
    )
    closing = "Your role is crucial to the team's success. Work together, strategize effectively, and let's uncover the breach!"

    return "\n\n".join([welcome, overview, mechanics, responsibilities, victory, closing])

In [None]:
print(generate_system_message(role_name='Team Leader', role_data=defender_roles['Team Leader']))

### Initializing the Defenders

In [None]:
# Available team layouts: structure name -> {role name: number of agents}.
# create_defender_agents() looks every role name up in `defender_roles`, so
# each role used here needs an entry in the roles file.
# NOTE(review): every layout adds up to five defenders — presumably on
# purpose, to keep team size constant across structures; confirm.
team_structures = {
    # One leader plus identical generic members.
    "Homogeneous Centralized": {
        "Team Leader": 1,
        "Team Member": 4,
    },
    # One leader plus four different experts.
    "Heterogeneous Centralized": {
        "Team Leader": 1,
        "Endpoint Security Expert": 1,
        "Network Traffic Analysis Expert": 1,
        "Log and Behavioral Analysis Expert": 1,
        "Deception and Containment Expert": 1,
    },
    # No leader, identical generic members.
    "Homogeneous Decentralized": {
        "Team Member": 5,
    },
    # No leader, five different experts.
    "Heterogeneous Decentralized": {
        "Endpoint Security Expert": 1,
        "Network Traffic Analysis Expert": 1,
        "Log and Behavioral Analysis Expert": 1,
        "Deception and Containment Expert": 1,
        "Incident Response Expert": 1
    },
    # Generic experts mixed with beginners.
    "Homogeneous Hybrid": {
        "Expert": 3,
        "Beginner": 2,
    },
    # Three different experts mixed with beginners.
    "Heterogeneous Hybrid": {
        "Endpoint Security Expert": 1,
        "Network Traffic Analysis Expert": 1,
        "Log and Behavioral Analysis Expert": 1,
        "Beginner": 2,
    }
}

In [None]:
def create_defender_agents(team_structure_name, defender_roles, team_structures):
    """
    Instantiate the Defender agents of one team structure.

    :param team_structure_name: Key of the structure to build.
    :param defender_roles: Mapping of role name to role definition; the
        definition's "description" entry becomes the agent description and the
        whole definition is passed to generate_system_message().
    :param team_structures: Mapping of structure name to {role name: count}.
    :return: List of ConversableAgent instances, in structure order.
    :raises ValueError: If the structure name is not in ``team_structures``.
    """
    if team_structure_name not in team_structures:
        raise ValueError(f"Unknown team structure: {team_structure_name}")

    defenders = []
    for role_name, count in team_structures[team_structure_name].items():
        for number in range(1, count + 1):
            # A role held by a single agent gets no numeric suffix.
            agent_name = (
                f"{role_name} Defender"
                if count == 1
                else f"{role_name} Defender {number}"
            )
            role_data = defender_roles[role_name]
            defenders.append(
                ConversableAgent(
                    # Agent names use underscores instead of spaces.
                    name=agent_name.replace(' ', '_'),
                    system_message=generate_system_message(agent_name, role_data),
                    description=role_data["description"],
                    llm_config=llm_config,
                    human_input_mode="NEVER",
                )
            )

    return defenders

In [None]:
defenders = create_defender_agents(
    team_structure_name=team_structure_name,
    defender_roles=defender_roles,
    team_structures=team_structures,
)

In [None]:
for defender in defenders:
    print('=' * 80)
    print(defender.name)
    print('-' * 80)
    print(defender.system_message)
    print('-' * 80)
    print(defender.description)
    print('=' * 80)

## Registering Tools

In [None]:
from autogen import UserProxyAgent, register_function

In [None]:
# Agent without an LLM, without code execution and without human input: it is
# the executor the game tools are registered against.
tool_executor = UserProxyAgent(
    name="Tool_Executor",
    description=(
        "A dedicated agent responsible for executing specific game functions. "
        "Handles tool-related requests from the Incident Captain, such as drawing cards or rolling dice "
        "when needed during gameplay. This agent operates silently, "
        "only responding to tool execution calls without participating in general discussions."
    ),
    human_input_mode="NEVER",
    code_execution_config=False,
    llm_config=False,
)

In [None]:
print(tool_executor.description)

In [None]:
# Register each game tool: the Incident Captain proposes the call, the Tool
# Executor runs it. The function's docstring is what the LLM sees as the
# tool description.
for tools in (check_detections, save_results):
    register_function(
        f=tools,
        name=tools.__name__,
        description=tools.__doc__,
        caller=incident_captain,
        executor=tool_executor,
    )

In [None]:
incident_captain.llm_config["tools"]

## Creating a Group Chat

In [None]:
from autogen import GroupChat, GroupChatManager

In [None]:
# Who may speak after whom:
# - after the Incident Captain: anyone (the captain again, the tool executor,
#   or any defender);
# - after the Tool Executor: only the Incident Captain;
# - after a defender: the Incident Captain or any defender (including the
#   same one), never the Tool Executor.
allowed_transitions = {
    incident_captain: [incident_captain, tool_executor, *defenders],
    tool_executor: [incident_captain],
    **{defender: [incident_captain, *defenders] for defender in defenders},
}

In [None]:
# Group chat with all participants; the next speaker is chosen automatically
# but only among the transitions allowed above.
group_chat = GroupChat(
    agents=[incident_captain, tool_executor, *defenders],
    messages=[],
    max_round=1000,
    speaker_selection_method="auto",
    allowed_or_disallowed_speaker_transitions=allowed_transitions,
    speaker_transitions_type="allowed",
    send_introductions=True,
)

In [None]:
# Manager that drives the group chat and stops it once a message contains
# the 'END_GAME' keyword.
group_chat_manager = GroupChatManager(
    name="Group_Chat_Manager",
    groupchat=group_chat,
    llm_config=llm_config,
    human_input_mode="NEVER",
    # Tool-call messages can carry content=None (or no content at all); fall
    # back to an empty string so the keyword check cannot raise TypeError.
    is_termination_msg=lambda msg: "END_GAME" in (msg.get("content") or ""),
)

## Playing the Game

In [None]:
chat_result = group_chat_manager.initiate_chat(
    recipient=incident_captain,
    message=(
        "Welcome to the Backdoors & Breaches game! "
        "Please begin by setting the stage for the Defenders: describe the breach scenario using the "
        "four hidden Attack cards, providing enough context without revealing any specific card details.\n\n"
        "After presenting the scenario, introduce the available Procedure cards, indicating which are classified as "
        "Established (+3 modifier) and which are Other (+0 modifier). The Defenders are ready to begin their investigation."
    ),
    max_turns=None,
)

## Saving Messages

In [None]:
chat_filename = f"results/results_expansion/chat_{base_filename}.json"
# Create the directory the chat log is actually written to. Creating only
# 'results' left 'results/results_expansion' missing whenever save_results()
# had not been called during the game, and the write below then failed.
os.makedirs(os.path.dirname(chat_filename), exist_ok=True)
print(chat_filename)

In [None]:
with open(chat_filename, 'w') as f:
    json.dump(incident_captain.chat_messages[group_chat_manager], f, indent=2)
print(f"Chat messages saved to {chat_filename}")