# Code to find GitHub repos for advisories that are missing a source code link

* Requires you to run ecosystem_identify_missing_vfc_adv.ipynb
    * Generates: ./data/final_data/advisories_missing_GH_repo_20221204.csv
    * Produced by the second-to-last cell in that notebook

In [None]:
import pandas as pd
import requests
import re
import time
import glob
import json
from xml.etree import ElementTree
from bs4 import BeautifulSoup

# Script to load current GHSA DB

* Make sure you have the GHSA DB locally cloned
    * set the ghsa_db_path to your cloned location

In [None]:
ghsa_db_path = "../advisory-database/advisories/github-reviewed/"

# get all JSON files
ghsa_files = glob.glob(f"{ghsa_db_path}*/*/*/*.json")

# column layout of the advisory table built below
GHSA_COLUMNS = ["id", "package_ecosystem", "package_name", "references_package"]


def load_ghsa_advisories(files):
    """Build one row per advisory JSON file.

    Args:
        files: Iterable of paths to advisory JSON files.

    Returns:
        pandas.DataFrame with the columns in GHSA_COLUMNS. Ecosystem and
        name come from the first entry of "affected"; "references_package"
        is the url of the last reference whose type is "PACKAGE", or None
        when the advisory has no such reference. An empty *files* yields an
        empty frame that still carries the columns.
    """
    rows = []
    for file in files:
        # advisory files are JSON, so read them as UTF-8 regardless of locale
        with open(file, 'r', encoding='utf-8') as f:
            advisory = json.load(f)

        first_package = advisory["affected"][0]["package"]

        # check each reference for the package (source code) url
        references_package = None
        for reference in advisory["references"]:
            if reference["type"] == "PACKAGE":
                references_package = reference["url"]

        rows.append([advisory["id"], first_package["ecosystem"],
                     first_package["name"], references_package])

    # build the frame once: concatenating inside the loop copies every
    # previously loaded row on each iteration
    return pd.DataFrame(rows, columns=GHSA_COLUMNS)


df_ghsa = load_ghsa_advisories(ghsa_files)


In [None]:
# preview the first ten loaded advisories
df_ghsa.head(10)

In [None]:
# advisories whose references carried no PACKAGE (source code) url
ghsa_missing_GH_repo = df_ghsa[df_ghsa['references_package'].isna()]

print(f"Unique GHSA Advisories loaded: {df_ghsa.id.nunique()}")
print(f"GHSA without a package (source code) link: {ghsa_missing_GH_repo.id.nunique()}\n")

# per-ecosystem advisory totals
breakdowns = df_ghsa.package_ecosystem.value_counts().to_frame().reset_index(drop=False)
breakdowns.columns = ["ecosystem", "ghsa_count"]

# per-ecosystem totals of advisories missing the source code link
breakdowns_missing = ghsa_missing_GH_repo.package_ecosystem.value_counts().to_frame().reset_index(drop=False)
breakdowns_missing.columns = ["ecosystem", "source_link_missing_count"]

breakdowns = pd.merge(breakdowns, breakdowns_missing,
                      on=["ecosystem"],
                      how="left")

# ecosystems with nothing missing come out of the left merge as NaN
breakdowns = breakdowns.fillna(0)

# the NaN introduced by the merge turns the whole column into floats;
# cast back so counts are reported as "3" rather than "3.0"
breakdowns["source_link_missing_count"] = breakdowns["source_link_missing_count"].astype(int)

breakdowns["percent_missing_source_link"] = breakdowns.apply(
      lambda x: f"{round(100*x['source_link_missing_count']/x['ghsa_count'], 2)}%",
      axis=1
)

# "<count> (<percent>)" summary column
breakdowns["missing_complete"] = breakdowns.apply(
      lambda x: f"{x['source_link_missing_count']} ({x['percent_missing_source_link']})",
      axis=1
)

breakdowns.head(n=15)

# MAVEN Finds

* API Guide from SonaType
    * https://central.sonatype.org/search/rest-api-guide/

