# Feature Engineering Notebook

## Objectives

* Perform Correlation and PPS Analysis
* Engineer features for ML models

## Inputs

* outputs/datasets/cleaned/FertilityTreatmentDataCleaned.csv
* outputs/datasets/cleaned/TestSetCleaned.csv
* outputs/datasets/cleaned/TrainSetCleaned.csv

## Outputs

* generate a list with variables to engineer

## Conclusions

Feature Engineering Transformers

  * Ordinal Encoding:
    `['Patient age at treatment',
    'Total number of previous IVF cycles',
    'Patient/Egg provider age',
    'Partner/Sperm provider age',
    'Fresh eggs collected',
    'Total eggs mixed',
    'Total embryos created',
    'Embryos transferred',
    'Total embryos thawed']`

  * One Hot Encoding:
    `['Specific treatment type',
    'Egg source',
    'Sperm source',
    'Patient ethnicity',
    'Date of embryo transfer']`

  * Smart Correlation Selection:
    `['Total embryos created',
    'Stimulation used',
    'Fresh cycle',
    'Frozen cycle',
    'Date of embryo transfer_0 - frozen']`



---

# Change working directory

Change the working directory from its current folder to its parent folder
* Access the current directory with os.getcwd()

In [None]:
import os
current_dir = os.getcwd()
current_dir

To make the parent of the current directory the new current directory:
* os.path.dirname() gets the parent directory
* os.chir() defines the new current directory

In [None]:
os.chdir(os.path.dirname(current_dir))
print("A new current directory has been set")

Confirm the new current directory

In [None]:
current_dir = os.getcwd()
current_dir

---

# Load cleaned data

In [None]:
import pandas as pd
# Read the DataFrame from the compressed CSV file
df = pd.read_csv('outputs/datasets/cleaned/FertilityTreatmentDataCleaned.csv')
df.head(3)

### Correlation and PPS Analysis

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import ppscore as pps
import warnings

warnings.filterwarnings("ignore")


def heatmap_corr(df, threshold, figsize=(20, 12), font_annot=8):
    if len(df.columns) > 1:
        mask = np.zeros_like(df, dtype=bool)
        mask[np.triu_indices_from(mask)] = True
        mask[abs(df) < threshold] = True

        fig, axes = plt.subplots(figsize=figsize)
        sns.heatmap(
            df,
            annot=True,
            xticklabels=True,
            yticklabels=True,
            mask=mask,
            cmap="viridis",
            annot_kws={"size": font_annot},
            ax=axes,
            linewidth=0.5,
        )
        axes.set_yticklabels(df.columns, rotation=0)
        plt.ylim(len(df.columns), 0)
        plt.show()


def heatmap_pps(df, threshold, figsize=(20, 12), font_annot=8):
    if len(df.columns) > 1:
        mask = np.zeros_like(df, dtype=bool)
        mask[abs(df) < threshold] = True
        fig, ax = plt.subplots(figsize=figsize)
        ax = sns.heatmap(
            df,
            annot=True,
            xticklabels=True,
            yticklabels=True,
            mask=mask,
            cmap="rocket_r",
            annot_kws={"size": font_annot},
            linewidth=0.05,
            linecolor="grey",
        )
        plt.ylim(len(df.columns), 0)
        plt.show()


def CalculateCorrAndPPS(df):
    df_corr_spearman = df.corr(method="spearman", numeric_only=True)
    df_corr_pearson = df.corr(method="pearson", numeric_only=True)

    pps_matrix_raw = pps.matrix(df)
    pps_matrix = pps_matrix_raw.filter(["x", "y", "ppscore"]).pivot(
        columns="x", index="y", values="ppscore"
    )

    pps_score_stats = (
        pps_matrix_raw.query("ppscore < 1").filter(["ppscore"]).describe().T
    )
    print("PPS threshold - check PPS score IQR to decide threshold for heatmap \n")
    print(pps_score_stats.round(3))

    return df_corr_pearson, df_corr_spearman, pps_matrix


