In [None]:
import ast
import numpy as np
import pandas as pd
import re
import json
import os

In [None]:
# Load the PR dataset produced by the previous pipeline step.
# The path is assembled with os.path.join so the notebook resolves the file on
# any OS; a hard-coded backslash separator only works on Windows.
prs = pd.read_parquet(os.path.join('output_files', 'fix_prs_with_issues_and_files.parquet'))
prs

In [None]:
def remover_patch_e_converter(item):
    """
    Parse a JSON-like string while discarding the value of every "patch" field.

    The "patch" values are replaced by ``null`` before parsing, so a corrupted
    patch payload cannot break the decoding of the rest of the record.

    Parameters
    ----------
    item : object
        The raw cell value. Non-string values are returned untouched.

    Returns
    -------
    object
        - ``item`` itself when it is not a string;
        - ``[]`` when the string is empty or whitespace only;
        - the decoded JSON value otherwise;
        - ``np.nan`` when the text still cannot be decoded as JSON.
    """
    if not isinstance(item, str):
        return item

    item_limpo = item.strip()
    if item_limpo == "":
        return []

    # Matches "patch": "<string with escaped quotes>" or "patch": null and
    # rewrites either form to the harmless "patch": null.
    regex_string_patch = r'"patch":\s*"(?:[^"\\]|\\.)*"'
    regex_null_patch = r'"patch":\s*null'
    regex_full = f"({regex_string_patch}|{regex_null_patch})"
    item_sem_patch = re.sub(regex_full, '"patch": null', item_limpo)

    # Python literals -> JSON literals.
    # The pattern consumes complete double-quoted strings first and hands them
    # back unchanged, so only bare tokens are rewritten. A plain re.sub over
    # the whole text would also alter string *values* (e.g. the filename
    # "src/None.py" would become "src/null.py").
    substitutos = {'none': 'null', 'true': 'true', 'false': 'false', 'nan': 'null'}
    regex_tokens = r'"(?:[^"\\]|\\.)*"|\b(?:None|True|False|nan)\b'

    def _trocar(match):
        token = match.group(0)
        if token.startswith('"'):
            return token
        return substitutos[token.lower()]

    item_corrigido = re.sub(regex_tokens, _trocar, item_sem_patch, flags=re.IGNORECASE)

    try:
        return json.loads(item_corrigido)
    except json.JSONDecodeError:
        # Still undecodable even without the patch payload: give up on this row.
        return np.nan

In [None]:
def converter_para_lista_json_robusta(item):
    """
    Convert a JSON-like string into the Python value it encodes.

    Bare Python/NumPy literals mixed into the text (``None``, ``True``,
    ``False``, ``nan``, ``inf``, ``-inf``; case-insensitive) are translated to
    their JSON counterparts before decoding. Text inside double-quoted strings
    is left untouched.

    Parameters
    ----------
    item : object
        The raw cell value. Non-string values (an already parsed list, a real
        NaN, ...) are returned untouched.

    Returns
    -------
    object
        - ``item`` itself when it is not a string;
        - ``[]`` when the string is empty or whitespace only;
        - the decoded JSON value otherwise;
        - ``np.nan`` when the text cannot be decoded as JSON.
    """
    # 1. Anything that is not a string is passed through as-is.
    if not isinstance(item, str):
        return item

    # 2. Empty / whitespace-only text means "no files".
    item_limpo = item.strip()
    if item_limpo == "":
        return []

    # 3. Python -> JSON literal translation, done in a single pass.
    #    - Complete double-quoted strings are matched first and returned
    #      unchanged, so words such as "None" inside a filename or title are
    #      not rewritten.
    #    - "-inf" is matched together with its sign. Replacing "inf" first
    #      would leave "-null", which is not valid JSON.
    substitutos = {
        'none': 'null',
        'true': 'true',
        'false': 'false',
        'nan': 'null',
        'inf': 'null',
        '-inf': 'null',
    }
    regex_tokens = r'"(?:[^"\\]|\\.)*"|-?\binf\b|\b(?:None|True|False|nan)\b'

    def _trocar(match):
        token = match.group(0)
        if token.startswith('"'):
            return token
        return substitutos[token.lower()]

    item_corrigido = re.sub(regex_tokens, _trocar, item_limpo, flags=re.IGNORECASE)

    # 4. Decode; undecodable text is reported as NaN instead of raising.
    try:
        return json.loads(item_corrigido)
    except json.JSONDecodeError:
        return np.nan

