In [None]:
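# [REDACTED] A Tavily API key was pasted here in plain text. Treat it as compromised: revoke/rotate it and keep the replacement out of the notebook (environment variable or getpass).
# [REDACTED] A Google Gemini API key was pasted here in plain text. Treat it as compromised: revoke/rotate it and keep the replacement out of the notebook (environment variable or getpass).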

In [5]:
!pip install --upgrade pip



In [6]:
!pip install tavily-python google-generativeai tenacity pandas reportlab tqdm jsonschema requests PyPDF2 rich

Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
Installing collected packages: PyPDF2
Successfully installed PyPDF2-3.0.1


In [13]:
# ------------------------
# One-shot Colab cell: Robust Tavily + Gemini (gemini-2.5-flash/pro) pipeline
# Paste into a single Google Colab cell and run.
# ------------------------

# 0) Installs (first run)
!pip install --quiet --upgrade pip
!pip install --quiet tavily-python google-generativeai tenacity pandas reportlab tqdm jsonschema requests rich

# 1) Imports & config
import os, json, re, time, traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path

import pandas as pd
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from tqdm.auto import tqdm
from jsonschema import validate
from rich.console import Console
import requests

# Tavily + Gemini client import (we'll configure at runtime)
from tavily import TavilyClient
import google.generativeai as genai
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

console = Console()

@dataclass
class Config:
    """Runtime settings for the Tavily + Gemini enrichment pipeline.

    API keys are never stored in the notebook. Their defaults are read from the
    TAVILY_API_KEY / GEMINI_API_KEY environment variables when this class is
    defined; an empty string means "not set", in which case the init/runner
    functions prompt for the key interactively. Any key that was previously
    committed in this file must be considered leaked and rotated.
    """
    # Credentials (environment first; empty string triggers the input() prompts)
    TAVILY_API_KEY: str = os.environ.get("TAVILY_API_KEY", "")
    GEMINI_API_KEY: str = os.environ.get("GEMINI_API_KEY", "")
    # Model / concurrency / search tuning
    GEMINI_MODEL: str = "gemini-2.5-flash"   # change to "gemini-2.5-pro" if desired
    MAX_WORKERS: int = 6
    TAVILY_MAX_RESULTS: int = 6
    # Output locations (Colab-style paths)
    CACHE_PATH: str = "/content/tavily_cache.json"
    OUTPUT_DIR: str = "/content/tavily_outputs"
    PDF_OUTPUT: str = "security_report.pdf"
    # Timeout passed to the Tavily search call
    REQUEST_TIMEOUT: int = 30
    PRIORITY_THRESHOLD: float = 70.0

cfg = Config()

# 2) Gemini JSON schema (strict)
# JSON Schema passed to jsonschema.validate() for every parsed Gemini reply.
# Only "cve" is required; every other field may be absent or null.
# Array fields do not constrain their item types, and additional keys are allowed.
GEMINI_OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "cve": {"type": "string"},
        "exploit_status": {"type": ["string", "null"]},
        "vendor_mitigations": {"type": ["array", "null"]},
        "patch_available": {"type": ["string", "null"]},
        "cvss": {"type": ["number", "null"]},
        "summary": {"type": ["string", "null"]},
        "citations": {"type": ["array", "null"]}
    },
    "required": ["cve"]
}

# 3) Lazy clients
# Module-level singletons, filled in lazily by the init_* functions.
_tavily_client = None
_generative_model = None

def init_tavily_client():
    """Return the shared TavilyClient, building it on first call.

    If cfg.TAVILY_API_KEY is empty the key is requested via input() and stored
    back on cfg before the client is constructed.
    """
    global _tavily_client
    if _tavily_client is not None:
        return _tavily_client
    if not cfg.TAVILY_API_KEY:
        cfg.TAVILY_API_KEY = input("Enter Tavily API key: ").strip()
    _tavily_client = TavilyClient(api_key=cfg.TAVILY_API_KEY)
    return _tavily_client

def init_gemini_model():
    """
    Return the shared genai.GenerativeModel, creating it on first call.

    Prompts for cfg.GEMINI_API_KEY via input() when it is empty, configures the
    SDK, builds the model named by cfg.GEMINI_MODEL and sends one short
    health-check prompt. A failing health check only prints a warning; a failure
    in configure/model construction prints troubleshooting hints and re-raises.
    """
    global _generative_model
    if _generative_model is None:
        if not cfg.GEMINI_API_KEY:
            cfg.GEMINI_API_KEY = input("Enter Gemini API key: ").strip()
        try:
            genai.configure(api_key=cfg.GEMINI_API_KEY)
            _generative_model = genai.GenerativeModel(cfg.GEMINI_MODEL)
            # Connectivity probe: intentionally called with the prompt only (no timeout kwarg).
            try:
                probe = _generative_model.generate_content("Health check — reply OK.")
                sample = getattr(probe, "text", None) or str(probe)
                console.print("[green]Gemini reachable (sample reply):[/green]", sample[:200])
            except Exception as probe_err:
                # Non-fatal: real calls are guarded individually later on.
                console.print("[yellow]Warning: Gemini model test raised an exception (will continue but per-call errors may occur):[/yellow]")
                console.print(repr(probe_err))
        except Exception as init_err:
            # Print diagnostics, then let the caller see the original exception.
            console.print("[red]Failed to initialize Gemini GenerativeModel.[/red]")
            console.print("- Confirm GEMINI_API_KEY is correct and has model access.")
            console.print(f"- Confirm model name (cfg.GEMINI_MODEL) is valid: currently '{cfg.GEMINI_MODEL}'. Try 'gemini-2.5-flash' or 'gemini-2.5-pro'.")
            console.print("- If you see 127.0.0.1/404 in logs, ensure no HTTP proxy/local interceptor is configured.")
            console.print("Exception:", repr(init_err))
            raise
    return _generative_model

# 4) Disk cache
def ensure_output_dir():
    """Create cfg.OUTPUT_DIR (including parents) if needed and return that path string."""
    out_dir = cfg.OUTPUT_DIR
    os.makedirs(out_dir, exist_ok=True)
    return out_dir

def load_cache(path=cfg.CACHE_PATH) -> Dict[str,Any]:
    """Load the on-disk search cache.

    Args:
        path: JSON file to read (defaults to cfg.CACHE_PATH as it was when this
            function was defined).

    Returns:
        The cached mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: no cache yet, nothing to warn about.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        console.print("[yellow]Warning loading cache:[/yellow]", e)
        return {}
    if not isinstance(data, dict):
        # A list/str/number here would break `key in cache` / `cache[key] = ...` later.
        console.print("[yellow]Warning loading cache:[/yellow]", f"expected a JSON object, got {type(data).__name__}; ignoring cache file")
        return {}
    return data

import threading  # needed only for the cache write lock below

# save_cache() is reached from ThreadPoolExecutor workers (via tavily_search),
# so writes are serialized to avoid interleaved/corrupt files.
_cache_lock = threading.Lock()

def save_cache(cache, path=cfg.CACHE_PATH):
    """Persist the cache mapping to disk as JSON.

    The write is serialized with a lock, works on a snapshot of the dict (so a
    concurrent insert cannot raise "dictionary changed size during iteration"),
    and goes through a temporary file + os.replace so readers never see a
    half-written cache.

    Args:
        cache: mapping to store.
        path: destination file (defaults to cfg.CACHE_PATH at definition time).
    """
    ensure_output_dir()
    parent = os.path.dirname(path)
    if parent:
        # The cache file does not necessarily live inside cfg.OUTPUT_DIR.
        os.makedirs(parent, exist_ok=True)
    tmp_path = f"{path}.tmp"
    with _cache_lock:
        snapshot = dict(cache)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(snapshot, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, path)

cache = load_cache()

# 5) Tavily search with retry + caching
@retry(wait=wait_exponential(min=1, max=8), stop=stop_after_attempt(3), retry=retry_if_exception_type(Exception))
def tavily_search(query: str, max_results: int = None) -> List[Dict[str,str]]:
    """Run a Tavily search and return a list of {"content", "url"} dicts.

    Results are memoized in the module-level `cache` (keyed by query and result
    limit) and written to disk after each new search. Any exception causes the
    whole call to be retried, up to 3 attempts with exponential back-off.
    """
    client = init_tavily_client()
    limit = max_results or cfg.TAVILY_MAX_RESULTS
    cache_key = f"TAVILY::{query}::{limit}"
    if cache_key in cache:
        return cache[cache_key]

    resp = client.search(query=query, max_results=limit, timeout=cfg.REQUEST_TIMEOUT)
    if isinstance(resp, dict) and "results" in resp:
        hits = [
            {
                # first non-empty text field wins; capped at 8000 characters
                "content": (r.get("content") or r.get("text") or r.get("snippet") or "")[:8000],
                "url": r.get("url") or r.get("source_url") or "",
            }
            for r in resp["results"]
        ]
    else:
        # Unexpected response shape: keep its string form as a single hit.
        hits = [{"content": str(resp), "url": ""}]

    cache[cache_key] = hits
    save_cache(cache)
    return hits

# 6) Call Gemini safely (no unsupported kwargs), parse & repair
def call_gemini_once(prompt_text: str) -> Tuple[bool, str]:
    """
    Send a single prompt to the Gemini model and never raise.

    Returns (True, reply_text) on success. On failure returns (False, message)
    where message starts with "init_error:" (model could not be initialized) or
    "generation_error:" (the generate_content call failed; includes up to 1000
    characters of traceback).
    """
    try:
        model = init_gemini_model()
    except Exception as init_exc:
        return False, f"init_error: {init_exc}"

    try:
        # generate_content is called with the prompt only -- no timeout or other kwargs.
        reply = model.generate_content(prompt_text)
        body = getattr(reply, "text", None)
        # No .text attribute/value: fall back to the string form of the response object.
        return True, (str(reply) if body is None else body)
    except Exception as gen_exc:
        console.print("[yellow]Gemini call failed (caught exception):[/yellow]", repr(gen_exc))
        trace = traceback.format_exc()
        return False, f"generation_error: {gen_exc}\n{trace[:1000]}"

def try_parse_json_with_repair(text: str, cve_id: str, context: str) -> Dict[str,Any]:
    """Turn a Gemini reply into a dict that satisfies GEMINI_OUTPUT_SCHEMA.

    Strategy, in order:
      1. parse `text` as JSON (whole string, then the outermost {...} span);
      2. ask Gemini once to reformat the failed reply and parse that the same way;
      3. return a fallback object carrying the first 1000 characters of `text`
         as the summary.

    Args:
        text: raw model reply (None/empty is tolerated).
        cve_id: CVE identifier used in the repair prompt and the fallback object.
        context: source context included in the repair prompt.

    Returns:
        A dict with at least the "cve" key.
    """
    def _parse_valid(candidate):
        """Return a schema-valid object parsed from `candidate`, or None."""
        if not candidate:
            return None
        attempts = [candidate]
        # Models often wrap JSON in prose or ``` fences: also try the outermost braces.
        m = re.search(r"(\{[\s\S]*\})", candidate)
        if m and m.group(1) != candidate:
            attempts.append(m.group(1))
        for raw in attempts:
            try:
                obj = json.loads(raw)
                validate(instance=obj, schema=GEMINI_OUTPUT_SCHEMA)
                return obj
            except Exception:
                # Deliberate best-effort: a parse/validation failure just means
                # "try the next strategy".
                continue
        return None

    text = text or ""

    parsed = _parse_valid(text)
    if parsed is not None:
        return parsed

    # Repair attempt: ask Gemini to reformat into strict JSON. The failed reply
    # is included so "the following" actually refers to something.
    repair_prompt = (
        "The previous response was not valid JSON. Reformat the following into STRICT valid JSON "
        "with fields: cve, exploit_status, vendor_mitigations, patch_available, cvss, summary, citations. "
        "Return ONLY JSON.\n\n"
        f"Previous response:\n{text[:4000]}\n\n"
        f"Context:\n{context}\nCVE: {cve_id}\n"
    )
    ok, resp = call_gemini_once(repair_prompt)
    if ok:
        repaired = _parse_valid(resp)
        if repaired is not None:
            return repaired

    # final fallback
    return {
        "cve": cve_id,
        "exploit_status": None,
        "vendor_mitigations": [],
        "patch_available": None,
        "cvss": None,
        "summary": (text[:1000] if text else None),
        "citations": []
    }

def call_gemini_structured(cve_id: str, context: str) -> Dict[str,Any]:
    """Ask Gemini for a structured JSON assessment of one CVE.

    Builds the analyst prompt from `context` and `cve_id`, sends it once, and
    hands a successful reply to try_parse_json_with_repair. If the call itself
    fails, returns an empty-valued record that carries the error text in
    "_model_error" and the first 3000 characters of context in
    "_raw_context_snippet".
    """
    prompt = f"""
You are a security analyst. Using the context below (vendor advisories, CVE pages, blog posts),
produce a STRICT and VALID JSON object with these fields:
- cve (string)
- exploit_status (string or null) -- one of: "Active exploit", "Proof-of-Concept public", "No known PoC", "Patch only", "Unknown"
- vendor_mitigations (array of short strings) or []
- patch_available ("Yes"/"No"/null)
- cvss (numeric e.g., 9.8 or null)
- summary (short paragraph)
- citations (array of URLs)

Context:
{context}
CVE: {cve_id}

Return ONLY valid JSON (no extra commentary). If a field is not known, use null or an empty array.
"""
    succeeded, reply = call_gemini_once(prompt)
    if succeeded:
        return try_parse_json_with_repair(reply, cve_id, context)

    # Model call failed: `reply` holds the error description.
    fallback = {
        "cve": cve_id,
        "exploit_status": None,
        "vendor_mitigations": [],
        "patch_available": None,
        "cvss": None,
        "summary": None,
        "citations": [],
    }
    fallback["_model_error"] = reply
    fallback["_raw_context_snippet"] = context[:3000]
    return fallback

# 7) Enrichment worker
def enrich_single_cve(cve_id: str) -> Dict[str,Any]:
    """Search Tavily for one CVE, have Gemini summarize it, and return the record.

    The record gets "cve" defaulted to `cve_id`, citations defaulted to the hit
    URLs when Gemini supplied none, plus "_tavily_hits", "_raw_context_snippet"
    (first 4000 chars) and a UTC "_retrieved_at" timestamp. Any exception is
    converted into {"cve": cve_id, "error": <message>}.
    """
    try:
        hits = tavily_search(f"{cve_id} vendor advisory exploit mitigation")
        used_hits = hits[: cfg.TAVILY_MAX_RESULTS]
        context = "\n\n".join(h.get("content", "") for h in used_hits)
        source_urls = [h.get("url") for h in hits if h.get("url")]

        record = call_gemini_structured(cve_id, context)
        record.setdefault("cve", cve_id)
        if not record.get("citations"):
            record["citations"] = source_urls
        record.update({
            "_tavily_hits": len(hits),
            "_raw_context_snippet": context[:4000],
            "_retrieved_at": datetime.now(timezone.utc).isoformat(),
        })
        return record
    except Exception as exc:
        return {"cve": cve_id, "error": str(exc)}

# 8) Bulk enrichment concurrency
def enrich_cve_list(cve_list: List[str]) -> List[Dict[str,Any]]:
    """Enrich many CVEs concurrently with cfg.MAX_WORKERS threads.

    Records are returned in completion order (not input order). A future that
    raises is reported as {"cve": <id>, "error": <message>}.
    """
    collected = []
    with ThreadPoolExecutor(max_workers=cfg.MAX_WORKERS) as pool:
        pending = {}
        for cve_id in cve_list:
            pending[pool.submit(enrich_single_cve, cve_id)] = cve_id
        progress = tqdm(as_completed(pending), total=len(pending), desc="Enriching CVEs")
        for done in progress:
            try:
                outcome = done.result()
            except Exception as exc:
                outcome = {"cve": pending[done], "error": str(exc)}
            collected.append(outcome)
    return collected

# 9) Parsing & scoring helpers
# "CVSS:3.1/AV:N/..." -- the number after "CVSS:" is the spec version, not a score.
_CVSS_VECTOR_PREFIX_RE = re.compile(r"CVSS:\d+\.\d+/", re.IGNORECASE)
# Stand-alone decimal such as 9.8 or 10.0 (not part of a longer number/version like 1.2.3).
_CVSS_DECIMAL_RE = re.compile(r"(?<![\d.])\d{1,2}\.\d+(?![\d.])")
# Stand-alone one- or two-digit integer such as 7 or 10.
_CVSS_INTEGER_RE = re.compile(r"(?<![\d.])\d{1,2}(?![\d.])")

def parse_cvss(value) -> Optional[float]:
    """Best-effort conversion of a CVSS value to a float.

    Args:
        value: a number, a string containing a score ("9.8", "CVSS 10.0 (Critical)",
            "7"), or None.

    Returns:
        The score as a float, or None when the value is missing, NaN, a
        placeholder such as "n/a"/"unknown", or contains no number in 0..10.
        Numeric inputs are returned as-is (range clamping is left to the caller).
        In strings, decimals are preferred over bare integers.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        as_float = float(value)
        # NaN (e.g. a missing DataFrame cell) is not a score; NaN != NaN.
        return None if as_float != as_float else as_float
    s = str(value).strip()
    if s == "" or s.lower() in ("none", "null", "na", "n/a", "nan", "unknown"):
        return None
    s = _CVSS_VECTOR_PREFIX_RE.sub(" ", s)
    # Decimals first (so "3 issues, score 9.8" yields 9.8), then bare integers.
    for pattern in (_CVSS_DECIMAL_RE, _CVSS_INTEGER_RE):
        for m in pattern.finditer(s):
            val = float(m.group(0))
            if 0 <= val <= 10:
                return val
    return None

def map_exploit_status_score(status: Optional[str]) -> float:
    """Map an exploit-status label to a weight between 0.1 and 1.0.

    Args:
        status: free-text status such as "Active exploit", "Proof-of-Concept
            public", "No known PoC", "Patch only" or "Unknown". Missing or
            non-string values (None, "", NaN) are treated as "no information".

    Returns:
        0.2 for missing values and "no known ..." statuses, 1.0 for active
        exploitation, 0.8 for a public PoC, 0.1 for patch-only, 0.3 otherwise.
    """
    if not isinstance(status, str) or not status:
        return 0.2
    s = status.lower()
    # Negated statuses must be tested first: "No known PoC" contains "poc".
    if "no known" in s:
        return 0.2
    if "active exploit" in s:
        return 1.0
    if "proof-of-concept" in s or "poc" in s:
        return 0.8
    if "patch" in s and "only" in s:
        return 0.1
    return 0.3

def compute_priority_score(item: Dict[str,Any]) -> float:
    """Compute a 0-100 style priority score for one enriched CVE record.

    score = (0.5 * cvss/10 + 0.35 * exploit weight + 0.1 * asset weight
             + recency boost) * 100, rounded to 2 decimals.

    Args:
        item: record dict; reads "cvss", "exploit_status", "_model_error",
            "asset_criticality", "_retrieved_at", "summary",
            "_raw_context_snippet" and "citations". Values may be None or NaN
            (as produced when a DataFrame row is turned back into a dict).

    Returns:
        The rounded score.
    """
    def _present(v) -> bool:
        """True when v carries information (not None, not NaN, not empty)."""
        if v is None:
            return False
        if isinstance(v, float) and v != v:  # NaN
            return False
        return bool(v)

    cvss = parse_cvss(item.get("cvss"))
    cvss_val = cvss or 0.0
    cvss_norm = min(10.0, max(0.0, cvss_val)) / 10.0

    status = item.get("exploit_status")
    if not (isinstance(status, str) and status):
        # No usable status: a recorded model error counts as "Unknown".
        status = "Unknown" if _present(item.get("_model_error")) else None
    exploit_score = map_exploit_status_score(status)

    asset_score = 0.5
    if _present(item.get("asset_criticality")):
        ac = str(item.get("asset_criticality")).lower()
        mp = {"critical":1.0,"high":0.8,"medium":0.5,"low":0.2}
        asset_score = mp.get(ac, 0.5)

    # Recency: up to +0.1, decaying by 0.01 per day since retrieval.
    recency_boost = 0.0
    rec_ts = item.get("_retrieved_at")
    if isinstance(rec_ts, str) and rec_ts:
        try:
            dt = datetime.fromisoformat(rec_ts)
            if dt.tzinfo is None:
                # Naive timestamps are interpreted as UTC instead of being dropped.
                dt = dt.replace(tzinfo=timezone.utc)
            # Clamp so a timestamp in the future cannot push the boost above 0.1.
            days = max(0, (datetime.now(timezone.utc) - dt).days)
            recency_boost = max(0.0, 0.1 - 0.01 * days)
        except ValueError:
            # Unparseable timestamp: no boost.
            recency_boost = 0.0

    score = (0.5 * cvss_norm + 0.35 * exploit_score + 0.1 * asset_score + recency_boost) * 100.0
    # Safety floor for records that carry any evidence (kept from the original;
    # with the current weights the computed minimum is already above 5).
    if score < 5.0:
        if _present(item.get("summary")) or _present(item.get("_raw_context_snippet")) or _present(item.get("citations")):
            score = max(score, 5.0)
    return round(score,2)

def extract_cvss_from_text(text: str) -> Optional[float]:
    """Pull a CVSS score out of free text, or return None.

    Looks for a number right after a "CVSS ..." label first, then after
    "score". Scores are 0.0-9.9 or 10.0. Spec-version tokens are ignored so
    that "CVSS v3.1 Base Score: 9.8" yields 9.8 and a bare vector string
    ("CVSS:3.1/AV:N/...") yields None.

    Args:
        text: text to scan; None/empty returns None.
    """
    if not text:
        return None
    # Drop the version part of vector strings: "CVSS:3.1/AV:N/..." -> " AV:N/..."
    cleaned = re.sub(r"CVSS:\d+\.\d+/", " ", text, flags=re.IGNORECASE)
    # Drop explicit version tokens: "CVSS v3.1", "CVSSv3" -> "CVSS"
    cleaned = re.sub(r"CVSS\s*v\d(?:\.\d)?", "CVSS", cleaned, flags=re.IGNORECASE)
    patterns = (
        r"CVSS[^0-9\r\n:]*[:\s]*(10\.0|[0-9]\.[0-9])",
        r"score[:\s]*(10\.0|[0-9]\.[0-9])",
    )
    for pattern in patterns:
        m = re.search(pattern, cleaned, flags=re.IGNORECASE)
        if m:
            return float(m.group(1))
    return None

# 10) Outputs: CSV/JSON/PDF/HTML
def save_outputs(enriched: List[Dict[str,Any]], tag: str = None) -> Dict[str,str]:
    """Write the enriched records to CSV and JSON inside cfg.OUTPUT_DIR.

    File names are enriched_<tag>.csv / .json; without a tag the current local
    time (YYYYmmdd_HHMMSS) is used. The CSV is a flattened table that gets a
    computed "priority_score" column if none is present; the JSON is the
    records as given. Returns {"csv": path, "json": path}.
    """
    ensure_output_dir()
    stamp = tag if tag is not None else datetime.now().strftime("%Y%m%d_%H%M%S")
    paths = {
        kind: os.path.join(cfg.OUTPUT_DIR, f"enriched_{stamp}.{kind}")
        for kind in ("csv", "json")
    }

    frame = pd.json_normalize(enriched)
    if "priority_score" not in frame.columns:
        frame["priority_score"] = frame.apply(lambda r: compute_priority_score(r.to_dict()), axis=1)
    frame.to_csv(paths["csv"], index=False)

    with open(paths["json"], "w", encoding="utf-8") as fh:
        json.dump(enriched, fh, indent=2, ensure_ascii=False)
    return paths

def generate_pdf(enriched: List[Dict[str,Any]], pdf_path: str = None, top_n:int=10) -> str:
    """Render the top-N records (by priority_score) into a simple PDF report.

    Args:
        enriched: enriched CVE records.
        pdf_path: file name inside cfg.OUTPUT_DIR (defaults to cfg.PDF_OUTPUT).
        top_n: number of highest-scoring records to include.

    Returns:
        Full path of the written PDF.

    Records lacking a summary/citations (for example error records, whose cells
    become NaN after json_normalize) are rendered with empty text instead of
    raising.
    """
    import textwrap  # function-scope, as before: textwrap is not imported at file level

    ensure_output_dir()
    pdf_path = os.path.join(cfg.OUTPUT_DIR, pdf_path or cfg.PDF_OUTPUT)
    df = pd.json_normalize(enriched)
    if "priority_score" not in df.columns:
        df["priority_score"] = df.apply(lambda r: compute_priority_score(r.to_dict()), axis=1)
    top = df.sort_values("priority_score", ascending=False).head(top_n)

    c = canvas.Canvas(pdf_path, pagesize=letter)
    _, h = letter
    margin = 40
    bottom_limit = 110  # start a new page once the cursor drops below this
    x = margin
    state = {"y": h - margin, "font": ("Helvetica", 9)}

    def set_font(name, size):
        """Select a font and remember it so it can be restored after a page break."""
        state["font"] = (name, size)
        c.setFont(name, size)

    def draw_line(text, step, indent=0):
        """Draw one line at the cursor (breaking the page first if needed), then advance."""
        if state["y"] < bottom_limit:
            c.showPage()
            # showPage() resets the canvas font, so re-apply the active one.
            c.setFont(*state["font"])
            state["y"] = h - margin
        c.drawString(x + indent, state["y"], text)
        state["y"] -= step

    def as_text(value):
        """String form of a cell, with None/NaN mapped to ''."""
        if value is None or (isinstance(value, float) and value != value):
            return ""
        return str(value)

    set_font("Helvetica-Bold", 16)
    draw_line("Security Enrichment Report", step=22)
    set_font("Helvetica", 9)
    draw_line(f"Generated: {datetime.now().isoformat()}", step=18)

    for _, row in top.iterrows():
        set_font("Helvetica-Bold", 11)
        draw_line(f"{row.get('cve','?')} — Score {row.get('priority_score')}", step=14)

        set_font("Helvetica", 9)
        summ = as_text(row.get("summary")) or as_text(row.get("_raw_context_snippet"))[:800]
        for line in textwrap.wrap(summ, width=95):
            draw_line(line[:140], step=11)

        cit = row.get("citations")
        if isinstance(cit, str):
            cit = [cit]
        elif not isinstance(cit, (list, tuple)):
            # None or NaN (row had no citations): nothing to list.
            cit = []
        if cit:
            set_font("Helvetica-Oblique", 8)
            draw_line("Citations:", step=12)
            for u in cit[:3]:
                draw_line(str(u)[:100], step=10, indent=8)
        state["y"] -= 8  # gap between records

    c.save()
    return pdf_path

def generate_html(enriched: List[Dict[str,Any]], html_filename: str = "security_report.html") -> str:
    """Write all records, sorted by priority_score (descending), as an HTML table.

    Args:
        enriched: enriched CVE records.
        html_filename: file name inside cfg.OUTPUT_DIR.

    Returns:
        Full path of the written HTML file.

    Cell values come from model output and web content, so every value is
    HTML-escaped; missing values (None/NaN) render as empty cells. Summaries are
    truncated to 300 characters.
    """
    from html import escape  # function-scope: html is not imported at file level

    def cell(value, limit=None):
        """Return one escaped <td> element for `value`."""
        if value is None or (isinstance(value, float) and value != value):
            text = ""
        else:
            text = str(value)
        if limit is not None:
            text = text[:limit]  # truncate before escaping so entities are never cut
        return f"<td>{escape(text)}</td>"

    ensure_output_dir()
    html_path = os.path.join(cfg.OUTPUT_DIR, html_filename)
    df = pd.json_normalize(enriched)
    if "priority_score" not in df.columns:
        df["priority_score"] = df.apply(lambda r: compute_priority_score(r.to_dict()), axis=1)
    top = df.sort_values("priority_score", ascending=False)

    html_parts = ["<html><head><meta charset='utf-8'><title>Security Report</title></head><body>"]
    html_parts.append(f"<h1>Security Enrichment Report</h1><p>Generated: {datetime.now().isoformat()}</p>")
    html_parts.append("<table border=1 cellpadding=6 cellspacing=0 style='border-collapse:collapse'><tr><th>CVE</th><th>Score</th><th>Exploit Status</th><th>Patch</th><th>CVSS</th><th>Summary</th></tr>")
    for _, row in top.iterrows():
        html_parts.append("<tr>")
        for column in ("cve", "priority_score", "exploit_status", "patch_available", "cvss"):
            html_parts.append(cell(row.get(column)))
        html_parts.append(cell(row.get("summary"), limit=300))
        html_parts.append("</tr>")
    html_parts.append("</table></body></html>")

    with open(html_path, "w", encoding="utf-8") as f:
        f.write("\n".join(html_parts))
    return html_path

# ------------------------
# 11) Runner
# ------------------------
def run_pipeline(cve_list: List[str], re_enrich_low: bool = True, notify: bool = False) -> Dict[str,Any]:
    """Run the full enrichment pipeline and write CSV/JSON/PDF/HTML reports.

    Args:
        cve_list: CVE identifiers to enrich.
        re_enrich_low: when True, records scoring below 30 are enriched a second
            time with a larger Tavily result limit (at least 12).
        notify: accepted for interface compatibility; not used by this function.

    Returns:
        {"enriched": records, "files": {"csv", "json"}, "pdf": path, "html": path}

    Raises:
        Whatever client initialization raises (after printing diagnostics).
    """
    def _finalize(item):
        """Apply the CVSS text fallback and set priority_score on one record, in place."""
        if "error" in item:
            item["priority_score"] = 0.0
            return
        if not item.get("cvss"):
            cv = extract_cvss_from_text(item.get("_raw_context_snippet","") or item.get("summary",""))
            if cv:
                item["cvss"] = cv
                item["cvss_extracted"] = True
        item["priority_score"] = compute_priority_score(item)

    # prompt for API keys if not set in env
    if not cfg.TAVILY_API_KEY:
        cfg.TAVILY_API_KEY = input("Enter Tavily API key (or set env TAVILY_API_KEY): ").strip()
    if not cfg.GEMINI_API_KEY:
        cfg.GEMINI_API_KEY = input("Enter Gemini API key (or set env GEMINI_API_KEY): ").strip()
    # init clients (Tavily and Gemini test)
    try:
        init_tavily_client()
        init_gemini_model()
    except Exception:
        console.print("[red]Client initialization failed. See message above for diagnostics.[/red]")
        raise

    console.print(f"[green]Starting enrichment for {len(cve_list)} CVEs...[/green]")
    start = time.time()
    enriched = enrich_cve_list(cve_list)
    elapsed = time.time() - start
    console.print(f"[green]Enrichment finished in {elapsed:.1f}s. Retrieved {len(enriched)} items.[/green]")

    # CVSS extraction fallback + priority for every record
    for it in enriched:
        _finalize(it)

    # optional re-enrich low-scoring entries
    if re_enrich_low:
        low = [it["cve"] for it in enriched if it.get("priority_score",0) < 30]
        if low:
            console.print(f"[yellow]Re-enriching {len(low)} low-score CVEs with more Tavily hits...[/yellow]")
            old_max = cfg.TAVILY_MAX_RESULTS
            cfg.TAVILY_MAX_RESULTS = max(old_max, 12)
            try:
                for cve in low:
                    new_it = enrich_single_cve(cve)
                    for i, it in enumerate(enriched):
                        if it.get("cve") == cve:
                            enriched[i] = new_it
                            break
            finally:
                # Always restore the shared setting, even if re-enrichment is interrupted.
                cfg.TAVILY_MAX_RESULTS = old_max
            # Replaced records are fresh: give them the same fallback/scoring pass
            # (error records get 0.0 rather than a missing score).
            for it in enriched:
                _finalize(it)

    # Save outputs
    outs = save_outputs(enriched)
    pdf = generate_pdf(enriched, pdf_path=cfg.PDF_OUTPUT)
    html = generate_html(enriched)
    console.print(f"Saved CSV: {outs['csv']}")
    console.print(f"Saved JSON: {outs['json']}")
    console.print(f"Saved PDF: {pdf}")
    console.print(f"Saved HTML: {html}")

    # return summary
    return {"enriched": enriched, "files": outs, "pdf": pdf, "html": html}

# ------------------------
# 12) Run example
# ------------------------
if __name__ == "__main__":
    # Placeholder CVE identifiers -- swap in the real list to analyze.
    example_cves = [
        "CVE-2023-12345",
        "CVE-2022-54321",
        "CVE-2020-9999",
        "CVE-2024-77777"
    ]
    # To use the larger model instead:
    # cfg.GEMINI_MODEL = "gemini-2.5-pro"
    result = run_pipeline(example_cves, re_enrich_low=True, notify=False)
    console.print("[bold green]Done.[/bold green]")

    # Console summary table, highest priority first.
    df = pd.json_normalize(result["enriched"])
    if "priority_score" not in df.columns:
        df["priority_score"] = df.apply(lambda r: compute_priority_score(r.to_dict()), axis=1)
    # Columns that may be absent when every record failed.
    for col in ("exploit_status", "patch_available"):
        if col not in df.columns:
            df[col] = None
    summary_cols = ["cve","priority_score","exploit_status","patch_available"]
    ranked = df.sort_values("priority_score", ascending=False)
    console.print(ranked[summary_cols].to_string(index=False))

    print("\nFiles saved to:", cfg.OUTPUT_DIR)
    for hint in (
        "To download files in Colab, run:",
        "from google.colab import files",
        f"files.download('{result['files']['csv']}')  # CSV",
        f"files.download('{result['files']['json']}')  # JSON",
        f"files.download('{result['pdf']}')  # PDF",
    ):
        print(hint)


Enriching CVEs:   0%|          | 0/4 [00:00<?, ?it/s]

ERROR:tornado.access:503 POST /v1beta/models/gemini-2.5-flash:generateContent?%24alt=json%3Benum-encoding%3Dint (127.0.0.1) 39239.13ms



Files saved to: /content/tavily_outputs
To download files in Colab, run:
from google.colab import files
files.download('/content/tavily_outputs/enriched_20250907_060418.csv')  # CSV
files.download('/content/tavily_outputs/enriched_20250907_060418.json')  # JSON
files.download('/content/tavily_outputs/security_report.pdf')  # PDF


In [12]:
import os

# Show the current upper-case proxy settings, then drop every proxy variable
# (both spellings) from this process's environment.
print("HTTP_PROXY:", os.environ.get("HTTP_PROXY"), "HTTPS_PROXY:", os.environ.get("HTTPS_PROXY"))
for proxy_var in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
    os.environ.pop(proxy_var, None)
print("Proxies removed for session.")


HTTP_PROXY: None HTTPS_PROXY: None
Proxies removed for session.
