In [None]:
import os
from bs4 import BeautifulSoup

def get_repo_link(link):
    """Reduce a GitHub URL to the form ``https://github.com/<author>/<repo>``.

    The link is split on ``/``; the 4th and 5th components (the two path
    segments that follow the host in ``scheme://host/author/repo/...``) are
    used as author and repository name.

    Args:
        link: URL string, e.g. ``https://github.com/author/repo/issues/1``.

    Returns:
        str | None: The normalised repository URL, or ``None`` when ``link``
        is not a string or lacks a non-empty author or repository segment.
    """
    if not isinstance(link, str):
        return None

    link_args = link.split('/')
    if len(link_args) < 5:
        return None

    author = link_args[3].strip()
    # A query string or fragment glued to the repo segment is not part of its name
    repo_name = link_args[4].split('?', 1)[0].split('#', 1)[0].strip()

    # e.g. "https://github.com/author/" has an empty repo segment
    if not author or not repo_name:
        return None
    return f'https://github.com/{author}/{repo_name}'

def extract_github_links(html_content):
    """Collect the distinct GitHub repository URLs linked from an HTML page.

    Only anchors with an ``href`` containing ``https://github.com`` are
    considered, and any whose lower-cased ``href`` contains
    ``github.com/snyk`` are ignored. Each remaining ``href`` is normalised
    with ``get_repo_link``; hrefs it rejects are dropped.

    Returns:
        set: Normalised repository URLs (possibly empty).
    """
    parsed = BeautifulSoup(html_content, 'html.parser')
    found = set()

    for anchor in parsed.find_all('a', href=True):
        target = anchor['href']
        if 'https://github.com' not in target:
            continue
        if 'github.com/snyk' in target.lower():
            continue

        normalised = get_repo_link(target)
        if normalised:
            found.add(normalised)

    return found

def process_saved_html_files(directory):
    """Map each saved vulnerability page to the GitHub repositories it links to.

    Every ``*.html`` file directly inside ``directory`` is read as UTF-8 and
    passed to ``extract_github_links``. Pages without any GitHub link are
    left out of the result.

    Args:
        directory: Folder containing the saved ``<vulnerability id>.html`` files.

    Returns:
        dict: ``{vulnerability id: set of repository URLs}``.
    """
    suffix = '.html'
    all_github_links = {}

    for filename in os.listdir(directory):
        if not filename.endswith(suffix):
            continue

        filepath = os.path.join(directory, filename)
        with open(filepath, 'r', encoding='utf-8') as file:
            html_content = file.read()

        github_links = extract_github_links(html_content)
        if github_links:
            # Strip only the extension: an ID that itself contains a dot must
            # not be truncated at its first dot.
            vuln_id = filename[:-len(suffix)]
            all_github_links[vuln_id] = github_links

    return all_github_links

# Folder holding one saved vulnerability page per file
html_files_directory = 'snyk_crawls/htmls/'

# Map each vulnerability ID to the GitHub repositories its page links to
github_links_by_vuln = process_saved_html_files(html_files_directory)

# List only the vulnerabilities that reference more than one repository
for vuln_id, links in github_links_by_vuln.items():
    if len(links) <= 1:
        continue
    print(vuln_id)


In [None]:
import pickle
# Persist the {vulnerability id: repository URLs} mapping for later reuse
with open('snyk_data_git_urls.pickle', 'wb') as pickle_file:
    pickle.dump(github_links_by_vuln, pickle_file)

In [None]:
from bs4 import BeautifulSoup

