## README:

App : **Sample Application**

Stage : **Data preparation**

This is a sample notebook for loading data from the warehouse.

The notebook expects the required inputs in the adjacent `data` folder

Loading configuration from `config/data_prep.yaml` file

In [1]:
import os
import sys
import time
import json
import logging
import pickle
import datetime

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

from tqdm import tqdm
from pathlib import Path
from sklearn.model_selection import train_test_split
from IPython.display import display
from pprint import pprint

#Rudderlab data utilities imports
from rudderlabs.data.apps.log import setup_file_logger
from rudderlabs.data.apps.config import read_yaml
from rudderlabs.data.apps.utils.data import NamedColumns, get_onehot_encoder_names
from rudderlabs.data.apps.aws.s3 import upload_file_to_s3


from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import GenericUnivariateSelect, chi2, f_classif, VarianceThreshold
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, MinMaxScaler, PowerTransformer, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

# Remove pandas' column display limit so wide tables are rendered in full.
pd.options.display.max_columns=None
# Enable tqdm's pandas integration (progress-bar variants of pandas operations).
tqdm.pandas()

In [None]:
# Papermill parameters cell: every value assigned here is a default that
# papermill may replace with a parameter passed at execution time.
job_id = f"{int(time.time())}"  # current Unix time in whole seconds, as text
local_input_path = None  # filled in by the following cell when left unset
local_output_path = None  # filled in by the following cell when left unset
code_path = "../"

In [None]:
# Use the per-job default folder for any path papermill did not provide.
default_data_prep_path = f"../data/{job_id}/data-prep"
local_input_path = default_data_prep_path if local_input_path is None else local_input_path
local_output_path = default_data_prep_path if local_output_path is None else local_output_path

In [None]:
# Echo the effective run parameters, one per line.
print(
    job_id,
    f"local_input_path {local_input_path}",
    f"local_output_path {local_output_path}",
    sep="\n",
)

In [None]:
#Local imports
# code_path is appended to the module search path first so that the
# data_loader module can be imported from it.
sys.path.append(code_path)
from data_loader import DataIO

In [None]:
# Constants
# All the required constants are defined here
# IMAGE_FORMAT is the file extension of every figure written with plt.savefig.
IMAGE_FORMAT = 'png'

In [None]:
#Logging setup
# On success the name ``logging`` is rebound to whatever setup_file_logger
# returns. On failure the assignment never happens, so ``logging`` still
# refers to the standard-library module imported at the top and the
# logging.info(...) calls in this notebook keep working.
try:
    log_file_path = os.path.join(local_output_path, "logs", "sample_notebook.log")
    logging = setup_file_logger(log_file_path)
except Exception as exc:
    # Best effort: keep the notebook running, but report why file logging is
    # unavailable instead of hiding it. Catching Exception (not a bare except)
    # lets KeyboardInterrupt / SystemExit propagate.
    logging.warning("Could not set up file logging, using the standard logging module instead: %r", exc)

logging.info("\n\n\t\tSTARTING FEATURE PREPROCESSING")

In [None]:
# Load the notebook configuration and print it as part of the run record.
notebook_config_path = os.path.join(code_path, "config/data_prep.yaml")
notebook_config = read_yaml(notebook_config_path)
print("Notebook config:")
pprint(notebook_config)

In [None]:
# Load the credentials used for the warehouse connection and the S3 upload.
creds_config = read_yaml(os.path.join(code_path, "credentials.yaml"))
print("Credentials config:")
# Only the top-level section names are shown: printing the values would store
# secrets in the executed notebook's output.
pprint({section: "***" for section in creds_config})

In [None]:
# Everything this run produces is written below local_output_path, which by
# default contains the job id:
# - data
#   - <job_id>
#       - data-prep
#           - visuals
#           - model_artifacts
visuals_dir, model_artifacts_dir = (
    os.path.join(local_output_path, subfolder)
    for subfolder in ("visuals", "model_artifacts")
)

logging.info(f"All the output files will be saved to following location: {local_output_path}")
# Create the folders up front so the later savefig/open calls find them.
for output_path in (local_output_path, visuals_dir, model_artifacts_dir):
    Path(output_path).mkdir(parents=True, exist_ok=True)

