## Imports

In [None]:
!pip install -U sagemaker

In [None]:
import os
import sagemaker
from sagemaker.local import LocalSession
from sagemaker.pytorch import PyTorch
from utils_cifar import get_train_data_loader, get_test_data_loader, classes

## Downloading data
Download the CIFAR-10 dataset into `./data`, skipping the download if the files are already there.

In [None]:
# Only download CIFAR-10 when both the extracted metadata and the archive are missing
# from ./data; either way, build the test DataLoader used for inference later on.
data_files = ('./data/cifar-10-batches-py/batches.meta',
              './data/cifar-10-python.tar.gz')
already_downloaded = all(os.path.isfile(path) for path in data_files)
if already_downloaded:
    print('Training and evaluation datasets exist')
else:
    print('Downloading training and evaluation dataset')
test_loader = get_test_data_loader(not already_downloaded)

## Local development

You can use the SageMaker SDK without running remote jobs, using the local compute where this notebook runs. [Local mode](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode) improves the development experience. It lets you verify that everything runs correctly with the selected pre-built or custom container and that the SageMaker interfaces are used properly, which ensures parity between local experimentation and full remote jobs. You can also use it to train on local data, write the resulting model to a local directory, and debug the `.fit` method by stepping through your training code.

In [None]:
# Local mode: run the training container on this machine instead of on SageMaker.
# `local_code: True` makes the container use ./code directly rather than an
# uploaded copy. The session must be handed to the estimator; otherwise the
# estimator creates its own LocalSession and this config is silently ignored.
sagemaker_session_local = LocalSession()
sagemaker_session_local.config = {'local': {'local_code': True}}

# Placeholder (dummy) execution role for local mode -- fake account ID, not a real IAM role.
dummy_role = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'

cifar10_estimator = PyTorch(entry_point='cifar10_pytorch.py',
                            source_dir='./code',
                            role=dummy_role,
                            framework_version='1.8',
                            py_version='py3',
                            # image_uri='custom-container',
                            instance_count=1,
                            instance_type='local_gpu',
                            output_path='file://model/',
                            sagemaker_session=sagemaker_session_local,
                            hyperparameters={
                                'epochs': 1,
                            })

In [None]:
# Train locally; the 'training' channel points at the ./data directory on this machine.
cifar10_estimator.fit(inputs=dict(training='file://./data/'))

## Remote training job

In [None]:
# Real (remote) SageMaker session plus the execution role and default S3 bucket
# that the remote training and deployment cells below use.
sm_sess = sagemaker.Session()
sm_role = sagemaker.get_execution_role()
bucket = sm_sess.default_bucket()
print(f'Default role: {sm_role}')
print(f'Default bucket: {bucket}')

In [None]:
# Copy the local ./data directory to S3 under <prefix>/data for the remote job.
prefix = 'sm-core-testing'
inputs_s3 = sm_sess.upload_data(path="data", bucket=bucket, key_prefix=f'{prefix}/data')
print(inputs_s3)

In [None]:
# # Optional: to deploy the model trained in local mode instead of the remote one,
# # upload its artifact (written under model/ by the local estimator's output_path)
# # and pass `model_data=model_s3_uri` to PyTorchModel below.
# model_s3_uri = sm_sess.upload_data(path="model/model.tar.gz", bucket=bucket, key_prefix=prefix+'/model')

In [None]:
# Same training script as the local run, now on a managed ml.g5.xlarge instance,
# with the model artifact written to S3 under <prefix>/train_output/.
cifar10_estimator_remote = PyTorch(
    entry_point='cifar10_pytorch.py',
    source_dir='./code',
    role=sm_role,
    framework_version='1.8',
    py_version='py3',
    instance_count=1,
    instance_type='ml.g5.xlarge',
    hyperparameters={'epochs': 1},
    output_path=f's3://{bucket}/{prefix}/train_output/',
    # To train with the custom container built further down, also pass e.g.:
    # image_uri='<account-id>.dkr.ecr.<region>.amazonaws.com/test-sm-core-custom:<tag>',
)

In [None]:
# Timestamped job name so repeated runs don't collide.
remote_job_name = f'test-sm-core-{sagemaker.utils.sagemaker_short_timestamp()}'
cifar10_estimator_remote.fit(inputs={'training': inputs_s3}, job_name=remote_job_name)

