In [35]:
# all imports needed for tools and libraries
import math
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
import seaborn as sns # for data visualization
import time
import json
import os
import sys
from collections import Counter
from pathlib import Path
from dotenv import load_dotenv
from github import Github
from github import Auth
from datetime import datetime
import ast

## Mining the Software Repository

In [None]:
# define the initial repository data
repo_url = "https://github.com/zephyrproject-rtos/zephyr"
repo_name = "zephyr"
owner = "zephyrproject-rtos"

# define data that will be re-used in the API calls
# The personal access token is read from the GITHUB_TOKEN environment variable so the
# real secret never has to be typed into (and saved with) the notebook. When the
# variable is unset or empty, the original placeholder is used, so the previous
# behaviour is kept. If the token lives in a .env file, call load_dotenv() (imported
# at the top of the notebook) before running this cell.
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") or "INSERT_UNIQUE_PAT_HERE"

# Date/Time Range Data
# the initial dates for only the required part of assignment.
start_date_str = "2023-11-18"
until_date_str = "2025-11-18"
# need to convert date strings to datetime objects in order to compare them with each PR's created_at value.
# strptime returns naive datetimes at midnight, so the range ends at 2025-11-18 00:00:00.
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
until_date = datetime.strptime(until_date_str, "%Y-%m-%d")

In [11]:
# testing access
# get_repo below is called with the "owner/name" form of the repository
target_repo = f"{owner}/{repo_name}"

print("Target Repository (Owner/Name):", target_repo)

# start github connection
print("Starting GitHub Connection Test...")
# authenticate with the token from the configuration cell
auth = Auth.Token(GITHUB_TOKEN)
g = Github(auth=auth)
# the repository handle is reused by the mining cell further down
repo = g.get_repo(target_repo)
print(f"Successfully connected to repository: {target_repo}")

Target Repository (Owner/Name): zephyrproject-rtos/zephyr
Starting GitHub Connection Test...
Successfully connected to repository: zephyrproject-rtos/zephyr


In [None]:
# begin mining pull requests
# PRs are requested newest-first so the loop can stop at the first PR older than start_date
allPulls = repo.get_pulls(state="all", sort="created", direction="desc")

# Accumulators are (re)initialised in this cell so it works on a fresh kernel
# (Restart & Run All) and so re-running it does not append duplicates to the
# results of a previous run.
prData = []              # one dict per PR inside the date range
totalLabels = Counter()  # label name -> number of in-range PRs carrying that label
count = 0                # number of PRs collected so far

# each pr is a dictionary, that will be added to a list
for pr in allPulls:
    # get the PR creation date; tzinfo is dropped so it can be compared with the
    # naive start_date / until_date values built with strptime
    pr_created_at = pr.created_at.replace(tzinfo=None)

    # filter by date: results are sorted by creation date (descending), so once a PR
    # is older than start_date every remaining PR is older as well
    if pr_created_at < start_date:
        print(f"Reached PR created on {pr.created_at}. Stopping loop as all remaining PRs are older than {start_date_str}.")
        break

    # PRs newer than the end of the range are skipped, but iteration continues
    if pr_created_at > until_date:
        continue

    # process PR only if within [start_date, until_date] range
    labels = Counter()

    for label in pr.labels:
        totalLabels[label.name] += 1
        labels[label.name] += 1

    # NOTE(review): pr.id is stored as the identifier; confirm this is the value
    # wanted rather than the PR number shown in the GitHub UI.
    prInfo = {
        "id": pr.id,
        "title": pr.title,
        "author": pr.user.login if pr.user else "unknown",
        "created_at": pr.created_at,
        "closed_at": pr.closed_at,  # stays empty for PRs that were never closed
        "labels": dict(labels),
    }

    prData.append(prInfo)
    count += 1

    # lightweight progress indicator for the long-running crawl
    if count % 200 == 0:
        print(f"Processed {count} PRs...")

print(f"All PR's processed. Total PRs in range: {count}")

