In [1]:
from pathlib import Path
import urllib.parse as url

import os
import re
import time
from datetime import datetime
from typing import Callable, Dict, List


import requests
import pandas as pd

import jupyter_black  # Amazing python code formatter, this will save you hundreds of hours of work.

jupyter_black.load()

In [2]:
# Location of the project's data directory, relative to this notebook.
data_path = Path("..") / "data"
cve_data = pd.read_feather(data_path / "all_parsed_cve_references.feather")

# Match "github.com" as a literal substring: with the default regex=True the
# "." would match any character. na=False keeps the mask strictly boolean so
# rows with a missing URL are dropped instead of breaking the .loc selection.
is_github_url = cve_data["url"].str.contains("github.com", regex=False, na=False)
github_links = cve_data.loc[is_github_url, "url"].drop_duplicates()
github_links

1274       https://github.com/maxpl0it/CVE-2020-0674-Exploit
2100       https://github.com/zephyrproject-rtos/zephyr/p...
2102       https://github.com/zephyrproject-rtos/zephyr/p...
2105       https://github.com/zephyrproject-rtos/zephyr/p...
2110       https://github.com/zephyrproject-rtos/zephyr/p...
                                 ...                        
1028199        https://github.com/benc-uk/kubeview/issues/95
1028202    https://github.com/openedx/xblock-drag-and-dro...
1028203    https://github.com/openedx/xblock-drag-and-dro...
1028204    https://github.com/openedx/xblock-drag-and-dro...
1028205    https://github.com/openedx/xblock-drag-and-dro...
Name: url, Length: 34344, dtype: object

In [9]:
# Seconds the next call to handle_get_requests must sleep before sending its
# request; set when the previous response reported the rate limit as exhausted.
timer = 0


def handle_get_requests(api_url, headers=None, data=None, time_to_wait=3):
    """
    Send a GET request, honouring the rate-limit wait recorded by the previous call.

    If the previous call left a positive value in the module-level ``timer``,
    this call sleeps for that many seconds before sending the request. After
    the request, the ``x-ratelimit-remaining`` / ``x-ratelimit-reset`` response
    headers are inspected; when at most one request remains, ``timer`` is set
    so that the *next* call waits.

    Parameters
    ----------
    api_url : str
        URL to request.
    headers : dict, optional
        Extra request headers. An ``Authorization: Bearer <token>`` header is
        added on top when ``get_api_token()`` returns a token.
    data : optional
        Passed through to ``requests.get`` as ``data``.
    time_to_wait : int or float, default 3
        Fallback wait in seconds, used when the reset time is missing or
        already in the past.

    Returns
    -------
    requests.Response
        The raw response; the status code is not checked here.
    """
    global timer

    if timer > 0:
        hours = int(timer / 60 / 60)
        minutes = int(timer / 60) - (hours * 60)
        print(f"Waiting for {hours}:{minutes} (hh:mm)", flush=True)
        time.sleep(timer)
    timer = 0

    req_headers = {}
    if headers:
        req_headers.update(headers)

    api_token = get_api_token()
    if api_token:
        req_headers["Authorization"] = f"Bearer {api_token}"
    response = requests.get(api_url, headers=req_headers, data=data)

    # The body is deliberately not parsed here: an empty or non-JSON body
    # would raise before the caller had a chance to look at the status code.
    remaining = response.headers.get("x-ratelimit-remaining")
    reset_at = response.headers.get("x-ratelimit-reset")

    # Responses without rate-limit headers carry no information, so no wait
    # is scheduled for them.
    if remaining is not None and int(remaining) <= 1:
        wait_for = time_to_wait
        if reset_at is not None:
            seconds_until_reset = int(reset_at) - datetime.now().timestamp()
            if seconds_until_reset > 0:
                wait_for = seconds_until_reset
        timer = wait_for

    return response


