## Part 1: Experiment interaction with Gtihub MCP Server

#### Example: Run GitHub MCP server locally in Docker
The following code loads environment settings, spins up an OpenAI chat client, and launches GitHub’s MCP server inside Docker so the agent can call GitHub tools.

dotenv pulls in API keys and MCP settings from .env.
OpenAIChatClient is configured with your custom base URL/model so every MCP call uses that backend.

The MCP tool definition wraps docker run; it injects the GitHub PAT plus the list of toolsets the server should expose.
await github_mcp.connect() starts the Docker container and establishes the socket so subsequent ChatAgent runs can invoke GitHub functions (repos, PRs, issues, etc.) through the MCP layer.

In [1]:
from agent_framework.openai import OpenAIChatClient
from agent_framework import MCPStdioTool, ChatAgent
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env")
chat_client = OpenAIChatClient(api_key=os.getenv("OPENAI_API_KEY"), model_id=os.getenv("MODEL_ID"))
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN_FULL_PERMISIONS")
GITHUB_HOST = ""
toolsets = "repos,issues,pull_requests,actions,code_security,experiments"
github_mcp = MCPStdioTool(
    name="GitHubMCP",
    command="docker",
    args=[
        "run", "-i", "--rm",
        "-e", f"GITHUB_PERSONAL_ACCESS_TOKEN={GITHUB_TOKEN}", f"-e GITHUB_TOOLSETS={toolsets}",
        "ghcr.io/github/github-mcp-server"
    ],
    chat_client=chat_client,
)
await github_mcp.connect()
print("GitHub MCP connected")



GitHub MCP connected


### Example: Write comment to a pull request using Github's MCP server

This block wires the GitHub MCP tools into a task‑specific agent that leaves a review comment on a pull request.

all_tools = [*github_mcp.functions] expands every capability exposed by the connected MCP server (issues, PR reviews, etc.).

The instructions string plays system prompt: the agent must operate only on iuf26/workshop-project-detect-secrets-in-repo and reply with SUCCESS or FAILURE.

ChatAgent binds those instructions, the shared chat_client, and the MCP tool list into a named agent (ResearchAgent).

The await agent.run(...) call triggers one conversational turn where the agent visits PR #1, starts a review if needed, and posts the requested comment on leaky_sample.py line 6; the printed result is just SUCCESS or FAILURE + reason.

In [2]:
all_tools = [*github_mcp.functions]
instructions = """
You are a helpful assistant that helps me write comments to the 'workshop-detect_secrets' github repository with OWNER 'flaviusfetean'. 
As a response only return 'SUCCESS' if operation succeeded and otherwise return 'FAILURE' and reason for failure.
If you get errors while attempting to write the comment, please attempt first to work around it (unless there is no way to do so).
"""
agent = ChatAgent(
    chat_client=chat_client,
    instructions=instructions,
    name="ResearchAgent",
    tools=all_tools
 )
result = await agent.run("I would like to add the 'please remove this secret number 12 9 November' comment to the pull request number 1 to the 'leaky_sample.py' file at line number 12. If there is no review started for the viewer please do create it.")
print(result)


SUCCESS


### Example: Retrieve all files from a PR using GitHub MCP server

In [3]:
from domain.models import PRFileList
all_tools = [*github_mcp.functions]
instructions = """
You are a helpful assistant. Retrieve all files included in an open pull request from the GitHub repository 'flaviusfetean/workshop-detect_secrets'.
Respond only with a list of direct links (URLs) to the files changed or added in the pull request — no explanations or additional text.
"""
github_pr_agent = ChatAgent(
    chat_client=chat_client,
    instructions=instructions,
    name="PullRequestAgent",
    tools=all_tools
 )
result = await github_pr_agent.run("Get me all the files involved in the PR with number 1.")
print(result)


https://github.com/flaviusfetean/workshop-detect_secrets/blob/feat/add_secrets/leaky_sample.py


In [4]:
from domain.models import PRFileList, PRFileInfo
instructions = """
You are a helpful assistant. Retrieve all files included in the open pull request from the GitHub repository 'flaviusfetean/workshop-detect_secrets'.
"""
github_pr_agent = ChatAgent(
    chat_client=chat_client,
    instructions=instructions,
    name="PullRequestAgent",
    tools=all_tools, 
    response_format=PRFileList
 )
