# Colby Agentic Crew - GenAI + GitHub Ops

This notebook contains the complete agent script. Run the cells in order.

**First, run this cell to install the required Python libraries.**

In [1]:
# In a Colab cell, run this first to install dependencies:
!pip -q install google-genai PyGithub requests tiktoken


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m432.7/432.7 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25h

## 1. Imports

In [3]:
# @title  imports
import os
import sys
import json
import time
import math
import base64
import shutil
import textwrap
import subprocess
import tempfile
import zipfile
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from collections import Counter
from html.parser import HTMLParser
import requests
try:
    import tiktoken  # Lightweight token estimates
    TIKTOKEN_AVAILABLE = True
except ImportError:
    TIKTOKEN_AVAILABLE = False
# --- Google/Colab Modules ---
try:
    from google.colab import drive
    from google.colab import userdata
    COLAB_ENV = True
except ImportError:
    COLAB_ENV = False
# --- Google GenAI SDK ---
try:
    from google import genai
    from google.genai import types
except ImportError:
    print("Error: 'google-genai' library not found. Please run: !pip install google-genai")
    sys.exit(1)
# --- GitHub SDK ---
try:
    from github import Github, GithubException
except ImportError:
    print("Error: 'PyGithub' library not found. Please run: !pip install PyGithub")
    sys.exit(1)


## 2. Configuration

Set cost limits, API models, and file paths here.

In [4]:
# --- Model Configuration ---
# Using latest preview models as of Oct 2025
# Model identifier passed to the GenAI client for token counting and generation.
GEN_MODEL_TEXT = "gemini-2.5-flash-preview-09-2025"
# --- Cost Control Configuration ---
# Prices per 1,000,000 tokens (as of late 2025 previews)
# $0.35 / 1M input tokens, $0.70 / 1M output tokens for flash
# NOTE(review): these rates are hard-coded estimates — confirm against current pricing.
PRICE_FLASH_INPUT_PER_TOKEN = 0.35 / 1_000_000
PRICE_FLASH_OUTPUT_PER_TOKEN = 0.70 / 1_000_000
# Ceiling (USD) that the cumulative estimated cost is compared against before/after each call.
COST_LIMIT_USD = 25.00
TOTAL_ESTIMATED_COST = 0.0  # Global cost tracker
COST_LEDGER: List[Dict[str, Any]] = []  # Detailed log per task
# File name of the Markdown cost ledger written under OUTPUT_ROOT_PATH.
COST_REPORT_BASENAME = "crew_cost_report.md"
# --- Google Drive / Local Output Configuration ---
GDRIVE_MOUNT_POINT = '/content/drive'
OUTPUT_FOLDER_NAME = 'colby_agentic_crew'
# In Colab, outputs go to the mounted Drive; otherwise to a folder in the current working directory.
if COLAB_ENV:
    OUTPUT_ROOT_PATH = (Path(GDRIVE_MOUNT_POINT) / 'MyDrive' / OUTPUT_FOLDER_NAME).resolve()
else:
    OUTPUT_ROOT_PATH = (Path.cwd() / OUTPUT_FOLDER_NAME).resolve()
CREW_TEMPLATE_DIR_NAME = 'crew_ai_cloudflare_template'
# Full path of the persisted cost ledger.
COST_LOG_PATH = (OUTPUT_ROOT_PATH / COST_REPORT_BASENAME)
# --- GitHub Configuration ---
GITHUB_REPO = "jmbish04/colby-agentic-crew"
GITHUB_SAVE_PATH = "agent_generated_files"  # Subfolder in the repo
GITHUB_TOKEN = ""  # Will be loaded by init_clients
# --- Cloudflare Org Discovery Configuration ---
CLOUDFLARE_ORG = "cloudflare"
# GitHub REST endpoint that lists the organisation's repositories (paged by the scanner).
REPO_SCAN_URL = f"https://api.github.com/orgs/{CLOUDFLARE_ORG}/repos"
# Keywords matched as lowercase substrings against each repo's name, description and topics.
REPO_KEYWORDS = [
    "workers",
    "durable",
    "object",
    "actor",
    "agent",
    "queue",
    "workflow",
    "sandbox",
    "sdk",
    "container",
]
# Maximum number of top-scoring repositories to download.
MAX_REPOS_TO_CLONE = 10
# NOTE(review): '/content' is a Colab path; it is used even when COLAB_ENV is False — confirm
# this location is writable when running locally.
REPO_DOWNLOAD_DIR = Path('/content/cloudflare_repos').resolve()
# Total line budget shared by all repositories during indexing.
MAX_INDEX_LINES = 20_000
# Default character limit applied when truncating text destined for a prompt.
CONTEXT_MAX_CHARS = 8_000
# NOTE(review): no function visible in this notebook section reads DRY_RUN directly;
# callers presumably pass it as the `dry_run` argument — confirm.
DRY_RUN = False  # When True, skip network heavy actions and model calls
# Path components that cause a file to be ignored by the indexer.
INDEX_SKIP_DIRS = {
    '.git', '.github', 'node_modules', 'dist', 'build', 'out', 'vendor', '__pycache__', '.next'
}
# File suffixes (lowercase) that the indexer skips; compared against Path.suffix.lower().
BINARY_EXTENSIONS = {
    '.png', '.jpg', '.jpeg', '.gif', '.webp', '.zip', '.tar', '.gz', '.lock', '.bin', '.exe', '.dll',
    '.ico', '.wasm', '.woff', '.woff2', '.ttf', '.mp4', '.mp3'
}
# Maps a lowercase file suffix to a display name for the language statistics.
LANGUAGE_HINTS = {
    '.ts': 'TypeScript',
    '.tsx': 'TypeScript',
    '.js': 'JavaScript',
    '.jsx': 'JavaScript',
    '.py': 'Python',
    '.rs': 'Rust',
    '.go': 'Go',
    '.toml': 'TOML',
    '.json': 'JSON',
    '.md': 'Markdown',
    '.yml': 'YAML',
    '.yaml': 'YAML',
    '.sh': 'Shell',
    # NOTE(review): 'tsconfig.json' has the suffix '.json', so this key only matches files
    # literally ending in '.tsconfig' — confirm it is intended.
    '.tsconfig': 'JSON',
}
# Framework name -> substrings searched for in each file's leading snippet.
FRAMEWORK_KEYWORDS = {
    'Hono': ['from "hono"', "from 'hono'", 'import { Hono }'],
    'Wrangler': ['wrangler.toml', 'defineConfig(', 'wrangler dev'],
    'Durable Objects': ['DurableObject', 'durable_objects'],
    'Queues': ['QueuesBinding', 'queueProducer'],
    'Workers AI': ['@cloudflare/workers-types', 'ai.run', 'Workers AI'],
    'Workflows': ['workflows deploy', 'WorkflowsBinding'],
}
# Number of leading lines kept per file as its sample snippet.
DEFAULT_SNIPPET_LINES = 20
# --- Documentation Configuration ---
# Topic name -> documentation URL fetched by CloudflareMCPClient.
CLOUDFLARE_DOC_TOPICS = {
    'workers': 'https://developers.cloudflare.com/workers/',
    'durable objects': 'https://developers.cloudflare.com/workers/runtime-apis/durable-objects/',
    'queues': 'https://developers.cloudflare.com/queues/',
    'd1': 'https://developers.cloudflare.com/d1/',
    'r2': 'https://developers.cloudflare.com/r2/',
    'kv': 'https://developers.cloudflare.com/workers/runtime-apis/kv/',
    'agents': 'https://developers.cloudflare.com/workers/ai/agents/',
    'workflows': 'https://developers.cloudflare.com/workflows/',
    'wrangler': 'https://developers.cloudflare.com/workers/wrangler/configuration/',
    'ai': 'https://developers.cloudflare.com/workers/ai/',
}
# NOTE(review): presumably the preferred fetch order for topics; not referenced by the code
# visible in this section — confirm against the caller.
DOC_TOPICS_ORDER = [
    'agents', 'durable objects', 'queues', 'workers', 'wrangler', 'r2', 'd1', 'kv', 'workflows', 'ai'
]
DOC_FETCH_TIMEOUT = 45  # seconds
# File names for the JSON artefacts produced by the discovery pipeline.
SUMMARY_FILENAME = 'cloudflare_repos_summary.json'
CODE_INDEX_FILENAME = 'cloudflare_code_index.json'
DOCS_REFERENCE_FILENAME = 'docs_reference.json'
# Passed to tiktoken.get_encoding(), i.e. this is an encoding name despite the constant's name.
TIKTOKEN_MODEL_NAME = 'cl100k_base'


## 3. Utilities

Helper functions for reading secrets, API call backoff, timestamps, HTML stripping, JSON output, token estimation, context truncation, and safe file reading.