In [None]:
def maven_pom_scm_check(groupId, artifactId, latest_version):
    """Look up the source repository declared in a Maven artifact's POM.

    Downloads the ``.pom`` for the given coordinates through the
    search.maven.org ``remotecontent`` endpoint and reads the children of
    its top-level ``<scm>`` element.

    Args:
        groupId (str): Maven group id, dot separated.
        artifactId (str): Maven artifact id.
        latest_version (str): Version whose POM should be fetched.

    Returns:
        str | None: The chosen ``<scm>`` entry, or None when the request
        fails, the body is not parseable XML, or no ``<scm>`` entry has
        text. Entries are chosen in this order: a ``<url>`` mentioning
        github.com, any entry mentioning github.com, the ``<url>`` entry,
        the last non-empty entry.
    """
    # generate pom xml filepath from Maven search API
    group_path = '/'.join(groupId.split('.'))
    url = f"https://search.maven.org/remotecontent?filepath={group_path}/{artifactId}/{latest_version}/{artifactId}-{latest_version}.pom"

    try:
        # a timeout keeps one stalled download from hanging the whole run
        response = requests.get(url, timeout=30)
        try:
            tree = ElementTree.fromstring(response.content)
        finally:
            # release the connection on the failure path as well
            response.close()
    except (requests.RequestException, ElementTree.ParseError) as e:
        print(f"Failure in file request: {url} | {str(e)}")
        return None

    # (local tag name, text) for every non-empty child of a top-level <scm>
    entries = []
    for child in tree:
        # POM tags are namespaced as "{namespace}scm"; compare the local name
        if child.tag.rsplit('}', 1)[-1] != 'scm':
            continue
        for entry in child:
            text = (entry.text or '').strip()
            if text:
                entries.append((entry.tag.rsplit('}', 1)[-1], text))

    if not entries:
        return None

    # Taking simply the last child can return <tag> (e.g. "HEAD") instead of
    # a repository link, so rank the candidates explicitly.
    for tag, text in entries:
        if tag == 'url' and 'github.com' in text:
            return text
    for tag, text in entries:
        if 'github.com' in text:
            return text
    for tag, text in entries:
        if tag == 'url':
            return text
    return entries[-1][1]
    

In [None]:
def maven_search(package_name, id):
    """Search Maven Central for a package and return its SCM repo link.

    Args:
        package_name (str): Target package as ``groupId:artifactId``.
        id: Advisory id of the row being processed. Not used by the lookup;
            kept so existing callers keep working.

    Returns:
        str | None: Whatever ``maven_pom_scm_check`` returns for the latest
        version of the search hit whose id equals *package_name*; None when
        the name is malformed, the request fails, the response is not 200,
        or no hit matches.
    """
    # parse groupId/artifactId from package name
    parts = package_name.split(':')
    if len(parts) < 2:
        print(f"{package_name} | not in groupId:artifactId form")
        return None
    groupId, artifactId = parts[0], parts[1]

    # set url
    url = f"https://search.maven.org/solrsearch/select?q={groupId}+AND+a:{artifactId}&rows=10&wt=json"

    try:
        # a timeout keeps one stalled search from hanging the whole run
        response = requests.get(url, timeout=30)
        try:
            if response.status_code != 200:
                print(f"{url} | Non-200 response")
                return None
            temp = response.json()
        finally:
            response.close()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a 200 response whose body is not JSON
        print(f"{url} | request failed: {str(e)}")
        return None

    if temp["response"]["numFound"] < 1:
        print(f"{package_name} | 0 matches in search")
        return None

    for temp_response in temp['response']['docs']:
        # make sure the id matches the package_name for the search
        if temp_response['id'] == package_name:
            # the latest version tells us which pom file to fetch
            return maven_pom_scm_check(groupId, artifactId,
                                       temp_response['latestVersion'])

    return None
    
    

In [None]:
# Maven advisories that still lack a source code link, de-duplicated
maven_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "Maven"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing Maven: {maven_missing.id.nunique()}")


def _maven_repo_for_row(row):
    """Look up the SCM link for one advisory row."""
    return maven_search(row['package_name'], row['id'])


maven_missing['temp_repo'] = maven_missing.apply(_maven_repo_for_row, axis=1)

In [None]:
def _clean_maven_scm_link(raw):
    """Turn a raw SCM entry into a GitHub link, or None if it is not GitHub.

    Rewrites the ssh form ("...@github.com:owner/repo") to an https link
    and removes a ".git" suffix from a path segment.
    """
    # check if github is in link
    if not isinstance(raw, str) or 'github.com' not in raw:
        return None

    link = raw
    # cleaning the scm links
    if '@github.com:' in link:
        link = f"https://github.com/{link.split('@github.com:')[-1]}"

    # Remove ".git" only where it ends a path segment. A plain
    # str.replace would also eat the ".git" inside names such as
    # "owner.github.io".
    return re.sub(r'\.git(?=/|$)', '', link)


