In [3]:
# ! unzip sagemaker-flower-pipeline.zip

Make sure to use `sagemaker==2.93.0`

In [1]:
! pip install --quiet sagemaker==2.93.0
# ! pip install sagemaker==2.120.0

In [1]:
# !pip install --quiet -r sagemaker-intel-pipeline/requirements.txt

In [1]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

In [2]:
sagemaker.__version__

'2.93.0'

In [3]:
import boto3
import sagemaker
import time
from time import strftime

boto_session = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=boto_session)
sm_client = boto3.client("sagemaker")
region = boto_session.region_name
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
account = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]


print(f"account: {account}")
print(f"bucket: {bucket}")
print(f"region: {region}")
print(f"role: {role}")
default_bucket = bucket

account: 629171115321
bucket: sagemaker-us-east-1-629171115321
region: us-east-1
role: arn:aws:iam::629171115321:role/my_sagemaker_execution_role


In [4]:
def get_sagemaker_client(region):
    """Gets the sagemaker client.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session instance
    """
    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")
    return sagemaker_client


def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )


def get_pipeline_session(region, default_bucket):
    """Gets the pipeline session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        PipelineSession instance
    """

    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")

    return PipelineSession(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        default_bucket=default_bucket,
    )

In [5]:
# from sagemaker.workflow.pipeline_context import LocalPipelineSession

# local_pipeline_session = LocalPipelineSession()

In [6]:
sagemaker_session = get_session(region, default_bucket)
role = sagemaker.session.get_execution_role(sagemaker_session)
pipeline_session = get_pipeline_session(region, default_bucket)

In [7]:
base_job_name = "pipeline-project"

In [8]:
hparams = {
    "MODEL_NAME":"resnet18",
    "OPTIM":"RMS",
    "LR_RATE":str(3.5999257898500047e-05),
    "BATCH_SIZE":256
}

In [9]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)

## Processing Step

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn import SKLearn, SKLearnProcessor
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

In [11]:
dvc_repo_url = ParameterString(
    name="DVCRepoURL", default_value="codecommit::us-east-1://sagemaker-intel-classification"
)
dvc_branch = ParameterString(
    name="DVCBranch", default_value="pipeline-processed-dataset"
)

In [12]:
# docker register repo
repo_name = "sagemaker-custom-image"
image_tag = "pytorch-cpu" # use gpu image for training; tag: pytorch-gpu

In [13]:
output_path = 's3://{}/{}/output'.format(bucket, repo_name)
image_name  = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account, region, repo_name, image_tag)
image_name

'629171115321.dkr.ecr.us-east-1.amazonaws.com/sagemaker-custom-image:pytorch-cpu'

In [14]:
!zip -r myfiles.zip .

updating: 04-pipeline.ipynb (deflated 80%)
updating: sagemaker-intel-pipeline/ (stored 0%)
updating: sagemaker-intel-pipeline/__pycache__/ (stored 0%)
updating: sagemaker-intel-pipeline/__pycache__/utils.cpython-37.pyc (deflated 50%)
updating: sagemaker-intel-pipeline/utils.py (deflated 71%)
updating: sagemaker-intel-pipeline/requirements.txt (deflated 19%)
updating: sagemaker-intel-pipeline/.ipynb_checkpoints/ (stored 0%)
updating: sagemaker-intel-pipeline/.ipynb_checkpoints/train-checkpoint.py (deflated 64%)
updating: sagemaker-intel-pipeline/.ipynb_checkpoints/model-checkpoint.py (deflated 70%)
updating: sagemaker-intel-pipeline/.ipynb_checkpoints/preprocess-checkpoint.py (deflated 67%)
updating: sagemaker-intel-pipeline/.ipynb_checkpoints/evaluate-checkpoint.py (deflated 61%)
updating: sagemaker-intel-pipeline/evaluate.py (deflated 61%)
updating: sagemaker-intel-pipeline/preprocess.py (deflated 67%)
updating: sagemaker-intel-pipeline/__init__.py (stored 0%)
updating: sagemaker-inte

In [15]:
sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="0.23-1",
    # instance_type="ml.t3.medium",
    instance_type="ml.m5.xlarge",
    # instance_type='local',
    instance_count=1,
    base_job_name=f"{base_job_name}/preprocess-dataset",
    sagemaker_session=pipeline_session,
    # sagemaker_session=local_pipeline_session,
    role=role,
    env={
        "DVC_REPO_URL": dvc_repo_url,
        "DVC_BRANCH": dvc_branch,
        # "DVC_REPO_URL": "codecommit::us-east-1://sagemaker-intel-classification",
        # "DVC_BRANCH": "project-dataset",
        "GIT_USER": "sushant",
        "GIT_EMAIL": "sushantgautm@gmail.com",
    },
)

In [16]:
# !python sagemaker-intel-pipeline/preprocess.py