def DisplayCorrAndPPS(
    df_corr_pearson,
    df_corr_spearman,
    pps_matrix,
    CorrThreshold,
    PPS_Threshold,
    figsize=(20, 12),
    font_annot=8,
):

    print("\n")
    print(
        "* Analyse how the target variable for your ML models are correlated with other variables (features and target)"
    )
    print(
        "* Analyse multi-colinearity, that is, how the features are correlated among themselves"
    )

    print("\n")
    print("*** Heatmap: Spearman Correlation ***")
    print("It evaluates monotonic relationship \n")
    heatmap_corr(
        df=df_corr_spearman,
        threshold=CorrThreshold,
        figsize=figsize,
        font_annot=font_annot,
    )

    print("\n")
    print("*** Heatmap: Pearson Correlation ***")
    print("It evaluates the linear relationship between two continuous variables \n")
    heatmap_corr(
        df=df_corr_pearson,
        threshold=CorrThreshold,
        figsize=figsize,
        font_annot=font_annot,
    )

    print("\n")
    print("*** Heatmap: Power Predictive Score (PPS) ***")
    print(
        f"PPS detects linear or non-linear relationships between two columns.\n"
        f"The score ranges from 0 (no predictive power) to 1 (perfect predictive power) \n"
    )
    heatmap_pps(
        df=pps_matrix, threshold=PPS_Threshold, figsize=figsize, font_annot=font_annot
    )

Calculate Correlations and Power Predictive Score

In [None]:
df_corr_pearson, df_corr_spearman, pps_matrix = CalculateCorrAndPPS(df)

Display Heatmaps

In [None]:
DisplayCorrAndPPS(df_corr_pearson = df_corr_pearson,
                  df_corr_spearman = df_corr_spearman, 
                  pps_matrix = pps_matrix,
                  CorrThreshold = 0.4, PPS_Threshold =0.2,
                  figsize=(12,10), font_annot=7)

---

## Load Train and Test Sets

Tain Set

In [None]:
import pandas as pd
train_set_path = "outputs/datasets/cleaned/TrainSetCleaned.csv"
TrainSet = pd.read_csv(train_set_path)
TrainSet.head(3)

Test Set

In [None]:
test_set_path = 'outputs/datasets/cleaned/TestSetCleaned.csv'
TestSet = pd.read_csv(test_set_path)
TestSet.head(3)

---

## Data Exploration

In [None]:
from ydata_profiling import ProfileReport
pandas_report = ProfileReport(df=TrainSet, minimal=True)
pandas_report.to_notebook_iframe()

In [None]:
print (f"Number of empty entries followed by the unique values and data type at each column:\n")

for column in TrainSet.columns:
    # Check how many empty fields there are in each column
    empty_fields_count = TrainSet[column].isnull().sum()
    # Check unique values there are in each column
    unique_values = TrainSet[column].unique()
    # Check data type of each column
    data_type = TrainSet[column].dtype
    
    print (f"- {column}: {empty_fields_count}, {unique_values}, {data_type}\n")

## Feature Engineering

Custom transformer for Ordinal Encoding. Encodes the values in a specified order. It is saved under `src/custom_transformers.py`

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OrdinalEncoder

class OrdinalEncoderWithCategories(BaseEstimator, TransformerMixin):
    def __init__(self, categories, columns):
        self.categories = categories
        self.columns = columns
        self.encoder = OrdinalEncoder()  # Initialize without categories

    def fit(self, X, y=None):
        # Ensures the categories match the number of features in the data
        if len(self.categories) != len(self.columns):
            raise ValueError(
                "The number of category lists must match the number of features."
            )
        # Set categories during fit
        self.encoder.set_params(categories=self.categories)
        self.encoder.fit(X[self.columns])
        return self

    def transform(self, X):
        transformed = self.encoder.transform(X[self.columns])
        # Return a DataFrame with the original column names
        X[self.columns] = pd.DataFrame(transformed, columns=self.columns, index=X.index)
        return X

