In [2]:
!python3 -m pip install -U -q sagemaker

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
distributed 2022.7.0 requires tornado<6.2,>=6.0.3, but you have tornado 6.3.2 which is incompatible.[0m[31m
[0m

In [104]:
import boto3
import pprint
import sagemaker
from datetime import datetime
from sagemaker.workflow.pipeline_context import PipelineSession

In [136]:
# Shared SageMaker / AWS handles used by the cells below.
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name  # used to resolve the processing container image
role = sagemaker.get_execution_role()  # IAM role passed to every job and to the pipeline

# Session handed to processors/estimators/models whose run()/fit()/register()
# results are later wrapped as step_args of pipeline steps.
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()

# Low-level boto3 handles.
sm_client, s3 = boto3.client("sagemaker"), boto3.resource("s3")

## Setup

In [137]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)
from sagemaker.workflow.steps import CacheConfig

In [138]:
# Prefixes used for job base names and pipeline step names.
base_job_prefix = "llama27b"
pipeline_base = "Llama2"

# Step caching shared by the cached steps below.
# expire_after must be an ISO 8601 duration, which always starts with the "P"
# designator: "PT12H" means 12 hours (the bare "T12H" is not a valid duration).
step_cache_config = CacheConfig(
    enable_caching=True,
    expire_after="PT12H",
)

# Lightweight Python (scikit-learn 1.2-1) container for the pre- and post-processing scripts.
basepy3_image_uri = sagemaker.image_uris.retrieve(
    framework='sklearn',
    region=region,
    version='1.2-1'
)

# --- Processing parameters ---
input_dataset_name = ParameterString(
    name="DatasetName",
    default_value="databricks/databricks-dolly-15k",
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.xlarge"
)

processing_image_uri = ParameterString(
    name="ProcessingImageURI",
    default_value=basepy3_image_uri
)

# --- Training parameters ---
train_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
train_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.g5.12xlarge"
)
# NOTE(review): ModelId and Epochs are consumed only through `.default_value` when the
# JumpStart estimator is built, so overriding them when starting an execution does not
# change the training job. Epochs is also not listed in the Pipeline's parameters.
model_id = ParameterString(
    name="ModelId",
    default_value="meta-textgeneration-llama-2-7b"
)
epochs = ParameterInteger(
    name="Epochs",
    default_value=1
)
model_output_path = ParameterString(
    name="ModelOutputS3URI",
    default_value=f"s3://{default_bucket}/{model_id.default_value}/models/"
)

# Comma-separated metric names handed to scripts/get_metrics.py.
training_monitor_metrics = ParameterString(
    name="TrainingMonitorMetrics",
    default_value="huggingface-textgeneration:eval-loss,huggingface-textgeneration:eval-ppl,huggingface-textgeneration:train-loss"
)

# --- Registration thresholds, compared against metrics.train_loss.value ---
# loss <= RegisterThreshold -> the model is registered (PendingManualApproval unless
#                              the deploy check also passes); otherwise the pipeline fails.
# loss <= DeployThreshold   -> the model is registered as Approved.
pending_threshold = ParameterFloat(
    name="RegisterThreshold", default_value=2.0
)

approved_threshold = ParameterFloat(
    name="DeployThreshold", default_value=1.5
)

model_package_group_name = ParameterString(
    name="ModelPackageGroupName",
    default_value="llama-2-7b-jumpstart"
)

# --- Deploy parameters ---
# NOTE(review): not referenced by any step in this notebook and not registered with the Pipeline.
inference_instance_count = ParameterInteger(
    name="InferenceInstanceCount",
    default_value=1
)

inference_instance_type = ParameterString(
    name="InferenceInstanceType",
    default_value="ml.g5.xlarge"
)

# Experiment name forwarded to the preprocessing script and to the training job's experiment config.
global_experiment_name = ParameterString(
    name="PipelineExperimentName",
    default_value="llama2-7b-experiments"
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


## Preprocessing

In [139]:
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [140]:
# Processor that runs scripts/preprocess_dataset.py with python3.
# image_uri is wired to the ProcessingImageURI pipeline parameter itself (not its
# default_value), so overriding that parameter at execution time actually changes
# the container; the default still resolves to the sklearn image chosen in setup.
script_datapreproc = ScriptProcessor(
    image_uri=processing_image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-data-preprocessing",
    sagemaker_session=pipeline_session,
    role=role
)

In [141]:
# Deferred run of scripts/preprocess_dataset.py. The script receives the dataset name,
# the --val_size / --test_size values and the experiment name, and exposes one
# processing output per split.
processor_args = script_datapreproc.run(
    code="scripts/preprocess_dataset.py",
    outputs=[
        ProcessingOutput(
            output_name=split_name,
            source=f"/opt/ml/processing/output/{split_name}",
        )
        for split_name in ("train", "validation", "test")
    ],
    arguments=[
        "--dataset_name", input_dataset_name,
        "--val_size", "0.1",
        "--test_size", "0.1",
        "--experiment_name", global_experiment_name,
    ],
)

In [142]:
# Pipeline step wrapping the deferred preprocessing run; results are cached per step_cache_config.
step_preprocess = ProcessingStep(
    name=pipeline_base + "-DataPrep",
    cache_config=step_cache_config,
    step_args=processor_args,
)

## Training Job

In [143]:
from sagemaker.inputs import TrainingInput
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.jumpstart.estimator import JumpStartEstimator
from sagemaker.workflow.execution_variables import ExecutionVariables

In [144]:
# JumpStart fine-tuning estimator. The instance settings and output path are
# pipeline parameters, while the model id and epoch count are read from the
# parameters' default values.
# NOTE(review): because `.default_value` is used here, overriding ModelId or Epochs
# when starting an execution will not change this training job.
estimator = JumpStartEstimator(
    model_id=model_id.default_value,
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name=f"{base_job_prefix}/training-job",
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    output_path=model_output_path,
    # The model requires accepting its EULA (see the log output of this cell).
    environment=dict(accept_eula="true"),
    hyperparameters=dict(
        instruction_tuned="True",
        epoch=epochs.default_value,
    ),
)

INFO:sagemaker.jumpstart:Model 'meta-textgeneration-llama-2-7b' requires accepting end-user license agreement (EULA). See https://jumpstart-cache-prod-us-east-1.s3.us-east-1.amazonaws.com/fmhMetadata/eula/llamaEula.txt for terms of use.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


In [145]:
# Feed each preprocessing output into the matching training channel
# (channel name -> processing output name). The S3 URIs are step properties,
# resolved at execution time, which also ties training to the preprocessing step.
# NOTE(review): recent SageMaker SDK versions drop ExperimentConfig from a TrainingStep's
# request, so the experiment_config below may have no effect inside the pipeline — confirm.
train_args = estimator.fit(
    inputs={
        channel: TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                split
            ].S3Output.S3Uri
        )
        for channel, split in (
            ("training", "train"),
            ("validation", "validation"),
            ("test", "test"),
        )
    },
    experiment_config={
        "ExperimentName": global_experiment_name,
        "RunName": ExecutionVariables.TRAINING_JOB_NAME,
    },
)