maven_missing["github_repo"] = maven_missing["temp_repo"].apply(_clean_maven_scm_link)


print(f"Found GH Repo: {maven_missing[maven_missing['github_repo'].notna()].id.nunique()}")
print(f"No GH Repo: {maven_missing[maven_missing['github_repo'].isna()].id.nunique()}\n")

# save data to a CSV
maven_missing_final = maven_missing[["id","package_ecosystem",
                                 "package_name", "github_repo"]].drop_duplicates()

# when saving data only keep the ones we found a GH repo for
maven_missing_final = maven_missing_final[maven_missing_final['github_repo'].notna()].reset_index(drop=True)

print(f"Saving info for {maven_missing_final.id.nunique()} Maven GHSAs | ./missing_GH_info/maven_20230111.csv")

maven_missing_final.to_csv("./missing_GH_info/maven_20230111.csv", encoding='utf-8', index=False)

# NPM Finds

* We can directly hit the NPM registry at https://registry.npmjs.org/{package_name}
    * The response is a JSON that we can parse for the repository url

In [None]:
def npm_registry_source_code(package_name: str) -> "str | None":
    """Obtain the GitHub repository link recorded in the NPM registry.

    Args:
        package_name (str): Target package name

    Returns:
        str | None: "https://" followed by the part of the repository url
        starting at "github.com/", with a trailing ".git" removed. None when
        the request fails, the response is not 200, the package metadata has
        no repository url, or the url does not mention github.com.
    """
    # set URL
    url = f"https://registry.npmjs.org/{package_name}"

    try:
        # a timeout keeps one stalled lookup from hanging the whole run
        response = requests.get(url, timeout=30)
    except requests.RequestException as e:
        print(f"{url} | request failed: {str(e)}")
        return None

    try:
        if response.status_code != 200:
            print(f"{url} | Non-200 response")
            return None
        try:
            temp = response.json()
        except ValueError:
            # 200 response whose body is not JSON
            return None
    finally:
        response.close()

    # "repository" is either an object with a "url" key or a plain string
    repository = temp.get('repository') if isinstance(temp, dict) else None
    repo = repository.get('url') if isinstance(repository, dict) else repository
    if not isinstance(repo, str):
        return None

    # Drop the ".git" suffix as a suffix. str.strip('.git') removes any run
    # of the characters ".", "g", "i", "t" from both ends and would turn
    # "widget.git" into "widge".
    repo = repo.strip()
    if repo.endswith('.git'):
        repo = repo[:-len('.git')]

    repo_match = re.search(r"github\.com/.*", repo)
    if repo_match is None:
        return None
    return f"https://{repo_match.group(0)}"


In [None]:
# npm advisories that still lack a source code link, de-duplicated
npm_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "npm"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing NPM: {npm_missing.id.nunique()}")


def _npm_repo_for_row(row):
    """Look up the registry repository link for one advisory row."""
    return npm_registry_source_code(row['package_name'])


npm_missing['temp_repo'] = npm_missing.apply(_npm_repo_for_row, axis=1)

In [None]:
# Removed malicious packages are replaced on NPM by a placeholder that points
# at this repository; such hits are not a real source link.
npm_security_holder = "https://github.com/npm/security-holder"

npm_holder_count = npm_missing[npm_missing['temp_repo'] == npm_security_holder].id.nunique()
print(f"Setting {npm_holder_count} GHSAs to None due to security-holder from NPM\n")


def _npm_drop_security_holder(row):
    """Return the row's repo link, or None when it is the placeholder."""
    if row['temp_repo'] == npm_security_holder:
        return None
    return row['temp_repo']


npm_missing["github_repo"] = npm_missing.apply(_npm_drop_security_holder, axis=1)

npm_has_repo = npm_missing['github_repo'].notna()
print(f"Found GH Repo for NPM GHSAs: {npm_missing[npm_has_repo].id.nunique()}")
print(f"No GH Repo for NPM GHSAs: {npm_missing[~npm_has_repo].id.nunique()}\n")