In [None]:
# Read the split sizes and the column roles from the "data" section of the config.
data_config = notebook_config['data']
train_split, val_split, test_split = (
    data_config[key] for key in ('train_size', 'val_size', 'test_size')
)

ignore_features, label_column = data_config['ignore_features'], data_config['label_column']

In [None]:
print("Getting data from warehouse")
# DataIO is built from both configs; the object returned by get_data() is used
# as a pandas DataFrame by the rest of the notebook.
dataIO = DataIO(notebook_config, creds_config)
input_data = dataIO.get_data()

In [None]:
# Drop the configured "ignore" columns. Names that are absent from the table
# are filtered out first so that only existing columns are passed to drop().
present_columns = input_data.columns
ignore_features = [name for name in ignore_features if name in present_columns]
ignore_message = f"Ignoring features {ignore_features}"
print(ignore_message)
logging.info(ignore_message)
input_data = input_data.drop(columns=ignore_features)

In [None]:
print("Sample rows from the transformed wide form data")
# Last expression of the cell, so the notebook renders the first rows.
input_data.head()

In [None]:
# Label class balance: pie chart on the left, horizontal bar chart on the right.
fig, axs = plt.subplots(1,2,figsize=(16,6))
pie_ax, bar_ax = axs
fig.suptitle("Label distribution")

label_counts = input_data[label_column].value_counts()
# explode has two entries, so this pie is written for a two-class label.
label_counts.plot.pie(explode=[0,0.1], autopct="%1.1f%%", ax=pie_ax)

bars = bar_ax.barh(list(label_counts.index), list(label_counts.values))

# Write the count next to each bar.
for container in bar_ax.containers:
    bar_ax.bar_label(container)

bar_ax.set_yticks([0,1])
bar_ax.set_xlabel("Frequency")
bar_ax.set_ylabel("Label")

plt.savefig(os.path.join(visuals_dir, f"label_distribution.{IMAGE_FORMAT}"))

In [None]:
# Cast the configured categorical columns to 'object' so that the dtype-based
# column selection in the following cells picks them up as categorical.
logging.info("Converting categorical columns to 'object' datatype ")
categorical_columns = notebook_config['preprocessing']['categorical_columns']
nl = "\n    - "
categorical_summary = f"Categorical columns: {nl}{nl.join(categorical_columns)}"
logging.info(categorical_summary)
print(categorical_summary)

input_data[categorical_columns] = input_data[categorical_columns].astype('object')

In [None]:
# Group the columns by dtype (object vs. everything else); the label column
# belongs to neither group.
categorical_columns = [
    col for col in input_data.select_dtypes(include='object') if col != label_column
]
numeric_columns = [
    col for col in input_data.select_dtypes(exclude='object') if col != label_column
]
nl = "\n    - "

print(f"Following are all categorical columns:{nl}{nl.join(categorical_columns)}")
print(f"Numeric columns:{nl}{nl.join(numeric_columns)}")

Identify boolean columns so that no transformation is applied to them: among the numeric columns, any column with exactly two unique values is treated as boolean.

In [None]:
detect_message = "Detecting boolean columns"
print(detect_message)
logging.info(detect_message)
# A numeric column with exactly two distinct values is treated as boolean.
boolean_columns = [col for col in numeric_columns if len(input_data[col].unique()) == 2]

boolean_summary = f"Boolean columns : {nl}{nl.join(boolean_columns)}"
print(boolean_summary)
logging.info(boolean_summary)

print("Removing boolean columns from numeric columns list")
numeric_columns = [col for col in numeric_columns if col not in boolean_columns]

print(f"Numeric columns:{nl}{nl.join(numeric_columns)}")

### Inspecting categorical features:

In [None]:
print("Frequency tables for each categorical feature - Showing only top 5 categories\n")
object_columns = input_data.select_dtypes(include=["object"]).columns
for column in object_columns:
    try:
        print(f"Feature: {column}")
        # Relative frequency of each category, rounded to 4 decimals, first 5 rows.
        top_frequencies = input_data[column].value_counts(normalize=True).round(4).head()
        print(top_frequencies)
        print("\n")
    except TypeError:
        # Fall back to printing a few non-null raw values of the column.
        print(f"Unable to show cross tab for {column} variable. This typically happens if the variable type is unhashable. Ex: List. Sample values:")
        non_null_values = input_data.query(f"~{column}.isnull()", engine='python')[column]
        print(non_null_values.head())

