In [1]:
import os
import pandas as pd
from pymongo import MongoClient
from typing import List, Dict, Set
from datetime import datetime
from dataclasses import dataclass, asdict
from pymongo.cursor import Cursor
from tqdm import tqdm

import dotenv
dotenv.load_dotenv(".env", override=True) # Defines MONGO_URI

True

In [2]:
@dataclass(frozen=True)
class CVE:
    """One vulnerability entry, built from an item of a scan's ``cves`` list.

    The dataclass is frozen, so instances are hashable and compare by value
    over all three fields. The remediation tracking in this notebook relies on
    that: CVE objects are used as set members and as dictionary keys.
    Consequently two entries with the same ``id`` but a different ``severity``
    or ``fix_state`` are distinct CVE objects.
    """
    id: str  # vulnerability identifier (presumably a CVE/advisory ID — confirm against the scan schema)
    severity: str  # severity label as stored in the scan document
    fix_state: str  # fix status as stored in the scan document

@dataclass(frozen=True)
class Remediation:
    """Observed lifetime of one CVE across a sequence of scans.

    Instances are created by ``find_remediations``.
    """
    cve: CVE  # the vulnerability that was tracked
    first_seen_at: datetime  # ``scan_start`` of the scan in which the CVE first appeared
    remediated_at: datetime  # ``scan_start`` of the first scan that no longer lists it, or of the last scan if it never disappeared


def _init_active_table(first_scan: Dict) -> Dict[CVE, datetime]:
    """Seed the table of active CVEs from the very first scan.

    Each item of ``first_scan["cves"]`` is expanded into a ``CVE`` and mapped
    to ``first_scan["scan_start"]``, i.e. everything present in the first scan
    counts as first seen at that scan.
    """
    table = {}
    for fields in first_scan["cves"]:
        cve = CVE(**fields)
        table[cve] = first_scan["scan_start"]
    return table


def _get_remediated_cves(discovered: set[CVE], active: Dict[CVE, datetime]) -> Set[CVE]:
    """Return the CVEs that are tracked in ``active`` but missing from ``discovered``."""
    return set(active) - discovered


def _get_new_cves(discovered: set[CVE], active: Dict[CVE, datetime]) -> Set[CVE]:
    """Return the CVEs present in ``discovered`` that ``active`` does not track yet."""
    return discovered - set(active)


def find_remediations(scans: Cursor) -> List[Remediation]:
    """Turn a sequence of scans into one ``Remediation`` per tracked CVE.

    The scans are processed in exactly the order the cursor yields them; this
    function does not sort, so the caller is responsible for the ordering.
    The first scan seeds the table of active CVEs. For every later scan:

    * CVEs that were active but are no longer listed are recorded as
      remediated at that scan's ``scan_start`` and dropped from the table;
    * CVEs listed for the first time are added with that scan's
      ``scan_start`` as their first-seen time.

    CVEs still active after the last scan are also emitted, with the last
    scan's ``scan_start`` as their ``remediated_at``.

    NOTE(review): CVEs are matched by full value (id, severity, fix_state), so
    an entry whose severity or fix_state changes between scans is recorded as
    remediated and then tracked again as a new CVE — confirm this is intended.

    Returns an empty list when ``scans`` yields nothing.
    """
    tracked = None  # CVE -> scan_start of the scan where it first appeared
    final_scan = None
    results = []

    for scan in scans:
        final_scan = scan

        if tracked is None:
            # First scan: everything it lists becomes active.
            tracked = _init_active_table(scan)
            continue

        present = {CVE(**fields) for fields in scan["cves"]}

        # Close out CVEs that have disappeared since the previous scan.
        for gone in _get_remediated_cves(present, tracked):
            results.append(
                Remediation(
                    cve=gone,
                    first_seen_at=tracked.pop(gone),
                    remediated_at=scan["scan_start"],
                )
            )

        # Start tracking CVEs that show up for the first time.
        for fresh in _get_new_cves(present, tracked):
            tracked[fresh] = scan["scan_start"]

    # Whatever is still active is treated as "remediated" at the last scan.
    if tracked is not None:
        results.extend(
            Remediation(
                cve=cve,
                first_seen_at=seen_at,
                remediated_at=final_scan["scan_start"],
            )
            for cve, seen_at in tracked.items()
        )

    return results

In [3]:
# Connect using the URI loaded from .env and select the "gallery" database.
client = MongoClient(os.environ["MONGO_URI"])
db = client["gallery"]

# Materialise the image list up front so tqdm can show a total.
images = list(db["images"].find())

# One flat record per remediation, tagged with the image it belongs to.
# Records are collected in a list and turned into a DataFrame once, instead of
# re-concatenating a growing DataFrame on every iteration.
rem_records = []

for img in tqdm(images):
    query = {
        "registry": img["registry"],
        "repository": img["repository"],
        "tag": img["tag"]
    }
    # find_remediations walks the scans as a timeline, so request them in
    # ascending scan_start order (1 == pymongo.ASCENDING). Without an explicit
    # sort MongoDB does not guarantee any particular result order.
    scans = db["cves"].find(query).sort("scan_start", 1)
    for rem in find_remediations(scans):
        # asdict() expands the nested CVE into a dict under the "cve" key;
        # the query dict supplies the registry/repository/tag columns.
        rem_records.append({**asdict(rem), **query})

# Columns: cve, first_seen_at, remediated_at, registry, repository, tag.
rems_df = pd.DataFrame(rem_records)

100%|██████████| 127/127 [02:13<00:00,  1.05s/it]


In [11]:
# Remediation time per record, in hours (3600 seconds per hour).
rems_df["rtime"] = (
    (rems_df["remediated_at"] - rems_df["first_seen_at"]).dt.total_seconds() / 3600
)

# Mean remediation time for each image (registry / repository / tag).
rtime_df = (
    rems_df
    .groupby(["registry", "repository", "tag"])["rtime"]
    .mean()
    .reset_index()
)

In [12]:
# Write the per-image mean remediation times to disk, shortest first,
# without the DataFrame index.
(
    rtime_df
    .sort_values("rtime")
    .to_csv("times.csv", index=False)
)

Unnamed: 0,registry,repository,tag,rtime
7,cgr.dev,chainguard/python,latest,0.895860
42,docker.io,zookeeper,latest,18.065064
36,docker.io,sonarqube,latest,19.466991
23,docker.io,maven,latest,20.639536
38,docker.io,tomcat,latest,20.657875
...,...,...,...,...
6,cgr.dev,chainguard/mariadb,latest,22.927350
87,registry.access.redhat.com,ubi9/nginx-120,latest,22.929140
34,docker.io,registry,latest,22.930423
78,registry.access.redhat.com,ubi8/openssl,latest,22.931589
