In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
import os

class CommitPath:
    """File-system locations of the commit data kept for one repository.

    Every path is derived from the repository name (and, for the diff file,
    the organization name) at construction time; nothing is read from disk.

    Attributes:
        repo_name: Repository name exactly as passed in.
        organization_name: Organization name exactly as passed in.
        commit_diff_info_path: CSV named after the organization and repository.
        dependency_folder_path: Folder named after the repository.
        processed_commit_info_path: CSV inside the repository's commit-info folder.
        commit_info_path: CSV inside the repository's commit-info folder.
    """

    def __init__(self, repo_name, organization_name):
        # Identity of the repository.
        self.repo_name = repo_name
        self.organization_name = organization_name

        # Derived locations.
        self.commit_diff_info_path = f"/home/commit-diff/{organization_name}_{repo_name}_commit_diff.csv"
        # NOTE(review): this folder sits under /home/commit while the two files
        # below sit under /home/commit-info — confirm that is intended.
        self.dependency_folder_path = f"/home/commit/commit_info_{repo_name}"
        self.processed_commit_info_path = f"/home/commit-info/commit_info_{repo_name}/processed_commit_data_update_test.csv"
        self.commit_info_path = f"/home/commit-info/commit_info_{repo_name}/commit_data.csv"

def get_path_object(repo_name, organization_name):
    """Build the ``CommitPath`` describing where a repository's data lives.

    Args:
        repo_name: Repository name.
        organization_name: Organization that owns the repository.

    Returns:
        A new ``CommitPath`` for the given repository and organization.
    """
    path_object = CommitPath(repo_name=repo_name, organization_name=organization_name)
    return path_object

In [None]:
from queue import Queue


class Dependency:
    """A function reference identified by its name and its package.

    Two dependencies compare equal when both ``func_name`` and
    ``package_name`` match; ``previous_func`` and ``pos`` take no part in
    equality or hashing.

    Attributes:
        func_name: Name of the function.
        package_name: Package (or module path) the function belongs to.
        previous_func: Name of the function this one was reached from, or
            ``None`` when unknown.
        pos: Position value stored as given, or ``None``.
    """
    func_name: str
    package_name: str

    def __init__(self, func_name, package_name, previous_func=None, pos=None):
        self.func_name = func_name
        self.package_name = package_name
        self.previous_func = previous_func
        self.pos = pos

    def __eq__(self, other):
        # Compare by attributes (duck typing) so instances created before the
        # notebook cell was re-run still compare equal, but decline politely
        # for objects that are not dependency-like instead of raising.
        try:
            return self.func_name == other.func_name and self.package_name == other.package_name
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable; hash on the same
        # fields equality uses so instances can live in sets and dict keys.
        return hash((self.func_name, self.package_name))

    def __str__(self):
        parts = [self.func_name, self.package_name]
        if self.previous_func is not None:
            parts.append(self.previous_func)
        # str() each part so a non-string value (e.g. NaN) cannot raise.
        return " ".join(str(part) for part in parts)

def find_repo_with_test_cases(temp_df):
    """Group the distinct test entry points of ``temp_df`` by repository.

    A row is a test entry point when its ``func_decl_name`` starts with
    ``Test`` or equals ``Describe``.

    Args:
        temp_df: DataFrame providing the ``func_decl_name``, ``package_name``,
            ``pos`` and ``repo_name`` columns.

    Returns:
        dict mapping each ``repo_name`` to a list of ``Dependency`` objects in
        first-seen order, with no two equal entries per repository.
    """
    tests_by_repo = {}
    for _, row in temp_df.iterrows():
        decl_name = row['func_decl_name']
        # NOTE(review): a float name is presumably NaN for a missing value —
        # confirm against the data producer.
        if type(decl_name) == float:
            continue
        if not (decl_name.startswith('Test') or decl_name == 'Describe'):
            continue

        candidate = Dependency(decl_name, row['package_name'], pos=row['pos'])
        known_tests = tests_by_repo.setdefault(row['repo_name'], [])
        if candidate not in known_tests:
            known_tests.append(candidate)
    return tests_by_repo