# keep the export columns, de-duplicate, and keep only rows with a repo
npm_missing_final = (
    npm_missing[["id", "package_ecosystem", "package_name", "github_repo"]]
    .drop_duplicates()
    .dropna(subset=["github_repo"])
    .reset_index(drop=True)
)

npm_csv_path = "./missing_GH_info/npm_20230111.csv"
print(f"Saving info for {npm_missing_final.id.nunique()} NPM GHSAs | {npm_csv_path}")

npm_missing_final.to_csv(npm_csv_path, encoding='utf-8', index=False)

# PyPI Finds

In [None]:
def get_soup(url):
    """Fetch *url* and return its body parsed with the 'html.parser' backend."""
    page = requests.get(url=url)
    return BeautifulSoup(page.content, 'html.parser')


class GetPackageProjectLinks():
    """Project links scraped from a package's pypi.org project page.

    Each attribute stays None unless a sidebar link with the matching label
    is found on the page.
    """

    # sidebar link label -> attribute that receives the link's href
    _LABEL_TO_ATTR = {
        "Homepage": "home_page",
        "Bug Tracker": "bug_tracker",
        "Documentation": "documentation",
        "Source": "source_code",
    }

    def __init__(self, package_name):
        self.package_name = package_name
        self.home_page = None
        self.bug_tracker = None
        self.documentation = None
        self.source_code = None
        self.url = f"https://pypi.org/project/{self.package_name}/"

        page = get_soup(self.url)

        for section in page.find_all("div", attrs={"class": "sidebar-section"}):
            for anchor in section.find_all("a"):
                attr = self._LABEL_TO_ATTR.get(anchor.text.strip())
                if attr is not None:
                    setattr(self, attr, anchor.get("href"))

In [None]:
# PyPI advisories that still lack a source code link, de-duplicated
pypi_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "PyPI"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing PyPI: {pypi_missing.id.nunique()}")

# scrape the project page of every package
pypi_missing["links"] = pypi_missing.apply(
    lambda row: GetPackageProjectLinks(package_name=row['package_name']),
    axis=1
)

# copy the scraped links into their own columns
for _pypi_attr in ("source_code", "home_page"):
    pypi_missing[_pypi_attr] = pypi_missing.apply(
        lambda row, attr=_pypi_attr: getattr(row['links'], attr),
        axis=1
    )


def _pypi_github_repo(row):
    """Pick the GitHub link for a row: home page first, then source link."""
    if 'github.com/' in str(row['home_page']):
        return row['home_page']
    if 'github.com/' in str(row['source_code']):
        return row['source_code']
    return None


pypi_missing["github_repo"] = pypi_missing.apply(_pypi_github_repo, axis=1)

In [None]:
pypi_has_repo = pypi_missing['github_repo'].notna()
print(f"Found GH Repo for PyPI GHSAs: {pypi_missing[pypi_has_repo].id.nunique()}")
print(f"No GH Repo for PyPI GHSAs: {pypi_missing[~pypi_has_repo].id.nunique()}\n")

# keep the export columns, de-duplicate, and keep only rows with a repo
pypi_missing_final = (
    pypi_missing[["id", "package_ecosystem", "package_name", "github_repo"]]
    .drop_duplicates()
    .dropna(subset=["github_repo"])
    .reset_index(drop=True)
)

pypi_csv_path = "./missing_GH_info/pypi_20230111.csv"
print(f"Saving info for {pypi_missing_final.id.nunique()} PyPI GHSAs | {pypi_csv_path}")

pypi_missing_final.to_csv(pypi_csv_path, encoding='utf-8', index=False)

# Packagist Finds

In [None]:
def get_soup(url):
    """Fetch *url* and return its body parsed with the 'html.parser' backend."""
    page = requests.get(url=url)
    return BeautifulSoup(page.content, 'html.parser')


class PackagistProjectLinks():
    """Project links scraped from a package's packagist.org page.

    Each attribute stays None unless the matching link is found in the
    page's "row package-aside" block.
    """

    # link label -> attribute that receives the link's href
    _LABEL_TO_ATTR = {
        "Homepage": "home_page",
        "Source": "source_code",
        "Issues": "issues",
    }

    def __init__(self, package_name):
        self.package_name = package_name
        self.canonical_repo = None
        self.home_page = None
        self.source_code = None
        self.issues = None
        self.url = f"https://packagist.org/packages/{self.package_name}"

        page = get_soup(self.url)

        for aside in page.find_all("div", attrs={"class": "row package-aside"}):
            for anchor in aside.find_all("a"):
                label = anchor.text.strip()

                # the canonical repository is identified by the link's title
                # and stored as the link text, not the href
                if anchor.get("title") == "Canonical Repository URL":
                    self.canonical_repo = label

                attr = self._LABEL_TO_ATTR.get(label)
                if attr is not None:
                    setattr(self, attr, anchor.get("href"))
                

