# 03 - Train and Deploy

In [None]:
import boto3
import sagemaker
import time
from time import strftime

# One boto3 session backs the SageMaker session used throughout the notebook.
boto_session = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=boto_session)
sm_client = boto3.client("sagemaker")

# Environment facts: region, default artifact bucket, execution role, account id.
region = boto_session.region_name
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
sts_client = sagemaker_session.boto_session.client("sts")
account = sts_client.get_caller_identity()["Account"]

prefix = 'sagemaker-intel-dvc'

# Echo the resolved environment so the reader can sanity-check it.
for label, value in (
    ("account", account),
    ("bucket", bucket),
    ("region", region),
    ("role", role),
):
    print(f"{label}: {value}")

In [None]:
from sagemaker.pytorch import PyTorch
from sagemaker import get_execution_role
from sagemaker.debugger import TensorBoardOutputConfig

In [None]:
# Git remote and branch handed to the training container through the
# DVC_REPO_URL / DVC_BRANCH environment variables of the estimator below.
# NOTE(review): the CodeCommit region (us-west-2) is hard-coded here and is
# not derived from `region` - confirm the two agree.
dvc_repo_url = "codecommit::us-west-2://sagemaker-intel"
dvc_branch = "processed-dataset"

In [None]:
# TensorBoard logging: path inside the container and the S3 destination
# under the session's default bucket.
tb_local_dir = '/opt/ml/output/tensorboard'
tb_s3_uri = f's3://{bucket}/sagemaker-intel-logs'

tensorboard_output_config = TensorBoardOutputConfig(
    container_local_output_path=tb_local_dir,
    s3_output_path=tb_s3_uri,
)

In [None]:
# Environment variables exposed to the training container: the DVC repo/branch
# configured above plus (presumably) the git identity the container should use.
training_env = {
    "DVC_REPO_URL": dvc_repo_url,
    "DVC_BRANCH": dvc_branch,
    "GIT_USER": "m",
    "GIT_EMAIL": "m@emlo.com",
}

# Training job definition: custom training image on a single GPU instance,
# run as a managed spot job with TensorBoard output enabled.
pt_estimator = PyTorch(
    image_uri='public.ecr.aws/f2t6q8t2/emlo:train',
    role=get_execution_role(),
    base_job_name="training-intel-dataset",
    instance_type="ml.g4dn.2xlarge",
    instance_count=1,
    # Spot settings (seconds): max_wait is kept larger than max_run.
    use_spot_instances=True,
    max_run=1500,
    max_wait=1800,
    tensorboard_output_config=tensorboard_output_config,
    environment=training_env,
)

In [None]:
# Start the training job. No input channels are passed to fit(); presumably the
# container fetches the dataset itself using the DVC_* environment variables -
# confirm against the training image.
pt_estimator.fit()

## Deploy to Endpoint

In [None]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

Get the `model_data` (the S3 URI of the model artifact) uploaded by the training job above.

Once you are done debugging, it is simpler to call `pt_estimator.deploy(...)` directly.

In [None]:
# Wrap the trained artifact in a PyTorchModel served by scripts/infer.py.
# model_data comes from the estimator fitted above (set once fit() has
# completed), so the notebook runs end-to-end in any account/region instead of
# depending on a fixed S3 URI.
model = PyTorchModel(
    entry_point="infer.py",
    source_dir="scripts",
    role=get_execution_role(),
    model_data=pt_estimator.model_data,
    framework_version="1.12.0",
    py_version="py38",
)

In [None]:
# Create a real-time endpoint on a single CPU instance; requests and responses
# are exchanged as JSON.
predictor = model.deploy(
    instance_type="ml.t2.medium",
    initial_instance_count=1,
    deserializer=JSONDeserializer(),
    serializer=JSONSerializer(),
)

## Predictor

In [None]:
from sagemaker.pytorch import PyTorchPredictor

The predictor does not need to be created again either (`model.deploy` already returned one), but today we want to be verbose.

