In [None]:
import os
import re
import time
import json
import math
import random
import warnings
import numpy as np
import pandas as pd
from typing import List, Tuple
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import (
    f1_score,
    accuracy_score,
    precision_score,
    recall_score,
    classification_report
)
import math

from scipy.spatial.distance import euclidean, cityblock, cosine


# Silence every warning category for the rest of the session (applies to all
# later cells too, since the filter is process-wide).
warnings.filterwarnings("ignore")
# Directory that receives the per-run evolution logs written by
# TwoClassTargetDPG_GA.evolve in "traditional" mode; created up front so the
# CSV write cannot fail on a missing folder.
PATH_EVU = "experiment/data/evolution"
os.makedirs(PATH_EVU, exist_ok=True)

class TwoClassTargetDPG_GA:
    """Genetic algorithm that evolves synthetic feature vectors for one class.

    Individuals start as lightly perturbed copies of a seed ``sample`` and are
    evolved with tournament selection, single-point crossover and bounded
    mutation.  ``augmentation_mode`` selects the representation and scoring:

    * ``"traditional"`` - the population is a 2-D ``np.ndarray`` scored by
      :meth:`fitness_function_traditional`.
    * any other value (e.g. ``"border"``) - the population is a list of lists
      scored by :meth:`total_fitness`.
    """

    def __init__(self,
                 population_size,
                 mutation_rate,
                 crossover_rate,
                 model,
                 target_class,
                 border_class,
                 n_classes,
                 class_bounds,
                 augmentation_mode,  # "traditional" or "border"
                 boundary_points,
                 boundary_weight,
                 max_other_prob,
                 diversity_weight,
                 re_inject_threshold,  # not used
                 re_inject_ratio,        # not used
                 repulsion_weight,
                 feature_order,
                 random_seed,
                 default_intervals,
                 sample,
                 distance_factor,
                 sparsity_factor,
                 constraints_factor):
        """Store the configuration and seed the global RNGs.

        Parameters:
            population_size: number of individuals kept per generation.
            mutation_rate / crossover_rate: probabilities in [0, 1] compared
                against ``random.random()``.
            model: fitted classifier exposing ``predict`` (and
                ``predict_proba`` for the border mode).
            target_class / border_class: class labels as returned by
                ``model.predict``.
            n_classes: number of probability columns inspected in
                :meth:`base_border_fitness`.
            class_bounds: mapping ``"Class <label>" -> {feature: (low, high)}``;
                may be ``None``.
            default_intervals: ``{feature: (low, high)}`` used when
                ``class_bounds`` has no entry for the target class.
            feature_order: feature names, positionally aligned with an
                individual's genes.
            sample: array whose flattened form is the seed feature values
                followed by one trailing label.
            distance_factor / sparsity_factor / constraints_factor: weights of
                the three terms of the traditional fitness.
            boundary_points, re_inject_threshold, re_inject_ratio: stored but
                not read by any method of this class.

        Side effects:
            Seeds both ``random`` and ``np.random`` with ``random_seed``.
        """
        random.seed(random_seed)
        np.random.seed(random_seed)
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.model = model
        self.target_class = target_class
        self.border_class = border_class
        self.n_classes = n_classes
        self.class_bounds = class_bounds
        self.augmentation_mode = augmentation_mode
        self.boundary_points = boundary_points  # not used in "traditional" mode
        self.boundary_weight = boundary_weight
        self.max_other_prob = max_other_prob
        self.diversity_weight = diversity_weight
        self.re_inject_threshold = re_inject_threshold
        self.re_inject_ratio = re_inject_ratio
        self.repulsion_weight = repulsion_weight
        self.feature_order = feature_order
        # Use provided bounds if available; otherwise fall back to
        # default_intervals.  A missing (None) bounds mapping is tolerated.
        bounds_by_class = class_bounds if class_bounds is not None else {}
        self.target_intervals = bounds_by_class.get(f"Class {self.target_class}", default_intervals)
        self.sample = sample
        self.distance_factor = distance_factor
        self.sparsity_factor = sparsity_factor
        self.constraints_factor = constraints_factor

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _seed_features(self) -> np.ndarray:
        """Return the seed sample's features (trailing label dropped) as floats.

        The cast matters: an integer (or string) ``sample`` would otherwise
        truncate (or stringify) the float values written into individuals.
        """
        return np.asarray(self.sample).flatten()[:-1].astype(float)

    def _perturb_in_place(self, genes) -> None:
        """Shift every gene by up to +/-5% of its interval width, then clamp."""
        for j, feat in enumerate(self.feature_order):
            low, high = self.target_intervals[feat]
            r = (high - low) * 0.05
            delta = random.uniform(-r, r)
            genes[j] = max(low, min(genes[j] + delta, high))

    def _proba_index(self, label):
        """Return the ``predict_proba`` column that belongs to ``label``.

        Columns follow ``model.classes_``.  When the model exposes no such
        attribute, or the label is absent from it, the label itself is used
        as the index (the historical behaviour).
        """
        classes = getattr(self.model, "classes_", None)
        if classes is None:
            return label
        try:
            return list(classes).index(label)
        except ValueError:
            return label

    def _distances_to_others(self, ind, pop) -> list:
        """Euclidean distances from ``ind`` to every non-identical member of ``pop``."""
        me = np.asarray(ind, dtype=float)
        distances = []
        for other in pop:
            other_arr = np.asarray(other, dtype=float)
            # array_equal avoids the ambiguous truth value that ``!=`` yields
            # when individuals are numpy arrays instead of lists.
            if not np.array_equal(other_arr, me):
                distances.append(np.linalg.norm(me - other_arr))
        return distances

    def _breed_next_generation(self, elite, parents, tournament_size) -> list:
        """Build the next population: the elite plus bred/mutated children."""
        new_pop = [elite]
        while len(new_pop) < self.population_size:
            p1, p2 = self.select_parents_tournament(parents, tournament_size)
            if random.random() < self.crossover_rate:
                c1, c2 = self.crossover_individuals(p1, p2)
            else:
                # Real copies, so later mutation never aliases a parent.
                c1, c2 = list(p1), list(p2)
            if random.random() < self.mutation_rate:
                c1 = self.mutate_individual(c1)
            if random.random() < self.mutation_rate:
                c2 = self.mutate_individual(c2)
            new_pop.extend([c1, c2])
        # Children are added in pairs, so trim a possible overshoot of one.
        return new_pop[:self.population_size]

    # ------------------------------------------------------------------
    # Population construction
    # ------------------------------------------------------------------
    def random_individual(self):
        """Return a copy of the seed sample with a few features re-drawn.

        Between 5% and 10% of the features (but always allowing at least one,
        so small feature sets do not yield a population of identical clones)
        are replaced by a uniform draw from their target interval.

        Returns:
            ``np.ndarray`` in traditional mode, otherwise a list of floats.
        """
        original = self._seed_features()
        num_features = len(original)
        min_changes = int(0.05 * num_features)
        # int(0.1 * n) is 0 below ten features; allow one change in that case.
        max_changes = min(num_features, max(1, int(0.1 * num_features)))
        num_changes = random.randint(min_changes, max_changes)

        # Choose random feature indices to change
        change_indices = random.sample(range(num_features), num_changes)

        new_individual = original.copy()
        for i in change_indices:
            low, high = self.target_intervals[self.feature_order[i]]
            new_individual[i] = random.uniform(low, high)

        return new_individual if self.augmentation_mode == "traditional" else new_individual.tolist()

    def initialize_population(self):
        """Create ``population_size`` random individuals (array or list by mode)."""
        individuals = [self.random_individual() for _ in range(self.population_size)]
        if self.augmentation_mode == "traditional":
            return np.array(individuals)
        return individuals

    # ------------------------------------------------------------------
    # Variation operators
    # ------------------------------------------------------------------
    def mutate(self, pop: np.ndarray) -> np.ndarray:
        """Mutate rows of ``pop`` in place, each with probability ``mutation_rate``.

        Returns the same ``pop`` object for convenience.
        """
        for row in pop:  # rows of an ndarray are views, so edits land in pop
            if random.random() < self.mutation_rate:
                self._perturb_in_place(row)
        return pop

    def mutate_individual(self, ind: List[float]) -> List[float]:
        """Return a mutated copy of ``ind``; the input is left untouched."""
        new_ind = ind.copy()
        self._perturb_in_place(new_ind)
        return new_ind

    def crossover(self, pop: np.ndarray) -> np.ndarray:
        """Single-point crossover over consecutive pairs of ``pop``.

        A trailing unpaired individual (odd population size) is carried over
        unchanged, and individuals with fewer than two genes are never cut.
        """
        children = []
        for k in range(len(pop) // 2):
            p1, p2 = pop[2 * k], pop[2 * k + 1]
            if len(p1) > 1 and random.random() < self.crossover_rate:
                point = random.randint(1, len(p1) - 1)
                c1 = np.concatenate([p1[:point], p2[point:]])
                c2 = np.concatenate([p2[:point], p1[point:]])
                children.extend([c1, c2])
            else:
                children.extend([p1, p2])
        if len(pop) % 2:
            children.append(pop[-1])
        return np.array(children)

    def crossover_individuals(self, ind1: List[float], ind2: List[float]) -> Tuple[List[float], List[float]]:
        """Single-point crossover of two individuals, returned as two new lists.

        Inputs may be lists or 1-D arrays; with fewer than two genes there is
        no valid cut point, so plain copies of the parents are returned.
        """
        a, b = list(ind1), list(ind2)
        if len(a) < 2:
            return a, b
        point = random.randint(1, len(a) - 1)
        return a[:point] + b[point:], b[:point] + a[point:]

    # ------------------------------------------------------------------
    # Fitness: traditional mode
    # ------------------------------------------------------------------
    def fitness_function_traditional(self, pop: np.ndarray) -> np.ndarray:
        """Score every row of ``pop`` against the seed sample.

        The score is the weighted sum of ``1 - constraint violation``, the
        relative distance to the seed and ``1 - sparsity`` (each rounded to
        four decimals).  It is negated when the model does not predict
        ``target_class`` for the individual.
        """
        pop = np.asarray(pop)
        fitness = np.zeros(len(pop))
        if len(pop) == 0:
            return fitness

        sample_features = self._seed_features()  # target label removed
        # One batched call instead of one model call per individual.
        predictions = np.asarray(self.model.predict(pop)).ravel()

        for i, individual in enumerate(pop):
            constraints_score = round(self.calculate_constraint_violation(individual), 4)
            distance_score = round(self.calculate_distance(sample_features, individual), 4)
            sparsity_score = round(self.calculate_sparsity(sample_features, individual), 4)

            score = (1 - constraints_score) * self.constraints_factor
            score += distance_score * self.distance_factor
            score += (1 - sparsity_score) * self.sparsity_factor

            if predictions[i] != self.target_class:
                score *= -1
            fitness[i] = score
        return fitness

    # ------------------------------------------------------------------
    # Fitness: border mode
    # ------------------------------------------------------------------
    def base_border_fitness(self, ind: List[float]) -> float:
        """Score how confidently, and how close to the border, ``ind`` is classified.

        Returns ``-50.0`` when a gene leaves its interval, when the model does
        not predict ``target_class``, or when any third class exceeds
        ``max_other_prob``; otherwise ``5 * p_target + 3 * boundary_score``.
        """
        INVALID_FITNESS = -50.0
        for j, feat in enumerate(self.feature_order):
            low, high = self.target_intervals[feat]
            if not (low <= ind[j] <= high):
                return INVALID_FITNESS

        row = [np.array(ind)]
        probs = self.model.predict_proba(row)[0]
        pred = self.model.predict(row)[0]
        if pred != self.target_class:
            return INVALID_FITNESS

        # Probability columns are ordered like model.classes_, not by label.
        target_idx = self._proba_index(self.target_class)
        border_idx = self._proba_index(self.border_class)
        p_t = probs[target_idx]
        p_b = probs[border_idx]
        boundary_score = 1.0 - abs(p_t - p_b)
        for c, prob in enumerate(probs[:self.n_classes]):
            if c not in (target_idx, border_idx) and prob > self.max_other_prob:
                return INVALID_FITNESS
        return 5.0 * p_t + 3.0 * boundary_score

    def individual_diversity(self, ind: List[float], pop: List[List[float]]) -> float:
        """Mean distance from ``ind`` to the non-identical members of ``pop`` (0.0 if none)."""
        distances = self._distances_to_others(ind, pop)
        return np.mean(distances) if distances else 0.0

    def min_distance_to_others(self, ind: List[float], pop: List[List[float]]) -> float:
        """Smallest distance from ``ind`` to a non-identical member of ``pop`` (0.0 if none)."""
        distances = self._distances_to_others(ind, pop)
        return min(distances) if distances else 0.0

    def distance_to_boundary_line(self, ind: List[float]) -> float:
        """Placeholder: always returns the constant 0.05, whatever ``ind`` is."""
        return 0.05

    def compute_population_diversity(self, pop: List[List[float]]) -> float:
        """Mean pairwise Euclidean distance inside ``pop`` (0.0 for fewer than two members)."""
        if len(pop) < 2:
            return 0.0
        distances = [np.linalg.norm(np.array(ind1) - np.array(ind2))
                     for i, ind1 in enumerate(pop) for ind2 in pop[i+1:]]
        return np.mean(distances)

    def total_fitness(self, ind: List[float], pop: List[List[float]]) -> float:
        """Border-mode fitness: base score plus diversity, repulsion and line bonuses.

        Invalid individuals (base score below -40) are returned unchanged so
        the bonuses can never rescue them.
        """
        base = self.base_border_fitness(ind)
        if base < -40:
            return base
        div = self.individual_diversity(ind, pop)
        div_bonus = self.diversity_weight * div
        min_d = self.min_distance_to_others(ind, pop)
        rep_bonus = self.repulsion_weight * min_d
        dist_line = self.distance_to_boundary_line(ind)
        line_bonus = 1.0 / (1.0 + dist_line) * self.boundary_weight
        penalty = -50.0 if dist_line > 0.1 else 0.0
        return base + div_bonus + rep_bonus + line_bonus + penalty

    def get_fitness(self, ind: List[float], pop: List[List[float]]) -> float:
        """Return the fitness of a single individual under the active mode."""
        if self.augmentation_mode == "traditional":
            arr = np.array(ind).reshape(1, -1)
            return self.fitness_function_traditional(arr)[0]
        return self.total_fitness(ind, pop)

    def select_parents_tournament(self, pop: List[List[float]], t_size=3):
        """Pick two parents, each the fittest of a random tournament.

        The tournament size is capped at ``len(pop)`` so small populations do
        not make ``random.sample`` fail.
        """
        k = min(t_size, len(pop))

        def tournament():
            candidates = random.sample(pop, k)
            return max(candidates, key=lambda c: self.get_fitness(c, pop))

        return tournament(), tournament()

    # ------------------------------------------------------------------
    # Score components
    # ------------------------------------------------------------------
    def calculate_sparsity(self, original_sample: np.ndarray, individual_sample: np.ndarray) -> float:
        """Fraction of features that differ from the original (1 = all, 0 = none)."""
        original_sample = np.asarray(original_sample).flatten()
        individual_sample = np.asarray(individual_sample).flatten()
        if original_sample.size == 0:
            return 0.0

        changed_features = np.sum(original_sample != individual_sample)
        return changed_features / len(original_sample)

    def calculate_distance(self, original_sample: np.ndarray, individual_sample: np.ndarray) -> float:
        """Mean absolute change relative to the original feature values."""
        original_sample = np.asarray(original_sample)
        individual_sample = np.asarray(individual_sample)

        # Avoid division by zero: zero-valued originals are divided by 1e-8.
        denominator = np.where(original_sample == 0, 1e-8, original_sample)

        relative_distance = np.abs((individual_sample - original_sample) / denominator)
        return np.mean(relative_distance)

    def calculate_constraint_violation(self, individual: np.ndarray) -> float:
        """
        Calculates the share of features that are outside the allowed target_intervals.

        Parameters:
            individual (np.ndarray): 1D array of feature values for a candidate individual.

        Returns:
            float: Ratio (0 to 1) of features that violate their interval constraints;
            0.0 when there are no features at all.
        """
        total_features = len(self.feature_order)
        if total_features == 0:
            return 0.0

        violations = 0
        for i, feat in enumerate(self.feature_order):
            low, high = self.target_intervals[feat]
            if not (low <= individual[i] <= high):
                violations += 1

        return violations / total_features

    # ------------------------------------------------------------------
    # Evolution loop
    # ------------------------------------------------------------------
    def evolve(self, generations=20, tournament_size=3, stagnation_limit=5, dataset_name=None, perc=None):
        """Run the GA and return the final population.

        Each generation keeps the best individual (elitism) and fills the
        rest through tournament selection, crossover and mutation.  The run
        stops early once the best fitness has not improved for
        ``stagnation_limit`` consecutive generations.

        In traditional mode the best individual of every generation is logged
        and written to ``{PATH_EVU}/{dataset_name}_{perc}_evolution_log.csv``
        (the file is overwritten by every call).

        Returns:
            ``np.ndarray`` in traditional mode, otherwise a list of lists.
        """
        traditional = self.augmentation_mode == "traditional"
        pop = self.initialize_population()
        best_fit_so_far = float('-inf')
        no_improve = 0
        log_rows = []  # collected as dicts; the DataFrame is built once at the end

        for gen in range(generations):
            if traditional:
                fitness = self.fitness_function_traditional(pop)
                best_ind = pop[np.argmax(fitness)]
                best_val = np.max(fitness)
                log_rows.append({
                    "generation": gen,
                    "fitness": float(best_val),
                    "individual": best_ind.tolist(),
                })
                parents = list(pop)
            else:
                best_ind = max(pop, key=lambda c: self.get_fitness(c, pop))
                best_val = self.get_fitness(best_ind, pop)
                parents = pop

            new_pop = self._breed_next_generation(best_ind, parents, tournament_size)
            pop = np.array(new_pop) if traditional else new_pop

            if best_val > best_fit_so_far:
                best_fit_so_far = best_val
                no_improve = 0
            else:
                no_improve += 1
            if no_improve >= stagnation_limit:
                print(f"[Early Stop Gen {gen}] No improvement for {stagnation_limit} generations.")
                break

        if traditional:
            # Save the log to CSV
            log_df = pd.DataFrame(log_rows, columns=["generation", "fitness", "individual"])
            log_df.to_csv(f"{PATH_EVU}/{dataset_name}_{perc}_evolution_log.csv", index=False)
        return pop

    def generate_samples(self, num_samples=5, generations=20, dataset_name=None, perc=None) -> np.ndarray:
        """Run :meth:`evolve` ``num_samples`` times and collect each run's best individual."""
        traditional = self.augmentation_mode == "traditional"
        results = []
        for _ in range(num_samples):
            final_pop = self.evolve(generations=generations, dataset_name=dataset_name, perc=perc)
            if traditional:
                trad_fitness = self.fitness_function_traditional(final_pop)
                best_ind = final_pop[np.argmax(trad_fitness)]
            else:
                best_ind = max(final_pop, key=lambda c: self.get_fitness(c, final_pop))
            results.append(best_ind)
        return np.array(results)

# Violations - Section 6.1

In [None]:
# Violations eval
def evaluate_healthcare_constraints(data):
    """Collect range violations for each row of the healthcare dataset.

    Returns a list of ``(row_index, message)`` tuples in row order; within a
    row the checks run as Age, BMI, Cholesterol.
    """
    rules = (
        ('Age', lambda v: v < 0 or v > 120, 'Invalid Age'),
        ('BMI', lambda v: v < 10 or v > 50, 'Invalid BMI'),
        ('Cholesterol', lambda v: v < 0, 'Invalid Cholesterol'),
    )
    return [
        (row_id, message)
        for row_id, record in data.iterrows()
        for column, is_invalid, message in rules
        if is_invalid(record[column])
    ]

def evaluate_finance_constraints(data):
    """Collect violations for each row of the finance dataset.

    A row is flagged for a negative ``Income`` and for a ``CreditScore``
    outside 300-850.  Returns ``(row_index, message)`` tuples in row order.
    """
    found = []
    for row_id, record in data.iterrows():
        income = record['Income']
        credit_score = record['CreditScore']
        if income < 0:
            found.append((row_id, 'Negative Income'))
        if credit_score < 300 or credit_score > 850:
            found.append((row_id, 'Invalid Credit Score'))
        # Disabled rule kept for reference:
        # MaritalStatus == 'Single' with NumChildren > 0 -> 'Single with Children'
    return found

def evaluate_quality_control_constraints(data):
    """Collect violations for each row of the quality control dataset.

    Checks, in order: Temperature in 0-150, Pressure in 0-20, non-negative
    Speed, non-negative Vibration.  Returns ``(row_index, message)`` tuples.
    """
    rules = (
        ('Temperature', lambda v: v < 0 or v > 150, 'Invalid Temperature'),
        ('Pressure', lambda v: v < 0 or v > 20, 'Invalid Pressure'),
        ('Speed', lambda v: v < 0, 'Negative Speed'),
        ('Vibration', lambda v: v < 0, 'Negative Vibration'),
    )
    found = []
    for row_id, record in data.iterrows():
        for column, is_invalid, message in rules:
            if is_invalid(record[column]):
                found.append((row_id, message))
    return found

def evaluate_fraud_detection_constraints(data):
    """Collect violations for each row of the fraud detection dataset.

    Both ``TransactionAmount`` and ``TransactionTime`` must be non-negative.
    Returns ``(row_index, message)`` tuples in row order.
    """
    must_be_non_negative = (
        ('TransactionAmount', 'Negative Transaction Amount'),
        ('TransactionTime', 'Negative Transaction Time'),
    )
    return [
        (row_id, message)
        for row_id, record in data.iterrows()
        for column, message in must_be_non_negative
        if record[column] < 0
    ]

def evaluate_energy_constraints(data):
    """Collect violations for each row of the energy dataset.

    A row is flagged for a negative ``Usage`` and for a ``Voltage`` outside
    190-250.  Returns ``(row_index, message)`` tuples in row order.
    """
    found = []
    for row_id, record in data.iterrows():
        usage = record['Usage']
        voltage = record['Voltage']
        if usage < 0:
            found.append((row_id, 'Negative Usage'))
        if voltage < 190 or voltage > 250:
            found.append((row_id, 'Invalid Voltage'))
    return found

def evaluate_education_constraints(data):
    """Collect violations for each row of the education dataset.

    Checks, in order: Attendance in 0-100, non-negative StudyHours, Grades in
    0-100.  Returns ``(row_index, message)`` tuples in row order.
    """
    rules = (
        ('Attendance', lambda v: v < 0 or v > 100, 'Invalid Attendance'),
        ('StudyHours', lambda v: v < 0, 'Negative Study Hours'),
        ('Grades', lambda v: v < 0 or v > 100, 'Invalid Grades'),
    )
    return [
        (row_id, message)
        for row_id, record in data.iterrows()
        for column, is_invalid, message in rules
        if is_invalid(record[column])
    ]

# Dispatch table: dataset name -> function that returns the list of
# (row_index, message) constraint violations for that dataset's DataFrame.
# evaluate_constraints() looks the evaluator up here by name.
constraint_evaluators = {
    'healthcare_dataset': evaluate_healthcare_constraints,
    'finance_dataset': evaluate_finance_constraints,
    'quality_control_dataset': evaluate_quality_control_constraints,
    'fraud_detection_dataset': evaluate_fraud_detection_constraints,
    'energy_dataset': evaluate_energy_constraints,
    'education_dataset': evaluate_education_constraints,
}

def evaluate_constraints(dataset_name, data):
    """Run the constraint evaluator registered for ``dataset_name`` on ``data``.

    Raises:
        ValueError: if no evaluator is registered under ``dataset_name``.
    """
    evaluator = constraint_evaluators.get(dataset_name)
    if evaluator is None:
        raise ValueError(f"Unknown dataset: {dataset_name}")
    return evaluator(data)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import os
import pandas as pd
import numpy as np
import time
import re
import json

# Input location of the synthetic datasets (one folder plus one CSV per dataset).
PATH = "extract_constraints/data/synthetic_data"
# Output folder for the violation summaries appended by this cell.
results_path = "experiment/result/"
os.makedirs(PATH, exist_ok=True)
# The result CSV is written with to_csv(), which does not create missing
# parent folders; without this the write fails and the failure is only
# printed by the per-dataset exception handler.
os.makedirs(results_path, exist_ok=True)

def get_minority_idx(y):
    """Return the position of the rarest label in ``y``'s order of first appearance."""
    rarest_label = y.value_counts().idxmin()
    appearance_order = list(y.unique())
    return appearance_order.index(rarest_label)

# Named scikit-learn classifiers with fixed seeds where the estimator takes one.
# NOTE(review): not referenced in the visible part of this cell - presumably
# used by a later evaluation step; confirm before removing.
classifiers = {
        "LogisticRegression": LogisticRegression(random_state=42, max_iter=1000),
        "kNN": KNeighborsClassifier(),
        "DecisionTree": DecisionTreeClassifier(random_state=42)
    }

# Weights of the three terms of the GA's traditional fitness; passed to
# TwoClassTargetDPG_GA below.
distance_factor=2
sparsity_factor=1
constraints_factor=3

# Experiment driver: for every synthetic dataset, rebuild the training split,
# fit a small random forest, evolve synthetic minority samples with the GA at
# several augmentation percentages, and append the resulting constraint
# violation counts to violations_dpg.csv.
# for dataset in os.listdir(PATH):
for dataset in ["education_dataset", "energy_dataset", "finance_dataset", "fraud_detection_dataset", "healthcare_dataset", "quality_control_dataset"]:
    print(f"Dataset {dataset}")
    overall_start = time.time()

    dataset_path = f"{PATH}/{dataset}"

    X_test = pd.read_csv(f"{dataset_path}/X_test.csv")
    y_test = pd.read_csv(f"{dataset_path}/y_test.csv")

    # Keep only the last column of y_test as the label Series.
    y_test = y_test.iloc[:, -1]

    full_data = pd.read_csv(f"{PATH}/{dataset}.csv")  # Replace with actual file name

    # Split X and y
    X_full = full_data.iloc[:, :-1]  # all columns except the last
    y_full = full_data.iloc[:, -1]   # the last column is y

    # Remove test rows from full dataset to get training set.
    # Rows are matched by value (as tuples), so a training row that happens to
    # equal a test row is dropped as well.
    X_train = X_full[~X_full.apply(tuple, axis=1).isin(X_test.apply(tuple, axis=1))]
    numerical_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
    y_train = y_full.loc[X_train.index]

    X_train = X_train[numerical_cols]
    features = numerical_cols
    # Observed per-feature (min, max); used by the GA as fallback intervals.
    feature_min_max = {feat: (X_train[feat].min(), X_train[feat].max()) for feat in features}
    X_train = X_train.to_numpy()

    model = RandomForestClassifier(n_estimators=3, random_state=42, class_weight='balanced')
    model.fit(X_train, y_train)

    # NOTE(review): the model is fitted on numerical_cols only, while X_test is
    # passed with all of its columns - assumes X_test holds exactly the same
    # numeric columns; confirm.
    y_pred = model.predict(X_test)

    # Generate classification report
    report = classification_report(y_test, y_pred)
    # print(report)

    with open(f"{dataset_path}/constraints.json", "r") as f:
        constraints = json.load(f)  # constraints is now a Python dict

    # NOTE(review): this string is only printed. The GA below receives the
    # minority *label* and builds its own "Class <label>" lookup key, which
    # matches this index-based name only if label and index coincide - confirm.
    target_class = f"Class {get_minority_idx(y_train)}"
    print(f"Target Class: {target_class}")

    class_counts = pd.Series(y_train).value_counts()
    majority_class = class_counts.idxmax()
    minority_class = class_counts.idxmin()
    majority_count = class_counts.max()
    minority_count = class_counts.min()

    X_test_minority = X_test[y_test == minority_class]
    X_test_minority_np = X_test_minority.to_numpy()

    sample_size = min(1, len(X_test_minority_np)) # just in case we want to pass a group of samples later
    X_test_np = X_test.to_numpy()

    # Sample indices from minority class only (fixed seed, so the same seed
    # row is chosen on every run).
    sample_indices = np.random.default_rng(seed=42).choice(len(X_test_minority_np), size=sample_size, replace=False)
    sample_features = X_test_minority_np[sample_indices]

    # Create corresponding labels
    sample_labels = np.full((sample_size, 1), minority_class)

    # Combine features and label: the GA expects the label as the last value.
    sample = np.hstack([sample_features, sample_labels])
    # print(sample)
    # break
    try:
        ga = TwoClassTargetDPG_GA(
                        population_size=30,
                        mutation_rate=0.3,
                        crossover_rate=0.7,
                        model=model,
                        target_class=minority_class,
                        border_class=minority_class,
                        n_classes=len(np.unique(y_train)),
                        class_bounds=constraints,
                        augmentation_mode="traditional",
                        boundary_points=None,
                        boundary_weight=15.0,
                        max_other_prob=0.2,
                        diversity_weight=0.5,
                        re_inject_threshold=0.5,
                        re_inject_ratio=0.2,
                        repulsion_weight=4.0,
                        feature_order=features,
                        random_seed=123,
                        default_intervals=feature_min_max,
                        sample = sample,
                        distance_factor=distance_factor,
                        sparsity_factor=sparsity_factor,
                        constraints_factor=constraints_factor
                        )

        augmentation_percentages = [0.05, 0.15, 0.3, 0.5]

        for perc in augmentation_percentages:
            # Reset per percentage: exactly one summary row is appended to the
            # CSV per (dataset, percentage).
            violations_list = []
            print(f"===========\n Augmentation Percentage {perc}\n")
            results_list = []
            # Minority size after augmentation; the GA generates the difference.
            new_count = int(math.ceil(minority_count * (1 + perc)))
            gen_start = time.time()
            synthetic_data = ga.generate_samples(num_samples=abs(new_count-minority_count), generations=20, dataset_name=dataset, perc=perc)

            gen_time = time.time() - gen_start
            X_train_aug = np.vstack((X_train, synthetic_data))
            synthetic_labels = np.full((len(synthetic_data),), minority_class)
            X_train_aug = pd.DataFrame(X_train_aug, columns=features)

            # Violations are counted over the whole augmented training set
            # (original rows plus synthetic rows), not the synthetic rows alone.
            violations = evaluate_constraints(dataset, X_train_aug)
            violations_dict = {"Dataset": dataset,
                                "Method":'DPG-da',
                                "Aug": perc,
                                "Samples": new_count,
                                "Number of Violation": len(violations),
                                "Violations":violations,
                                "Rep":1}
            violations_list.append(violations_dict)

            # Append to the shared CSV; the header is written only when the
            # file does not exist yet.
            file_exists = os.path.isfile(f"{results_path}/violations_dpg.csv")
            violations_df = pd.DataFrame(violations_list)
            violations_df.to_csv(f"{results_path}/violations_dpg.csv", index=False, mode="a", header= not file_exists)
    except Exception as e:
        # Best effort per dataset: any failure is printed and the loop moves on
        # to the next dataset (no traceback is kept).
        print(f"Dataset {dataset} - {e}")


In [None]:
import pandas as pd
import re

# Base folder for the violation datasets used by this cell.
# NOTE(review): the KEEL helpers below take explicit file paths and do not read
# PATH, and a later cell assigns PATH again - confirm this value is still needed.
PATH = "experiment/data/violation_data"

def read_keel_dat(filepath):
    """Parse a KEEL-style ``.dat`` file into a DataFrame plus attribute metadata.

    Header lines of the form ``@attribute <name> ...`` declare the columns:

    * a ``{v1, v2, ...}`` list marks a nominal attribute;
    * the keywords ``real`` / ``integer`` (any letter case) mark a numeric
      attribute, with an optional ``[lo, hi]`` range.

    Attributes of any other kind are not recorded.  Everything after the
    ``@data`` line is read as comma-separated rows; blank lines and lines
    starting with ``%`` are skipped, as are other header directives.

    Parameters:
        filepath: path of the ``.dat`` file.

    Returns:
        tuple: ``(df, attributes)`` where ``attributes`` maps each column name
        to ``{"type": "nominal", "values": [...]}`` or
        ``{"type": "numeric", "range": (lo, hi) | None}`` and ``df`` has the
        numeric columns converted with ``pd.to_numeric``.

    Raises:
        ValueError: for an ``@attribute`` line without a name (also raised by
            pandas when the rows do not match the recorded columns).
    """
    attributes = {}
    data_started = False
    data_rows = []

    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('%'):
                continue  # skip empty/comment lines

            lowered = line.lower()
            if lowered.startswith('@attribute'):
                # The name ends at whitespace or at a '{' / '[' glued to it,
                # so "@attribute Class{a,b}" yields "Class".
                header = re.match(r"@attribute\s+([^\s{\[]+)", line, flags=re.IGNORECASE)
                if header is None:
                    raise ValueError(f"Malformed @attribute line: {line!r}")
                attr_name = header.group(1)

                if '{' in line:  # nominal attribute
                    values = re.search(r"\{(.*)\}", line).group(1).split(',')
                    values = [v.strip() for v in values]
                    attributes[attr_name] = {"type": "nominal", "values": values}
                elif 'real' in lowered or 'integer' in lowered:
                    # Keyword test on the lowered line, matching the
                    # case-insensitive handling of the directive itself.
                    rng_match = re.search(r"\[(.*),(.*)\]", line)
                    if rng_match:
                        lo, hi = float(rng_match.group(1)), float(rng_match.group(2))
                        attributes[attr_name] = {"type": "numeric", "range": (lo, hi)}
                    else:
                        attributes[attr_name] = {"type": "numeric", "range": None}

            elif lowered.startswith('@data'):
                data_started = True

            elif data_started:
                row = [v.strip() for v in line.split(',')]
                data_rows.append(row)

    # Build dataframe
    colnames = list(attributes.keys())
    df = pd.DataFrame(data_rows, columns=colnames)

    # Cast numeric columns properly (values are read as strings)
    for col, meta in attributes.items():
        if meta["type"] == "numeric":
            df[col] = pd.to_numeric(df[col])

    return df, attributes


def check_violation(instance, attributes):
    """Return ``{column: message}`` for every attribute constraint the row breaks.

    Numeric attributes are checked against their range when one is recorded;
    nominal attributes against their list of allowed values.
    """
    problems = {}
    for name in attributes:
        spec = attributes[name]
        value = instance[name]
        kind = spec["type"]
        if kind == "numeric" and spec["range"]:
            lower, upper = spec["range"]
            if value < lower or value > upper:
                problems[name] = f"{value} out of range [{lower},{upper}]"
        elif kind == "nominal":
            allowed = spec["values"]
            if value not in allowed:
                problems[name] = f"{value} not in {allowed}"
    return problems


def check_all_violations(df, attributes):
    """Check every row of ``df`` and summarise the constraint violations.

    Returns:
        tuple: ``(per_column, offenders)`` - a count of violating rows per
        attribute, and a list of ``(row_index, violations_dict)`` for the rows
        that break at least one constraint.
    """
    per_column = dict.fromkeys(attributes.keys(), 0)
    offenders = []

    for row_id, record in df.iterrows():
        problems = check_violation(record, attributes)
        if not problems:
            continue
        offenders.append((row_id, problems))
        for column in problems:
            per_column[column] += 1

    return per_column, offenders

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import os
import pandas as pd
import numpy as np
import time
import re
import json

# Input location of the KEEL violation datasets (.dat files plus per-dataset folders).
PATH = "extract_constraints/data/violation_data"
# PATH = "extract_constraints/data/synthetic_data"
# Output folder for result files.
results_path = "experiment/result/"
os.makedirs(PATH, exist_ok=True)
# Create the output folder as well: pandas' to_csv() does not create missing
# parent directories, so writing results would otherwise fail.
os.makedirs(results_path, exist_ok=True)

def get_minority_idx(y):
    """Return the position of the rarest label within ``y.unique()``.

    The result is an index into the labels in order of first appearance,
    not the label value itself.
    """
    labels_in_order = list(y.unique())
    rarest_label = y.value_counts().idxmin()
    return labels_in_order.index(rarest_label)

# Downstream classifiers keyed by display name.
# NOTE(review): this dict is not referenced anywhere else in this cell.
classifiers = {
        "LogisticRegression": LogisticRegression(random_state=42, max_iter=1000),
        "kNN": KNeighborsClassifier(),
        "DecisionTree": DecisionTreeClassifier(random_state=42)
    }

# Weights forwarded to TwoClassTargetDPG_GA for the distance, sparsity and
# constraint terms of its fitness.
distance_factor=2
sparsity_factor=1
constraints_factor=3
# violation_datasets = ['iris0','pima','wisconsin','paw02a-600-5-70-BI', '04clover5z-800-7-70-BI', '03subcl5-800-7-50-BI']
violation_datasets = ['paw02a-600-5-70-BI', '04clover5z-800-7-70-BI', '03subcl5-800-7-50-BI']
# violation_datasets = ["education_dataset", "energy_dataset", "finance_dataset", "fraud_detection_dataset", "healthcare_dataset", "quality_control_dataset"]
# For every dataset: rebuild the training split, fit a small random forest,
# generate synthetic minority samples with the GA at several augmentation
# levels, and record how many augmented rows violate the KEEL attribute
# declarations.
for dataset in violation_datasets:
    print(f"Dataset {dataset}")
    dataset_path = f"{PATH}/{dataset.replace('.csv','')}"

    # NOTE(review): overall_start is never read in this cell.
    overall_start = time.time()
    # df holds the full dataset; attrs holds the per-attribute declarations
    # later used by check_all_violations.
    df, attrs = read_keel_dat(f"{PATH}/{dataset}.dat")

    X_test = pd.read_csv(f"{dataset_path}/X_test.csv")
    y_test = pd.read_csv(f"{dataset_path}/y_test.csv")


    # TODO - investigate this
    y_test = y_test['Class']
    y_test = y_test.astype(str) # 0 to '0'

    # Split X and y
    X_full = df.iloc[:, :-1]  # all columns except the last
    y_full = df.iloc[:, -1]   # the last column is y

    # Remove test rows from full dataset to get training set
    # (rows are matched by comparing their full feature tuples)
    X_train = X_full[~X_full.apply(tuple, axis=1).isin(X_test.apply(tuple, axis=1))]
    numerical_cols = X_train.select_dtypes(include=["number"]).columns.tolist()

    y_train = y_full.loc[X_train.index]
    # Only numeric columns are used as features from here on.
    X_train = X_train[numerical_cols]
    features = numerical_cols
    # Observed (min, max) per feature; handed to the GA as default_intervals.
    feature_min_max = {feat: (X_train[feat].min(), X_train[feat].max()) for feat in features}
    X_train = X_train.to_numpy()

    model = RandomForestClassifier(n_estimators=3, random_state=42, class_weight='balanced')
    model.fit(X_train, y_train)
    # NOTE(review): y_pred is never used, and X_test is passed with all of its
    # columns while the model was fitted on numerical_cols only.
    y_pred = model.predict(X_test)

    with open(f"{dataset_path}/constraints.json", "r") as f:
        constraints = json.load(f)  # constraints is now a Python dict

    # NOTE(review): this "Class <idx>" string is only printed; the GA below
    # receives minority_class instead.
    target_class = f"Class {get_minority_idx(y_train)}"
    print(f"Target Class: {target_class}")

    # Class sizes measured on the training split.
    class_counts = pd.Series(y_train).value_counts()
    majority_class = class_counts.idxmax()
    minority_class = class_counts.idxmin()
    majority_count = class_counts.max()
    minority_count = class_counts.min()

    # Minority rows of the test split act as the pool of GA seed samples.
    X_test_minority = X_test[y_test == minority_class]
    X_test_minority_np = X_test_minority.to_numpy()

    sample_size = min(1, len(X_test_minority_np)) # just in case we want to pass a group of samples later
    X_test_np = X_test.to_numpy()

    # Sample indices from minority class only
    # (default_rng() is unseeded, so the chosen row differs between runs)
    sample_indices = np.random.default_rng().choice(len(X_test_minority_np), size=sample_size, replace=False)

    sample_features = X_test_minority_np[sample_indices]

    # Create corresponding labels
    # (object dtype keeps the label from forcing the features to strings)
    sample_labels = np.full((sample_size, 1), minority_class, dtype=object)

    # Combine features and label
    sample = np.hstack([sample_features, sample_labels])

    print(sample)

    try:
        ga = TwoClassTargetDPG_GA(
                        population_size=30,
                        mutation_rate=0.3,
                        crossover_rate=0.7,
                        model=model,
                        target_class=minority_class,
                        border_class=minority_class,
                        n_classes=len(np.unique(y_train)),
                        class_bounds=constraints,
                        augmentation_mode="traditional",
                        boundary_points=None,
                        boundary_weight=15.0,
                        max_other_prob=0.2,
                        diversity_weight=0.5,
                        re_inject_threshold=0.5,
                        re_inject_ratio=0.2,
                        repulsion_weight=4.0,
                        feature_order=features,
                        random_seed=123,
                        default_intervals=feature_min_max,
                        sample = sample,
                        distance_factor=distance_factor,
                        sparsity_factor=sparsity_factor,
                        constraints_factor=constraints_factor
                        )

        # Fractions by which the minority class is grown.
        augmentation_percentages = [0.15, 0.3, 0.5]

        for perc in augmentation_percentages:
            violations_list = []
            print(f"===========\n Augmentation Percentage {perc}\n")
            # NOTE(review): results_list is never filled in this cell.
            results_list = []
            # Target minority size after augmentation; the GA is asked for the
            # difference to the current minority size.
            new_count = int(math.ceil(minority_count * (1 + perc)))
            gen_start = time.time()
            # NOTE(review): no real_minority_samples are passed here - confirm
            # that the class definition in use supports that.
            synthetic_data = ga.generate_samples(num_samples=abs(new_count-minority_count), generations=20, dataset_name=dataset, perc=perc)

            # NOTE(review): gen_time is computed but not stored anywhere.
            gen_time = time.time() - gen_start
            X_train_aug = np.vstack((X_train, synthetic_data))
            synthetic_labels = np.full((len(synthetic_data),), minority_class)
            y_train_aug = np.concatenate((y_train, synthetic_labels))


            X_train_aug = pd.DataFrame(X_train_aug, columns=features)

            df_aug = pd.DataFrame(X_train_aug, columns=features)
            df_aug["Class"] = y_train_aug

            # Count constraint violations over original + synthetic rows.
            # NOTE(review): summary (the per-column counts) is not used below.
            summary, details = check_all_violations(df_aug, attrs)

            # violations = evaluate_constraints(dataset, X_train_aug)
            # "Samples" records the target minority size (new_count), not the
            # number of generated rows.
            violations_dict = {"Dataset": dataset,
                                "Method":'DPG-da',
                                "Aug": perc,
                                "Samples": new_count,
                                "Number of Violation": len(details),
                                "Violations":details,
                                "Rep":1}
            violations_list.append(violations_dict)

            # Append one row per augmentation level; the header is written only
            # when the report file does not exist yet.
            file_exists = os.path.isfile(f"{results_path}/keel_violations_dpg.csv")
            violations_df = pd.DataFrame(violations_list)
            violations_df.to_csv(f"{results_path}/keel_violations_dpg.csv", index=False, mode="a", header= not file_exists)
    except Exception as e:
        # Any failure is reported and the loop moves on to the next dataset.
        print(f"ERROR Dataset {dataset} - {e}")

---

# Performance Section 6.2

In [None]:
import os
import re
import time
import json
import math
import random
import warnings
import numpy as np
import pandas as pd
from typing import List, Tuple
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import (
    f1_score,
    accuracy_score,
    precision_score,
    recall_score,
    classification_report
)
import math

from scipy.spatial.distance import euclidean, cityblock, cosine

# Print NumPy arrays with three decimals and without scientific notation.
np.set_printoptions(precision=3, suppress=True)

# Silence every warning for the rest of the session.
warnings.filterwarnings("ignore")
# Folder that receives the per-run evolution logs written by
# TwoClassTargetDPG_GA.evolve; created up front so the write cannot fail on a
# missing directory.
PATH_EVU = "experiment/data/evolution"
os.makedirs(PATH_EVU, exist_ok=True)

class TwoClassTargetDPG_GA:
    """Genetic algorithm that evolves synthetic samples for one target class.

    Two modes are selected through ``augmentation_mode``:

    * ``"traditional"`` - the population is a NumPy array seeded from
      ``self.sample`` and scored by :meth:`fitness_function_traditional`.
    * any other value (``"border"``) - individuals are plain lists scored by
      :meth:`total_fitness`.
    """

    def __init__(self,
                 population_size,
                 mutation_rate,
                 crossover_rate,
                 model,
                 target_class,
                 border_class,
                 n_classes,
                 class_bounds,
                 augmentation_mode,  # "traditional" or "border"
                 boundary_points,
                 boundary_weight,
                 max_other_prob,
                 diversity_weight,
                 re_inject_threshold,  # not used
                 re_inject_ratio,        # not used
                 repulsion_weight,
                 feature_order,
                 random_seed,
                 default_intervals,
                 sample,
                 distance_factor,
                 sparsity_factor,
                 constraints_factor):
        """Store the configuration and seed both random generators.

        Parameters (only the non-obvious ones):
            model: fitted classifier; ``predict`` is always used and
                ``predict_proba`` is used in border mode.
            target_class, border_class: class labels. Numeric labels are
                wrapped with ``% n_classes``; text labels are kept unchanged.
            class_bounds: dict looked up with the key ``"Class <target_class>"``;
                the value maps feature name to ``(low, high)``.
            default_intervals: per-feature ``(low, high)`` used when
                ``class_bounds`` has no entry for the target class.
            feature_order: feature names in column order.
            sample: seed sample(s); the last value of a row is treated as the
                label and dropped.
            distance_factor, sparsity_factor, constraints_factor: weights of
                the three terms of the traditional fitness.
        """
        random.seed(random_seed)
        np.random.seed(random_seed)
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.model = model
        self.target_class = self._wrap_class_label(target_class, n_classes)
        self.border_class = self._wrap_class_label(border_class, n_classes)
        self.n_classes = n_classes
        self.class_bounds = class_bounds
        self.augmentation_mode = augmentation_mode
        self.boundary_points = boundary_points  # not used in "traditional" mode
        self.boundary_weight = boundary_weight
        self.max_other_prob = max_other_prob
        self.diversity_weight = diversity_weight
        self.re_inject_threshold = re_inject_threshold
        self.re_inject_ratio = re_inject_ratio
        self.repulsion_weight = repulsion_weight
        self.feature_order = feature_order
        # Use provided bounds if available; otherwise fallback to default_intervals.
        self.target_intervals = self.class_bounds.get(f"Class {self.target_class}", default_intervals)
        self.sample = sample
        self.distance_factor = distance_factor
        self.sparsity_factor = sparsity_factor
        self.constraints_factor = constraints_factor

    @staticmethod
    def _wrap_class_label(label, n_classes):
        """Wrap a numeric label with ``% n_classes``; return text labels as-is.

        Applying ``%`` to a string label would attempt string formatting and
        raise ``TypeError``, so text labels bypass the modulo.
        """
        if isinstance(label, (str, bytes)):
            return label
        return label % n_classes

    def random_individual(self):
        """Create one individual by perturbing a few features of ``self.sample``.

        Between 5% and 10% of the features (both bounds truncated to int, so
        zero for fewer than ten features) are redrawn uniformly from their
        target interval. Expects ``self.sample`` to hold a single row whose
        last value is the label.

        Returns:
            NumPy array in traditional mode, plain list otherwise.
        """
        original = self.sample.flatten()[:-1]  # Assuming last value is the target
        num_features = len(original)
        min_changes = int(0.05 * num_features)
        max_changes = int(0.1 * num_features)
        num_changes = random.randint(min_changes, max_changes)

        # Choose random feature indices to change
        change_indices = random.sample(range(num_features), num_changes)

        # Copy original so the seed sample itself is never modified
        new_individual = original.copy()

        # Apply random changes to selected features
        for i in change_indices:
            feat_name = self.feature_order[i]
            low, high = self.target_intervals[feat_name]
            new_individual[i] = random.uniform(low, high)

        return new_individual if self.augmentation_mode == "traditional" else new_individual.tolist()

    def initialize_population(self):
        """Build ``population_size`` random individuals (array or list of lists)."""
        individuals = [self.random_individual() for _ in range(self.population_size)]
        if self.augmentation_mode == "traditional":
            return np.array(individuals)
        return individuals

    def mutate(self, pop: np.ndarray) -> np.ndarray:
        """Mutate a 2-D population in place and return it.

        Each row is selected with probability ``mutation_rate``; every feature
        of a selected row is shifted by up to 5% of its interval width and
        clamped back into the interval.
        """
        for i in range(len(pop)):
            if random.random() < self.mutation_rate:
                for j, feat in enumerate(self.feature_order):
                    low, high = self.target_intervals[feat]
                    r = (high - low) * 0.05
                    delta = random.uniform(-r, r)
                    pop[i, j] += delta
                    pop[i, j] = max(low, min(pop[i, j], high))
        return pop

    def mutate_individual(self, ind: List[float]) -> List[float]:
        """Return a mutated copy of ``ind``.

        Every feature is shifted by up to 5% of its interval width and clamped
        into the interval. ``ind`` must have at least ``len(feature_order)``
        entries.
        """
        new_ind = ind.copy()
        for j, feat in enumerate(self.feature_order):
            low, high = self.target_intervals[feat]
            r = (high - low) * 0.05
            delta = random.uniform(-r, r)
            new_ind[j] += delta
            new_ind[j] = max(low, min(new_ind[j], high))
        return new_ind

    def crossover(self, pop: np.ndarray) -> np.ndarray:
        """Apply single-point crossover to consecutive pairs of ``pop``.

        Pairs are recombined with probability ``crossover_rate``. When the
        population has an odd size, the last individual has no partner and is
        carried over unchanged. Individuals with fewer than two genes cannot
        be split and are copied as they are.
        """
        children = []
        for k in range(len(pop) // 2):
            p1, p2 = pop[2 * k], pop[2 * k + 1]
            # The random draw comes first so the random stream matches the
            # usual case regardless of the individual length.
            if random.random() < self.crossover_rate and len(p1) > 1:
                point = random.randint(1, len(p1) - 1)
                c1 = np.concatenate([p1[:point], p2[point:]])
                c2 = np.concatenate([p2[:point], p1[point:]])
                children.extend([c1, c2])
            else:
                children.extend([p1, p2])
        if len(pop) % 2:
            children.append(pop[-1])
        return np.array(children)

    def crossover_individuals(self, ind1: List[float], ind2: List[float]) -> Tuple[List[float], List[float]]:
        """Single-point crossover of two list individuals.

        Returns two new lists. Individuals with fewer than two genes have no
        valid cut point, so copies of the parents are returned instead.
        """
        if len(ind1) < 2:
            return ind1[:], ind2[:]
        point = random.randint(1, len(ind1) - 1)
        c1 = ind1[:point] + ind2[point:]
        c2 = ind2[:point] + ind1[point:]
        return c1, c2

    def fitness_function_traditional(self, pop: np.ndarray) -> np.ndarray:
        """Score every individual of a 2-D population (traditional mode).

        The score adds ``(1 - violation_ratio) * constraints_factor``,
        ``distance * distance_factor`` and ``(1 - sparsity) * sparsity_factor``,
        all measured against ``self.sample``. The sign is flipped when the
        model does not predict the target class.
        """
        fitness = np.zeros(len(pop))
        sample_features = self.sample.flatten()[:-1]  # remove target

        for i, individual in enumerate(pop):
            X_pred = individual.reshape(1, -1)
            # predict returns one label for the single row
            predicted_class = np.ravel(self.model.predict(X_pred))[0]
            constraints_score = round(self.calculate_constraint_violation(individual), 4)
            distance_score = round(self.calculate_distance(sample_features, individual), 4)
            sparsity_score = round(self.calculate_sparsity(sample_features, individual), 4)

            fitness[i] += (1 - constraints_score) * self.constraints_factor
            fitness[i] += distance_score * self.distance_factor
            fitness[i] += (1 - sparsity_score) * self.sparsity_factor

            if predicted_class != self.target_class:
                fitness[i] *= -1
        return fitness

    def base_border_fitness(self, ind: List[float]) -> float:
        """Border-mode base fitness of one individual.

        Returns ``-50.0`` when a feature leaves its interval, when the model
        does not predict the target class, or when any third class exceeds
        ``max_other_prob``. Otherwise returns
        ``5 * p_target + 3 * (1 - |p_target - p_border|)``.

        NOTE(review): class labels are used directly as ``predict_proba``
        column indices - confirm labels are 0..n_classes-1.
        """
        INVALID_FITNESS = -50.0
        for pos, feat in enumerate(self.feature_order):
            low, high = self.target_intervals[feat]
            if not (low <= ind[pos] <= high):
                return INVALID_FITNESS
        arr2d = np.array(ind)
        probs = self.model.predict_proba([arr2d])[0]
        pred = self.model.predict([arr2d])[0]
        if pred != self.target_class:
            return INVALID_FITNESS
        p_t = probs[self.target_class]
        p_b = probs[self.border_class]
        boundary_score = 1.0 - abs(p_t - p_b)
        for c in range(self.n_classes):
            if c not in (self.target_class, self.border_class):
                if probs[c] > self.max_other_prob:
                    return INVALID_FITNESS
        return 5.0 * p_t + 3.0 * boundary_score

    def individual_diversity(self, ind: List[float], pop: List[List[float]]) -> float:
        """Mean Euclidean distance from ``ind`` to the differing members of ``pop``."""
        distances = [np.linalg.norm(np.array(ind) - np.array(other)) for other in pop if other != ind]
        return np.mean(distances) if distances else 0.0

    def min_distance_to_others(self, ind: List[float], pop: List[List[float]]) -> float:
        """Smallest Euclidean distance from ``ind`` to the differing members of ``pop``."""
        distances = [np.linalg.norm(np.array(ind) - np.array(other)) for other in pop if other != ind]
        return min(distances) if distances else 0.0

    def distance_to_boundary_line(self, ind: List[float]) -> float:
        """Placeholder: always returns the constant 0.05."""
        return 0.05

    def compute_population_diversity(self, pop: List[List[float]]) -> float:
        """Mean pairwise Euclidean distance of ``pop`` (0.0 for fewer than two members)."""
        if len(pop) < 2:
            return 0.0
        distances = [np.linalg.norm(np.array(ind1) - np.array(ind2))
                     for i, ind1 in enumerate(pop) for ind2 in pop[i+1:]]
        return np.mean(distances)

    def total_fitness(self, ind: List[float], pop: List[List[float]]) -> float:
        """Border-mode fitness: base fitness plus diversity, repulsion and line bonuses.

        Invalid individuals (base fitness below -40) are returned unchanged.
        """
        base = self.base_border_fitness(ind)
        if base < -40:
            return base
        div = self.individual_diversity(ind, pop)
        div_bonus = self.diversity_weight * div
        min_d = self.min_distance_to_others(ind, pop)
        rep_bonus = self.repulsion_weight * min_d
        dist_line = self.distance_to_boundary_line(ind)
        line_bonus = 1.0 / (1.0 + dist_line) * self.boundary_weight
        penalty = -50.0 if dist_line > 0.1 else 0.0
        return base + div_bonus + rep_bonus + line_bonus + penalty

    def get_fitness(self, ind: List[float], pop: List[List[float]]) -> float:
        """Fitness of a single individual using the scorer of the active mode."""
        if self.augmentation_mode == "traditional":
            arr = np.array(ind).reshape(1, -1)
            return self.fitness_function_traditional(arr)[0]
        else:
            return self.total_fitness(ind, pop)

    def select_parents_tournament(self, pop: List[List[float]], t_size=3):
        """Pick two parents, each the fittest of ``t_size`` randomly drawn candidates."""
        def tournament():
            candidates = random.sample(pop, t_size)
            return max(candidates, key=lambda c: self.get_fitness(c, pop))
        return tournament(), tournament()

    def calculate_sparsity(self, original_sample: np.ndarray, individual_sample: np.ndarray) -> float:
        """Fraction of features that differ between the two samples.

        1 means every feature changed, 0 means none did.
        """
        # Ensure inputs are 1D arrays
        original_sample = original_sample.flatten()
        individual_sample = individual_sample.flatten()

        # Count how many features differ
        changed_features = np.sum(original_sample != individual_sample)

        sparsity = changed_features / len(original_sample)
        return sparsity

    def calculate_distance(self, original_sample: np.ndarray, individual_sample: np.ndarray) -> float:
        """Mean absolute relative difference between the two samples.

        Zero entries of ``original_sample`` are replaced by ``1e-8`` in the
        denominator to avoid division by zero.
        """
        original_sample = np.asarray(original_sample)
        individual_sample = np.asarray(individual_sample)

        # Avoid division by zero
        denominator = np.where(original_sample == 0, 1e-8, original_sample)

        relative_distance = np.abs((individual_sample - original_sample) / denominator)
        return np.mean(relative_distance)

    def calculate_constraint_violation(self, individual: np.ndarray) -> float:
        """
        Calculates the percentage of features that are outside the allowed target_intervals.

        Parameters:
            individual (np.ndarray): 1D array of feature values for a candidate individual.

        Returns:
            float: Ratio (0 to 1) of features that violate their interval constraints.
        """
        violations = 0
        total_features = len(self.feature_order)

        for i, feat in enumerate(self.feature_order):
            low, high = self.target_intervals[feat]
            if not (low <= individual[i] <= high):
                violations += 1

        return violations / total_features

    def _make_offspring(self, p1, p2):
        """Produce two children from two parents via crossover and mutation.

        Random draws happen in a fixed order: crossover decision, then one
        mutation decision per child.
        """
        if random.random() < self.crossover_rate:
            c1, c2 = self.crossover_individuals(list(p1), list(p2))
        else:
            c1, c2 = p1[:], p2[:]
        if random.random() < self.mutation_rate:
            c1 = self.mutate_individual(c1)
        if random.random() < self.mutation_rate:
            c2 = self.mutate_individual(c2)
        return c1, c2

    def evolve(self, generations=20, tournament_size=3, stagnation_limit=5, dataset_name=None, perc=None):
        """Run the GA and return the final population.

        Each generation keeps the best individual (elitism) and fills the rest
        with tournament-selected, recombined and mutated offspring. The run
        stops early after ``stagnation_limit`` generations without a better
        best fitness.

        In traditional mode the best individual of every generation is logged
        to ``{PATH_EVU}/{dataset_name}_{perc}_evolution_log.csv``; the file is
        overwritten on every call. The population is returned as a NumPy array
        in traditional mode and as a list of lists otherwise.
        """
        traditional = self.augmentation_mode == "traditional"
        pop = self.initialize_population()
        best_fit_so_far = float('-inf')
        no_improve = 0
        # Rows of the evolution log; turned into a DataFrame once at the end
        # instead of concatenating a frame per generation.
        log_records = []

        for gen in range(generations):
            if traditional:
                fitness = self.fitness_function_traditional(pop)
                best_ind = pop[np.argmax(fitness)]
                best_val = np.max(fitness)
                log_records.append({
                    "generation": gen,
                    "fitness": float(best_val),
                    "individual": best_ind.tolist(),
                })
                # random.sample in the tournament needs a sequence of rows
                candidates = list(pop)
            else:
                best_ind = max(pop, key=lambda c: self.get_fitness(c, pop))
                best_val = self.get_fitness(best_ind, pop)
                candidates = pop

            new_pop = [best_ind]
            while len(new_pop) < self.population_size:
                p1, p2 = self.select_parents_tournament(candidates, tournament_size)
                new_pop.extend(self._make_offspring(p1, p2))
            # Children arrive in pairs, so trim a possible surplus individual.
            pop = new_pop[:self.population_size]
            if traditional:
                pop = np.array(pop)

            if best_val > best_fit_so_far:
                best_fit_so_far = best_val
                no_improve = 0
            else:
                no_improve += 1
            if no_improve >= stagnation_limit:
                print(f"[Early Stop Gen {gen}] No improvement for {stagnation_limit} generations.")
                break

        if traditional:
            # Save the log to CSV
            log_df = pd.DataFrame(log_records, columns=["generation", "fitness", "individual"])
            log_df.to_csv(f"{PATH_EVU}/{dataset_name}_{perc}_evolution_log.csv", index=False)
        return pop

    def generate_samples(self, num_samples=5, generations=20, real_minority_samples=None, dataset_name=None, perc=None):
        """Generate ``num_samples`` synthetic samples, one GA run per sample.

        Every run starts from a randomly chosen seed row. Each generated
        sample is appended (with the target label) to the local seed pool, so
        later runs may start from earlier synthetic samples. The caller's
        ``real_minority_samples`` is never modified.

        Parameters:
            real_minority_samples: iterable of seed rows whose last value is
                the label. When omitted or empty, the row(s) currently held in
                ``self.sample`` are used instead.

        Returns:
            np.ndarray with one generated feature vector per row.

        Raises:
            ValueError: if samples are requested but no seed row is available.
        """
        print("Generate samples")
        print("Classe", self.target_class)
        if real_minority_samples is None or len(real_minority_samples) == 0:
            # Fall back to the seed given at construction time; atleast_2d
            # also covers a single 1-D row left behind by a previous call.
            seed_pool = list(np.atleast_2d(self.sample))
        else:
            seed_pool = list(real_minority_samples)
        if num_samples > 0 and not seed_pool:
            raise ValueError("generate_samples needs at least one seed sample")
        results = []

        for _ in range(num_samples):
            self.sample = random.choice(seed_pool)
            print(f"Minority Starting point: {self.sample}")

            pop = self.evolve(generations=generations, dataset_name=dataset_name, perc=perc)
            fitness_scores = self.fitness_function_traditional(pop)
            best_ind = pop[np.argmax(fitness_scores)]
            results.append(best_ind)
            best_ind_with_label = np.append(best_ind, self.target_class)
            seed_pool.append(best_ind_with_label)

        return np.array(results)

In [None]:

# Folder with one sub-folder per dataset used by the performance experiment.
PATH = "experiment/data/constraints"
# NOTE(review): the following cell reassigns results_path to
# "experiment/result/dpg_aug_result.csv", so this "_macro" path is only in
# effect if that cell is not run.
results_path = "experiment/result/dpg_aug_result_macro.csv"

# Notebook inspection: shows the entries available under PATH.
os.listdir(PATH)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import os
import pandas as pd
import numpy as np
import time
import re
import json

# Input folder: one sub-folder per dataset with the test split, the full CSV
# and constraints.json.
PATH = "experiment/data/constraints"
# CSV file that receives the aggregated metrics (rows are appended).
results_path = "experiment/result/dpg_aug_result.csv"

os.makedirs(PATH, exist_ok=True)
# The results file is opened in append mode later on, which fails when its
# parent folder is missing, so create that folder as well.
os.makedirs(os.path.dirname(results_path) or ".", exist_ok=True)

# Weights forwarded to TwoClassTargetDPG_GA for the distance, sparsity and
# constraint terms of its fitness.
distance_factor=2
sparsity_factor=1
constraints_factor=3

def get_minority_idx(y):
    """Return the position of the rarest label within ``y.unique()``.

    The result is an index into the labels in order of first appearance,
    not the label value itself.
    """
    labels_in_order = list(y.unique())
    rarest_label = y.value_counts().idxmin()
    return labels_in_order.index(rarest_label)

# Downstream classifiers trained on the augmented data, keyed by display name.
classifiers = {
        "LogisticRegression": LogisticRegression(random_state=42, max_iter=1000),
        "kNN": KNeighborsClassifier(),
        "DecisionTree": DecisionTreeClassifier(random_state=42)
    }

# Dataset folders under PATH that this experiment skips.
exclusion_list = ["education_dataset", "energy_dataset", "finance_dataset", "fraud_detection_dataset", "healthcare_dataset", "quality_control_dataset"]

# For every dataset folder: rebuild the training split, fit a small random
# forest for the GA, generate synthetic minority samples at several target
# ratios, retrain the classifiers on the augmented data and append their
# macro-averaged test metrics to results_path.
# for dataset in ['ecoli']:
for dataset in os.listdir(PATH):
    if dataset in exclusion_list:
        continue  # Skip excluded datasets
    print(f"Dataset {dataset}")
    overall_start = time.time()

    dataset_path = f"{PATH}/{dataset}"

    X_test = pd.read_csv(f"{dataset_path}/X_test.csv")
    y_test = pd.read_csv(f"{dataset_path}/y_test.csv")

    # TODO - investigate this
    # (y_test stays a DataFrame when it has no 'target' column)
    if 'target' in y_test.columns:
        y_test = y_test['target']

    full_data = pd.read_csv(f"{dataset_path}/{dataset}.csv")  # Replace with actual file name

    # Split X and y
    X_full = full_data.iloc[:, :-1]  # all columns except the last
    y_full = full_data.iloc[:, -1]   # the last column is y

    # Remove test rows from full dataset to get training set
    # (rows are matched by comparing their full feature tuples)
    X_train = X_full[~X_full.apply(tuple, axis=1).isin(X_test.apply(tuple, axis=1))]
    numerical_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
    y_train = y_full.loc[X_train.index]

    # Only numeric columns are used as features from here on.
    X_train = X_train[numerical_cols]
    features = numerical_cols
    # Observed (min, max) per feature; handed to the GA as default_intervals.
    feature_min_max = {feat: (X_train[feat].min(), X_train[feat].max()) for feat in features}
    X_train = X_train.to_numpy()

    model = RandomForestClassifier(n_estimators=3, random_state=42, class_weight='balanced')
    model.fit(X_train, y_train)

    # y_pred = model.predict(X_test)

    # Generate classification report
    # report = classification_report(y_test, y_pred)

    with open(f"{dataset_path}/constraints.json", "r") as f:
        constraints = json.load(f)  # constraints is now a Python dict

    # NOTE(review): this "Class <idx>" string is only printed; the GA below
    # receives minority_class instead.
    target_class = f"Class {get_minority_idx(y_train)}"
    print(f"Target Class: {target_class}")

    # these counts are on the training
    class_counts = pd.Series(y_train).value_counts()
    majority_class = class_counts.idxmax()
    minority_class = class_counts.idxmin()
    majority_count = class_counts.max()
    minority_count = class_counts.min()

    # Minority rows of the test split act as the pool of GA seed samples.
    X_test_minority = X_test[y_test == minority_class]
    X_test_minority_np = X_test_minority.to_numpy()

    # sample_size = min(1, len(X_test_minority_np)) # just in case we want to pass a group of samples later
    sample_size = len(X_test_minority_np) # just in case we want to pass a group of samples later
    # NOTE(review): X_test_np is not used below.
    X_test_np = X_test.to_numpy()

    # Sample indices from minority class only
    # (with size == pool size and replace=False this is a random permutation;
    # default_rng() is unseeded, so the order differs between runs)
    sample_indices = np.random.default_rng().choice(len(X_test_minority_np), size=sample_size, replace=False)

    sample_features = X_test_minority_np[sample_indices]

    # Create corresponding labels
    sample_labels = np.full((sample_size, 1), minority_class)

    # Combine features and label
    sample = np.hstack([sample_features, sample_labels])

    # Debug output: the seed samples, their count and the training minority size.
    print(sample)
    print(len(sample))
    print(minority_count)
    # break

    try:
        ga = TwoClassTargetDPG_GA(
                        population_size=30,
                        mutation_rate=0.3,
                        crossover_rate=0.7,
                        model=model,
                        target_class=minority_class,
                        border_class=minority_class,
                        n_classes=len(np.unique(y_train)),
                        class_bounds=constraints,
                        augmentation_mode="traditional",
                        boundary_points=None,
                        boundary_weight=15.0,
                        max_other_prob=0.2,
                        diversity_weight=0.5,
                        re_inject_threshold=0.5,
                        re_inject_ratio=0.2,
                        repulsion_weight=4.0,
                        feature_order=features,
                        random_seed=123,
                        default_intervals=feature_min_max,
                        sample = sample,
                        distance_factor=distance_factor,
                        sparsity_factor=sparsity_factor,
                        constraints_factor=constraints_factor
                        )

        # Target minority share of (minority + majority) after augmentation.
        augmentation_percentages = [0.15, 0.3, 0.5]

        for perc in augmentation_percentages:
            print(f"===========\n Augmentation Percentage {perc}\n")
            results_list = []
            # Solve minority / (minority + majority) == perc for the minority size.
            desired_minority_count = (perc * majority_count) / (1 - perc)
            additional_samples_needed = int(desired_minority_count - minority_count)

            print(f"Augmentation {perc*100:.0f}%:")
            print(f"Current minority count: {minority_count}")
            print(f"Target minority count: {int(desired_minority_count)}")
            print(f"Additional samples needed: {max(0, additional_samples_needed)}\n")

            # Nothing is generated or evaluated when the minority class
            # already reaches the target share.
            new_count = max(0, additional_samples_needed)
            if new_count > 0:
                gen_start = time.time()

                # synthetic_data = ga.generate_samples(num_samples=new_count, generations=20, dataset_name=dataset, perc=perc)
                # gen_time = time.time() - gen_start
                # X_train_aug = np.vstack((X_train, synthetic_data))

                # -----
                synthetic_data = ga.generate_samples(num_samples=new_count, generations=20, real_minority_samples=sample, dataset_name=dataset, perc=perc)
                gen_time = time.time() - gen_start

                # print("synthetic data")
                # synthetic_df = pd.DataFrame(synthetic_data, columns=features)
                # synthetic_save_path = f"{dataset_path}/synthetic_{int(perc*100)}.csv"
                # synthetic_df.to_csv(synthetic_save_path, index=False)

                X_train_aug = np.vstack((X_train, synthetic_data))
                # -----

                synthetic_labels = np.full((len(synthetic_data),), minority_class)
                print(f"Original data: {X_train}")
                print(f"Synthetically added: {synthetic_data}")

                # Merge y_train and synthetic_labels
                y_train_aug = np.concatenate((y_train, synthetic_labels))

                final_minority_count = np.sum(y_train_aug == minority_class)
                final_majority_count = np.sum(y_train_aug == majority_class)
                synthetic_added = final_minority_count - minority_count
                for clf_name, clf in classifiers.items():
                    # Augmented data Stat
                    # NOTE(review): X_test is passed with all of its columns
                    # while training used numerical_cols only - confirm they match.
                    clf.fit(X_train_aug, y_train_aug)
                    y_pred_aug = clf.predict(X_test)
                    f1_aug = f1_score(y_test, y_pred_aug, average='macro')
                    acc_aug = accuracy_score(y_test, y_pred_aug)
                    precision_aug = precision_score(y_test, y_pred_aug, average='macro', zero_division=0)
                    recall_aug = recall_score(y_test, y_pred_aug, average='macro', zero_division=0)

                    results_list.append({
                        "Method": "DPG-da",
                        "Augmentation_Percentage": perc,
                        "Augmentation_Level": f"{perc*100:.2f}%",
                        "Repetition": 1,
                        "Classifier": clf_name,
                        "F1_aug": f1_aug,
                        "Accuracy_aug": acc_aug,
                        "Precision_aug": precision_aug,
                        "Recall_aug": recall_aug,
                        "Original_Minority_Count": minority_count,
                        "Synthetic_Samples_Added": synthetic_added,
                        "Final_Minority_Count": final_minority_count,
                        "Majority_Count": final_majority_count,
                        "Run_Time": gen_time   # Time only for synthetic sample generation.
                    })
                results_df = pd.DataFrame(results_list)

                # One row per classifier. With a single repetition per group
                # the "std" columns come out as NaN.
                aggregated_df = results_df.groupby(["Method", "Augmentation_Percentage", "Classifier"]).agg(
                            Avg_F1_aug=("F1_aug", "mean"),
                            Std_F1_aug=("F1_aug", "std"),
                            Avg_Accuracy_aug=("Accuracy_aug", "mean"),
                            Std_Accuracy_aug=("Accuracy_aug", "std"),
                            Avg_Precision_aug=("Precision_aug", "mean"),
                            Std_Precision_aug=("Precision_aug", "std"),
                            Avg_Recall_aug=("Recall_aug", "mean"),
                            Std_Recall_aug=("Recall_aug", "std"),
                            Avg_Run_Time=("Run_Time", "mean"),
                            Std_Run_Time=("Run_Time", "std"),
                            Augmentation_Level=("Augmentation_Level", "first"),
                            Original_Minority_Count=("Original_Minority_Count", "first"),
                            Synthetic_Samples_Added=("Synthetic_Samples_Added", "first"),
                            Final_Minority_Count=("Final_Minority_Count", "first"),
                            Majority_Count=("Majority_Count", "first")
                    ).reset_index()

                overall_end = time.time()
                aggregated_df['Dataset'] = dataset.replace(".csv","")
                # Elapsed time since this dataset started, so it grows with
                # every augmentation level.
                aggregated_df['Time'] = round(overall_end - overall_start,2)
                # Save aggregated results to CSV.
                # (appended; the header is written only when the file is new)
                file_exists = os.path.isfile(results_path)
                aggregated_df.to_csv(results_path, index=False, mode='a', header=not file_exists)
                print(f"\nTotal execution time: {overall_end - overall_start:.2f} seconds")
    except Exception as e:
        print(f"ERROR Dataset: {dataset} - {e}")
        # Stop the whole experiment on the first failing dataset.
        break
    # NOTE(review): this unconditional break ends the loop after the first
    # non-excluded dataset - looks like a debugging leftover; confirm before
    # running the full experiment.
    break