In [None]:
import boto3
import json

#### Overview
This notebook is part of developing the `inspector-remediate` package. The package is designed to query package vulnerabilities on EC2 instances and leverage an LLM chosen by the user to recommend remediation. This notebook was used to develop several functions to list AWS Inspector findings, extract relevant details, and prepare the details for an LLM.

#### Connect to AWS Inspector
The following expects credentials in `~/.aws/credentials` under `[default]`.

In [None]:
client = boto3.client('inspector2')

#### Get Findings
This function lists active findings for packages vulnerabilities on EC2 instances.

In [None]:
def get_active_findings(client, finding_filter=None):
    """
    Query active vulnerability findings from AWS Inspector2.

    Args:
        client (botocore.client.BaseClient):
            A boto3 Inspector2 client.
        finding_filter(dict, optional):
            A filterCriteria override. If None, a default filter returns 
            active package vulnerabilities on EC2 instances.

    Returns:
        list: A list of Inspector2 findings matching the filter criteria.
    """

    findings = []
    next_token = None

    if finding_filter is None:
        finding_filter = {
            "findingStatus": [
                {
                    "comparison": "EQUALS",
                    "value": "ACTIVE"
                }
            ],
            "findingType": [
                {
                    "comparison": "EQUALS",
                    "value": "PACKAGE_VULNERABILITY"
                }
            ],
            "resourceType": [
                {
                    "comparison": "EQUALS",
                    "value": "AWS_EC2_INSTANCE"
                }
            ]
        }
    
    while True:
        if next_token:
            response = client.list_findings(
                filterCriteria=finding_filter,
                nextToken=next_token
            )
        else:
            response = client.list_findings(
                filterCriteria=finding_filter
            )
    
        findings.extend(response.get("findings", []))
        next_token = response.get("nextToken")
    
        if not next_token:
            break
    
    return findings

In [None]:
active_findings = get_active_findings(client)

In [None]:
for f in active_findings:
    print(f["title"])

#### Normalize Finding Details for LLM
The following function extracts relevant fields from a finding and returns them as a JSON object.

In [None]:
def normalize_finding_for_llm(finding, region=None):
    """
    Normalize an Inspector2 finding for LLM input.

    Args:
        finding (dict):
            A single Inspector2 finding contained in output from
            `get_active_findings`.

    Returns:
        dict:
            - finding_arn
            - cve
            - title
            - description
            - severity
            - resource
            - package
            - references
    """

    pvd = finding.get("packageVulnerabilityDetails", {}) or {}
    vuln_id = pvd.get("vulnerabilityId")
    vulnerable_packages = pvd.get("vulnerablePackages") or []
    pkg = vulnerable_packages[0] if vulnerable_packages else {}

    resources = finding.get("resources") or []
    r0 = resources[0] if resources else {}
    r_details = r0.get("details", {}) or {}
    ec2 = r_details.get("awsEc2Instance", {}) or {}

    refs = pvd.get("referenceUrls") or []
    remediation = finding.get("remediation") or {}
    remediation_url = remediation.get("url")
    if remediation_url and remediation_url not in refs:
        refs.append(remediation_url)

    normalized = {
        "finding_arn": finding.get("findingArn"),
        "cve": vuln_id,
        "title": finding.get("title"),
        "description": finding.get("description"),
        "severity": finding.get("severity"),
        
        "resource": {
            "account_id": finding.get("awsAccountId"),
            "region": r0.get("region"),
            "instance_id": r0.get("id") or ec2.get("instanceId"),
            "os": ec2.get("platformDetails") or ec2.get("platform")
        },
        
        "package": {
            "name": pkg.get("packageName"),
            "installed_version": pkg.get("installedVersion") or pkg.get("version"),
            "fixed_version": pkg.get("fixedVersion") or pkg.get("fixedInVersion"),
        },
        "references": refs,
    }

    return normalized

In [None]:
llm_payload = [normalize_finding_for_llm(f) for f in package_findings]

In [None]:
print(f"Total findings in payload: {len(llm_payload)}\n")
print("First finding in payload:\n")
print(json.dumps(llm_payload[0], indent=2))