In [None]:
import scipy.stats as stats
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import warnings
from feature_engine import transformation as vt
from feature_engine.outliers import Winsorizer
from feature_engine.encoding import OneHotEncoder

sns.set_theme(style="whitegrid")
warnings.filterwarnings("ignore")


def FeatureEngineeringAnalysis(df, analysis_type=None):
    """
    - used for quick feature engineering on numerical and categorical variables
    to decide which transformation can better transform the distribution shape
    - Once transformed, use a reporting tool, like ydata-profiling, to evaluate distributions
    """
    check_missing_values(df)
    allowed_types = [
        "numerical",
        "ordinal_encoder",
        "outlier_winsorizer",
        "one_hot_encoder",
    ]
    check_user_entry_on_analysis_type(analysis_type, allowed_types)
    list_column_transformers = define_list_column_transformers(analysis_type)

    # Loop in each variable and engineer the data according to the analysis type
    df_feat_eng = pd.DataFrame([])
    for column in df.columns:
        # create additional columns (column_method) to apply the methods
        df_feat_eng = pd.concat([df_feat_eng, df[column]], axis=1)
        for method in list_column_transformers:
            df_feat_eng[f"{column}_{method}"] = df[column]

        # Apply transformers in respective column_transformers
        df_feat_eng, list_applied_transformers = apply_transformers(
            analysis_type, df_feat_eng, column
        )

        # For each variable, assess how the transformations perform
        transformer_evaluation(
            column, list_applied_transformers, analysis_type, df_feat_eng
        )

    return df_feat_eng


def check_user_entry_on_analysis_type(analysis_type, allowed_types):
    """Check analysis type"""
    if analysis_type is None:
        raise SystemExit(
            f"You should pass analysis_type parameter as one of the following options: {allowed_types}"
        )
    if analysis_type not in allowed_types:
        raise SystemExit(
            f"analysis_type argument should be one of these options: {allowed_types}"
        )


def check_missing_values(df):
    if df.isna().sum().sum() != 0:
        raise SystemExit(
            f"There is a missing value in your dataset. Please handle that before getting into feature engineering."
        )


def define_list_column_transformers(analysis_type):
    """Set suffix columns according to analysis_type"""
    if analysis_type == "numerical":
        list_column_transformers = [
            "log_e",
            "log_10",
            "reciprocal",
            "power",
            "box_cox",
            "yeo_johnson",
        ]

    elif analysis_type == "ordinal_encoder":
        list_column_transformers = ["ordinal_encoder"]

    elif analysis_type == "outlier_winsorizer":
        list_column_transformers = ["iqr"]

    elif analysis_type == "one_hot_encoder":
        list_column_transformers = ["one_hot_encoder"]

    return list_column_transformers


def apply_transformers(analysis_type, df_feat_eng, column):
    for col in df_feat_eng.select_dtypes(include="category").columns:
        df_feat_eng[col] = df_feat_eng[col].astype("object")

    if analysis_type == "numerical":
        df_feat_eng, list_applied_transformers = FeatEngineering_Numerical(
            df_feat_eng, column
        )

    elif analysis_type == "outlier_winsorizer":
        df_feat_eng, list_applied_transformers = FeatEngineering_OutlierWinsorizer(
            df_feat_eng, column
        )

    elif analysis_type == "ordinal_encoder":
        df_feat_eng, list_applied_transformers = FeatEngineering_CustomOrdinalEncoder(
            df_feat_eng, column
        )

    elif analysis_type == "one_hot_encoder":
        df_feat_eng, list_applied_transformers = FeatEngineering_OneHotEncoder(
            df_feat_eng, column
        )

    return df_feat_eng, list_applied_transformers


