In [None]:
%reload_ext autoreload
%autoreload 2

import json
import os
import tarfile
from datetime import datetime
from uuid import uuid4

import boto3
import joblib
import polars as pl
import s3fs
import sagemaker
from numpy.random import RandomState
from sagemaker import get_execution_role
from sagemaker.model import Model
from sagemaker.s3 import S3Uploader
from sagemaker.transformer import Transformer
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures

## Terraform State Files

Download the Terraform state files from S3 so that their outputs (bucket names, key prefixes, role ARN) can be read below:

In [5]:
terraform_state_key = os.getenv("TERRAFORM_STATE_S3_KEY")

!aws s3 cp $terraform_state_key /tmp/ --recursive --include "*.tfstate"

We will read the outputs from both the `sagemaker` and the `dev` Terraform state files:

In [15]:
def _read_terraform_outputs(state_path):
    """Return the ``outputs`` section of a Terraform state file as ``{name: value}``.

    Parameters
    ----------
    state_path : str
        Local path of a ``terraform.tfstate`` JSON file.

    Returns
    -------
    dict
        Maps every output name to the ``"value"`` entry stored for it.

    Raises
    ------
    OSError
        If the file cannot be opened.
    KeyError
        If the state has no ``"outputs"`` section or an output has no ``"value"``.
    """
    # Context manager so the file handle is closed deterministically.
    with open(state_path) as state_file:
        state = json.load(state_file)
    return {key: value["value"] for key, value in state["outputs"].items()}


sagemaker_outputs = _read_terraform_outputs("/tmp/sagemaker/terraform.tfstate")
batch_transform_outputs = _read_terraform_outputs("/tmp/dev/terraform.tfstate")

sagemaker_outputs, batch_transform_outputs

({'ecr_repository': 'batch-transform-demo',
  's3_bucket': 'hpa-batch-transform-demo',
  'sagemaker_execution_role_arn': 'arn:aws:iam::722696965592:role/batch_transform_demo_sagemaker_execution_role'},
 {'s3_bucket': 'batch-transform-demo-dev',
  's3_key_input': 'batch-transform-input/',
  's3_key_output': 'batch-transform-output/'})

## Global

In [43]:
# Shared random source and S3 filesystem handle reused by the cells below.
random_state = RandomState(1227)
fs = s3fs.S3FileSystem()

# Values taken from the SageMaker Terraform outputs.
s3_bucket_model_artifacts = sagemaker_outputs["s3_bucket"]
ecr_repository = sagemaker_outputs["ecr_repository"]

# Values taken from the batch-transform Terraform outputs.
# NOTE: the "*_input" / "*_output" names hold S3 key prefixes, not bucket names.
s3_bucket_batch_transform = batch_transform_outputs["s3_bucket"]
s3_bucket_batch_transform_input = batch_transform_outputs["s3_key_input"]
# Output prefix is suffixed with the date on which this cell is executed.
run_date = datetime.now().strftime("%Y-%m-%d")
s3_bucket_batch_transform_output = os.path.join(
    batch_transform_outputs["s3_key_output"], run_date
)

role = get_execution_role()

# AWS sessions: the boto3 session uses the profile named by AWS_PROFILE
# (falling back to "default"); the SageMaker session wraps it.
profile = os.getenv("AWS_PROFILE", "default")
boto_session = boto3.session.Session(profile_name=profile)
sm_session = sagemaker.session.Session(
    default_bucket=s3_bucket_model_artifacts,
    boto_session=boto_session,
)

## Random Data

In [22]:
# Synthetic binary-classification problem: 15,000 rows, 5 features.
features, y = make_classification(
    n_samples=15_000, n_features=5, n_classes=2, random_state=random_state
)

# Hold the features in a polars frame so they can later be written as parquet.
X = pl.DataFrame(features)

# Stratified 80/20 split.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=random_state,
)

# Display the shapes of the four partitions.
tuple(part.shape for part in (X_train, y_train, X_test, y_test))

((12000, 5), (12000,), (3000, 5), (3000,))

## Model for Demo

In [23]:
# Two-stage estimator: degree-2 polynomial feature expansion feeding a
# logistic regression.
pipeline_steps = [
    ("poly", PolynomialFeatures(degree=2)),
    ("logistic", LogisticRegression(random_state=random_state)),
]
model_pipeline = Pipeline(steps=pipeline_steps)

# Fit on the training partition; the fitted pipeline is the cell's output.
model_pipeline.fit(X_train, y_train)

In [24]:
# Serialise the fitted pipeline into the current working directory.
model_filename = "model.joblib"
model_path = os.path.join(os.getcwd(), model_filename)
joblib.dump(model_pipeline, model_path)

['/home/ec2-user/SageMaker/batch-transform-demo/notebooks/model.joblib']

In [26]:
# Package the serialised model as a gzip tarball with the model file at the
# archive root (arcname drops the directory part of model_path).
tar_path = "model.tar.gz"
with tarfile.open(tar_path, "w:gz") as archive:
    archive.add(model_path, arcname=os.path.basename(model_path))

# Upload the archive under the "model" prefix of the artifacts bucket and
# keep the returned S3 URI for the Model definition.
model_artifact_s3_uri = S3Uploader.upload(
    local_path=tar_path,
    desired_s3_uri=f"s3://{s3_bucket_model_artifacts}/model",
    sagemaker_session=sm_session,
)

