In [1]:
import openai
import json
import re
import ast
from typing import Dict, Any, List
from hashlib import sha256
import numpy as np
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv
import os
import textwrap
import pandas as pd
from IPython.display import display, HTML

In [None]:
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

# Using different models will produce different results, however, in our tests,
# main issues in the code were discovered using all models we tried.
LLM_MODEL_C = "o3-mini"
LLM_MODEL_M = "o3"

EMBEDDING_MODEL = "text-embedding-3-small"

In [3]:
# Custom cosine similarity to avoid sklearn warnings
def safe_cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    try:
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 < 1e-10 or norm2 < 1e-10:
            logging.warning("Zero or near-zero norm in cosine similarity, returning 0.0")
            return 0.0
        dot_product = np.dot(vec1, vec2)
        similarity = dot_product / (norm1 * norm2)
        if np.isnan(similarity) or np.isinf(similarity):
            logging.warning("Invalid cosine similarity, returning 0.0")
            return 0.0
        return max(min(similarity, 1.0), 0.0)
    except Exception as e:
        logging.error(f"Cosine similarity failed: {e}")
        return 0.0

# Robust embedding computation
def get_embedding(text: str) -> np.ndarray:
    if not text or not text.strip():
        logging.warning("Empty or invalid text for embedding, returning random vector")
        return np.random.normal(0, 0.01, 1536)
    try:
        response = openai.embeddings.create(
            model=EMBEDDING_MODEL,
            input=text
        )
        embedding = np.array(response.data[0].embedding)
        embedding = np.clip(embedding, -1e10, 1e10)
        norm = np.linalg.norm(embedding)
        if norm < 1e-10:
            logging.warning(f"Zero-norm embedding for text: {text[:50]}...")
            return np.random.normal(0, 0.01, 1536)
        return embedding / norm
    except Exception as e:
        logging.error(f"Embedding generation failed: {e}")
        return np.random.normal(0, 0.01, 1536)

