# Feature Engineering on AWS Sagemaker 

---

## Background
Direct marketing, either through mail, email, phone, etc., is a common tactic to acquire customers.  Because resources and a customer's attention is limited, the goal is to only target the subset of prospects who are likely to engage with a specific offer.  Predicting those potential customers based on readily available information like demographics, past interactions, and environmental factors is a common machine learning problem.


---

## Preparation


- The S3 bucket and prefix that we use for training and model df.  This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role arn used to give training and hosting access to the df.

# Learning Journey  : Working on Notebook Instance using Python code

In [None]:
import sagemaker_datawrangler           # For interactive data prep widget
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import sys                                        # For writing outputs to notebook
import math                                       # For ceiling function
import json                                       # For parsing hosting outputs
import os                                         # For manipulating filepath names
import sagemaker 

In [None]:
pd.__version__

In [None]:
import sagemaker
bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-2'
 
# Define IAM role
import boto3
import re
from sagemaker import get_execution_role

role = get_execution_role()

---

## Data


In [None]:
!wget https://github.com/srushtii-m/MLOps-With-AWS/blob/main/Feature%20Engineering/datasets/bank-additional-full.csv

In [None]:
df= pd.read_csv('./bank-additional-full.csv')
pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 20)         # Keep the output on one page
df

In [None]:
for column in df.select_dtypes(include=['object']).columns:
    if column != 'y':
        print(pd.crosstab(index=df[column], columns=df['y'], normalize='columns'))

for column in df.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = df[[column, 'y']].hist(by='y', bins=30)
    plt.show()

In [None]:
print(df.corr())
pd.plotting.scatter_matrix(df, figsize=(12, 12))
plt.show()

In [None]:
# Transformation
df['no_previous_contact'] = np.where(df['pdays'] == 999, 1, 0)                                 # Indicator variable to capture when pdays takes a value of 999
df['not_working'] = np.where(np.in1d(df['job'], ['student', 'retired', 'unemployed']), 1, 0)   # Indicator for individuals not actively employed

In [None]:
model_df= pd.get_dummies(df)                                                                  # Convert categorical variables to sets of indicators

In [None]:
model_df= model_df.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)

In [None]:
train_df, validation_df, test_df= np.split(model_df.sample(frac=1, random_state=1729), [int(0.7 * len(model_df)), int(0.9 * len(model_df))])   # Randomly sort the dfthen split out first 70%, second 20%, and last 10%

In [None]:
pd.concat([train_df['y_yes'], train_df.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('train.csv', index=False, header=False)
pd.concat([validation_df['y_yes'], validation_df.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('validation.csv', index=False, header=False)

In [None]:
# Uploading train and validation sets to S3 buckets
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')

# Feature Engineering with Sagemaker Processing

In [None]:
# Upload to S3 Bucket
from sagemaker import Session
import sagemaker
bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-3'

sess = Session()
input_source = sess.upload_data('./bank-additional-full.csv', bucket=bucket, key_prefix=f'{prefix}/input_data')
input_source

In [None]:
# Define IAM role
import boto3
import re
from sagemaker import get_execution_role

role = get_execution_role()

Amazon SageMaker Processing allows us to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output.

![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)

Here, we will import the dataset and apply transformations using SageMaker Processing. This service is designed to handle large volumes of data in a SageMaker-managed cluster, separate from our notebook server. In a typical SageMaker workflow, notebooks are primarily used for initial exploration and prototyping, running on cost-effective and less powerful instances. The heavier processing, training, and model hosting tasks are performed on dedicated and more powerful SageMaker-managed instances. 

To utilize SageMaker Processing, we need to provide a Python script for data preprocessing. In this example, I'm using a SageMaker prebuilt Scikit-learn container. To perform the operations, input and output data must be placed in specified directories so that the SageMaker Processing automatically fetches the input data from S3 and uploads the transformed data back to S3 once the job is completed.

In [None]:
!wget https://github.com/srushtii-m/MLOps-With-AWS/blob/main/Feature%20Engineering/feature-engg-script.py

In [None]:
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role


sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1, 
    base_job_name='mlops-sklearnprocessing'
)

sklearn_processor.run(
    code='feature-engg-script.py',
    # arguments = ['arg1', 'arg2'],
    inputs=[
        ProcessingInput(
            source=input_source, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train",
            destination=train_path,
        ),
        ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation", destination=validation_path),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test", destination=test_path),
    ]
)

In [None]:
!aws s3 ls $train_path/

In [None]:
!aws s3 ls $test_path/

# Model Building

In [None]:
# Use the previously prepared data
from sagemaker import Session
import sagemaker
import boto3
import re
from sagemaker import get_execution_role
import numpy as np
import pandas as pd
import os

role = get_execution_role()

bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-3'
sess = Session()
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"

In [None]:
container = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='xgboost', version='latest')

In [None]:
s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
s3_input_validation = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/validation/'.format(bucket, prefix), content_type='csv')

In [None]:
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    instance_count=1, 
                                    instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, prefix),
                                    sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

