# Imports and Setup

In [48]:
import os
import re
import requests
from typing import Optional, List, Tuple, Literal, Dict
from urllib.parse import urlparse
from datetime import datetime

from pydantic import BaseModel, Field
from dotenv import load_dotenv

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

load_dotenv()  # loads OPENAI_API_KEY if you store it in .env
assert os.getenv("OPENAI_API_KEY"), "OPENAI_API_KEY not set"

# Data models (public-repo friendly)

Pydantic models for the subset of GitHub pull-request API fields that the reviewer needs.

In [50]:

# A GitHub account as embedded in API payloads; used below as ``PRDetails.user``.
# Only ``login`` and ``id`` are required; every other field is tolerated when absent.
class GitUser(BaseModel):
    login: str  # account handle, printed as the PR author in reports
    id: int
    node_id: Optional[str] = None
    avatar_url: Optional[str] = None
    gravatar_id: Optional[str] = None
    url: Optional[str] = None
    html_url: Optional[str] = None
    type: Optional[str] = None  # printed next to the login in the PR header

# Repository reference attached to a PR ref (see ``PRRef.repo``); both URLs are optional.
class Repository(BaseModel):
    url: Optional[str] = None
    svn_url: Optional[str] = None


# One side of a pull request; ``PRDetails`` holds one for ``head`` and one for ``base``.
class PRRef(BaseModel):
    sha: str  # commit SHA; the base SHA is used to build raw URLs of pre-change files
    ref: str
    label: Optional[str] = None
    repo: Optional[Repository] = None

# Pull-request summary, validated from the JSON returned by the connector's
# ``api_pr_url`` endpoint. Fields not declared here are not part of the model.
class PRDetails(BaseModel):
    title: str
    body: Optional[str] = None  # PR description; may be missing
    user: GitUser
    created_at: datetime
    updated_at: datetime
    merged: bool
    mergeable: Optional[bool] = None
    commits: int
    additions: int
    deletions: int
    changed_files: int
    head: PRRef
    base: PRRef

# One changed file of a pull request, validated from an entry of the connector's
# ``files_url`` response.
class PRFile(BaseModel):
    sha: str
    filename: str
    # Validation fails for any status string outside this closed set.
    status: Literal["added","modified","removed","renamed","copied","changed","unchanged"]
    additions: int
    deletions: int
    changes: int
    blob_url: Optional[str] = None
    raw_url: Optional[str] = None  # fetched later to obtain the file's text for context windows
    contents_url: Optional[str] = None
    patch: Optional[str] = None  # diff text; downstream code handles it being absent
    previous_filename: Optional[str] = None
    # Not filled by validation in this file: GithubPRFilesFetcher.fetch_pr assigns it
    # for modified/renamed files (raw URL of the file at the PR's base commit).
    original_file_url: Optional[str] = None




# Github Connector

In [52]:
class GithubAPIConnector:
    """Read-only client for a single GitHub pull request.

    Parses a ``https://github.com/<owner>/<repo>/pull/<num>`` URL, derives the
    matching REST API endpoints and fetches the PR details and changed files
    through one shared ``requests.Session``.
    """

    def __init__(self, pr_url: str, timeout: int = 30):
        """Parse ``pr_url`` and prepare the API endpoints and HTTP session.

        Args:
            pr_url: Browser URL of the pull request.
            timeout: Per-request timeout in seconds for all API calls.

        Raises:
            ValueError: If the URL path is not ``/<owner>/<repo>/pull/<num>``
                with a numeric ``<num>``.
        """
        parts = [p for p in urlparse(pr_url).path.split("/") if p]
        if len(parts) < 4 or parts[2] != "pull" or not parts[3].isdigit():
            raise ValueError(
                f"Expected https://github.com/<owner>/<repo>/pull/<num>, got: {pr_url}"
            )

        # Stored as attributes because GithubPRFilesFetcher reads owner/repo.
        self.owner = parts[0]
        self.repo = parts[1]
        self.pr_number = parts[3]
        self.timeout = timeout

        self.api_pr_url = (
            f"https://api.github.com/repos/{self.owner}/{self.repo}/pulls/{self.pr_number}"
        )
        self.files_url = (
            f"https://api.github.com/repos/{self.owner}/{self.repo}/pulls/{self.pr_number}/files"
        )

        self.session = requests.Session()
        self.session.headers.update(
            {"Accept": "application/vnd.github+json"}
        )

    def get_pr_details(self) -> PRDetails:
        """Fetch and validate the pull-request summary.

        Raises:
            requests.HTTPError: On a non-success HTTP status.
        """
        # Go through the session so the Accept header configured above is sent.
        response = self.session.get(self.api_pr_url, timeout=self.timeout)
        response.raise_for_status()
        return PRDetails.model_validate(response.json())

    def get_pr_files(self) -> list[PRFile]:
        """Fetch and validate every changed file of the pull request.

        Follows the ``Link: rel="next"`` header so PRs with more files than
        fit on one page are returned completely.

        Raises:
            requests.HTTPError: On a non-success HTTP status of any page.
        """
        files: list[PRFile] = []
        url: Optional[str] = self.files_url
        params: Optional[dict] = {"per_page": 100}
        while url:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            files.extend(PRFile.model_validate(item) for item in response.json())
            url = response.links.get("next", {}).get("url")
            # The "next" URL already carries its own query string.
            params = None
        return files