In [None]:
# --- World Model ---
class WorldModel:
    def __init__(self, function_name: str):
        self.function_name = function_name
        self.model = {
            "functions": {},
            "global_invariants": [],
            "contradictions": [],
            "hypotheses": [],
            "meta_insights": [],
            "godel_limits": [],  # Track unanswerable questions
            "hypothesized_model": {},
            "fixpoint_reached": False
        }

    def add_godel_limit(self, question: str):
        if not isinstance(question, str) or not question.strip():
            logging.warning("Invalid Gödel limit question")
            return
        self.model["godel_limits"].append(question)

    def compute_godel_divergence(self, original_code: str, hypothesized_model: Dict[str, Any], inferred_purpose: str) -> float:
        try:
            # Structural divergence
            tree = ast.parse(original_code)
            original_signatures = []
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    inputs = [arg.arg for arg in node.args.args]
                    returns = ast.unparse(node.returns) if node.returns else "Unknown"
                    original_signatures.append({"name": node.name, "inputs": inputs, "returns": returns})

            hypothesized_functions = hypothesized_model.get("functions", {})
            structural_divergence = 0.0
            for fname, fdata in hypothesized_functions.items():
                hypo_inputs = fdata.get("inputs", [])
                hypo_returns = fdata.get("returns", ["Unknown"])
                orig_func = next((f for f in original_signatures if f["name"] == fname), None)
                if not orig_func:
                    structural_divergence += 0.4
                    continue
                input_match = len(set(hypo_inputs).intersection(orig_func["inputs"])) / max(len(hypo_inputs), len(orig_func["inputs"]), 1)
                return_match = 1.0 if hypo_returns[0] == orig_func["returns"] else 0.0
                structural_divergence += (1 - input_match) * 0.2 + (1 - return_match) * 0.2

            # Semantic divergence (hypothesized model vs. inferred purpose)
            hypo_text = json.dumps(hypothesized_model.get("functions", {}).get(self.function_name, {}))
            purpose_embedding = get_embedding(inferred_purpose)
            hypo_embedding = get_embedding(hypo_text)
            semantic_divergence = 1.0 - safe_cosine_similarity(purpose_embedding, hypo_embedding)

            # Gödel limits
            godel_weight = min(len(self.model["godel_limits"]) * 0.1, 0.2)
            # Developer insights
            meta_insights = hypothesized_model.get("meta_insights", [])
            insight_weight = min(len(meta_insights) * 0.1, 0.3)

            total_divergence = (structural_divergence * 0.3 + semantic_divergence * 0.3 + godel_weight * 0.2 + insight_weight * 0.2)
            logging.info(f"Gödel divergence: structural={structural_divergence:.3f}, semantic={semantic_divergence:.3f}, godel={godel_weight:.3f}, insights={insight_weight:.3f}, total={total_divergence:.3f}")
            return min(total_divergence, 1.0)
        except Exception as e:
            logging.error(f"Gödel divergence computation failed: {e}")
            return 0.0

    def is_fixpoint(self, previous_snapshot: str, last_hypothesis: str, prev_hypothesis: str, original_code: str, inferred_purpose: str) -> bool:
        snapshot_equal = self.snapshot() == previous_snapshot
        if not last_hypothesis or not prev_hypothesis:
            return snapshot_equal
        embedding1 = get_embedding(last_hypothesis)
        embedding2 = get_embedding(prev_hypothesis)
        similarity = safe_cosine_similarity(embedding1, embedding2)
        divergence = self.compute_godel_divergence(original_code, self.model, inferred_purpose)
        return snapshot_equal or similarity > 0.95 or divergence < 0.1

    def update_function(self, name: str, entry: Dict[str, Any]):
        if name not in self.model["functions"]:
            self.model["functions"][name] = entry
        else:
            for k, v in entry.items():
                if k in self.model["functions"][name] and isinstance(v, list):
                    self.model["functions"][name][k] = list(set(self.model["functions"][name][k]).union(set(v)))
                else:
                    self.model["functions"][name][k] = v

    def add_hypothesis(self, hypothesis: str):
        if not isinstance(hypothesis, str) or not hypothesis.strip():
            logging.warning("Invalid hypothesis: must be a non-empty string")
            return
        self.model["hypotheses"].append(hypothesis)
        structured = self.extract_structure_from_hypothesis(hypothesis)
        if structured:
            self.update_function(self.function_name, structured)

    def add_contradiction(self, contradiction: str):
        if not isinstance(contradiction, str) or not contradiction.strip():
            logging.warning("Invalid contradiction: must be a non-empty string")
            return
        self.model["contradictions"].append(contradiction)

    def add_meta_insight(self, insight: str):
        if not isinstance(insight, str) or not insight.strip():
            logging.warning("Invalid meta-insight: must be a non-empty string")
            return
        self.model["meta_insights"].append(insight)

    def dump(self) -> str:
        return json.dumps(self.model, indent=2)

    def snapshot(self) -> str:
        # Use a semantic hash instead of full JSON
        sorted_model = json.dumps({
            "functions": {k: sorted(v.items()) for k, v in self.model["functions"].items()},
            "hypotheses": sorted(self.model["hypotheses"]),
            "contradictions": sorted(self.model["contradictions"]),
            "meta_insights": sorted(self.model["meta_insights"])
        }, sort_keys=True)
        return sha256(sorted_model.encode()).hexdigest()

    def extract_structure_from_hypothesis(self, hypothesis: str) -> Dict[str, Any]:
        structured = {"bugs": [], "assumptions": [], "improvements": []}
        lines = hypothesis.split("\n")
        for line in lines:
            line = line.strip()
            if re.search(r'(?i)^\d+\..*bug', line):
                structured["bugs"].append(line)
            elif re.search(r'(?i)^\d+\..*assumption', line):
                structured["assumptions"].append(line)
            elif re.search(r'(?i)^\d+\..*(solution|recommendation|improvement|fix)', line):
                structured["improvements"].append(line)
        return structured if any(structured.values()) else None