In [None]:
packagist_missing = ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem']=="Packagist"]

packagist_missing = packagist_missing.drop_duplicates().reset_index(drop=True)

print(f"Missing Packagist: {packagist_missing.id.nunique()}")

# get links for Packagist Repo
packagist_missing["links"] = packagist_missing.apply(
    lambda x: PackagistProjectLinks(package_name=x['package_name']),
    axis=1
)

# set source code
packagist_missing["source_code"] = packagist_missing.apply(
    lambda x: x['links'].source_code,
    axis=1
)

# set home page link
packagist_missing["home_page"] = packagist_missing.apply(
    lambda x: x['links'].home_page,
    axis=1
)

# set canonical_repo link
packagist_missing["canonical_repo"] = packagist_missing.apply(
    lambda x: x['links'].canonical_repo,
    axis=1
)


def _packagist_github_repo(row):
    """Pick the GitHub link for one row.

    Precedence: canonical repository, then home page, then source link.
    The canonical repository is taken from link text, so it gets an
    "https://" prefix when it carries no scheme of its own.
    """
    canonical = row['canonical_repo']
    if 'github.com/' in str(canonical):
        # only add the scheme when it is missing, otherwise a value that
        # already starts with "https://" would end up with it twice
        if '://' in canonical:
            return canonical
        return f"https://{canonical}"

    if 'github.com/' in str(row['home_page']):
        return row['home_page']

    if 'github.com/' in str(row['source_code']):
        return row['source_code']

    return None


packagist_missing["github_repo"] = packagist_missing.apply(
    _packagist_github_repo,
    axis=1
)

In [None]:
packagist_has_repo = packagist_missing['github_repo'].notna()
print(f"Found GH Repo for Packagist GHSAs: {packagist_missing[packagist_has_repo].id.nunique()}")
print(f"No GH Repo for Packagist GHSAs: {packagist_missing[~packagist_has_repo].id.nunique()}\n")

# keep the export columns, de-duplicate, and keep only rows with a repo
packagist_missing_final = (
    packagist_missing[["id", "package_ecosystem", "package_name", "github_repo"]]
    .drop_duplicates()
    .dropna(subset=["github_repo"])
    .reset_index(drop=True)
)

packagist_csv_path = "./missing_GH_info/packagist_20230111.csv"
print(f"Saving info for {packagist_missing_final.id.nunique()} Packagist GHSAs | {packagist_csv_path}")

packagist_missing_final.to_csv(packagist_csv_path, encoding='utf-8', index=False)

# RubyGems Finds

In [None]:
def get_soup(url):
    """Fetch *url* with a browser User-Agent and parse it with 'html.parser'."""
    browser_headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
    page = requests.get(url=url, headers=browser_headers)
    return BeautifulSoup(page.content, 'html.parser')


class RubyGemsProjectLinks():
    """Project links scraped from a gem's rubygems.org page.

    Each attribute stays None unless a link with the matching label is found
    in the page's "gem__aside l-col--r--pad" block. GitHub links are reduced
    to "https://github.com/<owner>/<repo>".
    """

    # link label -> attribute that receives the (trimmed) href
    _LABEL_TO_ATTR = {
        "Homepage": "home_page",
        "Source Code": "source_code",
        "Bug Tracker": "issues",
    }

    @staticmethod
    def _trim_github_link(link):
        """Cut a GitHub link down to its first two path parts; pass others through."""
        if 'github.com/' not in str(link):
            return link
        owner_and_repo = link.split("github.com/")[-1].split('/')[:2]
        return f"https://github.com/{'/'.join(owner_and_repo)}"

    def __init__(self, package_name):
        self.package_name = package_name
        self.home_page = None
        self.source_code = None
        self.issues = None
        self.url = f"https://rubygems.org/gems/{self.package_name}"

        page = get_soup(self.url)

        for aside in page.find_all("div", attrs={"class": "gem__aside l-col--r--pad"}):
            for anchor in aside.find_all("a"):
                attr = self._LABEL_TO_ATTR.get(anchor.text.strip())
                if attr is not None:
                    setattr(self, attr, self._trim_github_link(anchor.get("href")))
    