# Extract with structured output
result_structured = await github_pr_agent.run(
    "Get me all the files involved in the PR with number 1.",
    response_model=PRFileList
)
print("Structured result:", result_structured.value)


Structured result: files=[PRFileInfo(source_file='leaky_sample.py', pull_request_number='1', source_branch='feat/add_secrets', repo='workshop-detect_secrets', repo_owner='flaviusfetean')]


### Example: List all github mcp server tool

In [5]:
for elem in github_mcp.functions:
    print(elem.name)

add_comment_to_pending_review
add_issue_comment
assign_copilot_to_issue
create_branch
create_or_update_file
create_pull_request
create_repository
delete_file
fork_repository
get_commit
get_file_contents
get_label
get_latest_release
get_me
get_release_by_tag
get_tag
get_team_members
get_teams
issue_read
issue_write
list_branches
list_commits
list_issue_types
list_issues
list_pull_requests
list_releases
list_tags
merge_pull_request
pull_request_read
pull_request_review_write
push_files
request_copilot_review
search_code
search_issues
search_pull_requests
search_repositories
search_users
sub_issue_write
update_pull_request
update_pull_request_branch
AssignCodingAgent
IssueToFixWorkflow


## Part 2: Implement agentic workflow

### Define Chunk processing research agent

In [6]:
input_test_chunks = [
  {
    "chunk": "import os\nDEBUG=True\nDATABASE_URL=\"postgresql://appuser:Sup3rS3cret!@db:5432/app\"\nJWT_SECRET=\"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.fake.payload\"\nADMIN_EMAIL=\"maria.popescu@example.ro\"\nADMIN_PHONE=\"+40-721-555-321\"\nSMTP_PASSWORD=\"p@55w0rd!\"\nLOG_LEVEL=\"INFO\"\nprint(\"config loaded\")",
    "original_lines_interval": [120, 128],
    "original_file": "src/app/config.py",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "# .env\nAWS_ACCESS_KEY_ID=AKIAZZZZEXAMPLE1234\nAWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYZZZexample\nS3_BUCKET=media-prod\nSTRIPE_WEBHOOK_SECRET=whsec_1e2b3c4d5e6f7g8h9i0j\nREDIS_URL=redis://:s3cr3tP@redis:6379/0\n# service creds\nSERVICE_EMAIL=ops.team@example.com\nSERVICE_PHONE=+40-745-000-111\nNODE_ENV=production",
    "original_lines_interval": [45, 52],
    "original_file": "services/api/.env",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "// config.js\nmodule.exports = {\n  stripeKey: \"sk_live_51NvEXAMPLEaBcD1234567890\",\n  headers: {\n    \"Authorization\": \"Bearer eyJraWQiOiJrZXkiLCJhbGciOiJIUzI1NiJ9.fake.token\",\n    \"X-Api-Key\": \"b7b2f0c6-1b24-4e1a-9c1a-12e34abcd567\"\n  },\n  supportEmail: \"support@example.com\",\n  logLevel: \"debug\"\n};",
    "original_lines_interval": [200, 209],
    "original_file": "web/src/config.js",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "apiVersion: v1\nkind: ConfigMap\nmetadata:\n  name: app-config\ndata:\n  DB_DSN: \"mysql://admin:hunter2@mysql:3306/app\"\n  MAIL_USER: \"mailer@example.ro\"\n  MAIL_PASS: \"Tr1cky-P@ss!\"\n  OAUTH_CLIENT_SECRET: \"c0a89e5f-7d1f-4d0c-9b77-1a2b3c4d5e6f\"",
    "original_lines_interval": [31, 41],
    "original_file": "deploy/k8s/app-config.yaml",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "{\n  \"type\": \"service_account\",\n  \"project_id\": \"demo-proj-123\",\n  \"private_key_id\": \"f4f3c2e1d0b9a8f7e6d5c4b3a2918171\",\n  \"private_key\": \"-----BEGIN PRIVATE KEY-----\\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBK...FAKE...\\n-----END PRIVATE KEY-----\\n\",\n  \"client_email\": \"svc-acc@demo-proj-123.iam.gserviceaccount.com\",\n  \"client_id\": \"109876543210987654321\",\n  \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\"\n}",
    "original_lines_interval": [402, 410],
    "original_file": "infra/gcp/service-account.json",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "#!/usr/bin/env bash\nexport GITHUB_TOKEN=ghp_FAKE1234567890abcdef1234567890abcd\nexport SLACK_BOT_TOKEN=xoxb-123456789012-1234567890123-ABCdefGHIjklMNOpqr\ncurl -s -H \"Authorization: Bearer $GITHUB_TOKEN\" https://api.github.com/user\ncurl -u \"deployer:Sup3r-Secret!\" https://registry.example.com/v2/_catalog\nEMAIL_NOTIFY=\"andrei.ionescu@example.com\"\necho \"Done\"",
    "original_lines_interval": [88, 96],
    "original_file": "scripts/release.sh",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "-- seed.sql\nINSERT INTO users (full_name, email, phone, password_plain)\nVALUES ('Ioana Radu','ioana.radu@example.ro','+40 722 111 222','summer2025');\nINSERT INTO api_credentials (name, token)\nVALUES ('internal-bot','eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.fake');\n-- DO NOT COMMIT REAL DATA",
    "original_lines_interval": [12, 16],
    "original_file": "db/seed.sql",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "variable \"cloud_api_token\" {\n  type = string\n  default = \"tkn_live_3b2f1c4d5e6f7a8b9c0d\"\n}\nprovider \"example\" {\n  token = var.cloud_api_token\n}\noutput \"endpoint\" { value = example_service.url }",
    "original_lines_interval": [77, 84],
    "original_file": "infra/terraform/vars.tf",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "spring.datasource.url=jdbc:postgresql://db:5432/app?user=app&password=Str0ngP@ss\nspring.mail.username=mailer@example.ro\nspring.mail.password=Qwerty!234\njwt.signing.key=5a6b7c8d9e0f11223344556677889900\nmanagement.endpoints.web.exposure.include=health,info,metrics\nsupport.phone=+40-733-000-222",
    "original_lines_interval": [15, 20],
    "original_file": "service/src/main/resources/application-prod.properties",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  },
  {
    "chunk": "# notebook cell\nOPENAI_API_KEY=\"sk-live-abc123XYZ987fakefakefake\"\nTWILIO_ACCOUNT_SID=\"AC1234567890abcdef1234567890abcd\"\nTWILIO_AUTH_TOKEN=\"9d8c7b6a5e4f3d2c1b0a\"\nuser_name = \"Ciprian Istrate\"\nuser_email = \"ciprian.istrate@example.com\"\nprint(\"init done\")",
    "original_lines_interval": [260, 266],
    "original_file": "notebooks/exp01.ipynb",
    "pull_request_number": "",
    "repo": "",
    "repo_owner": ""
  }
]


