# Inference pipeline using raw Snowflake data


Let's first create our SageMaker session and role, and create an S3 prefix to use for the notebook example.

In [None]:
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

# S3 prefix
bucket = sagemaker_session.default_bucket()
prefix = 'pipeline-churn'
WORK_DIRECTORY = 'pipeline-churn'
# Snowflake credentials
ssm_client = sagemaker_session.boto_session.client(service_name='ssm',region_name='ap-southeast-2')
snowflake_account = ssm_client.get_parameter(Name='snowflake_account',WithDecryption=False)['Parameter']['Value']
snowflake_user = ssm_client.get_parameter(Name='snowflake_user',WithDecryption=False)['Parameter']['Value']
snowflake_password = ssm_client.get_parameter(Name='snowflake_password',WithDecryption=True)['Parameter']['Value']


# Preprocessing data and training the model <a class="anchor" id="training"></a>
## Downloading dataset <a class="anchor" id="download_data"></a>
The churn dataset lives in Snowflake, so we connect with the credentials fetched above and query the `CHURN` table directly into a pandas DataFrame.

In [None]:
import snowflake.connector
ctx = snowflake.connector.connect(
  user=snowflake_user,
  password=snowflake_password,
  account=snowflake_account
)
cs=ctx.cursor()
allrows=cs.execute( \
"select Geography,Gender,CreditScore,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited from \"DEMO_DB\".\"DBT_ML_PIPELINE\".\"CHURN\"").fetchall()
import pandas as pd
# The SELECT above already leaves out the row number and customer id
data = pd.DataFrame(allrows)
# Column 5 is Balance; treat a zero balance as a missing value
data[5] = data[5].replace({0:None})
print(data.head().to_markdown())   
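
Because the DataFrame is built without column names, positions such as `data[5]` depend entirely on the order of the SELECT list. As a minimal sketch (the names `CHURN_COLUMNS` and `build_churn_query` are illustrative and not part of the notebook), the column order can be kept in one list and the query derived from it:

```python
# Column order taken from the SELECT statement in this notebook.
CHURN_COLUMNS = [
    "Geography", "Gender", "CreditScore", "Age", "Tenure", "Balance",
    "NumOfProducts", "HasCrCard", "IsActiveMember", "EstimatedSalary",
    "Exited",
]


def build_churn_query(columns, table='"DEMO_DB"."DBT_ML_PIPELINE"."CHURN"'):
    """Return a SELECT statement that lists the columns in the given order."""
    return "select {} from {}".format(",".join(columns), table)


query = build_churn_query(CHURN_COLUMNS)
# Position of Balance in the result rows, i.e. the index used as data[5].
balance_position = CHURN_COLUMNS.index("Balance")

print(query)
print(balance_position)
```

Keeping the list in one place also makes it possible to pass it as `columns=` when building the DataFrame, should named columns be preferred over positions.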

## Upload the data for training <a class="anchor" id="upload_data"></a>

When training large models with huge amounts of data, you'll typically use big data tools, like Amazon Athena, AWS Glue, or Amazon EMR, to create your data in S3. This dataset is small, so we write the DataFrame as a CSV file straight to the session's default bucket. Note that pandas needs the `s3fs` package installed to write to an `s3://` path.

In [None]:
train_input=f's3://{bucket}/{WORK_DIRECTORY}/train/input.csv'
data.to_csv(train_input, index=False,header=False)
print('uploaded training data location: {}'.format(train_input))


## Create SageMaker Scikit Estimator <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: Role ARN
* __framework_version__: Scikit-learn version you want to use for executing your model training code.
* __instance_type__ *(optional)*: The type of SageMaker instances for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.

To see the code for the SKLearn Estimator, see here: https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn

In [None]:
from sagemaker.sklearn.estimator import SKLearn

sklearn_preprocessor = SKLearn(
    entry_point='sklearn_churn_featurizer.py',
    role=role,
    framework_version="0.23-1",
    instance_type="ml.c4.xlarge",
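    sagemaker_session=sagemaker_session)

# Fit the preprocessor on the raw training data. The 'train' channel name
# is an assumption about what sklearn_churn_featurizer.py expects.
sklearn_preprocessor.fit({'train': train_input})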


## Batch transform our training data <a class="anchor" id="preprocess_train_data"></a>
Now that our preprocessor is properly fitted, let's go ahead and preprocess our training data. We use batch transform to preprocess the raw data directly and store the result right back in S3.

In [None]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m5.xlarge',
    assemble_with = 'Line',
    accept = 'text/csv')

In [None]:
# Preprocess training input
transformer.transform(train_input, content_type="text/csv")
print("Waiting for transform job: " + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path
print(preprocessed_train)
# At this point we have a single CSV file in S3 as the result of the preprocessing

In [None]:
# We need to split out a testing set for the keras script to consume
# First, download the preprocessed CSV back locally
import numpy as np
transformer_output_path = transformer.output_path.split('/')[-1]
preprocessed_data = sagemaker_session.download_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'preprocessed'), 
    bucket=bucket,
    key_prefix='{}'.format(transformer_output_path))
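
The cell above takes the last `/`-separated segment of `transformer.output_path` and uses it as the S3 key prefix. That works when the output path is a bucket followed by a single path segment. As a hedged sketch for the general case (the helper `split_s3_uri` and the example URI are hypothetical), the standard library can split an S3 URI into bucket and key prefix:

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {!r}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical output path, standing in for transformer.output_path.
example_uri = "s3://example-bucket/example-transform-job"
bucket_name, key_prefix = split_s3_uri(example_uri)

print(bucket_name)
print(key_prefix)
```

For a single-segment path like the one above, `key_prefix` equals `example_uri.split('/')[-1]`; for a nested path the helper keeps the whole prefix, whereas the split would keep only its final segment.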


In [None]:
# Split the preprocessed csv into the four test+train features+labels files, then upload it back to S3
dataset = np.loadtxt(f'./{WORK_DIRECTORY}/preprocessed/input.csv.out',delimiter=",", skiprows=0)
data = pd.DataFrame(dataset)
print(data.head(10).to_markdown())  

from sklearn.model_selection import train_test_split
y=dataset[:,0]
x=dataset[:,1:]
X_train, X_test, y_train, y_test = train_test_split(x,y, test_size = 0.2, random_state = 0)

split_files_path=f'./{WORK_DIRECTORY}/preprocessed/split'
from pathlib import Path
Path(split_files_path).mkdir(parents=True, exist_ok=True)

np.save(f'{split_files_path}/train_X.npy', X_train)
np.save(f'{split_files_path}/train_Y.npy', y_train)
np.save(f'{split_files_path}/test_X.npy', X_test)
np.save(f'{split_files_path}/test_Y.npy', y_test)

data_dir = sagemaker_session.upload_data(path=f'{split_files_path}', bucket=bucket, key_prefix='preprocessed_split_data')

#     Exit |  Credit    |    Age      |  Tenure   | Balance      | no. prods |  Salary    |Frnce|Grmny|Spain|Female| Male |      |HasCrCd|     |Active|
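
The cell above treats the first column of the preprocessed file as the label and the remaining columns as features, then holds out 20% of the rows for testing. The following is a minimal pure-Python sketch of that idea, written only to illustrate the mechanics. It is not scikit-learn's implementation, and with the same seed it will not select the same rows as `train_test_split`; the function names and toy rows are made up for the example.

```python
import math
import random


def split_labels_and_features(rows):
    """Separate the label (first column) from the feature columns."""
    labels = [row[0] for row in rows]
    features = [row[1:] for row in rows]
    return features, labels


def holdout_split(features, labels, test_size=0.2, seed=0):
    """Shuffle row indices with a fixed seed and hold out a test fraction."""
    indices = list(range(len(features)))
    random.Random(seed).shuffle(indices)
    n_test = math.ceil(len(indices) * test_size)
    test_idx, train_idx = indices[:n_test], indices[n_test:]
    x_train = [features[i] for i in train_idx]
    x_test = [features[i] for i in test_idx]
    y_train = [labels[i] for i in train_idx]
    y_test = [labels[i] for i in test_idx]
    return x_train, x_test, y_train, y_test


# Toy rows: label first, then two feature values.
toy_rows = [[i % 2, i, i * 10] for i in range(10)]
toy_x, toy_y = split_labels_and_features(toy_rows)
toy_x_train, toy_x_test, toy_y_train, toy_y_test = holdout_split(toy_x, toy_y)

print(len(toy_x_train), len(toy_x_test))
```

Shuffling indices rather than the rows themselves keeps every feature row paired with its label on both sides of the split.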

## Train as a SageMaker training job

The TensorFlow estimator uses the `keras_ann_script_mode.py` script as the entry point. Pay special attention to `keras_model_fn`, which is redefined within this Python script.

In [None]:
# Based on: https://github.com/aws-samples/amazon-sagemaker-script-mode/blob/master/keras-embeddings-script-mode/keras-embeddings.ipynb

from sagemaker.tensorflow import TensorFlow

s3_tf_output_key_prefix = "tf_training_output"
s3_tf_output_location = 's3://{}/{}/{}/{}'.format(bucket, prefix, s3_tf_output_key_prefix, 'tf_model')

tf_estimator_sm = TensorFlow(
    entry_point="keras_ann_script_mode.py",
    role=role,
    model_dir=s3_tf_output_location,
    framework_version="1.12.0",
    instance_count=1,
    instance_type="ml.c4.xlarge",
    hyperparameters={'learning_rate': 0.1, 
                     'epochs': 1, 
                     'batch_size': 10},
    script_mode=True,
    py_version="py3"
)

tf_estimator_sm.fit({'train': data_dir, 'eval': data_dir})


# Serial Inference Pipeline with Scikit preprocessor and TensorFlow predictor <a class="anchor" id="serial_inference"></a>


## Set up the inference pipeline <a class="anchor" id="pipeline_setup"></a>
Setting up a Machine Learning pipeline can be done with the Pipeline Model. This sets up a list of models in a single endpoint; in this example, we configure our pipeline model with the fitted Scikit-learn inference model and the trained TensorFlow model. Deploying the model follows the same ```deploy``` pattern in the SDK.

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Create deployable models from the fitted preprocessor and the trained estimator
scikit_learn_inference_model = sklearn_preprocessor.create_model()
tf_model = tf_estimator_sm.create_model()

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
# Containers run in list order: preprocessing first, then prediction
sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    models=[
        scikit_learn_inference_model, 
        tf_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

## Make a request to our pipeline endpoint <a class="anchor" id="pipeline_inference_request"></a>

Here we just grab a single line of raw data (you'll notice that the inference Python script is very particular about the ordering of the inference request data). The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.

We make our request with the payload in ```'text/csv'``` format, since that is what our script currently supports. If other formats need to be supported, this would have to be added to the ```input_fn()``` method in our entry point. We do not set ```Accept``` explicitly here, so the ```Predictor``` returns the last container's response as raw bytes. The prediction output in this case is the model's estimate of whether the customer will churn (the ```Exited``` column), given the customer's other attributes.
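
Since the request must list the features in exactly the order the preprocessor was trained on, it can help to build the CSV line from named fields rather than by hand. The sketch below assumes the feature order is the SELECT order from the start of this notebook without the `Exited` label; `FEATURE_ORDER` and `to_csv_payload` are illustrative names, not part of the entry point script.

```python
# Feature order assumed from the notebook's SELECT list, minus the label.
FEATURE_ORDER = [
    "Geography", "Gender", "CreditScore", "Age", "Tenure", "Balance",
    "NumOfProducts", "HasCrCard", "IsActiveMember", "EstimatedSalary",
]


def to_csv_payload(record, order=FEATURE_ORDER):
    """Serialize a dict of features into one CSV line in a fixed order."""
    missing = [name for name in order if name not in record]
    if missing:
        raise KeyError("missing features: {}".format(", ".join(missing)))
    return ",".join(str(record[name]) for name in order)


# The same customer as the notebook's payload, with keys in arbitrary order.
customer = {
    "Gender": "Male", "Geography": "Germany", "Age": 58, "CreditScore": 653,
    "Tenure": 1, "Balance": 132602.88, "NumOfProducts": 1, "HasCrCard": 1,
    "IsActiveMember": 0, "EstimatedSalary": 5097.67,
}
csv_payload = to_csv_payload(customer)
print(csv_payload)
```

A plain `join` is enough here because none of the values contain commas or quotes; for free-text fields the `csv` module would be the safer choice.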

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer,JSONSerializer

payload = 'Germany,Male,653,58,1,132602.88,1,1,0,5097.67'

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=CSVSerializer())

print(predictor.predict(payload))
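
To make the ```ContentType``` and ```Accept``` fields visible, the same request can be sent through the low-level runtime API instead of the `Predictor`. This is a sketch under assumptions: `invoke_csv` is a hypothetical helper, the client is expected to be a boto3 `sagemaker-runtime` client, and `application/json` is assumed to be an `Accept` value the TensorFlow container supports.

```python
def invoke_csv(runtime_client, endpoint_name, payload, accept="application/json"):
    """Send one CSV line to the endpoint and return the response body as text.

    ContentType describes the request read by the first container;
    Accept describes the response produced by the last container.
    """
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Accept=accept,
        Body=payload,
    )
    return response["Body"].read().decode("utf-8")


# Usage in this notebook (requires a deployed endpoint):
# runtime_client = sagemaker_session.boto_session.client("sagemaker-runtime")
# print(invoke_csv(runtime_client, endpoint_name, payload))
```

Passing the client in as an argument keeps the helper independent of how the session is created and makes it easy to exercise with a stub.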


## Delete Endpoint <a class="anchor" id="delete_endpoint"></a>
Once we are finished with the endpoint, we clean up the resources!

In [None]:
sm_client = sagemaker_session.boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)