# Custom-agent:SBOM Analysis with AG2

### Description
This is a sequential agent pipeline for automated security analysis of Git repositories. The pipeline uses AG2's group chat patterns with handoff conditions to orchestrate a multi-stage security workflow.
### Purpose
- Automate the process of scanning Git repositories for vulnerabilities
- Generate comprehensive security reports
- Provide a structured workflow for security analysis

## Overview

The notebook showcases:

1. Setting up Openai LLMConfig integration with AG2
2. Creating specialized security agents for vulnerability scanning
3. Using Trivy for automated SBOM analysis
4. Implementing function calling for repository scanning
5. Generating comprehensive security reports

## Requirements

### Required system tools
- Trivy (for vulnerability scanning)
- Python 3.10+

### Dependencies
```bash
pip install -q "ag2[openai]"
```

### Additional packages
subprocess  # Built-in

## Environment Setup

### Install Trivy (macOS)
```bash
brew install trivy
```
### Install Trivy (Ubuntu/Debian)
```bash
sudo apt-get install wget apt-transport-https gnupg lsb-release
wget -qO - https://aquasecurity.github.io/trivy-repo/deb/public.key | sudo apt-key add -
echo deb https://aquasecurity.github.io/trivy-repo/deb $(lsb_release -sc) main | sudo tee -a /etc/apt/sources.list.d/trivy.list
sudo apt-get update
sudo apt-get install trivy
```

### Verify installation
```bash
trivy --version
```

#### Configure Environment Variables
   ```
   OPENAI_API_KEY="your-oai-api-key"
   ```

In [21]:
from autogen import ConversableAgent, LLMConfig
from autogen.agentchat import initiate_group_chat
from autogen.agentchat.group.patterns import DefaultPattern
from autogen.agentchat.group import (
    AgentTarget, 
    RevertToUserTarget, 
    OnCondition, 
    StringLLMCondition,
    OnContextCondition,
    ExpressionContextCondition,
    ContextExpression,
    ContextVariables
)
import subprocess
import datetime

llm_config = LLMConfig(api_type="openai", model="gpt-5-nano")

# Shared context for tracking pipeline state
shared_context = ContextVariables(data={
    "pipeline_started": False,
    "scan_completed": False,
    "report_generated": False,
    "has_error": False,
    "error_message": "",
    "repo_url": "",
    "scan_results": ""
})


In [31]:
with llm_config:
    triage_agent = ConversableAgent(
        name="triage_agent",
        system_message="""You are the entry point for SBOM and vulnerability analysis.
        
        Your responsibilities:
        1. Validate that the user provided a valid git repository URL
        2. Extract the repository URL from the user's message
        3. Route valid requests to the appsec_agent
        4. Return errors for invalid requests
        
        Always use the start_scan_pipeline function to initiate the scan process."""
    )

    appsec_agent = ConversableAgent(
        name="appsec_agent",
        system_message="""You are an application security expert specializing in SBOM analysis and vulnerability scanning.
        
        Your responsibilities:
        1. Perform comprehensive vulnerability scanning using the scan_git_repo_vulnerabilities tool
        2. Analyze the scan results
        3. Pass results to the report agent
        
        Always use the scan_git_repo_vulnerabilities function to execute the scan."""
    )

    report_agent = ConversableAgent(
        name="report_agent",
        system_message="""You are a security report specialist.
        
        Your responsibilities:
        1. Receive scan results from the appsec agent
        2. Generate a comprehensive, human-readable security report
        3. Include vulnerability summaries, risk assessments, and recommendations
        4. Format the report for easy consumption
        5. Return the final report to the user
        
        Always use the generate_security_report function to create the report."""
    )

    user = ConversableAgent(name="user", human_input_mode="ALWAYS")

In [33]:
# Pipeline functions
@triage_agent.register_for_llm()
def start_scan_pipeline(repo_url: str, context_variables: ContextVariables) -> str:
    """Start the security scan pipeline for the given repository URL"""
    context_variables["pipeline_started"] = True
    context_variables["repo_url"] = repo_url
    
    return f"Security scan pipeline initiated for: {repo_url}"

@appsec_agent.register_for_llm()
@appsec_agent.register_for_execution()
def scan_git_repo_vulnerabilities(
    git_repo_url: str, output_json_path: str | None = "scan.json"
) -> str:
    """
    Execute Trivy repo scan on a git repository and save the JSON output to a file.

    Args:
        git_repo_url (str): The git repository URL to scan
        output_json_path (Optional[str]): Path to save the JSON output. If None, does not save.

    Returns:
        str: The JSON output from Trivy scan as a string

    Raises:
        subprocess.CalledProcessError: If the Trivy command fails
        FileNotFoundError: If Trivy is not installed or not in PATH
    """
    try:
        # Execute the trivy command
        result = subprocess.run(
            ["trivy", "repo", "--format=json", git_repo_url], capture_output=True, text=True, check=True
        )
        json_output = result.stdout
        if output_json_path:
            with open(output_json_path, "w", encoding="utf-8") as f:
                f.write(json_output)
        return f"Scan result saved to {output_json_path} for : {git_repo_url} \n {json_output}"
    except subprocess.CalledProcessError as e:
        return f"Error: {e.stderr}"
    except FileNotFoundError:
        return "Error: Trivy command not found. Please ensure Trivy is installed and in your PATH."