# Remove the local copies now that the artifact has been uploaded.
for local_file in (model_path, tar_path):
    os.remove(local_file)

## Docker Image for Serving

In [None]:
# Run the repository's build_and_push.sh from the docker/ directory, passing
# "serve-latest", "serve" and the ECR repository name as arguments.
# NOTE(review): presumably this builds the serving image and pushes it to ECR
# with the tag "serve-latest" — confirm against the script. The absolute path
# assumes the repo is checked out under /home/ec2-user/SageMaker.
!cd /home/ec2-user/SageMaker/batch-transform-demo/docker && bash ./build_and_push.sh serve-latest serve $ecr_repository

In [28]:
images = !aws ecr list-images --repository-name $ecr_repository
images_str = "\n".join(images)
images_str = json.loads(images_str)

for ecr_meta_data_dict in images_str["imageIds"]:
    if "imageTag" in ecr_meta_data_dict and ecr_meta_data_dict["imageTag"].startswith(
        "serve"
    ):
        image_tag = ecr_meta_data_dict["imageTag"]

serve_image_uri = f"{sm_session.account_id()}.dkr.ecr.{sm_session.boto_region_name}.amazonaws.com/{ecr_repository}:{image_tag}"

## Model Entity

In [29]:
# Give each model a short random suffix so repeated runs do not reuse a name.
model_name = f"demo-v{uuid4().hex[:6]}"
project_tags = [{"Key": "project", "Value": "batch-transform-demo"}]

# Describe the model: serving image + uploaded artifact + execution role.
model = Model(
    name=model_name,
    image_uri=serve_image_uri,
    model_data=model_artifact_s3_uri,
    role=role,
    sagemaker_session=sm_session,
)

# Create the model entity; no instance type is passed at this point.
model.create(instance_type=None, tags=project_tags)

## Batch Transform

In [31]:
# Batch-transform settings; the keys are named after the Transformer keyword
# arguments they are passed to in the next cell.
config = dict(
    instance_count=1,
    instance_type="ml.c5.xlarge",  # 4 vCPU, 8 GiB RAM
    strategy="MultiRecord",
    output_path=f"s3://{s3_bucket_model_artifacts}/output",
    max_concurrent_transforms=2,
    # max_concurrent_transforms x max_payload cannot exceed 100 MB
    max_payload=25,
    base_transform_job_name="batch-transform-demo",
)

In [32]:
# Every key in `config` matches a Transformer keyword argument, so the dict is
# unpacked directly; the remaining arguments are specific to this notebook.
transformer = Transformer(
    model_name=model.name,
    accept="text/csv",
    sagemaker_session=sm_session,
    **config,
)

In [34]:
# Write the held-out features as a parquet object in the artifacts bucket;
# this object is the input of the transform job below.
test_s3_uri = "s3://{}/data/test.parquet".format(s3_bucket_model_artifacts)
with fs.open(test_s3_uri, "wb") as parquet_sink:
    X_test.write_parquet(parquet_sink)

In [None]:
# Per-invocation limits handed to the transform job: 60 s timeout, 2 retries.
invocation_limits = {"InvocationsTimeoutInSeconds": 60, "InvocationsMaxRetries": 2}

# Start the batch transform on the uploaded parquet object and block until
# the job has finished (wait=True).
transformer.transform(
    data=test_s3_uri,
    data_type="S3Prefix",
    content_type="application/x-parquet",
    model_client_config=invocation_limits,
    wait=True,
)

In [36]:
# Read every "*.out" object under the transform output path as CSV.
transform_output_glob = config["output_path"] + "/*.out"
predictions = pl.scan_csv(transform_output_glob).collect()

predictions

predicted_class,predicted_probability
i64,f64
0,1.0
1,0.962178
0,1.0
1,0.933482
0,1.0
…,…
1,0.973601
0,1.0
0,0.924858
0,0.644252


## Batch Transform Lambda

In [37]:
# The key prefix from Terraform may already end in "/" (it is
# "batch-transform-input/" in the outputs shown above); strip surrounding
# slashes so the object key does not contain an empty "//" segment.
input_prefix = s3_bucket_batch_transform_input.strip("/")
input_base_path = (
    "s3://"
    + "/".join(part for part in (s3_bucket_batch_transform, input_prefix) if part)
    + "/"
)

# Use a unique object name for every upload so earlier inputs are not overwritten.
unique_id = uuid4().hex
file_path = f"{input_base_path}test_{unique_id}.parquet"
with fs.open(file_path, "wb") as f:
    X_test.write_parquet(f)

In [44]:
# Dated output prefix in the batch-transform bucket (see the Global cell).
output_base_path = "s3://{}/{}/".format(
    s3_bucket_batch_transform, s3_bucket_batch_transform_output
)

# Read every "*.out" object found under that prefix as CSV.
lambda_output_glob = output_base_path + "*.out"
predictions = pl.scan_csv(lambda_output_glob).collect()

predictions

predicted_class,predicted_probability
i64,f64
1,0.991911
0,0.989748
1,0.929175
0,0.996197
1,0.962088
…,…
1,0.996596
0,0.858636
1,0.99013
1,0.953402


## Clean Up

In [45]:
# Remove the SageMaker model entity created in the "Model Entity" section.
# NOTE(review): no cell shown here deletes the S3 objects or the ECR image
# produced above — confirm whether they should be cleaned up as well.
model.delete_model()