In [None]:
import numpy as np
import pandas as pd
import random
from collections import defaultdict

In [None]:
# Load the dataset only to inspect which columns it offers; the resulting
# Index is the value of the cell and is what the notebook displays below.
pd.read_csv("Commons_codec.csv").columns

Index(['test_class_name', 'cycle_id', 'tests', 'failures_0', 'failures_1',
       'failures_2', 'failures_3', 'current_failures', 'failures_%', 'time_0',
       'time', 'time_since', 'AvgCyclomatic', 'AvgCyclomaticModified',
       'AvgCyclomaticStrict', 'AvgEssential', 'AvgLine', 'AvgLineBlank',
       'AvgLineCode', 'AvgLineComment', 'CountDeclClass',
       'CountDeclClassMethod', 'CountDeclClassVariable',
       'CountDeclExecutableUnit', 'CountDeclFunction',
       'CountDeclInstanceMethod', 'CountDeclInstanceVariable',
       'CountDeclMethod', 'CountDeclMethodDefault', 'CountDeclMethodPrivate',
       'CountDeclMethodProtected', 'CountDeclMethodPublic', 'CountLine',
       'CountLineBlank', 'CountLineCode', 'CountLineCodeDecl',
       'CountLineCodeExe', 'CountLineComment', 'CountSemicolon', 'CountStmt',
       'CountStmtDecl', 'CountStmtExe', 'MaxCyclomatic',
       'MaxCyclomaticModified', 'MaxCyclomaticStrict', 'MaxEssential',
       'MaxNesting', 'RatioCommentToCode', 'SumCy

In [None]:


class PairwiseEnv:
    """
    A reinforcement learning environment for prioritizing test cases using a pairwise comparison approach.

    During an episode every pair (idx0, idx1) with idx0 < idx1 is visited once:
    idx1 sweeps from idx0 + 1 to n - 1, then idx0 advances. For each pair the
    agent decides whether the two test cases should be swapped.

    Attributes:
        data (DataFrame): The test case dataset. It is never modified here.
        test_cases (ndarray): Private NumPy copy of the dataset rows, kept in
            the current priority order (row 0 = highest priority).
        order (ndarray): Permutation kept in sync with ``test_cases``;
            ``order[k]`` is the row number in ``data`` of the test case that
            currently sits at rank ``k``.
        n (int): Total number of test cases.
        idx0 (int): Index of the first test case in a comparison.
        idx1 (int): Index of the second test case in a comparison.
        done (bool): Indicates whether all test cases have been compared.
    """
    def __init__(self, data):
        self.data = data
        # Copy explicitly: to_numpy() may hand back a view of the frame's
        # storage, and reset()/step() reorder this array in place.
        self.test_cases = self.data.to_numpy(copy=True)
        self.n = len(self.test_cases)
        # Identity permutation: initially rank k holds original row k.
        self.order = np.arange(self.n)
        self.idx0, self.idx1 = 0, 1
        self.done = False

    def reset(self):
        """Resets the environment by shuffling test cases and restarting the comparison process."""
        # Apply one random permutation to both arrays so that `order` keeps
        # describing where every original row ended up. Assigning through
        # [:] keeps the same array objects alive for anyone holding them.
        perm = np.random.permutation(self.n)
        self.test_cases[:] = self.test_cases[perm]
        self.order[:] = self.order[perm]
        self.idx0, self.idx1 = 0, 1
        self.done = False
        return self._get_obs()

    def _get_obs(self):
        """Returns the current pair of test cases being compared."""
        return self.test_cases[self.idx0], self.test_cases[self.idx1]

    def step(self, action):
        """
        Executes the selected action and moves to the next test case pair.

        Parameters:
            action (int): 0 to keep order, 1 to swap test cases.

        Returns:
            tuple: (next_state, reward, done) where:
                - next_state is the next test case pair (the last pair again
                  once the episode is finished).
                - reward is the score assigned based on the action.
                - done is True if all test cases have been processed.
        """
        # The reward is evaluated on the pair as it stands before any swap.
        reward = self._calc_reward(action)

        if action == 1:
            pair = [self.idx0, self.idx1]
            swapped = [self.idx1, self.idx0]
            self.test_cases[pair] = self.test_cases[swapped]
            self.order[pair] = self.order[swapped]

        if self.idx1 < self.n - 1:
            self.idx1 += 1
        elif self.idx0 < self.n - 2:
            self.idx0 += 1
            self.idx1 = self.idx0 + 1
        else:
            self.done = True

        return self._get_obs(), reward, self.done

    def _calc_reward(self, action):
        """
        Calculates the reward for an action on the current (not yet swapped) pair.

        The test case that ends up earlier in the ordering is the "selected"
        one: the test at idx0 when the order is kept (action 0), the test at
        idx1 when the pair is swapped (action 1). Column 0 of a row is compared
        first and column 2 breaks ties; the docstring of the original code
        describes these as failure information and execution time.

        Parameters:
            action (int): 0 or 1, determining the order of test cases.

        Returns:
            float: 1 if the selected test has the larger column-0 value,
            0.5 if column 0 ties and the selected test's column-2 value is not
            larger, otherwise 0.
        """
        first, second = self.test_cases[self.idx0], self.test_cases[self.idx1]
        if action == 1:
            sel, non_sel = second, first
        else:
            sel, non_sel = first, second

        if sel[0] > non_sel[0]:
            return 1  # Prefer failing test cases
        if sel[0] == non_sel[0]:
            return 0.5 if sel[2] <= non_sel[2] else 0
        return 0

    def compute_apfd(self, failure_positions, num_failures):
        """
        Computes the Average Percentage of Faults Detected (APFD) metric.

        Parameters:
            failure_positions (list): 1-based positions of failing test cases.
            num_failures (int): Total number of failing test cases.

        Returns:
            float: The APFD score; 1 when there are no failures or at most one
            test case.
        """
        if num_failures == 0 or self.n <= 1:
            return 1

        apfd = 1 - (sum(failure_positions) / (num_failures * self.n)) + (1 / (2 * self.n))
        return apfd

    def compute_nrpa(self, failure_positions):
        """
        Computes the Normalized Rank Percentile Average (NRPA) metric.

        Each position contributes ``1 - pos / (n - 1)``, so 0-based positions
        yield a score in [0, 1] (1 = all failures first).

        Parameters:
            failure_positions (list): Positions of failing test cases.

        Returns:
            float: The NRPA score; 0 when there are no failures or at most one
            test case.
        """
        num_failures = len(failure_positions)
        if num_failures == 0 or self.n <= 1:
            return 0
        nrpa = sum(1 - (pos / (self.n - 1)) for pos in failure_positions) / num_failures
        return nrpa

    def evaluate(self):
        """
        Evaluates the test case prioritization using APFD and NRPA.

        A test case counts as failing when its "failures_%" value in ``data``
        is greater than 0. Its rank is looked up through ``order`` rather than
        by comparing row contents, so duplicate rows and rows that merely share
        a value with another row are ranked correctly.

        Returns:
            tuple: (APFD score, NRPA score)
        """
        failing_mask = (self.data["failures_%"] > 0).to_numpy()

        # Invert the permutation: rank_of[row] is the 0-based rank of the
        # original row `row` in the current ordering.
        rank_of = np.empty(self.n, dtype=int)
        rank_of[self.order] = np.arange(self.n)
        zero_based = rank_of[failing_mask]

        # APFD is defined on 1-based positions, NRPA's formula on 0-based ones.
        apfd_score = self.compute_apfd((zero_based + 1).tolist(), len(zero_based))
        nrpa_score = self.compute_nrpa(zero_based.tolist())
        return apfd_score, nrpa_score

def choose_action(Q, state_tuple, epsilon):
    """
    Chooses an action using an epsilon-greedy strategy.

    A uniform random number is drawn; if it exceeds ``epsilon`` the action with
    the highest Q-value for the state is taken (the lower index wins ties),
    otherwise one of the two actions is picked at random.

    Parameters:
        Q (dict): Q-table mapping states to action values. The state is looked
            up with ``Q[state_tuple]``, so a plain dict must already contain it
            (a defaultdict creates the entry on demand).
        state_tuple (tuple): Current state represented as a tuple.
        epsilon (float): Probability of choosing a random action (exploration).

    Returns:
        int: Selected action (0 or 1).
    """
    if random.random() > epsilon:
        # Exploitation: np.argmax yields a NumPy integer, so convert it to
        # keep the return type identical to the exploration branch.
        return int(np.argmax(Q[state_tuple]))
    # Exploration: choose randomly
    return random.choice([0, 1])

# Load dataset and keep only the features that form the agent's state.
# The column order matters: PairwiseEnv._calc_reward reads column 0 and
# column 2 of each row (here "current_failures" and "time").
df = pd.read_csv("Commons_codec.csv")
selected_features = ["current_failures", "failures_%", "time", "AvgCyclomatic", "MaxNesting"]
df = df[selected_features]

# Train Q-learning agent
env = PairwiseEnv(df)
Q = defaultdict(lambda: np.zeros(2))  # Q-table with two actions (0,1)
alpha = 0.5  # Learning rate
gamma = 0.9  # Discount factor
epsilon = 0.1  # Exploration rate

for episode in range(2):
    state = env.reset()
    done = False
    while not done:
        # A state is the concatenated feature values of the two test cases
        # currently being compared.
        state_tuple = tuple(state[0]) + tuple(state[1])
        action = choose_action(Q, state_tuple, epsilon)
        next_state, reward, done = env.step(action)
        next_state_tuple = tuple(next_state[0]) + tuple(next_state[1])

        # No bootstrapping on the terminal transition: once the episode is
        # over there is no future return to estimate.
        future_value = 0.0 if done else gamma * np.max(Q[next_state_tuple])
        Q[state_tuple][action] += alpha * (reward + future_value - Q[state_tuple][action])

        # Advance to the pair the environment just moved to; without this the
        # agent would keep acting on the very first observation.
        state = next_state

print("Training complete.")

# Evaluate performance on the ordering left behind by the last episode
apfd_score, nrpa_score = env.evaluate()
print(f"APFD Score: {apfd_score}")
print(f"NRPA Score: {nrpa_score}")


Training complete.
APFD Score: 0.9998422712933753
NRPA Score: 0.9996844430419691