def transformer_evaluation(
    column, list_applied_transformers, analysis_type, df_feat_eng
):
    # For each variable, assess how the transformations perform
    print(f"* Variable Analyzed: {column}")
    print(f"* Applied transformation: {list_applied_transformers} \n")

    if analysis_type == "one_hot_encoder":
        # Show the first few rows and the shape of the DataFrame after encoding
        print(f"Transformed DataFrame shape: {df_feat_eng.shape}")
        print("Columns added during One Hot Encoding:\n")
        print([col for col in df_feat_eng.columns if column in col])  # Display columns created by One-Hot Encoding
        print(df_feat_eng.head())  # Display the first few rows to inspect the transformations

    else:
        for col in [column] + list_applied_transformers:
            if analysis_type != "ordinal_encoder":
                DiagnosticPlots_Numerical(df_feat_eng, col)
            else:
                if col == column:
                    DiagnosticPlots_Categories(df_feat_eng, col)
                else:
                    DiagnosticPlots_Numerical(df_feat_eng, col)

    print("\n")


def DiagnosticPlots_Categories(df_feat_eng, col):
    plt.figure(figsize=(4, 3))
    sns.countplot(
        data=df_feat_eng,
        x=col,
        palette=["#432371"],
        order=df_feat_eng[col].value_counts().index,
    )
    plt.xticks(rotation=90)
    plt.suptitle(f"{col}", fontsize=30, y=1.05)
    plt.show()
    print("\n")


def DiagnosticPlots_Numerical(df, variable):
    fig, axes = plt.subplots(1, 3, figsize=(12, 4))
    sns.histplot(data=df, x=variable, kde=True, element="step", ax=axes[0])
    stats.probplot(df[variable], dist="norm", plot=axes[1])
    sns.boxplot(x=df[variable], ax=axes[2])

    axes[0].set_title("Histogram")
    axes[1].set_title("QQ Plot")
    axes[2].set_title("Boxplot")
    fig.suptitle(f"{variable}", fontsize=30, y=1.05)
    plt.tight_layout()
    plt.show()


def map_categories_to_order(df, column, category_order):
    """ Map categories to their respective order based on predefined order """
    category_mapping = {category: idx for idx, category in enumerate(category_order)}
    df[f"{column}_ordinal_encoder"] = df[column].map(category_mapping)
    return df

def FeatEngineering_CustomOrdinalEncoder(df_feat_eng, column):
    list_methods_worked = []

    try:
        # Find the index of the current column in the list of columns
        column_index = columns.index(column)
        
        # Use the corresponding categories based on the column index
        custom_encoder = OrdinalEncoderWithCategories(
            categories=[categories[column_index]],  # Match category to the column
            columns=[column]
        )

        # Apply the custom transformer
        df_feat_eng = custom_encoder.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_ordinal_encoder")

    except Exception as e:
        print(f"Error encoding {column} with custom encoder: {e}")
        if f"{column}_ordinal_encoder" in df_feat_eng.columns:
            df_feat_eng.drop([f"{column}_ordinal_encoder"], axis=1, inplace=True)

    return df_feat_eng, list_methods_worked


def FeatEngineering_OutlierWinsorizer(df_feat_eng, column):
    list_methods_worked = []

    # Winsorizer iqr
    try:
        disc = Winsorizer(
            capping_method="iqr", tail="both", fold=1.5, variables=[f"{column}_iqr"]
        )
        df_feat_eng = disc.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_iqr")
    except Exception:
        df_feat_eng.drop([f"{column}_iqr"], axis=1, inplace=True)

    return df_feat_eng, list_methods_worked