# Find chain dependency with function call
def find_chain_dependency(repo_df, current_func_name, package_name, repo_name, pos):
    """Collect every function reachable from one declared function.

    Starting from the functions called by ``current_func_name`` (as reported
    by ``find_called_func``), the call graph is walked in first-in, first-out
    order. Each distinct ``(func_name, package_name)`` pair is recorded once.
    A node is expanded further only when ``repo_name`` occurs in its package
    name or the package name does not contain ``'github'``.

    Args:
        repo_df: DataFrame of call records passed through to ``find_called_func``.
        current_func_name: Name of the declared function to start from.
        package_name: Package of the declared function.
        repo_name: Repository name; written to every output row and used in
            the expansion rule above.
        pos: Position filter for the starting function only; nested lookups
            always use ``None``.

    Returns:
        DataFrame with columns ``repo_name``, ``func_name``, ``package_name``
        (one row per distinct reachable function, in visiting order), or an
        empty column-less DataFrame when nothing is reachable.
    """
    pending = Queue()
    for dependency in find_called_func(repo_df, current_func_name, package_name, pos):
        pending.put(dependency)

    # Keyed on the same two fields Dependency equality uses, so membership is
    # a hash lookup instead of a scan over every node seen so far.
    visited = set()
    # Rows are gathered in a plain list and turned into a frame once at the
    # end; concatenating a one-row frame per node is quadratic.
    rows = []
    while not pending.empty():
        node = pending.get()
        node_func_name = node.func_name
        node_package_name = node.package_name
        node_key = (node_func_name, node_package_name)
        if node_key in visited:
            continue
        visited.add(node_key)
        rows.append((repo_name, node_func_name, node_package_name))

        if repo_name in node_package_name or 'github' not in node_package_name:
            for next_node in find_called_func(repo_df, node_func_name, node_package_name, None):
                next_node.previous_func = node_func_name
                pending.put(next_node)

    if not rows:
        # Callers test ``.empty``; keep the original column-less empty frame.
        return pd.DataFrame()
    return pd.DataFrame(rows, columns=['repo_name', 'func_name', 'package_name'])


def find_called_func(repo_df, func_name, package_name, pos):
    """List the functions called by one declared function.

    Rows of ``repo_df`` are selected where ``func_decl_name`` equals
    ``func_name`` and ``package_name`` equals the last ``/``-separated segment
    of the given ``package_name``; when ``pos`` is not ``None`` the ``pos``
    column must match as well.

    Args:
        repo_df: DataFrame providing ``func_decl_name``, ``package_name``,
            ``pos``, ``func_name`` and ``module_path`` columns.
        func_name: Name of the declaring function.
        package_name: Package path of the declaring function.
        pos: Optional position to match; ``None`` disables the position filter.

    Returns:
        list of ``Dependency`` objects, one per selected row, built from the
        row's ``func_name`` and ``module_path``.
    """
    by_name = repo_df['func_decl_name'] == func_name
    by_package = repo_df['package_name'] == (package_name.split("/")[-1])
    selector = by_name & by_package
    if pos is not None:
        selector = selector & (repo_df['pos'] == pos)

    return [
        Dependency(row['func_name'], row['module_path'])
        for _, row in repo_df[selector].iterrows()
    ]

def swap_columns(df, col1, col2):
    """Return ``df`` with the positions of two columns exchanged.

    Args:
        df: DataFrame to reorder; it is not modified.
        col1: Label of the first column.
        col2: Label of the second column.

    Returns:
        A DataFrame selecting the same columns with ``col1`` and ``col2``
        trading places.

    Raises:
        ValueError: If either label is not among the columns.
    """
    order = [*df.columns]
    first = order.index(col1)
    second = order.index(col2)

    held = order[first]
    order[first] = order[second]
    order[second] = held
    return df[order]

def detect_dependency_df(inpt):
    """Resolve the call chain of one declared function into a labelled frame.

    Args:
        inpt: Indexable whose first four items are, in order, the call-record
            DataFrame, the declared function name, its package name and the
            repository name.

    Returns:
        The (empty) frame from ``find_chain_dependency`` when no dependency is
        found; otherwise a DataFrame with columns ``repo_name``,
        ``func_decl_name``, ``func_decl_package_name``, ``func_name``,
        ``package_name``, ``pos``, where ``pos`` is always ``None``.
    """
    repo_df, func_name, package_name, repo_name = inpt[0], inpt[1], inpt[2], inpt[3]
    pos = None  # the starting function is never filtered by position here

    chain = find_chain_dependency(repo_df, func_name, package_name, repo_name, pos)
    if chain.empty:
        return chain

    # Tag every row with the declaration the chain started from.
    added_columns = {
        'func_decl_name': func_name,
        'func_decl_package_name': package_name,
        'pos': pos,
    }
    for column, value in added_columns.items():
        chain[column] = value

    # Move the declaration columns in front of the callee columns.
    for decl_column, callee_column in (
        ('func_decl_name', 'func_name'),
        ('func_decl_package_name', 'package_name'),
    ):
        chain = swap_columns(chain, decl_column, callee_column)
    return chain

