# Applied Machine Learning: In-class Exercise 04-2

## Goal

After this exercise, you should understand and be able to perform feature selection using wrapper methods with `scikit-learn`. You should also be able to integrate various performance measures and calculate the generalization error.

## Wrapper Methods

Besides filter methods, wrapper methods are a second family of feature selection techniques. Whereas filter methods score features with predefined criteria computed directly from the data, wrapper methods evaluate candidate feature subsets by repeatedly training and validating a predictive model. Because the model has to be refitted for every candidate subset, wrapper methods can be computationally expensive.
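
To get a feeling for this cost, the following sketch counts how many model fits a search needs. It assumes that a greedy forward search scores every not-yet-selected feature once per step and that each score requires one fit per cross-validation fold; the function names are hypothetical and only serve as an illustration.

```python
def forward_selection_fits(n_features: int, n_selected: int, n_folds: int) -> int:
    """Approximate number of model fits for greedy forward selection with k-fold CV."""
    # In step i (0-based), every feature that is not yet selected is a candidate.
    candidates = sum(n_features - i for i in range(n_selected))
    return candidates * n_folds


def exhaustive_search_fits(n_features: int, n_folds: int) -> int:
    """Number of model fits needed to score every non-empty feature subset."""
    return (2 ** n_features - 1) * n_folds


for p in (4, 20):
    k = p // 2  # select half of the features
    print(f"p={p}: forward={forward_selection_fits(p, k, 3)}, "
          f"exhaustive={exhaustive_search_fits(p, 3)}")
```

Even under these simplifying assumptions, the exhaustive search grows exponentially with the number of features, which is why greedy or random search strategies are commonly used instead.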

For wrapper methods in Python, we will primarily use the classes from `sklearn.feature_selection`, e.g., `SequentialFeatureSelector`. This approach incrementally adds or removes features based on model performance.

## Prerequisites

We first load the necessary Python libraries and set a fixed random seed for reproducibility.

In [1]:
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_openml


rng = np.random.default_rng(2025)

In this exercise, we use the `credit-g` dataset (German Credit Data) and a random forest classifier (`RandomForestClassifier`).

In [2]:
X, y = fetch_openml('credit-g', version=1, as_frame=True, return_X_y=True)

## 1 Basic Application

### 1.1 Create the Framework

Create a feature selection setup using `SequentialFeatureSelector`. The feature selection should use 3-fold cross-validation, classification accuracy as the scoring metric, and sequentially select the best-performing subset of features from the following: `"age"`, `"credit_amount"`, `"credit_history"`, and `"duration"`.

We first provide code for data preprocessing as follows:

In [3]:
from sklearn.preprocessing import OrdinalEncoder


# For simplification, select only the desired features
selected_features = ["age", "credit_amount", "credit_history", "duration"]
X_four_feats = X[selected_features].copy()

categorical_features = ["credit_history", ]
numerical_features = [feat for feat in selected_features if feat not in categorical_features]

# This feature is ordinal, so we encode it using OrdinalEncoder
unique_credit_history = X_four_feats['credit_history'].unique()
print(f"Unique credit history values: {unique_credit_history}")

# Define the categories based on actual values in the dataset
credit_history_categories = [
    'critical/other existing credit',  # Most risky - 0
    'delayed previously',              # Somewhat risky - 1
    'existing paid',                   # Moderate - 2
    'no credits/all paid',             # Good - 3
    'all paid'                         # Best - 4
]

ordinal_encoder = OrdinalEncoder(
    categories=[credit_history_categories],
    dtype=np.int32
)

X_cat = X_four_feats[categorical_features]
X_cat_encoded = pd.DataFrame(
    ordinal_encoder.fit_transform(X_cat),
    columns=categorical_features,
    index=X_cat.index
)

X_num = X_four_feats[numerical_features]
X_four_feats = pd.concat([X_num, X_cat_encoded], axis=1)

Unique credit history values: ['critical/other existing credit', 'existing paid', 'delayed previously', 'no credits/all paid', 'all paid']
Categories (5, object): ['all paid', 'critical/other existing credit', 'delayed previously', 'existing paid', 'no credits/all paid']


Now please write your code to solve the given task.

<details><summary>Hint 1:</summary> 
Use `SequentialFeatureSelector()` from `sklearn.feature_selection` and set the parameter `cv=3`. 
</details>

In [4]:
#===SOLUTION===

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.feature_selection import SequentialFeatureSelector


clf = RandomForestClassifier(random_state=2025)
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=2025)

sfs_forward = SequentialFeatureSelector(
    clf, 
    n_features_to_select='auto',
    direction='forward',
    scoring='accuracy',
    cv=cv,
    n_jobs=-1
)

### 1.2 Start the Feature Selection

Start the feature selection by fitting the selector (`SequentialFeatureSelector`) to your reduced feature set (`X_four_feats`) and target (`y`). Then, identify the subset of selected features based on classification accuracy.

<details><summary>Hint 1:</summary> 
Use the `.fit()` method of your initialized `SequentialFeatureSelector` object. 
</details> 

<details><summary>Hint 2:</summary> 
After fitting, use `.get_support()` or `.get_feature_names_out()` to identify which features have been selected. 
</details>

In [5]:
#===SOLUTION===

sfs_forward.fit(X_four_feats, y)
print(f"Selected features: {sfs_forward.get_feature_names_out()}")

Selected features: ['duration' 'credit_history']


<div style="background-color: #fff3cd; border-left: 6px solid #ffa502; padding: 1em; margin: 1em 0; border-radius: 4px;">
  <strong>⚠️ Skipped Task: 1.3 Evaluate</strong>
  <p>Inspect the four features and the accuracy stored in the instance archive for each of the first two batches.</p>
  <p><em>This sub-section is skipped because the <code>SequentialFeatureSelector</code> does not store search results at intermediate steps, unlike the R solution.</em></p>
</div>
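
If intermediate results are needed anyway, one option is to write the forward search by hand and log every evaluated candidate. The following is a minimal sketch under stated assumptions: `forward_search_with_archive` is a hypothetical helper (not a scikit-learn function), the data is synthetic, and the "batch" column merely mimics the idea of an archive with one batch per search step.

```python
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

# Synthetic stand-in data with four named features
X_arr, y_toy = make_classification(
    n_samples=200, n_features=4, n_informative=2, n_redundant=0, random_state=0
)
X_toy = pd.DataFrame(X_arr, columns=["f0", "f1", "f2", "f3"])


def forward_search_with_archive(X, y, estimator, n_select, cv=3):
    """Greedy forward selection that records every evaluated candidate subset."""
    selected, archive = [], []
    for step in range(1, n_select + 1):
        scores = {}
        for feat in X.columns:
            if feat in selected:
                continue
            candidate = selected + [feat]
            scores[feat] = cross_val_score(
                estimator, X[candidate], y, cv=cv, scoring="accuracy"
            ).mean()
            archive.append(
                {"batch": step, "features": tuple(candidate), "accuracy": scores[feat]}
            )
        # Keep the feature whose addition gave the best mean CV accuracy
        selected.append(max(scores, key=scores.get))
    return selected, pd.DataFrame(archive)


selected, archive = forward_search_with_archive(
    X_toy, y_toy, LogisticRegression(max_iter=1000), n_select=2
)
print(archive)
```

The first batch contains one row per single feature, and the second batch one row per two-feature extension of the best single feature.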



### 1.4 Model Training

Train the model with the subset of features identified by the feature selector, then compute its accuracy on a held-out test set.

In [6]:
#===SOLUTION===

from sklearn.model_selection import train_test_split

best_feature_set = sfs_forward.get_feature_names_out()

print(f"Best feature set: {best_feature_set}")

X_best = sfs_forward.transform(X_four_feats)

X_train, X_test, y_train, y_test = train_test_split(
    X_best, y, test_size=0.2, random_state=42, stratify=y
)

# Initialize and train the model with best features
final_model = RandomForestClassifier(max_depth=10, random_state=2025)
final_model.fit(X_train, y_train)

test_accuracy = final_model.score(X_test, y_test)
print(f"Test accuracy with best features: {test_accuracy:.4f}")

Best feature set: ['duration' 'credit_history']
Test accuracy with best features: 0.7000


## 2 Multiple Performance Measures

Optimizing for several performance measures works much like the previous step, except that each measure is now optimized separately. Perform feature selection using random search to optimize three different metrics: classification accuracy, true positive rate (TPR), and true negative rate (TNR).

Again, use the `credit-g` dataset, this time with all features, and preprocess it as follows.

In [7]:
# First, let's prepare the full dataset with proper encoding
# Separate numerical and categorical features
numerical_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()

# Apply ordinal encoding to categorical features
X_cat_encoded = pd.DataFrame()
for feature in categorical_features:
    enc = OrdinalEncoder()
    encoded_feature = enc.fit_transform(X[[feature]])
    X_cat_encoded[feature] = encoded_feature.flatten()

# Combine numerical and encoded categorical features
X_num = X[numerical_features].copy()
X_full = pd.concat([X_num, X_cat_encoded], axis=1)

Now, please write the code to perform the following tasks:

- Set up a random search feature selector.
- Perform random search independently for each performance metric (`accuracy`, `TPR`, and `TNR`) using 3-fold cross-validation.
- Evaluate and report the resulting feature subsets and their performances across all metrics.

Use the provided `random_feature_search` function separately for each performance metric: `accuracy`, `tpr_scorer`, and `tnr_scorer`.

<details><summary>Hint 1:</summary>
After selecting the best features for each metric, evaluate the performance of each subset across all three metrics to compare results comprehensively.
</details>
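
As a reminder of what the two rates measure, here is a small sketch in plain Python. It assumes that `"good"` is treated as the positive class and `"bad"` as the negative class; the helper `rate_for_class` and the toy labels are made up for illustration.

```python
def rate_for_class(y_true, y_pred, label):
    """Fraction of observations with true class `label` that are predicted as `label`."""
    hits = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
    total = sum(1 for t in y_true if t == label)
    return hits / total


y_true = ["good", "good", "good", "bad", "bad"]
y_pred = ["good", "good", "bad", "good", "bad"]

tpr = rate_for_class(y_true, y_pred, "good")  # recall on the positive class
tnr = rate_for_class(y_true, y_pred, "bad")   # recall on the negative class

# A model that always predicts "good" maximizes TPR at the expense of TNR
always_good = ["good"] * len(y_true)
tpr_trivial = rate_for_class(y_true, always_good, "good")
tnr_trivial = rate_for_class(y_true, always_good, "bad")

print(tpr, tnr, tpr_trivial, tnr_trivial)
```

Both rates are recalls computed for different classes, which is why a single recall function with a different positive label is enough to build both scorers.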

In [8]:
from sklearn.model_selection import cross_val_score


def random_feature_search(X, y, clf, cv, scorer, n_iterations=50, min_features=1, max_features=None, rng=None):
    """This function is provided as a helper function to perform random search for feature selection."""
    # Fall back to a seeded Generator; a legacy RandomState has no .integers() method
    rng = rng if rng is not None else np.random.default_rng(42)
    
    if max_features is None:
        max_features = X.shape[1]
    
    all_features = list(X.columns)
    best_score = -np.inf
    best_features = []
    
    # Try different feature subset sizes
    for _ in range(n_iterations):
        # Randomly select number of features to use
        n_features = rng.integers(min_features, max_features + 1)
        
        # Randomly select features without replacement
        feature_indices = rng.choice(len(all_features), size=n_features, replace=False)
        feature_subset = [all_features[i] for i in feature_indices]
        
        # Evaluate this feature subset
        X_subset = X[feature_subset]
        scores = cross_val_score(clf, X_subset, y, cv=cv, scoring=scorer)
        avg_score = np.mean(scores)
        
        # Update best if this is better
        if avg_score > best_score:
            best_score = avg_score
            best_features = feature_subset
    
    return best_features, best_score

In [9]:
#===SOLUTION===

from sklearn.metrics import recall_score, make_scorer


def tpr_score(y_true, y_pred):
    return recall_score(y_true, y_pred, pos_label="good")

def tnr_score(y_true, y_pred):
    return recall_score(y_true, y_pred, pos_label="bad")

tpr_scorer = make_scorer(tpr_score)
tnr_scorer = make_scorer(tnr_score)

clf = RandomForestClassifier(max_depth=10, random_state=2025)
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=2025)

print("2 Multiple Performance Measures. Using custom random search for feature selection with multiple metrics")
print("Running random feature search...")

# Function to evaluate feature set across all metrics
def evaluate_feature_set(features, name):
    X_subset = X_full[features]
    acc = np.mean(cross_val_score(clf, X_subset, y, cv=cv, scoring='accuracy'))
    tpr = np.mean(cross_val_score(clf, X_subset, y, cv=cv, scoring=tpr_scorer))
    tnr = np.mean(cross_val_score(clf, X_subset, y, cv=cv, scoring=tnr_scorer))
    
    return {
        "features": features,
        "features_str": f"{name} optimized",
        "num_features": len(features),
        "accuracy": acc,
        "classif.tpr": tpr,
        "classif.tnr": tnr
    }

# Run random search for each metric
metrics = {
    'Accuracy': ('accuracy', None),
    'TPR': (tpr_scorer, 'TPR'),
    'TNR': (tnr_scorer, 'TNR')
}

results = []
best_features = {}

# Perform feature selection for each metric
for name, (scorer, _) in metrics.items():
    features, score = random_feature_search(
        X_full, y, clf, cv, scorer, n_iterations=20, rng=rng
    )
    best_features[name] = features
    
    # Evaluate this feature set across all metrics
    results.append(evaluate_feature_set(features, name))

results_df = pd.DataFrame(results)
print("\nFeature selection results with different optimization criteria:")
print(results_df[["features_str", "num_features", "accuracy", "classif.tpr", "classif.tnr"]])

print("\nAccuracy-optimized features:", best_features['Accuracy'])
print("\nTPR-optimized features:", best_features['TPR'])
print("\nTNR-optimized features:", best_features['TNR'])

2 Multiple Performance Measures. Using custom random search for feature selection with multiple metrics
Running random feature search...

Feature selection results with different optimization criteria:
         features_str  num_features  accuracy  classif.tpr  classif.tnr
0  Accuracy optimized            18  0.752993     0.922851     0.356667
1       TPR optimized             1  0.699999     1.000000     0.000000
2       TNR optimized             9  0.731989     0.867148     0.416667

Accuracy-optimized features: ['age', 'savings_status', 'property_magnitude', 'installment_commitment', 'personal_status', 'housing', 'credit_history', 'credit_amount', 'employment', 'other_payment_plans', 'checking_status', 'purpose', 'own_telephone', 'num_dependents', 'duration', 'job', 'foreign_worker', 'existing_credits']

TPR-optimized features: ['own_telephone']

TNR-optimized features: ['credit_amount', 'foreign_worker', 'purpose', 'checking_status', 'property_magnitude', 'other_parties', 'job', 'h

Question: what is your observation?

===SOLUTION===

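Note that the measures cannot all be optimal at the same time, so one has to choose according to one's preferences. Here, we see different trade-offs between sensitivity (TPR) and specificity (TNR), but no feature subset is dominated by another: none has both worse sensitivity and worse specificity than some other subset. For example, the TPR-optimized subset reaches a TPR of 1.0 only because the model predicts the positive class for every observation, which drives the TNR to 0.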
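
The dominance statement can be checked directly on the (TPR, TNR) pairs from the results table above. The helper `dominates` below is a hypothetical function written for this check and uses the usual Pareto definition: at least as good in every measure and strictly better in at least one.

```python
# (TPR, TNR) pairs copied from the results table above
results = {
    "Accuracy optimized": (0.922851, 0.356667),
    "TPR optimized": (1.000000, 0.000000),
    "TNR optimized": (0.867148, 0.416667),
}


def dominates(a, b):
    """True if a is at least as good as b in every measure and strictly better in one."""
    return all(x >= y for x, y in zip(a, b)) and any(x > y for x, y in zip(a, b))


non_dominated = [
    name
    for name, point in results.items()
    if not any(
        dominates(other, point)
        for other_name, other in results.items()
        if other_name != name
    )
]
print(non_dominated)
```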

## 3 Nested Resampling

Nested resampling yields unbiased performance estimates when feature selection is part of the modeling process: the selection is repeated inside every outer training fold, so the outer test fold never influences which features are chosen. In Python with `scikit-learn`, this can be implemented by creating a custom wrapper class (e.g., `AutoFSelector`) that integrates feature selection within cross-validation.
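
To illustrate the idea independently of a custom class, the sketch below nests a `SequentialFeatureSelector` inside a `Pipeline` and evaluates the whole pipeline with an outer cross-validation. This is an alternative illustration rather than the approach used in this exercise; the synthetic data, the choice of three selected features, and the fold seeds are assumptions.

```python
from sklearn.datasets import make_classification
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline

# Synthetic stand-in data
X_toy, y_toy = make_classification(
    n_samples=150, n_features=6, n_informative=3, n_redundant=0, random_state=0
)

inner_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=0)  # used for selection
outer_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=1)  # used for evaluation

pipe = Pipeline([
    # The selector only ever sees the outer training fold
    ("select", SequentialFeatureSelector(
        LogisticRegression(max_iter=1000), n_features_to_select=3, cv=inner_cv
    )),
    ("classify", LogisticRegression(max_iter=1000)),
])

# Each outer fold reruns the feature selection from scratch
outer_scores = cross_val_score(pipe, X_toy, y_toy, cv=outer_cv, scoring="accuracy")
print(f"Outer-fold accuracies: {outer_scores}")
```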

### 3.1 Create an AutoFSelector Instance

Implement an `AutoFSelector` class that uses random search to find the subset of features resulting in the highest classification accuracy for a logistic regression model. The feature selector should terminate after `n_evals` evaluations. Note that the provided implementation evaluates each candidate subset with an inner stratified k-fold cross-validation (`cv=3` by default) rather than with a single holdout split.

Use the provided custom `AutoFSelector` class with logistic regression (`LogisticRegression`) as the base estimator.

Note: random search for feature selection is highly sensitive to the random seed. You may therefore need to set `n_evals` to 20 and try several random seeds to obtain good performance. In the solution, we use a lucky `random_state=543` for `AutoFSelector`.


In [10]:
from sklearn.base import BaseEstimator, ClassifierMixin, clone


# scikit-learn recommends listing mixins before BaseEstimator in the class definition
class AutoFSelector(ClassifierMixin, BaseEstimator):
    def __init__(self, base_estimator, n_evals=10, cv=3, random_state=None):
        self.base_estimator = base_estimator
        self.n_evals = n_evals
        self.cv = cv
        self.random_state = random_state

    def fit(self, X, y):
        # Ensure X is a DataFrame to ease column handling
        if not isinstance(X, pd.DataFrame):
            self.X_original_type = type(X)
            if hasattr(X, 'columns'):
                X_df = X.copy()
            else:
                # Create a DataFrame with default column names
                X_df = pd.DataFrame(X)
        else:
            self.X_original_type = pd.DataFrame
            X_df = X.copy()
            
        self.feature_names_ = X_df.columns.tolist()
        
        # Use the existing random_feature_search function
        self.selected_features_, self.best_score_ = random_feature_search(
            X_df, y, 
            self.base_estimator, 
            cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
            scorer='accuracy', 
            n_iterations=self.n_evals,
            min_features=1,
            rng=np.random.default_rng(self.random_state)
        )
        
        # Create the best mask for feature selection
        self.best_mask_ = np.array([feat in self.selected_features_ for feat in self.feature_names_])
        
        # Fit the final model on the selected features
        self.estimator_ = clone(self.base_estimator)
        X_selected = X_df[self.selected_features_]
        self.estimator_.fit(X_selected, y)
        
        return self

    def _get_feature_subset(self, X):
        """Helper to get the selected feature subset from input X."""
        if isinstance(X, pd.DataFrame):
            # If X is already a DataFrame, simply select the features
            return X[self.selected_features_]
        else:
            # If X is a numpy array, convert to DataFrame with proper column names
            X_df = pd.DataFrame(X, columns=self.feature_names_)
            return X_df[self.selected_features_]

    def predict(self, X):
        X_subset = self._get_feature_subset(X)
        return self.estimator_.predict(X_subset)

    def predict_proba(self, X):
        X_subset = self._get_feature_subset(X)
        return self.estimator_.predict_proba(X_subset)

Now, write your code for the task.

In [11]:
#===SOLUTION===

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline


log_reg = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(random_state=2025))
])

afs = AutoFSelector(
    base_estimator=Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=2025))
    ]),
    n_evals=20,
    cv=3,
    random_state=543
)

### 3.2 Benchmark

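Compare the `AutoFSelector` (nested resampling approach) against a plain logistic regression model using 3-fold cross-validation on the Sonar dataset. The outer cross-validation estimates the performance, while the `AutoFSelector` runs its own inner resampling on each outer training fold.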

<details><summary>Hint 1:</summary>
Perform cross-validation separately for both the logistic regression and the `AutoFSelector`, then compute their average accuracies.
</details>

In [12]:
# Load the Sonar dataset from OpenML
X_sonar, y_sonar = fetch_openml('Sonar', version=1, as_frame=True, return_X_y=True)

# Set up 3-fold cross-validation
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=2025)

# Compute CV accuracy for both models.
score_afs = cross_val_score(afs, X_sonar, y_sonar, cv=cv, scoring='accuracy')
score_lr = cross_val_score(log_reg, X_sonar, y_sonar, cv=cv, scoring='accuracy')

results = pd.DataFrame({
    "learner_id": ["AutoFSelector", "LogisticRegression"],
    "classif.acc": [score_afs.mean(), score_lr.mean()]
})
print("\nBenchmark Results:")
print(results)



Benchmark Results:
           learner_id  classif.acc
0       AutoFSelector     0.759834
1  LogisticRegression     0.759696


## Summary

* Wrapper methods calculate performance measures for various combinations of features in order to perform feature selection.
* They are computationally expensive since several models need to be fitted.
* The `AutoFSelector` inherits from the base classes `BaseEstimator` and `ClassifierMixin`, which is why it can be used like any other learner.
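
The last point can be illustrated with a deliberately simple learner. `MajorityClassifier` below is a hypothetical toy class, not part of scikit-learn: `BaseEstimator` supplies `get_params()` and therefore makes `clone()` work, while `ClassifierMixin` supplies an accuracy-based `score()`.

```python
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, clone


class MajorityClassifier(ClassifierMixin, BaseEstimator):
    """Hypothetical toy learner that always predicts the most frequent training label."""

    def __init__(self, verbose=False):
        # Store constructor arguments unchanged so get_params() and clone() work
        self.verbose = verbose

    def fit(self, X, y):
        labels, counts = np.unique(y, return_counts=True)
        self.classes_ = labels
        self.majority_ = labels[np.argmax(counts)]
        return self

    def predict(self, X):
        return np.full(len(X), self.majority_)


X_toy = np.zeros((5, 2))
y_toy = np.array(["good", "good", "good", "bad", "bad"])

model = MajorityClassifier(verbose=True)
model_copy = clone(model)  # unfitted copy with identical parameters
accuracy = model_copy.fit(X_toy, y_toy).score(X_toy, y_toy)  # score() from ClassifierMixin
print(model_copy.get_params(), accuracy)
```

Because the class follows these conventions, utilities such as `cross_val_score` can clone and refit it in every fold, which is what allows `AutoFSelector` to be benchmarked like a built-in estimator.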