-----------------------------------------------------------------------
<font size=6 color="#36454F"><b>Coding Sample for Crogl Team Members Notebook</b></font><br><br>
by: Ted Summey

This doc exemplifies a problem that Crogl dev teams come across routinely. If this is the kind of work you enjoy doing and you excel at it, you will love working with the Crogl team.

**Problem Statement**

As a software developer, I want to know whether the code I am using is affected by known vulnerabilities. I am building an application that gathers vulnerability data, analyzes my code, and identifies vulnerabilities in it. I want a teammate to help me with part of this project.

I want you to create a script that downloads all GitHub security vulnerabilities from the [GitHub Advisory Database](https://github.com/advisories?query=type%3Areviewed+ecosystem%3Apip). The script then zips up the advisories by severity, producing 4 zips, one for each category of severity: low, moderate, high, critical.

Extra credit for generating a CSV file with a row for every vulnerability and a set of attributes summarizing the key information for each vulnerability. Extra credit for the CSV to contain a field called KEV. If the vulnerability is in the CISA Known Exploited Vulnerabilities Catalog, the KEV field value will be 1; otherwise the field will be empty.

***

![alt Process Flow Diagram](images/flow-diagram.png)

<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #1: Get the number of pages</b></font><br><br>
The URL "https://github.com/advisories?query=type%3Areviewed+ecosystem%3Apip" (GitHub Advisory Database) is actually the base URL https://github.com/advisories with the search parameters type:reviewed ecosystem:pip applied. To get a full list, I needed to paginate in a way that didn't trigger GitHub's 1000 limit. To do that I need to know how many pages there are, so the code below determines the number of pages dynamically and stores it in the variable "total_pages" for the next code block.
</div>

In [4]:
# CODE BLOCK #1

import requests
from bs4 import BeautifulSoup

def get_total_pages_dynamic(base_url, search_query):
    """
    Dynamically finds the highest page number in GitHub advisory results by inspecting the 'aria-label' attribute of the pagination links.

    Parameters:
        base_url (str): The base URL of the GitHub advisories page.
        search_query (str): Search query parameters (e.g., "type:reviewed ecosystem:pip").

    Returns:
        int: The total number of pages found.
    """
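    headers = {'User-Agent': 'Mozilla/5.0'}
    # Let requests percent-encode the query string; the timeout keeps a stalled request from hanging
    response = requests.get(base_url, params={"query": search_query}, headers=headers, timeout=30)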
    
    if response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Find the highest page number from the aria-label attribute
        pagination_links = soup.find_all('a', {'aria-label': True})
        page_numbers = []

        for link in pagination_links:
            label = link.attrs['aria-label']
            if label.startswith("Page "):  # Look for "Page X"
                try:
                    page_number = int(label.replace("Page ", "").strip())
                    page_numbers.append(page_number)
                except ValueError:
                    continue

        if page_numbers:
            return max(page_numbers)  # Return the highest page number
        else:
            print("No pagination found. Defaulting to page 1.")
            return 1
    else:
        print(f"Failed to fetch page: {response.status_code}")
        return 1

# Parameters
BASE_URL = "https://github.com/advisories"
SEARCH_QUERY = "type:reviewed ecosystem:pip"

# Fetch total pages dynamically
total_pages = get_total_pages_dynamic(BASE_URL, SEARCH_QUERY)
print(f"Total pages found: {total_pages}")


Total pages found: 146
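
The page-count logic above can be exercised without a network call. The following is a minimal sketch, using only the standard library, that builds the encoded search URL and pulls the highest "Page N" value out of `aria-label` attributes. The helper names (`build_search_url`, `max_page_from_html`) and the sample HTML are hypothetical, and a regular expression stands in for BeautifulSoup here only to keep the sketch dependency-free.

```python
import re
from urllib.parse import urlencode

def build_search_url(base_url, search_query, page=None):
    """Return the advisory search URL with the query string percent-encoded."""
    params = {"query": search_query}
    if page is not None:
        params["page"] = page
    return f"{base_url}?{urlencode(params)}"

def max_page_from_html(html):
    """Return the largest N found in aria-label="Page N" attributes, or 1 if none."""
    pages = [int(n) for n in re.findall(r'aria-label="Page (\d+)"', html)]
    return max(pages, default=1)

# Hypothetical pagination markup, for illustration only.
SAMPLE_HTML = (
    '<a aria-label="Page 1" href="?page=1">1</a>'
    '<a aria-label="Page 2" href="?page=2">2</a>'
    '<a aria-label="Page 146" href="?page=146">146</a>'
    '<a aria-label="Next Page" href="?page=2">Next</a>'
)

url = build_search_url("https://github.com/advisories", "type:reviewed ecosystem:pip")
print(url)
print(max_page_from_html(SAMPLE_HTML))
```

The encoded URL matches the one quoted in the problem statement, and the "Next Page" link is ignored because its label does not start with "Page ".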


<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #2: Create a list of advisory URLs for GraphQL API</b></font><br><br>
Initially, I didn't see how to "search" the GraphQL API with the type:reviewed ecosystem:pip filters applied, so I scraped a list of URLs for all the type:reviewed ecosystem:pip advisories instead. The code block below generates github_advisories_full.csv, whose URL column is used in the next code block.
</div>

In [6]:
# CODE BLOCK #2

import requests
from bs4 import BeautifulSoup
import time
import pandas as pd

def get_all_advisories(base_url, search_query, total_pages, delay=2):
    """
    Scrapes GitHub advisories across all pages.

    Parameters:
        base_url (str): The base URL of the GitHub advisories page.
        search_query (str): Search query parameters (e.g., "type:reviewed ecosystem:pip").
        total_pages (int): Total number of pages to scrape.
        delay (int): Delay in seconds between requests to avoid rate limits.

    Returns:
        list: A list of advisories (dicts).
    """
    advisories = []
    headers = {'User-Agent': 'Mozilla/5.0'}

    for page in range(1, total_pages + 1):
        print(f"Fetching page {page}...")
        url = f"{base_url}?query={search_query}&page={page}"
        response = requests.get(url, headers=headers)
        
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            advisory_elements = soup.find_all('div', class_='Box-row')

            for advisory in advisory_elements:
                # Skip rows without a primary link instead of raising AttributeError on None
                link = advisory.find('a', class_='Link--primary')
                if link is None:
                    continue
                label = advisory.find('span', class_='Label')

                advisories.append({
                    "Title": link.text.strip(),
                    "URL": f"https://github.com{link['href']}",
                    "Severity": label.text.strip() if label else None
                })
        else:
            print(f"Failed to fetch page {page}: {response.status_code}")
        
        time.sleep(delay)  # Pause to avoid rate-limiting

    return advisories


def save_to_csv(advisories, output_file):
    """
    Saves the list of advisories to a CSV file.

    Parameters:
        advisories (list): List of advisories (dicts).
        output_file (str): Output file path.
    """
    df = pd.DataFrame(advisories)
    df.to_csv(output_file, index=False)
    print(f"Data saved to {output_file}!")


# Parameters
BASE_URL = "https://github.com/advisories"
SEARCH_QUERY = "type:reviewed ecosystem:pip"
OUTPUT_FILE = "github_advisories_full.csv"

# Scrape advisories
all_advisories = get_all_advisories(BASE_URL, SEARCH_QUERY, total_pages)

# Save results
save_to_csv(all_advisories, OUTPUT_FILE)


Fetching page 1...
Fetching page 2...
Fetching page 3...
Fetching page 4...
Fetching page 5...
Fetching page 6...
Fetching page 7...
Fetching page 8...
Fetching page 9...
Fetching page 10...
Fetching page 11...
Fetching page 12...
Fetching page 13...
Fetching page 14...
Fetching page 15...
Fetching page 16...
Fetching page 17...
Fetching page 18...
Fetching page 19...
Fetching page 20...
Fetching page 21...
Fetching page 22...
Fetching page 23...
Fetching page 24...
Fetching page 25...
Fetching page 26...
Fetching page 27...
Fetching page 28...
Fetching page 29...
Fetching page 30...
Fetching page 31...
Fetching page 32...
Fetching page 33...
Fetching page 34...
Fetching page 35...
Fetching page 36...
Fetching page 37...
Fetching page 38...
Fetching page 39...
Fetching page 40...
Fetching page 41...
Fetching page 42...
Fetching page 43...
Fetching page 44...
Fetching page 45...
Fetching page 46...
Fetching page 47...
Fetching page 48...
Fetching page 49...
Fetching page 50...
Fetching 
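
As an alternative to scraping, GitHub's GraphQL API exposes a `securityVulnerabilities` connection that accepts an `ecosystem` argument (for example `PIP`) and uses cursor pagination. The sketch below is not what this notebook ran; it only shows how such a cursor loop could look. `iter_pip_advisories` and the injectable `post` callable are hypothetical names, the advisory IDs are made up, and the field selection is an assumption to verify against the current schema. This connection returns one node per affected package, so the same advisory can appear more than once and is deduplicated by `ghsaId`.

```python
QUERY = """
query($cursor: String) {
  securityVulnerabilities(ecosystem: PIP, first: 100, after: $cursor) {
    nodes { advisory { ghsaId severity permalink } }
    pageInfo { hasNextPage endCursor }
  }
}
"""

def iter_pip_advisories(post):
    """Yield each advisory once, following endCursor until hasNextPage is false.

    `post` is any callable that takes a JSON payload (dict) and returns the
    decoded JSON response (dict), e.g. a thin wrapper around requests.post.
    """
    seen = set()
    cursor = None
    while True:
        payload = {"query": QUERY, "variables": {"cursor": cursor}}
        connection = post(payload)["data"]["securityVulnerabilities"]
        for node in connection["nodes"]:
            advisory = node["advisory"]
            if advisory["ghsaId"] not in seen:
                seen.add(advisory["ghsaId"])
                yield advisory
        page_info = connection["pageInfo"]
        if not page_info["hasNextPage"]:
            break
        cursor = page_info["endCursor"]

# Fake two-page response (made-up IDs) so the loop can run offline.
FAKE_PAGES = {
    None: {
        "nodes": [
            {"advisory": {"ghsaId": "GHSA-aaaa-aaaa-aaaa", "severity": "HIGH"}},
            {"advisory": {"ghsaId": "GHSA-aaaa-aaaa-aaaa", "severity": "HIGH"}},
        ],
        "pageInfo": {"hasNextPage": True, "endCursor": "cursor-1"},
    },
    "cursor-1": {
        "nodes": [{"advisory": {"ghsaId": "GHSA-bbbb-bbbb-bbbb", "severity": "LOW"}}],
        "pageInfo": {"hasNextPage": False, "endCursor": None},
    },
}

def fake_post(payload):
    """Stand-in transport that serves FAKE_PAGES keyed by the cursor variable."""
    return {"data": {"securityVulnerabilities": FAKE_PAGES[payload["variables"]["cursor"]]}}

ids = [advisory["ghsaId"] for advisory in iter_pip_advisories(fake_post)]
print(ids)
```

In real use, `post` would wrap an authenticated call to https://api.github.com/graphql; injecting it keeps the pagination logic testable on its own.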

<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #3: Download the advisories</b></font><br><br>
At the start I was going to download the advisories as they were, in HTML format. However, in reviewing the GraphQL API documentation I realized there are only approximately 9 keys available (ghsaId, severity, summary, description, permalink, publishedAt, withdrawnAt, identifiers, and references), so I elected to download the advisories as separate JSON files, sorted into severity folders.

For security reasons, I used a .env file to store my token locally rather than creating a plain-text variable in the code. If I were using Visual Studio to sync the repository between my local endpoint and GitHub, I would add a .gitignore file listing the .env so the key is not added to the repository.

I've worked in FedRAMP environments, and the AWS, Azure, and GCP FedRAMP environments are generally pretty secure. One vector for gaining access to a fed cloud environment is an insecure CI/CD pipeline using GitHub (and others) whose repositories contain files or code with credentials or tokens.
</div>

In [8]:
# CODE BLOCK #3

import os
import csv
import requests
import json
from dotenv import load_dotenv

# Load the .env file
load_dotenv()

# Retrieve GitHub token
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if not GITHUB_TOKEN:
    raise Exception("GitHub token not found. Please set GITHUB_TOKEN in your .env file.")

# Directory for storing advisories
BASE_DIR = "advisories"
os.makedirs(BASE_DIR, exist_ok=True)

# Headers for API requests
HEADERS = {
    "Authorization": f"Bearer {GITHUB_TOKEN}",
    "Content-Type": "application/json"
}

# Function to extract `ghsaId` from the advisory URL
def extract_ghsa_id(url):
    return url.split("/")[-1]  # Extract the last part of the URL

# Function to fetch advisory details by `ghsaId`
def fetch_advisory(ghsa_id):
    query = f"""
    query {{
      securityAdvisory(ghsaId: "{ghsa_id}") {{
        ghsaId
        severity
        summary
        description
        permalink
        publishedAt
        withdrawnAt
        identifiers {{
          type
          value
        }}
      }}
    }}
    """
    response = requests.post(
        "https://api.github.com/graphql",
        json={"query": query},
        headers=HEADERS
    )
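    if response.status_code != 200:
        raise Exception(f"Failed to fetch advisory: {response.status_code}, {response.text}")
    body = response.json()
    # GraphQL reports query-level failures in an "errors" list, even with HTTP 200
    if body.get("errors"):
        raise Exception(f"GraphQL error for {ghsa_id}: {body['errors']}")
    advisory = (body.get("data") or {}).get("securityAdvisory")
    if advisory is None:
        raise Exception(f"Advisory not found: {ghsa_id}")
    return advisory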

# Save advisory JSON to the appropriate directory based on severity
def save_advisory_by_severity(advisory):
    severity = advisory["severity"].lower()
    dir_path = os.path.join(BASE_DIR, severity)
    os.makedirs(dir_path, exist_ok=True)
    
    file_path = os.path.join(dir_path, f"{advisory['ghsaId']}.json")
    with open(file_path, "w", encoding="utf-8") as file:
        json.dump(advisory, file, indent=4)
    print(f"Saved: {file_path}")

# Main function to process advisory URLs from CSV
def process_advisories_from_csv(csv_file):
    with open(csv_file, "r") as file:
        reader = csv.DictReader(file)
        for row in reader:
            url = row["URL"]  # Adjust column name if needed
            try:
                ghsa_id = extract_ghsa_id(url)
                advisory = fetch_advisory(ghsa_id)
                save_advisory_by_severity(advisory)
            except Exception as e:
                print(f"Error processing {url}: {e}")

# Path to the CSV file containing advisory URLs
csv_file_path = "github_advisories_full.csv"  # Update with your actual file path

# Process advisories from the CSV file
process_advisories_from_csv(csv_file_path)


Saved: advisories\high\GHSA-qrv3-jc3h-f3m6.json
Saved: advisories\moderate\GHSA-v342-4xr9-x3q3.json
Saved: advisories\moderate\GHSA-3hj6-r5c9-q8f3.json
Saved: advisories\critical\GHSA-mcrp-whpw-jp68.json
Saved: advisories\moderate\GHSA-969w-gqqr-g6j3.json
Saved: advisories\low\GHSA-4rj2-9gcx-5qhx.json
Saved: advisories\high\GHSA-fjcf-3j3r-78rp.json
Saved: advisories\critical\GHSA-gjxm-x497-4h6h.json
Saved: advisories\high\GHSA-j5qj-rg5j-j7c2.json
Saved: advisories\high\GHSA-fm93-g6xp-35xq.json
Saved: advisories\high\GHSA-879v-fggm-vxw2.json
Saved: advisories\moderate\GHSA-32g6-mg92-ghm2.json
Saved: advisories\moderate\GHSA-49m6-vrr9-2cqm.json
Saved: advisories\high\GHSA-g5pg-73fc-hjwq.json
Saved: advisories\high\GHSA-5ccf-884p-4jjq.json
Saved: advisories\critical\GHSA-747f-ww56-4q4h.json
Saved: advisories\high\GHSA-6gmf-2369-c76c.json
Saved: advisories\high\GHSA-hw8j-hw49-752c.json
Saved: advisories\high\GHSA-m76r-xqqj-mqmv.json
Saved: advisories\critical\GHSA-pgr7-mhp5-fgjp.json
Saved
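
GraphQL can answer with HTTP 200 while carrying an `errors` array and a null `data` field, and interpolating the ID into the query text with an f-string is fragile. The sketch below passes the ID as a GraphQL variable and checks the decoded body before using it. The helper names `build_advisory_payload` and `extract_advisory` are hypothetical, and the sample responses are hand-written for illustration rather than captured from the API.

```python
ADVISORY_QUERY = """
query($ghsaId: String!) {
  securityAdvisory(ghsaId: $ghsaId) {
    ghsaId
    severity
    summary
  }
}
"""

def build_advisory_payload(ghsa_id):
    """Return the JSON body for the request, with the ID sent as a variable."""
    return {"query": ADVISORY_QUERY, "variables": {"ghsaId": ghsa_id}}

def extract_advisory(body):
    """Return the advisory dict from a decoded response, or raise if it is unusable."""
    if body.get("errors"):
        messages = "; ".join(e.get("message", "unknown error") for e in body["errors"])
        raise ValueError(f"GraphQL error: {messages}")
    advisory = (body.get("data") or {}).get("securityAdvisory")
    if advisory is None:
        raise LookupError("Advisory not found")
    return advisory

# Hand-written sample bodies (placeholder ID), for illustration only.
ok_body = {"data": {"securityAdvisory": {
    "ghsaId": "GHSA-xxxx-xxxx-xxxx", "severity": "HIGH", "summary": "Example"}}}
error_body = {"data": None, "errors": [{"message": "Bad credentials"}]}

payload = build_advisory_payload("GHSA-xxxx-xxxx-xxxx")
print(payload["variables"])
print(extract_advisory(ok_body)["severity"])

try:
    extract_advisory(error_body)
except ValueError as exc:
    error_text = str(exc)
    print(error_text)
```

The payload would be sent as `json=payload` in the same `requests.post` call used in Code Block #3.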

<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #4: Merge all the JSON advisories into one to correlate with CISA KEV</b></font><br><br>
All of the advisory JSON files in the /advisories/severity folders are merged into one JSON file to make it easier to correlate with the CISA JSON.
</div>

In [10]:
#CODE BLOCK #4

import os
import json

# Define the base advisories directory and output file
BASE_DIR = "advisories"
OUTPUT_FILE = "advisories_all.json"

# Subdirectories containing advisories categorized by severity
SEVERITY_DIRS = ["low", "moderate", "high", "critical"]

def merge_json_files(base_dir, severity_dirs, output_file):
    all_advisories = []  # List to store all advisories

    # Loop through each severity directory
    for severity in severity_dirs:
        severity_dir = os.path.join(base_dir, severity)

        if not os.path.exists(severity_dir):
            print(f"Directory not found: {severity_dir}")
            continue

        # Loop through all JSON files in the severity directory
        for file_name in os.listdir(severity_dir):
            if file_name.endswith(".json"):
                file_path = os.path.join(severity_dir, file_name)
                
                # Read and append the JSON content to the list
                try:
                    with open(file_path, "r", encoding="utf-8") as json_file:
                        advisory = json.load(json_file)
                        all_advisories.append(advisory)
                except Exception as e:
                    print(f"Error reading {file_path}: {e}")

    # Write all advisories to the output JSON file
    try:
        with open(output_file, "w", encoding="utf-8") as output_json:
            json.dump(all_advisories, output_json, indent=4)
        print(f"All advisories merged and saved to {output_file}")
    except Exception as e:
        print(f"Error writing to {output_file}: {e}")

# Run the function to merge JSON files
merge_json_files(BASE_DIR, SEVERITY_DIRS, OUTPUT_FILE)


All advisories merged and saved to advisories_all.json


<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #4: Download the CISA JSON File</b></font><br><br>
Download the CISA KEV catalog as a JSON file.
</div>

In [12]:
import requests

# URL of the JSON file
url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

# Path to save the downloaded JSON file
output_file = "known_exploited_vulnerabilities.json"

def download_json(url, output_file):
    try:
        # Send a GET request to the URL
        response = requests.get(url)
        response.raise_for_status()  # Raise an exception for HTTP errors
        
        # Save the content to a local file
        with open(output_file, "w", encoding="utf-8") as file:
            file.write(response.text)
        
        print(f"JSON file successfully downloaded and saved as '{output_file}'.")
    except requests.exceptions.RequestException as e:
        print(f"Failed to download the JSON file: {e}")

# Call the function to download the JSON
download_json(url, output_file)


JSON file successfully downloaded and saved as 'known_exploited_vulnerabilities.json'.


<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #5: Create the extra credit CSV</b></font><br><br>
This code block compares the CVE-type identifiers of each advisory in advisories_all.json with the cveID values in known_exploited_vulnerabilities.json. If there is a match, it sets a value of 1 in the corresponding KEV column; if not, the cell is left empty. For key information I used the values of "GHSAID", "SEVERITY", "SUMMARY", and "PUBLISHED".
</div>

In [14]:
#CODE BLOCK #5

import json
import csv

# Load JSON data
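# Both files were written as UTF-8, so read them with the same encoding
with open('advisories_all.json', 'r', encoding='utf-8') as f:
    advisories = json.load(f)

with open('known_exploited_vulnerabilities.json', 'r', encoding='utf-8') as f:
    kev_data = json.load(f)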

# Create a set of all CVE IDs in KEV catalog
kev_cve_ids = {vuln["cveID"] for vuln in kev_data["vulnerabilities"]}

# Extract advisory info with KEV comparison
rows = []
for advisory in advisories:
    ghsa_id = advisory.get("ghsaId")
    severity = advisory.get("severity")
    summary = advisory.get("summary")
    published = advisory.get("publishedAt")
    
    # Find CVE identifiers
    cve_ids = [id_["value"] for id_ in advisory.get("identifiers", []) if id_["type"] == "CVE"]
    kev_flag = 1 if any(cve in kev_cve_ids for cve in cve_ids) else None

    rows.append({
        "GHSAID": ghsa_id,
        "SEVERITY": severity,
        "SUMMARY": summary,
        "PUBLISHED": published,
        "KEV": kev_flag
    })

# Write to CSV
output_file = "advisory-kev-correlation.csv"
with open(output_file, "w", newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=["GHSAID", "SEVERITY", "SUMMARY", "PUBLISHED", "KEV"])
    writer.writeheader()
    writer.writerows(rows)

print(f"CSV file created: {output_file}")


CSV file created: advisory-kev-correlation.csv
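
The matching step reduces to a set-membership test. Below is a small offline sketch with hand-made records (the GHSA and CVE IDs are placeholders, not real advisories) showing how the KEV column ends up as `1` or empty in the CSV. `kev_flag` is a hypothetical helper name, and the CSV is written to an in-memory buffer rather than a file.

```python
import csv
import io

def kev_flag(advisory, kev_cve_ids):
    """Return 1 if any CVE identifier of the advisory is in the KEV set, else None."""
    identifiers = advisory.get("identifiers") or []
    cves = {i["value"] for i in identifiers if i.get("type") == "CVE"}
    return 1 if cves & kev_cve_ids else None

# Placeholder data shaped like the two JSON files used above.
kev_catalog = {"vulnerabilities": [{"cveID": "CVE-0000-0001"}]}
advisories = [
    {"ghsaId": "GHSA-aaaa-aaaa-aaaa",
     "identifiers": [{"type": "GHSA", "value": "GHSA-aaaa-aaaa-aaaa"},
                     {"type": "CVE", "value": "CVE-0000-0001"}]},
    {"ghsaId": "GHSA-bbbb-bbbb-bbbb",
     "identifiers": [{"type": "CVE", "value": "CVE-0000-0002"}]},
]
kev_ids = {v["cveID"] for v in kev_catalog["vulnerabilities"]}

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["GHSAID", "KEV"], lineterminator="\n")
writer.writeheader()
for adv in advisories:
    # csv writes None as an empty string, which leaves the KEV cell blank
    writer.writerow({"GHSAID": adv["ghsaId"], "KEV": kev_flag(adv, kev_ids)})

csv_text = buffer.getvalue()
print(csv_text)
```

Using a set for the KEV IDs keeps each lookup constant-time, which matters little at this scale but keeps the comparison simple to read.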


<div class="alert alert-block alert-info">
<font size=4 color="#36454F"><b>Code Block #6: Zip it!</b></font><br><br>
Create one zip file per severity, as requested. Each zip file is written into its corresponding /advisories/severity folder.
</div>

In [3]:
# CODE BLOCK #6

import zipfile
import os

# Advisory levels
levels = ['critical', 'high', 'moderate', 'low']

for level in levels:
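    folder_path = os.path.join('advisories', level)
    # Skip severities with no downloaded advisories instead of failing on a missing folder
    if not os.path.isdir(folder_path):
        print(f"Directory not found, skipping: {folder_path}")
        continue
    zip_path = os.path.join(folder_path, f'{level}.zip')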

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                if file != f'{level}.zip':  # Avoid recursion
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, folder_path)
                    zipf.write(file_path, arcname)

print("All ZIP files created successfully.")

All ZIP files created successfully.
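
One variation worth considering: writing each archive next to, rather than inside, the folder it compresses removes the need to skip the archive while collecting files. The sketch below runs against a temporary directory with dummy files, so it does not touch the real `advisories` folder; `zip_severity_folder` and the file names are hypothetical.

```python
import os
import tempfile
import zipfile

def zip_severity_folder(base_dir, level):
    """Zip the JSON files in base_dir/level into base_dir/level.zip and return its path."""
    folder = os.path.join(base_dir, level)
    zip_path = os.path.join(base_dir, f"{level}.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name in sorted(os.listdir(folder)):
            if name.endswith(".json"):
                # arcname keeps only the file name, so the archive has no nested folders
                zf.write(os.path.join(folder, name), arcname=name)
    return zip_path

with tempfile.TemporaryDirectory() as tmp:
    # Create a throwaway "high" folder with two dummy advisory files
    os.makedirs(os.path.join(tmp, "high"))
    for name in ("GHSA-demo-0001.json", "GHSA-demo-0002.json"):
        with open(os.path.join(tmp, "high", name), "w", encoding="utf-8") as fh:
            fh.write("{}")

    archive = zip_severity_folder(tmp, "high")
    archive_name = os.path.basename(archive)
    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()

print(archive_name, names)
```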