In [None]:
# Decode the raw 'modified_files' text of every PR into a Python value.
prs['modified_files_list'] = prs['modified_files'].apply(converter_para_lista_json_robusta)

# One row per (PR, file entry): each element of the decoded list gets its own row.
df_exploded = prs.explode('modified_files_list')

# Spread each per-file dict into columns and give the result the exploded
# frame's index so the two frames line up for the column-wise concat below.
normalized_cols = pd.json_normalize(df_exploded['modified_files_list'])
normalized_cols.index = df_exploded.index

# PR columns (without the raw text column) side by side with the per-file columns.
prs_and_changes = pd.concat(
    [df_exploded.drop(columns=['modified_files']), normalized_cols],
    axis=1,
)
prs_and_changes

In [None]:
# --- 1. REGEX TO IDENTIFY TEST FILES ---
# Only non-capturing groups are used: Series.str.contains warns when the
# pattern has capturing groups, and no group content is needed here.
# NOTE(review): the patterns are applied with case=False below, so the
# CamelCase rules ("NameTest.ext", "Test.py", ...) also match lower-case names
# ending in "test", such as "latest.js". Confirm whether that is intended.
filename_regex = (
    r'(?:'
    r'(?:^tests?[/\\])'                    # starts with tests/ or test/
    r'|(?:[/\\]tests?[/\\])'               # contains /tests/ or /test/
    r'|(?:__tests__[/\\])'                 # __tests__/
    r'|(?:\.spec\b)'                       # .spec (ex: index.spec.tsx)
    r'|(?:_test\.)'                        # _test. (ex: foo_test.py)
    r'|(?:\.test\.)'                       # .test. (ex: utils.test.js)
    r'|(?:src[/\\]test[/\\])'              # src/test/ (Java/Kotlin)
    r'|(?:[/\\](?:unit[-_]?tests?|integration[-_]?tests?)[/\\])' # /unit-tests/
    r'|(?:_spec\.)'                        # _spec. (Ruby, etc.)
    r'|(?:[A-Za-z0-9_]+(?:Test|Tests|Spec)\.[a-z0-9]+$)' # NameTest.ext
    r'|(?:Test\.(?:php|java|cs|kt|ts|tsx|py|go|rb))'  # Test.php, Test.java, ...
    r')'
)

# --- 2. REGEX TO IGNORE CONFIG/LOCK/DOC FILES ---
config_regex = (
    r'(?:'
    # 1. Lock and dependency files (exact names at end of string)
    r'(?:package-lock\.json$|yarn\.lock$|composer\.lock$|requirements\.txt$)'

    # 2. Common extensions for config, data, or documentation.
    #    The dot is escaped: an unescaped '.' matches any character, which
    #    would flag names like "catalog" (ends in "log") as config files.
    r'|(?:\.(?:json|ya?ml|xml|ini|toml|conf(?:ig)?|lock|log|md|txt)$)'

    # 3. Dotfiles (last path component starts with '.', ex: .gitignore),
    #    either at the repository root or inside a directory.
    r'|(?:(?:^|[/\\])\.[^/\\]+$)'
    r')'
)


# --- 3. APPLYING THE LOGIC ---

# Step 1: Mark everything that looks like a test (missing filenames -> False)
is_test_file = prs_and_changes['filename'].str.contains(
    filename_regex,
    case=False,
    na=False
)

