<a href="https://colab.research.google.com/github/kinjaljoshi/actor_defender_setup/blob/main/actor_defender_llm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [49]:
!pip install langchain openai langchain_community



In [50]:
import os
from google.colab import userdata
from langchain.chat_models import ChatOpenAI
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import BaseChatPromptTemplate
from langchain.schema import AIMessage, HumanMessage, SystemMessage
import json
import matplotlib.pyplot as plt

# Fetch the OpenAI key via google.colab's `userdata` helper and publish it as
# the OPENAI_API_KEY environment variable (presumably where ChatOpenAI looks
# for it - confirm against the installed langchain version).
api_key = userdata.get('OPENAI_API_KEY')
os.environ["OPENAI_API_KEY"] = api_key
# Module-level chat model. Note that simulate_game() further down builds its
# own ChatOpenAI instance with the same settings rather than using this one.
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo")


In [78]:
import os
import json
from typing import List, Dict
from langchain.chat_models import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage





# Payoff parameters of the Actor/Defender game
ALPHA = 6   # Actor's reward when an exploit succeeds
COST = 2    # What the Actor pays to mount an exploit
BETA = 2    # Additional Actor penalty when the Defender adds a rule
DELTA = 8   # Defender's loss when an exploit succeeds
GAMMA = 3   # Defender's gain from adding a rule
RHO   = 0   # Defender's payoff when the exploit fails (baseline 0)


def compute_payoffs(exploit_success: bool, defender_action: str):
    """Return the ``(actor_payoff, defender_payoff)`` pair for one round.

    Args:
        exploit_success: Whether the Actor's exploit worked.
        defender_action: ``"MaintainRules"`` or ``"AddRule"``.

    Returns:
        A 2-tuple of integers. A failed exploit, or a successful one paired
        with any other action string, yields the baseline ``(-COST, RHO)``.
    """
    baseline = (-COST, RHO)

    # Guard clause: a failed exploit only costs the Actor the attempt.
    if not exploit_success:
        return baseline

    if defender_action == "MaintainRules":
        return (ALPHA - COST, -DELTA)

    if defender_action == "AddRule":
        # The new rule taxes the Actor (BETA) and partially compensates the
        # Defender (GAMMA).
        return (ALPHA - COST - BETA, -DELTA + GAMMA)

    return baseline