In [None]:
# create data frame to manipulate and analyze label data (for bug PR count) and open/closed timestamps (for median time to close)
# prData is the list of per-PR dicts filled by the mining loop; each dict becomes one row
# with the columns id, title, author, created_at, closed_at and labels
dfAll = pd.DataFrame(prData)

In [None]:
# save all PR data to CSV (a part of the deliverable)
folder = 'data'
file_name = f"{repo_name}_prs_2023-2025.csv"
file_path = os.path.join(folder, file_name)

# to_csv does not create missing parent directories, so make sure the output
# folder exists before writing (no-op when it is already there)
os.makedirs(folder, exist_ok=True)
dfAll.to_csv(file_path, index=False)

print("All PRs saved to CSV!")

## Data Formatting/Cleaning

In [41]:
# helper that works on a single PR record: how long was it open?
def calculateTimeToClose(row):
    """Return the time between a PR's creation and its closing.

    Parameters
    ----------
    row : mapping-like
        Must provide 'created_at' and 'closed_at' entries whose values
        support subtraction.

    Returns
    -------
    The result of ``closed_at - created_at`` for that record.
    """
    opened_at, finished_at = row['created_at'], row['closed_at']
    return finished_at - opened_at

In [15]:
# NOTE: a PR that was never closed has an empty/NaN value in its closed_at column
def countNumberClosed(dataset):
    """Separate the closed PRs from ``dataset`` and count them.

    Parameters
    ----------
    dataset : pandas.DataFrame
        PR table with a 'closed_at' column.

    Returns
    -------
    tuple
        ``(closedPRs, closedCount)``: an independent copy of the rows whose
        'closed_at' is not null, and the number of those rows.
    """
    # True for every row that carries a closing timestamp
    has_close_time = dataset['closed_at'].notna()

    # copy so later edits to the result never touch the input frame
    closedPRs = dataset.loc[has_close_time].copy()

    return closedPRs, len(closedPRs)

In [37]:
# NOTE: each entry in the labels column is a dictionary, since a single PR can carry several labels
def checkIfBug(labels):
    """Tell whether a PR's labels mark it as bug-related.

    Parameters
    ----------
    labels : dict
        Label names as keys (the values are not inspected).

    Returns
    -------
    bool
        True when at least one label name, compared case-insensitively, is
        exactly 'bug', 'priority: high', 'priority: low' or 'priority: medium'.
    """
    # label names that indicate a bug-related PR
    bug_markers = {'bug', 'priority: high', 'priority: low', 'priority: medium'}

    # lower-case every label name so the comparison ignores capitalisation
    normalized = {name.lower() for name in labels.keys()}

    # bug-related as soon as the two sets share one element
    return not bug_markers.isdisjoint(normalized)

In [36]:
# also return a new data frame with ONLY bug PRs
# The following method was created based on reading the repository developer descriptions of the unique labels created
def countNumberBugsClosed(dataset):
    """Select the bug-related PRs from ``dataset`` and count them.

    The input frame is left untouched: labels are parsed into a separate Series
    instead of overwriting the caller's 'labels' column.

    Parameters
    ----------
    dataset : pandas.DataFrame
        PR table with a 'labels' column. Each entry is either a dict of label
        names or the string form of such a dict (as produced by a CSV round trip).
        The function does not filter on closed state itself; pass already-closed
        PRs to get closed bug PRs.

    Returns
    -------
    tuple
        ``(closedBugPRs, bugCount)``: a new frame with the rows accepted by
        ``checkIfBug`` (its 'labels' column holding the parsed dicts), and the
        number of those rows.
    """
    # convert string entries to dictionaries; literal_eval only accepts Python
    # literals, and values that are not strings are passed through unchanged
    parsed_labels = dataset['labels'].apply(
        lambda x: ast.literal_eval(x) if isinstance(x, str) else x
    )

    # look at the labels and check if bug-related; astype(bool) keeps the mask
    # boolean even when the input has no rows
    bug_mask = parsed_labels.apply(checkIfBug).astype(bool)

    # boolean indexing plus assign builds a new frame, so the caller's data is not modified
    closedBugPRs = dataset[bug_mask].assign(labels=parsed_labels[bug_mask])

    bugCount = len(closedBugPRs)

    return closedBugPRs, bugCount

