In [None]:
import os
import zipfile
import numpy as np
import pandas as pd
from sklearn.metrics import precision_score, recall_score, accuracy_score, f1_score, matthews_corrcoef
from sklearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import StratifiedKFold
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import numpy as np
from itertools import product
import os

# Function to extract zip files
def extract_zip(zip_file, extract_to):
    """Unpack every member of a zip archive into a target directory.

    Args:
        zip_file: Path of the archive to read (anything ``zipfile.ZipFile``
            accepts as its first argument).
        extract_to: Directory that receives the archive contents.
    """
    archive = zipfile.ZipFile(zip_file, mode='r')
    try:
        archive.extractall(path=extract_to)
    finally:
        # Always release the file handle, even if extraction fails part-way.
        archive.close()

# Function to read .py files from the given directory
def getDataPoints(path):
    """Collect the stripped text of every non-empty ``.py`` file under ``path``.

    The directory tree is traversed in sorted order so that the returned list
    is identical on every machine and run. Downstream code builds labels and
    seeded cross-validation folds from list positions, so a stable order is
    required for reproducible results.

    Args:
        path: Root directory to search recursively.

    Returns:
        list[str]: One entry per non-empty ``.py`` file (leading/trailing
        whitespace stripped). Empty if ``path`` does not exist; a message is
        printed in that case and for every empty file that is skipped.
    """
    dataPointsList = []
    if not os.path.exists(path):
        print(f"Directory does not exist: {path}")
        return dataPointsList

    for root, dirs, files in os.walk(path):
        # os.walk yields entries in arbitrary, filesystem-dependent order.
        # Sorting `dirs` in place fixes the order in which sub-directories are
        # visited; sorting `files` fixes the order within each directory.
        dirs.sort()
        for dataPointName in sorted(files):
            if not dataPointName.endswith(".py"):
                continue
            file_path = os.path.join(root, dataPointName)
            with open(file_path, encoding="utf-8") as fileIn:
                dp = fileIn.read().strip()
            if dp:
                dataPointsList.append(dp)
            else:
                print(f"Empty file skipped: {file_path}")
    return dataPointsList
# Custom MCC scorer function
def mcc_scorer(estimator, X, y_true):
    """Return the Matthews correlation coefficient of ``estimator`` on ``X``.

    Args:
        estimator: Fitted model exposing ``predict``.
        X: Samples to predict on.
        y_true: Ground-truth labels aligned with ``X``.

    Returns:
        The value of ``matthews_corrcoef(y_true, estimator.predict(X))``.
    """
    return matthews_corrcoef(y_true, estimator.predict(X))
# Archive locations -- edit these to point at your own zip files
flakyZip = "/content/flaky_files.zip"
nonFlakyZip = "/content/all_nonflaky_files.zip"

# Extraction layout: one sub-folder per class under a common root
extractDir = "extracted_files"
flakyDir = os.path.join(extractDir, 'flaky')
nonFlakyDir = os.path.join(extractDir, 'nonFlaky')

# Make sure both class folders are present before unpacking into them
for _classDir in (flakyDir, nonFlakyDir):
    os.makedirs(_classDir, exist_ok=True)

# Unpack each archive into the folder of its class
extract_zip(zip_file=flakyZip, extract_to=flakyDir)
extract_zip(zip_file=nonFlakyZip, extract_to=nonFlakyDir)

# Read the Python sources of each class
dataPointsFlaky = getDataPoints(flakyDir)
dataPointsNonFlaky = getDataPoints(nonFlakyDir)

# Single sample list: all flaky files first, then all non-flaky files
dataPoints = [*dataPointsFlaky, *dataPointsNonFlaky]

# Matching label vector: 1 marks a flaky file, 0 a non-flaky file
dataLabelsList = np.array(
    [1 for _ in dataPointsFlaky] + [0 for _ in dataPointsNonFlaky]
)

