# SageMaker Model Deployment Examples

This notebook demonstrates how to deploy a model using different SageMaker deployment options: Real-Time Endpoint, Serverless Inference, Batch Transform, Asynchronous Inference, and Multi-Model Endpoint.

## Initializers and Setup

In [None]:
# %pip install boto3

In [None]:
# import boto3
# import sagemaker
# from sagemaker import get_execution_role

# sagemaker_session = sagemaker.Session()
# role = get_execution_role()
# region = boto3.Session().region_name

# S3 location shared by every section of this notebook:
# training data, model artifacts and inference outputs all live under s3://<bucket>/<prefix>/
bucket, prefix = 'sagemaker-ml3', 'deployment'

# Destination for the training job's model artifacts
output_path = f's3://{bucket}/{prefix}/output'

## Data Preparation and Training

In [None]:
# Step 1: Setup
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the raw employee dataset
file_path = 'Employee.csv'  # Replace with your actual file path in S3 if needed
employee_df = pd.read_csv(file_path)


# Step 2: Data Preparation
# Encode categorical columns as integer category codes
employee_df['Education'] = employee_df['Education'].astype('category').cat.codes
employee_df['City'] = employee_df['City'].astype('category').cat.codes
employee_df['Gender'] = employee_df['Gender'].astype('category').cat.codes
employee_df['EverBenched'] = employee_df['EverBenched'].map({'Yes': 1, 'No': 0})

# Drop rows with a missing target. dropna() returns a new frame, so the result
# must be assigned back; otherwise the astype(int) below fails on NaN targets.
employee_df = employee_df.dropna(subset=['LeaveOrNot'])

# Convert target column to an integer label
employee_df['LeaveOrNot'] = employee_df['LeaveOrNot'].astype(int)

# Drop any remaining rows with missing feature values
employee_df = employee_df.dropna()

# Verify all columns are numeric
print(employee_df.dtypes)

# Define features and target
feature_columns = [
    'Education', 'JoiningYear', 'City', 'PaymentTier', 'Age',
    'Gender', 'EverBenched', 'ExperienceInCurrentDomain'
]
target_column = 'LeaveOrNot'

# Put the target first: the built-in XGBoost CSV format reads the label from column 0
employee_df = employee_df[[target_column] + feature_columns]
train_df, test_df = train_test_split(employee_df, test_size=0.2, random_state=42)


# Step 3: Upload data into S3
import boto3
s3 = boto3.client('s3')  # Initialize S3 client

# Save the data locally first.
# header=False: the built-in XGBoost algorithm expects CSV input without a header record.
train_file = 'train.csv'
validation_file = 'validation.csv'
train_df.to_csv(train_file, index=False, header=False)
test_df.to_csv(validation_file, index=False, header=False)

# Upload the data to S3
s3.upload_file(train_file, bucket, f'{prefix}/train/{train_file}')
s3.upload_file(validation_file, bucket, f'{prefix}/validation/{validation_file}')

print(f"Training data uploaded to s3://{bucket}/{prefix}/train/{train_file}")
print(f"Validation data uploaded to s3://{bucket}/{prefix}/validation/{validation_file}")

# Display the transformed dataset
employee_df.head()

In [None]:
# Train an XGBoost model with the SageMaker built-in algorithm container
import sagemaker

# Resolve the XGBoost container image for the current region
xgboost_container = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=boto3.Session().region_name,
    version="1.3-1",
)

# Hyperparameters for the training job (all values are passed as strings)
hyperparameters = dict(
    max_depth="5",
    eta="0.2",
    gamma="40",
    min_child_weight="6",
    subsample="0.7",
    objective="binary:logistic",
    num_round="50",
)

# Generic estimator wrapping the XGBoost image; artifacts are written to output_path
estimator = sagemaker.estimator.Estimator(
    role=sagemaker.get_execution_role(),
    image_uri=xgboost_container,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    volume_size=5,  # 5 GB
    hyperparameters=hyperparameters,
    output_path=output_path,
)

# Training and validation channels, both read as CSV from the files uploaded earlier
content_type = "csv"
train_input = sagemaker.inputs.TrainingInput(
    s3_data=f"s3://{bucket}/{prefix}/train/{train_file}",
    content_type=content_type,
)
validation_input = sagemaker.inputs.TrainingInput(
    s3_data=f"s3://{bucket}/{prefix}/validation/{validation_file}",
    content_type=content_type,
)

# Launch the training job with both channels
estimator.fit({'train': train_input, 'validation': validation_input})

## Deploy as Real-Time Endpoint

In [None]:

# Stand up a real-time endpoint backed by the trained estimator
predictor = estimator.deploy(
    endpoint_name='employee-attrition-predictor',  # Name of the endpoint
    instance_type='ml.m5.xlarge',  # Instance type for the endpoint
    initial_instance_count=1,  # Number of instances to deploy
)

In [None]:

# Configure the predictor to send CSV payloads and parse the response with the JSON deserializer
predictor.serializer = sagemaker.serializers.CSVSerializer()
predictor.deserializer = sagemaker.deserializers.JSONDeserializer()



# Example input data (in the same format as your training data, minus the label)

# Feature-only view of the full test split ('LeaveOrNot' is the target column)
features_df = test_df.drop(columns=['LeaveOrNot'])

# Header-less CSV of the whole test split.
# NOTE(review): features_df / test_csv are not used by any later cell shown in this notebook.
test_csv = features_df.to_csv(index=False, header=False).strip()

test_data = test_df[feature_columns].head(20)  # Select the first 20 rows of test data for prediction
test_data_csv = test_data.to_csv(index=False, header=False).strip()  # Convert to header-less CSV text

In [None]:


# Invoke the endpoint with the CSV payload built in the previous cell (test_data_csv)
response = predictor.predict(test_data_csv)
print(response)  # Response as parsed by the predictor's configured deserializer




In [None]:
# Build a one-row, header-less CSV payload from the held-out features
test_data = test_df[feature_columns].iloc[:1]
test_data_csv = test_data.to_csv(header=False, index=False).strip()

# Send the single record to the endpoint and show the deserialized prediction
response = predictor.predict(test_data_csv)
print(response)

In [None]:
# Tear down the real-time endpoint created by estimator.deploy() once predictions are done
predictor.delete_endpoint()

## Deploying Endpoint using Model Artifacts

In [None]:
from sagemaker.model import Model
from sagemaker.predictor import Predictor

# Specify the S3 path to the pre-trained model artifact (replace the placeholder with a real path)
model_artifact = "s3://<your-path>/output/model.tar.gz"

# Retrieve the container image for the framework (e.g., XGBoost)
container = sagemaker.image_uris.retrieve(framework="xgboost", region=boto3.Session().region_name, version="1.3-1")

# Create the model object using the S3 path.
# predictor_cls is required here: a generic Model without it makes deploy() return None,
# and the serializer/deserializer assignments below would then fail.
model = Model(
    image_uri=container,
    model_data=model_artifact,
    role=sagemaker.get_execution_role(),
    predictor_cls=Predictor
)

# Deploy the model as a real-time endpoint
predictor = model.deploy(
    initial_instance_count=1,  # Number of instances
    instance_type='ml.m5.xlarge',  # Instance type
    endpoint_name='employee-attrition-predictor'  # Name of the endpoint
)

# Configure the predictor for the specific input and output formats
predictor.serializer = sagemaker.serializers.CSVSerializer()
predictor.deserializer = sagemaker.deserializers.JSONDeserializer()

# Invoke the endpoint for predictions (test_data_csv comes from the earlier payload cell)
response = predictor.predict(test_data_csv)
print(response)



In [None]:
import boto3

# Low-level runtime client: calls the endpoint without going through the SDK Predictor
runtime_client = boto3.client('sagemaker-runtime')

# Endpoint to call
endpoint_name = 'employee-attrition-predictor'

# Header-less CSV payload built from the first 25 rows of held-out features
test_data = test_df[feature_columns].iloc[:25]
test_data_csv = test_data.to_csv(header=False, index=False).strip()

# Send the CSV text as the request body
response = runtime_client.invoke_endpoint(
    Body=test_data_csv,
    ContentType="text/csv",
    EndpointName=endpoint_name,
)

# The body is a stream; read it fully and decode to text before printing
result = response['Body'].read().decode('utf-8')
print(result)


In [None]:
# Delete the endpoint when no longer needed.
# NOTE(review): `predictor` is whatever model.deploy() returned above; Model.deploy() returns
# None unless the Model was given a predictor_cls — confirm `predictor` is a Predictor here.
predictor.delete_endpoint()


## Deploy as Serverless Inference Endpoint

In [None]:
import boto3
from sagemaker.model import Model
from sagemaker.serverless import ServerlessInferenceConfig
from sagemaker import get_execution_role


from sagemaker.predictor import Predictor

# Create a Model object from the trained estimator's artifact.
# predictor_cls makes deploy() return a usable Predictor; without it a generic Model's
# deploy() returns None and serverless_predictor could not be used for invocation or cleanup.
model = Model(
    image_uri=xgboost_container,
    model_data=estimator.model_data,
    role=sagemaker.get_execution_role(),
    predictor_cls=Predictor
)