class GithubPRFilesFetcher:
    """Fetches a pull request and annotates its files with base-revision URLs."""

    def __init__(self, pr_url):
        """Create the underlying API connector for ``pr_url``.

        Raises:
            ValueError: Propagated from ``GithubAPIConnector`` for a malformed URL.
        """
        self.connector = GithubAPIConnector(pr_url)

    def fetch_pr(self) -> tuple[PRDetails, list[PRFile]]:
        """Return the PR details and its changed files.

        For files whose status is ``modified`` or ``renamed``,
        ``original_file_url`` is set to the raw URL of the file as it existed
        at the PR's base commit.
        """
        from urllib.parse import quote

        pr = self.connector.get_pr_details()
        files = self.connector.get_pr_files()

        base_sha = pr.base.sha
        owner, repo = self.connector.owner, self.connector.repo

        for f in files:
            if f.status in ("modified", "renamed"):
                # A renamed file lives under its previous path at the base
                # commit; the new name does not exist there yet.
                base_path = f.previous_filename or f.filename
                f.original_file_url = (
                    f"https://raw.githubusercontent.com/{owner}/{repo}/{base_sha}/{quote(base_path)}"
                )

        return pr, files

In [53]:
# pull_request_url = 'https://github.com/topoteretes/cognee/pull/1851'
# pr_fetcher = GithubPRFilesFetcher(pr_url=pull_request_url)
# pr, files = pr_fetcher.fetch_pr()

# Context enrichment: parse hunks → fetch targeted windows from actual file

## Parse "added-side" hunks from patch

In [54]:
# Unified-diff hunk header, e.g. "@@ -92,6 +92,7 @@". Anchored to the start of
# a line so that header-looking text inside a diff content line (which always
# begins with " ", "+" or "-") is not mistaken for a real hunk.
HUNK_RE = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@", re.MULTILINE)

def parse_added_hunks(patch: Optional[str]) -> List[Tuple[int, int]]:
    """
    Returns list of (start_line, length) for the + (new) side hunks.

    ``patch`` is unified-diff text; ``None`` or an empty string yields ``[]``.
    A header that omits the length (``+12`` rather than ``+12,3``) is
    reported with length 1.
    """
    if not patch:
        return []
    return [
        (int(m.group(1)), int(m.group(2) or "1"))
        for m in HUNK_RE.finditer(patch)
    ]


## Fetch file content

In [55]:
# A simple tool to fetch text from a URL
import requests
from typing import Optional

def fetch_text(url: Optional[str], timeout: int = 30) -> Optional[str]:
    """Download ``url`` and return the response body as text.

    Returns ``None`` when ``url`` is empty/``None`` or the server answers 404.
    Any other error status is raised via ``raise_for_status``.
    """
    if url:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 404:
            response.raise_for_status()
            return response.text
    return None


## Extract windows around hunks and merge overlaps

In [56]:
def merge_ranges(ranges: List[Tuple[int,int]]) -> List[Tuple[int,int]]:
    """Collapse inclusive, 1-indexed (start, end) ranges into a sorted, disjoint list.

    Ranges that overlap or sit directly next to each other (the next start is
    at most one past the previous end) are fused into one.
    """
    if not ranges:
        return []
    first, *rest = sorted(ranges)
    result = [first]
    for start, end in rest:
        prev_start, prev_end = result[-1]
        if start > prev_end + 1:
            # Gap between the two ranges: open a new one.
            result.append((start, end))
            continue
        result[-1] = (prev_start, max(prev_end, end))
    return result