In [None]:
print("Individual effect of each categorical variable on conversion - Showing only top 5 categories\n")
object_columns = input_data.select_dtypes(include=["object"]).columns
# The label itself is skipped: only features are cross-tabulated against it.
for column in (name for name in object_columns if name != label_column):
    try:
        # Row-normalised crosstab of the feature against the label.
        label_shares = pd.crosstab(input_data[column], input_data[label_column], normalize='index')
        display(label_shares.round(4).head())
        print("\n")
    except TypeError:
        # Fall back to displaying a few non-null raw values of the column.
        print(f"Unable to show cross tab for {column} variable. This typically happens if the variable type is unhashable. Ex: List. Sample values:")
        non_null_values = input_data.query(f"~{column}.isnull()", engine='python')[column]
        display(non_null_values.head())

### Inspecting numerical features

In [None]:
print("Basic stats of all numerical features in input:")
# Lift the column display limit so the summary table is shown in full.
pd.set_option("display.max_columns", None)
# Summary statistics table produced by DataFrame.describe().
numeric_summary = input_data.describe()
display(numeric_summary)

In [None]:
print("Distribution of each numerical feature values:")
# Histogram grid from DataFrame.hist (30 bins, shared y-axis); the figure is
# also saved into visuals_dir.
hist = input_data.hist(bins=30, sharey=True, figsize=(16, 16))
plt.savefig(os.path.join(visuals_dir, f"numerical_features_distribution.{IMAGE_FORMAT}"))

Let's plot the cluster map, where we can see correlation between pairs of features, and hierarchical clusters of the features. This helps in feature selection, to remove redundant features

In [None]:
# Correlate the numeric and boolean columns only. Object-typed (categorical)
# columns have no numeric correlation, and recent pandas versions raise when
# DataFrame.corr() is called on a frame that still contains them.
cor = input_data.select_dtypes(include=["number", "bool"]).corr()

# Hierarchically clustered heatmap of the correlation matrix.
cluster_map = sns.clustermap(cor, cmap=sns.diverging_palette(20, 220, n=200), linewidths=0.1)
# Keep the row labels horizontal so they stay readable.
plt.setp(cluster_map.ax_heatmap.yaxis.get_majorticklabels(), rotation=0)
plt.savefig(os.path.join(visuals_dir, f"correlation.{IMAGE_FORMAT}"))

The above plot shows correlations between pairs of features. A correlation close to zero is ideal, as it indicates that the two features carry independent information. A high correlation suggests that the features carry similar information and that some of them may be dropped.

In [None]:
print("Skewness in each numerical value:")
# Compute the per-column skew once, save it, then show it as the cell output.
feature_skew = input_data.skew(axis=0, numeric_only=True)
feature_skew.to_csv(os.path.join(visuals_dir, "feature_skew.csv"))
feature_skew

The above table shows skew of each numerical feature. A value close to zero indicates low skew. A large number either positive or negative indicates a large skew. For some algorithms, it might be useful to reduce the skew using transformations such as log transformation, power transformation etc.

We would often want to normalize numerical features and apply transformations such as one-hot encoding on categorical features. The normalizer is taken as an input from the config.

In [None]:
# Re-derive the column groups by dtype for the transformation step; the label
# column belongs to neither group.
categorical_columns = [
    col for col in input_data.select_dtypes(include=["object"]) if col != label_column
]
# The boolean columns detected earlier must stay out of the numeric group:
# they get their own pass-through transformer, and the output column names are
# assembled as numeric + one-hot + remaining (boolean) columns. Listing them as
# numeric too would transform them twice and break that name/column alignment.
numeric_columns = [
    col
    for col in input_data.select_dtypes(exclude=["object"])
    if col != label_column and col not in boolean_columns
]

print(f"Following are all categorical columns:{nl}{nl.join(categorical_columns)}")
print(f"Numeric columns:{nl}{nl.join(numeric_columns)}")

