In [1]:
import os

# Move up one directory (presumably to the project root) so the relative 'drive/...'
# paths and the `source.*` imports below resolve. The sentinel makes the cell
# idempotent: re-running it no longer climbs one more directory each time.
if not globals().get('_MOVED_TO_PROJECT_ROOT', False):
    os.chdir('../')
    _MOVED_TO_PROJECT_ROOT = True

In [2]:
import sys
from importlib import reload 
#reload(sys.modules["source.lib.helpers"])

In [3]:
import pandas as pd
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
import matplotlib.pyplot as plt
import random
import networkx as nx
import numpy as np
from source.derived.contributor_stats.calculate_contributions import *
from source.lib.helpers import *

INFO: Pandarallel will run on 16 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [4]:
# Input locations, relative to the working directory chosen in the first cell.
indir_data = Path('drive/output/derived/data_export')
indir_committers_info = Path('drive/output/scrape/link_committers_profile')

# Commit-statistic columns handed to LinkCommits / AssignPRAuthorship.
commit_cols = [
    'commits',
    'commit additions',
    'commit deletions',
    'commit changes total',
    'commit files changed count',
]

time_period = 6        # passed to ImputeTimePeriod; presumably months per period — TODO confirm
author_thresh = 1 / 3  # authorship threshold passed to AssignPRAuthorship

In [5]:
df_issue = pd.read_parquet(indir_data / 'df_issue.parquet')
df_pr = pd.read_parquet(indir_data / 'df_pr.parquet')
df_pr_commits = pd.read_parquet(indir_data / 'df_pr_commits.parquet')

# Give every table a datetime 'created_at'. Issue and PR events already carry one;
# PR commits are timed by 'commit time', interpreted as Unix seconds.
df_issue = df_issue.assign(created_at=pd.to_datetime(df_issue['created_at']))
df_pr = df_pr.assign(created_at=pd.to_datetime(df_pr['created_at']))
df_pr_commits = df_pr_commits.assign(
    created_at=pd.to_datetime(df_pr_commits['commit time'], unit = 's')
)

# Rows without a timestamp cannot be ordered within a thread, so drop them.
df_issue = df_issue.dropna(subset=['created_at'])
df_pr = df_pr.dropna(subset=['created_at'])
df_pr_commits = df_pr_commits.dropna(subset=['created_at'])

In [6]:
# Every repository seen in the issue data, in order of first appearance.
repo_list = df_issue['repo_name'].drop_duplicates().tolist()
# TODO (DTR): explore treating an issue being closed as a response to the original issue opener.

In [7]:
def BuildIssueInteractionGraph(df, method):
    """
    Build a weighted, undirected actor-interaction graph from discussion events.

    Every event is paired with the most recent earlier event in the same thread
    (same 'id_number') made by a *different* actor; each such pair counts as one
    interaction between the two actors.

    Parameters:
    - df: pandas DataFrame with at least the columns 'id_number', 'created_at',
      'actor_id' and 'issue_action'. It need not be pre-sorted: rows are sorted
      by ('id_number', 'created_at') here.
    - method: interaction rule. Only 'keep_consecutive' is implemented and the
      argument is currently not consulted ('drop_consecutive' is not implemented;
      the parameter is kept for interface compatibility).
       * keep_consecutive: for each row (ignoring "opened" and "reopened" actions),
         link its actor to the actor of the most recent previous row (within the
         same id_number) whose actor differs from it.

    Returns:
    - G: networkx.Graph. An edge's "weight" is the total number of interactions
      between the two actors in either direction: "A after B" and "B after A"
      are summed on the single undirected edge A-B. Empty input gives an empty graph.
    """
    G = nx.Graph()
    if df.empty:
        return G

    # Unique index labels so the groupby-transform result assigns back row by row.
    df_sorted = df.reset_index(drop=True).sort_values(['id_number', 'created_at']).copy()

    def compute_prev_diff_vectorized(s):
        # s: actor_ids of a single thread, already in created_at order.
        shifted = s.shift(1)
        # Immediate predecessor, but only where it is a different actor.
        candidate = shifted.where(shifted != s)
        # A run of repeated actors shares the previous different actor of the
        # run's first row, so carry the last valid candidate forward.
        candidate = candidate.ffill()
        # The first event of a thread has no predecessor.
        candidate.iloc[0] = None
        return candidate

    df_sorted['prev_diff'] = df_sorted.groupby('id_number')['actor_id'].transform(compute_prev_diff_vectorized)

    # Opening/reopening an issue is not a reply to anyone; rows without a
    # previous different actor cannot form an edge.
    df_edges = df_sorted[
        (df_sorted['issue_action'] != 'opened')
        & (df_sorted['issue_action'] != 'reopened')
        & df_sorted['prev_diff'].notnull()
    ]

    # Directed counts: (current actor, previous different actor) -> interactions.
    edge_weights = (
        df_edges.groupby(['actor_id', 'prev_diff'])
        .size()
        .reset_index(name='weight')
    )

    for _, row in edge_weights.iterrows():
        u, v, w = row['actor_id'], row['prev_diff'], row['weight']
        if G.has_edge(u, v):
            # The reverse direction is already on this undirected edge; add_edge
            # would overwrite its weight, so accumulate instead.
            G[u][v]['weight'] += w
        else:
            G.add_edge(u, v, weight=w)
    return G

