In [None]:
from sklearn.model_selection import GridSearchCV
from imblearn.pipeline import Pipeline

import wandb
from sklearn.model_selection import GridSearchCV
from imblearn.pipeline import Pipeline

def model_with_gridsearch(
    data,
    model,
    param_grid,
    pipeline_steps=None,
    target='TenYearCHD',
    scoring='f1',
    cv=5,
    verbose=1,
    use_wandb=False,
    wandb_project="ml_experiments",
    test_size=0.2,
    random_state=42,
):
    """
    Tune `model` inside a preprocessing pipeline with GridSearchCV and report the result.

    The data is split into a stratified train/test partition, the grid search is
    fitted on the training part only, and the refitted best pipeline is then
    passed to `model_scoring` (defined elsewhere in the notebook) together with
    both partitions. Optionally the held-out metrics and the best
    hyper-parameters are logged to Weights & Biases.

    Parameters
    ----------
    data : pandas DataFrame
        Full dataset including the `target` column. It is not modified.
    model : estimator
        Appended to the pipeline as the final step, named 'model'; grid keys
        therefore use the 'model__' prefix.
    param_grid : dict or list of dict
        Passed unchanged to GridSearchCV.
    pipeline_steps : list of (name, estimator) tuples, optional
        Preprocessing steps placed before the model. When None, the default is
        outlier capping -> KNN imputation -> standard scaling -> SMOTE.
    target : str, optional (default='TenYearCHD')
        Name of the label column.
    scoring : str or other GridSearchCV scoring spec, optional (default='f1')
        When it is a string it is also used as the refit metric; otherwise the
        refit metric is 'f1'.
    cv : int or CV splitter, optional (default=5)
    verbose : int, optional (default=1)
    use_wandb : bool, optional (default=False)
        Log accuracy, F1, precision and recall on the test split to wandb.
    wandb_project : str, optional (default="ml_experiments")
    test_size : float, optional (default=0.2)
        Fraction of rows held out for the test split.
    random_state : int or None, optional (default=42)
        Seed for the train/test split and for the SMOTE step of the default
        pipeline.

    Returns
    -------
    best_pipeline : Pipeline
        `grid.best_estimator_`, i.e. the pipeline refitted with the best parameters.
    grid : GridSearchCV object
        The fitted search object.
    """
    # drop() already returns a new frame, so the caller's data is left untouched
    # without copying the whole DataFrame first.
    X = data.drop(columns=target)
    y = data[target]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )

    if pipeline_steps is None:
        pipeline_steps = [
            ('outlier_handler', OutlierCapperOrRemover(strategy='cap', factor=1.5)),
            ('imputer', KNNImputer(n_neighbors=5)),
            ('scaler', StandardScaler()),
            ('smote', SMOTE(random_state=random_state))
        ]

    # Build a new list so the caller's `pipeline_steps` is never mutated.
    full_pipeline = Pipeline(list(pipeline_steps) + [('model', model)])

    grid = GridSearchCV(
        estimator=full_pipeline,
        param_grid=param_grid,
        scoring=scoring,
        cv=cv,
        n_jobs=-1,
        refit=scoring if isinstance(scoring, str) else 'f1',
        verbose=verbose
    )

    grid.fit(X_train, y_train)

    best_pipeline = grid.best_estimator_

    print("\nBest hyperparameters found:", grid.best_params_)
    model_scoring(best_pipeline, X_train, X_test, y_train, y_test)

    if use_wandb:
        model_name = type(model).__name__
        wandb.init(project=wandb_project, name=model_name)
        try:
            # Metrics of the refitted best pipeline on the held-out split.
            y_pred = best_pipeline.predict(X_test)

            wandb.config.update({
                "model": model_name,
                "preprocessing": [step_name for step_name, _ in pipeline_steps],
                **grid.best_params_
            })

            wandb.log({
                "accuracy": accuracy_score(y_test, y_pred),
                "f1_score": f1_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred),
                "recall": recall_score(y_test, y_pred)
            })
        finally:
            # Always close the run, even when prediction or a metric raises;
            # otherwise the run stays open and later runs attach to it.
            wandb.finish()

    return best_pipeline, grid




    ########################################################