In [5]:
# --- Universal Python Loader ---
class PythonFunctionLoader:
    def __init__(self, filepath: str, function_name: str):
        self.filepath = filepath
        self.function_name = function_name

    def load_function_code(self) -> str:
        try:
            with open(self.filepath, "r") as f:
                return f.read()
        except FileNotFoundError:
            logging.error(f"File not found: {self.filepath}")
            raise
        except Exception as e:
            logging.error(f"Error reading file {self.filepath}: {e}")
            raise

    def load_function_object(self):
        try:
            spec = importlib.util.spec_from_file_location("target_module", self.filepath)
            if spec is None:
                raise ValueError(f"Cannot create spec for {self.filepath}")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            return getattr(module, self.function_name)
        except AttributeError:
            logging.error(f"Function {self.function_name} not found in {self.filepath}")
            raise
        except Exception as e:
            logging.error(f"Error loading module {self.filepath}: {e}")
            raise

    def extract_function_metadata(self) -> Dict[str, Any]:
        try:
            with open(self.filepath, "r") as f:
                tree = ast.parse(f.read())
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef) and node.name == self.function_name:
                    inputs = [arg.arg for arg in node.args.args]
                    returns = ast.unparse(node.returns) if node.returns else "Unknown"
                    return {
                        "inputs": inputs,
                        "returns": [returns],
                        "status": "parsed"
                    }
            return {}
        except Exception as e:
            logging.error(f"Error parsing metadata for {self.function_name}: {e}")
            return {}

In [6]:
# --- OpenAI LLM Interface ---
class OpenAILLM:
    def __init__(self, model: str = "o1", role: str = "debugger"):
        self.model = model
        self.role = role

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def ask(self, prompt: str) -> str:
        try:
            response = openai.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self._get_system_prompt()},
                    {"role": "user", "content": prompt}
                ],
                # temperature=0.7
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            logging.error(f"LLM query failed: {e}")
            raise

    def _get_system_prompt(self) -> str:
        if self.role == "debugger":
            return "You are a precise, introspective, and autonomous debugger embedded inside a software system. Format responses strictly as numbered lists (e.g., '1. Bug: ...') when requested. Avoid generic disclaimers. Answer with clarity and focus."
        elif self.role == "developer":
            return "You are the developer who created the program. Provide clear, concise answers about your intent, design decisions, and assumptions behind the code. Avoid speculation."