def FeatEngineering_Numerical(df_feat_eng, column):
    list_methods_worked = []

    # LogTransformer base e
    try:
        lt = vt.LogTransformer(variables=[f"{column}_log_e"])
        df_feat_eng = lt.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_log_e")
    except Exception:
        df_feat_eng.drop([f"{column}_log_e"], axis=1, inplace=True)

    # LogTransformer base 10
    try:
        lt = vt.LogTransformer(variables=[f"{column}_log_10"], base="10")
        df_feat_eng = lt.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_log_10")
    except Exception:
        df_feat_eng.drop([f"{column}_log_10"], axis=1, inplace=True)

    # ReciprocalTransformer
    try:
        rt = vt.ReciprocalTransformer(variables=[f"{column}_reciprocal"])
        df_feat_eng = rt.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_reciprocal")
    except Exception:
        df_feat_eng.drop([f"{column}_reciprocal"], axis=1, inplace=True)

    # PowerTransformer
    try:
        pt = vt.PowerTransformer(variables=[f"{column}_power"])
        df_feat_eng = pt.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_power")
    except Exception:
        df_feat_eng.drop([f"{column}_power"], axis=1, inplace=True)

    # BoxCoxTransformer
    try:
        bct = vt.BoxCoxTransformer(variables=[f"{column}_box_cox"])
        df_feat_eng = bct.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_box_cox")
    except Exception:
        df_feat_eng.drop([f"{column}_box_cox"], axis=1, inplace=True)

    # YeoJohnsonTransformer
    try:
        yjt = vt.YeoJohnsonTransformer(variables=[f"{column}_yeo_johnson"])
        df_feat_eng = yjt.fit_transform(df_feat_eng)
        list_methods_worked.append(f"{column}_yeo_johnson")
    except Exception:
        df_feat_eng.drop([f"{column}_yeo_johnson"], axis=1, inplace=True)

    return df_feat_eng, list_methods_worked


def FeatEngineering_OneHotEncoder(df_feat_eng, column):
    list_methods_worked = []
    try:
        # Initialize the OneHotEncoder from feature_engine
        encoder = OneHotEncoder(drop_last=True, variables=[column])

        # Fit and transform the dataframe
        df_feat_eng = encoder.fit_transform(df_feat_eng)

        # Record the applied transformations
        encoded_columns = df_feat_eng.columns[
            df_feat_eng.columns.str.startswith(column)
        ]
        list_methods_worked.extend(encoded_columns)

    except Exception as e:
        print(f"Error with column {column}: {e}")
        if any(f"{column}_" in col for col in df_feat_eng.columns):
            columns_to_drop = [
                col for col in df_feat_eng.columns if f"{column}_" in col
            ]
            df_feat_eng.drop(columns=columns_to_drop, axis=1, inplace=True)

    return df_feat_eng, list_methods_worked

## Transformers

* Categorical Encoding (Ordinal Encoder and OneHotEncoder)
* Smart Correlation Selection (SmartCorrelatedSelection)

#### Ordinal Encoder

Categorical Encoding - Ordinal: replaces categories with ordinal numbers. For features where the order of categories is important.

In [None]:
ordinal_columns = [
        'Patient age at treatment',
        'Total number of previous IVF cycles',
        'Patient/Egg provider age',
        'Partner/Sperm provider age',
        'Fresh eggs collected',
        'Total eggs mixed',
        'Total embryos created',
        'Embryos transferred',
        'Total embryos thawed'
        ]

ordinal_columns

Define the categories for each column in the same order as ordinal_columns

In [None]:
# Proper order for the values to be encoded
categories = [
    ['18-34', '35-37', '38-39', '40-42', '43-44', '45-50'],  # Patient age at treatment
    ['0', '1', '2', '3', '4', '5', '>5'],  # Total number of previous IVF cycles
    ['18-34', '35-37', '38-39', '40-42', '43-44', '45-50'],  # Patient/Egg provider age
    ['18-34', '35-37', '38-39', '40-42', '43-44', '45-50', '51-55', '56-60', '>60'],  # Partner/Sperm provider age
    ['0', '0 - frozen cycle', '1-5', '6-10', '11-15', '16-20', '21-25', '26-30', '31-35', '36-40', '>40'],  # Fresh eggs collected
    ['0', '0 - frozen cycle', '1-5', '6-10', '11-15', '16-20', '21-25', '26-30', '31-35', '36-40', '>40'],  # Total eggs mixed
    ['0', '0 - frozen cycle', '1-5', '6-10', '11-15', '16-20', '21-25', '26-30', '>30'],  # Total embryos created
    ['0', '1', '1e', '2', '3'],  # Embryos transferred
    ['0 - fresh cycle', '0 - frozen cycle', '1-5', '6-10', '>10']  # Total embryos thawed
]