def get_commit_list(commit_path_object: "CommitPath", conndition):
    """List the distinct commit SHAs (and parent SHAs) of the selected rows.

    The processed commit CSV is read from
    ``commit_path_object.processed_commit_info_path``. For every row accepted
    by ``conndition`` the last ``/``-separated segment of
    ``commit_html_url`` and then of ``parent_commit_html_url`` is collected.

    Args:
        commit_path_object: Object exposing ``processed_commit_info_path``.
        conndition: Callable taking a row and returning a truthy value for
            rows to keep.

    Returns:
        list of SHA strings without duplicates, in first-seen order.
    """
    commit_info = pd.read_csv(commit_path_object.processed_commit_info_path)

    # A dict keeps insertion order and gives constant-time de-duplication,
    # replacing the linear ``in list`` scan per SHA.
    ordered_shas = {}
    for _, row in commit_info.iterrows():
        if not conndition(row):
            continue
        for url in (row["commit_html_url"], row["parent_commit_html_url"]):
            # An empty CSV cell is read back as NaN (a float), which has no
            # ``split``; skip it rather than crash (e.g. no parent URL).
            if not isinstance(url, str):
                continue
            ordered_shas.setdefault(url.split('/')[-1], None)
    return list(ordered_shas)

def get_commit_df(path, commit):
    """Load and merge every CSV in ``path`` that belongs to one commit.

    A file belongs to the commit when ``commit`` occurs in its name and the
    name does not end with ``_error.csv``.

    Args:
        path: Directory to scan.
        commit: Commit identifier searched for in the file names.

    Returns:
        DataFrame with the rows of all matching files (read in sorted
        file-name order) with exact duplicate rows dropped, or an empty
        DataFrame when no file matches.
    """
    # Sort the listing: os.listdir returns names in arbitrary order, which
    # would make the row order differ between runs.
    frames = [
        pd.read_csv(os.path.join(path, file_name))
        for file_name in sorted(os.listdir(path))
        if commit in file_name and not file_name.endswith('_error.csv')
    ]
    if not frames:
        return pd.DataFrame()
    # One concat over all files instead of re-copying the result per file.
    return pd.concat(frames, ignore_index=True).drop_duplicates()

def get_terratest_function(path='terratest_integration_func.csv'):
    """Read the ``(func_name, package_name)`` pairs listed in a CSV file.

    Args:
        path: CSV file providing ``func_name`` and ``package_name`` columns.
            Defaults to ``terratest_integration_func.csv`` in the current
            working directory.

    Returns:
        list of ``(func_name, package_name)`` tuples, one per row, in file order.
    """
    terratest_integration_func = pd.read_csv(path)
    # Pair the two columns directly rather than materialising a Series per row.
    return list(zip(
        terratest_integration_func['func_name'],
        terratest_integration_func['package_name'],
    ))

In [None]:
from datetime import datetime

def condition(row):
    """Tell whether a commit row is dated in 2022 or later.

    Args:
        row: Mapping-like object whose ``commit_date`` entry is a string in
            ``YYYY-MM-DD HH:MM:SS`` form.

    Returns:
        ``True`` when the parsed year is at least 2022, otherwise ``False``.
    """
    commit_moment = datetime.strptime(row["commit_date"], "%Y-%m-%d %H:%M:%S")
    return commit_moment.year >= 2022

def run(full_repo_name):
    """Classify every changed function of a repository's selected commits.

    For each commit returned by ``get_commit_list`` (filtered with
    ``condition``), the diff rows whose ``file_commit_sha`` equals that commit
    are looked up; for each such row the call chain of the changed function is
    resolved with ``detect_dependency_df`` and summarised into one record.

    Args:
        full_repo_name: Repository in ``organization/repository`` form.

    Returns:
        DataFrame with one row per changed function that has a non-empty call
        chain and the columns ``func_name``, ``commit_sha``,
        ``file_commit_sha``, ``is_terratest`` and ``is_integrate``:

        * ``is_terratest`` — some function in the chain has a package name
          containing ``github.com/gruntwork-io/terratest``.
        * ``is_integrate`` — some function in the chain is listed by
          ``get_terratest_function`` or belongs to the terratest ``azure``,
          ``gcp`` or ``aws`` module.

        The columns are present even when there are no rows.
    """
    name_parts = full_repo_name.split("/")
    repo_name = name_parts[1]
    organization_name = name_parts[0]
    repo_path_obj = get_path_object(repo_name, organization_name)
    repo_commit_list = get_commit_list(repo_path_obj, condition)
    diff_info = pd.read_csv(repo_path_obj.commit_diff_info_path)

    # Sets give constant-time membership for the per-row lookups below.
    terratest_funcs = set(get_terratest_function())
    cloud_packages = {
        'github.com/gruntwork-io/terratest/modules/azure',
        'github.com/gruntwork-io/terratest/modules/gcp',
        'github.com/gruntwork-io/terratest/modules/aws',
    }
    terratest_prefix = 'github.com/gruntwork-io/terratest'
    output_columns = ['func_name', 'commit_sha', 'file_commit_sha', 'is_terratest', 'is_integrate']

    records = []
    for commit in repo_commit_list:
        diff_commit_info = diff_info[diff_info["file_commit_sha"] == commit]
        if diff_commit_info.empty:
            # Nothing changed under this SHA: skip reading its dependency files.
            continue

        repo_commit_df = get_commit_df(repo_path_obj.dependency_folder_path, commit)
        if repo_commit_df.empty:
            # No dependency data for this commit, so no chain can be resolved
            # (a column-less frame would fail on the first column lookup).
            continue

        for _, diff in diff_commit_info.iterrows():
            chain = detect_dependency_df([repo_commit_df, diff['function_name'], diff['package_name'], repo_name])
            if chain is None or chain.empty:
                continue

            # regex=False: the prefix is a literal; as a pattern its dots
            # would match any character. na=False: missing names do not match.
            uses_terratest = chain['package_name'].str.contains(
                terratest_prefix, regex=False, na=False
            ).any()
            uses_integration = any(
                (called_func, called_package) in terratest_funcs or called_package in cloud_packages
                for called_func, called_package in zip(chain['func_name'], chain['package_name'])
            )
            records.append({
                'func_name': diff['function_name'],
                'commit_sha': diff['commit_sha'],
                'file_commit_sha': diff['file_commit_sha'],
                'is_terratest': bool(uses_terratest),
                'is_integrate': uses_integration,
            })

    # Build the frame once; fixing the columns keeps downstream group-bys
    # working when no record was produced.
    return pd.DataFrame(records, columns=output_columns)