### Train-Val-Test split

In [None]:
# Optional seed read from the config ("data" -> "random_state"). When the key
# is absent this is None and the split is random on every run, as before;
# setting it makes the saved train/val/test files reproducible.
split_seed = notebook_config['data'].get('random_state')

# First carve off the combined validation+test share, then divide that holdout
# between validation and test in proportion to their configured sizes.
holdout_fraction = val_split + test_split
X_train, X_holdout = train_test_split(input_data, test_size=holdout_fraction, random_state=split_seed)
X_val, X_test = train_test_split(X_holdout, test_size=test_split / holdout_fraction, random_state=split_seed)

In [None]:
# Cell output: first rows of the training split (still untransformed).
X_train.head()

### Column Transformations

In [None]:
# Lookup tables keyed by object name (``__name__``), so that a step named in
# the config can be resolved to the sklearn class or function it refers to.
transformers = {
    transformer.__name__: transformer
    for transformer in (
        StandardScaler,
        PowerTransformer,
        MinMaxScaler,
        SimpleImputer,
        OneHotEncoder,
        FunctionTransformer,
    )
}
feature_selectors = {
    selector.__name__: selector
    for selector in (GenericUnivariateSelect, chi2, f_classif, VarianceThreshold)
}

def build_pipeline(pipeline_config: dict) -> Pipeline:
    """Build a sklearn ``Pipeline`` from a sequence of step configurations.

    Each entry of ``pipeline_config`` is a mapping with a ``"name"`` key. The
    name is looked up in the module-level ``transformers`` table and also used
    as the step name; all remaining keys are passed to the transformer as
    keyword arguments. The config entries themselves are not modified.

    Raises:
        KeyError: if an entry has no ``"name"`` key or the name is not in
            ``transformers``.
    """
    steps = []
    for step_config in pipeline_config:
        step_name = step_config["name"]
        # Everything except the name is forwarded to the transformer constructor.
        step_kwargs = {key: value for key, value in step_config.items() if key != "name"}
        steps.append((step_name, transformers[step_name](**step_kwargs)))
    return Pipeline(steps=steps)

# Per-group sub-pipelines, built from the "preprocessing" section of the config.
numeric_cols_to_transform = numeric_columns
numeric_pipeline = build_pipeline(notebook_config["preprocessing"]["numeric_pipeline"]["pipeline"])

categorical_cols_to_transform = categorical_columns
categorical_pipeline = build_pipeline(notebook_config["preprocessing"]["categorical_pipeline"]["pipeline"])

# Boolean columns are passed through unchanged. FunctionTransformer() without
# a func is the identity transform; a lambda is deliberately avoided because
# the fitted pipeline is pickled later and pickle cannot serialise lambdas.
boolean_transformer = Pipeline(steps=[
    ('identity', FunctionTransformer())
])

# The order of the entries is relied on further down: index 1 is looked up as
# the categorical pipeline, and output column names are assembled as
# numeric + one-hot + remaining columns.
preprocessing = ColumnTransformer(transformers=[
    ("num", numeric_pipeline, numeric_cols_to_transform),
    ("cat", categorical_pipeline, categorical_cols_to_transform),
    ("bool", boolean_transformer, boolean_columns)
], remainder="passthrough")


pipeline_steps = [
    ("reorder", NamedColumns()),
    ("preprocessor", preprocessing)
]

#Feature selection
# Every selector is registered under the step name "feature_selector", which
# later cells use to find it.
# NOTE(review): with more than one configured selector the names collide -
# confirm that only a single selector is expected in the config.
for feature_selection_options in notebook_config["preprocessing"]["feature_selectors"]:
    options = feature_selection_options.copy()
    name = options.pop("name")
    # All remaining options are passed to the selector as keyword arguments.
    pipeline_steps.append(
        ("feature_selector", feature_selectors[name](**options))
    )

pipeline = Pipeline(steps=pipeline_steps)

In [None]:
# Fit on the training split only: features without the label column, with the
# label passed separately as the target.
pipeline.fit(X_train.drop(columns=[label_column]), X_train[label_column])