In [17]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [18]:
input_dataset = ParameterString(
    name="InputDatasetZip",
    default_value="s3://sagemaker-us-east-1-629171115321/intel.zip",
)

In [19]:
processing_step_args = sklearn_processor.run(
    code="preprocess.py",
    source_dir="sagemaker-intel-pipeline",
    # dependencies="sagemaker-flower-pipeline/requirements.txt",
    inputs=[
        ProcessingInput(
            input_name="data",
            source=input_dataset,
            # source="s3://sagemaker-us-east-1-629171115321/intel.zip",
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train", source="/opt/ml/processing/dataset/train"
        ),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/dataset/test"),
    ],
)


Job Name:  pipeline-project/preprocess-dataset-2023-01-04-09-01-05-785
Inputs:  [{'InputName': 'data', 'AppManaged': False, 'S3Input': {'S3Uri': ParameterString(name='InputDatasetZip', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-1-629171115321/intel.zip'), 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-629171115321/pipeline-project/preprocess-dataset-2023-01-04-09-01-05-785/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-629171115321/pipeline-project/preprocess-dataset-2023-01-04-09-01-05-785



In [20]:
step_process = ProcessingStep(
    name="PreprocessIntelClassifierDataset",
    step_args=processing_step_args,
)

In [21]:
step_process

ProcessingStep(name='PreprocessIntelClassifierDataset', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

## Train Step

In [22]:
train_s3_loc = ParameterString(
    name="TrainS3Location", default_value="s3://sagemaker-us-east-1-629171115321/pipeline-project-preprocess-dataset-2023-01-03-01-32-13-530/output/train/"
)
test_s3_loc = ParameterString(
    name="TestS3Location", default_value="s3://sagemaker-us-east-1-629171115321/pipeline-project-preprocess-dataset-2023-01-03-01-32-13-530/output/test/"
)
model_name = ParameterString(
    name="ModelName", default_value="resnet18"
)
optim_name = ParameterString(
    name="OptimName", default_value="RMS"
)
# Validation error: https://stackoverflow.com/questions/71221741/validationexception-in-sagemaker-pipeline-creation
learning_rate = ParameterString(name="Learning_rate", default_value="3.5999257898500047e-05")
batch_size = ParameterString(name="Batch_size", default_value="256")

In [23]:
from sagemaker.pytorch import PyTorch
from sagemaker.debugger import TensorBoardOutputConfig
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)

In [24]:
tensorboard_output_config = TensorBoardOutputConfig(
    s3_output_path=f"s3://{default_bucket}/sagemaker-intel-logs-pipeline-project",
    container_local_output_path="/opt/ml/output/tensorboard",
)

In [25]:
# ! aws s3 cp --recursive /root/flower-project/flowers s3://sagemaker-ap-south-1-006547668672/testing/training

In [26]:
step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri.to_string()

Join(on='', values=[<sagemaker.workflow.properties.Properties object at 0x7f575c6b8310>])

In [None]:
pt_estimator = PyTorch(
    base_job_name=f"{base_job_name}/training-intel-pipeline",
    source_dir="sagemaker-intel-pipeline",
    entry_point="train.py",
    sagemaker_session=pipeline_session,
    role=role,
    py_version="py38",
    framework_version="1.11.0",
    instance_count=1,
    instance_type="ml.g4dn.xlarge",
    tensorboard_output_config=tensorboard_output_config,
    use_spot_instances=True,
    max_wait=2000,
    max_run=1800,
    environment={
        "ModelName":model_name,
        "OptimName":optim_name,
        "Learning_rate":learning_rate,
        "Batch_size":batch_size,
        # "ModelName":"resnet18",
        # "OptimName":"RMS",
        # "Learning_rate":"3.5999257898500047e-05",
        # "Batch_size":"256",
        "GIT_USER": "sushant",
        "GIT_EMAIL": "sushantgautm@gmail.com",
    },
)

In [None]:
from sagemaker.inputs import TrainingInput

In [40]:
estimator_step_args = pt_estimator.fit(
    {
        # 'train': 's3://sagemaker-us-east-1-629171115321/pipeline-project-preprocess-dataset-2023-01-03-01-32-13-530/output/train/',
        # 'test': 's3://sagemaker-us-east-1-629171115321/pipeline-project-preprocess-dataset-2023-01-03-01-32-13-530/output/test/'
        
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    }
)

In [31]:
step_train = TrainingStep(
    name="TrainIntelClassifier",
    step_args=estimator_step_args,
)

In [32]:
step_train

TrainingStep(name='TrainIntelClassifier', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)

## Eval Step

In [33]:
from sagemaker.pytorch.processing import PyTorchProcessor

In [34]:
pytorch_processor = PyTorchProcessor(
    framework_version="1.11.0",
    py_version="py38",
    role=role,
    sagemaker_session=pipeline_session,
    # instance_type='ml.t3.medium',
    # instance_type="ml.c5.xlarge",
    instance_type="ml.m5.4xlarge",
    # instance_type='local',
    instance_count=1,
    base_job_name=f"{base_job_name}/eval-Intel-classifier-model",
)

In [35]:
eval_step_args = pytorch_processor.run(
    code="evaluate.py",
    source_dir="sagemaker-intel-pipeline",
    inputs=[
        ProcessingInput(
            # source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            # source="s3://sagemaker-us-east-1-629171115321/pipeline-project-training-intel-pipelin-2023-01-03-02-28-57-889/output/model.tar.gz",
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            # source="s3://sagemaker-us-east-1-629171115321/pipeline-project-preprocess-dataset-2023-01-03-01-32-13-530/output/test/",
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation", source="/opt/ml/processing/evaluation"
        ),
    ],
)


Job Name:  pipeline-project/eval-Intel-classifier--2023-01-04-02-50-13-151
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7ff6054942d0>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7ff605ec4e50>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-629171115321/pipeline-project/eval-Intel-classifier--2023-01-04-02-50-13-151/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionTyp

In [36]:
from sagemaker.workflow.properties import PropertyFile

In [37]:
evaluation_report = PropertyFile(
    name="IntelClassifierEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
step_eval = ProcessingStep(
    name="EvaluateIntelClassifierModel",
    step_args=eval_step_args,
    property_files=[evaluation_report],
)

In [38]:
step_eval

ProcessingStep(name='EvaluateIntelClassifierModel', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

## Model Metrics

In [39]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

In [40]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                "S3Uri"
            ]
        ),
        # s3_uri="s3://sagemaker-us-east-1-629171115321/pipeline-project-eval-Intel-classifier--2023-01-03-02-39-40-663/output/evaluation/evaluation.json",
        content_type="application/json",
    )
)

## Register Model Step (Conditional)

In [41]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.workflow.model_step import ModelStep

In [42]:
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

In [43]:
model_package_group_name = "ProjectIntelClassifierModelGroup"

In [44]:
model = PyTorchModel(
    entry_point="infer.py",
    source_dir="sagemaker-intel-pipeline",
    sagemaker_session=pipeline_session,
    role=role,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    # model_data="s3://sagemaker-us-east-1-629171115321/pipeline-project-training-intel-pipelin-2023-01-03-02-28-57-889/output/model.tar.gz",
    framework_version="1.11.0",
    py_version="py38",
)

In [45]:
model_step_args = model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m4.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    # approval_status="PendingManualApproval",
    model_metrics=model_metrics,
)

In [46]:
step_register = ModelStep(
    name="RegisterIntelClassifierModel",
    step_args=model_step_args,
)

In [47]:
step_register

ModelStep(name='RegisterIntelClassifierModel', steps=[_RepackModelStep(name='RegisterIntelClassifierModel-RepackModel-0', display_name=None, description='Used to repack a model with customer scripts for a register/create model step', step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None), _RegisterModelStep(name='RegisterIntelClassifierModel-RegisterModel', display_name=None, description=None, step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])

In [48]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
)

In [49]:
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="multiclass_classification_metrics.accuracy.value",
    ),
    right=0.6,
)