Ensure that categories match the order and number of ordinal columns

In [None]:

if len(categories) != len(ordinal_columns):
    raise ValueError("The number of categories must match the number of ordinal columns.")


Create a separate DataFrame with the selected variables

In [None]:
df_eng_oe = TrainSet[ordinal_columns].copy()
df_eng_oe.head(3)

Apply the selected transformation to the Train and Test Set

In [None]:
ordinal_encoder = OrdinalEncoderWithCategories(categories=categories, columns=ordinal_columns)
TrainSet = ordinal_encoder.fit_transform(TrainSet)
TestSet = ordinal_encoder.transform(TestSet)


Check the result of the transformation

In [None]:
print(TrainSet.head())
for column in ordinal_columns:
    print(f"\nUnique values in transformed '{column}' column (TrainSet): {TrainSet[column].unique()}")


#### OneHotEncoder:

One hot encoder was used for variables with values without inherent order.

Select variables

In [None]:
variables_eng_ohe=[
    "Specific treatment type",
    "Egg source",
    "Sperm source",
    "Patient ethnicity",
    "Date of embryo transfer"
]

variables_eng_ohe

Create a separate DataFrame with the selected variables

In [None]:
df_eng_ohe = TrainSet[variables_eng_ohe].copy()
df_eng_ohe.head(3)

Apply the selected transformation to the Train and Test Set

In [None]:
df_eng_ohe = FeatureEngineeringAnalysis(df=df_eng_ohe, analysis_type='one_hot_encoder')

Apply the transformation to the Train ans Test Sets

In [None]:
ohe_encoder = OneHotEncoder(top_categories=None, drop_last=True, variables = variables_eng_ohe)
TrainSet = ohe_encoder.fit_transform(TrainSet)
TestSet = ohe_encoder.transform(TestSet)

### SmartCorrelatedSelection Variables

* To be applied in all variables

Create a new dataframe to apply the transformer on

In [None]:
df_eng_smart_corr_sel = TrainSet.copy()
df_eng_smart_corr_sel.head(3)

Apply the transformation

Note: The threshold of the smart correation was kept at 0.9 to keep as much important infomrartion as possible due to the low predictive score of the features from the dataset.

In [None]:
from feature_engine.selection import SmartCorrelatedSelection
corr_sel = SmartCorrelatedSelection(variables=None, method="spearman", threshold=0.9, selection_method="variance")

corr_sel.fit_transform(df_eng_smart_corr_sel)
corr_sel.correlated_feature_sets_

Check what freatures should be dropped

In [None]:
corr_sel.features_to_drop_

---

## Conclusion


Transformations needed for feature engineering:


Feature Engineering Transformers

  * Ordinal Encoding:
    `['Patient age at treatment',
    'Total number of previous IVF cycles',
    'Patient/Egg provider age',
    'Partner/Sperm provider age',
    'Fresh eggs collected',
    'Total eggs mixed',
    'Total embryos created',
    'Embryos transferred',
    'Total embryos thawed']`

  * One Hot Encoding:
    `['Specific treatment type',
    'Egg source',
    'Sperm source',
    'Patient ethnicity',
    'Date of embryo transfer']`

  * Smart Correlation Selection:
    `['Total embryos created',
    'Stimulation used',
    'Fresh cycle',
    'Frozen cycle',
    'Date of embryo transfer_0 - frozen']`
    
