Make sure to use `sagemaker==2.93.0`

In [3]:
! pip install --quiet sagemaker==2.93.0
# ! pip install sagemaker==2.120.0

In [1]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [2]:
sagemaker.__version__

'2.93.0'

In [3]:
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
boto_session = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=boto_session)
default_bucket = sagemaker_session.default_bucket()

In [4]:
def get_sagemaker_client(region):
     """Gets the sagemaker client.

        Args:
            region: the aws region to start the session
            default_bucket: the bucket to use for storing the artifacts

        Returns:
            `sagemaker.session.Session instance
        """
     boto_session = boto3.Session(region_name=region)
     sagemaker_client = boto_session.client("sagemaker")
     return sagemaker_client


def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

def get_pipeline_session(region, default_bucket):
    """Gets the pipeline session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        PipelineSession instance
    """

    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")

    return PipelineSession(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        default_bucket=default_bucket,
    )


In [5]:
# from sagemaker.workflow.pipeline_context import LocalPipelineSession

# local_pipeline_session = LocalPipelineSession()

In [6]:
sagemaker_session = get_session(region, default_bucket)
role = sagemaker.session.get_execution_role(sagemaker_session)
pipeline_session = get_pipeline_session(region, default_bucket)

In [7]:
base_job_name = "pipeline-intel-casptone"

In [8]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

## Processing Step

In [9]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn import SKLearn, SKLearnProcessor
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

In [11]:
"""
The parameters needed in the model

Model
Batch Size
Optimizer
Learning Rate
Train Dataset Location in S3
Test Dataset Location in S3
Augmentations to use for Training

"""



dvc_repo_url = ParameterString(
    name="DVCRepoURL",
    default_value="codecommit::ap-south-1://intl-emlo-s12"
)
dvc_branch = ParameterString(
    name="DVCBranch",
    default_value="pipeline-processed-dataset"
)

input_dataset = ParameterString(
    name="InputDatasetZip",
    default_value="s3://tmp-datasets/intel_s12.zip"
)

model = ParameterString(
    name="Model",
    default_value="resnet18"
)

batch_size = ParameterInteger(
    name="BatchSize",
    default_value=32
)

optimizer = ParameterString(
    name="Optimizer",
    default_value="adam"
)

learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.001
)

epochs = ParameterInteger(
    name="Epochs",
    default_value=10
)

augmentations = ParameterString(
    name="Augmentations",
    default_value="['RandomCrop', 'HorizontalFlip']"
)



In [12]:
sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="0.23-1",
    # instance_type="ml.g4dn.xlarge",
    instance_type="ml.m5.xlarge",
    # instance_type='local',
    instance_count=1,
    base_job_name=f"{base_job_name}-preprocess-intel-dataset",
    # sagemaker_session=pipeline_session,
    # sagemaker_session=local_pipeline_session,
    role=role,
    env={
        # "DVC_REPO_URL": dvc_repo_url,
        # "DVC_BRANCH": dvc_branch,
        "DVC_REPO_URL": "codecommit::ap-south-1://intl-emlo-s12",
        "DVC_BRANCH": "processed-dataset-pipeline",
        "GIT_USER": "Shivam Prasad",
        "GIT_EMAIL": "shivam.prasad2015@vitalum.ac.in"
    }
)   

In [13]:


processing_step_args = sklearn_processor.run(
    code='preprocess.py',
    source_dir='scripts',
    dependencies=["requirements.txt"],
    inputs=[
        ProcessingInput(
            input_name='data',
            # source=input_dataset,
            source="s3://tmp-datasets/intel_s12.zip",
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/dataset/train"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/dataset/test"
        ),
    ],
)