In [None]:
# RubyGems advisories that still lack a source code link, de-duplicated
rubygems_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "RubyGems"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing RubyGems: {rubygems_missing.id.nunique()}")

# scrape the gem page of every package
rubygems_missing["links"] = rubygems_missing.apply(
    lambda row: RubyGemsProjectLinks(package_name=row['package_name']),
    axis=1
)

# copy the scraped links into their own columns
for _rubygems_attr in ("source_code", "home_page", "issues"):
    rubygems_missing[_rubygems_attr] = rubygems_missing.apply(
        lambda row, attr=_rubygems_attr: getattr(row['links'], attr),
        axis=1
    )


def _rubygems_github_repo(row):
    """Pick the GitHub link for a row: source code, then issues, then home page."""
    for column in ("source_code", "issues", "home_page"):
        if 'github.com/' in str(row[column]):
            return row[column]
    return None


rubygems_missing["github_repo"] = rubygems_missing.apply(_rubygems_github_repo, axis=1)

In [None]:
print(f"Found GH Repo for RubyGems GHSAs: {rubygems_missing[rubygems_missing['github_repo'].notna()].id.nunique()}")
print(f"No GH Repo for RubyGems GHSAs: {rubygems_missing[rubygems_missing['github_repo'].isna()].id.nunique()}\n")

# save data to a CSV
rubygems_missing_final = rubygems_missing[["id","package_ecosystem",
                                           "package_name", "github_repo"]].drop_duplicates()

# when saving data only keep the ones we found a GH repo for
rubygems_missing_final = rubygems_missing_final[rubygems_missing_final['github_repo'].notna()].reset_index(drop=True)

# this message used to say "Packagist" although it reports the RubyGems export
print(f"Saving info for {rubygems_missing_final.id.nunique()} RubyGems GHSAs | ./missing_GH_info/rubygems_20230111.csv")

rubygems_missing_final.to_csv("./missing_GH_info/rubygems_20230111.csv", encoding='utf-8', index=False)

# NuGet Finds

In [None]:
def get_soup(url):
    """Fetch *url* with a browser User-Agent and parse it with 'html.parser'."""
    browser_headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
    page = requests.get(url=url, headers=browser_headers)
    return BeautifulSoup(page.content, 'html.parser')


class NuGetProjectLinks():
    """Project links scraped from a package's nuget.org page.

    Each attribute stays None unless a sidebar link with the matching label
    is found. When the page reports a rate limit, the page text is printed,
    the constructor sleeps 60 seconds and fetches the page one more time.
    """

    # sidebar link label -> attribute that receives the link's href
    _LABEL_TO_ATTR = {
        "Project website": "home_page",
        "Source repository": "source_code",
    }

    def __init__(self, package_name):
        self.package_name = package_name
        self.home_page = None
        self.source_code = None
        self.url = f"https://www.nuget.org/packages/{self.package_name}"

        page = get_soup(self.url)

        if "Rate limit is exceeded" in page.text:
            print(page.text)
            print(f"Sleeping for 60 seconds...")
            time.sleep(60)
            # single retry after the pause
            page = get_soup(self.url)

        for section in page.find_all("div", attrs={"class": "sidebar-section"}):
            for anchor in section.find_all("a"):
                attr = self._LABEL_TO_ATTR.get(anchor.text.strip())
                if attr is not None:
                    setattr(self, attr, anchor.get("href"))

In [None]:
# NuGet advisories that still lack a source code link, de-duplicated
nuget_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "NuGet"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing Nuget: {nuget_missing.id.nunique()}")

# scrape the package page of every package
nuget_missing["links"] = nuget_missing.apply(
    lambda row: NuGetProjectLinks(package_name=row['package_name']),
    axis=1
)

# copy the scraped links into their own columns
for _nuget_attr in ("source_code", "home_page"):
    nuget_missing[_nuget_attr] = nuget_missing.apply(
        lambda row, attr=_nuget_attr: getattr(row['links'], attr),
        axis=1
    )