def get_total_commit(full_repo_name):
    """Count the de-duplicated commit rows dated in 2022 or later.

    Args:
        full_repo_name: Repository in ``organization/repository`` form.

    Returns:
        The result of ``DataFrame.count()`` on the matching rows: a Series
        holding, for each column (including the added ``date_time``), the
        number of non-missing cells.
        NOTE(review): this is a per-column Series, not a single number —
        confirm whether callers expect a scalar.
    """
    name_parts = full_repo_name.split("/")
    repo_name = name_parts[1]
    organization_name = name_parts[0]
    paths = get_path_object(repo_name, organization_name)

    commits = pd.read_csv(paths.commit_info_path).drop_duplicates()
    commits['date_time'] = commits['commit_date'].apply(
        lambda raw_date: datetime.strptime(raw_date, "%Y-%m-%d %H:%M:%S")
    )
    is_recent = commits['date_time'].dt.year >= 2022
    return commits[is_recent].count()


In [None]:
class Result:
    """Per-repository summary of test counts and the total commit figure.

    Attributes:
        repo_name: Repository the figures belong to.
        num_test: Value reported as ``test``.
        num_local_test: Value reported as ``local test``.
        num_server_test: Value reported as ``server test``.
        num_other_test: Value reported as ``other test``.
        num_commit: Value reported as ``total commit``.
    """

    def __init__(self, repo_name, num_test, num_local_test, num_server_test, num_other_test, num_commit):
        # Values are stored exactly as given; no validation or conversion.
        self.repo_name = repo_name
        self.num_test = num_test
        self.num_local_test = num_local_test
        self.num_server_test = num_server_test
        self.num_other_test = num_other_test
        self.num_commit = num_commit

    def __str__(self):
        """Render all figures on a single line, prefixed by the repository name."""
        summary = f"{self.repo_name}: test:{self.num_test} local test:{self.num_local_test} server test:{self.num_server_test} other test:{self.num_other_test} total commit:{self.num_commit}"
        return summary

def get_result(repo_name):
    """Run the analysis for one repository and summarise it per commit.

    Args:
        repo_name: Repository in the form expected by ``run`` and
            ``get_total_commit``.

    Returns:
        Tuple ``(per_function, per_commit, summary)`` where ``per_function``
        is the frame returned by ``run``, ``per_commit`` holds the summed
        ``is_terratest`` / ``is_integrate`` flags per ``commit_sha``, and
        ``summary`` is a ``Result`` built from the per-commit counts and
        ``get_total_commit``.
    """
    per_function = run(repo_name)
    per_commit = (
        per_function
        .groupby("commit_sha")
        .agg({'is_terratest': 'sum', 'is_integrate': 'sum'})
        .reset_index()
    )

    has_terratest = per_commit['is_terratest'] > 0
    has_integration = per_commit['is_integrate'] > 0
    has_neither = (per_commit['is_terratest'] == 0) & (per_commit['is_integrate'] == 0)

    num_test = per_commit.shape[0]
    num_terratest = per_commit[has_terratest].shape[0]
    num_server_test = per_commit[has_integration].shape[0]
    # Local tests: commits with terratest usage minus those flagged as integration.
    num_local_test = num_terratest - num_server_test
    num_other_test = per_commit[has_neither].shape[0]
    num_commit = get_total_commit(repo_name)

    summary = Result(repo_name, num_test, num_local_test, num_server_test, num_other_test, num_commit)
    return per_function, per_commit, summary