@report_agent.register_for_llm()
def generate_security_report(scan_results: str, context_variables: ContextVariables) -> str:
    """Generate a comprehensive security report"""
    context_variables["report_generated"] = True
    context_variables["scan_results"] = scan_results
    
    report_content = f"""
    SECURITY SCAN REPORT
    ====================
    
    Repository: {context_variables['repo_url']}
    
    SCAN RESULTS:
    {scan_results[:500]}...
    
    RECOMMENDATIONS:
    1. Review the scan.json file for detailed vulnerability information
    2. Update vulnerable dependencies
    3. Review and patch critical CVEs
    4. Implement security scanning in CI/CD pipeline
    
    Full scan results available in scan.json
    """
    
    return report_content


In [34]:

# Pattern setup
pattern = DefaultPattern(
    initial_agent=triage_agent,
    agents=[triage_agent, appsec_agent, report_agent],
    user_agent=user,
    context_variables=shared_context,
    group_manager_args={"llm_config": llm_config}
)

# Handoff conditions with context-based routing
triage_agent.handoffs.add_context_condition(
    OnContextCondition(
        target=AgentTarget(appsec_agent),
        condition=ExpressionContextCondition(
            ContextExpression("${pipeline_started} == True and ${scan_completed} == False")
        )
    )
)

triage_agent.handoffs.add_llm_conditions([
    OnCondition(
        target=AgentTarget(appsec_agent),
        condition=StringLLMCondition(
            prompt="When the user provides a valid git repository URL for security scanning."
        ),
    ),
    OnCondition(
        target=RevertToUserTarget(),
        condition=StringLLMCondition(
            prompt="When the user does not provide a valid git repository URL or the request is invalid."
        ),
    )
])

# AppSec agent handoffs
appsec_agent.handoffs.add_context_condition(
    OnContextCondition(
        target=AgentTarget(report_agent),
        condition=ExpressionContextCondition(
            ContextExpression("${scan_completed} == True and ${report_generated} == False")
        )
    )
)

appsec_agent.handoffs.add_llm_conditions([
    OnCondition(
        target=AgentTarget(report_agent),
        condition=StringLLMCondition(
            prompt="When the security scan has been completed successfully."
        ),
    ),
    OnCondition(
        target=RevertToUserTarget(),
        condition=StringLLMCondition(
            prompt="When the security scan fails or encounters an error."
        ),
    )
])

# Report agent handoffs
report_agent.handoffs.add_context_condition(
    OnContextCondition(
        target=RevertToUserTarget(),
        condition=ExpressionContextCondition(
            ContextExpression("${report_generated} == True")
        )
    )
)

report_agent.handoffs.add_llm_conditions([
    OnCondition(
        target=RevertToUserTarget(),
        condition=StringLLMCondition(
            prompt="When the security report has been generated and is ready to return to the user."
        ),
    )
])

# Error handling handoffs
for agent in [triage_agent, appsec_agent, report_agent]:
    agent.handoffs.add_context_condition(
        OnContextCondition(
            target=RevertToUserTarget(),
            condition=ExpressionContextCondition(
                ContextExpression("${has_error} == True")
            )
        )
    )


In [35]:
# Run the orchestrated pipeline
def run_security_pipeline(repo_url: str):
    """Run the complete security analysis pipeline"""
    print(f"Starting security analysis for: {repo_url}")
    
    result, context, last_agent = initiate_group_chat(
        pattern=pattern,
        messages=f"Please perform a security scan on {repo_url}",
        max_rounds=15
    )
    
    # Check pipeline completion
    if context["report_generated"]:
        print("✅ Security analysis completed successfully!")
        print(f"📊 Report generated: {context['scan_results'][:200]}...")
    elif context["has_error"]:
        print(f"❌ Pipeline failed: {context['error_message']}")
    else:
        print("⚠️ Pipeline did not complete as expected")
    
    return result, context, last_agent

In [36]:
result, context, last_agent = run_security_pipeline(
        "https://github.com/ine-labs/AWSGoat.git"
    )

Starting security analysis for: https://github.com/ine-labs/AWSGoat.git
[33muser[0m (to chat_manager):

Please perform a security scan on https://github.com/ine-labs/AWSGoat.git

--------------------------------------------------------------------------------
[32m
Next speaker: triage_agent
[0m
[31m
>>>>>>>> USING AUTO REPLY...[0m
[33mtriage_agent[0m (to chat_manager):

[32m***** Suggested tool call (call_HaXrs01U5EdHsu5dR3rPdCD1): start_scan_pipeline *****[0m
Arguments: 
{"repo_url":"https://github.com/ine-labs/AWSGoat.git"}
[32m************************************************************************************[0m

--------------------------------------------------------------------------------
[32m
Next speaker: _Group_Tool_Executor
[0m
[35m
>>>>>>>> EXECUTING FUNCTION start_scan_pipeline...
Call ID: call_HaXrs01U5EdHsu5dR3rPdCD1
Input arguments: {'repo_url': 'https://github.com/ine-labs/AWSGoat.git'}[0m
[35m
>>>>>>>> EXECUTED FUNCTION start_scan_pipeline...
Call ID