# **Step 1 Importing libraries:**


In [2]:
import pandas as pd
import numpy as np
import re
from dateutil import parser as dtp

# **Step 2 : Clean & Normalize Vulnerability Data**

In [3]:
import pandas as pd
import numpy as np
import re
from dateutil import parser as dtp

def parse_date_safe(x):
    try:
        return pd.to_datetime(x, errors="coerce")
    except Exception:
        return pd.NaT

SEV_ALIASES = {
    "NONE": "NONE", "LOW": "LOW", "MEDIUM": "MEDIUM", "HIGH": "HIGH", "CRITICAL": "CRITICAL",
    "MODERATE": "MEDIUM",
    "INFO": "NONE", "INFORMATIONAL": "NONE",
    "NA": "UNKNOWN", "N/A": "UNKNOWN", "UNKNOWN": "UNKNOWN"
}

def normalize_sev_text(s):
    if pd.isna(s):
        return None
    t = str(s).strip().upper()
    return SEV_ALIASES.get(t, t)

def sev_from_cvss(score):
    if score is None or pd.isna(score):
        return None
    try:
        score = float(score)
    except Exception:
        return None
    if score == 0.0:
        return "NONE"
    if 0.0 < score <= 3.9:
        return "LOW"
    if 4.0 <= score <= 6.9:
        return "MEDIUM"
    if 7.0 <= score <= 8.9:
        return "HIGH"
    return "CRITICAL"

# main cleaning function

def clean_and_normalize_vulns(df_raw: pd.DataFrame) -> pd.DataFrame:
    """
    Takes a raw CVE dataframe (from Threat Collector or CSV)
    and returns a cleaned + normalized dataframe.
    """
    df = df_raw.copy()

    # Summary before
    print("Rows (raw):", len(df))
    print("Columns:", df.columns.tolist())

    # 1) Remove accidental header-duplicate rows like a literal 'id' in id column
    if df['id'].astype(str).str.lower().eq('id').any():
        n_before = len(df)
        df = df[~df['id'].astype(str).str.lower().eq('id')].copy()
        print(f"Removed {n_before - len(df)} stray header rows")

    # 2) Drop exact duplicates
    before = len(df)
    df = df.drop_duplicates().reset_index(drop=True)
    after = len(df)
    print(f"Dropped exact duplicates: {before - after}")

    # 3) Convert / normalize columns
    # cvss_score -> numeric
    df['cvss_score'] = pd.to_numeric(
        df.get('cvss_score', pd.Series([np.nan] * len(df))),
        errors='coerce'
    )

    # Keep original severity text for auditing, then normalize to our canonical set
    df['original_severity_raw'] = df.get('severity')
    df['severity_text_norm'] = df['original_severity_raw'].apply(normalize_sev_text)

    # parsed dates
    for date_col in ['published_date', 'last_modified', 'ingested_at']:
        if date_col in df.columns:
            df[date_col] = df[date_col].apply(parse_date_safe)

    # 4) Decide final severity: prefer cvss numeric if available
    def decide_final_severity(row):
        score = row.get('cvss_score')
        sev_txt = row.get('severity_text_norm')
        sev_from_score = sev_from_cvss(score)
        if sev_from_score and not sev_txt:
            return sev_from_score, "score"
        if sev_from_score and sev_txt:
            if sev_from_score != sev_txt:
                return sev_from_score, "score_over_text"
            return sev_txt, "score_and_text_agree"
        if not sev_from_score and sev_txt:
            return sev_txt, "text_only"
        return "UNKNOWN", "none"

    decisions = df.apply(decide_final_severity, axis=1, result_type='expand')
    decisions.columns = ['final_severity', 'severity_source']
    df[['final_severity', 'severity_source']] = decisions

    # Adding a quick helper
    df['has_cvss'] = df['cvss_score'].notna()

    # 5) Quick checks
    print("\nFinal severity distribution:")
    print(df['final_severity'].value_counts(dropna=False))

    # IMPORTANT: no saving here, we just return the dataframe
    return df

# **Step 3: Technique Matching (MITRE ATT&CK Enrichment)**

In [5]:
import re
import pandas as pd