Job Name:  pipeline-intel-casptone-preprocess-inte-2023-01-21-09-15-11-994
Inputs:  [{'InputName': 'data', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://tmp-datasets/intel_s12.zip', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-546801796757/pipeline-intel-casptone-preprocess-inte-2023-01-21-09-15-11-994/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-546801796757/pipeline-intel-casptone-preprocess-inte-2023-01-21-09-15-11-994/source/runproc.sh', 'LocalPath': '/opt/ml/processing/input/entrypoint', 'S3DataType': 'S3Prefix', 'S3InputMod

KeyboardInterrupt: 

In [None]:
processing_step_args

In [None]:
step_process = ProcessingStep(
    name="PreprocessIntelImageClassifierDataset",
    step_args=processing_step_args,
)

In [15]:
step_process

ProcessingStep(name='PreprocessIntelImageClassifierDataset', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

## Train Step

In [10]:
from sagemaker.pytorch import PyTorch
from sagemaker.debugger import TensorBoardOutputConfig
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)

In [11]:
tensorboard_output_config = TensorBoardOutputConfig(
    s3_output_path=f's3://{default_bucket}/sagemaker-intel-logs-pipeline-notebook',
    container_local_output_path='/opt/ml/output/tensorboard'
)

In [12]:
# ! aws s3 cp --recursive /root/flower-project/flowers s3://sagemaker-ap-south-1-006547668672/testing/training

In [13]:
# # s3://sagemaker-ap-south-1-006547668672/testing/training/

# from sagemaker.workflow.pipeline_context import LocalPipelineSession

# local_pipeline_session = LocalPipelineSession()

In [14]:
pt_estimator = PyTorch(
    # base_job_name=f"{base_job_name}/training-intel-pipeline",
    base_job_name=f"training-intel-pipeline",
    source_dir="scripts",
    entry_point="train.py",
    dependencies=['requirements.txt'],
    # sagemaker_session=pipeline_session,
    role=role,
    py_version="py38",
    framework_version="1.11.0",
    instance_count=1,
    # instance_type="ml.m5.4xlarge",
    instance_type="ml.g4dn.xlarge",
    tensorboard_output_config=tensorboard_output_config,
    use_spot_instances=False,
    # hyperparameters={
    #     "model": model,
    #     "batch-size": batch_size,
    #     "optimizer": optimizer,
    #     "learning-rate": learning_rate,
    #     "epochs": epochs,
    #     "augmentations": augmentations
    # },
    hyperparameters = {
        "model_name": 'resnet18',
        "batch_size": 32,
        "optimizer": 'adam',
        "learning_rate": 0.001,
        "epochs": 5,
        "augmentations": "[{'name':'RandomCrop', 'height': 32, 'width': 32}, 'HorizontalFlip']"
    },
    # max_wait=5000,
    max_run=4800,
    environment={
        "GIT_USER": "Shivam Prasad",
        "GIT_EMAIL": "shivam.prasad2015@vitalum.ac.in"
    }
)

In [15]:
from sagemaker.inputs import TrainingInput

In [16]:
estimator_step_args = pt_estimator.fit({
    'train': 's3://tmp-datasets/train',
    # 'train': step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    'test': 's3://tmp-datasets/test'
    # 'test': step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    # 'train': TrainingInput(
    #     s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
    #         "train"
    #     ].S3Output.S3Uri,
    # ),
    # 'test': TrainingInput(
    #     s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
    #         "test"
    #     ].S3Output.S3Uri,
    # )
})

2023-01-24 05:17:49 Starting - Starting the training job...
2023-01-24 05:18:04 Starting - Preparing the instances for trainingProfilerReport-1674537469: InProgress
......
2023-01-24 05:19:16 Downloading - Downloading input data.........
2023-01-24 05:20:49 Training - Downloading the training image...............
2023-01-24 05:23:20 Training - Training image download completed. Training in progress..[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34m2023-01-24 00:23:21,377 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2023-01-24 00:23:21,396 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2023-01-24 00:23:21,406 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.[0m
[34m2023-01-24 00:23:21,409 sagemaker_pytorch_container.training INFO     Invoking user training s

In [25]:
step_train = TrainingStep(
    name="TrainIntelClassifier",
    step_args=estimator_step_args,
)

In [26]:
step_train

TrainingStep(name='TrainIntelClassifier', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)

## Eval Step

In [27]:
from sagemaker.pytorch.processing import PyTorchProcessor

In [28]:
pytorch_processor = PyTorchProcessor(
    framework_version='1.11.0',
    py_version="py38",
    role=role,
    sagemaker_session=pipeline_session,
    instance_type='ml.t3.medium',
    # instance_type="ml.c5.xlarge",
    # instance_type="ml.m5.xlarge",
    # instance_type='local',
    instance_count=1,
    base_job_name=f'{base_job_name}/eval-intel-classifier-model',
)

In [29]:
eval_step_args = pytorch_processor.run(
    code='evaluate.py',
    source_dir='scripts',
    dependencies=["requirements.txt"],
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            # source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            # source="s3://sagemaker-ap-south-1-006547668672/training-flower-pipeline-2022-12-07-03-20-21-157/output/model.tar.gz",
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            # source=step_process.properties.ProcessingOutputConfig.Outputs[
            #     "test"
            # ].S3Output.S3Uri,
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            # source="s3://sagemaker-ap-south-1-006547668672/testing/training",
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
)


Job Name:  pipeline-intel-emlo-s13/eval-intel-clas-2022-12-29-11-00-11-215
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fc4f296ea30>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fc4f2941c40>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-546801796757/pipeline-intel-emlo-s13/eval-intel-clas-2022-12-29-11-00-11-215/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionTy

In [30]:
from sagemaker.workflow.properties import PropertyFile

In [31]:
evaluation_report = PropertyFile(
    name="IntelImageClassifierEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
step_eval = ProcessingStep(
    name="EvaluateIntelImageClassifierModel",
    step_args=eval_step_args,
    property_files=[evaluation_report],
)

In [32]:
step_eval

ProcessingStep(name='EvaluateIntelImageClassifierModel', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

## Model Metrics

In [33]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

In [34]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        # s3_uri="s3://sagemaker-ap-south-1-006547668672/eval-flower-classifier-model-2022-12-07-19-40-04-608/output/evaluation/evaluation.json",
        content_type="application/json"
    )
)

## Register Model Step (Conditional)

In [35]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.workflow.model_step import ModelStep

In [36]:
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

In [37]:
model_package_group_name = "IntelImageClassifierModelGroup"

In [38]:
model = PyTorchModel(
    entry_point="infer.py",
    source_dir="scripts",
    sagemaker_session=pipeline_session,
    role=role,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    # model_data="s3://sagemaker-ap-south-1-006547668672/training-flower-pipeline-2022-12-07-03-20-21-157/output/model.tar.gz",
    framework_version="1.11.0",
    py_version="py38",
)

In [39]:
model_step_args = model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    # approval_status="PendingManualApproval",
    model_metrics=model_metrics,
)

In [40]:
step_register = ModelStep(
    name="RegisterImageClassifierModel",
    step_args=model_step_args,
)

In [41]:
step_register

ModelStep(name='RegisterImageClassifierModel', steps=[_RepackModelStep(name='RegisterImageClassifierModel-RepackModel-0', display_name=None, description='Used to repack a model with customer scripts for a register/create model step', step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None), _RegisterModelStep(name='RegisterImageClassifierModel-RegisterModel', display_name=None, description=None, step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])

In [42]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
)

In [43]:
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="multiclass_classification_metrics.accuracy.value"
    ),
    right=0.4,
)

step_cond = ConditionStep(
    name="CheckAccuracyIntelClassifierEvaluation",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)

## Pipeline

In [44]:
from sagemaker.workflow.pipeline import Pipeline

In [45]:
pipeline_name = "notebook-pytorch-pipeline-intel-vadi"

In [46]:
pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            dvc_repo_url,
            dvc_branch,
            input_dataset,
            model_approval_status,
        ],
        steps=[
            step_process,
            step_train,
            step_eval,
            step_cond
        ],
        sagemaker_session=pipeline_session,
        # sagemaker_session=local_pipeline_session,
    )

In [47]:
upsert_response = pipeline.upsert(
    role_arn=role, description="testing pytorch intel pipeline"
)

In [48]:
execution = pipeline.start()

In [49]:
execution.list_steps()

[]

In [50]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:546801796757:pipeline/notebook-pytorch-pipeline-intel-vadi',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-south-1:546801796757:pipeline/notebook-pytorch-pipeline-intel-vadi/execution/gigjj1b8q142',
 'PipelineExecutionDisplayName': 'execution-1672311612896',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2022, 12, 29, 11, 0, 12, 836000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 12, 29, 11, 0, 12, 836000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '472c7037-fd04-4893-9511-f195a8e07119',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '472c7037-fd04-4893-9511-f195a8e07119',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '439',
   'date': 'Thu, 29 Dec 2022 11:00:12 GMT'},
  'RetryAttempts': 0}}