<a href="https://colab.research.google.com/github/micah-shull/pipelines/blob/main/pipelines_06_pytorch_pipeline_03_ANOVA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Modular Preprocessing Code

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.metrics import f1_score, classification_report
from imblearn.under_sampling import RandomUnderSampler
import matplotlib.pyplot as plt
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def set_seed(seed):
    """Seed the random number generators used by the pipeline.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (plus the CUDA generators when CUDA is available), then configures cuDNN
    to prefer deterministic behaviour over autotuned kernels.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Imported locally so the function does not depend on the module header
    # importing ``random``.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # Favour reproducibility over cuDNN's benchmark-driven kernel selection.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def load_data(url="https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls"):
    """Read the Excel workbook at ``url`` and normalise its column names.

    The sheet is parsed with ``header=1`` (the second row holds the column
    names). Every column name is lower-cased and its spaces are replaced
    with underscores.

    Args:
        url: Location handed to ``pandas.read_excel``.

    Returns:
        The loaded ``DataFrame`` with normalised column names.
    """
    raw = pd.read_excel(url, header=1)
    snake_names = [label.lower().replace(' ', '_') for label in raw.columns]
    raw.columns = snake_names
    return raw

def convert_categorical(df, categorical_columns):
    """Cast the given columns of ``df`` to the pandas ``category`` dtype.

    The frame is modified in place and also returned, so callers may either
    rely on the mutation or rebind the result.

    Args:
        df: DataFrame to update.
        categorical_columns: Column label(s) to convert.

    Returns:
        The same ``df`` object, with the selected columns converted.
    """
    as_category = df[categorical_columns].astype('category')
    df[categorical_columns] = as_category
    return df

def split_data(df, target, test_size=0.2, random_state=42):
    """Split a frame into stratified train/test features and labels.

    Args:
        df: DataFrame containing the feature columns and the target column.
        target: Name of the target column; it is removed from the features.
        test_size: Forwarded to ``train_test_split``. Defaults to ``0.2``,
            the value that was previously hard-coded.
        random_state: Seed forwarded to ``train_test_split``. Defaults to
            ``42``, the value that was previously hard-coded.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)``.
    """
    X = df.drop(columns=[target])
    y = df[target]
    # Stratify on the target so both splits are drawn per class.
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=test_size,
        random_state=random_state,
        stratify=y,
    )
    return X_train, X_test, y_train, y_test

def define_preprocessor(X_train):
    """Build an unfitted ``ColumnTransformer`` from the dtypes of ``X_train``.

    Columns with dtype ``int64``/``float64`` go through median imputation and
    standard scaling; columns with dtype ``object``/``category`` go through
    most-frequent imputation and one-hot encoding (unknown categories are
    ignored at transform time). Columns of any other dtype are not assigned
    to either branch.

    Args:
        X_train: Training feature DataFrame used only to pick column names.

    Returns:
        The configured (not yet fitted) ``ColumnTransformer``.
    """
    pick = X_train.select_dtypes
    num_cols = pick(include=['int64', 'float64']).columns.tolist()
    cat_cols = pick(include=['object', 'category']).columns.tolist()

    num_steps = [
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ]
    cat_steps = [
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ]

    return ColumnTransformer(transformers=[
        ('num', Pipeline(steps=num_steps), num_cols),
        ('cat', Pipeline(steps=cat_steps), cat_cols),
    ])

def preprocess_data(preprocessor, X_train, X_test):
    """Fit ``preprocessor`` on the training data and transform both splits.

    Args:
        preprocessor: Object exposing ``fit_transform`` and ``transform``.
        X_train: Training features; used for fitting.
        X_test: Test features; only transformed, never fitted on.

    Returns:
        Tuple ``(X_train_processed, X_test_processed)``.
    """
    # Tuple elements are evaluated left to right, so the fit on the training
    # split always happens before the test split is transformed.
    return preprocessor.fit_transform(X_train), preprocessor.transform(X_test)

def calculate_class_weights(y_train):
    """Return per-class weights ``n_samples / (2 * class_count)``.

    Class counts come from ``np.bincount``, so the result is indexed by the
    integer class label.

    Args:
        y_train: Sequence of integer class labels accepted by ``np.bincount``.

    Returns:
        NumPy array of weights, one entry per label value counted.
    """
    n_samples = len(y_train)
    class_counts = np.bincount(y_train)
    return n_samples / (2 * class_counts)

def save_data(X_train_processed, X_test_processed, y_train, y_test, filename='preprocessed_data.npz'):
    """Write the processed splits to a single ``.npz`` archive.

    The arrays are stored under the keys ``X_train_processed``,
    ``X_test_processed``, ``y_train`` and ``y_test``. A confirmation line is
    printed once the archive has been written.

    Args:
        X_train_processed: Processed training features.
        X_test_processed: Processed test features.
        y_train: Training labels.
        y_test: Test labels.
        filename: Destination passed to ``np.savez``.
    """
    arrays = {
        'X_train_processed': X_train_processed,
        'X_test_processed': X_test_processed,
        'y_train': y_train,
        'y_test': y_test,
    }
    np.savez(filename, **arrays)
    print("Data preparation complete and saved.")


## ANOVA Feature Selection Script

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.pipeline import Pipeline as SklearnPipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.metrics import f1_score, classification_report
from imblearn.under_sampling import RandomUnderSampler
import matplotlib.pyplot as plt
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class SimpleNN(nn.Module):
    """Two-layer feed-forward network: ``input_dim`` -> 32 (ReLU) -> 1.

    ``forward`` returns the raw output of the last linear layer; no sigmoid
    is applied inside the module.
    """

    def __init__(self, input_dim):
        super().__init__()
        # The attribute names double as the parameter prefixes in the
        # module's state dict.
        self.fc1 = nn.Linear(in_features=input_dim, out_features=32)
        self.fc2 = nn.Linear(in_features=32, out_features=1)

    def forward(self, x):
        hidden = torch.relu(self.fc1(x))
        return self.fc2(hidden)

class SklearnSimpleNN(BaseEstimator, ClassifierMixin):
    """scikit-learn style wrapper that trains ``SimpleNN`` as a binary classifier.

    Training minimises ``BCEWithLogitsLoss`` (weighted by ``pos_weight``)
    with Adam over shuffled mini-batches. Predictions threshold the sigmoid
    of the network output at 0.5.

    Args:
        input_dim: Number of input features expected by the network.
        learning_rate: Learning rate given to the Adam optimiser.
        epochs: Number of passes over the training data in ``fit``.
        batch_size: Mini-batch size used by the training ``DataLoader``.
        pos_weight: Weight of the positive class in the loss.
    """

    def __init__(self, input_dim, learning_rate=0.001, epochs=50, batch_size=64, pos_weight=1.0):
        self.input_dim = input_dim
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.pos_weight = pos_weight
        # The network is built eagerly, so its weights are initialised at
        # construction time rather than at fit time.
        self.model = SimpleNN(self.input_dim)

    def fit(self, X, y):
        """Train the wrapped network on ``X``/``y`` and return ``self``.

        Args:
            X: Array-like of features with ``input_dim`` columns.
            y: Array-like of 0/1 labels, shaped ``(n,)`` or ``(n, 1)``.
        """
        criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(self.pos_weight, dtype=torch.float32))
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

        # np.asarray first, so pandas objects are converted through their
        # array interface instead of being indexed element by element.
        features = torch.tensor(np.asarray(X), dtype=torch.float32)
        # The loss compares against the (batch, 1) network output, so labels
        # are normalised to a single column whatever shape they arrive in.
        labels = torch.tensor(np.asarray(y), dtype=torch.float32).reshape(-1, 1)

        train_dataset = torch.utils.data.TensorDataset(features, labels)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)

        for _ in range(self.epochs):
            self.model.train()
            for inputs, targets in train_loader:
                optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
        return self

    def predict(self, X):
        """Return 0.0/1.0 predictions for ``X`` as a 1-D array of length ``n_samples``."""
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(torch.tensor(np.asarray(X), dtype=torch.float32))
            probabilities = torch.sigmoid(outputs)
            predictions = (probabilities > 0.5).float()
        # reshape(-1) instead of squeeze(): squeeze() would turn a
        # single-sample prediction into a 0-d array, which metric functions
        # expecting one entry per sample cannot consume.
        return predictions.numpy().reshape(-1)

def _to_dense(matrix):
    """Densify ``matrix`` when it exposes ``toarray()``; otherwise return it as is."""
    return matrix.toarray() if hasattr(matrix, "toarray") else matrix


def evaluate_anova(X_train, y_train, X_test, y_test, preprocessor, class_weights, k_values):
    """Train and score the network on the top-``k`` ANOVA features for each ``k``.

    For every ``k`` in ``k_values`` the function fits ``preprocessor``
    followed by ``SelectKBest(f_classif, k=k)`` on the training split,
    transforms the test split, undersamples the training data
    (``sampling_strategy=0.75``, ``random_state=42``), trains a fresh
    ``SklearnSimpleNN`` and records its F1-score on the test split.

    Args:
        X_train: Training features accepted by ``preprocessor``.
        y_train: Training labels.
        X_test: Test features accepted by ``preprocessor``.
        y_test: Test labels (array-like).
        preprocessor: Transformer used as the first pipeline step; it is
            re-fitted for every ``k``.
        class_weights: Indexable of class weights; entry ``1`` is used as the
            positive-class weight of the loss.
        k_values: Iterable of feature counts to evaluate.

    Returns:
        Tuple ``(k_values, f1_scores)`` with one F1-score per evaluated ``k``.
    """
    f1_scores = []
    # The test labels do not depend on k, so convert them once.
    y_test_array = np.asarray(y_test, dtype=np.float32).ravel()

    for k in k_values:
        logging.info(f"Evaluating ANOVA with {k} features")

        anova_pipeline = SklearnPipeline(steps=[
            ('preprocessor', preprocessor),
            ('anova', SelectKBest(score_func=f_classif, k=k))
        ])

        # The pipeline may hand back a sparse matrix (one-hot output);
        # tensors cannot be built from that directly, so densify up front.
        X_train_anova = _to_dense(anova_pipeline.fit_transform(X_train, y_train))
        X_test_anova = _to_dense(anova_pipeline.transform(X_test))

        undersampler = RandomUnderSampler(sampling_strategy=0.75, random_state=42)
        X_train_resampled, y_train_resampled = undersampler.fit_resample(X_train_anova, y_train)

        # Convert through NumPy so pandas containers returned by the sampler
        # are handled via their array interface.
        X_fit = np.asarray(X_train_resampled, dtype=np.float32)
        y_fit = np.asarray(y_train_resampled, dtype=np.float32)
        X_eval = np.asarray(X_test_anova, dtype=np.float32)

        nn_estimator = SklearnSimpleNN(input_dim=X_fit.shape[1], pos_weight=class_weights[1])
        nn_estimator.fit(X_fit, y_fit)

        y_pred = nn_estimator.predict(X_eval)
        f1 = f1_score(y_test_array, y_pred)
        f1_scores.append(f1)
        logging.info(f"Number of features: {k}, F1-score: {f1}")

    return k_values, f1_scores

def plot_f1_scores(k_values, f1_scores):
    """Show a line chart of F1-score against the number of selected features.

    Args:
        k_values: Feature counts; used as x values and as the x tick positions.
        f1_scores: F1-score for each entry of ``k_values``.
    """
    _, axis = plt.subplots(figsize=(10, 6))
    axis.plot(k_values, f1_scores, marker='o')
    axis.set_title('F1-Score vs. Number of Features (ANOVA)')
    axis.set_xlabel('Number of Features')
    axis.set_ylabel('F1-Score')
    axis.set_xticks(k_values)
    axis.grid()
    plt.show()


In [None]:
# Import necessary libraries
from ml_utils import set_seed, load_data, preprocess_data, define_preprocessor, calculate_class_weights
from anova_feature_selection import evaluate_anova, plot_f1_scores

def main():
    """Run the ANOVA feature-count sweep end to end and persist the best ``k``.

    Steps: seed the generators, load the data, mark ``sex``/``education``/
    ``marriage`` as categorical, make the train/test split, build the
    preprocessor, compute class weights, evaluate every ``k`` from 1 to the
    number of raw feature columns, plot the scores, then print and write the
    best ``k`` to ``optimal_anova_features.txt``.
    """
    # Local imports: the import lines of this cell do not bring these names
    # into scope. The two helpers are assumed to live in ``ml_utils`` next to
    # ``load_data`` and ``define_preprocessor``.
    import numpy as np
    from ml_utils import convert_categorical, split_data

    set_seed(42)
    data = load_data()
    data = convert_categorical(data, categorical_columns=['sex', 'education', 'marriage'])
    # preprocess_data() takes (preprocessor, X_train, X_test) and cannot
    # split a raw frame; the split itself is split_data()'s job.
    X_train, X_test, y_train, y_test = split_data(data, target='default_payment_next_month')
    preprocessor = define_preprocessor(X_train)
    class_weights = calculate_class_weights(y_train)

    # Define range of k values
    k_values = range(1, X_train.shape[1] + 1)

    # Evaluate ANOVA feature selection
    k_values, f1_scores = evaluate_anova(X_train, y_train, X_test, y_test, preprocessor, class_weights, k_values)

    # Plot F1-scores
    plot_f1_scores(k_values, f1_scores)

    # Find the optimal number of features
    optimal_k = k_values[np.argmax(f1_scores)]
    print(f"Optimal number of features: {optimal_k}")

    # Save the optimal number of features
    with open('optimal_anova_features.txt', 'w') as f:
        f.write(str(optimal_k))


if __name__ == "__main__":
    main()


In [1]:
# import pandas as pd
# import numpy as np
# import torch
# import torch.nn as nn
# import torch.optim as optim
# from sklearn.base import BaseEstimator, ClassifierMixin
# from sklearn.pipeline import Pipeline as SklearnPipeline
# from sklearn.feature_selection import SelectKBest, f_classif
# from sklearn.metrics import f1_score, classification_report
# from imblearn.under_sampling import RandomUnderSampler
# import matplotlib.pyplot as plt
# import logging

# # Set up logging
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# def set_seed(seed):
#     np.random.seed(seed)
#     torch.manual_seed(seed)
#     if torch.cuda.is_available():
#         torch.cuda.manual_seed(seed)
#         torch.cuda.manual_seed_all(seed)
#     torch.backends.cudnn.deterministic = True
#     torch.backends.cudnn.benchmark = False

# class SimpleNN(nn.Module):
#     def __init__(self, input_dim):
#         super(SimpleNN, self).__init__()
#         self.fc1 = nn.Linear(input_dim, 32)
#         self.fc2 = nn.Linear(32, 1)

#     def forward(self, x):
#         x = torch.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

# class SklearnSimpleNN(BaseEstimator, ClassifierMixin):
#     def __init__(self, input_dim, learning_rate=0.001, epochs=50, batch_size=64, pos_weight=1.0):
#         self.input_dim = input_dim
#         self.learning_rate = learning_rate
#         self.epochs = epochs
#         self.batch_size = batch_size
#         self.pos_weight = pos_weight
#         self.model = SimpleNN(self.input_dim)

#     def fit(self, X, y):
#         criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(self.pos_weight, dtype=torch.float32))
#         optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
#         train_dataset = torch.utils.data.TensorDataset(torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.float32).unsqueeze(1))
#         train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)

#         for epoch in range(self.epochs):
#             self.model.train()
#             for inputs, targets in train_loader:
#                 optimizer.zero_grad()
#                 outputs = self.model(inputs)
#                 loss = criterion(outputs, targets.view(-1, 1))
#                 loss.backward()
#                 optimizer.step()
#         return self

#     def predict(self, X):
#         self.model.eval()
#         with torch.no_grad():
#             outputs = self.model(torch.tensor(X, dtype=torch.float32))
#             probabilities = torch.sigmoid(outputs)
#             predictions = (probabilities > 0.5).float()
#         return predictions.numpy().squeeze()

# def calculate_class_weights(y_train):
#     return len(y_train) / (2 * np.bincount(y_train))

# def evaluate_anova(X_train, y_train, X_test, y_test, preprocessor, class_weights, k_values):
#     f1_scores = []

#     for k in k_values:
#         logging.info(f"Evaluating ANOVA with {k} features")

#         anova_pipeline = SklearnPipeline(steps=[
#             ('preprocessor', preprocessor),
#             ('anova', SelectKBest(score_func=f_classif, k=k))
#         ])

#         X_train_anova = anova_pipeline.fit_transform(X_train, y_train)
#         X_test_anova = anova_pipeline.transform(X_test)

#         undersampler = RandomUnderSampler(sampling_strategy=0.75, random_state=42)
#         X_train_resampled, y_train_resampled = undersampler.fit_resample(X_train_anova, y_train)

#         X_train_tensor = torch.tensor(X_train_resampled, dtype=torch.float32)
#         y_train_tensor = torch.tensor(y_train_resampled, dtype=torch.float32).unsqueeze(1)
#         X_test_tensor = torch.tensor(X_test_anova, dtype=torch.float32)
#         y_test_tensor = torch.tensor(y_test.to_numpy(), dtype=torch.float32).unsqueeze(1)

#         nn_estimator = SklearnSimpleNN(input_dim=X_train_tensor.shape[1], pos_weight=class_weights[1])
#         nn_estimator.fit(X_train_tensor.numpy(), y_train_tensor.numpy())

#         y_pred = nn_estimator.predict(X_test_tensor.numpy())
#         f1 = f1_score(y_test_tensor.numpy(), y_pred)
#         f1_scores.append(f1)
#         logging.info(f"Number of features: {k}, F1-score: {f1}")

#     return k_values, f1_scores

# def plot_f1_scores(k_values, f1_scores):
#     plt.figure(figsize=(10, 6))
#     plt.plot(k_values, f1_scores, marker='o')
#     plt.title('F1-Score vs. Number of Features (ANOVA)')
#     plt.xlabel('Number of Features')
#     plt.ylabel('F1-Score')
#     plt.xticks(k_values)
#     plt.grid()
#     plt.show()


In [None]:
# # Import necessary libraries
# import numpy as np
# import pandas as pd
# from sklearn.compose import ColumnTransformer
# from sklearn.pipeline import Pipeline
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import StandardScaler, OneHotEncoder
# from anova_feature_selection import set_seed, calculate_class_weights, evaluate_anova, plot_f1_scores

# # Set random seed
# set_seed(42)

# # Load the dataset
# url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls"
# df = pd.read_excel(url, header=1)
# df.columns = [col.lower().replace(' ', '_') for col in df.columns]

# # Convert specific numeric columns to categorical
# categorical_columns = ['sex', 'education', 'marriage']
# df[categorical_columns] = df[categorical_columns].astype('category')

# # Select features and target
# target = 'default_payment_next_month'
# X = df.drop(columns=[target])
# y = df[target]

# # Perform stratified train-test split
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# # Identify column types
# numeric_features = X_train.select_dtypes(include=['int64', 'float64']).columns.tolist()
# categorical_features = X_train.select_dtypes(include=['object', 'category']).columns.tolist()

# # Define preprocessing for numeric columns
# numeric_transformer = Pipeline(steps=[
#     ('imputer', SimpleImputer(strategy='median')),
#     ('scaler', StandardScaler())])

# # Define preprocessing for categorical columns
# categorical_transformer = Pipeline(steps=[
#     ('imputer', SimpleImputer(strategy='most_frequent')),
#     ('onehot', OneHotEncoder(handle_unknown='ignore'))])

# # Combine preprocessing steps
# preprocessor = ColumnTransformer(
#     transformers=[
#         ('num', numeric_transformer, numeric_features),
#         ('cat', categorical_transformer, categorical_features)])

# # Calculate class weights
# class_weights = calculate_class_weights(y_train)

# # Define range of k values
# k_values = range(1, X_train.shape[1] + 1)

# # Evaluate ANOVA feature selection
# k_values, f1_scores = evaluate_anova(X_train, y_train, X_test, y_test, preprocessor, class_weights, k_values)

# # Plot F1-scores
# plot_f1_scores(k_values, f1_scores)

# # Find the optimal number of features
# optimal_k = k_values[np.argmax(f1_scores)]
# print(f"Optimal number of features: {optimal_k}")

# # Save the optimal number of features
# with open('optimal_anova_features.txt', 'w') as f:
#     f.write(str(optimal_k))
