In [3]:
import os
import sys
from typing import List, Dict
from collections import Counter
from dotenv import load_dotenv
try:
    import boto3
except ImportError:
    print("[!] boto3 is not present", file=sys.stderr)

# pandas is optional: it is only used to sort the results and write the CSV.
try:
    import pandas as pd
except ImportError:
    pd = None
    print("[!] pandas is not present; CSV export will be skipped", file=sys.stderr)

from botocore.exceptions import ClientError, NoCredentialsError


#### AWS IP Address Discovery Notebook
Part of the DUNE project (https://github.com/opendr-io/dune) and useful for hunting threats that are resistant to conventional detection. When threat hunting for credentialed cloud access, activity originating outside the cloud provider is usually easier to spot, because there is rarely much legitimate activity coming from outside. But what if the source is another AWS tenant? To disambiguate that, we have to know which source IPs are associated with your tenant and which are not. This notebook enumerates the public IP addresses used by a set of AWS accounts so they can be loaded into a classification table. Activity coming from another cloud tenant could be an unofficial or unmanaged account; someone testing something with a personal account; a vendor or service provider; or a red team running an exercise. If it is none of these, you may have a case of credentialed access or exfiltration involving a threat actor's account.

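This version uses key-based authentication, so it needs a .env file with "AWS_ACCESS_KEY_ID" and "AWS_SECRET_ACCESS_KEY" populated. "AWS_SESSION_TOKEN" and "AWS_DEFAULT_REGION" are optional; if no region is set, "us-east-1" is used.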

In [5]:
# =========================
# Error handlers
# =========================

# Error codes that aws_call() and main() treat as "denied": a ClientError whose
# code is in this set (or whose HTTP status is 401/403) is echoed as a [DENIED]
# line and counted in the denied-call summary.
DENY_CODES = {
    "AccessDenied",
    "AccessDeniedException",
    "UnauthorizedOperation",
    "Client.UnauthorizedOperation",
    "UnrecognizedClientException",
    "InvalidClientTokenId",
    "AuthFailure",
    "ExpiredToken",
}

def aws_call(
    api_errors: List[Dict],
    *,
    account_id: str,
    region: str,
    service: str,
    operation: str,
    fn,
    default
):
    """
    Invoke ``fn`` (a zero-argument callable) and hand back its result.

    When ``fn`` raises ``ClientError`` the failure is described in a dict
    (account, region, service, operation, error code, HTTP status, message)
    that is appended to ``api_errors``, and ``default`` is returned instead.
    Failures whose code is in ``DENY_CODES`` or whose HTTP status is 401/403
    are also written to stderr as a ``[DENIED]`` line. Any other exception
    type propagates to the caller.
    """
    try:
        result = fn()
    except ClientError as exc:
        response = exc.response or {}
        error_info = response.get("Error", {}) or {}
        metadata = response.get("ResponseMetadata", {}) or {}

        code = error_info.get("Code", "Unknown")
        status = metadata.get("HTTPStatusCode")
        message = error_info.get("Message", str(exc))

        api_errors.append(
            {
                "account_id": account_id,
                "region": region,
                "service": service,
                "operation": operation,
                "code": code,
                "http_status": status,
                "message": message,
            }
        )

        denied = code in DENY_CODES or status in (401, 403)
        if denied:
            # Keep the log entry on a single line.
            msg = (message or "").replace("\n", " ")
            print(
                f"[DENIED] acct={account_id} region={region} "
                f"svc={service} op={operation} code={code} msg={msg}",
                file=sys.stderr,
            )
        return default
    else:
        return result



In [6]:
# =========================
# AWS helpers
# =========================

def load_aws_credentials_from_env():
    """
    Call ``load_dotenv()`` and verify that key credentials are present.

    Raises:
        RuntimeError: if AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY is unset
            or empty.

    Side effect: when AWS_DEFAULT_REGION is unset or empty it is set to
    "us-east-1" in ``os.environ``.
    """
    load_dotenv()

    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not all(os.getenv(name) for name in required):
        raise RuntimeError("Missing AWS credentials in environment")

    if os.getenv("AWS_DEFAULT_REGION"):
        return
    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"


def get_boto3_session() -> boto3.Session:
    """
    Build a boto3 session from the AWS_* environment variables.

    Reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN and
    AWS_DEFAULT_REGION; any that are unset are passed through as ``None``.
    """
    env = os.getenv
    session_kwargs = {
        "aws_access_key_id": env("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": env("AWS_SECRET_ACCESS_KEY"),
        "aws_session_token": env("AWS_SESSION_TOKEN"),
        "region_name": env("AWS_DEFAULT_REGION"),
    }
    return boto3.Session(**session_kwargs)


def get_account_id(session: boto3.Session) -> str:
    """Return the "Account" field of the STS ``get_caller_identity`` response."""
    identity = session.client("sts").get_caller_identity()
    return identity["Account"]


def list_regions(session: boto3.Session) -> List[str]:
    """
    Return the sorted names of regions whose opt-in status allows use.

    Calls EC2 ``describe_regions(AllRegions=True)`` and keeps entries whose
    ``OptInStatus`` is absent, "opt-in-not-required" or "opted-in".
    """
    usable_statuses = (None, "opt-in-not-required", "opted-in")
    response = session.client("ec2").describe_regions(AllRegions=True)

    names = [
        entry["RegionName"]
        for entry in response["Regions"]
        if entry.get("OptInStatus") in usable_statuses
    ]
    names.sort()
    return names

In [7]:
# =========================
# Collectors
# =========================

def collect_elastic_ips(ec2, account_id, region, api_errors):
    """
    Return one row per Elastic IP address reported by ``describe_addresses``.

    Args:
        ec2: EC2 client for ``region``.
        account_id: Account id copied into each row.
        region: Region name copied into each row.
        api_errors: List that ``aws_call`` appends error records to.

    Returns:
        A list of dicts with keys account_id, region, service ("ElasticIP"),
        public_ip, resource_id, allocation_id and extra. ``extra`` is
        "attached" when the address is associated with an instance or a
        network interface, otherwise "unattached". An empty list is returned
        if the API call fails with a ClientError.
    """
    resp = aws_call(
        api_errors,
        account_id=account_id,
        region=region,
        service="ec2",
        operation="DescribeAddresses",
        fn=lambda: ec2.describe_addresses(),
        default={"Addresses": []},
    )

    rows = []
    for a in resp.get("Addresses", []):
        instance_id = a.get("InstanceId")
        eni_id = a.get("NetworkInterfaceId")
        # An address bound to a network interface (e.g. a NAT gateway or load
        # balancer ENI) has no InstanceId but is still in use, so it must not
        # be labelled "unattached" while resource_id points at that ENI.
        in_use = bool(a.get("AssociationId") or instance_id or eni_id)
        rows.append({
            "account_id": account_id,
            "region": region,
            "service": "ElasticIP",
            "public_ip": a.get("PublicIp"),
            "resource_id": instance_id or eni_id or "",
            "allocation_id": a.get("AllocationId", ""),
            "extra": "attached" if in_use else "unattached",
        })
    return rows


def collect_instance_public_ips(ec2, account_id, region, api_errors):
    """
    Return one row per EC2 instance that has a ``PublicIpAddress``.

    Args:
        ec2: EC2 client for ``region``.
        account_id: Account id copied into each row.
        region: Region name copied into each row.
        api_errors: List that ``aws_call`` appends error records to.

    Returns:
        A list of dicts with keys account_id, region, service ("EC2Instance"),
        public_ip, resource_id (the instance id), allocation_id (always "")
        and extra ("state=<instance state name>"). If the API call fails with
        a ClientError, rows from pages fetched before the failure are still
        returned.
    """
    pages = []

    def fetch_pages():
        # boto3 paginators are lazy: get_paginator()/paginate() send nothing,
        # the DescribeInstances requests are only issued while iterating. The
        # iteration therefore has to run inside aws_call, otherwise a denied
        # call raises an unhandled ClientError and aborts the whole scan.
        if ec2.can_paginate("describe_instances"):
            for page in ec2.get_paginator("describe_instances").paginate():
                pages.append(page)
        else:
            pages.append(ec2.describe_instances())

    aws_call(
        api_errors,
        account_id=account_id,
        region=region,
        service="ec2",
        operation="DescribeInstances",
        fn=fetch_pages,
        default=None,
    )

    rows = []
    for page in pages:
        for r in page.get("Reservations", []):
            for inst in r.get("Instances", []):
                if not inst.get("PublicIpAddress"):
                    continue
                rows.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "EC2Instance",
                    "public_ip": inst["PublicIpAddress"],
                    "resource_id": inst["InstanceId"],
                    "allocation_id": "",
                    "extra": f"state={inst.get('State', {}).get('Name')}",
                })
    return rows


def collect_nat_gateway_ips(ec2, account_id, region, api_errors):
    """
    Return one row per public address attached to a NAT gateway.

    Args:
        ec2: EC2 client for ``region``.
        account_id: Account id copied into each row.
        region: Region name copied into each row.
        api_errors: List that ``aws_call`` appends error records to.

    Returns:
        A list of dicts with keys account_id, region, service ("NATGateway"),
        public_ip, resource_id (the NAT gateway id), allocation_id and extra
        ("state=<gateway state>"). Addresses without a ``PublicIp`` are
        skipped. If the API call fails with a ClientError, rows from pages
        fetched before the failure are still returned.
    """
    pages = []

    def fetch_pages():
        # boto3 paginators are lazy: the DescribeNatGateways requests are only
        # issued while iterating, so the iteration must run inside aws_call
        # for a ClientError to be recorded instead of crashing the scan.
        if ec2.can_paginate("describe_nat_gateways"):
            for page in ec2.get_paginator("describe_nat_gateways").paginate():
                pages.append(page)
        else:
            pages.append(ec2.describe_nat_gateways())

    aws_call(
        api_errors,
        account_id=account_id,
        region=region,
        service="ec2",
        operation="DescribeNatGateways",
        fn=fetch_pages,
        default=None,
    )

    rows = []
    for page in pages:
        for nat in page.get("NatGateways", []):
            for a in nat.get("NatGatewayAddresses", []):
                if not a.get("PublicIp"):
                    continue
                rows.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "NATGateway",
                    "public_ip": a["PublicIp"],
                    "resource_id": nat["NatGatewayId"],
                    "allocation_id": a.get("AllocationId", ""),
                    "extra": f"state={nat.get('State')}",
                })
    return rows

In [8]:
# =========================
# Main function
# =========================

def main():
    """
    Enumerate public IPs for the current account across its usable regions.

    Loads credentials, resolves the account id and region list, runs the three
    collectors against every region, de-duplicates the rows, writes
    ``aws_public_ips.csv`` when pandas is available as ``pd``, and finally
    reports any API errors recorded during the scan.

    Exits with status 1 (via ``sys.exit``) when credentials are missing or the
    account id / region list cannot be retrieved.
    """
    try:
        load_aws_credentials_from_env()
        session = get_boto3_session()
        account_id = get_account_id(session)
        # Without a region list nothing can be scanned, so a failure here is
        # handled as fatal too rather than surfacing as a raw traceback.
        regions = list_regions(session)
    except (RuntimeError, NoCredentialsError, ClientError) as e:
        print(f"[FATAL] {e}", file=sys.stderr)
        sys.exit(1)

    print(f"[*] Enumerating public IPs for account {account_id}")
    print(f"[*] Regions: {len(regions)}")

    api_errors: List[Dict] = []
    all_rows: List[Dict] = []
    collectors = (
        collect_elastic_ips,
        collect_instance_public_ips,
        collect_nat_gateway_ips,
    )

    for region in regions:
        print(f"[*] Scanning {region}")
        ec2 = session.client("ec2", region_name=region)
        for collect in collectors:
            all_rows.extend(collect(ec2, account_id, region, api_errors))

    if all_rows:
        # The same address can be reported more than once; keep the last row
        # seen for each (account, region, service, ip, resource) combination.
        dedup = {
            (r["account_id"], r["region"], r["service"], r["public_ip"], r["resource_id"]): r
            for r in all_rows
        }
        rows = list(dedup.values())

        print(f"[*] Found {len(rows)} public IPs")

        if pd is not None:
            df = pd.DataFrame(rows).sort_values(
                ["region", "service", "public_ip"]
            )
            df.to_csv("aws_public_ips.csv", index=False)
            print("[*] Wrote aws_public_ips.csv")
            print(df.head(20).to_string(index=False))
        else:
            print("[!] pandas is not available; skipping CSV export", file=sys.stderr)
    else:
        print("[*] No public IPs found")

    # Report errors even when no rows were found: an empty result caused by
    # denied calls must not be mistaken for "this account has no public IPs".
    denied = [
        e for e in api_errors
        if e["code"] in DENY_CODES or e["http_status"] in (401, 403)
    ]
    if denied:
        print("\n=== DENIED API CALL SUMMARY ===")
        c = Counter((e["region"], e["operation"], e["code"]) for e in denied)
        for (region, op, code), n in c.most_common(25):
            print(f"{n:>3} region={region:<12} op={op:<28} code={code}")

    other_errors = len(api_errors) - len(denied)
    if other_errors:
        print(
            f"[!] {other_errors} other API error(s) were recorded; results may be incomplete",
            file=sys.stderr,
        )


# Entry point: start the scan when this code is executed directly. The output
# recorded below this cell shows the guard also fires when the cell is run in
# the notebook.
if __name__ == "__main__":
    main()


[*] Enumerating public IPs for account 637602092272
[*] Regions: 17
[*] Scanning ap-northeast-1
[*] Scanning ap-northeast-2
[*] Scanning ap-northeast-3
[*] Scanning ap-south-1
[*] Scanning ap-southeast-1
[*] Scanning ap-southeast-2
[*] Scanning ca-central-1
[*] Scanning eu-central-1
[*] Scanning eu-north-1
[*] Scanning eu-west-1
[*] Scanning eu-west-2
[*] Scanning eu-west-3
[*] Scanning sa-east-1
[*] Scanning us-east-1
[*] Scanning us-east-2
[*] Scanning us-west-1
[*] Scanning us-west-2
[*] No public IPs found
