In [2]:
import numpy as np
import pandas as pd
from gplearn.genetic import SymbolicTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from pmlb import fetch_data
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from npeet import entropy_estimators as ee
import time
from datetime import timedelta



SEED = 42

In [3]:
def calculate_ur_with_npeet(x_k, y, z):
    """
    Calculates the Unique Relevance (Conditional Mutual Information) I(x_k; y | z)
    using the entropy-based chain rule and the npeet KSG estimator.

    Formula: I(X;Y|Z) = H(X,Z) + H(Y,Z) - H(Z) - H(X,Y,Z)

    Args:
        x_k (pd.Series): The candidate feature vector.
        y (pd.Series): The target vector.
        z (pd.DataFrame): The conditioning set of all other features.

    Returns:
        float: The estimated conditional mutual information.
    """
    # npeet expects numpy arrays of shape (n_samples, n_dimensions)
    x_k_arr = x_k.values.reshape(-1, 1)
    y_arr = y.values.reshape(-1, 1)
    z_arr = z.values

    # H(X,Z)
    xz_entropy = ee.entropy(np.c_[x_k_arr, z_arr])
    # H(Y,Z)
    yz_entropy = ee.entropy(np.c_[y_arr, z_arr])
    # H(Z)
    z_entropy = ee.entropy(z_arr)
    # H(X,Y,Z)
    xyz_entropy = ee.entropy(np.c_[x_k_arr, y_arr, z_arr])

    # The result must be non-negative
    return max(0, xz_entropy + yz_entropy - z_entropy - xyz_entropy)


def mrmr_bur_npeet(X: pd.DataFrame, y: pd.Series, K: int, beta: float = 0.1) -> list:
    """
    Implements MRwMR-BUR using the npeet KSG estimator for ALL mutual
    information and entropy calculations.

    Args:
        X (pd.DataFrame): The input feature matrix.
        y (pd.Series): The target vector (can be discrete or continuous).
        K (int): The number of features to select.
        beta (float): The weighting factor for balancing mRMR and UR.

    Returns:
        list: A list of the names of the K selected features.
    """
    if not 0 <= beta <= 1:
        raise ValueError("beta must be between 0 and 1.")

    # --- Step 1: Calculate Relevance using npeet ---
    print("Calculating Relevance scores with npeet...")
    relevance_scores = pd.Series(
        [ee.mi(X[[feature]].values, y.values.reshape(-1, 1)) for feature in X.columns],
        index=X.columns
    )

    # --- Step 2: Calculate Unique Relevance (UR) using npeet ---
    print("Calculating Unique Relevance for all features with npeet...")
    ur_scores = {}
    for feature_k in X.columns:
        Z = X.drop(columns=[feature_k])
        ur_scores[feature_k] = calculate_ur_with_npeet(X[feature_k], y, Z)
    ur_scores = pd.Series(ur_scores)
    print("Unique Relevance calculation complete.")

    # --- Step 3: Initialize the selection process ---
    selected_features = []
    # Note: The first feature is now chosen based on the full MRwMR-BUR score
    # This is a slight deviation but more aligned with a complete BUR approach
    
    # --- Step 4: Iteratively select all K features ---
    remaining_features = X.columns.tolist()
    
    for i in range(K):
        final_scores = {}
        
        for feature in remaining_features:
            relevance = relevance_scores[feature]
            ur_score = ur_scores[feature]

            # Calculate Redundancy with already selected features using npeet
            if i > 0:
                redundancy_per_feature = [
                    ee.mi(X[[feature]].values, X[[selected]].values) for selected in selected_features
                ]
                redundancy = np.mean(redundancy_per_feature)
            else:
                redundancy = 0 # No redundancy for the first feature

            # The augmented scoring function from the paper
            original_mrmr_score = relevance - redundancy
            final_scores[feature] = (1 - beta) * original_mrmr_score + beta * ur_score

        best_next_feature = max(final_scores, key=final_scores.get)
        selected_features.append(best_next_feature)
        remaining_features.remove(best_next_feature)

    return selected_features

In [4]:
def identify_feature_types(df, cardinality_threshold=5):
    """
    Identifies categorical and numerical features in a DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame.
        cardinality_threshold (int): The maximum number of unique values
                                     for a numeric column to be considered
                                     categorical.

    Returns:
        tuple: A tuple containing two lists:
               - categorical_features (list)
               - numerical_features (list)
    """
    categorical_features = []
    numerical_features = []

    for column in df.columns:
        dtype = df[column].dtype
        # Rule 1: Check for 'object' or 'category' data types
        if dtype == 'object' or isinstance(dtype, pd.CategoricalDtype):
            categorical_features.append(column)

        # Rule 2: Check for numeric types
        elif pd.api.types.is_numeric_dtype(df[column]):
            # Check if it's a low-cardinality numeric column (likely categorical)
            if df[column].nunique() < cardinality_threshold:
                categorical_features.append(column)
            else:
                # High-cardinality numeric column (likely continuous or an ID)
                numerical_features.append(column)
        else:
            # Handle other potential data types if necessary
            pass # Or add to a separate list for review

    return categorical_features, numerical_features

