In [3]:
# ! unzip sagemaker-flower-pipeline.zip

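This notebook is pinned to `sagemaker==2.93.0`: install it with the next cell, then restart the kernel before running the remaining cells.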

In [3]:
! pip install --quiet sagemaker==2.93.0
# ! pip install sagemaker==2.120.0

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.27.20 requires botocore==1.29.20, but you have botocore 1.29.25 which is incompatible.
aiobotocore 2.0.1 requires botocore<1.22.9,>=1.22.8, but you have botocore 1.29.25 which is incompatible.[0m[31m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/pytorch_p38/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

In [1]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

In [2]:
# Fail fast if the kernel is not running the SDK version this notebook is pinned to.
assert sagemaker.__version__ == "2.93.0", f"expected sagemaker 2.93.0, got {sagemaker.__version__}"
sagemaker.__version__

'2.93.0'

In [3]:
# Resolve the AWS context from a single boto3 session: it supplies the region, and the
# SageMaker session built on top of it supplies the default bucket.
boto_session = boto3.Session()
region = boto_session.region_name
sagemaker_session = sagemaker.Session(boto_session=boto_session)
default_bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

In [4]:
def get_sagemaker_client(region):
    """Create a low-level boto3 SageMaker client for ``region``.

    Args:
        region: the AWS region the client should talk to.

    Returns:
        A boto3 ``sagemaker`` service client (a plain boto3 client, not a
        ``sagemaker.session.Session``).
    """
    boto_session = boto3.Session(region_name=region)
    return boto_session.client("sagemaker")


def get_session(region, default_bucket):
    """Build a ``sagemaker.session.Session`` whose clients all target ``region``.

    Args:
        region: AWS region used for the underlying boto3 session and its clients.
        default_bucket: S3 bucket the session should use for artifacts.

    Returns:
        sagemaker.session.Session: a session wired with a ``sagemaker`` client and a
        ``sagemaker-runtime`` client, both created from the same boto3 session.
    """
    regional_boto_session = boto3.Session(region_name=region)
    # Both service clients come from the one regional boto3 session.
    service_clients = {
        "sagemaker_client": regional_boto_session.client("sagemaker"),
        "sagemaker_runtime_client": regional_boto_session.client("sagemaker-runtime"),
    }
    return sagemaker.session.Session(
        boto_session=regional_boto_session,
        default_bucket=default_bucket,
        **service_clients,
    )


def get_pipeline_session(region, default_bucket):
    """Return a ``PipelineSession`` bound to ``region`` for building pipeline steps.

    Args:
        region: AWS region used for the underlying boto3 session and SageMaker client.
        default_bucket: S3 bucket the session should use for artifacts.

    Returns:
        PipelineSession: session built from a regional boto3 session and its
        ``sagemaker`` client.
    """
    regional_boto_session = boto3.Session(region_name=region)
    return PipelineSession(
        boto_session=regional_boto_session,
        sagemaker_client=regional_boto_session.client("sagemaker"),
        default_bucket=default_bucket,
    )

In [5]:
# from sagemaker.workflow.pipeline_context import LocalPipelineSession

# local_pipeline_session = LocalPipelineSession()

In [6]:
# Rebuild the sessions pinned to the detected region and default bucket.
# `pipeline_session` is handed to the processors/estimators below, whose .run()/.fit()
# results are then passed to pipeline steps as `step_args`.
pipeline_session = get_pipeline_session(region, default_bucket)
sagemaker_session = get_session(region, default_bucket)
role = sagemaker.session.get_execution_role(sagemaker_session)

In [7]:
# Common prefix for the job names derived below. Because those names contain "/", the
# uploaded job artifacts (e.g. source/sourcedir.tar.gz) are grouped under this S3 prefix.
base_job_name = "pipeline-flower"

In [8]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

## Processing Step

In [9]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn import SKLearn, SKLearnProcessor
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

In [10]:
# Pipeline parameters selecting the DVC repository and branch for the preprocessing job;
# they are forwarded to that job as environment variables.
# NOTE(review): the default repo lives in us-east-1 and is named "intel-classification"
# while this pipeline runs in ap-south-1 on flower data — confirm this default is intended.
dvc_repo_url = ParameterString(
    name="DVCRepoURL",
    default_value="codecommit::us-east-1://sagemaker-intel-classification",
)
dvc_branch = ParameterString(name="DVCBranch", default_value="pipeline-processed-dataset")

In [11]:
# Preprocessing runs in the SKLearn 0.23-1 framework container. The DVC settings come from
# the pipeline parameters; the git identity is presumably used by preprocess.py to commit
# the processed dataset — TODO confirm against the script.
# NOTE(review): a personal git identity is hard-coded here (and again in the training
# estimator); consider reading it from configuration instead.
sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="0.23-1",
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    base_job_name=f"{base_job_name}/preprocess-flower-dataset",
    sagemaker_session=pipeline_session,
    env=dict(
        DVC_REPO_URL=dvc_repo_url,
        DVC_BRANCH=dvc_branch,
        GIT_USER="Satyajit Ghana",
        GIT_EMAIL="satyajitghana7@gmail.com",
    ),
)

In [12]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [13]:
# Location of the raw flowers archive fed to preprocessing. The default is derived from
# the session's default bucket (the same bucket the TensorBoard logs use) instead of a
# hard-coded account/region-specific URI, so it follows the account and region the
# notebook runs in. Upload flowers.zip there, or override via the InputDatasetZip parameter.
input_dataset = ParameterString(
    name="InputDatasetZip",
    default_value=f"s3://{default_bucket}/flowers.zip",
)

In [14]:
# Preprocess the zipped dataset with preprocess.py. The script's train/test splits are
# exposed as named outputs, and the returned arguments are wrapped by ProcessingStep below.
processing_step_args = sklearn_processor.run(
    code="preprocess.py",
    source_dir="sagemaker-flower-pipeline",
    inputs=[
        ProcessingInput(
            source=input_dataset,
            destination="/opt/ml/processing/input",
            input_name="data",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name=split, source=f"/opt/ml/processing/dataset/{split}")
        for split in ("train", "test")
    ],
)


Job Name:  pipeline-flower/preprocess-flower-datas-2022-12-07-22-25-41-877
Inputs:  [{'InputName': 'data', 'AppManaged': False, 'S3Input': {'S3Uri': ParameterString(name='InputDatasetZip', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-ap-south-1-006547668672/flowers.zip'), 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-006547668672/pipeline-flower/preprocess-flower-datas-2022-12-07-22-25-41-877/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-006547668672/pipeline-flower/preprocess-flower-datas-2022-1



In [15]:
# Pipeline step wrapping the preprocessing run defined above.
step_process = ProcessingStep(step_args=processing_step_args, name="PreprocessFlowerClassifierDataset")

In [16]:
# Inspect the constructed preprocessing step.
step_process

ProcessingStep(name='PreprocessFlowerClassifierDataset', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

## Train Step

In [17]:
from sagemaker.pytorch import PyTorch
from sagemaker.debugger import TensorBoardOutputConfig
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)

In [18]:
# TensorBoard event files written by the training container under the local path are
# uploaded to this S3 prefix in the default bucket.
tensorboard_output_config = TensorBoardOutputConfig(
    container_local_output_path="/opt/ml/output/tensorboard",
    s3_output_path="s3://{}/sagemaker-flower-logs-pipeline".format(default_bucket),
)

In [19]:
# ! aws s3 cp --recursive /root/flower-project/flowers s3://sagemaker-ap-south-1-006547668672/testing/training

In [20]:
# s3://sagemaker-ap-south-1-006547668672/testing/training/

In [21]:
# Training runs train.py on the PyTorch 1.11.0 / py38 container with managed spot training:
# max_run (seconds) caps training time and max_wait (seconds, must be >= max_run) caps the
# total time including waiting for spot capacity.
# NOTE(review): no checkpoint_s3_uri is configured, so a spot interruption presumably
# restarts training from scratch — confirm that is acceptable.
pt_estimator = PyTorch(
    entry_point="train.py",
    source_dir="sagemaker-flower-pipeline",
    framework_version="1.11.0",
    py_version="py38",
    instance_type="ml.m5.4xlarge",
    instance_count=1,
    use_spot_instances=True,
    max_run=1500,
    max_wait=1800,
    tensorboard_output_config=tensorboard_output_config,
    environment=dict(GIT_USER="Satyajit Ghana", GIT_EMAIL="satyajitghana7@gmail.com"),
    base_job_name=f"{base_job_name}/training-flower-pipeline",
    role=role,
    sagemaker_session=pipeline_session,
)

In [22]:
from sagemaker.inputs import TrainingInput

In [23]:
# Feed the preprocessing step's "train" and "test" outputs to the estimator as channels of
# the same names. Referencing step_process properties is what ties training to preprocessing.
estimator_step_args = pt_estimator.fit(
    {
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri
        )
        for channel in ("train", "test")
    }
)

In [24]:
# Pipeline step wrapping the training run defined above.
step_train = TrainingStep(step_args=estimator_step_args, name="TrainFlowerClassifier")

In [25]:
# Inspect the constructed training step.
step_train

TrainingStep(name='TrainFlowerClassifier', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)

## Eval Step

In [26]:
from sagemaker.pytorch.processing import PyTorchProcessor

In [27]:
# Evaluation uses the same PyTorch framework/python versions (1.11.0 / py38) as the
# training estimator.
pytorch_processor = PyTorchProcessor(
    framework_version="1.11.0",
    py_version="py38",
    instance_type="ml.m5.4xlarge",
    instance_count=1,
    base_job_name=f"{base_job_name}/eval-flower-classifier-model",
    role=role,
    sagemaker_session=pipeline_session,
)

In [28]:
# Evaluate the trained model on the preprocessing step's test split. Both inputs are step
# properties, which is how this step is tied to step_train and step_process.
eval_step_args = pytorch_processor.run(
    code="evaluate.py",
    source_dir="sagemaker-flower-pipeline",
    inputs=[
        # Model artifact produced by the training step.
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # "test" output of the preprocessing step.
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation"),
    ],
)


Job Name:  pipeline-flower/eval-flower-classifier--2022-12-07-22-25-49-501
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f5601a44f10>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f5603420d90>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-006547668672/pipeline-flower/eval-flower-classifier--2022-12-07-22-25-49-501/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionTy

In [29]:
from sagemaker.workflow.properties import PropertyFile

In [30]:
# evaluate.py is expected to write evaluation.json into its "evaluation" output. Registering
# it as a PropertyFile lets the condition step read metrics from it with JsonGet.
evaluation_report = PropertyFile(
    path="evaluation.json",
    output_name="evaluation",
    name="FlowerClassifierEvaluationReport",
)
step_eval = ProcessingStep(
    name="EvaluateFlowerClassifierModel",
    property_files=[evaluation_report],
    step_args=eval_step_args,
)

In [31]:
# Inspect the constructed evaluation step.
step_eval

ProcessingStep(name='EvaluateFlowerClassifierModel', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

## Model Metrics

In [32]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

In [33]:
# Point the model package's statistics at evaluation.json inside the evaluation step's
# "evaluation" output. The output is looked up by its OutputName rather than by list
# position, so adding another output to the evaluation step cannot silently redirect this.
# A missing "evaluation" output raises KeyError('evaluation').
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            {
                output["OutputName"]: output["S3Output"]["S3Uri"]
                for output in step_eval.arguments["ProcessingOutputConfig"]["Outputs"]
            }["evaluation"]
        ),
        content_type="application/json",
    )
)

## Register Model Step (Conditional)

In [34]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.workflow.model_step import ModelStep

In [35]:
# Pipeline parameter exposed for the approval status of registered model packages
# (default "PendingManualApproval").
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

In [36]:
# Model registry group that registered flower-classifier model packages are added to.
model_package_group_name = "FlowerClassifierModelGroup"

In [37]:
# Serving model: the trained artifact from step_train plus infer.py as the inference entry point.
model = PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="infer.py",
    source_dir="sagemaker-flower-pipeline",
    framework_version="1.11.0",
    py_version="py38",
    role=role,
    sagemaker_session=pipeline_session,
)

In [38]:
# Register the model in the model package group along with its evaluation metrics.
# approval_status is driven by the ModelApprovalStatus pipeline parameter (default
# "PendingManualApproval"), so a per-execution override actually takes effect.
model_step_args = model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m4.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

In [39]:
# Pipeline step wrapping the model registration defined above.
step_register = ModelStep(step_args=model_step_args, name="RegisterFlowerClassifierModel")

In [40]:
# Inspect the model step. Its repr shows that it expands into a repack (training) step
# followed by a register-model step.
step_register

ModelStep(name='RegisterFlowerClassifierModel', steps=[_RepackModelStep(name='RegisterFlowerClassifierModel-RepackModel-0', display_name=None, description='Used to repack a model with customer scripts for a register/create model step', step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None), _RegisterModelStep(name='RegisterFlowerClassifierModel-RegisterModel', display_name=None, description=None, step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])

In [41]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
)

In [42]:
# Gate registration on evaluation accuracy. The accuracy is read from evaluation.json, and
# the model is registered only when it is >= 0.6; otherwise no step runs (empty else branch).
# The JSON path must match what evaluate.py writes.
cond_gte = ConditionGreaterThanOrEqualTo(
    right=0.6,
    left=JsonGet(
        json_path="multiclass_classification_metrics.accuracy.value",
        property_file=evaluation_report,
        step_name=step_eval.name,
    ),
)
step_cond = ConditionStep(
    name="CheckAccuracyFlowerClassifierEvaluation",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)

## Pipeline

In [43]:
from sagemaker.workflow.pipeline import Pipeline

In [44]:
# Name of the SageMaker pipeline to create or update.
# NOTE(review): the "7-old-fix-2" suffix looks like a leftover from debugging iterations;
# consider a stable name.
pipeline_name = "PyTorchFlowerClassifier7-old-fix-2"

In [45]:
# Assemble the pipeline. Step dependencies come from the property references wired above
# (no step sets depends_on). step_register is not listed directly because it is reached
# through step_cond's if_steps.
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    parameters=[dvc_repo_url, dvc_branch, input_dataset, model_approval_status],
    sagemaker_session=pipeline_session,
)

In [46]:
# Create the pipeline definition, or update it if a pipeline with this name already exists.
upsert_response = pipeline.upsert(description="testing pytorch flower pipeline", role_arn=role)

In [47]:
# Start an execution. No parameter overrides are passed, so every pipeline parameter uses its default.
execution = pipeline.start()

In [48]:
# List the execution's steps. Immediately after start() this can be empty, as in the output
# below, presumably because no step has been scheduled yet.
execution.list_steps()

[]

In [49]:
# Show the execution's metadata, including its ARN and current PipelineExecutionStatus.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:006547668672:pipeline/pytorchflowerclassifier7-old-fix-2',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-south-1:006547668672:pipeline/pytorchflowerclassifier7-old-fix-2/execution/9euywnb8t8xo',
 'PipelineExecutionDisplayName': 'execution-1670451966208',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'pytorchflowerclassifier7-old-fix-2',
  'TrialName': '9euywnb8t8xo'},
 'CreationTime': datetime.datetime(2022, 12, 7, 22, 26, 6, 146000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 12, 7, 22, 26, 6, 146000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'e32b2920-ffbb-4c22-81db-cd3a780e60f2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e32b2920-ffbb-4c22-81db-cd3a780e60f2',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '545',
   'date': 'Wed, 07 Dec 2022 22:26:07 GMT'},
  'RetryAttempts': 0}}