# Alternative
# Create the model object using the S3 path
# model = Model(
#     image_uri=container,
#     model_data=model_artifact,
#     role=get_execution_role(),
#     predictor_cls=Predictor
# )

# Define the Serverless Inference configuration
serverless_inference_config = ServerlessInferenceConfig(
    memory_size_in_mb=2048,  # Allocate memory
    max_concurrency=5  # Max concurrent invocations
)

# Deploy the model as a serverless endpoint (no instance type/count: capacity is managed by the config)
serverless_predictor = model.deploy(
    serverless_inference_config=serverless_inference_config,
    endpoint_name='employee-attrition-serverless-1'
)





In [None]:
import boto3

# Runtime client used to call the serverless endpoint
runtime = boto3.client("sagemaker-runtime")

# Target endpoint (replace with your actual endpoint name) and request format
endpoint_name = "employee-attrition-serverless-1"
content_type = "text/csv"

# Reuse the CSV payload prepared in an earlier cell
payload = test_data_csv

# Call the endpoint with the CSV body
response = runtime.invoke_endpoint(
    Body=payload,
    ContentType=content_type,
    EndpointName=endpoint_name,
)

# Read the streamed body, decode it, and split it into one entry per line
result = response['Body'].read().decode('utf-8').splitlines()

# Show the predictions
print(result)


In [None]:
print(len(result))  # Number of lines in the response. NOTE(review): compare with the rows sent in test_data_csv (the payload), not len(test_df)

print(test_data_csv)

In [None]:
# Rebuild the example payload (same feature layout as the training data, without the label)
test_data = test_df[feature_columns].head(23)  # Select the first 23 rows of test data for prediction
test_data_csv = test_data.to_csv(index=False, header=False).strip()  # Convert to header-less CSV text

## Deploy using Batch Transform

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.transformer import Transformer  # Import the Transformer class

# Batch Transform reuses the notebook-wide `bucket` and `prefix` defined in the setup cell
output_data = f's3://{bucket}/{prefix}/output/'

# Stage the batch input in S3: features only, no label column and no header row.
# Without this, `input_data` below is undefined and transform() raises NameError.
batch_input_file = 'batch_input.csv'
batch_input_key = f'{prefix}/batch-input/{batch_input_file}'
test_df[feature_columns].to_csv(batch_input_file, index=False, header=False)
boto3.client('s3').upload_file(batch_input_file, bucket, batch_input_key)
input_data = f's3://{bucket}/{batch_input_key}'


# `model_artifact` is the S3 path to the pre-trained model.tar.gz set in the
# "Deploying Endpoint using Model Artifacts" section; it must point at a real artifact.


# Retrieve the container image for the framework (e.g., XGBoost)
container = sagemaker.image_uris.retrieve(framework="xgboost", region=boto3.Session().region_name, version="1.3-1")


# Create the model object using the S3 path
model = Model(
    image_uri=container,
    model_data=model_artifact,
    role=sagemaker.get_execution_role()
)


# Create a transformer object
transformer = model.transformer(
    instance_count=1,
    instance_type='ml.m5.large',  # Choose an instance type
    output_path=output_data,
    strategy='MultiRecord',  # Strategy for processing records (SingleRecord or MultiRecord)
    assemble_with='Line',  # How to join results, e.g., 'Line' to join with newlines
    accept='text/csv'  # Output format
)



# Start the transform job
transformer.transform(
    data=input_data,  # Input data in S3
    content_type='text/csv',  # Input format
    split_type='Line'  # How the input data is split (e.g., by line)
)

# Wait for the job to finish
transformer.wait()



# The results will be available in the S3 output path specified


## Deploy as Asynchronous Inference Endpoint

In [None]:

from sagemaker.async_inference import AsyncInferenceConfig

# Async settings: where results are written and how many requests each instance handles at once
async_inference_config = AsyncInferenceConfig(
    output_path=f's3://{bucket}/{prefix}/async-output',  # S3 path to store output
    max_concurrent_invocations_per_instance=2,
)

# Deploy the trained estimator behind an asynchronous endpoint
async_predictor = estimator.deploy(
    endpoint_name='employee-attrition-async-1',
    instance_type='ml.m5.xlarge',  # Instance type
    initial_instance_count=1,  # Number of instances
    async_inference_config=async_inference_config,
)


In [None]:
# Local file name for the label-free, header-less copy of the test split
csv_file_path = 'validation_no_label.csv'

# Everything except the first column (the label)
features_only = test_df.iloc[:, 1:]