def _nuget_github_repo(row):
    """Pick the GitHub link for a row: source repository first, then home page."""
    if 'github.com/' in str(row['source_code']):
        return row['source_code']
    if 'github.com/' in str(row['home_page']):
        return row['home_page']
    return None


nuget_missing["github_repo"] = nuget_missing.apply(_nuget_github_repo, axis=1)

In [None]:
print(f"Found GH Repo for Nuget GHSAs: {nuget_missing[nuget_missing['github_repo'].notna()].id.nunique()}")
# this message used to say "RubyGems" although it reports the NuGet rows
print(f"No GH Repo for Nuget GHSAs: {nuget_missing[nuget_missing['github_repo'].isna()].id.nunique()}\n")

# save data to a CSV
nuget_missing_final = nuget_missing[["id","package_ecosystem",
                                     "package_name", "github_repo"]].drop_duplicates()

# when saving data only keep the ones we found a GH repo for
nuget_missing_final = nuget_missing_final[nuget_missing_final['github_repo'].notna()].reset_index(drop=True)

print(f"Saving info for {nuget_missing_final.id.nunique()} Nuget GHSAs | ./missing_GH_info/nuget_20230111.csv")

nuget_missing_final.to_csv("./missing_GH_info/nuget_20230111.csv", encoding='utf-8', index=False)

# Go Finds

In [None]:
def get_soup(url):
    """Fetch *url* with a browser User-Agent and parse it with 'html.parser'."""
    browser_headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
    page = requests.get(url=url, headers=browser_headers)
    return BeautifulSoup(page.content, 'html.parser')


class GoProjectLinks():
    """Repository link scraped from a module's pkg.go.dev page.

    ``source_code`` is the href of the last link found inside the page's
    "UnitMeta-repo" blocks, or None when there is no such link.
    """

    def __init__(self, package_name):
        self.package_name = package_name
        self.source_code = None
        self.url = f"https://pkg.go.dev/{self.package_name}"

        page = get_soup(self.url)

        # hrefs of every link in the repository blocks, in page order
        repo_hrefs = [
            anchor.get("href")
            for block in page.find_all("div", attrs={"class": "UnitMeta-repo"})
            for anchor in block.find_all("a")
        ]

        if repo_hrefs:
            # the last link wins
            self.source_code = repo_hrefs[-1]

In [None]:
# Go advisories that still lack a source code link, de-duplicated
go_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "Go"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing Go: {go_missing.id.nunique()}")

# scrape the pkg.go.dev page of every package
go_missing["links"] = go_missing.apply(
    lambda row: GoProjectLinks(package_name=row['package_name']),
    axis=1
)

# copy the scraped repository link into its own column
go_missing["source_code"] = go_missing.apply(
    lambda row: row['links'].source_code,
    axis=1
)


def _go_github_repo(row):
    """Return the row's repository link when it points at GitHub, else None."""
    if 'github.com/' in str(row['source_code']):
        return row['source_code']
    return None


go_missing["github_repo"] = go_missing.apply(_go_github_repo, axis=1)

In [None]:
go_has_repo = go_missing['github_repo'].notna()
print(f"Found GH Repo for Go GHSAs: {go_missing[go_has_repo].id.nunique()}")
print(f"No GH Repo for Go GHSAs: {go_missing[~go_has_repo].id.nunique()}\n")

# keep the export columns, de-duplicate, and keep only rows with a repo
go_missing_final = (
    go_missing[["id", "package_ecosystem", "package_name", "github_repo"]]
    .drop_duplicates()
    .dropna(subset=["github_repo"])
    .reset_index(drop=True)
)

go_csv_path = "./missing_GH_info/go_20230111.csv"
print(f"Saving info for {go_missing_final.id.nunique()} Go GHSAs | {go_csv_path}")

go_missing_final.to_csv(go_csv_path, encoding='utf-8', index=False)

# Crates.IO Finds

In [None]:
class CratesProjectLinks():
    """Repository link read from the crates.io API for one crate.

    ``source_code`` is the "repository" field of the "crate" object in the
    API response, or None when the response status is not 200.
    """

    def __init__(self, package_name):
        self.package_name = package_name
        self.source_code = None

        # crates.io exposes the metadata as JSON, no scraping needed
        self.url = f"https://crates.io/api/v1/crates/{self.package_name}"

        reply = requests.get(url=self.url)
        if reply.status_code != 200:
            return

        self.source_code = reply.json()["crate"]["repository"]