In [None]:
# S3 location of the trained model artifact (reused as `model_data` when deploying below).
cifar10_estimator_remote.model_data

## Use a custom container

In [None]:
image_name = 'test-sm-core-custom'
!sh ./docker_custom/build_and_push.sh $image_name

In [None]:
!docker image ls

## Retrieve pre-built Deep Learning Container images

In [None]:
# Look up the AWS-managed PyTorch training image matching the framework settings
# used above. Use the session's region rather than a hard-coded one, so the URI
# is pullable from where this notebook actually runs.
sagemaker.image_uris.retrieve(
    framework='pytorch',
    region=sm_sess.boto_region_name,
    version='1.8',
    py_version='py3',
    instance_type='ml.p3.2xlarge',
    image_scope='training',
)

## Deploy model to real-time endpoint

In [None]:
# Deploying straight from the trained estimator would also work (commented out below);
# PyTorchModel is used instead to show deploying from nothing but a model artifact.
# cifar10_estimator_remote.deploy(initial_instance_count=1,instance_type='ml.c5.xlarge')

In [None]:
from sagemaker.pytorch import PyTorchModel

# TorchServe batching settings.
# NOTE(review): env_vars is NOT passed to PyTorchModel below, so these settings
# currently have no effect. To enable them, add `env=env_vars`. The delay is
# presumably in milliseconds (100 s here) -- confirm before using it on a
# real-time endpoint, where a lone request could wait that long for a batch.
env_vars = dict(
    SAGEMAKER_TS_BATCH_SIZE="4",
    SAGEMAKER_TS_MAX_BATCH_DELAY="100000",
)

# Wrap the remote job's model artifact (or model_s3_uri for a locally trained
# model) together with the inference code in ./code so it can be deployed.
pt_model = PyTorchModel(
    model_data=cifar10_estimator_remote.model_data,
    role=sm_role,
    entry_point='cifar10_pytorch.py',
    source_dir='code',
    framework_version='1.8',
    py_version='py3',
)
                        

In [None]:
# Host the model on a real-time endpoint backed by a single ml.g4dn.xlarge instance.
predictor = pt_model.deploy(
    instance_type='ml.g4dn.xlarge',
    initial_instance_count=1,
)

In [None]:
# # Optional: reconnect to an already-deployed endpoint after the kernel state is lost,
# # by rebuilding a Predictor from the endpoint name with numpy (de)serializers.
# # Replace the example endpoint name below with your own.
# from sagemaker.predictor import Predictor
# from sagemaker.serializers import NumpySerializer
# from sagemaker.deserializers import NumpyDeserializer

# predictor = Predictor(endpoint_name='pytorch-inference-2022-07-05-16-18-01-354',serializer=NumpySerializer(),deserializer=NumpyDeserializer())

## Deploy a serverless endpoint

In [None]:
from sagemaker.serverless.serverless_inference_config import ServerlessInferenceConfig

# Serverless endpoint sizing: 4096 MB memory and at most 10 concurrent invocations.
serverless_config = ServerlessInferenceConfig(max_concurrency=10, memory_size_in_mb=4096)

In [None]:
# wait=False: return without waiting for the serverless deployment to finish.
predictor_serverless = pt_model.deploy(
    serverless_inference_config=serverless_config,
    wait=False,
)

## Deploy an asynchronous endpoint

In [None]:
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig

# Async results are written to S3 under <prefix>/async_results/; no completion
# notifications are configured.
async_results_uri = f"s3://{bucket}/{prefix}/async_results/"
async_config = AsyncInferenceConfig(
    output_path=async_results_uri,
    max_concurrent_invocations_per_instance=10,
    notification_config=None,
)

In [None]:
# Asynchronous endpoint on one ml.g4dn.xlarge instance; wait=False returns without
# waiting for the deployment to finish.
predictor_async = pt_model.deploy(
    initial_instance_count=1,
    instance_type='ml.g4dn.xlarge',
    async_inference_config=async_config,
    wait=False,
)

In [None]:
from sagemaker.predictor_async import AsyncPredictor

# Depending on the SDK version, deploy() with an async_inference_config may already
# return an AsyncPredictor; only wrap it when it is a plain Predictor.
if isinstance(predictor_async, AsyncPredictor):
    predictor_async_wrapper = predictor_async