In [8]:
# Seeded for the optional random-sampling variant shown below; the current
# selection itself is deterministic.
random.seed(42)
# One entry per repository in order of first appearance (consistent with
# `repo_list`); the raw column would repeat a repo once per issue event.
# To work on a random subset instead, e.g.:
#   selected_repos = sorted(random.sample(repo_list, 30))
selected_repos = df_issue['repo_name'].unique().tolist()

In [9]:
# Load and clean the scraped committer-profile data (CleanCommittersInfo comes from one of
# the star-imported project modules). The result is passed to LinkCommits inside CreateGraph,
# presumably to match commit authors to GitHub actors — TODO confirm.
committers_match = CleanCommittersInfo(indir_committers_info)

In [10]:
# TODO: make PR reviewers interact with PR authors:
#   - reviewers: df_pr.query('type == "PullRequestReviewEvent"')
#   - authors:   df_pr_commit_author_stats[['repo_name', 'pr_number', 'actor_id', 'pr_opened_at']]
#
# Index the three event tables by repository so SelectRepoData can pull one repo's
# rows with `.loc[[repo]]`. Reassign instead of `inplace=True`, and only re-index
# while 'repo_name' is still a column, so re-running this cell is a no-op instead
# of a KeyError.
if 'repo_name' in df_issue.columns:
    df_issue = df_issue.set_index('repo_name')
if 'repo_name' in df_pr.columns:
    df_pr = df_pr.set_index('repo_name')
if 'repo_name' in df_pr_commits.columns:
    df_pr_commits = df_pr_commits.set_index('repo_name')

In [11]:
graph_dict = {}  # repo name -> interaction graph, filled in by CreateGraph

In [13]:
def SelectRepoData(repo, df_issue, df_pr, df_pr_commits):
    """
    Slice one repository's rows out of three repo-indexed DataFrames.

    Each frame is looked up by index label `repo`; a hit yields an independent
    copy of all matching rows (always a DataFrame, even for a single row), and a
    miss yields an empty, column-less DataFrame.

    Returns the tuple (df_issue_sel, df_pr_sel, df_pr_commits_sel).
    """
    def _rows_for_repo(frame):
        if repo not in frame.index:
            return pd.DataFrame()
        return frame.loc[[repo]].copy()

    return tuple(_rows_for_repo(frame) for frame in (df_issue, df_pr, df_pr_commits))

def ProcessReviewComments(df_pr_sel):
    """
    Turn PR review-comment events into per-thread discussion rows.

    Keeps rows whose 'type' is "PullRequestReviewCommentEvent" and a finite
    'pr_number'. Within each PR, a review thread is identified by the
    (pr_review_comment_path, pr_review_comment_original_position,
    pr_review_comment_original_commit_id) combination, numbered in order of first
    appearance. Each row gets
        id_number = "pr_rc<pr_number>_<thread index>"   e.g. "pr_rc5_0".

    Returns a new DataFrame (the input is not modified). An empty input yields an
    empty, column-less DataFrame; an input without valid review comments yields an
    empty frame that still has an 'id_number' column.
    """
    if df_pr_sel.shape[0] == 0:
        return pd.DataFrame()

    df_review = df_pr_sel[df_pr_sel['type'] == "PullRequestReviewCommentEvent"]
    # Filter invalid PR numbers (NaN, +inf, -inf) *before* deriving ids: astype(int)
    # below cannot convert non-finite values. Working on an explicit copy avoids
    # assigning into a filtered slice (SettingWithCopyWarning).
    finite_pr = df_review['pr_number'].replace([np.inf, -np.inf], np.nan).notnull()
    df_review = df_review[finite_pr].copy()
    if df_review.empty:
        return df_review.assign(id_number=pd.Series(dtype=object))

    df_review['combo'] = list(zip(
        df_review['pr_review_comment_path'],
        df_review['pr_review_comment_original_position'],
        df_review['pr_review_comment_original_commit_id']
    ))
    # Thread index per PR: 0, 1, ... in order of first appearance of each combo.
    thread_idx = df_review.groupby('pr_number')['combo']\
        .transform(lambda x: pd.factorize(x)[0])
    df_review['id_number'] = (
        "pr_rc" +
        df_review['pr_number'].astype(int).astype(str) + "_" +
        thread_idx.astype(str)
    )
    return df_review.drop(columns='combo')