In [5]:
class GPForestFeatureTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, gp_params=None, top_k_features=None, random_state=None):
        self.gp_params = gp_params
        self.top_k_features = top_k_features
        self.random_state = random_state

    def fit(self, X, y):
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X, columns=[f"feat_{i}" for i in range(X.shape[1])])
        
        gp_params = self.gp_params.copy() if self.gp_params else {}
        if 'random_state' not in gp_params and self.random_state is not None:
            gp_params['random_state'] = self.random_state

        self.gp = SymbolicTransformer(**gp_params)

        # Identify feature types and store the list of numerical features
        _, self.numerical_features_ = identify_feature_types(X) # <-- CHANGE: Store numerical feature names

        # Fit on numerical features only
        self.gp.fit(X[self.numerical_features_], y)

        gp_features_array = self.gp.transform(X[self.numerical_features_])

        # Store the names of the generated features for use in transform()
        self.gp_feature_names_ = [f"gp_feat_{i}" for i in range(gp_features_array.shape[1])]
        gp_features = pd.DataFrame(gp_features_array, 
                                     columns=self.gp_feature_names_, 
                                     index=X.index)

        # Combine original data with new GP features for selection
        X_combined = pd.concat([X, gp_features], axis=1)

        # Select the best features from the combined set
        self.final_feature_names_ = mrmr_bur_npeet(X=X_combined, y=y, K=self.top_k_features)

        return self

    def transform(self, X):
        if isinstance(X, np.ndarray):
            # Ensure columns match what was seen in fit
            X = pd.DataFrame(X, columns=[f"feat_{i}" for i in range(X.shape[1])])
        
        # --- FIX: Use the stored numerical_features_ list ---
        gp_features_array = self.gp.transform(X[self.numerical_features_])
        
        # Use the stored GP feature names
        gp_features = pd.DataFrame(gp_features_array, 
                                     columns=self.gp_feature_names_, 
                                     index=X.index)

        # Combine original data with new GP features
        X_combined = pd.concat([X, gp_features], axis=1)
        
        # Select the final features determined during fit
        return X_combined[self.final_feature_names_] # <-- REFINEMENT: Return DataFrame
    
    def explain(self):
        # Print best programs (optional)
        print("Constructed features:")
        for i, program in enumerate(self.gp._best_programs):
            print(f"gp_feat_{i}: {program}")
        print("Final selected features:")
        for i, program in enumerate(self.feature_names_):
            print(f"{program}")


In [6]:


datasets = ["allbp", "Hill_Valley_with_noise","Hill_Valley_without_noise","adult","allhyper","breast_cancer"]


In [7]:
def new_approach_benchmark(dataset_name, seed):
    df = fetch_data(dataset_name)

    X_train, X_valid, y_train, y_valid = train_test_split(df.drop(["target"], axis="columns"), df["target"], train_size = 0.8, stratify=df["target"], random_state = SEED)

    transformer = GPForestFeatureTransformer(gp_params={'generations': 10, 'population_size': 10000, 'hall_of_fame': 100, 'n_components': 10, 
                                                    'function_set' : ['add', 'sub', 'mul', 'div','sqrt', 'log', 'abs', 'neg', 'inv', 'max', 'min'],
                                                    'parsimony_coefficient': 0.01, 'max_samples': 0.9, 'verbose': 1, 'random_state': SEED, 'n_jobs': 3, 
                                                    'metric': "spearman"}, top_k_features=(len(df.columns)+10) // 2, random_state=SEED)
    
    transformer.fit(X_train, y_train)
    transformed_X_train = transformer.transform(X_train)
    transformed_X_valid = transformer.transform(X_valid)

    clf = RandomForestClassifier(random_state=seed)
    clf.fit(transformed_X_train, y_train)
    y_pred = clf.predict(transformed_X_valid)
    acc = accuracy_score(y_valid, y_pred)
    print(f"Accuracy on {dataset_name}: {acc}")
    # transformer.explain()
    return acc


In [8]:
new_approach_acc = []
start_time = time.monotonic()

for i in datasets:
    new_approach_acc.append(new_approach_benchmark(i, SEED))

end_time = time.monotonic()
print(f"Running time: {timedelta(minutes=end_time - start_time)}")

for i in range(len(datasets)):
    print(f"Accuracy of New Approach on {datasets[i]}:{new_approach_acc[i]}")

    |   Population Average    |             Best Individual              |
---- ------------------------- ------------------------------------------ ----------
 Gen   Length          Fitness   Length          Fitness      OOB Fitness  Time Left
   0    12.63        0.0722931        7         0.304656        0.0213994     57.02s
   1     4.03          0.15972       14         0.328537         0.220896     51.63s
   2     2.86         0.190788       14         0.332976        0.0944118     44.32s
   3     3.86         0.230595        3         0.327797        0.0489404     33.78s
   4     3.12         0.242116        3         0.320954        0.0428614     26.30s
   5     3.05         0.241312        3         0.323014        0.0122369     19.00s
   6     3.05          0.24041        5         0.328458        0.0496743     13.00s
   7     3.03         0.240531        3         0.322423        0.0431377      7.81s
   8     3.01         0.238993        3         0.331325        0.0521459  