In [146]:
# Training step built from the deferred fit() call. No cache_config is set,
# so this step is not cached.
step_train = TrainingStep(
    step_args=train_args,
    name=pipeline_base + "-Train",
)

## Evaluation Metric

In [147]:
from sagemaker.workflow.properties import PropertyFile

In [148]:
# Processor that runs scripts/get_metrics.py with python3 after training.
# image_uri is wired to the ProcessingImageURI pipeline parameter itself (not its
# default_value), so overriding that parameter at execution time actually changes
# the container; the default still resolves to the sklearn image chosen in setup.
script_evaluation = ScriptProcessor(
    image_uri=processing_image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-evaluation",
    sagemaker_session=pipeline_session,
    role=role
)

In [149]:
# Deferred run of scripts/get_metrics.py. It receives the training job's name
# (a step property, resolved at execution time) and the metric names to collect.
eval_args = script_evaluation.run(
    code="scripts/get_metrics.py",
    arguments=[
        "--training_job_name", step_train.properties.TrainingJobName,
        "--metric_names", training_monitor_metrics,
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
)

# evaluation.json inside the "evaluation" output; the condition checks read it via JsonGet.
evaluation_report = PropertyFile(
    path="evaluation.json",
    output_name="evaluation",
    name="EvaluationReport",
)

In [150]:
# Metrics-collection step; registering evaluation_report lets later steps query evaluation.json.
step_eval = ProcessingStep(
    name=pipeline_base + "-GetMetrics",
    cache_config=step_cache_config,
    property_files=[evaluation_report],
    step_args=eval_args,
)

## Create a Model

In [151]:
from sagemaker.workflow.model_step import Model

In [152]:
# Model wrapper over the fine-tuned artifacts; model_data resolves at execution
# time to the training step's S3 model artifacts.
# NOTE(review): image_uri is the JumpStart estimator's (training) image — confirm it can
# also serve inference, or substitute the JumpStart inference image before deploying.
model = Model(
    role=role,
    sagemaker_session=pipeline_session,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=estimator.image_uri,
)

## Register Model

In [153]:
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.functions import Join
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.fail_step import FailStep

### Add Model Metrics to a Model

In [154]:
# Attach evaluation.json (written to the evaluation step's "evaluation" output)
# as the registered model's statistics.
# The URI is a Join over the step's *property* reference, so it resolves at
# execution time to the location the evaluation step actually wrote. Reading
# step_eval.arguments here instead would evaluate the step outside the pipeline
# definition and capture a static URI that may not match the run's real output.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                step_eval.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)



### Register the Model as 'Approved' if its Training Loss is at or below the Deploy Threshold

In [155]:
# Taken when train loss <= DeployThreshold (inside the passed register check):
# register the model directly as "Approved".
# NOTE(review): content/response types are declared as text/csv — confirm they match
# the payload format of the serving container.
register_approved_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.g4dn.12xlarge"],
    transform_instances=["ml.g4dn.12xlarge"],
)