# Report the class sizes
print(
    f"Number of flaky files: {len(dataPointsFlaky)}",
    f"Number of non-flaky files: {len(dataPointsNonFlaky)}",
    f"Total number of data points: {len(dataPoints)}",
    sep="\n",
)


Number of flaky files: 45
Number of non-flaky files: 243
Total number of data points: 288


Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier



# Summarize and find best parameter combinations
def summarize_and_find_best(df_results, group_cols=None):
    """Average the per-fold metrics of each configuration and pick the best by F1.

    The grouping columns are derived from the frame itself (every column that
    is neither a metric nor ``'fold'``), so the same function works for any
    model's result table and cannot be broken by another cell redefining it
    with a different hard-coded column list.

    Args:
        df_results: One row per (configuration, fold, threshold) holding the
            columns ``accuracy``, ``precision``, ``recall``, ``f1`` and ``mcc``.
        group_cols: Optional explicit list of columns identifying a
            configuration. Defaults to all non-metric, non-``fold`` columns in
            their original order.

    Returns:
        tuple: ``(summary_df, best_row)`` where ``summary_df`` has one row per
        configuration with the mean of each metric, and ``best_row`` is the
        ``summary_df`` row with the highest mean F1.

    Raises:
        ValueError: If there is no column left to group by.
    """
    metric_cols = ['accuracy', 'precision', 'recall', 'f1', 'mcc']
    if group_cols is None:
        excluded = set(metric_cols) | {'fold'}
        group_cols = [col for col in df_results.columns if col not in excluded]
    else:
        group_cols = list(group_cols)
    if not group_cols:
        raise ValueError("No configuration columns available to group by.")

    # dropna=False: a hyper-parameter value of None is stored as a missing
    # value, and groupby would otherwise silently discard those configurations.
    summary_df = (
        df_results
        .groupby(group_cols, dropna=False)[metric_cols]
        .mean()
        .reset_index()
    )

    # Finding the best parameter set based on the highest mean F1 score
    best_row = summary_df.loc[summary_df['f1'].idxmax()]

    return summary_df, best_row