In [7]:
# --- Debugger Agent (C) ---
class DebuggerAgent:
    def __init__(self, llm: OpenAILLM, developer_llm: OpenAILLM, world_model: WorldModel, function_name: str):
        self.llm = llm
        self.developer_llm = developer_llm
        self.world_model = world_model
        self.history = []
        self.prev_hypothesis = ""
        self.function_name = function_name

    def observe(self, code: str, metadata: Dict[str, Any]) -> str:
        if not metadata.get("inputs") and metadata.get("status") != "parsed":
            logging.warning("Invalid metadata, using defaults")
            metadata = {"inputs": [], "returns": ["Unknown"], "status": "default"}
        prompt = f"""
You are a self-aware debugging agent analyzing this program:

{code}

Function metadata:
- Inputs: {metadata.get('inputs', [])}
- Returns: {metadata.get('returns', [])}

Build a model of how this function works. List:
- Its purpose
- Its logic flow
- Any preconditions or assumptions
- Potential issues or incomplete aspects
"""
        observation = self.llm.ask(prompt)
        self.world_model.update_function(self.function_name, {
            "inputs": metadata.get("inputs", []),
            "returns": metadata.get("returns", []),
            "assumptions": [],
            "status": "observed"
        })
        return observation

    def hypothesize(self, code: str, observation: str, retries: int = 3) -> str:
        prompt = f"""
Given the code:
{code}

And prior observations:
{observation}

Identify specific bugs, logical inconsistencies, or unstated assumptions that may lead to errors.
Format your response as a numbered list, e.g.:
1. Bug: [description]
2. Assumption: [description]
3. Improvement: [description]

Example:
1. Bug: Division by zero not handled
2. Assumption: Input is always positive
3. Improvement: Add input validation
"""
        for attempt in range(retries):
            hypothesis = self.llm.ask(prompt)
            logging.info(f"Hypothesis attempt {attempt + 1}: {hypothesis}")
            if re.match(r'(\d+\.\s*(Bug|Assumption|Improvement):.*\n?)+', hypothesis, re.IGNORECASE):
                return hypothesis
            logging.warning(f"Invalid hypothesis format, attempt {attempt + 1}/{retries}")
        # Fallback parsing for unstructured text
        fallback = self._parse_unstructured_hypothesis(hypothesis)
        logging.info(f"Fallback hypothesis: {fallback}")
        return fallback

    def _parse_unstructured_hypothesis(self, text: str) -> str:
        lines = text.split("\n")
        structured = []
        for i, line in enumerate(lines, 1):
            line = line.strip()
            if not line:
                continue
            if re.search(r'(?i)bug', line):
                structured.append(f"{i}. Bug: {line}")
            elif re.search(r'(?i)assumption', line):
                structured.append(f"{i}. Assumption: {line}")
            elif re.search(r'(?i)(improvement|solution|recommendation|fix)', line):
                structured.append(f"{i}. Improvement: {line}")
            else:
                structured.append(f"{i}. Assumption: {line} (inferred)")
        return "\n".join(structured) if structured else "Unable to generate structured hypothesis"

    def query_developer(self, code: str, hypothesis: str) -> str:
        prompt = f"""
You wrote this code:
{code}

An AI debugger hypothesized:
{hypothesis}

Answer these questions to clarify your intent:
1. What was the primary purpose of this function?
2. What assumptions did you make about the inputs or environment?
3. Are there any known limitations or edge cases you considered?
"""
        return self.developer_llm.ask(prompt)

    def refine_model(self, key: str, value: str):
        if key == "hypothesis":
            self.world_model.add_hypothesis(value)
        elif key == "contradiction":
            self.world_model.add_contradiction(value)
        elif key == "meta_insight":
            self.world_model.add_meta_insight(value)
        self.history.append((key, value))


    def build_hypothesized_model(self, code: str, meta_insights: list) -> Dict[str, Any]:
        """
        Construct a new model of the code incorporating M's insights, code review, and initial laws.
        """
        new_model = {
            "functions": {},
            "assumptions": [],
            "laws": [
                "Inputs must be validated for type and range.",
                "Error handling must be consistent (status codes or exceptions).",
                "Critical events must be logged for auditing."
            ]
        }
        # Parse original code
        try:
            tree = ast.parse(code)
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    inputs = [arg.arg for arg in node.args.args]
                    returns = ast.unparse(node.returns) if node.returns else "str"
                    new_model["functions"][node.name] = {
                        "inputs": inputs,
                        "returns": [returns],
                        "assumptions": [],
                        "bugs": [],
                        "improvements": []
                    }
        except Exception as e:
            logging.error(f"Code parsing failed: {e}")

        # Incorporate meta-insights from M
        for insight in meta_insights:
            if "purpose" in insight.lower():
                new_model["functions"][self.function_name]["purpose"] = insight
            if "assumption" in insight.lower():
                new_model["functions"][self.function_name]["assumptions"].append(insight)
            if "limitation" in insight.lower():
                new_model["functions"][self.function_name]["bugs"].append(insight)

        # Apply code review findings
        new_model["functions"][self.function_name]["bugs"].extend([
            "Missing credit limit check (e.g., >10,000).",
            "Inconsistent error handling (strings vs. exceptions).",
            "Hardcoded user_id 42 block."
        ])
        new_model["functions"][self.function_name]["improvements"].extend([
            "Add type/range validation for inputs.",
            "Implement logging for critical events.",
            "Use configurable blocked user list."
        ])
        return new_model

    # not used
    def test_counterfactuals(self, code: str, metadata: Dict[str, Any]) -> str:
        prompt = f"""
Given the code:
{code}

And function metadata:
- Inputs: {metadata.get('inputs', [])}
- Returns: {metadata.get('returns', [])}

Test counterfactual scenarios to reveal hidden assumptions or edge cases. Consider:
- Invalid input types (e.g., string for numeric fields)
- Extreme values (e.g., very large amounts, negative balances)
- Unexpected transaction types or fraud flags
List findings as:
1. Scenario: [description]
2. Expected Behavior: [what should happen]
3. Potential Issue: [what might go wrong]
"""
        counterfactuals = self.llm.ask(prompt)
        if not re.match(r'(\d+\.\s*Scenario:.*\n?)+', counterfactuals, re.IGNORECASE):
            logging.warning("Invalid counterfactual format, using fallback")
            counterfactuals = "Unable to generate structured counterfactuals"
        return counterfactuals

    # not used
    def compare_to_truth(self, hypothesized_model: Dict[str, Any], meta_insights: List[str]) -> str:
        truth_spec = """
Expected behavior for process_payment:
- Inputs: user_id (int), amount (float > 0), account_balance (float), is_fraud_flagged (bool), transaction_type (str, 'credit' or 'debit')
- Returns: str ('Success' or error message)
- Validates amount > 0, transaction_type in ['credit', 'debit'], user_id not blocked, no fraud, sufficient funds for debit
- No credit limit check (known limitation)
- Consistent error messages, no exceptions
"""
        prompt = f"""
Compare the hypothesized model:
{json.dumps(hypothesized_model, indent=2)}

With developer insights:
{' '.join(meta_insights)}

Against ground truth:
{truth_spec}

List discrepancies as:
1. Discrepancy: [description]
2. Source: [hypothesized model, developer insights, or both]
3. Implication: [impact on understanding]
"""
        return self.llm.ask(prompt)

    def infer_purpose(self, code: str, metadata: Dict[str, Any], meta_insights: List[str], hypothesized_model: Dict[str, Any]) -> str:
        prompt = f"""
Given the code:
{code}

Function metadata:
- Inputs: {metadata.get('inputs', [])}
- Returns: {metadata.get('returns', [])}

Developer insights:
{' '.join(meta_insights) or 'None'}

Hypothesized model:
{json.dumps(hypothesized_model, indent=2)}

Infer the intended purpose of the function, synthesizing its functionality, inputs, outputs, and developer intent. Describe the purpose concisely, focusing on the core behavior and business rules. If intent is unclear, note ambiguities as Gödelian limits. Format as:
Purpose: [description]
Ambiguities: [unresolvable questions or assumptions]
"""
        purpose_response = self.llm.ask(prompt)
        if not purpose_response.strip():
            logging.warning("Empty purpose response, adding Gödel limit")
            self.world_model.add_godel_limit("Unable to infer function purpose")
            purpose_response = "Purpose: Unknown\nAmbiguities: Failed to infer purpose from code and insights"
        return purpose_response

    def visualize_debug_state(self, step: int, observation: str, hypothesis: str, counterfactuals: str, developer_insight: str, inferred_purpose: str):
        print(f"\n=== Debug State (Step {step}) ===")
        print("\n[Model State]")
        print(textwrap.indent(json.dumps(self.world_model.model["functions"], indent=2), "  "))
        print("\n[Gödel Limits]")
        print(textwrap.indent("\n".join(self.world_model.model["godel_limits"]) or "None", "  "))
        print("\n[Observation]")
        print(textwrap.indent(observation, "  "))
        print("\n[Hypothesis]")
        print(textwrap.indent(hypothesis, "  "))
        print("\n[Counterfactuals]")
        print(textwrap.indent(counterfactuals, "  "))
        if developer_insight:
            print("\n[Developer Insight]")
            print(textwrap.indent(developer_insight, "  "))
        if inferred_purpose:
            print("\n[Inferred Purpose]")
            print(textwrap.indent(inferred_purpose, "  "))
        print("\n================\n")

    def recursive_debug(self, code: str, metadata: Dict[str, Any], max_steps: int = 5, min_steps: int = 2):
        for step in range(max_steps):
            print(f"\n--- Q{step+1} ---")
            previous_snapshot = self.world_model.snapshot()

            # Q₁: Observe
            obs = self.observe(code, metadata)
            print("\n[OBSERVATION]\n", obs)

            # Q₂: Hypothesize contradictions
            hypo = self.hypothesize(code, obs)
            print("\n[HYPOTHESIS]\n", hypo)
            self.refine_model("hypothesis", hypo)

            # Q₃: Hypothesize intent (via developer)
            dev_response = ""
            if step % 2 == 0 and step < max_steps - 1:
                dev_response = self.query_developer(code, hypo)
                print("\n[DEVELOPER INSIGHT]\n", dev_response)
                self.refine_model("meta_insight", dev_response)
                if "cannot determine" in dev_response.lower() or "unknown" in dev_response.lower():
                    self.world_model.add_godel_limit(f"Step {step+1}: Unable to infer intent from developer response")

            # Q₄: Test counterfactuals
            counter = self.test_counterfactuals(code, metadata)
            print("\n[COUNTERFACTUALS]\n", counter)
            self.refine_model("counterfactual", counter)

            # Update hypothesized model
            hypothesized_model = self.build_hypothesized_model(code, self.world_model.model["meta_insights"])
            self.world_model.model["hypothesized_model"] = hypothesized_model

            # Q₅: Infer purpose
            inferred_purpose = self.infer_purpose(code, metadata, self.world_model.model["meta_insights"], hypothesized_model)
            print("\n[INFERRED PURPOSE]\n", inferred_purpose)
            self.refine_model("inferred_purpose", inferred_purpose)

            # Visualize state
            self.visualize_debug_state(step + 1, obs, hypo, counter, dev_response, inferred_purpose)

            # Qₙ: Check fixpoint
            if step >= min_steps - 1 and self.world_model.is_fixpoint(previous_snapshot, hypo, self.prev_hypothesis, code, inferred_purpose):
                print("\n[FIXPOINT REACHED] Model stabilized, hypotheses converged, or low Gödel divergence.")
                self.world_model.model["fixpoint_reached"] = True
                break

            self.prev_hypothesis = hypo

    def refine_model(self, key: str, value: str):
        if key == "hypothesis":
            self.world_model.add_hypothesis(value)
        elif key == "contradiction":
            self.world_model.add_contradiction(value)
        elif key == "meta_insight":
            self.world_model.add_meta_insight(value)
        elif key == "counterfactual":
            self.world_model.model.setdefault("counterfactuals", []).append(value)
        elif key == "inferred_purpose":
            self.world_model.model.setdefault("inferred_purposes", []).append(value)
        self.history.append((key, value))