STOP = set([
    "the","and","with","from","that","this","into","over","under","using","used",
    "will","have","been","their","they","them","such","also","able","allow","allows",
    "may","can","for","onto","than","then","when","where","while","within",
    "task","tasks"
])

def build_keywords(text: str):
    txt = str(text).lower()
    toks = re.findall(r"[a-z0-9][a-z0-9\-\_]{2,}", txt)  # tokens len >= 3
    toks = [t for t in toks if t not in STOP]
    return set(toks)

def build_tech_index(df_mitre: pd.DataFrame):
    """
    Build a keyword index from a MITRE techniques dataframe.
    Expects columns like: tech_id, name, description
    """
    df_mitre = df_mitre.fillna("")
    tech_index = []
    for _, r in df_mitre.iterrows():
        name = r.get("name", "")
        desc = r.get("description", "")
        kws  = build_keywords(name) | build_keywords(desc)
        tech_index.append({
            "tech_id":   r.get("tech_id"),
            "tech_name": name,
            "keywords":  kws
        })
    print("Built keyword index for", len(tech_index), "techniques")
    return tech_index

def match_technique_for_text(text: str, tech_index, min_score: int = 1):
    """
    Given a free-text description and a pre-built tech_index,
    return (tech_id, tech_name, score).
    """
    text_kws = build_keywords(text)
    best = None
    best_score = 0
    for tech in tech_index:
        score = len(text_kws & tech["keywords"])
        if score > best_score:
            best_score = score
            best = tech
    if best and best_score >= min_score:
        return best["tech_id"], best["tech_name"], int(best_score)
    return None, None, 0

#  MITRE ENRICHMENT

def enrich_with_mitre(df_cve: pd.DataFrame,
                      df_mitre: pd.DataFrame,
                      min_score: int = 1) -> pd.DataFrame:
    """
    Takes:
      - df_cve   : cleaned vulnerabilities (output of Step 2)
      - df_mitre : MITRE ATT&CK techniques table
    Returns:
      - df_enriched : df_cve + ['technique_id','technique_name','tech_match_score']
    """

    print("CVE rows:", len(df_cve))
    print("MITRE techniques:", len(df_mitre))

    # build index from MITRE table
    tech_index = build_tech_index(df_mitre)

    # helper to combine title + description safely
    def merged_text(row):
        return f"{row.get('title', '')} {row.get('description', '')}"

    # apply matching to every CVE
    matches = df_cve.apply(
        lambda r: match_technique_for_text(merged_text(r), tech_index, min_score=min_score),
        axis=1,
        result_type="expand"
    )
    matches.columns = ["technique_id", "technique_name", "tech_match_score"]

    df_enriched = pd.concat([df_cve, matches], axis=1)

    # coverage / sanity check
    matched = (df_enriched["technique_id"].notna()) & (df_enriched["tech_match_score"] > 0)
    print(f"\nMatched rows: {matched.sum()} / {len(df_enriched)} "
          f"({matched.sum()/len(df_enriched):.1%})")

    print("\nTop 5 enriched rows (preview):")
    preview_cols = [
        "id",
        "title",
        "final_severity" if "final_severity" in df_enriched.columns else "severity",
        "technique_name",
        "technique_id",
        "tech_match_score"
    ]
    display(df_enriched.head(5)[preview_cols])

    # IMPORTANT: no CSV saving here â€” just return the enriched dataframe
    return df_enriched

# **Step 4: Risk Scoring, Prioritization & Remediation Suggestions**

In [7]:
import pandas as pd
import numpy as np