### Test split chunk
Split sample file that is found in the PR

In [7]:
import code
from pathlib import Path
from typing import List, Dict

def split_file_by_newlines(
    file_path: str,
    newlines_per_chunk: int,
    pull_request_number: str = "",
    repo: str = "",
    repo_owner: str = "",
) -> List[Dict]:
    """ 
    Split a text file into chunks containing a fixed number of newline separators.
    - Normalizes all line endings to '\n' first.
    - `original_lines_interval` is 1-based and inclusive.
    - If the last chunk has fewer lines (fewer '\n' separators), it's still included.
    """

    if "http" in file_path:
        import requests

        filepath_github_raw = file_path.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")

        r = requests.get(filepath_github_raw)
        r.raise_for_status()

        text = r.text.replace("\r\n", "\n").replace("\r", "\n")
        original_file = file_path.split("/")[-1]
        print(text[:200])  # print first 200 chars
    else:
        p = Path(file_path)
        original_file = p.name

        # Normalize all newlines to '\n' to ensure consistent splitting
        text = p.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n").replace("\r", "\n")

    # Split strictly by '\n'
    lines = text.split("\n")  # newline characters are removed by split
    total_lines = len(lines)

    chunks: List[Dict] = []

    # Step through in blocks of `newlines_per_chunk` lines
    for i in range(0, total_lines, newlines_per_chunk):
        block = lines[i : i + newlines_per_chunk]
        if not block:
            continue

        # Reconstruct the chunk string with '\n' between lines.
        # IMPORTANT: we DO NOT append a trailing '\n' at the end of the chunk.
        chunk_text = "\n".join(block)

        # 1-based line numbers for the original interval
        start_line = i + 1
        end_line = i + len(block)

        chunks.append({
            "chunk": chunk_text,
            "original_lines_interval": [start_line, end_line],
            "original_file": original_file,
            "pull_request_number": pull_request_number,
            "repo": repo,
            "repo_owner": repo_owner,
        })

    return chunks

