In [None]:
import time
from typing import NamedTuple, Optional

import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import FrameworkProcessor, ProcessingInput
from sagemaker.pytorch import PyTorchModel, PyTorch

In [None]:
import torch
import torchvision
from torchvision.models import resnet50, ResNet50_Weights

In [None]:
# Show the installed torch / torchvision versions (the tuple is this cell's output).
(torch.__version__, torchvision.__version__)

# Upload the pre-trained model to S3 (only needs to run once)

In [None]:
# S3 bucket that receives the model artifact, the copied datasets and all job outputs.
# TODO: replace the placeholder with the name of a bucket you own (bare name, no "s3://" prefix,
# because it is interpolated as s3://{bucket_name}/... below).
bucket_name = "<your-bucket-name>"

In [None]:
# Load the pretrained ResNet-50 (torchvision's default weights) and save its state_dict locally;
# the next cell packages ./model.ckpt into model.tar.gz for SageMaker.
# Only the weights are saved, not the module itself.
# NOTE(review): presumably the inference code rebuilds resnet50 before loading these weights — confirm.
model = resnet50(weights=ResNet50_Weights.DEFAULT)
local_path = "./model.ckpt"
torch.save(model.state_dict(), local_path)

In [None]:
# Package the checkpoint as model.tar.gz (the model artifact must be a tar.gz archive,
# see model_artifact_s3_location below) and upload it under s3://<bucket>/model/.

!tar -czvf model.tar.gz ./model.ckpt
# Uncomment to create the bucket first if it does not exist yet.
# !aws s3 mb s3://$bucket_name
!aws s3 cp ./model.tar.gz s3://$bucket_name/model/
# List the bucket root; the "model/" prefix should now appear.
!aws s3 ls s3://$bucket_name

# Copy the datasets to your own S3 bucket (only needs to run once)

In [None]:
%%time

# Raw image dataset (~10 GB of synthetic images; presumably one object per image).
# Becomes images_location, used by SageMaker runs a-c and Ray run d.

!aws s3 cp \
    s3://air-example-data-2/10G-image-data-synthetic-raw/ \
    s3://$bucket_name/data/10G-image-data-synthetic-raw/ \
    --recursive \
    --quiet

In [None]:
%%time

# Parquet version of the image data split into 120 partitions (presumably the same images).
# Becomes parquet_location, used by SageMaker runs d-e and Ray run b.

!aws s3 cp \
    s3://air-example-data-2/10G-image-data-synthetic-raw-parquet-120-partition/ \
    s3://$bucket_name/data/10G-image-data-synthetic-raw-parquet-120-partition/ \
    --recursive \
    --quiet

In [None]:
%%time

# Parquet version of the image data with the source bucket's default partitioning.
# Becomes parquet_default_location, used by Ray runs a and c.

!aws s3 cp \
    s3://air-example-data-2/10G-image-data-synthetic-raw-parquet/ \
    s3://$bucket_name/data/10G-image-data-synthetic-raw-parquet/ \
    --recursive \
    --quiet

# Batch inference: shared configuration and helpers

In [None]:
# Model artifact uploaded by the tar/upload cell above; the model data must be a tar.gz file.
model_artifact_s3_location = f"s3://{bucket_name}/model/model.tar.gz"

In [None]:
# S3 prefix used as code_location for uploaded job code and as output_path for Batch Transform results.
jobs_location = f"s3://{bucket_name}/jobs"

In [None]:
# Dataset copies made in the copy cells above (raw images, 120-partition Parquet, default Parquet).
images_location = f"s3://{bucket_name}/data/10G-image-data-synthetic-raw/"
parquet_location = f"s3://{bucket_name}/data/10G-image-data-synthetic-raw-parquet-120-partition/"
parquet_default_location = f"s3://{bucket_name}/data/10G-image-data-synthetic-raw-parquet/"

In [None]:
# Used to convert job durations into images/second throughput.
dataset_size = 16_232  # number of images in 10GB dataset

In [None]:
# Hourly price per instance (estimate_job_pricing divides seconds by 3600).
# NOTE(review): presumably USD on-demand prices for one specific region — confirm against current SageMaker pricing.
ml_g4dn_xlarge_instance_price = 0.736
ml_g4dn_12xlarge_instance_price = 4.89

In [None]:
# Session whose default bucket is bucket_name, plus the IAM role passed to every job.
# NOTE(review): get_execution_role() presumably only works inside a SageMaker notebook/Studio environment.
sagemaker_session = sagemaker.Session(default_bucket=bucket_name)
role = get_execution_role()

