# Create a SageMaker pipeline to run a Processing job for model inference

## Introduction

Generally, when working with SageMaker Processing jobs, the input data is expected to be stored in Amazon S3. The SageMaker platform downloads the data from S3 and saves the output results back to S3. This gives better lineage between the input data and the output results, which also benefits the reproducibility and auditability of the job. However, users sometimes want to read data directly from their own data store without first copying it to S3 as an extra step. It is possible to bypass the S3 data input for a processing job and read the data directly from another data store, provided a suitable connector is available.
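As an illustration of reading from another data store inside the processing script, the sketch below queries a database through a Python DB-API connection instead of reading a file staged from S3. `sqlite3` is only a stand-in for whatever connector your data store actually needs. The helper name `load_rows_from_database`, the table, and the rows are hypothetical, and in a real job the processing instances must also be able to reach the data store over the network.

```python
import sqlite3
from typing import List, Tuple


def load_rows_from_database(conn: sqlite3.Connection, query: str) -> List[Tuple]:
    """Read rows directly from a database connection instead of an S3-staged file.

    sqlite3 stands in for the real connector (a vendor driver, ODBC, etc.).
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchall()
    finally:
        cursor.close()


if __name__ == "__main__":
    # Hypothetical in-memory table with made-up rows, for illustration only
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE salaries (position TEXT, level INTEGER, salary REAL)")
    conn.executemany(
        "INSERT INTO salaries VALUES (?, ?, ?)",
        [("Analyst", 1, 45000.0), ("Consultant", 2, 50000.0)],
    )
    rows = load_rows_from_database(conn, "SELECT level, salary FROM salaries ORDER BY level")
    print(rows)  # [(1, 45000.0), (2, 50000.0)]
    conn.close()
```

Inside the processing script, `pandas.read_sql(query, conn)` accepts a `sqlite3` connection like this one and returns a DataFrame directly, which would replace the `pd.read_csv` call on the local input folder.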

Processing jobs are suitable for many use cases beyond data pre-processing. A processing job launches a cluster of instances, runs the specified Docker container, and executes the Python script provided for the job inside that container. You can therefore design the script to do model training, model inference, and so on. To automate the job with event-driven actions, you can create the processing job as a step in a SageMaker pipeline and trigger the pipeline with Amazon EventBridge, either on a schedule or in response to an event.
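For the scheduled case, an EventBridge rule and its pipeline target can be created with the boto3 `events` client (`put_rule` and `put_targets`). The helpers below only assemble the request arguments, so they run without AWS access. The helper names, the rule name, and the target `Id` are hypothetical, and the IAM role is assumed to allow EventBridge to start the pipeline execution.

```python
from typing import Dict, Optional


def build_schedule_rule(rule_name: str, schedule_expression: str) -> Dict:
    """Keyword arguments for boto3 `events.put_rule` on a fixed schedule."""
    return {
        "Name": rule_name,
        "ScheduleExpression": schedule_expression,  # e.g. "rate(1 day)" or a cron(...) expression
        "State": "ENABLED",
    }


def build_pipeline_target(
    rule_name: str,
    pipeline_arn: str,
    events_role_arn: str,
    parameters: Optional[Dict[str, str]] = None,
) -> Dict:
    """Keyword arguments for boto3 `events.put_targets` that start a SageMaker pipeline."""
    target = {
        "Id": "sagemaker-pipeline-target",  # hypothetical target id, unique within the rule
        "Arn": pipeline_arn,
        "RoleArn": events_role_arn,  # role EventBridge assumes to start the pipeline
    }
    if parameters:
        # Optional overrides for pipeline parameters such as InputDataUrl
        target["SageMakerPipelineParameters"] = {
            "PipelineParameterList": [
                {"Name": name, "Value": value} for name, value in parameters.items()
            ]
        }
    return {"Rule": rule_name, "Targets": [target]}
```

With credentials configured, you would call `events = boto3.client("events")`, then `events.put_rule(**build_schedule_rule("svr-pipeline-daily", "rate(1 day)"))` and `events.put_targets(**build_pipeline_target("svr-pipeline-daily", pipeline_arn, events_role_arn))`, where `pipeline_arn` and `events_role_arn` are your pipeline's ARN and the EventBridge role's ARN.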

In this notebook, we demonstrate how to create a SageMaker Processing job as a step in a pipeline and trigger this pipeline using EventBridge.

### Section 1: Traditional way to run training and inference as Python code
First, let's write a script that performs model training and inference. This is typically how data scientists build and test their code.

In [None]:
# Support Vector Regression

#1 Importing the libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from numpy import savetxt
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR

output_file = "predict_output.csv"

#2 Importing the dataset
dataset = pd.read_csv('data/Position_Salaries.csv')
X = dataset.iloc[:, 1:2].values.astype(float)  # position level, kept as a 2D column
y = dataset.iloc[:, 2:3].values.astype(float)  # salary, kept as a 2D column

#3 Feature scaling (SVR is sensitive to feature scale, so scale both X and y)
sc_X = StandardScaler()
sc_y = StandardScaler()
X = sc_X.fit_transform(X)
y = sc_y.fit_transform(y)

#4 Fitting the Support Vector Regression model to the dataset
# The most important SVR parameter is the kernel type: linear, polynomial or Gaussian.
# The relationship is non-linear, so polynomial or Gaussian would fit;
# here we select the RBF (Gaussian) kernel.
regressor = SVR(kernel='rbf')
regressor.fit(X, y.ravel())  # SVR expects a 1D target

#5 Predicting a new result (position level 6.5), converted back to salary units
y_pred = sc_y.inverse_transform(
    regressor.predict(sc_X.transform(np.array([[6.5]]))).reshape(-1, 1)
)
print(y_pred)

# Predict on a finer grid over the scaled X range and save the predictions
# (converted back to salary units) to a CSV file
X_grid = np.arange(X.min(), X.max(), 0.1).reshape(-1, 1)
y_out = sc_y.inverse_transform(regressor.predict(X_grid).reshape(-1, 1))
savetxt(output_file, y_out, delimiter=',')

#6 Visualising the Support Vector Regression results (in scaled units)
plt.scatter(X, y, color='magenta')
plt.plot(X, regressor.predict(X), color='green')
plt.title('Truth or Bluff (Support Vector Regression Model)')
plt.xlabel('Position level (scaled)')
plt.ylabel('Salary (scaled)')
plt.show()

#7 Visualising the regression results (higher resolution and smoother curve)
plt.scatter(X, y, color='red')
plt.plot(X_grid, regressor.predict(X_grid), color='blue')
plt.title('Truth or Bluff (Support Vector Regression Model(High Resolution))')
plt.xlabel('Position level (scaled)')
plt.ylabel('Salary (scaled)')
plt.show()

You can check the output in `predict_output.csv` in the file browser on the left-hand side.
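Both `X` and `y` are standardized before fitting, so the model's predictions come out in scaled units and have to be mapped back with `sc_y.inverse_transform` before they can be read as salaries. The arithmetic that `StandardScaler` performs can be sketched in plain Python as below; the function names are hypothetical and this is an illustration, not scikit-learn's implementation.

```python
from statistics import fmean, pstdev
from typing import List, Tuple


def standardize(values: List[float]) -> Tuple[List[float], float, float]:
    """Scale values to zero mean and unit variance, like StandardScaler.fit_transform.

    StandardScaler uses the population standard deviation (ddof=0), so
    statistics.pstdev matches it. A zero spread falls back to a scale of 1.0,
    mirroring how StandardScaler leaves constant features unscaled.
    """
    mean = fmean(values)
    std = pstdev(values) or 1.0
    return [(v - mean) / std for v in values], mean, std


def inverse_standardize(scaled: List[float], mean: float, std: float) -> List[float]:
    """Map scaled values back to original units, like StandardScaler.inverse_transform."""
    return [z * std + mean for z in scaled]
```

A prediction in scaled units `z` therefore corresponds to `z * std + mean` in salary units, which is exactly the step `sc_y.inverse_transform` applies.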

### Section 2: Run the script in a Processing job
Next, we show how to convert the script into a SageMaker Processing job using the SageMaker Python SDK, a high-level API. First, import the necessary packages and define the role and bucket information.

In [None]:
import sagemaker
import boto3
import re
import os
from sagemaker import get_execution_role

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import FrameworkProcessor
from sagemaker.utils import name_from_base
from sagemaker.sklearn.estimator import SKLearn

sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = 'processing-job-sagemaker'

role = get_execution_role()

In [None]:
data_location = sess.upload_data(
    './data/Position_Salaries.csv', key_prefix="{}/data".format(prefix)
)

In [None]:
%%writefile code/Support_Vector_Regression.py

#1 Importing the libraries
import argparse
import glob
import os

import numpy as np
import pandas as pd
from numpy import savetxt
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR

output_file = "predict_output.csv"


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--base_dir', type=str, default="/opt/ml/processing")
    args, _ = parser.parse_known_args()
    return args


if __name__ == "__main__":
    print("Starting job")
    args = parse_args()
    base_dir = args.base_dir
    input_dir = os.path.join(base_dir, "data")
    output_dir = os.path.join(base_dir, "output")

    #2 Concatenate all input CSV files into a single dataframe
    input_file_list = glob.glob(f"{input_dir}/*.csv")
    if not input_file_list:
        raise FileNotFoundError(f"No CSV files found in {input_dir}")
    dataset = pd.concat(
        [pd.read_csv(file) for file in input_file_list], ignore_index=True
    )
    print("Data loaded into a dataframe")

    X = dataset.iloc[:, 1:2].values.astype(float)
    y = dataset.iloc[:, 2:3].values.astype(float)

    #3 Feature scaling
    sc_X = StandardScaler()
    sc_y = StandardScaler()
    X = sc_X.fit_transform(X)
    y = sc_y.fit_transform(y)

    #4 Fitting the Support Vector Regression model to the dataset.
    # The relationship is non-linear, so we select the RBF (Gaussian) kernel.
    regressor = SVR(kernel='rbf')
    regressor.fit(X, y.ravel())  # SVR expects a 1D target

    #5 Predicting a new result (position level 6.5), converted back to salary units
    y_pred = sc_y.inverse_transform(
        regressor.predict(sc_X.transform(np.array([[6.5]]))).reshape(-1, 1)
    )
    print(f"Predicted salary for level 6.5: {y_pred.ravel()[0]:.2f}")

    # Predict on a finer grid over the scaled X range and save the predictions
    # (converted back to salary units) to the output directory
    X_grid = np.arange(X.min(), X.max(), 0.1).reshape(-1, 1)
    y_out = sc_y.inverse_transform(regressor.predict(X_grid).reshape(-1, 1))
    os.makedirs(output_dir, exist_ok=True)
    savetxt(os.path.join(output_dir, output_file), y_out, delimiter=',')
    print("finish processing job")
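
To smoke-test `Support_Vector_Regression.py` on the notebook instance before launching a job, you can reproduce the folder layout it expects under `--base_dir`: input CSVs in `data/` (where this notebook's `ProcessingInput` places them inside the container) and results in `output/` (which `ProcessingOutput` uploads to S3). The helper below is a hypothetical sketch that builds that layout in a local folder; the sample CSV in the demo is made up.

```python
import os
import shutil
import tempfile
from typing import List


def build_local_processing_layout(base_dir: str, csv_paths: List[str]) -> str:
    """Create <base_dir>/data and <base_dir>/output and copy the CSVs into data/.

    Returns the path of the data directory.
    """
    data_dir = os.path.join(base_dir, "data")
    output_dir = os.path.join(base_dir, "output")
    os.makedirs(data_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    for path in csv_paths:
        shutil.copy(path, data_dir)
    return data_dir


if __name__ == "__main__":
    base = tempfile.mkdtemp()
    # Hypothetical stand-in for data/Position_Salaries.csv
    sample = os.path.join(base, "sample.csv")
    with open(sample, "w") as f:
        f.write("Position,Level,Salary\nAnalyst,1,45000\n")
    data_dir = build_local_processing_layout(os.path.join(base, "processing"), [sample])
    print(sorted(os.listdir(data_dir)))  # ['sample.csv']
```

After building the layout around `data/Position_Salaries.csv`, running `python code/Support_Vector_Regression.py --base_dir <that folder>` exercises the same code path the container uses, and the predictions appear in `<that folder>/output/predict_output.csv`.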


In [None]:
base_job_name = 'sagemaker-processing-job'
est_cls = SKLearn
#Initialize the FrameworkProcessor
sklearn = FrameworkProcessor(
    estimator_cls=est_cls,
    framework_version='0.23-1',
    role=get_execution_role(),
    instance_type="ml.m5.xlarge",
    instance_count=1, 
    base_job_name=base_job_name,
)

processing_job_name = name_from_base(base_job_name)

In [None]:
#Run the processing job
sklearn.run(
    code='Support_Vector_Regression.py',
    source_dir='code',
    arguments=[
        '--base_dir', '/opt/ml/processing',  # optional: the script uses this value by default
    ],
    inputs=[
        ProcessingInput(
            source=data_location,
            destination="/opt/ml/processing/data",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="output", source="/opt/ml/processing/output"),
    ],
    job_name=processing_job_name,
)

### Section 3: Create a SageMaker pipeline with one step of the processing job
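Now we can create a SageMaker pipeline based on the processing job. You can start directly from this section without running the previous cells, as long as the script `code/Support_Vector_Regression.py` already exists (it is written by the `%%writefile` cell in Section 2).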

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn

from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = 'processing-job-sagemaker'

role = get_execution_role()
pipeline_session = PipelineSession()

In [None]:
data_location = sess.upload_data(
    './data/Position_Salaries.csv', key_prefix="{}/data".format(prefix)
)
print(data_location)

In [None]:
input_data = ParameterString(
    name="InputDataUrl",
    default_value=data_location,
)

In [None]:
# processing step for model training and inference
base_job_name = 'sagemaker-processing-job'
est_cls = SKLearn

sklearn = FrameworkProcessor(
    estimator_cls=est_cls,
    framework_version='0.23-1',
    role=get_execution_role(),
    instance_type="ml.m5.xlarge",
    instance_count=1, 
    base_job_name=base_job_name,
    sagemaker_session=pipeline_session
)


step_args = sklearn.run(
    code='Support_Vector_Regression.py',
    source_dir='code',
    arguments=[
        '--base_dir', '/opt/ml/processing',  # optional: the script uses this value by default
    ],
    inputs=[
        ProcessingInput(
            source=input_data,  # pipeline parameter, so each execution can override the input location
            destination="/opt/ml/processing/data",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="output", source="/opt/ml/processing/output"),
    ],
)


step_process = ProcessingStep(
   name="TrainandInference",
   step_args=step_args,
)

Next, we define the pipeline based on the processing step.

In [None]:
pipeline = Pipeline(
    name="demo-pipeline-processing-job-automate",
    parameters=[
        input_data,
    ],
    steps=[step_process],
    sagemaker_session=pipeline_session,
)

#### Submit the pipeline to SageMaker and start execution
Submit the pipeline definition to the SageMaker Pipelines service. The role passed in is used by the service to create all the jobs defined in the steps.

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()
execution.describe()
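
Because `InputDataUrl` is declared as a pipeline parameter, each execution can point at a different input location by passing `parameters` to `pipeline.start()`; otherwise the default value is used. This only takes effect if the processing step's `ProcessingInput` uses `input_data` as its `source` rather than the fixed `data_location`. The hypothetical helper below builds that dictionary and rejects values that do not look like S3 URIs.

```python
from typing import Dict
from urllib.parse import urlparse


def build_start_parameters(input_data_url: str) -> Dict[str, str]:
    """Values for Pipeline.start(parameters=...) that override InputDataUrl.

    The processing step downloads its input from this location, so the value
    is checked for an s3://bucket/key shape before an execution is started.
    """
    parsed = urlparse(input_data_url)
    if parsed.scheme != "s3" or not parsed.netloc or not parsed.path.strip("/"):
        raise ValueError(f"Expected s3://bucket/key, got {input_data_url!r}")
    return {"InputDataUrl": input_data_url}
```

For example: `execution = pipeline.start(parameters=build_start_parameters(f"s3://{bucket}/{prefix}/data/Position_Salaries.csv"))`.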

In [None]:
# Wait for the execution to complete.
execution.wait()

You can also monitor the pipeline execution in SageMaker Studio from the **Home** page under **Pipeline**.
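From the notebook, `execution.list_steps()` returns the steps of the execution as a list of dictionaries. The sketch below assumes each entry carries `StepName`, `StepStatus` and, for failed steps, `FailureReason` keys, as in the ListPipelineExecutionSteps API response; the helper name and the sample data in the usage line are hypothetical.

```python
from typing import Dict, List


def summarize_steps(steps: List[Dict]) -> List[str]:
    """Turn execution.list_steps() output into one readable line per step."""
    lines = []
    for step in steps:
        line = f"{step.get('StepName', '?')}: {step.get('StepStatus', 'Unknown')}"
        # Only failed steps are expected to include a failure reason
        if step.get("FailureReason"):
            line += f" ({step['FailureReason']})"
        lines.append(line)
    return lines
```

Usage: `print("\n".join(summarize_steps(execution.list_steps())))`.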

After you have set up an EventBridge rule that triggers the pipeline when a file is uploaded to S3 (a `PutObject` event), you can uncomment the lines below to test it by uploading the data file again.

In [None]:
# client = boto3.client('s3')
# k = f"{prefix}/data/Position_Salaries.csv"
# # Upload the raw file as-is; re-serializing it with pandas would add an index
# # column by default and shift the columns the processing script selects
# with open("data/Position_Salaries.csv", "rb") as f:
#     response = client.put_object(Bucket=bucket, Key=k, Body=f)
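
One way to define such a rule is with an event pattern that matches S3 "Object Created" events under the data prefix. The sketch below assumes Amazon EventBridge notifications are turned on for the bucket, so that S3 emits events with source `aws.s3` and detail-type `Object Created`; the helper names and the rule name in the usage note are hypothetical.

```python
import json
from typing import Dict


def build_s3_object_created_pattern(bucket: str, key_prefix: str) -> str:
    """EventBridge event pattern for new objects under one key prefix of a bucket."""
    pattern = {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": [bucket]},
            "object": {"key": [{"prefix": key_prefix}]},
        },
    }
    return json.dumps(pattern)


def build_s3_trigger_rule(rule_name: str, bucket: str, key_prefix: str) -> Dict:
    """Keyword arguments for boto3 `events.put_rule` using the S3 event pattern."""
    return {
        "Name": rule_name,
        "EventPattern": build_s3_object_created_pattern(bucket, key_prefix),
        "State": "ENABLED",
    }
```

With credentials configured, `boto3.client("events").put_rule(**build_s3_trigger_rule("svr-pipeline-on-upload", bucket, f"{prefix}/data/"))` creates the rule. The pipeline is then attached with `put_targets`, passing the pipeline ARN and an IAM role that EventBridge can assume to start the pipeline execution.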