# Combined SMOTE and Threshold-based Random Forest 
def runRFWithSMOTEAndThreshold(dataPoints, y, outDir, n_splits, param_grid, thresholds):
    """Grid-search a CountVectorizer -> SMOTE -> Random Forest pipeline over decision thresholds.

    For every combination of values in ``param_grid`` the pipeline is fitted on
    each stratified training fold, the positive-class probability is predicted
    for the held-out fold, and accuracy / precision / recall / F1 / MCC are
    computed at every cut-off in ``thresholds``.

    Args:
        dataPoints: Sequence of raw text documents, indexable by position.
        y: Binary labels aligned with ``dataPoints`` (any array-like).
        outDir: Directory that receives the CSV reports; created if missing.
        n_splits: Number of stratified cross-validation folds.
        param_grid: Mapping from pipeline parameter name (``'rf__<param>'``)
            to the list of values to try.
        thresholds: Iterable of probability cut-offs; a sample is predicted
            positive when its probability is ``>=`` the cut-off.

    Returns:
        tuple: ``(df_results, best_params)`` -- the per-(combination, fold,
        threshold) metrics, and the summary row with the highest mean F1 as
        returned by ``summarize_and_find_best``.

    Side effects:
        Writes ``rf-smote-threshold-results.csv`` and
        ``rf-smote-threshold-summary.csv`` into ``outDir`` and prints the
        summary path.

    NOTE(review): the winning configuration and threshold are selected on the
    same folds whose scores are reported, so the "best" score is an optimistic
    estimate of performance on unseen data.
    """
    # Index arrays are used on y below, so a plain list must be converted.
    y = np.asarray(y)

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    # The splitter is seeded, so every combination gets the same folds;
    # materialise them once instead of regenerating them per combination.
    folds = list(skf.split(dataPoints, y))

    param_names = list(param_grid.keys())
    metrics_per_combination = []

    for params in product(*param_grid.values()):
        param_dict = dict(zip(param_names, params))

        # Define pipeline with Vectorizer, SMOTE, and Random Forest
        pipeline = ImbPipeline([
            ('vectorizer', CountVectorizer(stop_words=None)),  # text -> token counts
            ('smote', SMOTE(random_state=42)),                 # oversamples the training fold only
            ('rf', RandomForestClassifier(random_state=42, n_jobs=-1))
        ])
        # Grid keys are '<step>__<param>' names, so they can be applied directly.
        pipeline.set_params(**param_dict)

        for fold, (train_index, test_index) in enumerate(folds, start=1):
            X_train = [dataPoints[i] for i in train_index]
            X_test = [dataPoints[i] for i in test_index]
            y_train, y_test = y[train_index], y[test_index]

            # Train the pipeline
            pipeline.fit(X_train, y_train)

            # Probability of the positive class (column 1)
            y_pred_proba = pipeline.predict_proba(X_test)[:, 1]

            for threshold in thresholds:
                y_pred_threshold = (y_pred_proba >= threshold).astype(int)

                # zero_division=0: when a cut-off yields no positive
                # predictions the undefined precision counts as 0, not 1, so
                # degenerate thresholds cannot inflate the averaged scores.
                metrics_per_combination.append({
                    **param_dict,
                    'fold': fold,
                    'threshold': threshold,
                    'accuracy': accuracy_score(y_test, y_pred_threshold),
                    'precision': precision_score(y_test, y_pred_threshold, zero_division=0),
                    'recall': recall_score(y_test, y_pred_threshold, zero_division=0),
                    'f1': f1_score(y_test, y_pred_threshold, zero_division=0),
                    'mcc': matthews_corrcoef(y_test, y_pred_threshold)
                })

    # Convert metrics to DataFrame and save results
    df_results = pd.DataFrame(metrics_per_combination)
    os.makedirs(outDir, exist_ok=True)
    outFile = os.path.join(outDir, "rf-smote-threshold-results.csv")
    df_results.to_csv(outFile, index=False)

    # Summarize results and return the best parameter set
    summary_df, best_params = summarize_and_find_best(df_results)

    # Save summarized results to a new CSV file
    summary_outFile = os.path.join(outDir, "rf-smote-threshold-summary.csv")
    summary_df.to_csv(summary_outFile, index=False)
    print(f"Summary of results saved to {summary_outFile}")

    return df_results, best_params

# Random Forest search space; the keys are looked up by name inside
# runRFWithSMOTEAndThreshold, so they must keep the 'rf__' prefix.
param_grid = dict(
    rf__n_estimators=[50, 100, 200],       # number of trees in the forest
    rf__max_depth=[10, 20, 30],            # maximum depth of each tree
    rf__min_samples_split=[5, 10],         # minimum samples required to split a node
    rf__min_samples_leaf=[2, 5],           # minimum samples required at a leaf
    rf__criterion=['gini', 'entropy'],     # split-quality measure
)

# Nine evenly spaced probability cut-offs covering 0.1 .. 0.9
thresholds = np.linspace(0.1, 0.9, num=9)

outDir = "results/"
df_results, best_params = runRFWithSMOTEAndThreshold(
    dataPoints,
    dataLabelsList,
    outDir,
    n_splits=5,
    param_grid=param_grid,
    thresholds=thresholds,
)

print(
    "\nRandom Forest with SMOTE and Threshold analysis completed.",
    "Best parameter set based on F1-score:",
    best_params,
    sep="\n",
)


Summary of results saved to results/rf-smote-threshold-summary.csv

Random Forest with SMOTE and Threshold analysis completed.
Best parameter set based on F1-score:
rf__n_estimators              200
rf__max_depth                  20
rf__min_samples_split           5
rf__min_samples_leaf            2
rf__criterion             entropy
threshold                     0.5
accuracy                 0.947973
precision                0.863636
recall                        0.8
f1                       0.822353
mcc                      0.798379
Name: 508, dtype: object


Decision Tree