def get_github_repo_paths(raw_urls: pd.Series) -> pd.Series:
    """
    Turn raw GitHub links into de-duplicated API repository URLs.

    Each URL is stripped of surrounding whitespace and its path is cut down
    to the first two components, which are then appended to
    ``https://api.github.com/repos``. The first occurrence of each resulting
    URL is kept, together with its original index label.
    """

    def _repo_prefix(link: str) -> str:
        # The path starts with "/", so the first piece of the split is empty
        # and the first three pieces re-join to "/<part1>/<part2>".
        pieces = url.urlsplit(link).path.split("/")
        return "/".join(pieces[:3])

    cleaned_links = raw_urls.str.strip()
    api_links = "https://api.github.com/repos" + cleaned_links.apply(_repo_prefix)
    return api_links.drop_duplicates()


def get_github_languages(api_url: str, headers=None, data=None) -> tuple:
    """
    Fetch the ``/languages`` resource of a repository API URL.

    Parameters
    ----------
    api_url : str
        Repository API URL; ``"/languages"`` is appended to it.
    headers, data : optional
        Forwarded unchanged to ``handle_get_requests``.

    Returns
    -------
    tuple
        ``(api_url, parsed_json)`` when the response status is 200, otherwise
        ``("Failed", "<status>-<response text>")``.
    """
    languages_url = api_url + "/languages"
    response = handle_get_requests(languages_url, headers=headers, data=data)
    # One dot per request, as a lightweight progress indicator.
    print(".", end="", flush=True)
    if response.status_code == 200:
        return (api_url, response.json())
    return ("Failed", str(response.status_code) + "-" + response.text)


def get_github_contributor_data(api_url: str, headers=None, data=None) -> tuple:
    """
    Fetch the first entry of a repository's ``/contributors`` listing.

    Parameters
    ----------
    api_url : str
        Repository API URL; ``"/contributors"`` is appended to it.
    headers, data : optional
        Forwarded unchanged to ``handle_get_requests``.

    Returns
    -------
    tuple
        ``(api_url, first_contributor)`` when the response status is 200,
        where ``first_contributor`` is the first element of the returned list
        or ``None`` if that list is empty; otherwise
        ``("Failed", "<status>-<response text>")``.

    Notes
    -----
    NOTE(review): presumably the first entry is the top contributor; confirm
    the ordering against the GitHub API documentation.
    """
    contributor_url = api_url + "/contributors"
    response = handle_get_requests(contributor_url, headers=headers, data=data)
    # One dot per request, as a lightweight progress indicator.
    print(".", end="", flush=True)
    if response.status_code == 200:
        contributor_list = response.json()
        # Guard the empty list so indexing cannot raise IndexError.
        first_contributor = contributor_list[0] if contributor_list else None
        return (api_url, first_contributor)
    return ("Failed", str(response.status_code) + "-" + response.text)


def get_api_token(file_path=Path("../api_token.secret")):
    """
    Read an API token from the first line of a secret file.

    Returns the first line with surrounding whitespace removed, or ``None``
    when ``file_path`` does not exist.
    """
    token_file = Path(file_path)
    if not token_file.exists():
        return None
    with token_file.open("r") as handle:
        first_line = handle.readline()
    return first_line.strip()

In [None]:
# Each repository costs two requests (languages + contributors), so the batch
# needs to be half the max of 5000, leaving some room for error too.
batch_size = 2400
start = 0  # Set this to your assigned start values.


# Put repo paths and netloc together to get repo links.
github_repo_urls = get_github_repo_paths(github_links)
end = min((start + batch_size), len(github_repo_urls))

languages = github_repo_urls.iloc[start:end].apply(get_github_languages)
languages = languages.reset_index().rename(
    columns={"index": "original_index", "url": "languages"}
)
languages.to_feather(data_path / f"languages_{start}.feather")
print(languages)

contributors = github_repo_urls.iloc[start:end].apply(get_github_contributor_data)
contributors = contributors.reset_index().rename(
    columns={"index": "original_index", "url": "contributors"}
)
# Write to a separate file: reusing the languages filename here would
# overwrite the languages results saved just above.
contributors.to_feather(data_path / f"contributors_{start}.feather")
print(contributors)

........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

In [None]:
# Display the languages frame built by the batch cell above.
languages