# The import and the grid must exist before the first call below; previously
# they were defined after it, so a fresh kernel raised NameError.
from sklearn.linear_model import LogisticRegression

# Search space for logistic regression. The 'model__' prefix addresses the
# pipeline step named 'model' that model_with_gridsearch appends.
param_grid_lr = {
    'model__C': [0.01, 0.1, 1, 10, 100],
    'model__penalty': ['l2'],
    'model__solver': ['lbfgs', 'saga']
}

# Run 1: default LogisticRegression, logged to Weights & Biases.
best_pipeline, search = model_with_gridsearch(
    data=df,
    model=LogisticRegression(),
    param_grid=param_grid_lr,
    use_wandb=True,
    wandb_project="logistic_regression_colab"
)

    #########################################################

# Run 2: same grid with a higher iteration cap and a fixed seed, no wandb logging.
model_with_gridsearch(df, LogisticRegression(max_iter=1000, random_state=42), param_grid_lr)



    #########################################################



from sklearn.ensemble import RandomForestClassifier

# Random-forest search space; keys address the pipeline step named 'model'.
param_grid_rf = {
    'model__n_estimators': [100, 200],
    'model__max_depth': [None, 10, 20],
    'model__min_samples_split': [2, 5],
}

# Preprocessing for the forest: same as the default steps minus the scaler.
custom_pipeline_rf = [
    ('outlier_handler', OutlierCapperOrRemover(factor=1.5, strategy='cap')),
    ('imputer', KNNImputer(n_neighbors=5)),
    ('smote', SMOTE(random_state=42)),
]

# Grid search over the forest, selecting and refitting on ROC AUC.
model_with_gridsearch(
    df,
    RandomForestClassifier(random_state=42),
    param_grid_rf,
    custom_pipeline_rf,
    scoring='roc_auc',
)


In [None]:
# Function to detect outlier rows using IQR
def detect_outliers_iqr(data, columns, factor=1.5):
    """
    Return the index labels of rows that are IQR outliers in any of `columns`.

    A value is an outlier when it lies below Q1 - factor * IQR or above
    Q3 + factor * IQR, with the quartiles computed on the non-missing values
    of its column. Missing values are never flagged.

    Parameters
    ----------
    data : pandas DataFrame
    columns : iterable of column labels
        Columns to inspect.
    factor : float, optional (default=1.5)
        Multiplier applied to the IQR to obtain the fences.

    Returns
    -------
    list
        Unique index labels of the outlier rows, in the row order of `data`.
    """
    is_outlier = np.zeros(len(data), dtype=bool)
    for col in columns:
        values = data[col]
        observed = values.dropna()
        if observed.empty:
            # No observed values means no quartiles and therefore no outliers.
            continue
        q1, q3 = np.percentile(observed, [25, 75])
        iqr = q3 - q1
        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr
        # Comparisons against NaN are False, so missing values stay unflagged.
        is_outlier |= ((values < lower_bound) | (values > upper_bound)).to_numpy()
    # unique() keeps first-occurrence order, giving a deterministic result
    # (the previous set-based accumulation had no defined order).
    return data.index[is_outlier].unique().tolist()

In [None]:
def cap_outliers_iqr(data, columns, factor=1.5):
    """
    Return a copy of `data` with IQR outliers in `columns` clipped to the fences.

    For each column the fences are Q1 - factor * IQR and Q3 + factor * IQR,
    computed on the non-missing values. Values outside the fences are replaced
    by the nearest fence; missing values are left as they are.

    Parameters
    ----------
    data : pandas DataFrame
        Input frame; it is not modified.
    columns : iterable of column labels
        Columns to cap.
    factor : float, optional (default=1.5)
        Multiplier applied to the IQR to obtain the fences.

    Returns
    -------
    pandas DataFrame
        Copy of `data` with the selected columns capped.
    """
    capped_data = data.copy()
    for col in columns:
        values = capped_data[col]
        observed = values.dropna()
        if observed.empty:
            # Nothing to compute quartiles from; leave the column untouched.
            continue
        q1, q3 = np.percentile(observed, [25, 75])
        iqr = q3 - q1
        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr
        # NaN fails both comparisons and therefore falls through unchanged.
        capped_data[col] = np.where(
            values < lower_bound, lower_bound,
            np.where(values > upper_bound, upper_bound, values)
        )
    return capped_data

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