# Step 2: Mark everything that is a config/documentation file
is_config_file = prs_and_changes['filename'].str.contains(
    config_regex,
    case=False,
    na=False
)

# Step 3: Store both flags; they are combined into the final rule later.
prs_and_changes['is_filename_a_test_file'] = is_test_file
prs_and_changes['is_filename_a_config_file'] = is_config_file

In [None]:
content_pattern = re.compile(
    r'(def\s+test_|pytest\b|unittest\b|assert\b|describe\s*\(|\bit\s*\(|\btest\(|\bexpect\(|@Test\b|func\s+Test\b)',
    re.IGNORECASE
)
def is_test_by_content(patch):
    # 1. Check for non-strings:
    #    - pd.isna() catches None and np.nan
    #    - not isinstance(patch, str) catches numbers, lists, etc.
    if pd.isna(patch) or not isinstance(patch, str):
        return False
    
    # 2. Check for empty strings (what 'not patch' did)
    if not patch:
        return False
        
    # 3. If it is a valid string, perform the search
    return bool(content_pattern.search(patch))

# Flag each file row by its patch content. No fillna('') is needed on the
# column: is_test_by_content is called on the raw values, missing ones included.
prs_and_changes['is_patch_a_test_file'] = prs_and_changes['patch'].map(is_test_by_content)

In [None]:
# Final rule: a row is a test file when the filename OR the patch heuristic
# fires, and the filename is not flagged as a config/doc file.
prs_and_changes['is_test_file'] = (
    (prs_and_changes['is_filename_a_test_file'] | prs_and_changes['is_patch_a_test_file'])
    & ~prs_and_changes['is_filename_a_config_file']
)

# Diagnostics: how many rows each heuristic flags.
# Note: the "by name" and "by patch" figures are plain sums of the raw flags;
# only "Combined" has the config exclusion applied.
print('Total Files:',len(prs_and_changes))
print("Config file identified by name:", prs_and_changes['is_filename_a_config_file'].sum())
print("Count of test files (excluding configs):")
print("Test identified by name:", prs_and_changes['is_filename_a_test_file'].sum())

print("Test identified by patch:", prs_and_changes['is_patch_a_test_file'].sum(),'<- Generating False Positives, should we keep this?')

print("Combined:", prs_and_changes['is_test_file'].sum())

In [None]:
# Preview: per (PR, file status) group of test-file rows, the number of
# non-null 'user' values in that group.
(
    prs_and_changes[prs_and_changes['is_test_file']]
    .groupby(['id','number','repo_url','status'])
    .count()
    .reset_index()
    [['id','number','repo_url','status','user']]
)

In [None]:
# Number of test-file rows per (PR, file status).
# .size() counts the rows of each group directly; counting the non-null values
# of an unrelated column ('user') under-counts whenever that column is missing.
number_of_tests = (
    prs_and_changes[prs_and_changes['is_test_file']]
    .groupby(['id', 'number', 'repo_url', 'status'])
    .size()
    .reset_index(name='quantity')
)

# Number of entries in each PR's parsed file list (NaN where parsing failed).
prs['#_of_files'] = prs['modified_files_list'].str.len()

# 1. One row per PR, one column per file status, cells = number of test files.
pivoted_number_of_tests = number_of_tests.pivot_table(
    index=['id', 'number', 'repo_url'],  # Columns identifying the row
    columns='status',                    # Column whose values become new columns
    values='quantity',                   # Column whose values fill the cells
    aggfunc='sum',                       # What to do if duplicates exist (sum)
    fill_value=0                         # Replace NaNs with 0
)

# 2. Add the suffix '_tests' to the new column names (added -> added_tests, ...)
pivoted_number_of_tests = pivoted_number_of_tests.add_suffix('_tests')