In [None]:
# Take the categorical sub-pipeline (entry 1 of the ColumnTransformer's
# ``transformers`` list) and fit its imputer and one-hot encoder directly on
# the training categorical columns, so that the encoder can be queried for
# its output column names in the next cell.
categorical_steps = dict(pipeline.steps)["preprocessor"].transformers[1][1].named_steps
one_hot = categorical_steps["OneHotEncoder"]
imputer = categorical_steps["SimpleImputer"]
cat_data = X_train[categorical_cols_to_transform].copy()

# The encoder is fitted on the imputed values, not on the raw columns.
data = imputer.fit_transform(cat_data)
one_hot.fit(data)

In [None]:
# One-hot encoding changes the number of columns and the transformed array
# carries no column names, so the names are reconstructed here: first for the
# preprocessor output, then narrowed to what the feature selector keeps.
onehot_encoder_columns = get_onehot_encoder_names(dict(pipeline.steps)["preprocessor"].transformers[1][1].named_steps["OneHotEncoder"], categorical_columns)

# Columns that are neither numeric nor categorical keep their own name and
# come last, in the order they appear in the training features.
named_elsewhere = set(numeric_columns) | set(categorical_columns)
col_names_ = numeric_columns + onehot_encoder_columns + [col for col in list(X_train.drop(columns=[label_column])) if col not in named_elsewhere]

# A feature selector is only present when the config lists one; without it
# every preprocessor output column is kept.
feature_selector = dict(pipeline.steps).get('feature_selector')
if feature_selector is None:
    col_names = list(col_names_)
else:
    # get_support() is a boolean mask over the selector's input columns.
    feature_selector_indices = np.where(feature_selector.get_support())[0]
    col_names = [col_names_[i] for i in feature_selector_indices]

In [None]:
# Transform the training features and rebuild a named DataFrame from the result.
train_features = X_train.drop(columns=[label_column])
X_train_transformed = pipeline.transform(train_features)
X_train_df = pd.DataFrame(X_train_transformed, columns=col_names)
# .values attaches the labels by position rather than by X_train's index.
X_train_df[label_column] = X_train[label_column].values

In [None]:
# Record the size of the transformed training matrix in the log.
n_train_rows, n_train_columns = X_train_transformed.shape[0], X_train_transformed.shape[1]
logging.info(f"X Train has {n_train_rows} rows and {n_train_columns} columns")

In [None]:
print("Sample data after applying the transformations:")
# Cell output: first rows of the transformed training data, label included.
X_train_df.head()

In [None]:
# Output file names come from the "output_files" section of the config.
output_files_config = notebook_config["output_files"]
file_data_pipeline_pkl = output_files_config["data_pipeline_file"]
file_output_column_names_pkl = output_files_config["final_column_names_file"]

logging.info("Dumping the pipeline pickle and column names in output directory")
# Pickle the fitted pipeline and the final column names into model_artifacts_dir.
for artifact, artifact_file in (
    (pipeline, file_data_pipeline_pkl),
    (col_names, file_output_column_names_pkl),
):
    with open(os.path.join(model_artifacts_dir, artifact_file), 'wb') as f:
        pickle.dump(artifact, f)

### Feature importance

In [None]:
import xgboost
from xgboost import XGBClassifier

In [None]:
logging.info("Getting feature importance by fitting an xgb model")

# Default-parameter XGBoost classifier fitted on the transformed training
# features; the label is cast to int for the fit.
train_feature_frame = X_train_df.drop(columns=[label_column])
train_labels = X_train[label_column].astype(int)
xgboost_model = XGBClassifier()
# The trailing semicolon keeps the notebook from echoing the fitted model.
xgboost_model.fit(train_feature_frame, train_labels);

In [None]:
print("Feature importances:\n")
feature_importances_xgboost = xgboost_model.feature_importances_
print(f"{'Feature':<50}: {'Score'}")
# Go through the features from highest to lowest score and stop at the first
# zero, since everything after it is zero as well.
ranked_feature_ids = np.argsort(feature_importances_xgboost)[::-1]
for feature_id in ranked_feature_ids:
    importance = feature_importances_xgboost[feature_id]
    if importance == 0:
        print("\n\nRest all features have importance score 0.")
        break
    print(f"{X_train_df.columns[feature_id]:<50}: {importance:.4f}")