def ImputeTimeEmptyRobust(df, time_period):
    """
    Apply ImputeTimePeriod to `df` and reset the resulting index, tolerating empty input.

    ImputeTimePeriod is defined outside this notebook section (project helpers).
    An empty DataFrame is returned as-is (the very same object), because there is
    nothing to impute.
    """
    if not df.empty:
        df = ImputeTimePeriod(df, time_period).reset_index()
    return df


def ProcessOtherComments(df_pr_sel, df_issue_sel):
    """
    Extract the non-review discussion events of one repository.

    Returns (df_pr_comments, df_issue_comments):
    - df_pr_comments: rows of df_pr_sel with type == "PullRequestEvent",
      'pr_number' renamed to 'id_number'.
    - df_issue_comments: rows of df_issue_sel whose issue_action is not "closed",
      'issue_number' renamed to 'id_number'.
    Either slot is an empty, column-less DataFrame when its input has no rows.
    """
    def _keep_and_relabel(frame, keep_mask, number_col):
        return frame[keep_mask].rename(columns={number_col: 'id_number'})

    df_pr_comments = (
        _keep_and_relabel(df_pr_sel, df_pr_sel['type'] == "PullRequestEvent", 'pr_number')
        if df_pr_sel.shape[0]
        else pd.DataFrame()
    )
    df_issue_comments = (
        _keep_and_relabel(df_issue_sel, df_issue_sel['issue_action'] != "closed", 'issue_number')
        if df_issue_sel.shape[0]
        else pd.DataFrame()
    )
    return df_pr_comments, df_issue_comments

def ConcatenateAndFilterDiscussions(df_pr_comments, df_issue_comments, df_review_comments, sel_cols, target_period):
    """
    Concatenate all discussion interactions, sort them, and keep one time period.

    Parameters:
    - df_pr_comments, df_issue_comments, df_review_comments: event DataFrames
      (any of them may be empty or lack some columns).
    - sel_cols: columns to return, in this order.
    - target_period: value of the 'time_period' column to keep.

    Returns the rows with time_period == target_period, sorted by
    ('id_number', 'created_at'), restricted to `sel_cols`. A column in `sel_cols`
    that none of the inputs provides (e.g. 'issue_action' for a repo with PR
    events only) is filled with NaN instead of raising KeyError.
    """
    combined = pd.concat(
        [df_pr_comments, df_issue_comments, df_review_comments],
        ignore_index=True
    )
    for col in sel_cols:
        if col not in combined.columns:
            combined[col] = np.nan

    all_discussions = combined.sort_values(['id_number', 'created_at'])[sel_cols]
    return all_discussions[all_discussions['time_period'] == target_period]