In [None]:
# crates.io advisories that still lack a source code link, de-duplicated
crates_missing = (
    ghsa_missing_GH_repo[ghsa_missing_GH_repo['package_ecosystem'] == "crates.io"]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(f"Missing Crates: {crates_missing.id.nunique()}")

# query the crates.io API for every package
crates_missing["links"] = crates_missing.apply(
    lambda row: CratesProjectLinks(package_name=row['package_name']),
    axis=1
)

# copy the repository link into its own column
crates_missing["source_code"] = crates_missing.apply(
    lambda row: row['links'].source_code,
    axis=1
)


def _crates_github_repo(row):
    """Return the row's repository link when it points at GitHub, else None."""
    if 'github.com/' in str(row['source_code']):
        return row['source_code']
    return None


crates_missing["github_repo"] = crates_missing.apply(_crates_github_repo, axis=1)

In [None]:
# these two messages used to say "Go" although they report the crates.io rows
print(f"Found GH Repo for Crates.io GHSAs: {crates_missing[crates_missing['github_repo'].notna()].id.nunique()}")
print(f"No GH Repo for Crates.io GHSAs: {crates_missing[crates_missing['github_repo'].isna()].id.nunique()}\n")

# save data to a CSV
crates_missing_final = crates_missing[["id","package_ecosystem",
                                       "package_name", "github_repo"]].drop_duplicates()

# when saving data only keep the ones we found a GH repo for
crates_missing_final = crates_missing_final[crates_missing_final['github_repo'].notna()].reset_index(drop=True)

print(f"Saving info for {crates_missing_final.id.nunique()} Crates.io GHSAs | ./missing_GH_info/crates_20230111.csv")

crates_missing_final.to_csv("./missing_GH_info/crates_20230111.csv", encoding='utf-8', index=False)

# Combine all to a single dataframe

In [None]:
# reload the per-ecosystem exports, in the same order they are named below
(
    maven_missing_final,
    npm_missing_final,
    pypi_missing_final,
    packagist_missing_final,
    rubygems_missing_final,
    go_missing_final,
    nuget_missing_final,
    crates_missing_final,
) = (
    pd.read_csv(f"./missing_GH_info/{ecosystem_file}_20230111.csv")
    for ecosystem_file in ("maven", "npm", "pypi", "packagist",
                           "rubygems", "go", "nuget", "crates")
)

In [None]:
# stack every per-ecosystem result, keeping the order of the frames
final_missing = pd.concat([
    maven_missing_final,
    npm_missing_final,
    pypi_missing_final,
    packagist_missing_final,
    rubygems_missing_final,
    go_missing_final,
    nuget_missing_final,
    crates_missing_final,
])

print(f"Final Found: {final_missing.id.nunique()}\n")

In [None]:
# per-ecosystem totals of advisories that were missing a source code link
breakdowns_missing = ghsa_missing_GH_repo.package_ecosystem.value_counts().to_frame().reset_index(drop=False)
breakdowns_missing.columns = ["ecosystem", "source_link_missing_count"]

# per-ecosystem totals of advisories for which a GitHub repo was found
breakdowns_missing_found = final_missing.package_ecosystem.value_counts().to_frame().reset_index(drop=False)
breakdowns_missing_found.columns = ["ecosystem", "source_link_found_count"]

breakdowns_found = pd.merge(breakdowns_missing, breakdowns_missing_found,
                      on=["ecosystem"],
                      how="left")

# ecosystems with nothing found come out of the left merge as NaN
breakdowns_found = breakdowns_found.fillna(0)

# the NaN introduced by the merge turns the whole column into floats;
# cast back so counts are reported as "3" rather than "3.0"
breakdowns_found["source_link_found_count"] = breakdowns_found["source_link_found_count"].astype(int)

breakdowns_found["percent_found_source_link"] = breakdowns_found.apply(
      lambda x: f"{round(100*x['source_link_found_count']/x['source_link_missing_count'], 2)}%",
      axis=1
)

# "<count> (<percent>)" summary column
breakdowns_found["missing_found"] = breakdowns_found.apply(
      lambda x: f"{x['source_link_found_count']} ({x['percent_found_source_link']})",
      axis=1
)

breakdowns_found.head(n=15)