# Final Colab Notebook: Multi-Agent Vulnerability Detection & Response

**Overview:** This notebook demonstrates a lightweight multi-agent CAI pipeline using Bandit + Semgrep (Scanner), a Hugging Face model for Analyst & Responder, and a Streamlit UI exposed via ngrok.

**Important:** Use synthetic code only. Do NOT paste production secrets. You'll be prompted securely for your Hugging Face token and ngrok authtoken.


In [1]:
# Install required packages
!pip install --quiet streamlit pyngrok huggingface_hub bandit semgrep reportlab
print('Installed dependencies')

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m92.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m133.8/133.8 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 MB[0m [31m21.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m158.5/158.5 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m754.1/754.1 kB[0m [31m54.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m83.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.7/193.7 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [3]:
from huggingface_hub import InferenceClient
import getpass, os

# Prompt for the token without echoing it. Pasted tokens often carry stray
# whitespace/newlines, which would otherwise be sent verbatim and rejected.
HF_TOKEN = getpass.getpass('Enter your Hugging Face API token (hf_...): ' ).strip()
if not HF_TOKEN:
    # Fail here instead of later, when main.py is imported by the Streamlit app.
    raise ValueError('No Hugging Face token entered. Re-run this cell and paste your token.')

os.environ['HF_TOKEN'] = HF_TOKEN  # stored for main.py to read
client = InferenceClient(token=HF_TOKEN)
MODEL = 'HuggingFaceH4/zephyr-7b-beta'
print('Hugging Face client ready. Model set to', MODEL)

Enter your Hugging Face API token (hf_...): ··········
Hugging Face client ready. Model set to HuggingFaceH4/zephyr-7b-beta


In [4]:
%%bash
# Write the local Semgrep rules used for the demo.
# The heredoc delimiter is quoted ('YAML'), so the shell does not expand
# $CMD, $VAR, ... inside the rule patterns.
cat > custom_rules.yaml <<'YAML'
rules:
  - id: insecure-os-system
    patterns:
      - pattern: os.system($CMD)
    message: "Possible command injection via os.system"
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-78: Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection')"

  - id: subprocess-shell-true
    patterns:
      - pattern: subprocess.call($ARGS, shell=True)
    message: "Use of subprocess with shell=True can lead to command injection"
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-78"

  - id: sql-injection
    # `patterns` is a logical AND: three different call shapes can never all
    # match the same code, so the alternatives must live under `pattern-either`.
    pattern-either:
      - pattern: conn.execute("SELECT" + $VAR)
      - pattern: conn.cursor().execute("SELECT" + $VAR)
      - pattern: conn.cursor().execute("SELECT * FROM users WHERE id = '%s'" % $VAR)
      # Query built with %-formatting, stored in a variable, then executed
      # (the shape used by sample_app/vulnerable.py).
      - pattern: |
          $QUERY = "..." % $VAR
          ...
          $CUR.execute($QUERY)
    message: "SQL query uses untrusted string concatenation — potential SQL injection"
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-89: SQL Injection"

  - id: unsafe-pickle-loads
    patterns:
      - pattern: pickle.loads($DATA)
    message: "Insecure deserialization using pickle.loads"
    severity: WARNING
    languages: [python]
    metadata:
      cwe: "CWE-502: Deserialization of Untrusted Data"
YAML
echo "✅ Created custom_rules.yaml"


✅ Created custom_rules.yaml


In [5]:
%%bash
# Create sample_app/ and write an intentionally insecure Python file into it
# as scan input for the demo. The heredoc delimiter is quoted ('PY'), so the
# shell performs no expansion inside the body and the file is written verbatim.
# The file is only written here; nothing in this cell executes it.
mkdir -p sample_app
cat > sample_app/vulnerable.py <<'PY'
# sample_app/vulnerable.py
# Intentionally insecure Python app for Bandit & Semgrep demo.

import os
import subprocess
import sqlite3
import pickle
import requests

API_SECRET = "hardcoded-demo-secret"  # [Bandit B105] Hardcoded password/secret

def get_user_data(user_id):
    # SQL Injection vulnerability — string concatenation
    conn = sqlite3.connect("users.db")
    cur = conn.cursor()
    query = "SELECT * FROM users WHERE id = '%s'" % user_id  # flagged by Semgrep
    cur.execute(query)
    return cur.fetchall()

def execute_user_input(cmd):
    # Command injection — flagged by Bandit & Semgrep
    os.system("ping -c 1 " + cmd)

def unsafe_pickle(data):
    # Unsafe deserialization — flagged by Semgrep
    return pickle.loads(data)

def unverified_request(url):
    # SSL verification disabled — flagged by Bandit
    return requests.get(url, verify=False)

def run_shell(user_input):
    # Another command injection example (for Semgrep pattern match)
    subprocess.call(f"echo {user_input}", shell=True)

def main():
    user_input = input("Enter username: ")
    execute_user_input(user_input)
    get_user_data(user_input)
    unsafe_pickle(b"cos\nsystem\n(S'ls'\ntR.")
    unverified_request("https://example.com")
    run_shell(user_input)

if __name__ == "__main__":
    main()
PY
echo "✅ Created sample_app/vulnerable.py"


✅ Created sample_app/vulnerable.py


In [6]:
# Write main.py (deterministic pipeline + caching + PDF generator)
%%writefile main.py
import os, json, subprocess, hashlib, re, datetime
from huggingface_hub import InferenceClient
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet

# ======================================================
# Setup
# ======================================================
# The token is read from the process environment; refuse to continue when it is
# missing or empty, since every model request below needs it.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None or HF_TOKEN == "":
    raise RuntimeError("HF_TOKEN not set. Please run: os.environ['HF_TOKEN'] = 'your_token_here'")

# Model id and shared inference client used by both agent calls in this module.
MODEL = "HuggingFaceH4/zephyr-7b-beta"
client = InferenceClient(token=HF_TOKEN)

# Why is the "HuggingFaceH4/zephyr-7b-beta" model a good fit for Cyber AI?
# - Advanced Threat Detection: Leverages deep learning to identify vulnerabilities with high accuracy.
# - Real-Time Analysis: Enables swift automated responses to emerging cyber threats.
# - Resource Efficiency: Optimized model size allows deployment on limited hardware.
# - Secure Automation: Ensures data privacy and security during vulnerability assessments.
# - Scalable Integration: Easily integrates with existing cyber AI frameworks and tools.

# Directory (relative to the working directory) holding one JSON file per
# cached analysis; it is created when this module is imported.
CACHE_DIR = "analysis_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# ======================================================
# Helpers
# ======================================================
def run_cmd(cmd):
    """Run ``cmd`` through the shell and return the text it wrote to stdout.

    stderr is captured but not returned, and the exit status is not checked,
    so a failing command simply yields whatever stdout it produced.
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    return completed.stdout

def canonicalize(obj):
    """Serialize ``obj`` to compact JSON with sorted keys.

    Keys are sorted and no whitespace is emitted between tokens, so equal
    objects always produce the same string; non-ASCII characters are kept
    as-is rather than escaped.
    """
    encoder = json.JSONEncoder(
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return encoder.encode(obj)

def hash_findings(bandit_json, semgrep_json):
    """Return the SHA-256 hex digest of both reports in canonical JSON form.

    The Bandit report is hashed first, immediately followed by the Semgrep
    report; the digest is used as the cache key for an analysis.
    """
    digest = hashlib.sha256()
    for report in (bandit_json, semgrep_json):
        digest.update(canonicalize(report).encode("utf-8"))
    return digest.hexdigest()

def load_cache(key):
    """Return the cached JSON stored for ``key``, or None when no file exists."""
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    if not os.path.exists(cache_file):
        return None
    with open(cache_file, "r", encoding="utf-8") as handle:
        return json.load(handle)

def save_cache(key, data):
    """Write ``data`` as pretty-printed UTF-8 JSON to the cache entry for ``key``.

    Args:
        key: Cache key; the entry is stored as ``<CACHE_DIR>/<key>.json``.
        data: JSON-serializable object to store.
    """
    # The directory is created at import time, but it can be deleted while this
    # process is still running (e.g. `rm -rf analysis_cache` from the notebook
    # while Streamlit is up), so make sure it exists before every write.
    os.makedirs(CACHE_DIR, exist_ok=True)
    path = os.path.join(CACHE_DIR, f"{key}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

# ======================================================
# Scanners
# ======================================================
def run_bandit():
    """Scan ``sample_app`` with Bandit and return its JSON report as a dict.

    If the output cannot be parsed as JSON, an empty report
    (``{"results": []}``) is returned instead.
    """
    raw_report = run_cmd("bandit -r sample_app -f json -o - 2>/dev/null")
    try:
        parsed_report = json.loads(raw_report)
    except Exception:
        return {"results": []}
    return parsed_report

def run_semgrep():
    """Scan ``sample_app`` with Semgrep and return its JSON report as a dict.

    Rule sources are tried in order until one yields findings:

    1. the ``p/ci`` registry ruleset,
    2. the ``auto`` configuration,
    3. the local ``custom_rules.yaml`` on its own (only when that file exists).

    When ``custom_rules.yaml`` exists in the working directory it is added to
    every attempt, so the notebook's local rules take part in the scan.

    Returns:
        The first report containing results; otherwise the report from the last
        attempt, or ``{"results": []}`` if that one could not be parsed.
    """
    custom_rules = "custom_rules.yaml"
    # Semgrep accepts several --config options; the trailing space keeps the
    # command string well-formed when the local rules are absent.
    local_config = f"--config={custom_rules} " if os.path.isfile(custom_rules) else ""

    attempts = [
        f"semgrep {local_config}--config=p/ci sample_app --json 2>/dev/null",
        f"semgrep {local_config}--config=auto sample_app --json 2>/dev/null",
    ]
    if local_config:
        # Last resort that needs no registry access.
        attempts.append(f"semgrep {local_config}sample_app --json 2>/dev/null")

    report = None
    for command in attempts:
        try:
            report = json.loads(run_cmd(command))
        except Exception:
            # Unparseable output (e.g. Semgrep failed to start): try the next source.
            report = None
            continue
        if isinstance(report, dict) and report.get("results"):
            return report

    return report if isinstance(report, dict) else {"results": []}



# ======================================================
# Prompts
# ======================================================
# System-role message text used for the chat requests made by this module.
SYSTEM_PROMPT = "You are a cybersecurity assistant. Be concise, structured, and factual."

# User prompt for the analyst step. `{bandit}` and `{semgrep}` are str.format
# placeholders that are filled with the JSON-encoded scanner results.
ANALYST_PROMPT_TEMPLATE = """You are an AI Security Analyst.
Analyze the vulnerability reports below from Bandit and Semgrep.

Prioritize vulnerabilities (High/Medium/Low) and explain their impact briefly.
Provide structured, concise output — no exploit details.

Bandit findings:
{bandit}
Semgrep findings:
{semgrep}

Return JSON with:
findings: list of objects with id, file, line, tool, issue_text, severity, rationale, remediation.
"""

# === 🧠 AI Recommendation Agent prompt (Enhanced) ===
# User prompt for the recommendation step. `{analysis}` is the only str.format
# placeholder; the doubled braces `{{` / `}}` are format escapes that render as
# literal braces in the JSON example shown to the model.
RECOMMENDATION_PROMPT_TEMPLATE = """
You are an AI Security Recommendation Agent that helps developers fix vulnerabilities.

Below is a combined security analysis from multiple tools (Bandit, Semgrep).
For each finding, do the following:
1. Identify what the issue is and why it is risky.
2. Show the vulnerable code snippet or line context (if provided).
3. Provide a clear and correct secure fix (with short code example).
4. Prioritize findings (High, Medium, Low).
5. Provide a developer-friendly explanation and ticket message for PR.

Input JSON:
{analysis}

Return valid JSON with:
{{
  "plan": [
    {{
      "id": "finding-1",
      "tool": "bandit/semgrep",
      "file": "path/to/file",
      "line": number,
      "issue": "brief description",
      "recommendation": "how to fix",
      "example": "secure code snippet",
      "priority": "High|Medium|Low"
    }}
  ],
  "ticket": {{
    "title": "Security Remediation Plan",
    "message": "Summary for developer",
    "severity": "High|Medium|Low",
    "labels": ["security", "code-fix"]
  }}
}}
Rules:
- Always include both Bandit and Semgrep findings if available.
- Be specific: show correct secure replacement or code fix.
- Focus on Python best practices (avoid shell=True, input validation, parameterized SQL, etc.).
"""

def _request_json_from_model(user_prompt, max_tokens):
    """Send one chat request and return the JSON object found in the reply.

    Returns None when the request fails, when the reply contains no ``{...}``
    span, or when that span is not a valid JSON object.
    """
    import logging  # local import so the module's import block stays untouched

    try:
        resp = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt}
            ],
            max_tokens=max_tokens,
            temperature=0.0,
            top_p=1.0
        )
        text = resp.choices[0].message["content"]
    except Exception as exc:
        # Best effort: the caller falls back to scanner-derived output, but the
        # reason (bad token, network error, ...) must be visible in the logs.
        logging.getLogger(__name__).warning("Model request failed; using fallback: %r", exc)
        return None

    # Greedy match from the first "{" to the last "}" so surrounding prose is ignored.
    match = re.search(r'(\{.*\})', text or "", re.DOTALL)
    if match is None:
        return None
    try:
        parsed = json.loads(match.group(1))
    except ValueError:
        return None
    return parsed if isinstance(parsed, dict) else None


def _has_dict_items(payload, key):
    """Tell whether ``payload`` is a dict whose ``key`` entry (if any) is a list of dicts."""
    if not isinstance(payload, dict):
        return False
    items = payload.get(key, [])
    return isinstance(items, list) and all(isinstance(item, dict) for item in items)


def _fallback_analysis(bandit_json, semgrep_json):
    """Build the analysis structure directly from the raw Bandit and Semgrep results."""
    findings = []

    for i, r in enumerate(bandit_json.get("results", []), start=1):
        findings.append({
            "id": f"bandit-{i}",
            "file": r.get("filename"),
            "line": r.get("line_number"),
            "tool": "bandit",
            "issue_text": r.get("issue_text"),
            "severity": r.get("issue_severity", "Medium"),
            "rationale": r.get("more_info", ""),
            "remediation": "Review Bandit documentation and apply secure coding fix."
        })

    for j, s in enumerate(semgrep_json.get("results", []), start=1):
        extra = s.get("extra", {})
        meta = extra.get("metadata", {})
        findings.append({
            "id": f"semgrep-{j}",
            "file": s.get("path"),
            "line": s.get("start", {}).get("line"),
            "tool": "semgrep",
            "issue_text": extra.get("message"),
            "severity": extra.get("severity", "Medium"),
            "rationale": meta.get("category", "security"),
            "remediation": "Follow Semgrep rule guidance; apply fix or mitigation."
        })

    return {"findings": findings}


def _fallback_recommendation(analysis_json):
    """Build a generic remediation plan with one step per analysis finding."""
    plan = [
        {
            "id": f.get("id"),
            "recommendation": f"Fix {f.get('issue_text')} in {f.get('file')}",
            "priority": f.get("severity", "Medium")
        }
        for f in analysis_json.get("findings", [])
    ]
    return {"plan": plan, "ticket": {
        "title": "Remediation Plan",
        "message": "Fix security findings",
        "severity": "Medium",
        "labels": ["security"]
    }}


def call_analyst_and_recommender(bandit_json, semgrep_json):
    """Run the analyst and recommendation steps for one pair of scanner reports.

    A cached result for the same findings is returned immediately when present.
    Otherwise the model is asked first for an analysis and then for a
    remediation plan. Whenever a reply is missing, unparseable, or not shaped
    as a list of objects, a deterministic fallback built from the scanner
    output is used for that step.

    Only results that came entirely from the model are cached: caching a
    fallback would turn a transient failure (expired token, network error)
    into a permanent one for these findings.

    Args:
        bandit_json: Bandit report dict with a ``results`` list.
        semgrep_json: Semgrep report dict with a ``results`` list.

    Returns:
        Dict with keys ``analysis_json`` and ``recommendation_json``.
    """
    key = hash_findings(bandit_json, semgrep_json)
    cached = load_cache(key)
    if cached:
        return cached

    bandit_str = json.dumps(bandit_json.get("results", []), sort_keys=True, ensure_ascii=False)
    semgrep_str = json.dumps(semgrep_json.get("results", []), sort_keys=True, ensure_ascii=False)
    analyst_prompt = ANALYST_PROMPT_TEMPLATE.format(bandit=bandit_str, semgrep=semgrep_str)

    # === Analyst Agent ===
    analysis_json = _request_json_from_model(analyst_prompt, max_tokens=900)
    analysis_from_model = _has_dict_items(analysis_json, "findings")
    if not analysis_from_model:
        # Fallback: merge Bandit + Semgrep manually
        analysis_json = _fallback_analysis(bandit_json, semgrep_json)

    # === Recommendation Agent ===
    analysis_text = json.dumps(analysis_json, ensure_ascii=False)
    rec_prompt = RECOMMENDATION_PROMPT_TEMPLATE.format(analysis=analysis_text)

    recommendation_json = _request_json_from_model(rec_prompt, max_tokens=1000)
    recommendation_from_model = _has_dict_items(recommendation_json, "plan")
    if not recommendation_from_model:
        recommendation_json = _fallback_recommendation(analysis_json)

    out = {
        "analysis_json": analysis_json,
        "recommendation_json": recommendation_json
    }
    if analysis_from_model and recommendation_from_model:
        save_cache(key, out)
    return out

# ======================================================
# PDF Report
# ======================================================
def generate_pdf_report(bandit_text, semgrep_text, analysis_text, recommendation_text, output_path="CyberAI_Report.pdf"):
    """Render the scan results and AI output into a PDF and return its path.

    Args:
        bandit_text: Bandit report as text (only the first 4000 characters are printed).
        semgrep_text: Semgrep report as text (only the first 4000 characters are printed).
        analysis_text: Analyst summary, one finding per line.
        recommendation_text: Recommendation summary, one step per line.
        output_path: Destination file for the PDF.

    Returns:
        ``output_path``, after the document has been written.
    """
    from xml.sax.saxutils import escape
    from reportlab.platypus import Preformatted

    def as_markup(text):
        # Paragraph parses its text as XML-like markup, so raw "<", ">" and "&"
        # from scanner or model output must be escaped before line breaks are added.
        return escape(text or "").replace('\n', '<br/>')

    def as_code(text):
        # Preformatted prints text verbatim (no markup parsing) and keeps the
        # JSON indentation; maxLineLength wraps lines too long for the page.
        return Preformatted((text or "")[:4000], styles["Code"], maxLineLength=80)

    styles = getSampleStyleSheet()
    doc = SimpleDocTemplate(output_path, pagesize=A4)
    story = []

    # Timezone-aware "now" in UTC, rendered without the offset because the
    # label already says UTC (datetime.utcnow() is deprecated).
    generated_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()

    story.append(Paragraph("<b>Cybersecurity AI Report</b>", styles["Title"]))
    story.append(Spacer(1, 12))
    story.append(Paragraph(f"Generated: {generated_at} UTC", styles["Normal"]))
    story.append(Spacer(1, 12))

    story.append(Paragraph("<b>Bandit Findings</b>", styles["Heading3"]))
    story.append(as_code(bandit_text))
    story.append(Spacer(1, 12))

    story.append(Paragraph("<b>Semgrep Findings</b>", styles["Heading3"]))
    story.append(as_code(semgrep_text))
    story.append(Spacer(1, 12))

    story.append(Paragraph("<b>AI Analysis</b>", styles["Heading3"]))
    story.append(Paragraph(as_markup(analysis_text), styles["Normal"]))
    story.append(Spacer(1, 12))

    story.append(Paragraph("<b>AI Recommendations</b>", styles["Heading3"]))
    story.append(Paragraph(as_markup(recommendation_text), styles["Normal"]))
    story.append(Spacer(1, 12))

    doc.build(story)
    return output_path

# ======================================================
# Coordinator
# ======================================================
def coordinator():
    """Run both scanners and the AI steps, then format the results as text.

    Returns:
        A 5-tuple ``(bandit_sorted, semgrep_sorted, analysis_text,
        recommendation_text, analysis_json)``. The two ``*_sorted`` values are
        ``{"results": [...]}`` dicts whose findings are ordered by file and
        rule/test name.
    """
    raw_bandit = run_bandit()
    raw_semgrep = run_semgrep()

    # Order the findings before they are analysed.
    bandit_results = list(raw_bandit.get("results", []))
    bandit_results.sort(key=lambda item: (item.get("filename", ""), item.get("test_name", "")))
    bandit_sorted = {"results": bandit_results}

    semgrep_results = list(raw_semgrep.get("results", []))
    semgrep_results.sort(key=lambda item: (item.get("path", ""), item.get("check_id", "")))
    semgrep_sorted = {"results": semgrep_results}

    out = call_analyst_and_recommender(bandit_sorted, semgrep_sorted)
    analysis_json = out.get("analysis_json", {})
    recommendation_json = out.get("recommendation_json")
    if not recommendation_json:
        # Fall back to the "responder_json" key when "recommendation_json" is absent or empty.
        recommendation_json = out.get("responder_json", {})

    # ---- Text formatting ----
    analysis_text = "\n".join(
        f"[{finding.get('severity')}] {finding.get('file')}:{finding.get('line')} - {finding.get('issue_text')} | Remediation: {finding.get('remediation')}"
        for finding in analysis_json.get("findings", [])
    )
    if not analysis_text:
        analysis_text = json.dumps(analysis_json, indent=2)

    rec_lines = []
    if isinstance(recommendation_json, dict):
        if "plan" in recommendation_json:
            rec_lines.extend(
                f"- {step.get('id')}: {step.get('recommendation')} (Priority: {step.get('priority')})"
                for step in recommendation_json["plan"]
            )
        if "ticket" in recommendation_json:
            ticket_dump = json.dumps(recommendation_json['ticket'], indent=2)
            rec_lines.append(f"\nTicket: {ticket_dump}")
    recommendation_text = "\n".join(rec_lines)
    if not recommendation_text:
        recommendation_text = json.dumps(recommendation_json, indent=2)

    return bandit_sorted, semgrep_sorted, analysis_text, recommendation_text, analysis_json



Writing main.py


In [7]:
# Write app.py (Streamlit UI)
%%writefile app.py
import streamlit as st
import json
from main import coordinator, generate_pdf_report

# Page chrome and introduction.
st.set_page_config(page_title="CAI Vulnerability Demo (Deterministic)", layout="wide")
st.title("Multi-Agent CAI Framework Demo (Scanner + Analyst + Recommendation)")

st.markdown("Click **Run Multi-Agent Scan** to run Bandit+Semgrep, have the AI analyze, and generate a PDF.")

scan_requested = st.button("🚀 Run Multi-Agent Scan")
if scan_requested:
    with st.spinner("Running scanners and AI agents..."):
        bandit_raw, semgrep_raw, analyst_summary, responder_summary, _analysis = coordinator()

    # Raw scanner output first, rendered as JSON.
    for heading, raw_report in (
        ("🔍 Bandit (raw JSON)", bandit_raw),
        ("🔍 Semgrep (raw JSON)", semgrep_raw),
    ):
        st.subheader(heading)
        st.json(raw_report)

    # Then the two AI summaries as plain text.
    for heading, summary in (
        ("🧠 AI Analyst", analyst_summary),
        ("🛠 AI Responder", responder_summary),
    ):
        st.subheader(heading)
        st.code(summary, language="text")

    # Generate PDF and offer download
    pdf_path = generate_pdf_report(
        json.dumps(bandit_raw, indent=2, ensure_ascii=False),
        json.dumps(semgrep_raw, indent=2, ensure_ascii=False),
        analyst_summary,
        responder_summary,
        output_path="CyberAI_Report.pdf",
    )
    with open(pdf_path, "rb") as pdf_file:
        st.download_button("📄 Download report (PDF)", pdf_file, file_name="CyberAI_Report.pdf", mime="application/pdf")


Writing app.py


In [8]:
# Launch Streamlit and ngrok (pyngrok). Enter ngrok token when prompted.
!pip install --quiet pyngrok

from pyngrok import ngrok
import getpass, subprocess, time, os

NGROK_TOKEN = getpass.getpass("Enter ngrok authtoken (optional, press Enter to skip): ")
if NGROK_TOKEN:
    ngrok.set_auth_token(NGROK_TOKEN)

# start streamlit in background
subprocess.Popen(["streamlit", "run", "app.py", "--server.port", "10000"])
time.sleep(2)
url = ngrok.connect(10000)
print("Open the Streamlit app at:", url.public_url)


Enter ngrok authtoken (optional, press Enter to skip): ··········
Open the Streamlit app at: https://nonprominently-rendible-britni.ngrok-free.dev


In [None]:

# Kill old tunnels.
# Uses the `ngrok` object imported in the launch cell, so run that cell first.
# This cell only calls ngrok.kill(); it does not stop the Streamlit process
# that the launch cell started.
ngrok.kill()

In [None]:
# Delete the analysis_cache directory in which main.py stores cached
# analysis/recommendation JSON, so the next scan does not reuse a cached
# result and the Recommendation Agent output is regenerated.
!rm -rf analysis_cache

### Troubleshooting

- If you see `401 Unauthorized` from Hugging Face: re-generate your HF token and re-run the HF token cell.

- If ngrok fails: ensure you set an ngrok token or try re-running the ngrok cell.

- If `main` import fails: ensure you executed the cell that wrote `main.py` before running app cells.

### Quick run checklist

1. Run the install cell.
2. Run the HF token cell and paste token securely.
3. Run the `custom_rules.yaml` and `sample_app` creation cells.
4. Run the `main.py` and `app.py` write cells.
5. Run the ngrok launch cell and open the printed public URL.