# 3. Cleanup: drop the 'status' columns-axis name and bring the keys back as columns
df_final = pivoted_number_of_tests.rename_axis(columns=None).reset_index()

# 4. Guarantee the four count columns used below exist even when a status never
#    occurs among the detected test files (otherwise selecting them raises KeyError).
colunas_de_contagem = ['added_tests', 'modified_tests', 'removed_tests', 'renamed_tests']
df_final = df_final.assign(
    **{coluna: 0 for coluna in colunas_de_contagem if coluna not in df_final.columns}
)

# 5. Attach the counts to every PR; PRs without detected test files get NaN here...
merged_df = pd.merge(prs, df_final, on=['id', 'number', 'repo_url'], how='left')

# 6. ...which is turned into an integer 0 for the count columns only.
merged_df[colunas_de_contagem] = merged_df[colunas_de_contagem].fillna(0).astype(int)

prs_with_files = merged_df[merged_df['#_of_files'] > 0]
prs_without_files = merged_df[merged_df['#_of_files'] == 0]

# 'renamed_tests' is deliberately not part of this list in the original rule.
colunas_de_teste = ['added_tests', 'modified_tests', 'removed_tests']

# A PR "has a modified test" when it added, modified or removed at least one test file.
soma_testes = merged_df[colunas_de_teste].sum(axis=1)
merged_df['has_modified_test'] = (soma_testes > 0)

# Path built with os.path.join so the output lands inside output_files/ on any OS.
merged_df.to_parquet(os.path.join('output_files', 'fix_prs_with_issues_and_files_and_tests.parquet'))
merged_df.sort_values('#_of_files')

In [None]:
# Ids of the PRs that have at least one row flagged as a test file.
prs_with_test_list = (
    prs_and_changes.loc[prs_and_changes['is_test_file'], 'id']
    .unique()
    .tolist()
)

# Split the PRs by whether their id appears in that list.
prs_with_identified_test = prs[prs['id'].isin(prs_with_test_list)]
prs_without_identified_test = prs[~prs['id'].isin(prs_with_test_list)]

# Summary of the resulting subset sizes.
print('Total PRs & fix & issues:',len(prs))
print('Total PRs with files:',len(prs_with_files))
print("Total PRs without files:", len(prs_without_files))
print('Total PRs of fix & issues & tests:',len(prs_with_identified_test))
print("Total PRs of fix & issues & ~tests:", len(prs_without_identified_test))

In [None]:
# Decode the raw 'modified_files' text again, this time on the merged frame.
merged_df['modified_files_list'] = merged_df['modified_files'].map(converter_para_lista_json_robusta)

def explode_df(merged_df):
    """
    Expand a per-PR frame into one row per entry of 'modified_files_list'.

    Each entry of the list column becomes its own row, the entry is spread
    into columns with pd.json_normalize, and those columns are appended to the
    exploded frame. The 'modified_files' column is dropped from the result;
    the input frame is not modified.

    Parameters
    ----------
    merged_df : pandas.DataFrame
        Must contain the columns 'modified_files' and 'modified_files_list'.

    Returns
    -------
    pandas.DataFrame
        The exploded frame (original index labels repeated per entry) with the
        normalized per-entry columns added.
    """
    per_file = merged_df.explode('modified_files_list')

    # json_normalize output is re-labelled with the exploded index so that the
    # column-wise concat pairs the rows of both frames.
    file_details = pd.json_normalize(per_file['modified_files_list'])
    file_details.index = per_file.index

    without_raw_text = per_file.drop(columns=['modified_files'])
    return pd.concat([without_raw_text, file_details], axis=1)
    
# Expand every PR of merged_df into one row per file entry; this frame feeds
# the extension counts computed in the following cells.
to_be_extensions_df = explode_df(merged_df)
to_be_extensions_df