def risk_scoring_and_prioritization(df_enriched: pd.DataFrame):
    """
    Step 4: Risk Scoring, Prioritization & Remediation Suggestions

    Input:
        df_enriched - DataFrame from Step 3 with columns like:
            ['id','title','cvss_score','final_severity',
             'technique_name','technique_id','tech_match_score','affected_products',...]
    Output:
        df_prior   - prioritized vulnerabilities with risk_score, priority, remediation_suggestion
        df_rem     - compact remediation view
        prod_summary - per-product summary table
    """

    df = df_enriched.copy()

    # 2) Prepare numeric fields and handle missing values
    df['cvss_score'] = pd.to_numeric(df.get('cvss_score'), errors='coerce').fillna(0.0)
    df['tech_match_score'] = pd.to_numeric(df.get('tech_match_score'), errors='coerce').fillna(0).astype(int)

    # Compute maximum observed tech match score (used for normalization)
    max_tech_score = max(1, int(df['tech_match_score'].max()))
    print("Max tech_match_score observed:", max_tech_score)

    # 3) Risk score formula (simple, tunable)
    # - CVSS (0-10) contributes 70% of the score
    # - MITRE tech match normalized contributes 30%
    def compute_risk_score(row):
        cvss = float(row['cvss_score']) if not np.isnan(row['cvss_score']) else 0.0
        tv = float(row['tech_match_score']) / max_tech_score  # 0..1
        # scale to 0..100
        score = (cvss / 10.0) * 70.0 + tv * 30.0
        return round(score, 1)

    df['risk_score'] = df.apply(compute_risk_score, axis=1)

    # 4) Priority rules (combine final_severity and risk_score)
    def assign_priority(row):
        sev = str(row.get('final_severity') or row.get('severity') or "").upper()
        s = float(row['risk_score'])
        if sev == 'CRITICAL' or s >= 80:
            return "CRITICAL"
        if sev == 'HIGH' or s >= 60:
            return "HIGH"
        if sev == 'MEDIUM' or s >= 40:
            return "MEDIUM"
        if s >= 20:
            return "LOW"
        return "INFO"

    df['priority'] = df.apply(assign_priority, axis=1)

    # 5) Simple remediation suggestion rules
    def suggest_remediation(row):
        tech = str(row.get('technique_name') or "").lower()
        title = str(row.get('title') or "").lower()
        cwe = str(row.get('cwe') or "").lower()

        if "registry" in tech or "registry" in title:
            return "Apply vendor patch / registry hardening / restrict write access"
        if "dll" in tech or "dll" in title:
            return "Update/replace vulnerable DLLs, validate signatures, apply vendor patch"
        if "remote code" in title or "remote" in tech or "rce" in title:
            return "Apply vendor patch immediately, restrict network access, enable WAF/IDS"
        if "cloud" in tech or "iam" in tech or "cloud" in title:
            return "Review cloud IAM policies, rotate credentials, apply vendor patch"
        if "sql" in title or "sql injection" in title or "injection" in tech:
            return "Sanitize inputs, apply patch, review DB permissions and WAF rules"
        if "modify registry" in tech:
            return "Block unauthorized registry changes and apply vendor fixes"
        # fallback
        return "Check vendor advisory for patch; apply patch and follow recommended mitigations"

    df['remediation_suggestion'] = df.apply(suggest_remediation, axis=1)

    # 6) Per-product summary
    df['affected_products'] = df.get('affected_products', pd.Series([None]*len(df))).fillna("UNKNOWN_PRODUCT")

    prod_summary = (
        df.groupby(['affected_products','priority'])
          .size()
          .reset_index(name='count')
          .pivot(index='affected_products', columns='priority', values='count')
          .fillna(0)
          .reset_index()
    )

    # main prioritized view (same columns as before)
    cols_out = [
        'id','title','cvss_score','final_severity','technique_name','technique_id',
        'tech_match_score','risk_score','priority','remediation_suggestion','affected_products'
    ]
    df_prior = df[cols_out].copy()

    # compact remediation view
    df_rem = df[['id','remediation_suggestion','priority','risk_score']].copy()

    print("\nTop 8 prioritized vulnerabilities (preview):")
    display(df_prior.sort_values(['priority','risk_score'], ascending=[True, False]).head(8))

    # return 3 tables instead of saving CSV files
    return df_prior, df_rem, prod_summary


# **Step 5: Build the Vulnerability Analyzer Agent**

In [8]:
import pandas as pd
import json
from typing import Optional