In [5]:
def read_secret(name: str, prompt: Optional[str] = None) -> str:
    """
    Read a secret from Colab userdata if available, else environment, else prompt stdin.

    Args:
        name: Name of the secret (Colab secret key / environment variable).
        prompt: Text shown when falling back to an interactive prompt. Defaults
            to ``"Enter value for <name>: "``.

    Returns:
        The secret value. May be an empty string if the user enters nothing.
    """
    # Import unconditionally: a conditional import leaves the local name
    # unbound whenever the module is already loaded elsewhere, which silently
    # downgraded the hidden prompt to an echoing ``input()``.
    import getpass

    if COLAB_ENV:
        try:
            val = userdata.get(name)
            if val:
                return val
        except Exception:
            # Best effort: the secret may be missing or access not granted;
            # fall through to the environment / prompt.
            pass
    val = os.getenv(name)
    if val:
        return val
    if prompt is None:
        prompt = f"Enter value for {name}: "
    try:
        return getpass.getpass(prompt)
    except Exception:
        # Some front-ends cannot provide a hidden prompt; fall back to input().
        return input(prompt)
def backoff_sleep(retry: int):
    """Sleep for a capped exponential delay derived from the retry counter.

    The delay is ``min(2 ** retry, 30)`` seconds plus ``0.1`` seconds per retry.
    """
    capped_exponential = min(2 ** retry, 30)
    linear_bump = 0.1 * retry
    delay = capped_exponential + linear_bump
    print(f"  ...backing off for {delay:.1f}s")
    time.sleep(delay)
def timestamp_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    # datetime.utcnow() is deprecated; take an aware UTC timestamp instead and
    # drop tzinfo so isoformat() does not append "+00:00" before the "Z".
    from datetime import timezone

    now_utc = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return now_utc.isoformat() + 'Z'
class _HTMLStripper(HTMLParser):
    """Collect the visible text of an HTML document.

    Text inside ``<script>`` and ``<style>`` elements is ignored so that
    JavaScript and CSS do not leak into the extracted text.
    """

    _SKIPPED_TAGS = frozenset({'script', 'style'})

    def __init__(self):
        super().__init__()
        self._parts: List[str] = []
        self._skip_depth = 0  # > 0 while inside a skipped element

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self._SKIPPED_TAGS and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data: str):
        if self._skip_depth:
            return
        data = data.strip()
        if data:
            self._parts.append(data)

    def get_text(self) -> str:
        """Return the collected text fragments joined by single spaces."""
        return ' '.join(self._parts)
def strip_html(html_text: str) -> str:
    """Return the text content of ``html_text`` with markup removed.

    ``None`` or empty input yields an empty string. If parsing fails, the
    input is returned unchanged (best effort).
    """
    stripper = _HTMLStripper()
    try:
        stripper.feed(html_text or '')
        stripper.close()  # flush any text still buffered by the parser
    except Exception:
        return html_text or ''
    return stripper.get_text()
def ensure_directory(path: Path) -> Path:
    """Create ``path`` (and any missing parents) if needed, then return it unchanged."""
    path.mkdir(exist_ok=True, parents=True)
    return path
def write_json_document(path: Path, payload: Any) -> Path:
    """Serialise ``payload`` as indented JSON into ``path`` and return ``path``.

    The parent directory is created first; key order of ``payload`` is kept.
    """
    ensure_directory(path.parent)
    serialized = json.dumps(payload, sort_keys=False, indent=2)
    path.write_text(serialized, encoding='utf-8')
    print(f"[json] Saved → {path}")
    return path