In [None]:
def extrair_extensao(caminho):
    """
    Return the extension of a file path, including the leading dot.

    Parameters
    ----------
    caminho : object
        The file path. Non-string values (NaN, None, ...) are accepted.

    Returns
    -------
    str or None
        The extension as given by os.path.splitext (e.g. '.py', or '' when the
        path has none); None when ``caminho`` is not a string.
    """
    if not isinstance(caminho, str):
        return None

    _, extensao = os.path.splitext(caminho)
    return extensao

def extensions_count(df):
    """
    Count how many rows of ``df`` fall under each file extension.

    Side effect: adds (or overwrites) the column 'extensao' on ``df`` itself,
    holding the extension of each row's 'filename'.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a 'filename' column.

    Returns
    -------
    pandas.DataFrame
        Columns 'extensao' and 'count'. Missing extensions (no filename) are
        labelled 'No file' and empty extensions 'No extension'.
    """
    df['extensao'] = df['filename'].apply(extrair_extensao)

    # dropna=False keeps the rows without a filename as their own bucket.
    contagem = df['extensao'].value_counts(dropna=False).reset_index()
    contagem.columns = ['extensao', 'count']

    contagem['extensao'] = (
        contagem['extensao']
        .fillna('No file')
        .replace('', 'No extension')
    )
    return contagem
# Extension counts over all file rows, saved as CSV and displayed.
# The path is built with os.path.join so the file lands inside output_files/
# on any OS (a backslash separator only works on Windows).
extensions_count_fix = extensions_count(to_be_extensions_df)
extensions_count_fix.to_csv(os.path.join('output_files', 'files_extesions_count.csv'), index=False)
extensions_count_fix

In [None]:
# Show the two special buckets, then the total number of counted rows.
display(extensions_count_fix.loc[extensions_count_fix['extensao'].eq('No extension')])
display(extensions_count_fix.loc[extensions_count_fix['extensao'].eq('No file')])
extensions_count_fix['count'].sum()

In [None]:
# Number of distinct filenames per extension.
# Relies on the 'extensao' column that extensions_count() added to
# to_be_extensions_df.
unique_files = (
    to_be_extensions_df[['filename', 'extensao']]
    .drop_duplicates()
    .groupby('extensao')
    .count()
    .reset_index()
)
unique_files['extensao'] = unique_files['extensao'].fillna('No file')
unique_files['extensao'] = unique_files['extensao'].replace('', 'No extension')

# Save sorted by count. The path is built with os.path.join so it resolves on
# any OS; the former stand-alone sort_values() call was dropped because its
# result was neither stored nor displayed.
unique_files.sort_values('filename').to_csv(
    os.path.join('output_files', 'unique_files_extesions_count.csv'),
    index=False,
)

In [None]:
# Same extension count, restricted to the PRs flagged with has_modified_test.
exploded_test_df = explode_df(merged_df[merged_df['has_modified_test']])
extensions_count_test = extensions_count(exploded_test_df)

# Path built with os.path.join so the file lands inside output_files/ on any OS.
extensions_count_test.to_csv(
    os.path.join('output_files', '(test)_files_extesions_count.csv'),
    index=False,
)
display(extensions_count_test)
display(extensions_count_test[extensions_count_test['extensao']== 'No extension'])
extensions_count_test['count'].sum()

In [None]:
# Number of distinct filenames per extension among the PRs flagged with
# has_modified_test. Relies on the 'extensao' column that extensions_count()
# added to exploded_test_df.
test_unique_files = (
    exploded_test_df[['filename', 'extensao']]
    .drop_duplicates()
    .groupby('extensao')
    .count()
    .reset_index()
)
test_unique_files['extensao'] = test_unique_files['extensao'].fillna('No file')
test_unique_files['extensao'] = test_unique_files['extensao'].replace('', 'No extension')

# Save sorted by count; path built with os.path.join so it resolves on any OS.
test_unique_files.sort_values('filename').to_csv(
    os.path.join('output_files', '(test)_unique_files_extesions_count.csv'),
    index=False,
)