step_cond = ConditionStep(
    name="CheckAccuracyIntelClassifierEvaluation",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)

## Pipeline

In [50]:
from sagemaker.workflow.pipeline import Pipeline

In [51]:
pipeline_name = "ProjectIntelClassifier1"

In [52]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[dvc_repo_url, dvc_branch, input_dataset, model_approval_status, train_s3_loc, test_s3_loc,
                model_name, 
                optim_name,
                batch_size,
                learning_rate],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
    # sagemaker_session=local_pipeline_session,
)

In [53]:
upsert_response = pipeline.upsert(
    role_arn=role, description="testing Project Intel pipeline v1"
)

In [54]:
execution = pipeline.start()

In [55]:
execution.list_steps()

[{'StepName': 'PreprocessIntelClassifierDataset',
  'StartTime': datetime.datetime(2023, 1, 4, 2, 50, 56, 34000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:629171115321:processing-job/pipelines-79pqywiherly-preprocessintelclass-kaw4bhpeqg'}}}]

In [56]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:629171115321:pipeline/projectintelclassifier1',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:629171115321:pipeline/projectintelclassifier1/execution/79pqywiherly',
 'PipelineExecutionDisplayName': 'execution-1672800655211',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'projectintelclassifier1',
  'TrialName': '79pqywiherly'},
 'CreationTime': datetime.datetime(2023, 1, 4, 2, 50, 55, 102000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 1, 4, 2, 50, 55, 102000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '92123045-9d9a-4a57-9a76-d57d96fbedff',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '92123045-9d9a-4a57-9a76-d57d96fbedff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '510',
   'date': 'Wed, 04 Jan 2023 02:50:58 GMT'},
  'RetryAttempts': 0}}