In [8]:
input_test_chunks = split_file_by_newlines("samples/leaky_sample.py",10,"1",repo="workshop-detect-secrets-in-repo",repo_owner="iuf26")

In [9]:
input_test_chunks_from_url = split_file_by_newlines("https://raw.githubusercontent.com/flaviusfetean/workshop-detect_secrets/feat/add_secrets/leaky_sample.py",10,"1",repo="workshop-detect_secrets",repo_owner="flaviusfetean")

# leaky_sample.py
# ⚠️ FAKE CREDENTIALS FOR TESTING ONLY. NONE OF THESE WORK.
# This file intentionally contains strings that *look like* secrets so you can
# test detectors, LLMs, and CI scanners. Do


In [10]:
import hashlib
import uuid

from domain.models import TextChunk

NAMESPACE = uuid.UUID("876de125-4386-442b-9e88-d40dfbbb301d")  # pick once & keep
def stable_uuid(s: str) -> str:
    s = s.strip().lower()  # normalize to avoid accidental mismatches
    return str(uuid.uuid5(NAMESPACE, s))

def shard_for_chunk(chunk: TextChunk, total_agents: int) -> int:
    """
    Pick which worker (0..total_agents-1) should handle this chunk.

    How it works (in plain words):
    - Build a key from the file name and line range (e.g., "app.py|120|180").
    - Hash that key with SHA-256 (gives a big, stable number).
    - Take that number modulo total_agents to get a shard index.

    Inputs:
    - chunk: has `source_file: str` and `line_span: (start:int, end:int)`.
    - total_agents: number of workers; must be >= 1.

    Guarantees:
    - Same chunk → same shard index (deterministic).
    - Result r is an int with 0 <= r < total_agents.

    Example:
    >> shard_for_chunk(TextChunk(source_file="a.py", line_span=(10, 30)), total_agents=3)
    2
    """
    h = hashlib.sha256(f"{chunk.source_file}|{chunk.line_span}".encode()).digest()
    return int.from_bytes(h[:4], "big") % total_agents