In [None]:
def get_transform_time(job_name: str, session: Optional["sagemaker.Session"] = None) -> float:
    """Return the duration, in seconds, that SageMaker reports for a Batch Transform job.

    The value is ``TransformEndTime - TransformStartTime`` from DescribeTransformJob.
    It is the job duration recorded by SageMaker, as opposed to the client-side
    wall-clock time measured around the SDK call.

    Args:
        job_name: Name of the transform job to describe.
        session: Session used for the API call. Defaults to the notebook-wide
            ``sagemaker_session``. Pass another session to query a different
            account/region, or a stub in tests.

    Returns:
        Elapsed transform time in seconds.

    Raises:
        KeyError: if the response has no start/end time (presumably because the
            job has not finished yet).
    """
    if session is None:
        session = sagemaker_session
    response = session.sagemaker_client.describe_transform_job(TransformJobName=job_name)
    return (response["TransformEndTime"] - response["TransformStartTime"]).total_seconds()

In [None]:
def get_processing_job_time(job_name: str, session: Optional["sagemaker.Session"] = None) -> float:
    """Return the duration, in seconds, that SageMaker reports for a Processing job.

    The value is ``ProcessingEndTime - ProcessingStartTime`` from DescribeProcessingJob.
    It is the job duration recorded by SageMaker, as opposed to the client-side
    wall-clock time measured around the SDK call.

    Args:
        job_name: Name of the processing job to describe.
        session: Session used for the API call. Defaults to the notebook-wide
            ``sagemaker_session``. Pass another session to query a different
            account/region, or a stub in tests.

    Returns:
        Elapsed processing time in seconds.

    Raises:
        KeyError: if the response has no start/end time (presumably because the
            job has not finished yet).
    """
    if session is None:
        session = sagemaker_session
    response = session.sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
    return (response["ProcessingEndTime"] - response["ProcessingStartTime"]).total_seconds()

In [None]:
# Pair of elapsed times, in seconds, returned by the experiment runners:
#   total_time     - client-side wall time around the blocking SDK call;
#   transform_time - job duration reported by SageMaker (for the Ray runs this is
#                    the processing-job duration, despite the field name).
ExperimentElapsedTime = NamedTuple(
    "ExperimentElapsedTime",
    [
        ("total_time", float),
        ("transform_time", float),
    ],
)

In [None]:
def run_sm_bt_experiment(
    job_name: str,
    model_kwargs: Optional[dict] = None,
    transformer_kwargs: Optional[dict] = None,
    transform_run_kwargs: Optional[dict] = None,
) -> ExperimentElapsedTime:
    """Run one SageMaker Batch Transform job with the PyTorch model and time it.

    Each ``*_kwargs`` argument is shallow-merged over the defaults defined below.
    A key supplied by the caller replaces the default for that key entirely.
    For example, passing ``env`` in ``model_kwargs`` discards the default
    request-size variables unless they are repeated.

    NOTE(review): SageMaker job names presumably must be unique per
    account/region, so re-running with the same ``job_name`` will likely fail.
    Confirm this, and pick a fresh name when re-running.

    Args:
        job_name: Name of the transform job. If ``transform_run_kwargs`` also
            contains ``"job_name"``, that value wins and is the one timed.
        model_kwargs: Overrides for the ``PyTorchModel`` constructor.
        transformer_kwargs: Overrides for ``PyTorchModel.transformer``.
        transform_run_kwargs: Overrides for ``Transformer.transform``; the
            experiments in this notebook pass ``data`` and ``content_type``.

    Returns:
        ``ExperimentElapsedTime`` with the client-side elapsed time of the
        ``transform`` call and the duration SageMaker reports for the job,
        both in seconds.
    """
    # Module-level settings (model_artifact_s3_location, jobs_location,
    # sagemaker_session, role) are only read here, so no ``global`` is needed.
    default_model_kwargs = dict(
        model_data=model_artifact_s3_location,
        role=role,
        framework_version="2.0",
        py_version="py310",
        source_dir="./sagemaker/code",
        entry_point="predict.py",
        code_location=jobs_location,
        sagemaker_session=sagemaker_session,
        # 100000000 bytes = 100 MB, matching the default max_payload below.
        # NOTE(review): presumably raises the model server's request-size limit — confirm.
        env={
            "TS_MAX_REQUEST_SIZE": "100000000",
            "MMS_MAX_REQUEST_SIZE": "100000000",
        },
    )
    default_model_kwargs.update(model_kwargs or {})

    default_transformer_kwargs = dict(
        instance_count=1,
        instance_type="ml.g4dn.xlarge",
        output_path=jobs_location,
        max_payload=100,  # MaxConcurrentTransforms * MaxPayloadInMB <= 100 MB
        accept="application/json",
    )
    default_transformer_kwargs.update(transformer_kwargs or {})

    default_transform_run_kwargs = dict(
        data_type="S3Prefix",
        job_name=job_name,
        # Block until the job finishes so the elapsed time covers the whole job and
        # the job has an end time when its duration is looked up below.
        wait=True,
        logs=False,
    )
    default_transform_run_kwargs.update(transform_run_kwargs or {})

    model = PyTorchModel(**default_model_kwargs)
    transformer = model.transformer(**default_transformer_kwargs)

    # perf_counter is monotonic, unlike time.time(), so system clock
    # adjustments during a long job cannot skew the measurement.
    start = time.perf_counter()
    transformer.transform(**default_transform_run_kwargs)
    total_time = time.perf_counter() - start

    # Describe the job that was actually submitted, even if the caller overrode "job_name".
    transform_time = get_transform_time(default_transform_run_kwargs["job_name"])

    return ExperimentElapsedTime(total_time, transform_time)