In [None]:
# Re-attach to the endpoint created by model.deploy() above. The endpoint name
# is read from that predictor rather than typed in by hand, so this cell keeps
# working whenever the endpoint is re-created.
predictor_new = PyTorchPredictor(
    endpoint_name=predictor.endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

In [None]:
import numpy as np

In [None]:
# Smoke-test payload: standard-normal noise of shape (1, 3, 224, 224);
# presumably one RGB image at the model's input size - confirm against infer.py.
dummy_data = {"inputs": np.random.randn(1, 3, 224, 224)}

In [None]:
# Send the dummy payload to the endpoint (JSON in, JSON out per the predictor's
# serializer/deserializer).
out = predictor_new.predict(dummy_data)

In [None]:
# Convert the deserialized response to an ndarray so its shape can be inspected.
# Note: this rebinds `out`.
out = np.array(out)

In [None]:
# Display the response shape (presumably (1, number of classes) - verify).
out.shape

In [None]:
# Labels used to turn a predicted class index into a readable name.
# NOTE(review): the order must match the label indices used during training - confirm.
classnames = ['buildings', 'forest', 'glacier', 'mountain', 'sea', 'street']

In [None]:
import torch

import torchvision.transforms as T
import torch.nn.functional as F

from PIL import Image

In [None]:
# Load the sample image; the path is relative to the notebook's working directory.
inp_img = Image.open("forest.jpg")

We could have done all of this in `infer.py` as well, but today we are being verbose :p

In [None]:
# Preview only: the resized copy is shown as the cell output and is not
# assigned, so `inp_img` is unchanged. The resize that feeds the model is
# done by the transform pipeline below.
inp_img.resize((224, 224))

In [None]:
# Per-channel normalization statistics (the commonly used ImageNet mean/std).
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Inference preprocessing: PIL image -> tensor -> 224x224 -> normalized.
transforms = T.Compose(
    [
        T.ToTensor(),
        T.Resize((224, 224)),
        T.Normalize(mean=norm_mean, std=norm_std),
    ]
)

In [None]:
# Apply the preprocessing pipeline (tensor conversion, resize, normalization).
img_t = transforms(inp_img)

In [None]:
# Add a leading batch axis and convert to nested Python lists so the payload
# can be sent as JSON.
input_tensor = {"inputs": img_t[None, ...].numpy().tolist()}

In [None]:
# Run inference on the real image. Note: this rebinds `out` from the
# dummy-data cells above.
out = predictor_new.predict(input_tensor)

In [None]:
# Wrap the JSON response in a tensor for the softmax/argmax cells below.
out_t = torch.tensor(out)

In [None]:
# Display the shape of the endpoint output.
out_t.shape

In [None]:
# Show per-class scores normalized along the last axis.
# Assumes the endpoint returns raw logits - TODO confirm against infer.py.
F.softmax(out_t, dim=-1)

In [None]:
# Index of the highest-scoring class for the first item in the batch.
top_class = torch.argmax(out_t, dim=-1)[0]
f"Prediction: {classnames[top_class]}"

### Testing

In [None]:
! pip install --quiet timm pytorch-lightning

In [None]:
from typing import Any, Dict, Optional, Tuple

import os
import subprocess
import torch
import timm

import pytorch_lightning as pl
import torchvision.transforms as T
import torch.nn.functional as F

from pathlib import Path
from torchvision.datasets import ImageFolder
from pytorch_lightning.plugins.environments import LightningEnvironment
from torch.utils.data import DataLoader, Dataset
from torchmetrics.functional import accuracy

In [None]:
class LitResnet(pl.LightningModule):
    """ResNet-18 image classifier wrapped as a LightningModule.

    Args:
        num_classes: Number of output classes for the classification head.
        lr: Learning rate used by the SGD optimizer.
    """

    def __init__(self, num_classes=10, lr=0.05):
        super().__init__()

        # Makes the init arguments available as self.hparams.<name>.
        self.save_hyperparameters()
        # Pass num_classes through so the classifier head has the requested
        # number of outputs instead of the pretrained model's default.
        self.model = timm.create_model(
            'resnet18', pretrained=True, num_classes=num_classes
        )

    def forward(self, x):
        """Return per-class log-probabilities (log-softmax over dim 1)."""
        out = self.model(x)
        return F.log_softmax(out, dim=1)

    def training_step(self, batch, batch_idx):
        """Compute and log the NLL loss for one training batch."""
        x, y = batch
        # forward() already returns log-probabilities, hence nll_loss.
        log_probs = self(x)
        loss = F.nll_loss(log_probs, y)
        self.log("train_loss", loss)
        return loss

    def evaluate(self, batch, stage=None):
        """Compute loss and accuracy for a batch and log them under `stage`.

        Nothing is logged when `stage` is falsy; nothing is returned.
        """
        x, y = batch
        log_probs = self(x)
        loss = F.nll_loss(log_probs, y)
        preds = torch.argmax(log_probs, dim=1)
        # Fraction of correct predictions, computed directly so the result does
        # not depend on the installed torchmetrics version (newer releases
        # require an explicit `task` argument for `accuracy`).
        acc = (preds == y).float().mean()

        if stage:
            self.log(f"{stage}/loss", loss, prog_bar=True)
            self.log(f"{stage}/acc", acc, prog_bar=True)

    def validation_step(self, batch, batch_idx):
        """Log validation metrics under the "val/" prefix."""
        self.evaluate(batch, "val")

    def test_step(self, batch, batch_idx):
        """Log test metrics under the "test/" prefix."""
        self.evaluate(batch, "test")

    def configure_optimizers(self):
        """SGD with momentum and weight decay; learning rate from hparams."""
        optimizer = torch.optim.SGD(
            self.parameters(),
            lr=self.hparams.lr,
            momentum=0.9,
            weight_decay=5e-4,
        )
        return {"optimizer": optimizer}


In [None]:
class FlowerDataModule(pl.LightningDataModule):
    """LightningDataModule over an ImageFolder dataset with train/ and test/ subfolders.

    Args:
        data_dir: Root directory containing the "train" and "test" folders.
        batch_size: Batch size used by every dataloader.
        num_workers: Number of DataLoader worker processes.
        pin_memory: Passed through to the DataLoaders.
    """

    def __init__(
        self,
        data_dir: str = "data/",
        batch_size: int = 64,
        num_workers: int = 0,
        pin_memory: bool = False,
    ):
        super().__init__()

        # this line allows to access init params with 'self.hparams' attribute
        # also ensures init params will be stored in ckpt
        self.save_hyperparameters(logger=False)

        self.data_dir = Path(data_dir)

        # data transformations (the same pipeline is used for train and test)
        self.transforms = T.Compose([
            T.ToTensor(),
            T.Resize((224, 224)),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # Populated by setup().
        self.data_train: Optional[Dataset] = None
        self.data_test: Optional[Dataset] = None

    @property
    def num_classes(self):
        """Number of classes in the training set; requires setup() to have run."""
        return len(self.data_train.classes)

    @property
    def classes(self):
        """Class names of the training set; requires setup() to have run."""
        return self.data_train.classes

    def prepare_data(self):
        """Download data if needed.
        Do not use it to assign state (self.x = y).
        """
        pass

    def setup(self, stage: Optional[str] = None):
        """Load data. Set variables: `self.data_train`, `self.data_test`.
        This method is called by lightning with both `trainer.fit()` and `trainer.test()`, so
        the datasets are only built on the first call.
        """
        # Explicit None checks: load only if nothing has been loaded yet,
        # without relying on the truthiness (length) of a dataset object.
        if self.data_train is None and self.data_test is None:
            trainset = ImageFolder(self.data_dir / "train", transform=self.transforms)
            testset = ImageFolder(self.data_dir / "test", transform=self.transforms)

            self.data_train, self.data_test = trainset, testset

    def train_dataloader(self):
        """Shuffled loader over the training set."""
        return DataLoader(
            dataset=self.data_train,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=True,
        )

    def val_dataloader(self):
        """Unshuffled loader used for validation.

        NOTE(review): this iterates the *training* set because no separate
        validation split exists, so "val" metrics are measured on data the
        model was trained on. Confirm this is intended.
        """
        return DataLoader(
            dataset=self.data_train,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=False,
        )

    def test_dataloader(self):
        """Unshuffled loader over the test set."""
        return DataLoader(
            dataset=self.data_test,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=False,
        )

    def teardown(self, stage: Optional[str] = None):
        """Clean up after fit or test."""
        pass

    def state_dict(self):
        """Extra things to save to checkpoint."""
        return {}

    def load_state_dict(self, state_dict: Dict[str, Any]):
        """Things to do when loading checkpoint."""
        pass



In [None]:
# NOTE(review): absolute, machine-specific dataset location; consider making
# it configurable.
dataset_dir = Path("/root/flower-project/example-git/", "dataset").absolute()

# Build the datamodule and load the train/test folders eagerly so that
# properties such as num_classes are available right away.
datamodule = FlowerDataModule(data_dir=dataset_dir)
datamodule.setup()

In [None]:
# Number of classes found in the training folder.
datamodule.num_classes

In [None]:
# Instantiate the LightningModule for the dataset's class count.
# Note: this rebinds `model`, which earlier held the PyTorchModel used for deployment.
model = LitResnet(num_classes=datamodule.num_classes)

In [None]:
# Short local run (2 epochs); accelerator="auto" leaves the choice of
# hardware to Lightning.
trainer = pl.Trainer(
    max_epochs=2,
    accelerator="auto",
)

In [None]:
# Train the model using the dataloaders provided by the datamodule.
trainer.fit(model, datamodule)