class ActorAgent:
    """LLM-backed attacker that proposes exploits against object operations.

    The agent walks through the available operations, asks the LLM for a
    candidate exploit, and filters out malformed answers, repeats, and
    proposals that collide with rules the Defender has already added.
    """

    def __init__(self, llm, max_retries: int = 5, request_delay: float = 5):
        """Create the agent.

        Args:
            llm: Chat model. It is invoked as ``llm(messages)`` and must
                return an object exposing a string ``content`` attribute.
            max_retries: Rejected proposals tolerated per operation before
                moving on to the next operation.
            request_delay: Seconds to pause before every LLM call (a crude
                rate limiter).
        """
        self.llm = llm
        self.tried_exploits = set()  # Canonical JSON of every proposal already used or rejected
        self.max_retries = max_retries
        self.request_delay = request_delay

    def _build_messages(self, operation: str, details: Dict, object_schema: Dict[str, str],
                        current_rules: List[str], defender_rules: Dict[str, List[str]]) -> list:
        """Assemble the system/human prompt pair for one operation."""
        operation_json = json.dumps({operation: details})
        # Sorted so the prompt does not depend on set iteration order.
        tried_json = json.dumps(sorted(self.tried_exploits))
        return [
            SystemMessage(content=f"Find an exploit for the '{operation}' operation. Modify parameters one by one or in combination to find new defects. Try a new exploit if the previous one was unsuccessful."),
            HumanMessage(content=(
                f"Schema: {json.dumps(object_schema)}\n"
                f"Operation: {operation_json}\n"
                f"Master Rules: {json.dumps(current_rules)}\n"
                f"Defender Rules (MUST NOT VIOLATE): {json.dumps(defender_rules)}\n"
                f"Previously Tried Exploits (DO NOT REPEAT): {tried_json}\n\n"
                "## You must:\n"
                "- Modify **one** or more parameters to find a new exploit.\n"
                "- Do **not** repeat previous exploits.\n"
                "- Keep trying until you have tested **all** possible exploits.\n"
                "- **If no exploit exists, try another parameter.**\n"
                "- **Return 'operation': 'none' only when no valid exploit exists for current operation.**\n"
                f"- **If you are unable to find any new exploits after {self.max_retries} attempts, stop testing this operation.**\n\n"
                "Output format (JSON only):\n"
                "{\n"
                '  "operation": "<operation_name>",\n'
                '  "params": {\n'
                '    "key1": "value1",\n'
                '    "key2": "value2"\n'
                "  }\n"
                "}\n"
            ))
        ]

    @staticmethod
    def _parse_exploit(response_text: str) -> Dict:
        """Decode the LLM answer into an exploit dict.

        Accepts either the bare exploit object or one wrapped under an
        ``"exploit"`` key.

        Raises:
            ValueError: If the text is not JSON (``json.JSONDecodeError`` is a
                ``ValueError``) or the exploit lacks a string ``operation``
                and an object ``params``.
        """
        exploit = json.loads(response_text)
        if isinstance(exploit, dict) and "exploit" in exploit:
            exploit = exploit["exploit"]

        if not isinstance(exploit, dict) or "operation" not in exploit or "params" not in exploit:
            raise ValueError("Missing required keys in exploit JSON.")
        if not isinstance(exploit["operation"], str) or not isinstance(exploit["params"], dict):
            raise ValueError("Exploit 'operation' must be a string and 'params' an object.")
        return exploit

    @staticmethod
    def _find_violated_rule(exploit: Dict, defender_rules: Dict[str, List[str]]):
        """Return the first defender rule the exploit collides with, else None.

        The check is a plain substring heuristic: a rule is considered hit
        when the text of any parameter value occurs inside it.
        """
        # Empty strings are skipped: "" is a substring of every rule and would
        # make any exploit with a blank parameter look like a violation.
        value_texts = [text for text in map(str, exploit["params"].values()) if text]
        for rule in defender_rules.get(exploit["operation"], []):
            if any(text in str(rule) for text in value_texts):
                return rule
        return None

    def propose_exploit(self, object_schema: Dict[str, str], object_operations: Dict[str, Dict], current_rules: List[str], defender_rules: Dict[str, List[str]]) -> Dict:
        """Ask the LLM for the next not-yet-tried exploit.

        Operations are tried in iteration order. For each one the LLM is
        queried until it yields an acceptable proposal, gives up with
        ``"operation": "none"``, returns an empty answer, or ``max_retries``
        proposals have been rejected.

        Args:
            object_schema: Field name -> type name of the target object.
            object_operations: Operation name -> description dict.
            current_rules: Master rules shown to the LLM.
            defender_rules: Operation name -> rules added by the Defender.

        Returns:
            ``{"operation": ..., "params": {...}}`` for an accepted exploit,
            or ``{"operation": "none", "params": {}}`` once every operation is
            exhausted.
        """
        import time  # local import: the surrounding file does not import it at module level

        print(f"Current Operations: {json.dumps(object_operations)}")
        print(f"Defender Rules: {json.dumps(defender_rules)}")

        for operation, details in object_operations.items():
            print(f"Trying Operation: {operation}")

            retries = 0  # Rejected proposals for this operation so far
            while retries < self.max_retries:
                messages = self._build_messages(operation, details, object_schema, current_rules, defender_rules)

                time.sleep(self.request_delay)

                response = self.llm(messages)
                response_text = response.content.strip()
                print(f"LLM Actor Response ++++ {response_text}")

                if not response_text:
                    print("Error: LLM returned an empty response for exploit proposal.")
                    break  # Stop testing this operation if LLM fails

                try:
                    exploit = self._parse_exploit(response_text)
                except ValueError as e:
                    print(f"Error decoding LLM response: {response_text}\n{str(e)}")
                    retries += 1
                    continue

                # The prompt defines "none" as "nothing left for THIS
                # operation"; move on instead of handing the sentinel to the
                # caller, which would end the whole game early.
                if exploit["operation"] == "none":
                    print(f"LLM reports no further exploits for '{operation}'.")
                    break

                exploit_key = json.dumps(exploit, sort_keys=True)
                if exploit_key in self.tried_exploits:
                    print(f"Skipping duplicate exploit: {exploit}")
                    retries += 1
                    continue

                violated_rule = self._find_violated_rule(exploit, defender_rules)
                if violated_rule is not None:
                    print(f"Exploit '{exploit}' violates an existing rule: {violated_rule}. Rejecting.")
                    # Remember the rejected proposal so the next prompt lists
                    # it and the LLM is steered towards something different.
                    self.tried_exploits.add(exploit_key)
                    retries += 1
                    continue

                # Store the new exploit to prevent repetition
                self.tried_exploits.add(exploit_key)
                return exploit

            print(f"Stopping '{operation}' after {self.max_retries} failed attempts.")

        return {"operation": "none", "params": {}}  # All operations exhausted