def manual_correlation_analysis(X, threshold=0.9, plot=True):
    """
    Inspect pairwise feature correlations and propose features to remove.

    Parameters
    ----------
    X : pandas DataFrame
        Feature matrix (target column excluded).

    threshold : float, optional (default=0.9)
        A feature is proposed for removal when its absolute correlation with
        any feature that precedes it in column order exceeds this value.

    plot : bool, optional (default=True)
        Draw a heatmap of the absolute correlation matrix.

    Returns
    -------
    to_drop : list
        Names of the features proposed for removal (also printed).
    """
    abs_corr = X.corr().abs()

    if plot:
        plt.figure(figsize=(12, 10))
        sns.heatmap(abs_corr, cmap='coolwarm', annot=False)
        plt.title('Feature Correlation Matrix')
        plt.show()

    # Keep only the cells strictly above the diagonal so every pair is
    # examined once and self-correlations are ignored; the rest become NaN.
    above_diagonal = np.triu(np.ones(abs_corr.shape, dtype=bool), k=1)
    upper = abs_corr.where(above_diagonal)

    to_drop = []
    for column in upper.columns:
        if any(upper[column] > threshold):
            to_drop.append(column)

    print(f"\nFeatures suggested for dropping (correlation > {threshold}):\n{to_drop}")

    return to_drop


In [None]:
######## DO NOT USE ############### Learning Purpose Only ################
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency

class CorrelationFilterMixed(BaseEstimator, TransformerMixin):
    """
    Transformer that drops one feature out of every highly associated pair.

    Numeric features are compared with absolute Pearson correlation and
    categorical features with (bias-corrected) Cramér's V. Numeric/categorical
    pairs are not compared. Within each flagged pair, the feature that is less
    associated with the target is dropped.

    Parameters:
    ----------
    threshold: float (default=0.9)
        Association above which a pair is flagged.
    categorical_cols: list (optional)
        Column names to treat as categorical. None means "no categorical
        columns"; every column is then treated as numeric.

    Attributes (set by fit):
    ----------
    features_to_drop_: list
        Columns removed by transform, in the column order of the fitted frame.
    target_correlation_: dict
        Absolute association of every column with the target.
    """
    def __init__(self, threshold=0.9, categorical_cols=None):
        self.threshold = threshold
        self.categorical_cols = categorical_cols
        self.features_to_drop_ = None
        self.target_correlation_ = None

    def _categorical_columns(self):
        """Return the configured categorical columns as a list (empty when unset)."""
        if self.categorical_cols is None:
            return []
        return list(self.categorical_cols)

    def _cramers_v(self, x, y):
        """Calculate Cramér's V statistic for categorical-categorical association.

        Returns 0.0 when the statistic is undefined (empty table, a single
        observation, or a variable with only one level) instead of NaN/inf.
        """
        confusion_matrix = pd.crosstab(x, y)
        n = confusion_matrix.sum().sum()
        if confusion_matrix.size == 0 or n <= 1:
            return 0.0
        chi2 = chi2_contingency(confusion_matrix)[0]
        phi2 = chi2 / n
        r, k = confusion_matrix.shape
        phi2corr = max(0, phi2 - ((k-1)*(r-1)) / (n-1))
        rcorr = r - ((r-1)**2)/(n-1)
        kcorr = k - ((k-1)**2)/(n-1)
        denominator = min((kcorr-1), (rcorr-1))
        if denominator <= 0:
            # A constant variable carries no association information.
            return 0.0
        return np.sqrt(phi2corr / denominator)

    @staticmethod
    def _abs_pearson(feature, target):
        """Absolute Pearson correlation over rows where both values are present.

        Returns 0.0 when fewer than two complete rows exist or either side is
        constant, so that later comparisons never involve NaN.
        """
        a = pd.Series(feature).to_numpy(dtype=float, na_value=np.nan)
        b = pd.Series(target).to_numpy(dtype=float, na_value=np.nan)
        complete = ~(np.isnan(a) | np.isnan(b))
        if complete.sum() < 2:
            return 0.0
        a, b = a[complete], b[complete]
        if a.std() == 0 or b.std() == 0:
            return 0.0
        return float(np.abs(np.corrcoef(a, b)[0, 1]))

    def _calculate_target_correlation(self, X, y):
        """Map every column of X to its absolute association with y."""
        categorical = set(self._categorical_columns())
        target_corr = {}
        for col in X.columns:
            if col in categorical:
                target_corr[col] = np.abs(self._cramers_v(X[col], y))
            else:
                target_corr[col] = self._abs_pearson(X[col], y)
        return target_corr

    def fit(self, X, y):
        """
        Identify features to drop by checking feature-feature and feature-target correlations.

        Raises ValueError when X is not a pandas DataFrame.
        """
        if not isinstance(X, pd.DataFrame):
            raise ValueError("Input must be a pandas DataFrame")

        # X and y are paired by position. Re-indexing y onto X's index keeps
        # pd.crosstab from aligning them by label, which would mispair rows
        # whenever y arrives as an array or with a different index.
        y = pd.Series(np.asarray(y).ravel(), index=X.index)

        categorical = self._categorical_columns()
        num_cols = [col for col in X.columns if col not in categorical]

        correlated_pairs = set()

        # 1. Numeric features: pairs above the diagonal of the |corr| matrix
        if num_cols:
            corr_matrix = X[num_cols].corr().abs()
            upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
            for col in upper_triangle.columns:
                for row in upper_triangle.index:
                    if upper_triangle.loc[row, col] > self.threshold:
                        correlated_pairs.add((row, col))

        # 2. Categorical features: every unordered pair, compared with Cramér's V
        for i, col1 in enumerate(categorical):
            for col2 in categorical[i+1:]:
                v = self._cramers_v(X[col1], X[col2])
                if v > self.threshold:
                    correlated_pairs.add((col1, col2))

        # 3. Within each pair, drop the feature less associated with the target
        target_corr = self._calculate_target_correlation(X, y)
        self.target_correlation_ = target_corr

        drop_set = set()
        for f1, f2 in correlated_pairs:
            if target_corr[f1] < target_corr[f2]:
                drop_set.add(f1)
            else:
                drop_set.add(f2)

        # Report in column order so the result does not depend on set iteration.
        self.features_to_drop_ = [col for col in X.columns if col in drop_set]
        return self

    def transform(self, X):
        """
        Drop the identified features from the dataset.

        Columns that are already absent from X are ignored.
        """
        X = X.copy()
        if self.features_to_drop_:
            X = X.drop(columns=self.features_to_drop_, errors='ignore')
        return X
    