def extract_windows_from_hunks(file_text: str, hunks: List[Tuple[int,int]], padding: int = 30, max_total_lines: int = 260) -> str:
    """Return labelled excerpts of ``file_text`` around each hunk.

    Args:
        file_text: Full text of the file the hunks refer to.
        hunks: ``(start_line, length)`` pairs, 1-indexed.
        padding: Context lines kept before the first and after the last line
            of each hunk.
        max_total_lines: Upper bound on the number of file lines emitted
            across all excerpts.

    Returns:
        Excerpts of the form ``"[lines S-E]\\n<text>"`` joined by blank lines;
        an empty string when no hunk falls inside the file.
    """
    lines = file_text.splitlines()
    total = len(lines)

    # Turn each hunk into an inclusive [start, end] window with padding.
    ranges = []
    for start, length in hunks:
        # Last line covered by the hunk; a zero-length hunk still anchors on
        # its start line.
        last_changed = start + max(length, 1) - 1
        window_start = max(1, start - padding)
        window_end = min(total, last_changed + padding)
        # Skip windows lying entirely past the end of the file (or an empty
        # file); they would otherwise produce inverted ranges that emit empty
        # snippets and subtract from the line budget.
        if window_start <= window_end:
            ranges.append((window_start, window_end))

    snippets = []
    used = 0
    for s, e in merge_ranges(ranges):
        remaining = max_total_lines - used
        if remaining <= 0:
            break
        # Truncate the last window so the overall budget is respected.
        take_e = min(e, s + remaining - 1)
        snippet = "\n".join(lines[s - 1:take_e])
        snippets.append(f"[lines {s}-{take_e}]\n{snippet}")
        used += take_e - s + 1

    return "\n\n".join(snippets)


## Build the "review bundle" (patch + targeted file context)

In [57]:
def build_pr_header(pr: PRDetails) -> str:
    """Format the PR metadata and description as a plain-text prompt header.

    The description is stripped of surrounding whitespace and capped at 2000
    characters; a missing description renders as an empty line.
    """
    description = (pr.body or "").strip()[:2000]
    author = pr.user
    created = pr.created_at.isoformat()
    updated = pr.updated_at.isoformat()

    header_lines = [
        f"PR TITLE: {pr.title}",
        f"PR AUTHOR: {author.login} ({author.type})",
        f"CREATED: {created} | UPDATED: {updated}",
        f"COMMITS: {pr.commits} | +{pr.additions} -{pr.deletions} | FILES: {pr.changed_files}",
        "",
        "PR DESCRIPTION:",
        description,
        "",
    ]
    return "\n".join(header_lines)

def build_patch_only_bundle(files: List[PRFile], max_patch_chars: int = 6000) -> str:
    """Render every changed file as a stats line followed by its (truncated) patch.

    Each patch is cut to ``max_patch_chars`` characters; a file without patch
    text gets a placeholder line instead. Sections end with a dashed rule.
    """
    divider = "-"*60

    def render(entry) -> str:
        # Stats line, then the patch body or the placeholder when empty.
        shown = (entry.patch or "")[:max_patch_chars]
        return "\n".join([
            f"FILE: {entry.filename} | status={entry.status} | +{entry.additions} -{entry.deletions} | changes={entry.changes}",
            "PATCH:",
            shown or "[No patch provided by GitHub API]",
            divider,
        ])

    return "\n".join(render(entry) for entry in files)