In [None]:
def run_ray_inference_experiment(
    job_name: str,
    processor_kwargs: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> ExperimentElapsedTime:
    """Run the Ray inference script as a SageMaker Processing job and time it.

    ``processor_kwargs`` and ``run_kwargs`` are shallow-merged over the defaults
    below. A caller-supplied key replaces the default for that key entirely; the
    experiments in this notebook replace the whole ``env`` dict to lower
    ``SAGEMAKER_CONTAINER_LOG_LEVEL`` from DEBUG to INFO.

    NOTE(review): SageMaker job names presumably must be unique per
    account/region, so re-running with the same ``job_name`` will likely fail.
    Confirm this, and pick a fresh name when re-running.

    Args:
        job_name: Name of the processing job. If ``run_kwargs`` also contains
            ``"job_name"``, that value wins and is the one timed.
        processor_kwargs: Overrides for the ``FrameworkProcessor`` constructor.
        run_kwargs: Overrides for ``FrameworkProcessor.run``; the experiments
            pass the script's command-line ``arguments`` here.

    Returns:
        ``ExperimentElapsedTime`` whose ``total_time`` is the client-side
        elapsed time of the ``run`` call and whose ``transform_time`` field
        holds the processing-job duration reported by SageMaker, in seconds.
    """
    # jobs_location, sagemaker_session and role are only read here, so no ``global`` is needed.
    default_processor_kwargs = dict(
        estimator_cls=PyTorch,
        framework_version="2.0",
        py_version="py310",
        role=role,
        instance_count=1,
        instance_type="ml.g4dn.12xlarge",
        sagemaker_session=sagemaker_session,
        code_location=jobs_location,
        env={
            "SAGEMAKER_CONTAINER_LOG_LEVEL": "DEBUG",
        },
    )
    default_processor_kwargs.update(processor_kwargs or {})

    default_run_kwargs = dict(
        job_name=job_name,
        code="inference.py",
        source_dir="./ray/code",
        # Block until the job finishes so the elapsed time covers the whole job and
        # the job has an end time when its duration is looked up below.
        wait=True,
        logs=False,
    )
    default_run_kwargs.update(run_kwargs or {})

    processor = FrameworkProcessor(**default_processor_kwargs)

    # perf_counter is monotonic, unlike time.time(), so system clock
    # adjustments during a long job cannot skew the measurement.
    start = time.perf_counter()
    processor.run(**default_run_kwargs)
    total_time = time.perf_counter() - start

    # Describe the job that was actually submitted, even if the caller overrode "job_name".
    processing_time = get_processing_job_time(default_run_kwargs["job_name"])

    return ExperimentElapsedTime(total_time, processing_time)

In [None]:
def estimate_job_pricing(job_time: float, price_per_instance: float, number_of_instances: int) -> float:
    """Estimate the cost of a job from its duration and an hourly per-instance price.

    Args:
        job_time: Job duration in seconds. The notebook passes the float returned
            by the timing helpers; plain ints are accepted too.
        price_per_instance: Hourly price of a single instance.
        number_of_instances: Number of instances assumed to run for the whole duration.

    Returns:
        Estimated cost, in the same currency as ``price_per_instance``.
    """
    hours = job_time / 3600  # seconds -> hours
    return hours * price_per_instance * number_of_instances

---

# SageMaker Batch Transform experiments

## a. Default run (raw images, 4 × ml.g4dn.xlarge, default concurrency and payload)

In [None]:
%%time

# Baseline: raw images on 4 instances of the runner's default type (ml.g4dn.xlarge).
# max_concurrent_transforms is left unset; max_payload stays at the runner's default of 100 MB.
experiment_times = run_sm_bt_experiment(
    job_name="sm-bt-images-a",
    transformer_kwargs={
        "instance_count": 4,
    },
    transform_run_kwargs={
        "data": images_location,
        "content_type": "application/x-image",
    },
)

In [None]:
# Seconds: total_time = client-side wall time of the blocking call,
# transform_time = job duration reported by SageMaker.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on transform_time respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Estimated cost over the SageMaker-reported duration; the last argument must match "instance_count" above.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_xlarge_instance_price, 4)

## b. Raw images, `max_concurrent_transforms=2`, `max_payload=50` MB

In [None]:
# Raw images, 2 concurrent requests x 50 MB payload per instance
# (stays within the MaxConcurrentTransforms * MaxPayloadInMB <= 100 MB limit).
experiment_times = run_sm_bt_experiment(
    job_name="sm-bt-images-b",
    transformer_kwargs={
        "instance_count": 4,
        "max_concurrent_transforms": 2,
        "max_payload": 50,
    },
    transform_run_kwargs={
        "data": images_location,
        "content_type": "application/x-image",
    },
)

In [None]:
# Seconds: client-side total_time and SageMaker-reported transform_time.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on transform_time respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Estimated cost; the last argument must match "instance_count" above.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_xlarge_instance_price, 4)

## c. Raw images, `max_concurrent_transforms=8`, `max_payload=10` MB

In [None]:
# Raw images, 8 concurrent requests x 10 MB payload per instance
# (80 MB, within the MaxConcurrentTransforms * MaxPayloadInMB <= 100 MB limit).
experiment_times = run_sm_bt_experiment(
    job_name="sm-bt-images-c",
    transformer_kwargs={
        "instance_count": 4,
        "max_concurrent_transforms": 8,
        "max_payload": 10,
    },
    transform_run_kwargs={
        "data": images_location,
        "content_type": "application/x-image",
    },
)

In [None]:
# Seconds: client-side total_time and SageMaker-reported transform_time.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on transform_time respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Estimated cost; the last argument must match "instance_count" above.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_xlarge_instance_price, 4)

## d. Parquet input (120 partitions), default concurrency and payload

In [None]:
# 120-partition Parquet input with the runner's default concurrency/payload settings.
# The job name says "images", but the input here is Parquet.
experiment_times = run_sm_bt_experiment(
    job_name="sm-bt-images-d",
    transformer_kwargs={
        "instance_count": 4,
    },
    transform_run_kwargs={
        "data": parquet_location,
        "content_type": "application/x-parquet",
    },
)

In [None]:
# Seconds: client-side total_time and SageMaker-reported transform_time.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on transform_time respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Estimated cost; the last argument must match "instance_count" above.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_xlarge_instance_price, 4)

## e. Parquet input (120 partitions), `max_concurrent_transforms=2`, `max_payload=50` MB

In [None]:
# 120-partition Parquet input, 2 concurrent requests x 50 MB payload per instance.
# The job name says "images", but the input here is Parquet.
experiment_times = run_sm_bt_experiment(
    job_name="sm-bt-images-e",
    transformer_kwargs={
        "instance_count": 4,
        "max_concurrent_transforms": 2,
        "max_payload": 50,
    },
    transform_run_kwargs={
        "data": parquet_location,
        "content_type": "application/x-parquet",
    },
)

In [None]:
# Seconds: client-side total_time and SageMaker-reported transform_time.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on transform_time respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Estimated cost; the last argument must match "instance_count" above.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_xlarge_instance_price, 4)

---

# Ray inference experiments (SageMaker Processing, 1 × ml.g4dn.12xlarge)

## a. Default run (default Parquet layout, preprocessing batch 4096, inference batch 1000)

In [None]:
%%time

# Baseline Ray run on one ml.g4dn.12xlarge (runner default), default Parquet layout.
# The env override replaces the runner's whole env dict, lowering the log level to INFO.
experiment_times = run_ray_inference_experiment(
    job_name="ray-inference-run-a",
    processor_kwargs={
        "env": {"SAGEMAKER_CONTAINER_LOG_LEVEL": "INFO"},
    },
    run_kwargs={
        "arguments": [
            "run_inference_on_parquets",
            "--data_path", parquet_default_location,
            "--data_read_parallelism", "-1",
            "--preprocessing_batch_size", "4096",
            "--inference_batch_size", "1000",
            "--inference_concurrency", "2",
        ],
    }
)

In [None]:
# Seconds: client-side total_time; the transform_time field holds the processing-job duration here.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on the processing-job duration respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Hard-coded timings for this run. NOTE(review): presumably reported by inference.py in the job
# logs (logs=False in the runner, so they are not printed here); update manually after a re-run.
# "no_metadata" presumably excludes a dataset-metadata step — confirm in inference.py.
total_script_time = 160.53647327423096
total_script_no_metadata_time = 145.280113697052

dataset_size / total_script_time, dataset_size / total_script_no_metadata_time

In [None]:
# Estimated cost of one ml.g4dn.12xlarge over the processing-job duration.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_12xlarge_instance_price, 1)

## b. Parquet input (120 partitions), preprocessing batch 4096, inference batch 1000

In [None]:
%%time

# Same settings as run a, but reading the 120-partition Parquet copy.
experiment_times = run_ray_inference_experiment(
    job_name="ray-inference-run-b",
    processor_kwargs={
        "env": {"SAGEMAKER_CONTAINER_LOG_LEVEL": "INFO"},
    },
    run_kwargs={
        "arguments": [
            "run_inference_on_parquets",
            "--data_path", parquet_location,
            "--data_read_parallelism", "-1",
            "--preprocessing_batch_size", "4096",
            "--inference_batch_size", "1000",
            "--inference_concurrency", "2",
        ],
    }
)

In [None]:
# Seconds: client-side total_time; the transform_time field holds the processing-job duration here.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on the processing-job duration respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Hard-coded timings for this run. NOTE(review): presumably copied from the script's log output;
# update manually after a re-run.
total_script_time = 170.1976945400238
total_script_no_metadata_time = 151.15066051483154

dataset_size / total_script_time, dataset_size / total_script_no_metadata_time

In [None]:
# Estimated cost of one ml.g4dn.12xlarge over the processing-job duration.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_12xlarge_instance_price, 1)

## c. Default Parquet layout, preprocessing and inference batch size 135

In [None]:
%%time

# Default Parquet layout with small batches (135) for both preprocessing and inference.
experiment_times = run_ray_inference_experiment(
    job_name="ray-inference-run-c-01",
    processor_kwargs={
        "env": {"SAGEMAKER_CONTAINER_LOG_LEVEL": "INFO"},
    },
    run_kwargs={
        "arguments": [
            "run_inference_on_parquets",
            "--data_path", parquet_default_location,
            "--data_read_parallelism", "-1",
            "--preprocessing_batch_size", "135",
            "--inference_batch_size", "135",
            "--inference_concurrency", "2",
        ],
    }
)

In [None]:
# Seconds: client-side total_time; the transform_time field holds the processing-job duration here.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on the processing-job duration respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Hard-coded timings for this run. NOTE(review): presumably copied from the script's log output;
# update manually after a re-run.
total_script_time = 184.49217987060547
total_script_no_metadata_time = 168.5918595790863

dataset_size / total_script_time, dataset_size / total_script_no_metadata_time

In [None]:
# Estimated cost of one ml.g4dn.12xlarge over the processing-job duration.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_12xlarge_instance_price, 1)

## d. Raw images, read parallelism 8, batch size 1

In [None]:
%%time

# Raw image files via the image subcommand: read parallelism 8, batch size 1.
experiment_times = run_ray_inference_experiment(
    job_name="ray-inference-run-d-04",
    processor_kwargs={
        "env": {"SAGEMAKER_CONTAINER_LOG_LEVEL": "INFO"},
    },
    run_kwargs={
        "arguments": [
            "run_inference_on_images",
            "--data_path", images_location,
            "--data_read_parallelism", "8",
            "--preprocessing_batch_size", "1",
            "--inference_batch_size", "1",
            "--inference_concurrency", "2",
        ],
    }
)

In [None]:
# Seconds: client-side total_time; the transform_time field holds the processing-job duration here.
experiment_times

In [None]:
# Throughput in images/second, based on total_time and on the processing-job duration respectively.
dataset_size / experiment_times.total_time, dataset_size / experiment_times.transform_time

In [None]:
# Hard-coded timings for this run. NOTE(review): presumably copied from the script's log output;
# update manually after a re-run.
total_script_time = 956.8057413101196
total_script_no_metadata_time = 954.6421239376068

dataset_size / total_script_time, dataset_size / total_script_no_metadata_time

In [None]:
# Estimated cost of one ml.g4dn.12xlarge over the processing-job duration.
estimate_job_pricing(experiment_times.transform_time, ml_g4dn_12xlarge_instance_price, 1)