# Batch Transform

In [None]:
!pip install -U pandas pandas-profiling scikit-learn sagemaker

## Create the inference script
- Similar to real-time inference, we first need to write the four model-inference functions (`model_fn`, `input_fn`, `predict_fn`, `output_fn`) in a .py script
- SageMaker API documentation: https://sagemaker.readthedocs.io/en/stable/api/index.html

In [None]:
%%writefile serve.py

import os
import joblib
import pandas as pd

def model_fn(model_dir):
    """Deserialize the pipeline stored in ``model_dir`` and return it.

    The artifact is read from the fixed file name ``pipeline_model.joblib``
    inside ``model_dir``.
    """
    artifact_path = os.path.join(model_dir, "pipeline_model.joblib")
    return joblib.load(artifact_path)
      
def input_fn(request_body, request_content_type):
    """Parse a JSON Lines request payload into a DataFrame.

    Any additional input pre-processing can also be added in this function.

    Args:
        request_body: Payload with one JSON record per line, given as ``str``
            or as UTF-8 encoded ``bytes``/``bytearray``. Any other object is
            handed to ``pd.read_json`` unchanged.
        request_content_type: MIME type of the payload.

    Returns:
        A pandas DataFrame with one row per JSON line.

    Raises:
        ValueError: If ``request_content_type`` is not "application/json".
    """
    # Local import keeps this function self-contained (no new module-level import).
    from io import StringIO

    if request_content_type != "application/json":
        raise ValueError("Only application/json content type supported!")

    if isinstance(request_body, (bytes, bytearray)):
        request_body = request_body.decode("utf-8")

    if isinstance(request_body, str):
        # A bare string given to read_json may be interpreted as a file path or
        # URL, and recent pandas releases deprecate literal JSON strings.
        # Wrapping it in a buffer forces it to be parsed as data.
        request_body = StringIO(request_body)

    return pd.read_json(request_body, lines=True)

def predict_fn(input_object, pipeline_model):
    """Score the processed input and return labels plus class probabilities.

    The result is a DataFrame with the predicted label in ``prediction`` and
    the first two columns of ``predict_proba`` in ``pred_prob_class0`` and
    ``pred_prob_class1``.
    """
    labels = pipeline_model.predict(input_object)
    probabilities = pipeline_model.predict_proba(input_object)

    # Assemble the output columns in the order they should appear
    columns = {}
    columns["prediction"] = labels.tolist()
    columns["pred_prob_class0"] = probabilities[:, 0].tolist()
    columns["pred_prob_class1"] = probabilities[:, 1].tolist()

    return pd.DataFrame(columns)

def output_fn(prediction_object, request_content_type):
    """Serialize the predictions as JSON, one record per line.

    ``request_content_type`` is accepted as part of the handler signature but
    is not consulted here.
    """
    return prediction_object.to_json(lines=True, orient="records")

In [None]:
%%writefile requirements.txt
pandas
numpy

## Trigger Batch Transform Job

In [None]:
# Create the deployment
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import Session, get_execution_role

# Resolve the SageMaker session and its default S3 bucket
session = Session()
bucket = session.default_bucket()

# TODO: Update with best TrainingJobName from hyperparameter tuning
training_job_name = "..."
# S3 location of the model archive written by that training job
model_artifact = f"s3://{bucket}/{training_job_name}/output/model.tar.gz"
# Name the model is registered under (the cleanup cell deletes it by this name)
endpoint_name = "heart-disease-rfc-pipeline-batch-transform"

# Pair the trained artifact with the serve.py inference handlers
model_settings = dict(
    name=endpoint_name,
    framework_version="1.0-1",
    entry_point="serve.py",
    dependencies=["requirements.txt"],
    model_data=model_artifact,
    role=get_execution_role(),
    sagemaker_session=session,
)
base_model = SKLearnModel(**model_settings)

In [None]:
# NEW! Derive a batch transformer from the base model; results go to output_path
output_path = f"s3://{bucket}/sagemaker/heart_disease/test_preds"
batch_transformer = base_model.transformer(
    instance_count=2,
    instance_type="ml.m5.large",
    strategy="MultiRecord",
    assemble_with="Line",
    accept="application/json",
    output_path=output_path,
)

In [None]:
%%time
# Run the transform job over the test data, splitting the input by line
test_data_path = "..."  # TODO: Paste the S3 path to your bigtest.json
batch_transformer.transform(
    test_data_path,
    split_type="Line",
    content_type="application/json",
)

In [None]:
# Show where the predictions file for bigtest.json is expected
output_path = f"{batch_transformer.output_path}/bigtest.json.out"
print("Output written to: ", output_path, sep="\n")

## Analyse the predictions

In [None]:
import pandas as pd
# Load the line-delimited predictions written by the transform job
output_path = f"{batch_transformer.output_path}/bigtest.json.out"
preds_df = pd.read_json(path_or_buf=output_path, lines=True)

# Quick sanity check: dimensions first, then the leading rows
print(preds_df.shape)
preds_df.head()

In [None]:
# Read the local copy of the test records and attach the predictions,
# matching rows on the DataFrame index
bigtest = "../data/bigtest.json"
bigtest_df = pd.read_json(bigtest, lines=True).join(preds_df)

bigtest_df.head()

In [None]:
# Test accuracy: share of rows whose prediction equals the target label
correct_rows = bigtest_df[bigtest_df["target"] == bigtest_df["prediction"]]
len(correct_rows) / len(bigtest_df)

## Cleanup

In [None]:
import boto3

# Remove the model that was registered under endpoint_name
sm_client = boto3.client(service_name="sagemaker")
sm_client.delete_model(ModelName=endpoint_name)