def get_vuln_rows_disclosed(html_content):
    """Extract one row per vulnerability from a "disclosed vulnerabilities" page.

    Args:
        html_content: Page markup as text. The publication date is taken as
            the text after the first literal backslash + ``n`` pair, i.e. the
            markup is expected to be the ``str()`` of a bytes object.

    Returns:
        list[list]: One ``[id, type, package, 'NA', platform, publication_date]``
        list per ``<li data-snyk-test="ProprietaryVulns: item">`` element.
        Items missing an expected element are reported and skipped, so one
        malformed entry no longer discards the rest of the page.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    vulnerabilities = []

    for item in soup.find_all('li', {'data-snyk-test': 'ProprietaryVulns: item'}):
        try:
            anchor = item.find('a')
            _type = anchor.text.strip()
            # The ID is the last path segment of the item's link
            _id = anchor['href'].split('/')[-1]

            # Keep what follows the last " in " and drop its final two characters
            description = item.find('p').text.strip().split(' in ')[-1].strip()[:-2]
            tokens = description.split()
            pkg = tokens[0]
            # Second token minus its first and last character
            # (presumably surrounding parentheses -- TODO confirm)
            platform = tokens[1][1:-1]

            date_cell = item.find('div', {'data-snyk-test': 'ProprietaryVulns: publicationTime'})
            publication_date = date_cell.text.strip().split('\\n')[1].strip()
        except (AttributeError, IndexError, KeyError, TypeError) as err:
            # find() returns None when an element is absent
            print('skipping malformed vulnerability item:', err)
            continue

        # 'NA' fills the version-range column, which this listing does not provide
        vulnerabilities.append([_id, _type, pkg, 'NA', platform, publication_date])

    return vulnerabilities


# Smoke test: fetch one listing page and print the rows parsed from it.
# NOTE(review): `curl` is defined in a later cell, so on a fresh kernel this
# cell only works after that cell has been executed.
html = str(curl('https://security.snyk.io/disclosed-vulnerabilities/8'))
rows = get_vuln_rows_disclosed(html)
print(rows)

In [None]:

import json
import re
import urllib
from html.parser import HTMLParser

def curl(url):
    """Fetch ``url`` and return the raw response body.

    Args:
        url: Any URL accepted by ``urllib.request.urlopen``.

    Returns:
        bytes: The undecoded response body.

    Raises:
        urllib.error.URLError: If the request fails (``HTTPError`` is a subclass).
    """
    # A bare ``import urllib`` does not load the ``request`` submodule
    import urllib.request

    req = urllib.request.Request(url)
    # Close the connection as soon as the body has been read
    with urllib.request.urlopen(req) as response:
        return response.read()



class SnykNPM(HTMLParser):
    """Group the text of Snyk vulnerability-listing table rows.

    Text seen after a ``<tr class="vue--table__row">`` start tag is collected
    into ``self.rows`` (a list of lists). A row is finished once a text chunk
    contains ``PUBLISHED`` or a four-digit year; the next chunk opens a new
    row whose first element is the last path segment of the ``href`` of the
    most recent start tag (``None`` if that tag has no ``href``).
    """

    # A standalone 19xx/20xx year marks the row's closing date cell. This
    # replaces a fixed list of the years 2021-2024, which left rows dated
    # outside that range unterminated and merged into the next row. Digits
    # embedded in a longer digit run (e.g. "20190101") do not count.
    _YEAR = re.compile(r'(?<!\d)(?:19|20)\d{2}(?!\d)')

    def __init__(self) -> None:
        self.counter = 0

        self.row_complete = False

        self.vulns = []
        self.packages = []
        self.dates = []
        self.vuln_flag = False
        self.package_flag = False
        self.date_flag = False

        self.table_row_flag = False
        # Starts with one open row, so chunks seen before the first completed
        # row are collected without an ID in front
        self.rows = [[]]

        self.tag = None
        # Attributes of the most recent start tag
        self.attrs = []

        super().__init__()

    def handle_starttag(self, tag, attrs):
        """Remember the current tag and toggle collection on ``<tr>`` tags."""
        self.tag = tag
        self.attrs = attrs

        if tag != 'tr':
            return

        # A <tr> without a class attribute leaves the flag unchanged
        for name, value in attrs:
            if name == 'class':
                self.table_row_flag = (value == 'vue--table__row')

    def handle_data(self, data):
        """Append a text chunk to the current row, or start a new row."""
        if not self.table_row_flag:
            return

        if len(data) < 2:
            return

        # The markup carries literal backslash-n pairs; keep the text that
        # follows the first one.
        if '\\n' in data.strip():
            data = data.split('\\n')[1].strip()

        if self.row_complete:
            vuln_id = None
            for name, value in self.attrs:
                if name == 'href' and value:
                    vuln_id = value.split('/')[-1]
            self.rows.append([vuln_id, data])
            self.row_complete = False
        else:
            self.rows[-1].append(data)

        if 'PUBLISHED' in data or self._YEAR.search(data):
            self.row_complete = True





def get_vuln_rows(html):
    """Run a fresh ``SnykNPM`` parser over ``html`` and return its ``rows``."""
    row_parser = SnykNPM()
    row_parser.feed(html)
    return row_parser.rows

# Try the row parser on a single golang listing page
html = str(curl('https://security.snyk.io/vuln/golang/2'))
rows = get_vuln_rows(html)

res = []
for t in rows:
    print(t)
    # Six-field rows get a None placeholder inserted at index 3
    if len(t) == 6:
        t.insert(3, None)
    # Only seven-field rows are kept
    if len(t) != 7:
        continue
    print(t)
    res.append(t)

print(len(res))

In [None]:
class SnykVuln(HTMLParser):
    """Pull score, CVE, CWE and details-box values out of a vulnerability page.

    Start tags switch four capture flags on and off according to their
    ``class`` and ``data-snyk-test`` attributes; ``handle_data`` stores each
    text chunk wherever the active flags direct it.

    Results: ``score``, ``CVE`` and ``CWE`` (``None`` until captured) and
    ``detailsbox`` (a list of captured strings).
    """

    def __init__(self) -> None:
        super().__init__()

        # Captured values
        self.score = None
        self.CWE = None
        self.CVE = None
        self.detailsbox = []

        # Capture switches driven by handle_starttag
        self.cve_flag = False
        self.cwe_flag = False
        self.score_flag = False
        self.detailsbox_flag = False

        # Name of the most recent start tag
        self.tag = None

    def handle_starttag(self, tag, attrs):
        """Record the tag and update the capture flags from its attributes."""
        self.tag = tag
        self.attrs = attrs

        for name, value in attrs:
            if name == 'class':
                if value == 'cve':
                    self.cve_flag = True
                elif tag != 'a':
                    # Only an <a> with some other class leaves the flag untouched
                    self.cve_flag = False

            if name == 'data-snyk-test':
                if 'VendorCvssCard: Badge' in value:
                    self.score_flag = True
                elif tag != 'span':
                    # A <span> with another marker leaves the flag untouched
                    self.score_flag = False

                self.detailsbox_flag = (value == 'vuln detailsbox value')

                if value == 'cwe':
                    self.cwe_flag = True
                elif tag != 'a':
                    self.cwe_flag = False

    def handle_data(self, data):
        """Store ``data`` according to whichever capture flags are set."""
        if self.cve_flag:
            self.CVE = data

        if self.cwe_flag:
            self.CWE = data

        # Details-box chunks that strip down to at most one character are ignored
        if self.detailsbox_flag and len(data.strip()) > 1:
            self.detailsbox.append(data)

        if not self.score_flag:
            return

        # The score is the text after the first literal backslash-n pair
        pieces = data.split('\\n')
        if len(pieces) <= 1:
            return

        value = pieces[1].strip()
        # Keep the first value not containing 'Rec'; later ones never overwrite it
        if 'Rec' not in value and not self.score:
            self.score = value

def get_vuln_detail(html):
    """Parse a vulnerability page with ``SnykVuln``.

    Returns:
        tuple: ``(score, CVE, CWE, detailsbox)`` as captured by the parser.
    """
    detail_parser = SnykVuln()
    detail_parser.feed(html)
    return (
        detail_parser.score,
        detail_parser.CVE,
        detail_parser.CWE,
        detail_parser.detailsbox,
    )

# Smoke test: fetch one vulnerability page and show what the detail parser extracts.
html = str(curl('https://security.snyk.io/vuln/SNYK-JS-EVOLUTIONDS-7925464'))
score, cve, cwe, detailsbox = get_vuln_detail(html)

print(score, cve, cwe, detailsbox)

In [None]:
import pandas as pd
import time


def clean_row(row):
    """Collapse a parsed table row into exactly six fields.

    The first three elements and the last two are kept as they are;
    everything in between is converted with ``str`` and concatenated into one
    version string (so a ``None`` placeholder becomes the text ``'None'``).

    Returns:
        list: ``[id, vulnerability, package, version, platform, date]``.
    """
    vuln_id, vuln_name, package = row[0], row[1], row[2]
    version_range = ''.join(str(part) for part in row[3:len(row) - 2])
    return [vuln_id, vuln_name, package, version_range, row[-2], row[-1]]

def first_crawl_disclosed_dataset(last_page_number=336, delay=0.5):
    """Crawl the paginated "disclosed vulnerabilities" listing into a DataFrame.

    Args:
        last_page_number: Number of the last listing page to fetch. Page 1 is
            the bare listing URL; pages 2..``last_page_number`` follow.
        delay: Seconds to wait after each request, whether it succeeded or not.

    Returns:
        pandas.DataFrame: One row per parsed vulnerability, with columns
        ``ID, Vulnerability, Package, Version Range, Platform, Date``. Pages
        that fail to download or parse are reported and skipped.
    """
    result = []

    urls = ['https://security.snyk.io/disclosed-vulnerabilities/'] + [
        f'https://security.snyk.io/disclosed-vulnerabilities/{i}'
        for i in range(2, last_page_number + 1)
    ]

    for url in urls:
        try:
            html = str(curl(url))
            result.extend(get_vuln_rows_disclosed(html))
        except Exception as e:
            print(e)
            print('error for url first crawl', url)

        # Throttle after failures too, so a run of errors is not sent rapid-fire
        time.sleep(delay)

    print(len(result))

    df = pd.DataFrame(result, columns=['ID', 'Vulnerability', 'Package', 'Version Range', 'Platform', 'Date'])
    return df

def first_crawl(platform, last_page_number):
    """Crawl listing pages 1..``last_page_number`` of one platform.

    Rows are kept only when their second-to-last field, lower-cased, equals
    ``platform`` or its alternative label (``go`` for ``golang``,
    ``unmanaged (c/c++)`` for ``unmanaged``). Pages that raise are reported
    and skipped.

    Returns:
        pandas.DataFrame: Cleaned rows with columns
        ``ID, Vulnerability, Package, Version Range, Platform, Date``.
    """
    alternative_labels = {'golang': 'go', 'unmanaged': 'unmanaged (c/c++)'}
    accepted_labels = (platform, alternative_labels.get(platform, platform))

    urls = [f'https://security.snyk.io/vuln/{platform}/']
    urls.extend(
        f'https://security.snyk.io/vuln/{platform}/{i}'
        for i in range(2, last_page_number + 1)
    )

    result = []
    for url in urls:
        try:
            html = str(curl(url))
            rows = get_vuln_rows(html)

            time.sleep(1)
            for t in rows:
                # Too short to be a data row
                if len(t) < 4:
                    continue

                # Five-field rows get a None placeholder at index 3
                if len(t) == 5:
                    t.insert(3, None)

                if t[-2].lower() in accepted_labels:
                    result.append(clean_row(t))
        except Exception as e:
            print(e)
            print('error for url first crawl', url)

    print(len(result))

    return pd.DataFrame(
        result,
        columns=['ID', 'Vulnerability', 'Package', 'Version Range', 'Platform', 'Date'],
    )

import pandas as pd


def get_first_page_results(platform):
    """Fetch and clean the rows of the first listing page of ``platform``.

    Rows are kept only when their second-to-last field, lower-cased, equals
    ``platform`` or its alternative label (``go`` for ``golang``,
    ``unmanaged (c/c++)`` for ``unmanaged``).

    Args:
        platform: Platform slug used in the listing URL.

    Returns:
        list[list]: ``[id, vulnerability, package, version, platform, date]`` rows.
    """
    platform_alt = platform

    if platform == 'golang':
        platform_alt = 'go'
    # Previously compared against the misspelt 'uunmanaged', so this alias was
    # never applied and unmanaged rows were filtered out.
    if platform == 'unmanaged':
        platform_alt = 'unmanaged (c/c++)'

    result = []
    url = f'https://security.snyk.io/vuln/{platform}/'

    html = str(curl(url))
    rows = get_vuln_rows(html)

    for t in rows:
        # Same guard as first_crawl: short (or empty) rows are not data rows,
        # and indexing t[-2] on an empty row would raise IndexError.
        if len(t) < 4:
            continue

        # Five-field rows get a None placeholder at index 3
        if len(t) == 5:
            t.insert(3, None)

        if t[-2].lower() == platform or t[-2].lower() == platform_alt:
            result.append(clean_row(t))

    return result

def update_details_by_ids(ids, df, base_url='https://security.snyk.io/vuln/'):
    """Fetch each vulnerability's detail page, update ``df`` and save the HTML.

    For every ID the page at ``base_url + id`` is downloaded and parsed with
    ``get_vuln_detail``. When a row of ``df`` has that ``ID``, its ``score``,
    ``CWE``, ``CVE``, ``published``, ``disclosed`` and ``credit`` columns are
    set. Each downloaded page is written to ``snyk_crawls/htmls/<id>.html``.

    Args:
        ids: Iterable of vulnerability IDs.
        df: DataFrame with an ``ID`` column; modified in place.
        base_url: Prefix to which the ID is appended.

    Returns:
        pandas.DataFrame: The same ``df`` object.
    """
    import os

    # Make sure the output folder exists before the first write
    os.makedirs('snyk_crawls/htmls', exist_ok=True)

    for vuln_id in ids:
        id_url = base_url + str(vuln_id)
        # Reset per ID so a failed download can never reuse the previous page
        html = None

        try:
            time.sleep(0.5)

            html = str(curl(id_url))
            score, cve, cwe, detailsbox = get_vuln_detail(html)

            if len(detailsbox) < 4:
                detailsbox.append('Unknown')

            # Rows whose 'ID' matches this vulnerability
            row_index = df[df['ID'] == vuln_id].index

            if not row_index.empty:
                try:
                    # The text is the str() of bytes, so the replaced sequence
                    # is the literal escape text, not real characters
                    df.loc[row_index, 'score'] = score.replace('\\xc2\\xa0', '-')
                except Exception:
                    # e.g. score is None when no score was captured
                    print('Score Error. Url:', id_url)
                df.loc[row_index, 'CWE'] = cwe
                df.loc[row_index, 'CVE'] = cve
                df.loc[row_index, 'published'] = detailsbox[1]
                df.loc[row_index, 'disclosed'] = detailsbox[2]
                df.loc[row_index, 'credit'] = detailsbox[3]
        except Exception:
            # Exception rather than a bare except, so Ctrl-C still stops the crawl
            print('Error in df update. Url:', id_url)

        if html is None:
            # Nothing was downloaded for this ID, so there is nothing to save
            continue

        with open(f"snyk_crawls/htmls/{vuln_id}.html", "w", encoding="utf-8") as file:
            # Strip the leading b' and the trailing ' left by str(bytes)
            file.write(html[2:-1])

    return df
        

def update_the_list(platform):
    """Merge newly listed vulnerabilities into the platform's saved CSV.

    Reads ``snyk_crawls/<platform>.csv``, fetches the first listing page,
    looks up details for the IDs not yet in the CSV, appends the first-page
    rows, drops rows with a duplicate ``ID`` (keeping the first occurrence)
    and writes the CSV back.

    Returns:
        pandas.DataFrame: The de-duplicated table that was written.
    """
    known_df = pd.read_csv(f'snyk_crawls/{platform}.csv')

    first_page_df = pd.DataFrame(
        get_first_page_results(platform),
        columns=['ID', 'Vulnerability', 'Package', 'Version Range', 'Platform', 'Date'],
    )

    old_ids = known_df['ID'].tolist()
    new_ids = [
        candidate
        for candidate in first_page_df['ID'].tolist()
        if candidate not in old_ids
    ]

    print('new_ids', len(new_ids), new_ids)

    first_page_df = update_details_by_ids(new_ids, first_page_df)

    # Saved rows come first, so they win over their first-page duplicates
    combined = pd.concat([known_df, first_page_df], ignore_index=True)
    df_unique = combined.drop_duplicates(subset='ID')

    df_unique.to_csv(f'snyk_crawls/{platform}.csv', index=False)
    print(len(df_unique))
    return df_unique


In [None]:
# Crawl every "disclosed vulnerabilities" listing page (one request per page)
# and preview the first rows of the resulting table.
disclosed_res = first_crawl_disclosed_dataset()
disclosed_res.head()

In [None]:
# Reuses `disclosed_res` from the previous cell; uncomment to re-crawl instead.
# disclosed_res = first_crawl_disclosed_dataset()
print(len(disclosed_res))

# Fetch the detail page of every listed ID (each page's HTML is saved as well),
# then write the enriched table to CSV.
ids = disclosed_res['ID'].tolist()
disclosed_res = update_details_by_ids(ids, disclosed_res)
disclosed_res.to_csv(f'snyk_crawls/disclosed_vulns.csv', index=False)

In [None]:
# Platform slug (as used in the listing URL) -> number of the last listing page
# to crawl for that platform.
platforms = {
    'npm': 30,
    'pip': 30,
    'maven': 30,
    'composer': 30,
    'golang': 30,
    'cargo': 26,
    'cocoapods': 30,
    'nuget': 30,
    'rubygems': 30,
    # 'swift': 2,
    'unmanaged': 30,
}


In [None]:
# Full crawl: for every platform, collect the listing rows, enrich them with
# the per-vulnerability details and write one CSV per platform.
df_platform = {}

for platform, last_page_number in platforms.items():
    print('..... Crawling', platform)
    first_crawl_df = first_crawl(platform, last_page_number)
    print('first crawl lengths', platform, len(first_crawl_df))
    df_platform[platform] = first_crawl_df
    
    # update_details_by_ids modifies the frame in place and returns it, so the
    # frame stored in df_platform receives the detail columns as well
    ids = first_crawl_df['ID'].tolist()
    first_crawl_df = update_details_by_ids(ids, first_crawl_df)
    first_crawl_df.to_csv(f'snyk_crawls/{platform}.csv', index=False)

In [None]:
# df_platform = {}

# for platform, last_page_number in platforms.items():
#     df_platform[platform] = update_the_list(platform)
#     # # print('..... Crawling', platform)
#     # # first_crawl_df = first_crawl(platform, last_page_number)
#     # # print('first crawl lengths', platform, len(first_crawl_df))
#     # # df_platform[platform] = first_crawl_df
    
#     # ids = first_crawl_df['ID'].tolist()
#     # first_crawl_df = update_details_by_ids(ids, first_crawl_df)
#     # first_crawl_df.to_csv(f'data/21-sep/{platform}.csv', index=False)