step_approved_register = ModelStep(
    name=pipeline_base + "-Approved",
    step_args=register_approved_args,
)

### Register the Model as 'Pending' if its Training Loss is at or below the Acceptable (Register) Threshold but above the Deploy Threshold

In [156]:
# Taken when train loss <= RegisterThreshold but > DeployThreshold:
# register the model for manual review ("PendingManualApproval").
register_pending_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status="PendingManualApproval",
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.g4dn.12xlarge"],
    transform_instances=["ml.g4dn.12xlarge"],
)

step_pending_register = ModelStep(
    name=pipeline_base + "-Pending",
    step_args=register_pending_args,
)

### Fail the Pipeline if the Training Loss is above the Acceptable (Register) Threshold

In [157]:
# Else-branch of the register check: reached when the train loss is *above*
# RegisterThreshold (the check passes only for loss <= threshold).
# The message states that direction and appends the threshold value, which is
# resolved at execution time from the pipeline parameter.
step_fail = FailStep(
    name=f"{pipeline_base}-Fail",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed because the training loss is above the acceptable threshold of",
            pending_threshold,
        ],
    ),
)

## Conditional Check

In [158]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

In [159]:
# Both checks compare the same value: metrics.train_loss.value read from the
# evaluation step's evaluation.json (EvaluationReport property file).
train_loss_value = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="metrics.train_loss.value",
)

# Acceptable (register) threshold: at or below it the model enters the Model Registry,
# as PendingManualApproval unless the deploy check below also passes.
cond_pending_lte = ConditionLessThanOrEqualTo(left=train_loss_value, right=pending_threshold)

# Deploy threshold: at or below it the model enters the Model Registry as Approved, ready to deploy.
cond_approved_lte = ConditionLessThanOrEqualTo(left=train_loss_value, right=approved_threshold)

### Branch 2, Approved Branch defined first

In [160]:
# Inner check, reached only after the register check passed:
# loss <= DeployThreshold -> Approved registration, otherwise Pending registration.
step_approved_cond = ConditionStep(
    name=pipeline_base + "-Check-to-Deploy",
    conditions=[cond_approved_lte],
    else_steps=[step_pending_register],
    if_steps=[step_approved_register],
)

### Branch 1, Acceptable Branch next

In [161]:
# Outer check: loss <= RegisterThreshold continues to the deploy check, otherwise fail.
step_basic_cond = ConditionStep(
    name=pipeline_base + "-Check-to-Register",
    conditions=[cond_pending_lte],
    else_steps=[step_fail],
    if_steps=[step_approved_cond],
)

## Execute Pipeline

In [162]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline import PipelineExperimentConfig

In [163]:
pipeline_name = "Llama2-7b-Finetune-Pipeline"

# Parameters that can be overridden when starting an execution.
# NOTE(review): Epochs, InferenceInstanceCount and InferenceInstanceType are defined in
# the setup cell but are not listed here.
pipeline_parameters = [
    input_dataset_name,
    processing_instance_type,
    processing_image_uri,
    train_instance_count,
    train_instance_type,
    model_id,
    model_package_group_name,
    model_output_path,
    pending_threshold,
    approved_threshold,
    training_monitor_metrics,
    global_experiment_name,
]

# Only top-level steps are listed; the register and fail steps are reached through the
# branches of step_basic_cond and its nested step_approved_cond.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[step_preprocess, step_train, step_eval, step_basic_cond],
    pipeline_experiment_config=None,  # None disables the SDK's default experiment config
    sagemaker_session=pipeline_session,
)

In [164]:
# Create the pipeline, or update the existing definition with the same name.
# The response (pipeline ARN) is the cell's displayed output.
pipeline.upsert(
    description="Fine tune a Jumpstart model with custom dataset using SageMaker Pipelines",
    role_arn=role,
)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:811828458885:pipeline/Llama2-7b-Finetune-Pipeline',
 'ResponseMetadata': {'RequestId': 'ac88eb21-7d72-4c1c-80f1-d77782bff495',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ac88eb21-7d72-4c1c-80f1-d77782bff495',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '95',
   'date': 'Wed, 30 Aug 2023 05:24:33 GMT'},
  'RetryAttempts': 0}}

In [165]:
# import json
# import pprint

# definition = json.loads(pipeline.definition())
# pprint.pprint(definition)

In [166]:
# Start an execution with default parameter values; the display name carries a
# yymmddHHMM timestamp so runs are distinguishable in the console.
run_stamp = datetime.now().strftime('%y%m%d%H%M')
execution = pipeline.start(
    execution_display_name=f"finetune-{model_id.default_value}-{run_stamp}",
    execution_description=f"Finetune a jumpstart model: {model_id.default_value} with dataset: {input_dataset_name.default_value}",
)