def build_enriched_bundle(
    files: List[PRFile],
    focus_files: List[str],
    padding: int = 30,
    max_total_lines: int = 260,
    fallback_lines: int = 140,
) -> str:
    """Render targeted context from the current file text for the focus files.

    Args:
        files: All changed files of the PR.
        focus_files: Filenames to include; every other file is skipped.
        padding: Context lines kept around each hunk.
        max_total_lines: Per-file cap on lines taken from hunk windows.
        fallback_lines: Number of leading lines shown when the file text is
            available but the patch yields no hunks.

    Returns:
        One section per focus file (stats line, context, dashed rule), joined
        by newlines; an empty string when no file matches.
    """
    focus_set = set(focus_files)
    chunks = []
    for f in files:
        if f.filename not in focus_set:
            continue

        head_text = fetch_text(f.raw_url)
        hunks = parse_added_hunks(f.patch)
        context = ""
        if head_text and hunks:
            context = extract_windows_from_hunks(
                head_text, hunks, padding=padding, max_total_lines=max_total_lines
            )
        elif head_text:
            # Fallback: top slice of the file. The truncation marker uses the
            # same threshold as the slice so any cut is always signalled.
            lines = head_text.splitlines()
            context = "\n".join(lines[:fallback_lines])
            if len(lines) > fallback_lines:
                context += "\n...\n"

        chunks.append("\n".join([
            f"FILE: {f.filename} | status={f.status} | +{f.additions} -{f.deletions} | changes={f.changes}",
            "TARGETED HEAD CONTEXT:",
            context if context else "[Could not fetch/extract head context]",
            "-"*60
        ]))
    return "\n".join(chunks)


# Agent schemas (structured)

In [58]:
Severity = Literal["P0", "P1", "P2"]
Category = Literal["correctness", "security", "performance", "maintainability"]

# NOTE: these models are documented with comments rather than docstrings on
# purpose. Their JSON schema is pasted into the agent prompts, and pydantic
# copies a class docstring into that schema as "description".

# A single review finding produced by a specialist agent.
class Finding(BaseModel):
    severity: Severity
    category: Category
    file: Optional[str] = None
    line_range: Optional[str] = None  # rendered after the file name as "<file>:<line_range>"
    title: str
    description: str
    recommendation: str
    confidence: float  # 0.0 - 1.0

# Wrapper so a specialist agent can return a list as one structured object.
class AgentFindings(BaseModel):
    findings: List[Finding]

# Result of the triage agent. triage_agent and lead_reviewer reference this
# model, but no definition existed in the file, so a fresh run failed with
# NameError. The fields are reconstructed from the attributes those functions
# read.
# NOTE(review): the allowed ``risk`` values are an assumption - confirm
# against the original definition if it still exists elsewhere.
class TriageResult(BaseModel):
    summary: str
    risk: Literal["low", "medium", "high"]
    focus_files: List[str]
    questions_for_author: List[str]

# LLM agents (Triage + Correctness + Security)

In [59]:
# Chat model instance passed to every agent call in the end-to-end cell below.
llm = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

# Triage Agent
def triage_agent(pr_header: str, patch_bundle: str, llm) -> TriageResult:
    """Ask the model to triage the PR and pick the files worth a deep review.

    Args:
        pr_header: Text header describing the PR.
        patch_bundle: Text bundle of the per-file patches.
        llm: Chat model exposing ``with_structured_output``.

    Returns:
        The model's answer parsed as a ``TriageResult``.
    """
    system_prompt = """You are a senior tech lead doing PR triage.
Pick a small set of focus files (max 5) that are most important/risky to review deeply.
Only use information in the PR header and patches provided.
"""

    user_prompt = f"""Return JSON matching this schema:
{TriageResult.model_json_schema()}

PR HEADER:
{pr_header}

PATCHES:
{patch_bundle}
"""
    triage_llm = llm.with_structured_output(TriageResult)
    conversation = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_prompt),
    ]
    return triage_llm.invoke(conversation)

# Correctness Agent
def correctness_agent(pr_header: str, enriched_bundle: str, llm) -> List[Finding]:
    """Run the correctness-focused reviewer over the enriched context.

    Args:
        pr_header: Text header describing the PR.
        enriched_bundle: Targeted file context for the focus files.
        llm: Chat model exposing ``with_structured_output``.

    Returns:
        The ``findings`` list of the model's ``AgentFindings`` answer.
    """
    system_prompt = """You are a correctness-focused code reviewer.
Find concrete, high-signal issues grounded in the provided context.
Prefer fewer, higher impact findings. If unsure, lower confidence.
"""
    user_prompt = f"""Return JSON matching this schema:
{AgentFindings.model_json_schema()}

PR HEADER:
{pr_header}

ENRICHED CONTEXT (targeted file windows):
{enriched_bundle}
"""
    reviewer = llm.with_structured_output(AgentFindings)
    conversation = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_prompt),
    ]
    answer = reviewer.invoke(conversation)
    return answer.findings