class VulnerabilityAnalyzer:
    """
    Analyzer built on top of the prioritized vulnerabilities produced in Step 4.
    It does NOT read or write CSV files.
    It only uses in-memory DataFrames provided by the pipeline.
    """

    def __init__(self, df_prior: pd.DataFrame, df_rem: Optional[pd.DataFrame] = None):
        # Work on copies to avoid changing original DataFrames
        self.df_prior = df_prior.copy()
        self.df_rem = df_rem.copy() if df_rem is not None else None

        # Normalize helpful columns
        self.df_prior['risk_score'] = pd.to_numeric(
            self.df_prior.get('risk_score', 0),
            errors='coerce'
        ).fillna(0)

        self.df_prior['tech_match_score'] = pd.to_numeric(
            self.df_prior.get('tech_match_score', 0),
            errors='coerce'
        ).fillna(0)

        self.df_prior['cvss_score'] = pd.to_numeric(
            self.df_prior.get('cvss_score', 0),
            errors='coerce'
        ).fillna(0)

        if 'id' in self.df_prior.columns:
            self.df_prior['id'] = self.df_prior['id'].astype(str).str.strip()

        print("VulnerabilityAnalyzer initialized with", len(self.df_prior), "rows")

    # 1) Find by CVE id
    def find_by_cve(self, cve_id: str) -> Optional[dict]:
        """Return a single vulnerability row as dict for the given CVE id (case-insensitive)."""
        cve_id = str(cve_id).strip()
        if 'id' not in self.df_prior.columns:
            return None
        row = self.df_prior[self.df_prior['id'].str.lower() == cve_id.lower()]
        if row.empty:
            return None
        return row.iloc[0].to_dict()

    # 2) Top N by risk
    def top_n_by_risk(self, n: int = 10, severity: Optional[str] = None) -> pd.DataFrame:
        """Return top-N rows by risk_score. Optionally filter by final_severity."""
        df = self.df_prior.copy()
        if severity and 'final_severity' in df.columns:
            df = df[df['final_severity'].str.upper() == severity.upper()]
        return df.sort_values('risk_score', ascending=False).head(n)

    # 3) Free-text search
    def search_free_text(self, query: str, n: int = 20) -> pd.DataFrame:
        """Simple text search over title + description + technique_name + affected_products."""
        q = str(query).lower().strip()
        cols = ['title', 'description', 'technique_name', 'affected_products']
        mask = False
        for c in cols:
            if c in self.df_prior.columns:
                mask = mask | self.df_prior[c].astype(str).str.lower().str.contains(q, na=False)
        if isinstance(mask, bool) and mask is False:
            return pd.DataFrame()
        results = self.df_prior[mask].copy()
        # sort by risk and tech_match_score so more relevant appear first
        results['sort_key'] = results['risk_score'] * 2 + results['tech_match_score']
        return results.sort_values('sort_key', ascending=False).head(n).drop(columns=['sort_key'])

    # 4) Filter by product name
    def by_product(self, product_name: str, n: int = 50) -> pd.DataFrame:
        """Return vulnerabilities mentioning product_name in affected_products."""
        p = str(product_name).lower()
        if 'affected_products' not in self.df_prior.columns:
            return pd.DataFrame()
        df = self.df_prior[
            self.df_prior['affected_products'].astype(str).str.lower().str.contains(p, na=False)
        ].copy()
        return df.sort_values('risk_score', ascending=False).head(n)

    # 5) Get remediation suggestion for a CVE
    def remediation_for_cve(self, cve_id: str) -> Optional[str]:
        """
        Return remediation_suggestion from prioritized table
        or from a separate remediation dataframe if available.
        """
        row = self.find_by_cve(cve_id)
        if row is None:
            return None

        # try field on prioritized table first
        if 'remediation_suggestion' in row and pd.notna(row['remediation_suggestion']):
            return row['remediation_suggestion']

        # fallback to separate remediation file
        if (self.df_rem is not None and
            'id' in self.df_rem.columns and
            'remediation_suggestion' in self.df_rem.columns):
            r = self.df_rem[self.df_rem['id'].astype(str).str.lower() == str(cve_id).lower()]
            if not r.empty:
                return r.iloc[0]['remediation_suggestion']

        return None

    # 6) Export a single CVE record as JSON (string)
    def export_vuln_report_json(self, cve_id: str) -> str:
        """
        Export a single CVE report as a JSON string.
        (In an API, you might return this directly to the caller.)
        """
        rec = self.find_by_cve(cve_id)
        if rec is None:
            raise ValueError("CVE not found: " + str(cve_id))
        rec_serializable = json.loads(json.dumps(rec, default=str))
        return json.dumps(rec_serializable, indent=2, ensure_ascii=False)