In [None]:
# Save the scores as {column name: importance}. Scores are converted to plain
# floats for JSON; zip pairs names and scores positionally and stops at the
# shorter of the two sequences.
with open(os.path.join(local_output_path, "feature_importances_xgb.json"), "w") as f:
    importance_by_column = {
        column: float(score)
        for column, score in zip(list(X_train_df), feature_importances_xgboost)
    }
    json.dump(importance_by_column, f)

In [None]:
def create_bar_plot(feature_importances: list, X_train: pd.DataFrame, top_features: int=30):
    """
    Create a bar plot of features against their corresponding feature importance score.

    The ``top_features`` highest-scoring features are drawn in descending
    order of score. The figure is saved to
    ``<local_output_path>/feature_importances_xgb.<IMAGE_FORMAT>`` (both names
    are module-level variables) and then shown.

    Args:
        feature_importances: One score per feature; a plain list or a numpy
            array. Position ``i`` corresponds to column ``i`` of ``X_train``.
        X_train: DataFrame whose column order matches ``feature_importances``.
        top_features: Maximum number of features to plot.
    """
    # Accept plain lists as well as arrays: the index-array lookup below only
    # works on a numpy array.
    importances = np.asarray(feature_importances)
    top_idx = np.argsort(importances)[::-1][:top_features]
    top_scores = importances[top_idx]
    # Build the column list once instead of once per selected feature.
    all_columns = list(X_train)
    column_names = [all_columns[idx] for idx in top_idx]
    x_indices = list(range(len(column_names)))

    plt.figure(figsize=(15, 5))
    plt.bar(x_indices, top_scores, color="blue")
    plt.xticks(x_indices, column_names, rotation=90)
    plt.xlabel("Feature", fontsize=18)
    plt.ylabel("Importance Score", fontsize=18)
    plt.title(f"XGBoost based Feature Importance Scores of top {top_features} features", fontsize=18)
    plt.savefig(os.path.join(local_output_path, f"feature_importances_xgb.{IMAGE_FORMAT}"))
    plt.show()

In [None]:
# Plot the XGBoost scores against the transformed training columns (default: top 30).
create_bar_plot(feature_importances_xgboost, X_train_df)

### Saving train test val datasets

In [None]:
def _transform_split(split_df):
    """Return (transformed array, labelled DataFrame) for one data split."""
    transformed = pipeline.transform(split_df.drop(columns=[label_column]))
    frame = pd.DataFrame(transformed, columns=col_names)
    # .values attaches the labels by position rather than by the split's index.
    frame[label_column] = split_df[label_column].values
    return transformed, frame


# Validation and test splits go through the already-fitted pipeline.
X_val_transformed, X_val_df = _transform_split(X_val)

X_test_transformed, X_test_df = _transform_split(X_test)

In [None]:
logging.info("Dumping train, val, test datasets")
# Write each split to its own CSV in local_output_path, without the index.
for split_file, split_df in (
    ("train.csv", X_train_df),
    ("val.csv", X_val_df),
    ("test.csv", X_test_df),
):
    split_df.to_csv(os.path.join(local_output_path, split_file), index=False)

### Uploading pre-processing pipeline files

In [None]:
print("Uploading preprocessing files to staging location")
# Upload the pickled pipeline and the column-name list. Each file goes to
# <staging_models_s3_prefix>/<job_id>/<filename> in the configured bucket.
for filename in [file_data_pipeline_pkl, file_output_column_names_pkl]:
    print(f"Uploading {filename} to s3")
    upload_file_to_s3(
        creds = creds_config,
        local_file_path = f"{model_artifacts_dir}/{filename}",
        s3_bucket_name = creds_config["aws"]["s3Bucket"],
        s3_path = f"{creds_config['aws']['staging_models_s3_prefix']}/{job_id}/{filename}"
    )

In [None]:
## Cell to hide code while converting to a html page
# The HTML object is the cell's output; its script hides every element that
# matches 'div.input' using the page's `$` (jQuery-style) selector.
from IPython.display import HTML

HTML('''<script>
$('div.input').hide();
</script>''')