# Security Agent
def security_agent(pr_header: str, enriched_bundle: str, llm) -> List[Finding]:
    """Run the security-focused reviewer over the enriched context.

    Args:
        pr_header: Text header describing the PR.
        enriched_bundle: Targeted file context for the focus files.
        llm: Chat model exposing ``with_structured_output``.

    Returns:
        The ``findings`` list of the model's ``AgentFindings`` answer.
    """
    system_prompt = """You are a security-focused code reviewer.
Look for auth/authz mistakes, secrets leakage, injection risks, unsafe logging, insecure defaults.
Only report issues supported by the diff/context. If unsure, ask a question instead of asserting.
"""
    user_prompt = f"""Return JSON matching this schema:
{AgentFindings.model_json_schema()}

PR HEADER:
{pr_header}

ENRICHED CONTEXT (targeted file windows):
{enriched_bundle}
"""
    reviewer = llm.with_structured_output(AgentFindings)
    conversation = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_prompt),
    ]
    answer = reviewer.invoke(conversation)
    return answer.findings


# Lead reviewer: dedupe + rank + render Markdown

In [60]:
def dedupe_findings(findings: List[Finding]) -> List[Finding]:
    """Drop repeated findings and order the rest by severity, then confidence.

    Two findings count as the same issue when file, normalised title
    (stripped, lower-cased) and category all match; the first one seen wins.
    Unknown severities sort after P2; higher confidence comes first within a
    severity.
    """
    # Dicts keep insertion order, so setdefault retains the first occurrence.
    first_seen = {}
    for finding in findings:
        identity = (finding.file or "", finding.title.strip().lower(), finding.category)
        first_seen.setdefault(identity, finding)

    severity_rank = {"P0": 0, "P1": 1, "P2": 2}

    def ranking(item):
        return (severity_rank.get(item.severity, 9), -item.confidence)

    return sorted(first_seen.values(), key=ranking)

def render_findings_md(items: List[Finding]) -> str:
    """Render findings as a Markdown bullet list.

    Each finding becomes a bullet with category and title, an optional
    location (``file`` or ``file:line_range``) and sub-bullets for
    description, recommendation and confidence.

    Returns:
        The Markdown text, or ``"_None_"`` when ``items`` is empty.
    """
    if not items:
        return "_None_"
    lines = []
    for f in items:
        loc = ""
        if f.file:
            loc = f.file
            if f.line_range:
                loc += f":{f.line_range}"
        # Em dash separator, written as an escape: the literal character had
        # been corrupted into mojibake in this file.
        loc = f" \u2014 `{loc}`" if loc else ""
        lines.append(
            f"- **[{f.category.upper()}] {f.title}**{loc}\n"
            f"  - {f.description}\n"
            f"  - **Recommendation:** {f.recommendation}\n"
            f"  - Confidence: `{f.confidence:.2f}`"
        )
    return "\n".join(lines)

def lead_reviewer(pr: PRDetails, triage: TriageResult, findings: List[Finding]) -> str:
    """Assemble the final Markdown report.

    Findings are de-duplicated and ranked, grouped into P0/P1/P2 sections and
    combined with the PR metadata and the triage summary, questions and focus
    files.
    """
    ranked = dedupe_findings(findings)
    by_severity = {
        level: [item for item in ranked if item.severity == level]
        for level in ("P0", "P1", "P2")
    }
    p0_md = render_findings_md(by_severity["P0"])
    p1_md = render_findings_md(by_severity["P1"])
    p2_md = render_findings_md(by_severity["P2"])

    if triage.questions_for_author:
        q_md = "\n".join(f"- {q}" for q in triage.questions_for_author)
    else:
        q_md = "_None_"

    if triage.focus_files:
        focus_md = ", ".join(triage.focus_files)
    else:
        focus_md = "_None_"

    return f"""# SentinelReview Report

## PR
- **Title:** {pr.title}
- **Author:** {pr.user.login} ({pr.user.type})
- **Files Changed:** {pr.changed_files} | **Commits:** {pr.commits} | **Net:** +{pr.additions} -{pr.deletions}
- **Triage Risk:** **{triage.risk.upper()}**

## Summary
{triage.summary}

## Questions for the author
{q_md}

## Findings

### P0 (Must fix)
{p0_md}

### P1 (Should fix)
{p1_md}

### P2 (Nice to have)
{p2_md}

## Focus files reviewed deeply
{focus_md}
"""