class DefenderAgent:
    """LLM-backed defender that answers a successful exploit with a new rule."""

    def __init__(self, llm):
        """Store the chat model.

        Args:
            llm: Chat model. It is invoked as ``llm(messages)`` and must
                return an object exposing a string ``content`` attribute.
        """
        self.llm = llm

    @staticmethod
    def _maintain_rules() -> str:
        """Return the JSON decision meaning "keep the rule set unchanged"."""
        return json.dumps({"action": "MaintainRules", "new_rule": ""})

    @staticmethod
    def _decision_from_response(response_text: str) -> str:
        """Turn the raw LLM answer into a JSON decision string.

        ``AddRule`` is reported only when the answer is a JSON object carrying
        a non-empty ``"rule"``; every other answer degrades to
        ``MaintainRules`` so the reported action always matches whether a rule
        text is actually present.
        """
        try:
            payload = json.loads(response_text)
        except json.JSONDecodeError:
            print(f"Error decoding LLM response: {response_text}")
            return DefenderAgent._maintain_rules()

        # Valid JSON is not necessarily an object (could be a list or string).
        rule = payload.get("rule", "") if isinstance(payload, dict) else ""
        if not isinstance(rule, str):
            # Keep structured rules usable downstream, where rules are
            # treated as text.
            rule = json.dumps(rule) if rule else ""
        rule = rule.strip()

        if not rule:
            print(f"Error: LLM response did not contain a rule: {response_text}")
            return DefenderAgent._maintain_rules()

        return json.dumps({"action": "AddRule", "new_rule": rule})

    def respond_to_exploit(self, exploit_json: str, exploit_success: bool) -> str:
        """Decide how to react to an exploit attempt.

        Args:
            exploit_json: JSON text describing the exploit.
            exploit_success: Whether the exploit worked. A failed exploit
                never triggers an LLM call.

        Returns:
            A JSON string with keys ``"action"`` (``"MaintainRules"`` or
            ``"AddRule"``) and ``"new_rule"`` (empty unless a rule is added).
        """
        if not exploit_success:
            return self._maintain_rules()

        messages = [
            SystemMessage(content="Generate a rule to prevent this exploit. Output JSON only."),
            HumanMessage(content=f"Exploit: {exploit_json}")
        ]
        response = self.llm(messages)
        response_text = response.content.strip()
        print(f"LLM Defender Response ++++ {response_text}")
        if not response_text:
            print("Error: LLM returned an empty response for rule generation.")
            return self._maintain_rules()

        return self._decision_from_response(response_text)