In [8]:
# --- Main Execution ---
function_name = "process_advanced_payment"
function_file = "../tests/target_function2.py"

try:
    loader = PythonFunctionLoader(function_file, function_name)
    code = loader.load_function_code()
    metadata = loader.extract_function_metadata()

    model = WorldModel(function_name)
    llm = OpenAILLM(model=LLM_MODEL_C, role="debugger")
    dev_llm = OpenAILLM(model=LLM_MODEL_M, role="developer")
    debugger = DebuggerAgent(llm, dev_llm, model, function_name)

    debugger.recursive_debug(code, metadata)
    print("\n[FINAL WORLD MODEL]\n", model.dump())

except Exception as e:
    logging.error(f"Debugger failed: {e}")
    model = WorldModel(function_name)  # Reset world model on failure
    print("\n[ERROR] Debugger failed, world model reset.")


--- Q1 ---

[OBSERVATION]
 1. Purpose:
   - The function is designed to process payment transactions with features such as currency conversion, user tier-based transaction amount limits, validation of transaction details, and audit logging of the operations.

2. Logic Flow:
   - Validate that the transaction type is one of the allowed ones ('credit', 'debit', 'refund', 'transfer'); if not, a ValueError is raised.
   - Check if the user is blocked by comparing the user_id against a list of blocked users. If blocked, return an error message.
   - Ensure the transaction amount is positive; otherwise, return an error.
   - Validate the user tier against predefined limits for basic, premium, or enterprise users.
   - If the source and target currencies differ, perform a currency conversion using a hardcoded exchange rate lookup. If the currency pair is unsupported, return an error.
   - Check that the converted amount does not exceed the transaction limit defined by the user's tier.
   - P