##########################################################


# Assume we know which columns are categorical
# Columns the filter should treat as categorical.
categorical_columns = ['currentSmoker', 'prevalentStroke', 'prevalentHyp', 'diabetes', 'male']

# Build the filter, learn which columns to drop on the training split,
# then apply the drop to that same split.
corr_filter = CorrelationFilterMixed(categorical_cols=categorical_columns, threshold=0.9)
X_reduced = corr_filter.fit(X_train, y_train).transform(X_train)

print("Features dropped:", corr_filter.features_to_drop_)



##########################################################



# SMOTE is a resampler (it implements fit_resample, not transform), so it can
# only be used as an intermediate step of imblearn's Pipeline. Importing
# sklearn.pipeline.Pipeline here would also rebind the global name `Pipeline`
# that model_with_gridsearch relies on.
from imblearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import KNNImputer
from imblearn.over_sampling import SMOTE
from sklearn.linear_model import LogisticRegression

# Correlation filter first because it requires a DataFrame; the later steps
# receive whatever the preceding step returns.
pipeline = Pipeline([
    ('corr_filter', CorrelationFilterMixed(threshold=0.9, categorical_cols=categorical_columns)),
    ('scaler', StandardScaler()),
    ('imputer', KNNImputer(n_neighbors=5)),
    ('smote', SMOTE(random_state=42)),
    ('model', LogisticRegression(max_iter=1000, random_state=42))
])

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)


In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd

class OutlierCapperOrRemover(BaseEstimator, TransformerMixin):
    """
    Scikit-learn compatible transformer to cap or remove outliers based on IQR method.

    Bounds are learned in fit as Q1 - factor * IQR and Q3 + factor * IQR on the
    non-missing values of each column, and applied unchanged in transform.
    Missing values are never treated as outliers.

    Parameters:
    ----------
    strategy : str, optional (default='cap')
        Strategy to handle outliers:
        'cap' - clip values to IQR bounds,
        'remove' - remove rows containing outliers. This changes the number of
        rows of X only; a separately held y is not adjusted.

    factor : float, optional (default=1.5)
        The multiplication factor for IQR to define outlier boundaries (standard = 1.5).

    columns : list, optional (default=None)
        List of column names to apply outlier handling on.
        If None, will apply to all numeric columns of the frame seen in fit.

    Attributes (set by fit):
    ----------
    columns_ : list
        Columns actually inspected during the last fit.
    bounds_ : dict
        Maps column name to its (lower, upper) bounds.
    """
    def __init__(self, strategy='cap', factor=1.5, columns=None):
        self.strategy = strategy
        self.factor = factor
        self.columns = columns
        # Kept here so that transform before fit is a no-op rather than an error.
        self.bounds_ = {}

    def fit(self, X, y=None):
        """
        Learn the IQR bounds for each feature.

        The `columns` hyper-parameter is left untouched; the resolved column
        list is stored in `columns_`, and bounds from any previous fit are
        discarded.
        """
        if self.columns is None:
            columns = X.select_dtypes(include=[np.number]).columns.tolist()
        else:
            columns = list(self.columns)

        bounds = {}
        for col in columns:
            observed = X[col].dropna()
            if observed.empty:
                # No observed values: no quartiles, so the column is left alone.
                continue
            q1, q3 = np.percentile(observed, [25, 75])
            iqr = q3 - q1
            bounds[col] = (q1 - self.factor * iqr, q3 + self.factor * iqr)

        self.columns_ = columns
        self.bounds_ = bounds
        return self

    def transform(self, X):
        """
        Apply capping or removal based on learned bounds.

        Returns a new DataFrame. With 'remove' the index is reset after the
        outlier rows are dropped. Raises ValueError for an unknown strategy.
        """
        X = X.copy()

        if self.strategy == 'cap':
            for col, (lower, upper) in self.bounds_.items():
                X[col] = np.clip(X[col], lower, upper)
            return X

        elif self.strategy == 'remove':
            keep = np.ones(len(X), dtype=bool)
            for col, (lower, upper) in self.bounds_.items():
                values = X[col]
                in_bounds = (values >= lower) & (values <= upper)
                # NaN fails both comparisons; keep such rows explicitly so that
                # missing values are not discarded as if they were outliers.
                keep &= (values.isna() | in_bounds).to_numpy()
            return X[keep].reset_index(drop=True)

        else:
            raise ValueError("Strategy must be either 'cap' or 'remove'.")
    
    
    ##########################################################


# Example with Outlier Capping
# Capping: learn the bounds on the training split, then clip that same split.
outlier_handler = OutlierCapperOrRemover(factor=1.5, strategy='cap')
X_transformed = outlier_handler.fit(X_train).transform(X_train)

# Removal: same bounds logic, but rows outside the bounds are dropped instead.
# Note that both names above are rebound here.
outlier_handler = OutlierCapperOrRemover(factor=1.5, strategy='remove')
X_transformed = outlier_handler.fit(X_train).transform(X_train)


   ##########################################################


from imblearn.pipeline import Pipeline

# Full preprocessing + model pipeline. `categorical_cols` must receive the
# `categorical_columns` list itself; the previous value ['categorical_columns']
# was a one-element list holding the variable's name as a string, which is not
# a column of the data and makes the filter's fit fail with KeyError.
pipeline = Pipeline([
    ('outlier_handler', OutlierCapperOrRemover(strategy='cap', factor=1.5)),
    ('corr_filter', CorrelationFilterMixed(threshold=0.9, categorical_cols=categorical_columns)),
    ('scaler', StandardScaler()),
    ('imputer', KNNImputer(n_neighbors=5)),
    ('smote', SMOTE(random_state=42)),
    ('model', LogisticRegression(max_iter=1000, random_state=42))
])



<img src="Images/Pipeline_Order.png" width="400" height="600" alt="Pipeline order diagram">