In [42]:
# in the data frame, create a new variable/column ("time to solve") for each PR
def getTimeToClose(dataset):
    """Return ``dataset`` with parsed timestamps and a 'time_to_solve' column.

    The timestamps are read from the ``dataset`` argument itself (the previous
    version read them from an unrelated global named ``df``). The input frame is
    not modified; a new frame is returned.

    Parameters
    ----------
    dataset : pandas.DataFrame
        PR table with 'created_at' and 'closed_at' columns holding datetimes or
        strings that ``pd.to_datetime`` can parse.

    Returns
    -------
    pandas.DataFrame
        New frame in which 'created_at' and 'closed_at' are datetime columns and
        'time_to_solve' holds ``closed_at - created_at`` for every row.
    """
    # convert to datetime format to make calculations easier
    created_at = pd.to_datetime(dataset['created_at'])
    closed_at = pd.to_datetime(dataset['closed_at'])

    # column-wise subtraction gives the same per-row difference as a row-wise
    # apply, and it also works when the frame has no rows
    return dataset.assign(
        created_at=created_at,
        closed_at=closed_at,
        time_to_solve=closed_at - created_at,
    )

In [17]:
# to not pull again, turn csv into dataframe
# NOTE: the path is hard-coded; it matches the file written by the save cell only while repo_name is "zephyr"
# NOTE(review): no parse options are passed, so timestamps and labels presumably come back as text;
# the cleaning functions below convert them (pd.to_datetime / ast.literal_eval)
allPRs = pd.read_csv("data/zephyr_prs_2023-2025.csv")

In [18]:
# we are only concerned with closed PRs:
# closedPRs is an independent copy holding only the rows with a non-null closed_at;
# closedCount is the number of those rows
closedPRs, closedCount = countNumberClosed(allPRs)

In [38]:
# from the closed PRs, we are only concerned with bugs
# closedBugPRs keeps the rows whose labels are accepted by checkIfBug;
# closedBugCount is the number of those rows
closedBugPRs, closedBugCount = countNumberBugsClosed(closedPRs)

In [43]:
# begin calculations for the median time to solve a bug
# getTimeToClose converts the timestamp columns and adds the time_to_solve column
final_dataset = getTimeToClose(closedBugPRs)

# save cleaned PR data to CSV 
# folder, file_name and file_path are reassigned here and now point at the CLEANED file;
# the 'data' folder already exists at this point because the raw CSV was read from it
folder = 'data'
file_name = f"{repo_name}_prs_2023-2025-CLEANED.csv"
file_path = os.path.join(folder, file_name)
final_dataset.to_csv(file_path, index=False)

print(f"All CLEANED PRs saved to CSV!")

All CLEANED PRs saved to CSV!


## Data Analysis

In [48]:
def getMedianTimeToSolve(dataset):
    """Return the median of the 'time_to_solve' column.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Frame that contains a 'time_to_solve' column.

    Returns
    -------
    The value produced by pandas' ``Series.median`` for that column.
    """
    return dataset['time_to_solve'].median()

In [51]:
# median of the time_to_solve values over the closed, bug-related PRs in final_dataset
medianTimeToSolve = getMedianTimeToSolve(final_dataset)

In [53]:
# show the median time to close a bug-related PR (printed in "days hh:mm:ss" form)
print(medianTimeToSolve)

3 days 23:19:36


## Data Visualization

In [55]:
# summary of how many PRs remain after each filtering stage;
# every (label, quantity) pair becomes one two-element row
data = [
    list(entry)
    for entry in (
        ('All PRs', len(allPRs)),
        ('All Closed PRs', closedCount),
        ('All Closed Bug-Related PRs', closedBugCount),
    )
]
# last expression of the cell, so the notebook renders it as a table
pd.DataFrame(data, columns=['PR Type', 'Quantity'])

Unnamed: 0,PR Type,Quantity
0,All PRs,43651
1,All Closed PRs,41355
2,All Closed Bug-Related PRs,2077