In [None]:
from sklearn.tree import DecisionTreeClassifier

# Summarize and find best parameter combinations
def summarize_and_find_best(df_results, group_cols=None):
    """Average the per-fold metrics of each configuration and pick the best by F1.

    The grouping columns are derived from the frame itself (every column that
    is neither a metric nor ``'fold'``), so the same function works for any
    model's result table regardless of which cell defined it last.

    Configurations whose hyper-parameter value is ``None`` (for example
    ``dt__max_features=None``) are stored as missing values in the frame.
    They are kept as their own group here and show up as a missing value
    (NaN/None) in the summary.

    Args:
        df_results: One row per (configuration, fold, threshold) holding the
            columns ``accuracy``, ``precision``, ``recall``, ``f1`` and ``mcc``.
        group_cols: Optional explicit list of columns identifying a
            configuration. Defaults to all non-metric, non-``fold`` columns in
            their original order.

    Returns:
        tuple: ``(summary_df, best_row)`` where ``summary_df`` has one row per
        configuration with the mean of each metric, and ``best_row`` is the
        ``summary_df`` row with the highest mean F1.

    Raises:
        ValueError: If there is no column left to group by.
    """
    metric_cols = ['accuracy', 'precision', 'recall', 'f1', 'mcc']
    if group_cols is None:
        excluded = set(metric_cols) | {'fold'}
        group_cols = [col for col in df_results.columns if col not in excluded]
    else:
        group_cols = list(group_cols)
    if not group_cols:
        raise ValueError("No configuration columns available to group by.")

    # dropna=False is essential: with the default, groupby discards every row
    # whose key is missing, which removed all max_features=None configurations
    # from the summary and from the best-F1 search.
    summary_df = (
        df_results
        .groupby(group_cols, dropna=False)[metric_cols]
        .mean()
        .reset_index()
    )

    # Finding the best parameter set based on the highest mean F1 score
    best_row = summary_df.loc[summary_df['f1'].idxmax()]

    return summary_df, best_row