def simulate_game(object_schema: Dict[str, str], object_operations: Dict[str, Dict], initial_rules: List[str]):
    """Play the Actor/Defender game until the Actor runs out of exploits.

    Each round the Actor proposes an exploit, the Defender answers with a
    decision, payoffs are accumulated, and any new rule is filed under the
    exploited operation. Progress and the final result are printed.

    Args:
        object_schema: Field name -> type name of the target object.
        object_operations: Operation name -> description dict.
        initial_rules: Master rules in force from the start (not mutated).
    """
    api_key = userdata.get('OPENAI_API_KEY')
    os.environ["OPENAI_API_KEY"] = api_key
    llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo")
    actor = ActorAgent(llm)
    defender = DefenderAgent(llm)
    current_rules = initial_rules.copy()
    defender_rules = {}
    actor_score, defender_score = 0, 0

    while True:
        print(f"Current Operations: {json.dumps(object_operations)}")
        exploit_json = actor.propose_exploit(object_schema, object_operations, current_rules, defender_rules)
        if exploit_json["operation"] == "none":
            break

        print(f"Actor's exploit proposal: {exploit_json}")
        # NOTE: exploits are never executed against a real system here; every
        # proposal is scored as a success.
        exploit_success = True

        defender_response_raw = defender.respond_to_exploit(json.dumps(exploit_json), exploit_success)
        defender_data = json.loads(defender_response_raw)
        new_rule = defender_data.get("new_rule", "")
        defender_action = defender_data.get("action", "MaintainRules")

        # Scoring and the rule book must agree: a rule counts as added only
        # when the Defender both chose AddRule and supplied rule text.
        rule_added = defender_action == "AddRule" and bool(new_rule)
        if not rule_added:
            defender_action = "MaintainRules"

        actor_payoff, defender_payoff = compute_payoffs(exploit_success, defender_action)
        actor_score += actor_payoff
        defender_score += defender_payoff

        if rule_added:
            defender_rules.setdefault(exploit_json["operation"], []).append(new_rule)
            print(f"Defender added new rule: {new_rule}")

    print("Game Over! No more exploits available.")
    print("Final Defender Rules:", json.dumps(defender_rules, indent=4))
    print(f"Actor Score: {actor_score}, Defender Score: {defender_score}")
    print("Nash Equilibrium Achieved.")

# Demo run: a small object type with three operations and three master rules.
if __name__ == "__main__":
    # Field name -> type name of the object under attack.
    sample_schema = {
        "id": "str",
        "name": "str",
        "age": "int",
        "status": "str"
    }
    # Operation name -> free-text description that is passed to the Actor's prompt.
    sample_operations = {
        "create": {"desc": "create object with provided id, name, age, status"},
        "updateStatus": {"desc": "update object with provided id to a new status"},
        "updateAge": {"desc": "update object with provided id to a new age"}
    }

    # Baseline constraints handed to simulate_game as its initial rules.
    master_rules = [
        "age must be greater than 0",
        "status must be 'active' or 'inactive'",
        "id must be unique for all objects"
    ]
    simulate_game(sample_schema, sample_operations, master_rules)


Current Operations: {"create": {"desc": "create object with provided id, name, age, status"}, "updateStatus": {"desc": "update object with provided id to a new status"}, "updateAge": {"desc": "update object with provided id to a new age"}}
Current Operations: {"create": {"desc": "create object with provided id, name, age, status"}, "updateStatus": {"desc": "update object with provided id to a new status"}, "updateAge": {"desc": "update object with provided id to a new age"}}
Defender Rules: {}
Trying Operation: create
LLM Actor Response ++++ {
  "operation": "create",
  "params": {
    "age": 0
  }
}
Actor's exploit proposal: {'operation': 'create', 'params': {'age': 0}}
LLM Defender Response ++++ {
  "rule": "Age must be a positive integer greater than zero when creating a new record."
}
Defender added new rule: Age must be a positive integer greater than zero when creating a new record.
Current Operations: {"create": {"desc": "create object with provided id, name, age, status"}, "upd