# Keep the same content in memory as CSV text and write it to disk
test_data_csv = features_only.to_csv(header=False, index=False).strip()
features_only.to_csv(csv_file_path, header=False, index=False)

# Push the file to S3 next to the validation data
s3 = boto3.client('s3')
s3.upload_file(csv_file_path, bucket, f'{prefix}/validation/validation_no_label.csv')

# S3 URI the asynchronous request will read its input from
input_location = f's3://{bucket}/{prefix}/validation/validation_no_label.csv'

In [None]:
import boto3

# Resolve the region from the default boto3 session; `region` is not defined
# by any earlier active cell, so it must be set here before it is used.
region = boto3.Session().region_name

# Create a low-level client representing Amazon SageMaker Runtime
sagemaker_runtime = boto3.client("sagemaker-runtime", region_name=region)

# The name of the asynchronous endpoint that you have deployed
endpoint_name = 'employee-attrition-async-1'

# Invoke the asynchronous endpoint with your input data location
response = sagemaker_runtime.invoke_endpoint_async(
    EndpointName=endpoint_name,
    InputLocation=input_location,
    InvocationTimeoutSeconds=1600,  # Set timeout to allow sufficient time for processing
    ContentType='text/csv'
)

# Response contains metadata about the request, not the prediction itself.
# The actual predictions will be saved to the S3 output location reported in the response.
print("Asynchronous inference request sent. Check S3 for results.")
print(f"Output location: {response['OutputLocation']}")


In [None]:
# Show the S3 URI of the label-free validation file used as the async input
print(f's3://{bucket}/{prefix}/validation/validation_no_label.csv')

In [None]:
# Show the input location that was passed to invoke_endpoint_async
print(input_location)

In [None]:
# Clean up the asynchronous endpoint
async_predictor.delete_endpoint()

## Deploy as Multi-Model Endpoint

In [None]:
from sagemaker import Model
from sagemaker.multidatamodel import MultiDataModel

from sagemaker.predictor import Predictor

# S3 prefix that holds the model artifacts served by the endpoint.
# Derived from the training job's artifact (…/output/model.tar.gz) so it always points at this
# notebook's bucket; models are then addressed by file name (e.g. "model.tar.gz") under it.
model_artifact_path = estimator.model_data.rsplit('/', 1)[0] + '/'

# Define the container image (for example, XGBoost)
container_image = sagemaker.image_uris.retrieve(framework="xgboost", region=boto3.Session().region_name, version="1.3-1")

# Create a Model object (this includes the image and role information).
# predictor_cls is needed so that deploy() returns a Predictor instead of None.
model = Model(
    image_uri=container_image,
    role=sagemaker.get_execution_role(),
    predictor_cls=Predictor
)

# Create the MultiDataModel
mme = MultiDataModel(
    name="multi-model-endpoint",
    model_data_prefix=model_artifact_path,  # S3 path where models are stored
    model=model,  # Pass the Model object that defines the container image
    sagemaker_session=sagemaker.Session()
)

# Deploy the Multi-Model Endpoint
multi_model_predictor = mme.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name="multi-model-endpoint"
)

# Keep the previous name bound as well for any cell that still refers to `predictor`
predictor = multi_model_predictor


In [None]:
import boto3
from botocore.config import Config

# Client configuration: 70 s read timeout and at most 2 attempts per call
# (adjust max_attempts as needed, up to 5 for a max timeout of 360s)
config = Config(
    retries={'max_attempts': 2},
    read_timeout=70,
)
runtime_sagemaker_client = boto3.client('sagemaker-runtime', config=config)

# Endpoint to call and the artifact (relative to the endpoint's model prefix) to route to
endpoint_name = "multi-model-endpoint"
target_model = "model.tar.gz"

# The request body reuses test_data_csv prepared in an earlier cell
response = runtime_sagemaker_client.invoke_endpoint(
    Body=test_data_csv,
    TargetModel=target_model,
    ContentType="text/csv",
    EndpointName=endpoint_name,
)

# Read the streamed body and print the model's response as text
print(response['Body'].read().decode('utf-8'))


In [None]:
# Show the CSV payload that was sent to the multi-model endpoint
print(test_data_csv)

In [None]:
# Clean up the multi-model endpoint.
# Attach to the endpoint by name rather than relying on a variable from the deploy cell:
# that cell binds its result to `predictor`, so `multi_model_predictor` may be undefined here.
from sagemaker.predictor import Predictor

multi_model_predictor = Predictor(endpoint_name="multi-model-endpoint")
multi_model_predictor.delete_endpoint()