# MLOps Manual to Repeatable Workflow

<div class="alert alert-warning"> 
	⚠️ <strong> PRE-REQUISITE: </strong> Before proceeding with this notebook, please ensure that you have executed the <code>1-data-prep-feature-store.ipynb</code> and <code>2-training-registry.ipynb</code> notebooks.
</div>

## Contents

- [Introduction](#Introduction)
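- [SageMaker Endpoint](#SageMaker-Endpoint)
- [Read from offline Feature Store](#Read-from-offline-Feature-Store)
- [Read from online Feature Store](#Read-from-online-Feature-Store)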

## Introduction

This is the third notebook in the series. It covers the model deployment stage of the ML workflow.

Here, we put on the hat of a `Data Scientist` and perform the model deployment task: fetching the right model from the Model Registry and deploying it for inference.

For this task, we will use Amazon SageMaker model hosting capabilities.

Let's get started!

**Imports**

In [None]:
!pip install -U sagemaker

In [None]:
%store -r

In [None]:
from urllib.parse import urlparse
import io
import time
from sagemaker.model import ModelPackage
import boto3
import sagemaker
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
import numpy as np
import pathlib
from sagemaker.feature_store.feature_group import FeatureGroup

**Session variables**

In [None]:
# Useful SageMaker variables
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role_arn = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name
s3_client = boto3.client('s3', region_name=region)
sagemaker_client = boto3.client('sagemaker')

## SageMaker Endpoint

You can deploy your trained model as a [SageMaker hosted endpoint](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints-deployment.html), which serves real-time predictions from the model. The endpoint retrieves the model created during training and deploys it within a SageMaker scikit-learn container. The deployment itself is a single call to `deploy()`. Note that it takes several minutes to deploy the model to a hosted endpoint.

Let's get the model we registered in the Model Registry.

In [None]:
random_forest_regressor_model = ModelPackage(
    role_arn,
    model_package_arn=model_package_arn,
    name=model_name
)

Its current status is `PendingApproval`. To use this model for offline predictions or as a real-time endpoint, we need to update its status to `Approved`.

In [None]:
sagemaker_client.update_model_package(
    ModelPackageArn=random_forest_regressor_model.model_package_arn,
    ModelApprovalStatus='Approved'
)

Now we can deploy it!

In [None]:
from IPython.display import display, HTML
endpoint_name = f'{model_name}-endpoint-' + time.strftime('%Y-%m-%d-%H-%M-%S')
# Show a link to the endpoint in the SageMaker console
display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">The Endpoint</a> After About 5 Minutes</b>'.format(
            region, endpoint_name
        )
    )
)
random_forest_regressor_model.deploy(
    initial_instance_count=1,
    instance_type='ml.t2.medium',
    endpoint_name=endpoint_name
)
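
The endpoint name above is built from `model_name` plus a timestamp. SageMaker endpoint names are, to my understanding of the `CreateEndpoint` API reference, limited to 63 characters and to letters, digits, and hyphens, so a long or underscore-containing `model_name` can make the deployment fail. The sketch below shows one way to guard against that; `build_endpoint_name` is a hypothetical helper, and the limits it encodes are assumptions you should check against the current API documentation.

```python
import re
import time

# Assumed limit for SageMaker endpoint names; verify against the API reference.
MAX_ENDPOINT_NAME_LENGTH = 63


def build_endpoint_name(model_name, timestamp=None):
    """Build '<model_name>-endpoint-<timestamp>' within the assumed naming rules."""
    timestamp = timestamp or time.strftime("%Y-%m-%d-%H-%M-%S")
    suffix = f"-endpoint-{timestamp}"
    # Replace anything other than letters, digits, and hyphens with a hyphen.
    prefix = re.sub(r"[^a-zA-Z0-9-]", "-", model_name)
    # Trim the prefix so the full name fits, and avoid leading/trailing hyphens.
    prefix = prefix[: MAX_ENDPOINT_NAME_LENGTH - len(suffix)].strip("-") or "model"
    return f"{prefix}{suffix}"


example = build_endpoint_name("random_forest.regressor", timestamp="2024-01-01-00-00-00")
print(example)  # random-forest-regressor-endpoint-2024-01-01-00-00-00
```

The timestamp suffix is kept intact and only the model-name prefix is shortened, so names stay unique across repeated deployments.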

Let's test this real-time endpoint by passing it some data and getting a real-time prediction back.

## Read from offline Feature Store

In [None]:
# Read in test set that was used for batch transform
fs_group = FeatureGroup(name=test_feature_group_name, sagemaker_session=sagemaker_session)  
query = fs_group.athena_query()
table = query.table_name
query_string = f'SELECT {features_to_select} FROM "sagemaker_featurestore"."{table}"  ORDER BY record_id'
query_results = 'sagemaker-featurestore'
output_location = f's3://{bucket}/{query_results}/query_results/'
query.run(
    query_string=query_string, 
    output_location=output_location
)
query.wait()
df = query.as_dataframe()
df.head()

In [None]:
# Attach to the SageMaker endpoint
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=CSVSerializer(),
    deserializer=JSONDeserializer()
)

dropped_df = df.drop(columns=["price"])

# Get a real-time prediction (only predicting the first 5 rows to reduce output size)
predictor.predict(dropped_df[:5].to_csv(index=False, header=False))

## Read from online Feature Store

In [None]:
boto_session = boto3.Session()
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime')

### Helper functions to read from online Feature Store

In [None]:
def _record_to_dict(rec, feature_types):
    """Convert a Feature Store record (list of name/value pairs) into a typed dict."""
    tmp_dict = {}
    for f in rec:
        feature_name = f['FeatureName']
        string_feature_val = f['ValueAsString']
        feature_type = feature_types[feature_name]

        # Cast the string value according to the feature's declared type
        if feature_type == 'Integral':
            tmp_dict[feature_name] = int(string_feature_val)
        elif feature_type == 'Fractional':
            tmp_dict[feature_name] = float(string_feature_val)
        else:
            tmp_dict[feature_name] = string_feature_val

    return tmp_dict


def get_feature_definitions(fg_name):
    # Returns the full feature group description; callers read its 'FeatureDefinitions' key
    fgdescription = sagemaker_client.describe_feature_group(FeatureGroupName=fg_name)
    return fgdescription

def get_online_feature_group_records(fg_name, id_value_list):
    feature_defs = get_feature_definitions(fg_name)['FeatureDefinitions']
    feature_types = {}
    feature_names = []
    for fd in feature_defs:
        feature_names.append(fd['FeatureName'])
        feature_types[fd['FeatureName']] = fd['FeatureType']
        
    results = []
    
    identifiers = []
    ids_list = []
    for curr_id in id_value_list:
        record_identifier_value = str(curr_id)
        ids_list.append(record_identifier_value)
    
    identifiers.append({'FeatureGroupName': fg_name,
                        'RecordIdentifiersValueAsString': ids_list,
                        'FeatureNames': feature_names})
        
    resp = featurestore_runtime.batch_get_record(Identifiers=identifiers)
    
    for rec_dict in resp['Records']:
        results.append(_record_to_dict(rec_dict['Record'], feature_types))

    return results

def get_number_of_products_in_feature_set(records):
    # Count the records without shadowing the built-in `dict`
    return len(records)
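
`get_online_feature_group_records` sends all identifiers in a single `batch_get_record` call and reads only `Records` from the response. For larger ID lists, a chunked variant may be safer. The sketch below is an illustration under stated assumptions: the batch size of 100 reflects my understanding of the per-request identifier limit and should be verified, `fetch_records_in_batches` and `chunked` are hypothetical helper names, and the stub client stands in for `featurestore_runtime` so the example runs offline.

```python
def chunked(values, size):
    """Yield successive lists of at most `size` items."""
    for start in range(0, len(values), size):
        yield values[start:start + size]


def fetch_records_in_batches(runtime_client, fg_name, id_values, batch_size=100):
    """Call batch_get_record once per chunk; return (records, failures)."""
    records, failures = [], []
    ids = [str(value) for value in id_values]
    for batch in chunked(ids, batch_size):
        resp = runtime_client.batch_get_record(
            Identifiers=[{
                "FeatureGroupName": fg_name,
                "RecordIdentifiersValueAsString": batch,
            }]
        )
        records.extend(resp.get("Records", []))
        # Keep failed and unprocessed lookups instead of silently dropping them.
        failures.extend(resp.get("Errors", []))
        failures.extend(resp.get("UnprocessedIdentifiers", []))
    return records, failures


class _StubRuntimeClient:
    """Hypothetical offline stand-in for the sagemaker-featurestore-runtime client."""

    def __init__(self):
        self.calls = 0

    def batch_get_record(self, Identifiers):
        self.calls += 1
        request = Identifiers[0]
        return {
            "Records": [
                {
                    "FeatureGroupName": request["FeatureGroupName"],
                    "RecordIdentifierValueAsString": record_id,
                    "Record": [{"FeatureName": "record_id", "ValueAsString": record_id}],
                }
                for record_id in request["RecordIdentifiersValueAsString"]
            ],
            "Errors": [],
            "UnprocessedIdentifiers": [],
        }


stub = _StubRuntimeClient()
records, failures = fetch_records_in_batches(stub, "example-feature-group", range(250))
print(len(records), stub.calls, len(failures))
```

Each returned item still carries the raw `Record` list, so it can be passed through `_record_to_dict` exactly as in the helper above.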

In [None]:
customer_record = get_online_feature_group_records(test_feature_group_name, ['1'])

In [None]:
record = customer_record[0]

In [None]:
record.pop('PRICE')
record.pop('event_time')
record.pop('record_id')

In [None]:
# Join the remaining feature values into a single CSV line
payload = ",".join(str(value) for value in record.values())

In [None]:
payload
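
The payload is joined in whatever order the record's keys happen to be in, while a CSV-serving model typically expects features in the same column order used for training. A minimal sketch of building the payload from an explicit order follows; `build_csv_payload` is a hypothetical helper, and the feature names in the sample record are made up for illustration (in this notebook the order would presumably follow `features_to_select`, minus the label).

```python
def build_csv_payload(record, feature_order):
    """Join the values of `record` in `feature_order` into one CSV line."""
    missing = [name for name in feature_order if name not in record]
    if missing:
        raise KeyError(f"record is missing features: {missing}")
    return ",".join(str(record[name]) for name in feature_order)


# Made-up record for illustration; real feature names come from the feature group.
sample_record = {
    "record_id": 1,
    "event_time": 1700000000.0,
    "PRICE": 250000.0,
    "SQUARE_FEET": 1500.0,
    "NUM_BEDROOMS": 3,
}

# Listing only the model inputs also drops the label and metadata columns.
payload = build_csv_payload(sample_record, ["NUM_BEDROOMS", "SQUARE_FEET"])
print(payload)  # 3,1500.0
```

Because the order is explicit, the label and metadata fields do not need to be popped from the record first, and a missing feature raises an error instead of producing a shorter payload.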

In [None]:
predictor.predict(payload)

In [None]:
# Clean up: delete the endpoint and its endpoint configuration to stop incurring charges
predictor.delete_endpoint(delete_endpoint_config=True)