In [9]:
print(model.dump())

{
  "functions": {
    "process_advanced_payment": {
      "inputs": [
        "account_balance",
        "is_fraud_flagged",
        "user_id",
        "target_currency",
        "source_currency",
        "transaction_type",
        "amount",
        "user_tier"
      ],
      "returns": [
        "str"
      ],
      "assumptions": [
        "3. Assumption: The hardcoded exchange_rates dictionary is assumed to include all required currency pairs\u2014if a conversion pair isn\u2019t present, a KeyError is caught and an error is returned, which may hide underlying issues.",
        "4. Assumption: The function assumes all input types are correct and does not validate type safety, relying on the caller to provide proper data.",
        "4. Assumption: The updated account balance (new_balance) is calculated and logged but not persisted or returned to update the actual account state, implying an unstated dependency on external state management.",
        "4. Assumption: The function assu

In [16]:
class ModelPresenter:
    def __init__(self, model_input, function_name, similarity_threshold = 0.8):
        self.function_name = function_name
        self.similarity_threshold = similarity_threshold
        if isinstance(model_input, str):
            try:
                model_input = json.loads(model_input)
                if not isinstance(model_input, dict):
                    logging.error("JSON string did not parse to a dictionary")
                    raise ValueError("ModelPresenter expects a JSON string parsing to a dict, WorldModel, or dict")
            except json.JSONDecodeError as e:
                logging.error(f"Invalid JSON string: {e}")
                raise ValueError("ModelPresenter expects a valid JSON string, WorldModel, or dict")
        if isinstance(model_input, dict):
            self.model = model_input
        elif hasattr(model_input, 'model') and isinstance(model_input.model, dict):
            self.model = model_input.model
        else:
            logging.error("Invalid input: Expected JSON string, dict, or WorldModel with 'model' attribute")
            raise ValueError("ModelPresenter requires a JSON string, dict, or WorldModel instance")
        if 'functions' not in self.model or not isinstance(self.model['functions'], dict):
            logging.error("Model lacks valid 'functions' dictionary")
            raise ValueError("Model must contain a 'functions' dictionary")

    def _clean_entry(self, entry: str) -> str:
        """Remove numbering prefixes and normalize text."""
        try:
            cleaned = re.sub(r'^\d+\.\s*(Bug|Improvement|Assumption):\s*', '', entry, flags=re.IGNORECASE)
            return ' '.join(cleaned.strip().split())
        except Exception as e:
            logging.error(f"Error cleaning entry '{entry}': {e}")
            return entry

    def _extract_unique_entries(self, key: str) -> List[str]:
        """Extract unique bugs or improvements, merging semantically similar bugs."""
        try:
            entries = self.model.get('functions', {}).get(self.function_name, {}).get(key, [])
            if not entries:
                return []

            # Clean entries
            cleaned_entries = [self._clean_entry(entry) for entry in entries]
            if not cleaned_entries:
                return []

            # For improvements, use exact deduplication (nuances matter)
            if key == 'improvements':
                seen = set()
                return [entry for entry in cleaned_entries if not (entry in seen or seen.add(entry))]

            # For bugs, cluster by semantic similarity
            embeddings = [get_embedding(entry) for entry in cleaned_entries]
            clusters = []
            used = set()

            for i, emb1 in enumerate(embeddings):
                if i in used:
                    continue
                cluster = [i]
                for j, emb2 in enumerate(embeddings[i+1:], start=i+1):
                    if j in used:
                        continue
                    similarity = safe_cosine_similarity(emb1, emb2)
                    if similarity > self.similarity_threshold:
                        cluster.append(j)
                        used.add(j)
                clusters.append(cluster)
                used.add(i)

            # Select representative bug (shortest description) from each cluster
            unique_entries = []
            for cluster in clusters:
                cluster_entries = [cleaned_entries[i] for i in cluster]
                # Choose shortest entry as representative
                representative = min(cluster_entries, key=len)
                unique_entries.append(representative)

            logging.info(f"Clustered {len(cleaned_entries)} bugs into {len(unique_entries)} unique entries")
            return unique_entries

        except Exception as e:
            logging.error(f"Error extracting {key}: {e}")
            # Fallback to exact deduplication
            entries = self.model.get('functions', {}).get(self.function_name, {}).get(key, [])
            cleaned_entries = [self._clean_entry(entry) for entry in entries]
            seen = set()
            return [entry for entry in cleaned_entries if not (entry in seen or seen.add(entry))]

    def display_tables(self):
        """Display bugs and improvements as separate tables in Jupyter."""
        try:
            bugs = self._extract_unique_entries('bugs')
            improvements = self._extract_unique_entries('improvements')

            bugs_df = pd.DataFrame(bugs, columns=['Bugs']) if bugs else pd.DataFrame(columns=['Bugs'])
            improvements_df = pd.DataFrame(improvements, columns=['Improvements']) if improvements else pd.DataFrame(columns=['Improvements'])

            def style_df(df):
                return df.style.set_properties(**{
                    'text-align': 'left',
                    'border-color': 'black',
                    'border-style': 'solid',
                    'border-width': '1px',
                    'padding': '5px',
                    'font-size': '14px'
                }).set_table_styles([
                    {'selector': 'th', 'props': [('background-color', '#f0f0f0'), ('font-weight', 'bold'), ('text-align', 'center')]},
                    {'selector': 'table', 'props': [('border-collapse', 'collapse'), ('width', '100%')]}
                ])

            if not bugs_df.empty:
                display(HTML("<h3>Bugs</h3>"))
                display(style_df(bugs_df))
            else:
                display(HTML("<h3>Bugs</h3><p>No bugs found.</p>"))

            if not improvements_df.empty:
                display(HTML("<h3>Improvements</h3>"))
                display(style_df(improvements_df))
            else:
                display(HTML("<h3>Improvements</h3><p>No improvements suggested.</p>"))

        except Exception as e:
            logging.error(f"Error displaying tables: {e}")
            display(HTML("<p>Error rendering tables. Check logs for details.</p>"))

In [17]:
# Model output contains multiple descriptions of the same issue but usually with different wording.
# Adjust the similarity threshold to control how similar the descriptions need to be to be considered duplicates.

presenter = ModelPresenter(model.dump(), function_name, similarity_threshold = 0.8)
presenter.display_tables()

Unnamed: 0,Bugs
0,"For ""transfer"" transactions, the code subtracts the converted amount from account_balance without first checking for sufficient funds, unlike the ""debit"" case."
1,"Audit log file is opened in write mode (""w""), which overwrites existing logs instead of appending new entries."
2,"An invalid transaction type immediately raises an exception without a try-catch mechanism, potentially causing an unhandled exception in the application."
3,"The fraud detection check (is_fraud_flagged) occurs after processing the transaction logic; if fraud is flagged, the transaction logic has already been executed (e.g., balance calculation), which could lead to inconsistent transaction state."
4,"The function adjusts the balance in-memory (new_balance) without updating any persistent user account record, limiting its practical application in a real system."
5,"In the currency conversion block, unsupported currency pairs trigger a KeyError that is caught and returns an error message. However, providing guidance or logging details why the conversion failed could improve debuggability."
6,"While the debit transaction checks for insufficient funds, the transfer transaction does not perform this check even though it subtracts the converted amount, potentially allowing transfers with insufficient funds."
7,The fraud check is performed after processing the transaction and computing the new balance. This can lead to processing (and potential logging) of a transaction that should be halted.
8,"The 'transfer' transaction type does not validate whether the account has sufficient funds, which can result in a negative balance."
9,The error handling is inconsistent—some validation failures return error strings while an invalid transaction type raises a ValueError and fraud detection raises an exception—leading to mixed control flows that may confuse downstream error handling.


Unnamed: 0,Improvements
0,"Consider performing all necessary checks (such as sufficient funds for all debit-like operations, including transfers) before processing and updating any balances."
1,"Use append (""a"") mode for audit logging to preserve historical logs and maintain a complete audit trail."
2,"If a transaction is flagged as fraud, implement a rollback or avoid committing partial processing results (such as changes to the computed new_balance) to ensure transactional integrity."
3,"It would be beneficial to handle I/O errors for audit logging in a way that informs the system (or administrator) rather than silently passing, which might hide important issues."
4,"For persistent balance updates, integrate with a database or state management system rather than only computing a local balance change, ensuring the transaction's effects are properly recorded."
5,Consider moving the fraud check earlier in the process before computing or applying the transaction to avoid unnecessary processing and potential inconsistencies.
6,"Floating-point arithmetic is used without explicit rounding or precision management, which could lead to minor inaccuracies in currency conversion and balance calculations."
7,"The fraud detection check raises an exception after updating the balance locally, which means no audit log is generated for fraudulent transactions. Adding logging or additional context before raising the exception would aid in further investigations."
8,"The system relies on hardcoded data for exchange rates, blocked users, and tier limits, meaning any changes in these parameters require code modifications rather than dynamic updates."


In [12]:
# save the model to a file
with open("../tests/world_model2.json", "w") as f:
    json.dump(model.model, f, indent=2)