# Combined SMOTE and Threshold-based Decision Tree 
def runDTWithSMOTEAndThreshold(dataPoints, y, outDir, n_splits, param_grid, thresholds):
    """Grid-search a CountVectorizer -> SMOTE -> Decision Tree pipeline over decision thresholds.

    For every combination of values in ``param_grid`` the pipeline is fitted on
    each stratified training fold, the positive-class probability is predicted
    for the held-out fold, and accuracy / precision / recall / F1 / MCC are
    computed at every cut-off in ``thresholds``.

    Args:
        dataPoints: Sequence of raw text documents, indexable by position.
        y: Binary labels aligned with ``dataPoints`` (any array-like).
        outDir: Directory that receives the CSV reports; created if missing.
        n_splits: Number of stratified cross-validation folds.
        param_grid: Mapping from pipeline parameter name (``'dt__<param>'``)
            to the list of values to try.
        thresholds: Iterable of probability cut-offs; a sample is predicted
            positive when its probability is ``>=`` the cut-off.

    Returns:
        tuple: ``(df_results, best_params)`` -- the per-(combination, fold,
        threshold) metrics, and the summary row with the highest mean F1 as
        returned by ``summarize_and_find_best``.

    Side effects:
        Writes ``dt-smote-threshold-results.csv`` and
        ``dt-smote-threshold-summary.csv`` into ``outDir`` and prints the
        summary path.

    NOTE(review): the winning configuration and threshold are selected on the
    same folds whose scores are reported, so the "best" score is an optimistic
    estimate of performance on unseen data.
    """
    # Index arrays are used on y below, so a plain list must be converted.
    y = np.asarray(y)

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    # The splitter is seeded, so every combination gets the same folds;
    # materialise them once instead of regenerating them per combination.
    folds = list(skf.split(dataPoints, y))

    param_names = list(param_grid.keys())
    metrics_per_combination = []

    for params in product(*param_grid.values()):
        param_dict = dict(zip(param_names, params))

        # Define pipeline with Vectorizer, SMOTE, and Decision Tree
        pipeline = ImbPipeline([
            ('vectorizer', CountVectorizer(stop_words=None)),  # text -> token counts
            ('smote', SMOTE(random_state=42)),                 # oversamples the training fold only
            ('dt', DecisionTreeClassifier(random_state=42))
        ])
        # Grid keys are '<step>__<param>' names, so they can be applied directly.
        pipeline.set_params(**param_dict)

        for fold, (train_index, test_index) in enumerate(folds, start=1):
            X_train = [dataPoints[i] for i in train_index]
            X_test = [dataPoints[i] for i in test_index]
            y_train, y_test = y[train_index], y[test_index]

            # Train the pipeline
            pipeline.fit(X_train, y_train)

            # Probability of the positive class (column 1)
            y_pred_proba = pipeline.predict_proba(X_test)[:, 1]

            for threshold in thresholds:
                y_pred_threshold = (y_pred_proba >= threshold).astype(int)

                # zero_division=0: when a cut-off yields no positive
                # predictions the undefined precision counts as 0, not 1, so
                # degenerate thresholds cannot inflate the averaged scores.
                metrics_per_combination.append({
                    **param_dict,
                    'fold': fold,
                    'threshold': threshold,
                    'accuracy': accuracy_score(y_test, y_pred_threshold),
                    'precision': precision_score(y_test, y_pred_threshold, zero_division=0),
                    'recall': recall_score(y_test, y_pred_threshold, zero_division=0),
                    'f1': f1_score(y_test, y_pred_threshold, zero_division=0),
                    'mcc': matthews_corrcoef(y_test, y_pred_threshold)
                })

    # Convert metrics to DataFrame and save results
    df_results = pd.DataFrame(metrics_per_combination)
    os.makedirs(outDir, exist_ok=True)
    outFile = os.path.join(outDir, "dt-smote-threshold-results.csv")
    df_results.to_csv(outFile, index=False)

    # Summarize results and return the best parameter set
    summary_df, best_params = summarize_and_find_best(df_results)

    # Save summarized results to a new CSV file
    summary_outFile = os.path.join(outDir, "dt-smote-threshold-summary.csv")
    summary_df.to_csv(summary_outFile, index=False)
    print(f"Summary of results saved to {summary_outFile}")

    return df_results, best_params

# Decision Tree search space; the keys are looked up by name inside
# runDTWithSMOTEAndThreshold, so they must keep the 'dt__' prefix.
param_grid = dict(
    dt__max_depth=[10, 20, 30],                 # maximum depth of the decision tree
    dt__min_samples_split=[5, 10],              # minimum samples required to split a node
    dt__min_samples_leaf=[2, 5],                # minimum samples required at a leaf
    dt__criterion=['gini', 'entropy'],          # split-quality measure
    dt__max_features=[None, 'sqrt', 'log2'],    # how many features each split may consider
)

# Nine evenly spaced probability cut-offs covering 0.1 .. 0.9
thresholds = np.linspace(0.1, 0.9, num=9)

outDir = "results/"
df_results, best_params = runDTWithSMOTEAndThreshold(
    dataPoints,
    dataLabelsList,
    outDir,
    n_splits=5,
    param_grid=param_grid,
    thresholds=thresholds,
)

print(
    "\nDecision Tree with SMOTE and Threshold analysis completed.",
    "Best parameter set based on F1-score:",
    best_params,
    sep="\n",
)


Summary of results saved to results/dt-smote-threshold-summary.csv

Decision Tree with SMOTE and Threshold analysis completed.
Best parameter set based on F1-score:
dt__max_depth                  10
dt__min_samples_split           5
dt__min_samples_leaf            2
dt__criterion                gini
dt__max_features             sqrt
threshold                     0.4
accuracy                 0.902722
precision                0.664747
recall                   0.755556
f1                       0.705146
mcc                      0.650699
Name: 30, dtype: object