def estimate_tokens(text: str) -> int:
    """Estimate the token count of ``text``.

    Uses tiktoken when it was importable; otherwise (or if tiktoken fails)
    falls back to roughly one token per four characters, with a minimum of 1.
    Empty input counts as 0 tokens.
    """
    if not text:
        return 0
    if not TIKTOKEN_AVAILABLE:
        return max(1, len(text) // 4)
    try:
        try:
            encoder = tiktoken.get_encoding(TIKTOKEN_MODEL_NAME)
        except Exception:
            encoder = tiktoken.get_encoding('cl100k_base')
        return len(encoder.encode(text))
    except Exception as exc:
        print(f"[tiktoken] Warning: {exc}. Falling back to heuristic estimate.")
    return max(1, len(text) // 4)
def truncate_for_context(text: str, limit: int = CONTEXT_MAX_CHARS) -> str:
    """Shorten ``text`` to at most ``limit`` characters, keeping its head and tail.

    Text that already fits is returned unchanged. Otherwise the result is the
    leading ``limit - 400`` characters, a truncation marker, and the final 200
    characters. For limits too small for that layout the head/tail sizes are
    scaled down so the result never exceeds ``limit``.

    Args:
        text: Text to shorten.
        limit: Maximum number of characters in the result.

    Returns:
        The original or shortened text.
    """
    if len(text) <= limit:
        return text
    marker = "\n... [truncated] ...\n"
    head = text[: max(0, limit - 400)]
    tail = text[-200:]
    shortened = head + marker + tail
    if len(shortened) <= limit:
        return shortened
    # Small limit: the fixed 200-character tail plus marker would overshoot
    # (and could even be longer than the input), so split what is left.
    budget = limit - len(marker)
    if budget <= 0:
        return text[: max(0, limit)]
    tail_len = budget // 3
    head_len = budget - tail_len
    small_tail = text[-tail_len:] if tail_len else ""
    return text[:head_len] + marker + small_tail
def is_binary_extension(path: Path) -> bool:
    """Return True when the file suffix (case-insensitive) is listed in BINARY_EXTENSIONS."""
    extension = path.suffix.lower()
    return extension in BINARY_EXTENSIONS
def should_skip_path(path: Path) -> bool:
    """Return True if any component of ``path`` is an ignored directory name or starts with a dot."""
    for component in path.parts:
        if component in INDEX_SKIP_DIRS:
            return True
        if component.startswith('.'):
            return True
    return False
def safe_read_file_lines(path: Path, max_lines: int = DEFAULT_SNIPPET_LINES) -> Tuple[List[str], int]:
    """Read a text file, returning its first ``max_lines`` lines and the total line count.

    Lines are right-stripped. Undecodable bytes are ignored. If reading fails,
    an ``[Error reading file: ...]`` entry is appended to the snippet and the
    count reflects the lines read so far.
    """
    collected: List[str] = []
    line_total = 0
    try:
        with path.open('r', encoding='utf-8', errors='ignore') as handle:
            for line_total, raw_line in enumerate(handle, start=1):
                if len(collected) < max_lines:
                    collected.append(raw_line.rstrip())
    except Exception as exc:
        collected.append(f"[Error reading file: {exc}]")
    return collected, line_total


## 4. Cost Control

Functions to track estimated spend and enforce the `COST_LIMIT_USD` spending limit (default `$25.00`).

In [6]:

def check_and_update_cost(
    task_name: str,
    prompt_tokens: int = 0,
    output_tokens: int = 0,
    model: str = GEN_MODEL_TEXT
) -> bool:
    """
    Price a call, record it in the ledger, and enforce the spending limit.

    The call is priced only when ``model`` equals GEN_MODEL_TEXT; any other
    model is costed at zero. If adding the cost would push the running total
    above COST_LIMIT_USD, a "blocked" ledger entry (cost 0.0) is appended and
    the total is left unchanged.

    Returns True if the call can proceed, False if it's blocked.
    """
    global TOTAL_ESTIMATED_COST, COST_LEDGER

    if model == GEN_MODEL_TEXT:
        call_cost = 0.0
        call_cost += prompt_tokens * PRICE_FLASH_INPUT_PER_TOKEN
        call_cost += output_tokens * PRICE_FLASH_OUTPUT_PER_TOKEN
    else:
        call_cost = 0.0

    details = f"{prompt_tokens} in, {output_tokens} out"
    projected_total = TOTAL_ESTIMATED_COST + call_cost

    def _ledger_row(task_label: str, status: str, cost: float) -> Dict[str, Any]:
        # Builds one ledger entry using the running total at call time.
        return {
            "timestamp": timestamp_iso(),
            "task": task_label,
            "status": status,
            "prompt_tokens": prompt_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "cumulative_cost": TOTAL_ESTIMATED_COST,
            "details": details,
        }

    within_budget = not (projected_total > COST_LIMIT_USD)
    if within_budget:
        TOTAL_ESTIMATED_COST = projected_total
        COST_LEDGER.append(_ledger_row(task_name, "ok", call_cost))
        print(f"[CostTracker] Task '{task_name}' → ${call_cost:.6f} (cumulative ${TOTAL_ESTIMATED_COST:.6f})")
        return True

    banner = "=" * 50
    print(banner)
    print(f"COST LIMIT BREACHED: Task '{task_name}' cannot proceed.")
    print(f"  Current Cost: ${TOTAL_ESTIMATED_COST:.4f}")
    print(f"  Attempted Call Cost: ${call_cost:.4f}")
    print(f"  Limit: ${COST_LIMIT_USD:.4f}")
    print(banner)
    COST_LEDGER.append(_ledger_row(f"FAILED: {task_name} (Cost Limit Breach)", "blocked", 0.0))
    return False


def format_cost_ledger_markdown() -> str:
    """Render COST_LEDGER as a Markdown document containing one table row per entry.

    Returns:
        The Markdown text, lines joined with newline characters.
    """
    lines = [
        "# Crew Cost Ledger",
        "",
        "| Timestamp | Task | Status | Prompt Tokens | Output Tokens | Cost (USD) | Cumulative (USD) | Details |",
        "| :-- | :-- | :-- | --: | --: | --: | --: | :-- |",
    ]
    for item in COST_LEDGER:
        lines.append(
            f"| {item.get('timestamp', '')} | {item.get('task', '')} | {item.get('status', '')} | "
            f"{item.get('prompt_tokens', 0)} | {item.get('output_tokens', 0)} | ${item.get('cost', 0.0):.6f} | "
            f"${item.get('cumulative_cost', 0.0):.6f} | {item.get('details', '')} |"
        )
    lines.append("")
    lines.append("*Log generated automatically by Colby Agentic Crew.*")
    # The separator must be the escape sequence "\n"; a literal line break
    # inside the quotes is a syntax error.
    return "\n".join(lines)


def persist_cost_ledger() -> Optional[Path]:
    """Write the Markdown cost ledger to COST_LOG_PATH.

    Returns the path written, or None when COST_LOG_PATH is None.
    """
    if COST_LOG_PATH is None:
        return None
    ledger_path = Path(COST_LOG_PATH)
    ensure_directory(ledger_path.parent)
    ledger_path.write_text(format_cost_ledger_markdown(), encoding="utf-8")
    print(f"[cost] Ledger saved → {COST_LOG_PATH}")
    return ledger_path



## 5. Cloudflare Documentation Client



In [None]:

class CloudflareMCPClient:
    """Simple client to pull reference excerpts from Cloudflare documentation."""

    def __init__(
        self,
        session: Optional[requests.Session] = None,
        doc_topics: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Args:
            session: HTTP session to use; a new ``requests.Session`` is created
                when omitted. A default User-Agent header is set if none exists.
            doc_topics: Mapping of topic name to documentation URL. Falls back
                to CLOUDFLARE_DOC_TOPICS when omitted or empty.
        """
        self.session = session or requests.Session()
        self.session.headers.setdefault("User-Agent", "colby-agentic-crew/1.0")
        topic_map = doc_topics or CLOUDFLARE_DOC_TOPICS
        # Keys are lower-cased so lookups are case-insensitive.
        self.doc_topics = {key.lower(): url for key, url in topic_map.items()}

    def _resolve_topic(self, topic: str) -> Tuple[str, Optional[str]]:
        """Map ``topic`` to ``(known_key, url)``; ``url`` is None when nothing matches.

        An exact (case-insensitive) match wins; otherwise the first known topic
        containing ``topic`` as a substring is used.
        """
        key = (topic or "").lower().strip()
        if key in self.doc_topics:
            return key, self.doc_topics[key]
        # An empty string is a substring of every key; do not let it silently
        # resolve to the first known topic.
        if key:
            for known_key, url in self.doc_topics.items():
                if key in known_key:
                    return known_key, url
        return key, None

    def fetch_topic(self, topic: str) -> Dict[str, Any]:
        """Fetch one topic's documentation page and return a status record.

        Returns:
            Dict with ``topic``, ``status`` ("ok", "missing" or "error"),
            ``url`` and ``excerpt``. Network or HTTP errors are reported in the
            record rather than raised.
        """
        resolved_key, url = self._resolve_topic(topic)
        if not url:
            return {
                "topic": topic,
                "status": "missing",
                "url": None,
                "excerpt": "",
            }
        try:
            resp = self.session.get(url, timeout=DOC_FETCH_TIMEOUT)
            resp.raise_for_status()
            text = strip_html(resp.text)
            excerpt = textwrap.shorten(text, width=1500, placeholder="…")
            return {
                "topic": resolved_key,
                "status": "ok",
                "url": url,
                "excerpt": excerpt,
            }
        except Exception as exc:
            return {
                "topic": resolved_key,
                "status": "error",
                "url": url,
                "excerpt": f"Error fetching doc: {exc}",
            }

    def gather_docs(self, topics: Optional[List[str]] = None) -> Dict[str, Any]:
        """Fetch several topics (all known topics by default) into one payload."""
        chosen_topics = topics or list(self.doc_topics.keys())
        documents = [self.fetch_topic(topic) for topic in chosen_topics]
        return {
            "queried_at": timestamp_iso(),
            "topics": chosen_topics,
            "documents": documents,
        }

    def summarize_for_prompt(self, docs_payload: Dict[str, Any], limit: int = 4000) -> str:
        """Condense a ``gather_docs`` payload into a bullet list for prompt context.

        Args:
            docs_payload: Payload as returned by ``gather_docs``.
            limit: Character limit passed to ``truncate_for_context``.
        """
        lines: List[str] = []
        for doc in docs_payload.get("documents", []):
            status = doc.get("status")
            # Guard against an explicit None topic in the record.
            topic = (doc.get("topic") or "unknown").title()
            url = doc.get("url") or ""
            if status != "ok":
                lines.append(f"- {topic}: {status} ({url})")
                continue
            excerpt = textwrap.shorten(doc.get("excerpt", ""), width=400, placeholder="…")
            lines.append(f"- {topic}: {url}\n  Excerpt: {excerpt}")
        summary = "\n".join(lines)
        return truncate_for_context(summary, limit=limit)



## 6. API Clients



In [7]:

@dataclass
class Clients:
    """Bundle of the API clients built by ``init_clients`` and passed to the task functions."""
    genai_client: genai.Client  # Google GenAI SDK client
    gh: Github  # PyGithub client
    docs_client: CloudflareMCPClient  # Cloudflare documentation fetcher


def init_clients() -> Optional[Clients]:
    """Create the GenAI, GitHub and documentation clients.

    Reads GOOGLE_API_KEY and GITHUB_TOKEN via ``read_secret`` (storing the
    latter in the module-level GITHUB_TOKEN), verifies the GitHub credentials
    by fetching the authenticated user's login, and returns a ``Clients``
    bundle. Returns None, after printing the reason, on any failure.
    """
    global GITHUB_TOKEN
    try:
        print("Initializing clients...")
        google_api_key = read_secret("GOOGLE_API_KEY", "GOOGLE_API_KEY (Google API key): ")
        GITHUB_TOKEN = read_secret("GITHUB_TOKEN", "GITHUB_TOKEN (GitHub PAT): ")

        if not google_api_key:
            print("ERROR: GOOGLE_API_KEY is not set in Colab secrets (View > Show secrets).")
            return None
        if not GITHUB_TOKEN:
            print("ERROR: GITHUB_TOKEN is not set in Colab secrets (View > Show secrets).")
            return None

        model_client = genai.Client(api_key=google_api_key)
        github_client = Github(GITHUB_TOKEN, per_page=50)
        # Touching the login forces an authenticated request, so bad
        # credentials surface here rather than later.
        github_client.get_user().login
        print("GitHub client authenticated.")

        documentation_client = CloudflareMCPClient()
        print("Documentation client ready.")

        print("All clients initialized successfully.")
        return Clients(model_client, github_client, documentation_client)

    except Exception as exc:
        print(f"Error initializing clients: {exc}")
        if 'Bad credentials' in str(exc):
            print("Hint: Check your GITHUB_TOKEN.")
        return None



## 7. Google GenAI Operations



In [14]:

def gen_text(
    clients: Clients,
    task_name: str,
    prompt: str,
    max_retries: int = 3,
    system_instruction: Optional[str] = None,
    additional_context: Optional[str] = None,
) -> Optional[str]:
    """
    Generate text content using google.genai (Gemini Developer API via API key).
    Allows injecting additional context into the system instruction and
    tracks cost usage.

    Args:
        clients: Initialised API clients; only ``genai_client`` is used here.
        task_name: Label used in log output and cost-ledger entries.
        prompt: User prompt sent as the request contents.
        max_retries: Number of retries after the first failed attempt.
        system_instruction: Overrides the default system instruction.
        additional_context: Extra text appended to the system instruction
            under a "Context:" heading.

    Returns:
        The generated text; None when the pre-check finds the cost limit would
        be exceeded; or the string "(Cost limit reached during generation)"
        when the limit is hit while recording the call's actual usage.

    Raises:
        Exception: The last error, once all attempts have failed.
    """
    global TOTAL_ESTIMATED_COST

    default_system = (
        "You are an expert AI assistant specializing in Cloudflare Workers, "
        "agentic systems, and CrewAI. Provide complete, production-ready code and configurations."
    )
    system_text = system_instruction or default_system
    if additional_context:
        system_text = f"{system_text}\n\nContext:\n{additional_context}"

    print(f"\n--- Starting Task: {task_name} ---")
    print(f"Generating text for prompt: '{prompt[:50]}...'")

    contents = prompt

    # Estimate the prompt size: ask the API first, fall back to a local estimate.
    try:
        ct = clients.genai_client.models.count_tokens(
            model=GEN_MODEL_TEXT,
            contents=contents,
            config=types.GenerateContentConfig(
                system_instruction=system_text,
            ),
        )
        prompt_tokens = getattr(ct, "total_tokens", None) or 0
        print(f"[CostTracker] Estimated prompt tokens: {prompt_tokens}")
    except Exception as e:
        print(f"Warning: Could not pre-count tokens via API: {e}. Using local estimator.")
        prompt_tokens = estimate_tokens(system_text) + estimate_tokens(prompt)

    # Refuse to call the model at all if the prompt alone would breach the limit.
    pre_check_cost = prompt_tokens * PRICE_FLASH_INPUT_PER_TOKEN
    if (TOTAL_ESTIMATED_COST + pre_check_cost) > COST_LIMIT_USD:
        print("=" * 50)
        print(f"COST LIMIT BREACH: Pre-check for task '{task_name}' failed.")
        print(f"  Current Cost: ${TOTAL_ESTIMATED_COST:.4f}")
        print(f"  Est. Prompt Cost: ${pre_check_cost:.4f}")
        print(f"  Limit: ${COST_LIMIT_USD:.4f}")
        print("=" * 50)
        COST_LEDGER.append({
            "timestamp": timestamp_iso(),
            "task": f"SKIPPED: {task_name} (Pre-check failed cost limit)",
            "status": "blocked",
            "prompt_tokens": prompt_tokens,
            "output_tokens": 0,
            "cost": 0.0,
            "cumulative_cost": TOTAL_ESTIMATED_COST,
            "details": f"Pre-estimate: {prompt_tokens} tokens",
        })
        return None

    for attempt in range(max_retries + 1):
        try:
            resp = clients.genai_client.models.generate_content(
                model=GEN_MODEL_TEXT,
                contents=contents,
                config=types.GenerateContentConfig(
                    system_instruction=system_text,
                    temperature=0.3,
                    top_p=0.9,
                    max_output_tokens=2048,
                ),
            )

            usage = getattr(resp, "usage_metadata", None)
            if usage:
                # The usage fields can be present but None; coerce to 0 so the
                # cost arithmetic cannot raise and trigger another paid retry.
                in_tokens = getattr(usage, "prompt_token_count", 0) or 0
                out_tokens = getattr(usage, "candidates_token_count", 0) or 0
                ok = check_and_update_cost(
                    task_name=task_name,
                    prompt_tokens=in_tokens,
                    output_tokens=out_tokens,
                    model=GEN_MODEL_TEXT,
                )
                if not ok:
                    return "(Cost limit reached during generation)"
            else:
                print("Warning: usage_metadata missing; relying on local estimate.")
                check_and_update_cost(
                    task_name=f"{task_name} (Usage data missing)",
                    prompt_tokens=prompt_tokens,
                    output_tokens=0,
                    model=GEN_MODEL_TEXT,
                )

            text_out = getattr(resp, "text", "") or ""
            if not text_out:
                # Fall back to concatenating the text parts of every candidate.
                parts = []
                for cand in getattr(resp, "candidates", []) or []:
                    for part in getattr(cand.content, "parts", []) or []:
                        if getattr(part, "text", None):
                            parts.append(part.text)
                text_out = "\n".join(parts).strip()

            if not text_out:
                raise RuntimeError("No text output received from model.")

            print("Text generation successful.")
            return text_out

        except Exception as e:
            print(f"GenAI text error (Attempt {attempt + 1}): {e}")
            if attempt >= max_retries:
                raise
            backoff_sleep(attempt)

    return None



## 8. GitHub Operations (SDK)



In [15]:
from typing import List, Optional, cast
from github import Github
from github.GithubException import GithubException
from github.Repository import Repository
from github.Issue import Issue
from github.PaginatedList import PaginatedList

def gh_get_repo(gh: Github, full_name: str) -> Repository:
    """Look up a repository by its 'owner/name'; log and re-raise GitHub errors."""
    try:
        return cast(Repository, gh.get_repo(full_name))
    except GithubException as err:
        error_detail = getattr(err, 'data', err)
        print(f"GitHub repo error for {full_name}: {error_detail}")
        raise

def gh_list_issues(gh: Github, repo_full_name: str, state: str = "open", limit: int = 5):
    """Return up to ``limit`` issues of the repository as small summary dicts.

    Each dict has the keys ``number``, ``title``, ``state`` and ``url``.
    """
    print(f"Listing last {limit} '{state}' issues for {repo_full_name}...")
    repository = gh_get_repo(gh, repo_full_name)
    paginated = cast(PaginatedList, repository.get_issues(state=state))

    summaries: List[dict] = []
    # Iterate lazily and stop at the limit rather than slicing the paginated list.
    for raw_issue in paginated:
        if len(summaries) >= limit:
            break
        current = cast(Issue, raw_issue)
        summary = {
            "number": current.number,
            "title": current.title,
            "state": current.state,
            "url": current.html_url,
        }
        summaries.append(summary)
    print(f"Found {len(summaries)} issues.")
    return summaries

def gh_create_issue(gh: Github, repo_full_name: str, title: str, body: str = "", labels: Optional[List[str]] = None) -> int:
    """Open a new issue in the repository and return its number."""
    print(f"Creating issue in {repo_full_name}: '{title}'")
    repository = gh_get_repo(gh, repo_full_name)
    applied_labels = labels or []
    created = cast(Issue, repository.create_issue(title=title, body=body, labels=applied_labels))
    print(f"Issue #{created.number} created successfully.")
    return created.number

def gh_comment_issue(gh: Github, repo_full_name: str, number: int, body: str) -> str:
    """Post a comment on issue ``number`` and return the comment's URL ('' if unavailable)."""
    print(f"Commenting on issue #{number} in {repo_full_name}...")
    target_issue = cast(Issue, gh_get_repo(gh, repo_full_name).get_issue(number=number))
    posted = target_issue.create_comment(body)
    # getattr keeps static checkers happy: the comment is typed as a generic GithubObject.
    comment_url = getattr(posted, "html_url", "")
    print(f"Comment posted: {comment_url}")
    return comment_url

## 9. Cloudflare Org Discovery & Indexing



In [None]:
def normalize_keywords(keywords: Iterable[str]) -> List[str]:
    """Return the distinct keywords lower-cased, stripped and sorted.

    Empty and whitespace-only entries are dropped: an empty keyword is a
    substring of every string and would mark every repository as relevant.
    """
    cleaned = {kw.lower().strip() for kw in keywords if kw}
    cleaned.discard('')
    return sorted(cleaned)
def repo_keyword_matches(repo: Dict[str, Any], keywords: List[str]) -> List[str]:
    """Return the keywords that occur in the repo's name, description or topics.

    Matching is a substring test against the lower-cased text, so ``keywords``
    are expected to be lower-case already. Fields that are missing or None are
    treated as empty, and empty keywords never match.
    """
    name = repo.get("name") or ""
    description = repo.get("description") or ""
    topics = " ".join(str(topic) for topic in (repo.get("topics") or []))
    haystack = " ".join([name, description, topics]).lower()
    return [kw for kw in keywords if kw and kw in haystack]
def fetch_cloudflare_repositories(keywords: Iterable[str]) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Page through the organisation's repositories and keep those matching ``keywords``.

    Args:
        keywords: Keywords to look for in each repo's name, description and topics.

    Returns:
        ``(relevant, summary)``: the list of matching repository records, and a
        summary dict with ``queried_at``, ``total_scanned`` and ``relevant_repos``.

    Raises:
        requests.HTTPError: On a non-retryable HTTP error, or when a page still
            fails after the retry budget is used up.
    """
    max_retries_per_page = 5
    keyword_list = normalize_keywords(keywords)
    relevant: List[Dict[str, Any]] = []
    total_scanned = 0
    page = 1
    retries = 0
    # The context manager closes pooled connections once scanning is done.
    with requests.Session() as session:
        session.headers.update(_gh_headers())
        while True:
            params = {"per_page": 100, "page": page}
            resp = session.get(REPO_SCAN_URL, params=params, timeout=60)
            if resp.status_code in (429, 502, 503, 504) and retries < max_retries_per_page:
                # Back off on the retry count (not the page number) and give up
                # eventually instead of looping forever on a persistent error.
                backoff_sleep(retries)
                retries += 1
                continue
            resp.raise_for_status()
            retries = 0
            data = resp.json()
            if not data:
                break
            total_scanned += len(data)
            for repo in data:
                matches = repo_keyword_matches(repo, keyword_list)
                entry = {
                    "name": repo.get("name"),
                    "full_name": repo.get("full_name"),
                    "url": repo.get("html_url"),
                    "description": repo.get("description"),
                    "stars": repo.get("stargazers_count", 0),
                    "topics": repo.get("topics", []),
                    "pushed_at": repo.get("pushed_at"),
                    "matches": matches,
                    "clone_url": repo.get("clone_url"),
                    "archive_url": repo.get("archive_url"),
                    "default_branch": repo.get("default_branch", "main"),
                }
                if matches:
                    relevant.append(entry)
            page += 1
            # The Link header advertises whether another page exists.
            if 'next' not in resp.links:
                break
    summary = {
        "queried_at": timestamp_iso(),
        "total_scanned": total_scanned,
        "relevant_repos": relevant,
    }
    return relevant, summary
def score_repository(repo: Dict[str, Any]) -> Tuple[int, int, str]:
    """Build the sort key for a repository: (keyword match count, stars, name)."""
    match_count = len(repo.get("matches", []))
    star_count = repo.get("stars", 0)
    repo_name = repo.get("name", "")
    return match_count, star_count, repo_name
def select_top_repositories(repos: List[Dict[str, Any]], limit: int = MAX_REPOS_TO_CLONE) -> List[Dict[str, Any]]:
    """Return the ``limit`` highest-scoring repositories, best first (input list is not modified)."""
    ranked = list(repos)
    ranked.sort(key=score_repository, reverse=True)
    return ranked[:limit]
def _clone_repository(repo: Dict[str, Any], destination: Path) -> bool:
    """Fetch a repository into ``destination``, replacing anything already there.

    Tries a shallow ``git clone`` first; if that fails (or no clone URL is
    given) it downloads the default-branch zipball from the GitHub API and
    extracts it instead.

    Args:
        repo: Repository record; reads ``clone_url``, ``full_name``,
            ``default_branch`` and ``name``.
        destination: Directory the repository contents should end up in.

    Returns:
        True on success, False if both strategies failed.
    """
    ensure_directory(destination.parent)
    # Start from a clean slate: git refuses to clone into a non-empty directory.
    if destination.exists():
        shutil.rmtree(destination)
    clone_url = repo.get("clone_url")
    if clone_url:
        try:
            subprocess.run(
                ["git", "clone", "--depth", "1", clone_url, str(destination)],
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            return True
        except Exception as exc:
            # Not fatal: fall through to the zip download below.
            print(f"[clone] git clone failed for {repo.get('name')}: {exc}")
    # Fallback to zip download
    try:
        full_name = repo.get("full_name")
        branch = repo.get("default_branch", "main")
        if not full_name:
            return False
        zip_url = f"https://api.github.com/repos/{full_name}/zipball/{branch}"
        resp = requests.get(zip_url, headers=_gh_headers(), timeout=120)
        resp.raise_for_status()
        with tempfile.TemporaryDirectory() as tmpdir:
            zip_path = Path(tmpdir) / "repo.zip"
            zip_path.write_bytes(resp.content)
            with zipfile.ZipFile(zip_path) as zf:
                zf.extractall(tmpdir)
            # The archive is expected to unpack into a single top-level directory.
            extracted_dirs = [p for p in Path(tmpdir).iterdir() if p.is_dir()]
            if not extracted_dirs:
                return False
            extracted_root = extracted_dirs[0]
            # A failed clone attempt may have left a partial directory behind.
            if destination.exists():
                shutil.rmtree(destination)
            shutil.move(str(extracted_root), destination)
        return True
    except Exception as exc:
        print(f"[clone] zip fallback failed for {repo.get('name')}: {exc}")
        return False
def download_repositories(repos: List[Dict[str, Any]], dry_run: bool = False) -> List[Dict[str, Any]]:
    """Download each repository under REPO_DOWNLOAD_DIR.

    Returns copies of the records of the repositories that were obtained, each
    extended with a ``local_path`` entry. With ``dry_run`` only empty target
    directories are created and every repository is reported as obtained.
    """
    ensure_directory(REPO_DOWNLOAD_DIR)
    obtained: List[Dict[str, Any]] = []
    for repo in repos:
        local_dir = REPO_DOWNLOAD_DIR / repo.get("name", "repo")
        if dry_run:
            ensure_directory(local_dir)
        elif not _clone_repository(repo, local_dir):
            continue
        obtained.append({**repo, "local_path": str(local_dir)})
    return obtained
def detect_language(path: Path) -> str:
    """Name the language of a file from its suffix.

    Known suffixes map through LANGUAGE_HINTS; unknown ones are returned
    without the leading dot; files without a suffix are reported as 'Text'.
    """
    extension = path.suffix.lower()
    try:
        return LANGUAGE_HINTS[extension]
    except KeyError:
        pass
    return extension.lstrip('.') or 'Text'
def index_repositories(downloaded: List[Dict[str, Any]], max_lines: int = MAX_INDEX_LINES) -> Dict[str, Any]:
    """Build a lightweight index of the downloaded repositories.

    For every repository the non-hidden, non-binary files are walked in sorted
    order; languages, key files and framework signals are recorded and the
    leading snippet of up to 25 files is kept as a sample. Indexing stops once
    ``max_lines`` lines have been consumed across all repositories.

    Args:
        downloaded: Repository records carrying a ``local_path`` entry.
        max_lines: Total line budget shared by all repositories.

    Returns:
        Dict with ``indexed_at``, ``max_total_lines``, ``total_lines_indexed``
        and ``repos`` (one entry per indexed repository).
    """
    max_samples_per_repo = 25
    repo_entries: List[Dict[str, Any]] = []
    remaining_lines = max_lines
    total_indexed = 0
    for repo in downloaded:
        repo_path = Path(repo.get("local_path", ""))
        if not repo_path.exists():
            continue
        languages: Counter[str] = Counter()
        key_files: List[str] = []
        framework_hits: set[str] = set()
        file_samples: List[Dict[str, Any]] = []
        repo_lines = 0
        top_level_dirs = [
            item.name for item in repo_path.iterdir()
            if item.is_dir() and item.name not in INDEX_SKIP_DIRS and not item.name.startswith('.')
        ]
        for path in sorted(repo_path.rglob('*')):
            if remaining_lines <= 0:
                break
            if path.is_dir():
                continue
            rel_path = path.relative_to(repo_path)
            if should_skip_path(rel_path):
                continue
            if path.name.startswith('.'):
                continue
            if is_binary_extension(path):
                continue
            language = detect_language(path)
            languages[language] += 1
            if rel_path.name in {"wrangler.toml", "README.md", "package.json", "tsconfig.json"}:
                key_files.append(str(rel_path).replace('\\', '/'))
            snippet, line_count = safe_read_file_lines(path, DEFAULT_SNIPPET_LINES)
            # Charge the file's full length against the budget, capped at what is left.
            consumed = min(line_count, remaining_lines)
            remaining_lines -= consumed
            total_indexed += consumed
            repo_lines += consumed
            snippet_text = "\n".join(snippet)
            for framework, patterns in FRAMEWORK_KEYWORDS.items():
                if any(pattern in snippet_text for pattern in patterns):
                    framework_hits.add(framework)
            # Keep only the first samples; statistics above still cover every
            # file, but the snippets of later files are not retained in memory.
            if len(file_samples) < max_samples_per_repo:
                file_samples.append({
                    "path": str(rel_path).replace('\\', '/'),
                    "language": language,
                    "line_count": line_count,
                    "snippet": snippet_text,
                })
        repo_entry = {
            "name": repo.get("name"),
            "url": repo.get("url"),
            "description": repo.get("description"),
            "stars": repo.get("stars"),
            "matches": repo.get("matches", []),
            "key_directories": sorted(top_level_dirs),
            "key_files": sorted(set(key_files)),
            "languages": [f"{lang} ({count})" for lang, count in languages.most_common()],
            "frameworks": sorted(framework_hits),
            "samples": file_samples,
            "lines_indexed": repo_lines,
        }
        repo_entries.append(repo_entry)
        if remaining_lines <= 0:
            break
    return {
        "indexed_at": timestamp_iso(),
        "max_total_lines": max_lines,
        "total_lines_indexed": total_indexed,
        "repos": repo_entries,
    }
def build_repo_context(summary: Dict[str, Any], index: Dict[str, Any]) -> str:
    """Render the scan summary and code index as a short text block for prompt context.

    Args:
        summary: Scan summary; reads ``queried_at`` and ``total_scanned``.
        index: Code index; reads ``repos`` and lists at most the first ten.

    Returns:
        Newline-separated text, truncated to CONTEXT_MAX_CHARS characters.
    """
    lines: List[str] = [
        f"Cloudflare org scan completed at {summary.get('queried_at')}.",
        f"Total repositories scanned: {summary.get('total_scanned', 0)}.",
        f"Relevant repositories indexed: {len(index.get('repos', []))}.",
        "",
    ]
    for repo in index.get("repos", [])[:10]:
        lines.append(
            f"- {repo.get('name')} ({repo.get('stars', 0)}★): {repo.get('description') or 'No description.'}"
        )
        if repo.get("languages"):
            lines.append(f"  Languages: {', '.join(repo['languages'][:5])}")
        if repo.get("frameworks"):
            lines.append(f"  Framework signals: {', '.join(repo['frameworks'])}")
        if repo.get("matches"):
            lines.append(f"  Keyword matches: {', '.join(repo['matches'])}")
        if repo.get("key_files"):
            lines.append(f"  Key files: {', '.join(repo['key_files'][:5])}")
    context = "\n".join(lines)
    return truncate_for_context(context, limit=CONTEXT_MAX_CHARS)
def prepare_cloudflare_assets(keywords: Iterable[str], dry_run: bool = False) -> Dict[str, Any]:
    """Run the repository discovery pipeline and bundle every intermediate result.

    Stages run strictly in order, each feeding the next: fetch repositories for
    ``keywords``, select the top ``MAX_REPOS_TO_CLONE``, download them
    (``dry_run`` is forwarded to the downloader), index the downloads with
    ``MAX_INDEX_LINES`` as the line budget, then build the text context.

    Returns:
        Dict with keys ``summary``, ``top_repos``, ``downloads``,
        ``code_index`` and ``context``.
    """
    matching_repos, scan_summary = fetch_cloudflare_repositories(keywords)
    assets: Dict[str, Any] = {"summary": scan_summary}
    assets["top_repos"] = select_top_repositories(matching_repos, MAX_REPOS_TO_CLONE)
    assets["downloads"] = download_repositories(assets["top_repos"], dry_run=dry_run)
    assets["code_index"] = index_repositories(assets["downloads"], MAX_INDEX_LINES)
    assets["context"] = build_repo_context(scan_summary, assets["code_index"])
    return assets


## 10. Google Drive & GitHub File Ops (REST API)



In [16]:

# ========== Google Drive (mounted) ==========
def ensure_drive_ready() -> bool:
    """Prepare the output location, mounting Drive first when running in Colab.

    Returns:
        True once the output folder exists; False if mounting or directory
        creation raised (the error is printed, not re-raised).
    """
    try:
        if COLAB_ENV:
            print("[drive] Mounting…")
            drive.mount(GDRIVE_MOUNT_POINT)
        ensure_directory(OUTPUT_ROOT_PATH)
        print(f"[drive] Ready: {OUTPUT_ROOT_PATH}")
    except Exception as setup_error:
        print(f"[drive] ERROR preparing output: {setup_error}")
        return False
    return True


def save_to_drive(rel_path: str, content: str | bytes, binary: bool = False) -> str:
    """
    Saves a file under the configured output folder.

    rel_path: path relative to OUTPUT_ROOT_PATH (e.g., 'plans/wrangler.toml').
    content:  str or bytes.
    binary:   force a binary write. Bytes-like content is always written in
              binary mode regardless of this flag; str content combined with
              binary=True is encoded as UTF-8 first.

    Returns the absolute path of the written file as a string.
    Raises TypeError if content is neither str nor bytes-like.
    """
    target_root = ensure_directory(OUTPUT_ROOT_PATH)
    abs_path = (target_root / rel_path).resolve()
    ensure_directory(abs_path.parent)

    # Decide the mode from the payload itself and validate BEFORE opening:
    # open() in "w"/"wb" truncates the target, so a type mismatch detected only
    # at write() time would leave an empty file behind.
    is_bytes = isinstance(content, (bytes, bytearray, memoryview))
    if not is_bytes and not isinstance(content, str):
        raise TypeError(f"content must be str or bytes, not {type(content).__name__}")

    if is_bytes or binary:
        payload = content if is_bytes else content.encode("utf-8")
        with open(abs_path, "wb") as handle:
            handle.write(payload)
    else:
        with open(abs_path, "w", encoding="utf-8") as handle:
            handle.write(content)
    print(f"[drive] Saved → {abs_path}")
    return str(abs_path)


# ========== GitHub (Contents API) ==========
_GH_API = "https://api.github.com"

def _gh_headers() -> dict:
    """Build the request headers used for every GitHub REST call in this cell.

    Raises:
        RuntimeError: if ``GITHUB_TOKEN`` is empty or unset.
    """
    if GITHUB_TOKEN:
        headers = {"Authorization": f"Bearer {GITHUB_TOKEN}"}
        headers["Accept"] = "application/vnd.github+json"
        headers["X-GitHub-Api-Version"] = "2022-11-28"
        headers["User-Agent"] = "genai-colab-uploader"
        return headers
    raise RuntimeError("GITHUB_TOKEN is missing (set in Colab secrets or env).")

def _gh_get_default_branch(repo_full: str) -> str:
    """Return the repository's default branch name, or "main" if the API omits it.

    Issues ``GET {_GH_API}/repos/{repo_full}`` and raises ``requests.HTTPError``
    on a non-success status.
    """
    repo_url = f"{_GH_API}/repos/{repo_full}"
    response = requests.get(repo_url, headers=_gh_headers(), timeout=30)
    response.raise_for_status()
    repo_info = response.json()
    return repo_info.get("default_branch", "main")

def _gh_get_file_sha(repo_full: str, path: str, ref: Optional[str] = None) -> Optional[str]:
    """Fetch the ``sha`` field of ``path`` from the contents endpoint.

    Args:
        repo_full: ``owner/name`` of the repository.
        path: File path inside the repository.
        ref: Optional ref sent as the ``ref`` query parameter; omitted when falsy.

    Returns:
        The ``sha`` value from the JSON response, or None when the API answers 404.

    Raises:
        requests.HTTPError: for any other non-success status.
    """
    query = {"ref": ref} if ref else {}
    contents_url = f"{_GH_API}/repos/{repo_full}/contents/{path}"
    response = requests.get(contents_url, headers=_gh_headers(), params=query, timeout=30)
    if response.status_code != 404:
        response.raise_for_status()
        return response.json().get("sha")
    return None

def _gh_put_contents(repo_full: str, path: str, message: str, b64content: str, branch: Optional[str], sha: Optional[str]):
    """PUT base64 content to the contents endpoint and return the decoded JSON reply.

    ``branch`` and ``sha`` are added to the request body only when truthy.
    On a status of 400 or above the error payload (JSON if parseable, otherwise
    the raw text) is printed before ``raise_for_status`` raises
    ``requests.HTTPError``.
    """
    body = {"message": message, "content": b64content}
    for field_name, field_value in (("branch", branch), ("sha", sha)):
        if field_value:
            body[field_name] = field_value

    contents_url = f"{_GH_API}/repos/{repo_full}/contents/{path}"
    response = requests.put(contents_url, headers=_gh_headers(), json=body, timeout=60)
    if response.status_code >= 400:
        # Surface GitHub's explanation before the generic HTTPError is raised.
        try:
            print("[github] Error payload:", response.json())
        except Exception:
            print("[github] Error text:", response.text)
    response.raise_for_status()
    return response.json()

def save_to_github(
    path: str,
    content: str | bytes,
    commit_message: str,
    repo_full: str = GITHUB_REPO,
    branch: Optional[str] = None,
    max_retries: int = 3
) -> dict:
    """Create or update a single file in a GitHub repository via the contents API.

    Args:
        path: Destination path inside the repository.
        content: File body; str is encoded as UTF-8, bytes are sent as-is.
        commit_message: Commit message for the change.
        repo_full: ``owner/name`` of the target repository.
        branch: Target branch; the repository's default branch is looked up when None.
        max_retries: Maximum number of attempts for transient failures.

    Returns:
        The decoded JSON response of the successful PUT.

    Raises:
        requests.HTTPError: on a non-retryable status, or when the last attempt fails.
        RuntimeError: if no attempt was made (``max_retries`` <= 0).
    """
    if branch is None:
        branch = _gh_get_default_branch(repo_full)

    content_bytes = content if isinstance(content, (bytes, bytearray)) else content.encode("utf-8")
    b64 = base64.b64encode(content_bytes).decode("ascii")

    # 409 is included because it signals a SHA conflict that a fresh SHA lookup
    # on the next attempt can resolve.
    retryable_statuses = (409, 429, 502, 503, 504)

    for attempt in range(max_retries):
        try:
            # Re-read the blob SHA on every attempt. A PUT that timed out at a
            # gateway (502/504) may still have been committed, in which case the
            # SHA read before the first attempt is stale and would be rejected.
            sha = _gh_get_file_sha(repo_full, path, ref=branch)
            print(f"[github] PUT {repo_full}:{branch}/{path} (attempt {attempt + 1})")
            return _gh_put_contents(repo_full, path, commit_message, b64, branch, sha)
        except requests.HTTPError as e:
            code = getattr(e.response, "status_code", None)
            if code in retryable_statuses and attempt < max_retries - 1:
                # Exponential backoff capped at 30s, plus a small linear offset.
                delay = min(2 ** attempt, 30) + 0.2 * (attempt + 1)
                print(f"[github] {code} retrying in {delay:.1f}s …")
                time.sleep(delay)
                continue
            raise
    raise RuntimeError(f"Failed to upload to GitHub after {max_retries} attempts.")



## 11. Cost Report Generation



In [17]:

def generate_cost_report() -> str:
    """Generates a formatted Markdown string from the COST_LEDGER.

    Calls ``persist_cost_ledger()`` first, then renders a header (completion
    time, ``COST_LIMIT_USD``, ``TOTAL_ESTIMATED_COST``) followed by one table
    row per ledger entry, or a single placeholder row when the ledger is empty.

    Returns:
        The report as one newline-joined Markdown string.
    """
    print("Generating cost report...")

    persist_cost_ledger()

    def _cell(value: Any) -> str:
        """Flatten a value to one line and escape pipes so it stays in its table cell."""
        return " ".join(str(value).splitlines()).replace("|", "\\|")

    report_lines = [
        "# Agent Run Cost Report",
        f"**Run completed:** {time.ctime()}",
        f"**Cost Limit:** ${COST_LIMIT_USD:.2f}",
        f"**Total Estimated Cost:** ${TOTAL_ESTIMATED_COST:.6f}",
        "",
        "| Task | Status | Prompt Tokens | Output Tokens | Cost (USD) | Cumulative (USD) | Details |",
        "| :-- | :-- | --: | --: | --: | --: | :-- |",
    ]

    if not COST_LEDGER:
        report_lines.append("| (no tasks) | | 0 | 0 | $0.000000 | $0.000000 | |")
    else:
        for item in COST_LEDGER:
            # Free-text fields can contain "|" or line breaks (e.g. error
            # details), which would otherwise split the row into extra cells.
            report_lines.append(
                f"| {_cell(item.get('task', 'Unknown Task'))} | {_cell(item.get('status', 'n/a'))} | "
                f"{item.get('prompt_tokens', 0)} | {item.get('output_tokens', 0)} | "
                f"${item.get('cost', 0.0):.6f} | ${item.get('cumulative_cost', 0.0):.6f} | {_cell(item.get('details', ''))} |"
            )

    report_lines.append("")
    report_lines.append("---")
    report_lines.append("*End of Report*")
    # The separator must be the "\n" escape; a raw line break inside a
    # single-quoted literal is a syntax error.
    return "\n".join(report_lines)



## 12. Main Execution



In [18]:
# Banner: echo the effective run configuration before doing any work.
print("=" * 50)
print("Colby Agentic Crew - GenAI + GitHub Ops")
print("=" * 50)
print(f"Cost Limit set to: ${COST_LIMIT_USD:.2f}")
print(f"GitHub Repo set to: {GITHUB_REPO}")
print(f"Dry run mode: {DRY_RUN}")
print("NOTE: Make sure 'GOOGLE_API_KEY' and 'GITHUB_TOKEN' are set in Colab secrets (View > Show secrets).")
# Do NOT create OUTPUT_ROOT_PATH before mounting: when the output folder lives
# under the Drive mount point, pre-creating it puts local directories inside
# the mount point, which can make the mount fail or be skipped so that output
# lands on the ephemeral VM disk. ensure_drive_ready() creates the folder
# itself once the mount step is done (and also when not running in Colab).
drive_ready = ensure_drive_ready()
# Results shared between the pipeline stages below; they stay empty/None when a
# stage is skipped or fails.
cloudflare_assets: Dict[str, Any] = {}
docs_reference: Dict[str, Any] = {}
generated_text_toml: Optional[str] = None
generated_text_readme: Optional[str] = None
generated_text_agents: Optional[str] = None
# One UTC timestamp labels every artifact and commit message of this run.
now = datetime.utcnow()
timestamp_label = now.strftime("%Y%m%d-%H%M%S")
# Main pipeline: discover repositories, generate wrangler.toml / README.md /
# AGENTS.md, persist every artifact to Drive (and GitHub unless DRY_RUN), then
# always emit the cost report from the `finally` clause.
try:
    if not drive_ready and COLAB_ENV:
        print("Could not mount Google Drive. Aborting to prevent lost work.")
        # SystemExit is not an Exception subclass, so it skips the handler
        # below but still runs the `finally` clause.
        sys.exit(1)
    clients = init_clients()
    if clients:
        if DRY_RUN:
            print("[dry-run] Gemini generation will be replaced with placeholder text.")

        # ---- 0. Repository discovery -------------------------------------
        print("\n--- 0. Discovering Cloudflare repositories ---")
        cloudflare_assets = prepare_cloudflare_assets(REPO_KEYWORDS, dry_run=DRY_RUN)
        summary_payload = cloudflare_assets.get("summary", {})
        code_index_payload = cloudflare_assets.get("code_index", {})
        repo_context = cloudflare_assets.get("context", "")
        summary_json = json.dumps(summary_payload, indent=2)
        index_json = json.dumps(code_index_payload, indent=2)
        save_to_drive(SUMMARY_FILENAME, summary_json)
        save_to_drive(CODE_INDEX_FILENAME, index_json)
        if not DRY_RUN:
            try:
                save_to_github(
                    path=f"{GITHUB_SAVE_PATH}/{SUMMARY_FILENAME}",
                    content=summary_json,
                    commit_message=f"Agent: Update {SUMMARY_FILENAME} ({timestamp_label})"
                )
                save_to_github(
                    path=f"{GITHUB_SAVE_PATH}/{CODE_INDEX_FILENAME}",
                    content=index_json,
                    commit_message=f"Agent: Update {CODE_INDEX_FILENAME} ({timestamp_label})"
                )
            except Exception as e:
                # Best effort: the Drive copies already exist, so keep going.
                print(f"Warning: unable to push discovery artifacts to GitHub: {e}")
        else:
            print("[dry-run] Skipping GitHub upload for discovery artifacts.")

        # Each JSON blob gets half the budget; the combined text is then
        # truncated once more to the full budget.
        summary_context = truncate_for_context(summary_json, limit=CONTEXT_MAX_CHARS // 2)
        index_context = truncate_for_context(index_json, limit=CONTEXT_MAX_CHARS // 2)
        combined_context = truncate_for_context(
            f"Cloudflare Repository Summary (JSON):\n{summary_context}\n"
            f"Code Index Snapshot (JSON):\n{index_context}\n"
            f"Narrative Context:\n{repo_context}",
            limit=CONTEXT_MAX_CHARS,
        )

        # ---- 1. wrangler.toml --------------------------------------------
        if DRY_RUN:
            generated_text_toml = "# Dry run - wrangler.toml not generated."
        else:
            print("\n--- 1. Generating wrangler.toml ---")
            wrangler_prompt = """
            Using the supplied Cloudflare repository context, draft a production-ready `wrangler.toml`
            for the `colby-agentic-crew` project. The configuration must:
            - Define a Workers script entry at `src/index.ts`.
            - Register Durable Object bindings named `AGENT_SUPERVISOR` (class `AgentSupervisor`) and `TASK_LOCKER` (class `TaskLocker`).
            - Include KV (`CREW_MEMORY_KV`), D1 (`CREW_STATE_DB`), R2 (`AGENT_ARTIFACTS_R2`), and Queues (`WORKER_QUEUE`) bindings.
            - Demonstrate usage of Workers AI and Workflows where applicable.
            - Reference any relevant Cloudflare services discovered in the source repos.
            Provide full TOML including environments for development and production.
            """
            generated_text_toml = gen_text(
                clients,
                task_name="Generate wrangler.toml",
                prompt=wrangler_prompt,
                additional_context=combined_context,
            )
        if generated_text_toml:
            print("Generated wrangler.toml:\n", generated_text_toml)
            output_filename_toml = f"wrangler_config_{timestamp_label}.toml"
            # Timestamped copy plus the "current" copy inside the template folder.
            save_to_drive(output_filename_toml, generated_text_toml)
            save_to_drive(f"{CREW_TEMPLATE_DIR_NAME}/wrangler.toml", generated_text_toml)
            if not DRY_RUN:
                try:
                    save_to_github(
                        path=f"{GITHUB_SAVE_PATH}/{output_filename_toml}",
                        content=generated_text_toml,
                        commit_message=f"Agent: Add wrangler.toml config ({timestamp_label})"
                    )
                    save_to_github(
                        path=f"{GITHUB_SAVE_PATH}/{CREW_TEMPLATE_DIR_NAME}/wrangler.toml",
                        content=generated_text_toml,
                        commit_message=f"Agent: Sync template wrangler.toml ({timestamp_label})"
                    )
                except Exception as e:
                    print(f"Error saving wrangler.toml to GitHub: {e}")
            else:
                print("[dry-run] Skipping GitHub upload for wrangler.toml.")
        else:
            print("Skipping wrangler.toml saving steps as generation failed or was cost-blocked.")

        # ---- 2. README.md ------------------------------------------------
        if DRY_RUN:
            generated_text_readme = "# Dry run - README not generated."
        else:
            print("\n--- 2. Generating README.md ---")
            readme_prompt = """
            Create a comprehensive README for the Colby Agentic Crew template. The README must:
            - Summarize architecture patterns observed in the Cloudflare repositories.
            - Document how Durable Objects, Queues, KV, R2, and D1 are wired together.
            - Include deployment steps with Wrangler, Workers AI integration notes, and token budget guidance.
            - Reference the generated wrangler.toml and highlight how to extend the template.
            """
            generated_text_readme = gen_text(
                clients,
                task_name="Generate README.md",
                prompt=readme_prompt,
                additional_context=combined_context,
            )
        if generated_text_readme:
            print("Generated README.md:\n", generated_text_readme)
            output_filename_readme = f"README_{timestamp_label}.md"
            save_to_drive(output_filename_readme, generated_text_readme)
            save_to_drive(f"{CREW_TEMPLATE_DIR_NAME}/README.md", generated_text_readme)
            if not DRY_RUN:
                try:
                    save_to_github(
                        path=f"{GITHUB_SAVE_PATH}/{output_filename_readme}",
                        content=generated_text_readme,
                        commit_message=f"Agent: Add README.md documentation ({timestamp_label})"
                    )
                    save_to_github(
                        path=f"{GITHUB_SAVE_PATH}/{CREW_TEMPLATE_DIR_NAME}/README.md",
                        content=generated_text_readme,
                        commit_message=f"Agent: Sync template README ({timestamp_label})"
                    )
                except Exception as e:
                    print(f"Error saving README to GitHub: {e}")
            else:
                print("[dry-run] Skipping GitHub upload for README.")
        else:
            print("Skipping README.md saving steps as generation failed or was cost-blocked.")

        # ---- 3. Documentation references ---------------------------------
        print("\n--- 3. Fetching Cloudflare documentation references ---")
        try:
            docs_reference = clients.docs_client.gather_docs(DOC_TOPICS_ORDER)
            docs_summary_text = clients.docs_client.summarize_for_prompt(docs_reference, limit=4000)
        except Exception as e:
            # Record the failure in the saved reference file and continue with
            # repository context only.
            print(f"Warning: unable to gather docs: {e}")
            docs_reference = {
                "queried_at": timestamp_iso(),
                "topics": DOC_TOPICS_ORDER,
                "documents": [],
                "error": str(e),
            }
            docs_summary_text = "Documentation retrieval failed; proceed with repository context only."
        docs_json = json.dumps(docs_reference, indent=2)
        save_to_drive(DOCS_REFERENCE_FILENAME, docs_json)
        if not DRY_RUN:
            try:
                save_to_github(
                    path=f"{GITHUB_SAVE_PATH}/{DOCS_REFERENCE_FILENAME}",
                    content=docs_json,
                    commit_message=f"Agent: Update {DOCS_REFERENCE_FILENAME} ({timestamp_label})"
                )
            except Exception as e:
                print(f"Warning: unable to push docs reference to GitHub: {e}")
        else:
            print("[dry-run] Skipping GitHub upload for docs reference.")
        agents_context = truncate_for_context(
            f"{combined_context}\nCloudflare Docs Excerpts:\n{docs_summary_text}",
            limit=CONTEXT_MAX_CHARS,
        )

        # ---- 4. AGENTS.md ------------------------------------------------
        if DRY_RUN:
            generated_text_agents = "# Dry run - AGENTS instructions not generated."
        else:
            print("\n--- 4. Generating AGENTS.md ---")
            agents_prompt = """
            Produce an `AGENTS.md` file describing how a Crew of AI agents should collaborate on this project.
            Include:
            - Roles for planner, researcher, implementer, and reviewer agents.
            - How to leverage the downloaded Cloudflare repositories for prior art.
            - Steps for updating bindings, Wrangler configs, and deployment pipelines.
            - Guidance on cost controls and when to consult the fetched Cloudflare docs.
            """
            generated_text_agents = gen_text(
                clients,
                task_name="Generate AGENTS.md",
                prompt=agents_prompt,
                additional_context=agents_context,
            )
        if generated_text_agents:
            print("Generated AGENTS.md:\n", generated_text_agents)
            output_filename_agents = f"AGENTS_{timestamp_label}.md"
            save_to_drive(output_filename_agents, generated_text_agents)
            save_to_drive(f"{CREW_TEMPLATE_DIR_NAME}/AGENTS.md", generated_text_agents)
            if not DRY_RUN:
                try:
                    save_to_github(
                        path=f"{GITHUB_SAVE_PATH}/{output_filename_agents}",
                        content=generated_text_agents,
                        commit_message=f"Agent: Add AGENTS.md instructions ({timestamp_label})"
                    )
                    save_to_github(
                        path=f"{GITHUB_SAVE_PATH}/{CREW_TEMPLATE_DIR_NAME}/AGENTS.md",
                        content=generated_text_agents,
                        commit_message=f"Agent: Sync template AGENTS ({timestamp_label})"
                    )
                except Exception as e:
                    print(f"Error saving AGENTS.md to GitHub: {e}")
            else:
                print("[dry-run] Skipping GitHub upload for AGENTS.")
        else:
            print("Skipping AGENTS.md saving steps as generation failed or was cost-blocked.")

        # ---- 5. Template workspace ---------------------------------------
        print("\n--- 5. Building Crew template workspace ---")
        template_path = ensure_directory(OUTPUT_ROOT_PATH / CREW_TEMPLATE_DIR_NAME)
        if summary_json:
            (template_path / SUMMARY_FILENAME).write_text(summary_json, encoding="utf-8")
        if index_json:
            (template_path / CODE_INDEX_FILENAME).write_text(index_json, encoding="utf-8")
        if docs_reference:
            (template_path / DOCS_REFERENCE_FILENAME).write_text(docs_json, encoding="utf-8")
        print(f"Template assets stored at {template_path}")

        # ---- 6. GitHub issue automation ----------------------------------
        print("\n--- 6. Performing GitHub Issue Operations ---")
        if DRY_RUN:
            print("[dry-run] Skipping GitHub issue automation.")
        # Issue automation only runs against this one hard-coded repository;
        # for any other GITHUB_REPO the step is skipped without a message.
        elif GITHUB_REPO == "jmbish04/colby-agentic-crew":
            try:
                issues = gh_list_issues(clients.gh, GITHUB_REPO, state="open", limit=5)
                print("Existing open issues:", json.dumps(issues, indent=2))
                if generated_text_toml:
                    issue_title = f"Bot Task: Review new wrangler.toml ({timestamp_label})"
                    # Only the first 2000 characters of the config are previewed.
                    issue_body = (
                        "Generated by agent.\n"
                        f"See Drive output: `{output_filename_toml}`\n"
                        "**Configuration Preview:**\n"
                        f"```toml\n{generated_text_toml[:2000]}\n```\n"
                    )
                    new_issue_num = gh_create_issue(
                        clients.gh,
                        GITHUB_REPO,
                        issue_title,
                        issue_body,
                        labels=["bot", "config"],
                    )
                    comment_body = "Task created automatically."
                    if generated_text_readme:
                        comment_body += f"\nREADME draft: `{output_filename_readme}`"
                    gh_comment_issue(clients.gh, GITHUB_REPO, new_issue_num, comment_body)
            except GithubException as e:
                print(f"Error during GitHub operations: {e}")
                print(f"Please ensure the token has 'repo' scope and access to '{GITHUB_REPO}'.")
            except Exception as e:
                print(f"An unexpected error occurred during GitHub ops: {e}")
    else:
        print("Failed to initialize clients. Check secrets.")
except Exception as e:
    print(f"An uncaught error occurred in main execution: {e}")
finally:
    # Always report costs, including after a failure or the early abort.
    print("\n" + "=" * 50)
    print("--- 7. Generating Final Cost Report ---")
    report_content = generate_cost_report()
    print(report_content)
    print("\n--- 8. Saving Final Cost Report ---")
    report_filename = f"cost_report_{timestamp_label}.md"
    try:
        save_to_drive(report_filename, report_content)
    except Exception as e:
        # An exception escaping `finally` would replace the original error or
        # exit and skip the closing summary, so report it and carry on.
        print(f"Error saving final cost report to Drive: {e}")
    if not DRY_RUN:
        try:
            save_to_github(
                path=f"{GITHUB_SAVE_PATH}/{report_filename}",
                content=report_content,
                commit_message=f"Agent: Final cost report ({timestamp_label})"
            )
        except Exception as e:
            print(f"Error saving final cost report to GitHub: {e}")
    else:
        print("[dry-run] Skipping GitHub upload for cost report.")
    print("\n" + "=" * 50)
    print("Script finished.")
    print(f"FINAL ESTIMATED COST: ${TOTAL_ESTIMATED_COST:.6f}")
    if COST_LOG_PATH:
        print(f"Cost ledger: {COST_LOG_PATH}")
    print("=" * 50)


Colby Agentic Crew - GenAI + GitHub Ops
Cost Limit set to: $25.00
GitHub Repo set to: jmbish04/colby-agentic-crew
NOTE: Make sure 'GOOGLE_API_KEY' and 'GITHUB_TOKEN' are set in Colab secrets (View > Show secrets).
[drive] Mounting…
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[drive] Ready: /content/drive/MyDrive/colby_agentic_crew
Initializing clients...


  gh = Github(GITHUB_TOKEN, per_page=50)


GitHub client authenticated.
All clients initialized successfully.

--- Starting Task: Generate wrangler.toml ---
Generating text for prompt: '
        Generate a complete 'wrangler.toml' file ...'
[CostTracker] Task 'Generate wrangler.toml' cost: $0.000632
[CostTracker] New total estimated cost: $0.0006
Text generation successful.
Generated wrangler.toml:
 As an expert AI assistant specializing in Cloudflare Workers and agentic systems, here is the complete, production-ready `wrangler.toml` configuration for your project, `colby-agentic-crew`.

This configuration includes all specified bindings, using placeholders for IDs that must be generated when you run `wrangler deploy` or `wrangler d1 create`/`wrangler kv namespace create`.

## `wrangler.toml`

```toml
# CORE WORKER CONFIGURATION
name = "colby-agentic-crew"
main = "src/index.ts"
compatibility_date = "2024-05-15"
workers_dev = true

# Assuming you are using TypeScript, ensure your build process handles it.
# If using the standard