In [None]:
import asyncio
from agent_framework import Executor, WorkflowBuilder, WorkflowEvent, handler, WorkflowContext, ChatAgent, ExecutorInvokedEvent, ExecutorCompletedEvent
from agent_framework.openai import OpenAIChatClient
import json
from domain.models import TextChunk, LineComment, SecretsDetectorExecutorResponse, EmptySecretsDetectorExecutorResponseFactory, PRFileList, PRFileInfo
import os
from dotenv import load_dotenv
load_dotenv()
chat_client = OpenAIChatClient(base_url=os.getenv("BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), model_id=os.getenv("MODEL_ID"))
GITHUB_TOKEN = os.getenv("GITHUB_PAT")
DETECTED_SECRETS_RESULT_KEY = "detected_secrets"

class CustomResponseEvent(WorkflowEvent):
    def __init__(self, result: list[SecretsDetectorExecutorResponse]):
        super().__init__(result)

class SecretsDetectorExec(Executor):
    agent: ChatAgent
    agent_instruction = """
        <instruction role="system">
        You are a code-secrets detector. Given a text CHUNK (with "\n" newlines) and its original line interval [START, END], return only a JSON array of findings. Flag lines that contain likely secrets (API keys/tokens, private keys, passwords, connection strings with creds, service-account JSON fields, auth headers) or PII (names paired with email/phone/IDs). Be precise; if unsure, don't flag. Ignore obvious placeholders.
        </instruction>
        <schema>
        Output exactly:
        [
        { "line_number": <int original line>, "comment": "<types comma-separated>. Please remove." }
        ]
        Return [] if nothing is found. No extra text.
        </schema>
        <procedure>
        1) Split CHUNK by "\n".
        2) For each line i (1-based), assess for secrets/PII using field names and context (e.g., "api_key", "token", "password", "private_key", DSN with user:pass, "Authorization: Bearer ...", service-account fields like private_key_id/private_key).
        3) If flagged, compute original line_number = START + i - 1.
        4) Emit JSON as per <schema>, comments short, no code excerpts.
        </procedure>
        <example>
        INPUT:
        START=4, END=7
        CHUNK:
        print("ok")
        "private_key_id": "f4f3c2e1d0b9a8f7e6d5c4b3a2918171",
        print("done")

        OUTPUT:
        [
        { "line_number": 5, "comment": "Private key identifier. Please remove." }
        ]
        </example>
    """

    def __init__(self, chat_client: OpenAIChatClient,  my_shard: int, total_agents: int, id: str = "secrets detector"):
        agent = chat_client.create_agent(
            instructions=self.agent_instruction,
            name=f"SecretsDetectorAgent_{id}",
        )
        self.id = id
        self.agent = agent
        self.my_shard = my_shard
        self.total_agents = total_agents
        super().__init__(agent=agent, id=id)
    
    def create_prompt_from_chunk(self, chunk: TextChunk):
        prompt = f"""
            Please investigate and detect secrets existent in the chunk taken from the line intervals of the file {chunk.source_file}.
            INPUT
            START={chunk.line_span[0]}, END={chunk.line_span[1]}
            CHUNK:
            {chunk.text}
        """
        return prompt

    
    @handler
    async def run(self, chunk: TextChunk,ctx: WorkflowContext[SecretsDetectorExecutorResponse]) -> None:
        if shard_for_chunk(chunk, self.total_agents) != self.my_shard:
            return
        prompt = self.create_prompt_from_chunk(chunk)
        key = stable_uuid(repr((chunk.source_file, chunk.line_span)))

        async with ctx.shared_state.hold():
            chunk_processed = await ctx.shared_state.get_within_hold(key)
            if chunk_processed:
                await ctx.send_message(EmptySecretsDetectorExecutorResponseFactory.get_empty_secrets_detector())
                return
            await ctx.shared_state.set_within_hold(key, True)
        response = await self.agent.run(prompt)
        identified_problematic_lines = [LineComment(line_number=elem["line_number"], comment=elem["comment"]) for elem in json.loads(response.text)]
        await ctx.set_shared_state(key, True)
        await ctx.send_message(SecretsDetectorExecutorResponse(comments=identified_problematic_lines, 
                                                               original_file=chunk.source_file, 
                                                               executor_agent=self.id,
                                                               repo=chunk.repo,
                                                               repo_owner=chunk.repo_owner,
                                                               pull_request_number=chunk.pull_request_number
                                                               ))


class ChunksExporterExec(Executor):
    def __init__(self, id):
         self.id = id
         super().__init__(id=id)

    @handler
    async def run(self, _: str,ctx: WorkflowContext[TextChunk]) -> None:
        """Sends input test chunks"""
        final_results = []
        await ctx.set_shared_state(DETECTED_SECRETS_RESULT_KEY, final_results)

        agent_instructions = """
        You are a helpful assistant. Retrieve all files included in an open pull request from the GitHub repository 'flaviusfetean/workshop-detect_secrets'.
        Respond only with a list of direct links (URLs) to the files changed or added in the pull request along with the necessary extra information (owner, repo, branch).
        """

        files: List[PRFileInfo] = []
        while not files:
            github_pr_extraction_agent = ChatAgent(
                chat_client=chat_client,
                instructions=agent_instructions,
                name="PullRequestExtractorAgent",
                tools=all_tools, 
                response_format=PRFileList
            )

            pr_files_response = await github_pr_extraction_agent.run(
                "Get me all the files involved in the PR with number 1.",
                response_format=PRFileList
            )

            files: List[PRFileInfo] = pr_files_response.value.files
            if not files:
                print("No files extracted, retrying...")
                continue
            for file in files:
                # Sometimes the agent outputs the full URL, we need only the filename and we build the URL ourselves
                file.source_file = file.source_file.split("/")[-1]
            print(f"Files in PR: {[file.source_file for file in files]}")

        input_chunks = []
        for file in files:
            try:
                chunks = split_file_by_newlines(
                    file_path=f"https://raw.githubusercontent.com/{file.repo_owner}/{file.repo}/{file.source_branch}/{file.source_file}",
                    newlines_per_chunk=10,
                    pull_request_number=file.pull_request_number,
                    repo=file.repo,
                    repo_owner=file.repo_owner
                )
                input_chunks.extend(chunks)
            except Exception as e:
                print(f"Error processing file {file.source_file}: {e}")

        for chunk in input_chunks:
            start, end = chunk["original_lines_interval"]
            text_chunk = TextChunk(chunk=str(chunk["chunk"]), 
                                             line_span=(int(start), int(end)), 
                                             source_file=str(chunk["original_file"]),
                                             repo=str(chunk["repo"]),
                                             repo_owner=str(chunk["repo_owner"]),
                                             pull_request_number=str(chunk["pull_request_number"])
                                             )
            key = stable_uuid(repr((text_chunk.source_file, text_chunk.line_span)))
            await ctx.set_shared_state(key, False)
            await ctx.send_message(text_chunk)
        

class ChunksAgregatorExec(Executor):

    def __init__(self, id, github_mcp_server):
         self.id = id
         self.github_mcp_server = github_mcp_server
         all_tools = [*self.github_mcp_server.functions]
         self.github_mcp_client = ChatAgent(
                chat_client=chat_client,
                instructions=(
                    "You are a helpful assistant that writes review comments on GitHub PRs. "
                    "Respond with 'SUCCESS' if the operation succeeded, otherwise 'FAILURE: <reason>'."
                ),
                name="GithubCodeReviewerAgent",
                tools=all_tools,
            )
         super().__init__(id=id)

    async def _call_github_mcp_client(self, detected_secret: SecretsDetectorExecutorResponse, line_comment: LineComment):
        prompt = (
            f"Add the comment '{line_comment.comment}' to pull request "
            f"#{detected_secret.pull_request_number} in repository "
            f"'{detected_secret.repo_owner}/{detected_secret.repo}', "
            f"file '{detected_secret.original_file}', at line {line_comment.line_number}. "
            f"MAKE SURE TO INCLUDE THE LINE NUMBER IN THE REQUEST."
            f"If there is no active review, create one."
        )
        return await self.github_mcp_client.run(prompt)

    @handler
    async def run(self, detected_secrets: list[SecretsDetectorExecutorResponse] ,ctx: WorkflowContext[None]) -> None:
        """Sends input test chunks"""
        filtered_nonempty = [secret for secret in detected_secrets if not secret.is_empty()]
        for elem in filtered_nonempty:
            for comment in elem.comments:
                await self._call_github_mcp_client(detected_secret=elem, line_comment=comment)
        await ctx.add_event(CustomResponseEvent(filtered_nonempty))

class ImprovedChunksAgregatorExec(Executor):

    def __init__(self, id, github_mcp_server):
         self.id = id
         self.github_mcp_server = github_mcp_server
         all_tools = [*self.github_mcp_server.functions]
         self.github_mcp_client = ChatAgent(
                chat_client=chat_client,
                instructions=(
                    "You are a helpful assistant that writes review comments on GitHub PRs. "
                    "Respond with 'SUCCESS' if the operation succeeded, otherwise 'FAILURE: <reason>'."
                ),
                name="GithubCodeReviewerAgent",
                tools=all_tools,
            )
         super().__init__(id=id)

    async def _call_github_mcp_client(self, detected_secret: SecretsDetectorExecutorResponse, line_comment: LineComment):
        prompt = (
            f"Add the comment '{line_comment.comment}' to pull request "
            f"#{detected_secret.pull_request_number} in repository "
            f"'{detected_secret.repo_owner}/{detected_secret.repo}', "
            f"file '{detected_secret.original_file}', at line {line_comment.line_number}. "
            f"If there is no active review, create one."
        )
        print(f"Adding comment for file '{detected_secret.original_file}' at line {line_comment.line_number}")

        return await self.github_mcp_client.run(prompt)

    async def _bounded_call(
        self,
        sem: asyncio.Semaphore,
        detected_secret: SecretsDetectorExecutorResponse,
        line_comment: LineComment,
    ):
        async with sem:
            try:
                return await self._call_github_mcp_client(detected_secret, line_comment)
            except Exception as e:
                # Surface failures but don’t crash the whole batch
                return f"FAILURE: {type(e).__name__}: {e}"
    
    @handler
    async def run(self, detected_secrets: list[SecretsDetectorExecutorResponse] ,ctx: WorkflowContext[None]) -> None:
        """Sends input test chunks"""
        sem = asyncio.Semaphore(self.max_concurrency)
        tasks: list[asyncio.Task] = []
        filtered_nonempty = [secret for secret in detected_secrets if not secret.is_empty()]
        for elem in filtered_nonempty:
            for comment in elem.comments:
                tasks.append(asyncio.create_task(self._bounded_call(sem, elem, comment)))

        # Fire all at once and wait for completion
        results = await asyncio.gather(*tasks, return_exceptions=False)

        # (Optional) summarize successes/failures if you want to emit a richer event
        await ctx.add_event(CustomResponseEvent({
            "processed_items": len(tasks),
            "successes": sum(1 for r in results if isinstance(r, str) and r.startswith("SUCCESS")),
            "failures": [r for r in results if not (isinstance(r, str) and r.startswith("SUCCESS"))],
            "secrets": filtered_nonempty,
        }))
        

In [12]:
from agent_framework.openai import OpenAIChatClient
from agent_framework import MCPStdioTool
import os
from dotenv import load_dotenv
load_dotenv()
openai_chat_client = OpenAIChatClient(base_url=os.getenv("BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), model_id=os.getenv("MODEL_ID"))
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN_FULL_PERMISIONS")
GITHUB_HOST = ""
toolsets = "repos,issues,pull_requests,actions,code_security,experiments"
github_mcp_server = MCPStdioTool(
    name="GitHubMCP",
    command="docker",
    args=[
        "run", "-i", "--rm",
        "-e", f"GITHUB_PERSONAL_ACCESS_TOKEN={GITHUB_TOKEN}", f"-e GITHUB_TOOLSETS={toolsets}",
        "ghcr.io/github/github-mcp-server"
    ],
    chat_client=openai_chat_client,
)
await github_mcp_server.connect()

In [None]:
total_agents=3
secrets_detector_1 = SecretsDetectorExec(openai_chat_client,id="SecretsDetector1", my_shard=0, total_agents=total_agents)
secrets_detector_2 = SecretsDetectorExec(openai_chat_client, id="SecretsDetector2", my_shard=1, total_agents=total_agents)
secrets_detector_3 = SecretsDetectorExec(openai_chat_client, id="SecretsDetector3", my_shard=2, total_agents=total_agents)
exporter = ChunksExporterExec(id="ChunkExporterAgent")
aggregator = ChunksAgregatorExec(id="ChunksAgregatorAgent", github_mcp_server=github_mcp_server)
builder = WorkflowBuilder()
builder.set_start_executor(exporter)
builder.add_fan_out_edges(exporter, [secrets_detector_1, secrets_detector_2, secrets_detector_3])
builder.add_fan_in_edges([secrets_detector_1,
                          secrets_detector_2,
                          secrets_detector_3],
                         aggregator)

workflow = builder.build()



In [14]:
async for event in workflow.run_stream(""):
    match event:
        case CustomResponseEvent() as output:
            print(f"Workflow finished")
        case ExecutorInvokedEvent() as invoke:
            print(f"Starting {invoke.executor_id}")
        case ExecutorCompletedEvent() as complete:
            print(f"Completed {complete.executor_id}: {complete.data}")

        

Files in PR: ['leaky_sample.py']
# leaky_sample.py
# ⚠️ FAKE CREDENTIALS FOR TESTING ONLY. NONE OF THESE WORK.
# This file intentionally contains strings that *look like* secrets so you can
# test detectors, LLMs, and CI scanners. Do
Starting ChunkExporterAgent
Completed ChunkExporterAgent: None
Starting SecretsDetector1
Completed SecretsDetector1: None
Starting SecretsDetector2
Starting SecretsDetector3
Completed SecretsDetector3: None
Completed SecretsDetector2: None
Starting SecretsDetector1
Completed SecretsDetector1: None
Starting SecretsDetector2
Completed SecretsDetector2: None
Starting SecretsDetector3
Completed SecretsDetector3: None
Starting SecretsDetector1
Completed SecretsDetector1: None
Starting SecretsDetector2
Completed SecretsDetector2: None
Starting SecretsDetector3
Completed SecretsDetector3: None
Starting SecretsDetector1
Completed SecretsDetector1: None
Starting SecretsDetector2
Starting SecretsDetector3
Completed SecretsDetector3: None
Completed SecretsDetector2: 

AttributeError: 'ImprovedChunksAgregatorExec' object has no attribute 'max_concurrency'