In [None]:
xgb_predictor = xgb.deploy(initial_instance_count=1,
                           instance_type='ml.m4.xlarge')

In [None]:
#Evaluation
xgb_predictor.serializer = sagemaker.serializers.CSVSerializer()

In [None]:
!aws s3 ls $test_path

In [None]:
test_data_x = pd.read_csv(os.path.join(test_path, 'test_script_x.csv'),header=None)
test_data_y = pd.read_csv(os.path.join(test_path, 'test_script_y.csv'),header=None)

In [None]:
def predict(data, predictor, rows=500 ):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])

    return np.fromstring(predictions[1:], sep=',')
predictions = predict(test_data_x, xgb_predictor)

In [None]:
pd.crosstab(index=test_data_y[0], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])

In [None]:
xgb_predictor.delete_endpoint(delete_endpoint_config=True)

# Model Deployment 

In [None]:
import boto3

client = boto3.client(service_name="sagemaker")
runtime = boto3.client(service_name="sagemaker-runtime")

In [None]:
model_artifacts = xgb.model_data
model_artifacts

In [None]:
from time import gmtime, strftime

model_name = "xgboost-serverless" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Model name: " + model_name)

# dummy environment variables
byo_container_env_vars = {"SAGEMAKER_CONTAINER_LOG_LEVEL": "20", "SOME_ENV_VAR": "myEnvVar"}

create_model_response = client.create_model(
    ModelName=model_name,
    Containers=[
        {
            "Image": container,
            "Mode": "SingleModel",
            "ModelDataUrl": model_artifacts,
            "Environment": byo_container_env_vars,
        }
    ],
    ExecutionRoleArn=role,
)

print("Model Arn: " + create_model_response["ModelArn"])

In [None]:
xgboost_epc_name = "mlops-serverless-epc" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName=xgboost_epc_name,
    ProductionVariants=[
        {
            "VariantName": "byoVariant",
            "ModelName": model_name,
            "ServerlessConfig": {
                "MemorySizeInMB": 4096,
                "MaxConcurrency": 1,
            },
        },
    ],
)

print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])

In [None]:
endpoint_name = "xgboost-serverless-ep" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=xgboost_epc_name,
)

print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])

In [None]:
# wait for endpoint to reach a terminal state (InService) using describe endpoint
import time

describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)

while describe_endpoint_response["EndpointStatus"] == "Creating":
    describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)
    print(describe_endpoint_response["EndpointStatus"])
    time.sleep(15)

describe_endpoint_response

In [None]:
# Endpoint invocation
payload = b"3., 999.,   0.,   1.,   0.,   0.,   0.,   0.,   0.,   0.,   0., 1.,   0.,   0.,   0.,   0.,   0.,   1.,   0.,   0.,   0.,   0., 0.,   0.,   0.,   0.,   0.,   1.,   0.,   1.,   0.,   0.,   1., 0.,   0.,   1.,   0.,   0.,   1.,   0.,   0.,   0.,   0.,   1., 0.,   0.,   0.,   0.,   0.,   0.,   0.,   1.,   0.,   0.,   0., 0.,   1.,   0."

response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=payload,
    ContentType="text/csv",
)

print(response["Body"].read().decode())

In [None]:
client.delete_model(ModelName=model_name)
client.delete_endpoint_config(EndpointConfigName=xgboost_epc_name)
client.delete_endpoint(EndpointName=endpoint_name)

# Automatic Model Tuning

In [None]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner
hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),
                            'min_child_weight': ContinuousParameter(1, 10),
                            'alpha': ContinuousParameter(0, 2),
                            'max_depth': IntegerParameter(1, 10)}
objective_metric_name = 'validation:auc'

In [None]:
tuner = HyperparameterTuner(xgb,
                            objective_metric_name,
                            hyperparameter_ranges,
                            max_jobs=20,
                            max_parallel_jobs=3)

In [None]:
tuner.fit({'train': s3_input_train, 'validation': s3_input_validation})

In [None]:
boto3.client('sagemaker').describe_hyper_parameter_tuning_job(
HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']

In [None]:
tuner.best_training_job()

In [None]:
tuner_predictor = tuner.deploy(initial_instance_count=1,
                           instance_type='ml.m4.xlarge')

In [None]:
tuner_predictor.serializer = sagemaker.serializers.CSVSerializer()

In [None]:
test_data_x = pd.read_csv(os.path.join(test_path, 'test_script_x.csv'),header=None)
test_data_y = pd.read_csv(os.path.join(test_path, 'test_script_y.csv'),header=None)

In [None]:
predictions = predict((test_data_x).to_numpy(),tuner_predictor)

In [None]:
pd.crosstab(index=test_data_y[0], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])

### Delete End Point

In [None]:
tuner_predictor.delete_endpoint(delete_endpoint_config=True)