# SageMaker Feature Engineering using Python code

---

## Background
Direct marketing, whether through mail, email, or phone, is a common tactic for acquiring customers.  Because resources and a customer's attention are limited, the goal is to target only the subset of prospects who are likely to engage with a specific offer.  Predicting those potential customers from readily available information such as demographics, past interactions, and environmental factors is a common machine learning problem.


---

## Preparation

Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data.  This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these.
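
As a rough illustration of how the bucket and prefix combine into the S3 locations used later in this notebook, here is a minimal sketch. The `s3_uri` helper and the bucket name are hypothetical placeholders, not part of the SageMaker SDK; the notebook itself gets the real bucket from `sagemaker.Session().default_bucket()`.

```python
import posixpath


def s3_uri(bucket: str, *parts: str) -> str:
    """Join a bucket name and key parts into an s3:// URI (hypothetical helper)."""
    # posixpath always joins with "/", which is what S3 keys use on any platform.
    key = posixpath.join(*(p.strip("/") for p in parts if p))
    return f"s3://{bucket}/{key}"


# Placeholder values for illustration only.
example_bucket = "my-example-bucket"
example_prefix = "mlops/activity-1"

train_uri = s3_uri(example_bucket, example_prefix, "train")
print(train_uri)
```

Keeping every artifact under one prefix makes it easy to list or clean up everything a single activity produced.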


# Learning Journey 1: Feature Engineering with Feature Store

In [None]:
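import os                                         # For building S3 key names
import re
import boto3
import numpy as np                                # For splitting the data
import pandas as pd                               # For munging tabular data
import sagemaker
from sagemaker import get_execution_role

bucket = sagemaker.Session().default_bucket()
prefix = 'mlops/activity-1'

# Define IAM role
role = get_execution_role()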

Now let's create the Feature Store session and a handle to the feature group that we'll query.

In [None]:
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup

region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

feature_group_name = "<feature-group-name>"
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

In [None]:
fs_query = feature_group.athena_query()
fs_table = fs_query.table_name
query_string = 'SELECT * FROM "'+fs_table+'"'
print('Running ' + query_string)

In [None]:
fs_query.run(query_string=query_string, output_location='s3://'+bucket+'/'+prefix+'/fs_query_results/')
fs_query.wait()
model_data = fs_query.as_dataframe()

In [None]:
model_data = model_data.drop(['fs_id', 'fs_time', 'write_time', 'api_invocation_time', 'is_deleted'], axis=1)
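
The cells above select every column and then drop the Feature Store bookkeeping columns in pandas. An alternative, sketched below under stated assumptions, is to name only the wanted columns in the query itself. The `build_select` helper, the table name, and the column list are hypothetical placeholders; in the notebook the table name comes from `fs_query.table_name`.

```python
# Columns the notebook drops after querying (taken from the cell above).
COLUMNS_TO_SKIP = {"fs_id", "fs_time", "write_time", "api_invocation_time", "is_deleted"}


def build_select(table_name, columns):
    """Build a SELECT statement with double-quoted identifiers (hypothetical helper)."""
    quoted = ", ".join(f'"{column}"' for column in columns)
    return f'SELECT {quoted} FROM "{table_name}"'


# Made-up column list for illustration; a real feature group defines its own.
all_columns = ["fs_id", "age", "y_yes", "write_time", "is_deleted"]
wanted = [column for column in all_columns if column not in COLUMNS_TO_SKIP]

query = build_select("example_table", wanted)
print(query)
```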

In [None]:
model_data

In [None]:
# Randomly shuffle the data, then split out the first 70%, next 20%, and last 10%
train_data, validation_data, test_data = np.split(
    model_data.sample(frac=1, random_state=1729),
    [int(0.7 * len(model_data)), int(0.9 * len(model_data))],
)

In [None]:
# Write the label first and omit the header; file names must match the upload step below
pd.concat([train_data['y_yes'], train_data.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('train_fs.csv', index=False, header=False)
pd.concat([validation_data['y_yes'], validation_data.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('validation_fs.csv', index=False, header=False)

In [None]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train_fs.csv')).upload_file('train_fs.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation_fs.csv')).upload_file('validation_fs.csv')

In [1]:
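# Validate that the files landed in the S3 bucket
!aws s3 ls s3://$bucket/$prefix/train/
!aws s3 ls s3://$bucket/$prefix/validation/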

# Learning Journey 2: Working on Notebook Instance using Python code

In [None]:
import sagemaker_datawrangler           # For interactive data prep widget
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import sys                                        # For writing outputs to notebook
import math                                       # For ceiling function
import json                                       # For parsing hosting outputs
import os                                         # For manipulating filepath names
import sagemaker 

In [None]:
pd.__version__

---

## Data
Let's start by downloading the dataset from GitHub.

\[Moro et al., 2014\] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014


In [None]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-dfscientists/main/Section-13-Feature-Engineering/dfset/bank-additional-full.csv

In [None]:
df = pd.read_csv('./bank-additional-full.csv')
pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 20)         # Keep the output on one page
df

In [None]:
for column in df.select_dtypes(include=['object']).columns:
    if column != 'y':
        print(pd.crosstab(index=df[column], columns=df['y'], normalize='columns'))

for column in df.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = df[[column, 'y']].hist(by='y', bins=30)
    plt.show()

In [None]:
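print(df.select_dtypes(include='number').corr())   # Correlate numeric columns only; string columns raise an error in recent pandas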
pd.plotting.scatter_matrix(df, figsize=(12, 12))
plt.show()

In [None]:
# Transformation
df['no_previous_contact'] = np.where(df['pdays'] == 999, 1, 0)                                 # Indicator variable to capture when pdays takes a value of 999
df['not_working'] = np.where(df['job'].isin(['student', 'retired', 'unemployed']), 1, 0)       # Indicator for individuals not actively employed (isin replaces the deprecated np.in1d)
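
To make the two indicator variables concrete, here is a small sketch on a made-up four-row frame (the rows are invented for illustration and are not taken from the dataset). It uses boolean comparisons cast to integers, which should be equivalent to the `np.where` form above.

```python
import pandas as pd

# Toy rows, invented for illustration only.
sample = pd.DataFrame({
    "pdays": [999, 3, 999, 6],
    "job": ["student", "admin.", "retired", "technician"],
})

# 1 when pdays holds the sentinel value 999, else 0.
sample["no_previous_contact"] = (sample["pdays"] == 999).astype(int)

# 1 when the job is one of the "not actively employed" categories, else 0.
sample["not_working"] = sample["job"].isin(["student", "retired", "unemployed"]).astype(int)

print(sample)
```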

In [None]:
model_df = pd.get_dummies(df, dtype=int)                                                       # Convert categorical variables to sets of 0/1 indicators (dtype=int avoids True/False in the CSV)

In [None]:
model_df = model_df.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)

In [None]:
# Randomly shuffle the data, then split out the first 70%, next 20%, and last 10%
train_df, validation_df, test_df = np.split(
    model_df.sample(frac=1, random_state=1729),
    [int(0.7 * len(model_df)), int(0.9 * len(model_df))],
)
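
The one-line split above packs three steps together: shuffle the rows with a fixed seed, cut at the 70% mark, and cut again at the 90% mark. The plain-Python sketch below spells those steps out on row indices. `split_indices` is a hypothetical helper, and because it uses Python's `random` module it will not reproduce the same row order as `DataFrame.sample`.

```python
import random


def split_indices(n_rows, cut_fractions=(0.7, 0.9), seed=1729):
    """Shuffle row indices and cut them into train/validation/test (hypothetical helper)."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)          # Seeded shuffle, so reruns give the same split
    first_cut = int(cut_fractions[0] * n_rows)    # End of the training slice
    second_cut = int(cut_fractions[1] * n_rows)   # End of the validation slice
    return indices[:first_cut], indices[first_cut:second_cut], indices[second_cut:]


train_idx, validation_idx, test_idx = split_indices(1000)
print(len(train_idx), len(validation_idx), len(test_idx))
```

Passing the cumulative cut points (0.7 and 0.9) rather than adding 0.7 and 0.2 at run time mirrors the notebook and sidesteps floating-point rounding in the sum.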

In [None]:
pd.concat([train_df['y_yes'], train_df.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('train.csv', index=False, header=False)
pd.concat([validation_df['y_yes'], validation_df.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('validation.csv', index=False, header=False)
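
The two lines above move `y_yes` to the first column, drop the redundant `y_no`, and write the file without a header or index. That layout (label first, no header) is what SageMaker's built-in CSV-consuming algorithms generally expect, which is presumably why the notebook uses it. A minimal sketch on invented toy data:

```python
import io

import pandas as pd

# Toy data, invented for illustration only.
toy = pd.DataFrame({"age": [30, 45, 52], "y": ["no", "yes", "no"]})

# One-hot encoding turns the string target into two columns: y_no and y_yes.
encoded = pd.get_dummies(toy, dtype=int)

# Keep y_yes as the label in the first column; y_no carries no extra information.
label_first = pd.concat(
    [encoded["y_yes"], encoded.drop(["y_no", "y_yes"], axis=1)], axis=1
)

# Write to an in-memory buffer instead of a file so the result is easy to inspect.
buffer = io.StringIO()
label_first.to_csv(buffer, index=False, header=False)
csv_text = buffer.getvalue()
print(csv_text)
```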

In [None]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')

# Learning Journey 3: Feature Engineering with SageMaker Processing

In [None]:
# Upload to S3 Bucket
from sagemaker import Session

sess = Session()
input_source = sess.upload_data('./bank-additional-full.csv', bucket=bucket, key_prefix=f'{prefix}/input_data')
input_source

Amazon SageMaker Processing allows you to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output.

![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)

Here, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances.  SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.    

To use SageMaker Processing, simply supply a Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

In [None]:
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role


sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1, 
    base_job_name='mlops-sklearnprocessing'
)

sklearn_processor.run(
    code='feature-engg-script.py',
    # arguments = ['arg1', 'arg2'],
    inputs=[
        ProcessingInput(
            source=input_source, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train",
            destination=train_path,
        ),
        ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation", destination=validation_path),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test", destination=test_path),
    ]
)

In [None]:
!aws s3 ls $train_path/

In [None]:
!aws s3 ls $test_path/