else:
    predictor_async_wrapper = AsyncPredictor(predictor=predictor_async)

## Run inference on the hosted (real-time) or serverless endpoint

In [None]:
import torch
import numpy as np
import torchvision as tv
import matplotlib.pyplot as plt


def imshow(img):
    """Display a single normalized image tensor with matplotlib.

    Args:
        img: image tensor in (channels, height, width) order -- the transpose
            below moves channels last for matplotlib. Assumed to be normalized to
            [-1, 1] (mean 0.5 / std 0.5); TODO confirm against utils_cifar.
    """
    img = img / 2 + 0.5   # unnormalize back to [0, 1]
    npimg = img.numpy()   # convert from tensor
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


def do_inference(predictor, testloader):
    """Send one batch from ``testloader`` to ``predictor`` and show the predictions.

    Args:
        predictor: SageMaker predictor whose ``predict`` accepts a numpy array and
            returns per-class scores for each image (assumed shape
            (batch, n_classes) -- TODO confirm against the endpoint's output).
        testloader: iterable yielding ``(images, labels)`` batches of tensors.
    """
    print('Sending requests to SM Endpoint')
    # Built-in next(): DataLoader iterators no longer expose a ``.next()`` method
    # in newer PyTorch releases.
    images, _ = next(iter(testloader))
    print(len(images))

    outputs = predictor.predict(images.numpy())

    # argmax over dimension 1 gives one predicted class index per image.
    _, predicted = torch.max(torch.from_numpy(np.array(outputs)), 1)

    # Iterate over the actual batch instead of assuming a batch size of 4.
    print('Predicted: ', ' '.join('%4s' % classes[idx] for idx in predicted))

    for image in images:  # show each image in the batch
        imshow(tv.utils.make_grid(image))

In [None]:
# Send one test batch to the real-time endpoint and show the predictions.
do_inference(predictor=predictor, testloader=test_loader)

## Run inference on the asynchronous endpoint

In [None]:
import torch
import numpy as np


def do_inference_async(predictor, testloader):
    """Submit one batch from ``testloader`` to an asynchronous endpoint.

    Args:
        predictor: object with a ``predict_async(data=...)`` method, e.g. an
            ``AsyncPredictor``.
        testloader: iterable yielding ``(images, labels)`` batches of tensors.

    Returns:
        Whatever ``predictor.predict_async`` returns -- the handle whose
        ``get_result()`` is read in a later cell.
    """
    print('Sending requests to SM Endpoint')
    # Built-in next(): DataLoader iterators no longer expose a ``.next()`` method
    # in newer PyTorch releases.
    images, _ = next(iter(testloader))
    print(len(images))
    return predictor.predict_async(data=images.numpy())


def process_async_response(response):
    """Print the predicted class name for every row of an async inference result.

    Args:
        response: per-image class scores as a numpy array or nested list
            (assumed shape (batch, n_classes) -- TODO confirm).
    """
    # np.asarray accepts both arrays and lists, matching do_inference's handling.
    scores = torch.from_numpy(np.asarray(response))
    _, predicted = torch.max(scores, 1)
    # Iterate over the actual batch instead of assuming a batch size of 4.
    print('Predicted: ', ' '.join('%4s' % classes[idx] for idx in predicted))


In [None]:
# The async endpoint was deployed with wait=False, so block until it is in
# service before submitting a request to it.
sm_sess.wait_for_endpoint(predictor_async_wrapper.endpoint_name)
response = do_inference_async(predictor_async_wrapper, test_loader)

In [None]:
from sagemaker.async_inference.waiter_config import WaiterConfig

# Without a waiter, get_result() only checks S3 once and may fail while the
# request is still running; poll instead (up to 20 attempts, 15 s apart).
response_array = response.get_result(WaiterConfig(max_attempts=20, delay=15))
process_async_response(response_array)

## Clean up 

In [None]:
# Delete every model and endpoint deployed above -- real-time, serverless and
# async -- so none of them keeps running (the async one holds a GPU instance).
# NOTE(review): delete_model() is kept before delete_endpoint(); presumably the
# model lookup goes through the endpoint, so keep this order.
for deployed_predictor in (predictor, predictor_serverless, predictor_async):
    deployed_predictor.delete_model()
    deployed_predictor.delete_endpoint()