def CreateGraph(repo, target_period="2017-01-01"):
    """
    Build, store and export the discussion-interaction graph of one repository.

    Steps:
      1. Select the repo's rows from the repo-indexed globals df_issue, df_pr, df_pr_commits.
      2. Impute the time period (global `time_period`) and reset indices.
      3. Link PR commits and assign PR authorship (only relevant for review counting;
         the result is not used by the graph below).
      4. Extract PR events, non-"closed" issue events and review-comment threads.
      5. Concatenate them, tagged by 'origin', keeping rows whose time_period == target_period.
      6. Build the graph, store it in graph_dict[repo], print a one-line summary and,
         if it has nodes, write it to issue/graphs/<repo with '/' -> '_'>_<YYYYMM>.gexf.

    Parameters:
    - repo: repository name as used in the frames' index (e.g. "numpy/numpy").
    - target_period: 'time_period' value to keep. Defaults to "2017-01-01", the
      period this analysis was originally hard-coded to. It also determines the
      printed summary and the output-file suffix (e.g. "201701").

    Returns None. Returns early, without printing, when the repo has no rows in any
    of the three frames.
    """
    # Step 1: Data selection.
    df_issue_sel, df_pr_sel, df_pr_commits_sel = SelectRepoData(repo, df_issue, df_pr, df_pr_commits)
    if df_issue_sel.empty and df_pr_sel.empty and df_pr_commits_sel.empty:
        return

    # Step 2: Impute time period.
    df_issue_sel = ImputeTimeEmptyRobust(df_issue_sel, time_period)
    df_pr_sel = ImputeTimeEmptyRobust(df_pr_sel, time_period)
    df_pr_commits_sel = ImputeTimeEmptyRobust(df_pr_commits_sel, time_period)

    # Step 3: PR commits and authorship. THIS IS ONLY IN THE CONTEXT OF REVIEW COUNTING.
    # NOTE(review): df_pr_commit_author_stats is not consumed by the graph below.
    if df_pr_sel.shape[0] > 0 and df_pr_commits_sel.shape[0] > 0:
        df_pr_commit_stats = LinkCommits(df_pr_sel, df_pr_commits_sel, committers_match, commit_cols, 'pr')
        df_pr_commit_author_stats = AssignPRAuthorship(df_pr_commit_stats, df_pr_sel, author_thresh, commit_cols)
    else:
        df_pr_commit_author_stats = pd.DataFrame()

    sel_cols = ['created_at', 'actor_id', 'id_number', 'type', 'issue_action', 'time_period', 'origin']

    # Step 4: Process discussion interactions.
    df_pr_comments, df_issue_comments = ProcessOtherComments(df_pr_sel, df_issue_sel)
    df_review_comments = ProcessReviewComments(df_pr_sel)

    # Step 5: Concatenate and filter discussions to the target period.
    discussion_filtered = ConcatenateAndFilterDiscussions(
        df_pr_comments.assign(origin='pr'),
        df_issue_comments.assign(origin='issue'),
        df_review_comments.assign(origin='pr review'),
        sel_cols,
        target_period,
    )

    # Step 6: Build, store, report and export the interaction graph.
    G = BuildIssueInteractionGraph(discussion_filtered, 'keep_consecutive')
    graph_dict[repo] = G
    print(f"{repo}, {target_period}, {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
    if G.number_of_nodes() != 0:
        period_tag = pd.Timestamp(target_period).strftime('%Y%m')  # "2017-01-01" -> "201701"
        output_dir = Path("issue/graphs")
        output_dir.mkdir(parents=True, exist_ok=True)  # write_gexf fails if the folder is missing
        output_path = output_dir / f"{repo.replace('/', '_')}_{period_tag}.gexf"
        nx.write_gexf(G, str(output_path))


ERROR! Session/line number was not unique in database. History logging moved to new session 173


In [14]:
%%time
# Build, store (graph_dict) and export the interaction graph of every repository.
# CreateGraph prints one summary line per repo that has data and silently skips the rest.
for repo in repo_list:
    CreateGraph(repo)

blaze/odo, 2017-01-01, 15 nodes, 13 edges
grpc/grpc, 2017-01-01, 935 nodes, 1676 edges
h5py/h5py, 2017-01-01, 70 nodes, 99 edges
pyeve/eve, 2017-01-01, 26 nodes, 28 edges
jaraco/irc, 2017-01-01, 5 nodes, 4 edges
numpy/numpy, 2017-01-01, 467 nodes, 928 edges
python/mypy, 2017-01-01, 259 nodes, 552 edges
sympy/sympy, 2017-01-01, 321 nodes, 703 edges
MTG/essentia, 2017-01-01, 41 nodes, 48 edges
rasbt/mlxtend, 2017-01-01, 36 nodes, 57 edges
scrapy/scrapy, 2017-01-01, 240 nodes, 361 edges
chrysn/aiocoap, 2017-01-01, 16 nodes, 16 edges
conan-io/conan, 2017-01-01, 179 nodes, 335 edges
msoulier/tftpy, 2017-01-01, 2 nodes, 1 edges
numba/llvmlite, 2017-01-01, 33 nodes, 50 edges
Azure/azure-cli, 2017-01-01, 444 nodes, 927 edges
ansible/ansible, 2017-01-01, 3655 nodes, 7561 edges
google/or-tools, 2017-01-01, 81 nodes, 103 edges
pytorch/pytorch, 2017-01-01, 725 nodes, 1419 edges
dask/fastparquet, 2017-01-01, 68 nodes, 87 edges
druid-io/pydruid, 2017-01-01, 18 nodes, 21 edges
wbond/asn1crypto, 2017-