# End-to-end run

In [61]:
# Pipeline driver: fetch the PR, triage it, enrich the focus files, run the
# specialist agents and print the final Markdown report. Performs network
# calls to GitHub and to the LLM.
pull_request_url = "https://github.com/kiranrawat/SentinelReview/pull/1"


# Other URLs tried during development. Note that the first one uses "/pulls/",
# which GithubAPIConnector rejects with ValueError (it expects "/pull/").
#  "https://github.com/langchain-ai/langgraph/pulls/6641"
# pull_request_url = "https://github.com/topoteretes/cognee/pull/1851"


fetcher = GithubPRFilesFetcher(pr_url=pull_request_url)
pr, files = fetcher.fetch_pr()

# Text inputs for the triage prompt: PR metadata plus the raw per-file patches.
pr_header = build_pr_header(pr)
patch_bundle = build_patch_only_bundle(files)

# 1) TRIAGE decides focus files
triage = triage_agent(pr_header, patch_bundle, llm)
print("Triage focus files:", triage.focus_files)

# 2) Build enriched context only for focus files
enriched = build_enriched_bundle(files, triage.focus_files, padding=30)

# 3) Specialist agents
# Both agents receive the same header and enriched context; their findings
# are concatenated and de-duplicated later inside lead_reviewer.
findings = []
findings += correctness_agent(pr_header, enriched, llm)
findings += security_agent(pr_header, enriched, llm)

# 4) Final report
report_md = lead_reviewer(pr, triage, findings)
print(report_md)


RateLimitError: Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', 'type': 'insufficient_quota', 'param': None, 'code': 'insufficient_quota'}}

In [None]:
# NOTE(review): `bundle` is not assigned anywhere in the visible cells; this
# looks like a leftover from an earlier session and would raise NameError on a
# fresh run. The output below resembles build_pr_header() output - confirm.
print(bundle)

PR TITLE: feat: Simple TUI for cognee-cli
PR AUTHOR: rajeevrajeshuni (User)
CREATED: 2025-11-30 08:52:14+00:00 | UPDATED: 2025-12-29 16:33:11+00:00
COMMITS: 50 | +1710 -25 | FILES: 18

PR DESCRIPTION:
## Description
The TUI is up as a functional starting point, though some flags are missing compared to the CLI.


* **Usage:** Added a `tui` the command to the cognee-cli to start the TUI.
* **Status:** `add`, `search`, `delete` and `cognify` screens work as expected.
* **Blockers:** The `config` command seems incomplete on the existing cli, same is the case with TUI too. 
* **Plan:** Let's hold off on automated tests until the UI design is final to avoid simulating UI actions on a moving target.

Fixes #1762

## Type of Change
- [x] New feature (non-breaking change that adds functionality)

## Pre-submission Checklist
- [x] This PR contains minimal changes necessary to address the issue/feature
- [x] My code follows the project's coding standards and style guidelines
- [x] I have searche

In [30]:
findings

[Finding(severity='P1', category='correctness', file='cognee/cli/tui/cognify_screen.py', line_range='144-169', title='Incomplete code in _cognify_async method', description="The _cognify_async method in CognifyTUIScreen ends abruptly with an incomplete line 'background_checkbox = sel', which is a syntax error and will cause the TUI to fail when cognify is executed.", recommendation='Complete the method implementation by properly re-enabling the inputs and removing the incomplete line. Ensure the method ends with correct syntax and logic to re-enable UI elements after processing.', confidence=0.9),
 Finding(severity='P1', category='correctness', file='cognee/cli/tui/delete_screen.py', line_range='211-247', title='Incomplete code in _handle_delete_all method', description="The _handle_delete_all method in DeleteTUIScreen ends abruptly with an incomplete line 'se', which is a syntax error and will cause the TUI to fail when attempting to delete all data.", recommendation='Complete the met

In [35]:
files[0].patch

'@@ -92,6 +92,7 @@ def _discover_commands() -> List[Type[SupportsCliCommand]]:\n         ("cognee.cli.commands.cognify_command", "CognifyCommand"),\n         ("cognee.cli.commands.delete_command", "DeleteCommand"),\n         ("cognee.cli.commands.config_command", "ConfigCommand"),\n+        ("cognee.cli.commands.tui_command", "TuiCommand"),\n     ]\n \n     for module_path, class_name in command_modules:'