In [None]:
import boto3
import os
import sagemaker
import tensorflow as tf

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
region = boto3.Session().region_name

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)

batch_dir = os.path.join(os.getcwd(), 'data/batch')
os.makedirs(batch_dir, exist_ok=True)

In [2]:
# SageMaker Processing for dataset transformation
# Saving raw data to S3

import numpy as np
from tensorflow.python.keras.datasets import boston_housing
from sklearn.preprocessing import StandardScaler

(x_train, y_train), (x_test, y_test) = boston_housing.load_data()

np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(raw_dir, 'y_train.npy'), y_train)
np.save(os.path.join(raw_dir, 'y_test.npy'), y_test)
s3_prefix = 'tf-2-workflow'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)
print(raw_s3)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/boston_housing.npz
s3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/raw


In [3]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor1 = SKLearnProcessor(framework_version='0.23-1',
                                     role=get_execution_role(),
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2)

In [4]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime 

processing_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

sklearn_processor1.run(code='preprocessing.py',
                      job_name=processing_job_name,
                      inputs=[ProcessingInput(
                        source=raw_s3,
                        destination='/opt/ml/processing/input',
                        s3_data_distribution_type='ShardedByS3Key')],
                      outputs=[ProcessingOutput(output_name='train',
                                                destination='{}/train'.format(output_destination),
                                                source='/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test',
                                                destination='{}/test'.format(output_destination),
                                                source='/opt/ml/processing/test')])

preprocessing_job_description = sklearn_processor1.jobs[-1].describe()


Job Name:  tf-2-workflow-16-04-59-22
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/raw', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow-16-04-59-22/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/test', 'LocalPath': '/opt/ml/processing/test', 'S3UploadMode': 'EndOfJo

In [5]:
x_train_in_s3 = '{}/train/x_train.npy'.format(output_destination)
y_train_in_s3 = '{}/train/y_train.npy'.format(output_destination)
x_test_in_s3 = '{}/test/x_test.npy'.format(output_destination)
y_test_in_s3 = '{}/test/y_test.npy'.format(output_destination)

!aws s3 cp {x_train_in_s3} ./data/train/x_train.npy
!aws s3 cp {y_train_in_s3} ./data/train/y_train.npy
!aws s3 cp {x_test_in_s3} ./data/test/x_test.npy
!aws s3 cp {y_test_in_s3} ./data/test/y_test.npy

download: s3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/train/x_train.npy to data/train/x_train.npy
download: s3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/train/y_train.npy to data/train/y_train.npy
download: s3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/test/x_test.npy to data/test/x_test.npy
download: s3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/test/y_test.npy to data/test/y_test.npy


In [6]:
# Training!!!

s3_prefix = 'tf-2-workflow'

traindata_s3_prefix = '{}/data/train'.format(s3_prefix)
testdata_s3_prefix = '{}/data/test'.format(s3_prefix)

In [7]:
# upload train and validation dataset to s3

train_s3 = sess.upload_data(path='./data/train/', key_prefix=traindata_s3_prefix)
test_s3 = sess.upload_data(path='./data/test/', key_prefix=testdata_s3_prefix)

inputs = {'train':train_s3, 'test': test_s3}

print(inputs)

{'train': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/train', 'test': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/test'}


In [8]:
# Set up an tensorflow estimator object

from sagemaker.tensorflow import TensorFlow

train_instance_type = 'ml.c5.xlarge'
hyperparameters = {'epochs': 30, 'batch_size': 128, 'learning_rate': 0.01}

git_config = {'repo': 'https://github.com/aws-samples/amazon-sagemaker-script-mode', 
              'branch': 'master'}

hosted_estimator = TensorFlow(
                       git_config=git_config,
                       source_dir='tf-2-workflow-smpipelines/train_model',
                       entry_point='train.py',
                       instance_type=train_instance_type,
                       instance_count=1,
                       hyperparameters=hyperparameters,
                       role=sagemaker.get_execution_role(),
                       base_job_name='tf-2-workflow',
                       framework_version='2.3.1',
                       py_version='py37',
                       script_mode=True)

In [9]:
# Fitting the model

hosted_estimator.fit(inputs)

2021-01-16 05:09:52 Starting - Starting the training job...
2021-01-16 05:10:16 Starting - Launching requested ML instancesProfilerReport-1610773791: InProgress
......
2021-01-16 05:11:16 Starting - Preparing the instances for training...
2021-01-16 05:11:50 Downloading - Downloading input data...
2021-01-16 05:12:17 Training - Downloading the training image..[34m2021-01-16 05:12:31.082142: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.[0m
[34m2021-01-16 05:12:31.087389: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:105] SageMaker Profiler is not enabled. The timeline writer thread will not be started, future recorded events will be dropped.[0m
[34m2021-01-16 05:12:31.273543: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.[0m
[34m2021-01-16 05:12:34,102 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training[0m
[34m2021-

In [10]:
!aws s3 cp {hosted_estimator.model_data} ./model/model.tar.gz

download: s3://sagemaker-us-west-2-867719205611/tf-2-workflow-2021-01-16-05-09-51-399/output/model.tar.gz to model/model.tar.gz


In [11]:
!tar -xvzf ./model/model.tar.gz -C ./model

1/
1/variables/
1/variables/variables.index
1/variables/variables.data-00000-of-00001
1/saved_model.pb
1/assets/


In [12]:
# Model tuning

from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

hyperparameter_ranges = {
  'learning_rate': ContinuousParameter(0.001, 0.2, scaling_type="Logarithmic"),
  'epochs': IntegerParameter(10, 50),
  'batch_size': IntegerParameter(64, 256),
}

metric_definitions = [{'Name': 'loss',
                       'Regex': ' loss: ([0-9\\.]+)'},
                     {'Name': 'val_loss',
                       'Regex': ' val_loss: ([0-9\\.]+)'}]

objective_metric_name = 'val_loss'
objective_type = 'Minimize'

In [13]:
tuner = HyperparameterTuner(hosted_estimator,
                            objective_metric_name,
                            hyperparameter_ranges,
                            metric_definitions,
                            max_jobs=15,
                            max_parallel_jobs=5,
                            objective_type=objective_type)

tuning_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
tuner.fit(inputs, job_name=tuning_job_name)
tuner.wait()

....................................................................................................................................!
!


In [14]:
tuner_metrics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)
tuner_metrics.dataframe().sort_values(['FinalObjectiveValue'], ascending=True).head(5)

Unnamed: 0,batch_size,epochs,learning_rate,TrainingJobName,TrainingJobStatus,FinalObjectiveValue,TrainingStartTime,TrainingEndTime,TrainingElapsedTimeSeconds
4,231.0,47.0,0.176477,tf-2-workflow-16-05-20-34-011-1f4a1365,Completed,17.0954,2021-01-16 05:28:44+00:00,2021-01-16 05:29:34+00:00,50.0
6,65.0,47.0,0.006957,tf-2-workflow-16-05-20-34-009-271ee3de,Completed,19.5874,2021-01-16 05:25:49+00:00,2021-01-16 05:26:57+00:00,68.0
10,220.0,46.0,0.087906,tf-2-workflow-16-05-20-34-005-1bfd22f2,Completed,20.577499,2021-01-16 05:23:00+00:00,2021-01-16 05:24:04+00:00,64.0
3,102.0,30.0,0.032157,tf-2-workflow-16-05-20-34-012-c83e1752,Completed,20.9004,2021-01-16 05:28:42+00:00,2021-01-16 05:29:36+00:00,54.0
1,102.0,48.0,0.006711,tf-2-workflow-16-05-20-34-014-2d841776,Completed,21.0501,2021-01-16 05:29:16+00:00,2021-01-16 05:30:09+00:00,53.0


In [15]:
total_time = tuner_metrics.dataframe()['TrainingElapsedTimeSeconds'].sum() / 3600
print("The total training time is {:.2f} hours".format(total_time))
tuner_metrics.dataframe()['TrainingJobStatus'].value_counts()

The total training time is 0.24 hours


Completed    15
Name: TrainingJobStatus, dtype: int64

In [16]:
# Deploy the model

tuning_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')


2021-01-16 05:29:34 Starting - Preparing the instances for training
2021-01-16 05:29:34 Downloading - Downloading input data
2021-01-16 05:29:34 Training - Training image download completed. Training in progress.
2021-01-16 05:29:34 Uploading - Uploading generated training model
2021-01-16 05:29:34 Completed - Training job completed

update_endpoint is a no-op in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.



-------------!

In [17]:
results = tuning_predictor.predict(x_test[:10])['predictions'] 
flat_list = [float('%.1f'%(item)) for sublist in results for item in sublist]
print('predictions: \t{}'.format(np.array(flat_list)))
print('target values: \t{}'.format(y_test[:10].round(decimals=1)))

predictions: 	[18.9 42.9 43.3 42.9 42.9 42.9 31.3 31.3 31.1 42.9]
target values: 	[ 7.2 18.8 19.  27.  22.2 24.5 31.2 22.9 20.5 23.2]


In [18]:
sess.delete_endpoint(tuning_predictor.endpoint_name)

In [19]:
# Sagemaker pipeline setup for automating the workflow

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=2)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# batch inference step parameters
batch_instance_type = ParameterString(name="BatchInstanceType", default_value="ml.c5.xlarge")
batch_instance_count = ParameterInteger(name="BatchInstanceCount", default_value=1)

In [21]:
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tf-2-workflow-process",
    sagemaker_session=sess,
    role=role,
)

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="TF2Process",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="./preprocessing.py",
)

In [22]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel

tensorflow_version = '2.3.1'
python_version = 'py37'

image_uri_train = sagemaker.image_uris.retrieve(
                                        framework="tensorflow",
                                        region=region,
                                        version=tensorflow_version,
                                        py_version=python_version,
                                        instance_type=training_instance_type,
                                        image_scope="training"
                                       )

import time

model_path = f"s3://{bucket}/TF2WorkflowTrain"
training_parameters = {'epochs': 44, 'batch_size': 128, 'learning_rate': 0.0125, 'for_pipeline': 'true'}

estimator = TensorFlow(
    image_uri=image_uri_train,
    git_config=git_config,
    source_dir='tf-2-workflow-smpipelines/train_model',
    entry_point='train.py',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    base_job_name="tf-2-workflow-train",
    output_path=model_path,
    hyperparameters=training_parameters
)

In [23]:
step_train = TrainingStep(
    name="TF2WorkflowTrain",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        )
    },
)

In [24]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

image_uri_inference = sagemaker.image_uris.retrieve(
                                        framework="tensorflow",
                                        region=region,
                                        version=tensorflow_version,
                                        py_version=python_version,
                                        instance_type=batch_instance_type,
                                        image_scope="inference"
                                       )
model = Model(
    image_uri=image_uri_inference,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

inputs_model = CreateModelInput(
    instance_type=batch_instance_type
)

step_create_model = CreateModelStep(
    name="TF2WorkflowCreateModel",
    model=model,
    inputs=inputs_model,
)

In [25]:
batch_scorer = SKLearnProcessor(
                    framework_version=framework_version,
                    instance_type=batch_instance_type,
                    instance_count=batch_instance_count,
                    base_job_name="tf-2-workflow-batch",
                    sagemaker_session=sess,
                    role=role )

step_batch = ProcessingStep(
                    name="TF2WorkflowBatchScoring",
                    processor=batch_scorer,
                    inputs=[
                        ProcessingInput(
                            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                            destination="/opt/ml/processing/model"
                        ),
                        ProcessingInput(
                            source=step_process.properties.ProcessingOutputConfig.Outputs[
                                "test"
                            ].S3Output.S3Uri,
                            destination="/opt/ml/processing/test"
                        )
                    ],
                    outputs=[
                        ProcessingOutput(output_name="batch", source="/opt/ml/processing/batch"),
                    ],
                    code="./batch-score.py" )

In [26]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"TF2Workflow"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[input_data,
                processing_instance_type, 
                processing_instance_count, 
                training_instance_type, 
                training_instance_count,
                batch_instance_type,
                batch_instance_count],
    steps=[step_process, 
           step_train, 
           step_create_model,
           step_batch
          ],
    sagemaker_session=sess
)

In [27]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-2-867719205611/tf-2-workflow/data/raw'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 2},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.2xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'BatchInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'BatchInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}],
 'Steps': [{'Name': 'TF2Process',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUr

In [28]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

In [29]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:867719205611:pipeline/tf2workflow',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:867719205611:pipeline/tf2workflow/execution/y9651a2m5kif',
 'PipelineExecutionDisplayName': 'execution-1610776427863',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2021, 1, 16, 5, 53, 47, 715000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 1, 16, 5, 53, 47, 715000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'c7c061b9-b3a4-4079-972f-da17921a57a4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c7c061b9-b3a4-4079-972f-da17921a57a4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '387',
   'date': 'Sat, 16 Jan 2021 05:53:53 GMT'},
  'RetryAttempts': 0}}

In [30]:
execution.wait()
execution.list_steps()

[{'StepName': 'TF2WorkflowCreateModel',
  'StartTime': datetime.datetime(2021, 1, 16, 6, 0, 51, 127000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 1, 16, 6, 0, 51, 951000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-west-2:867719205611:model/pipelines-y9651a2m5kif-tf2workflowcreatemod-rgdki0vd3b'}}},
 {'StepName': 'TF2WorkflowBatchScoring',
  'StartTime': datetime.datetime(2021, 1, 16, 6, 0, 51, 111000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 1, 16, 6, 5, 30, 931000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:867719205611:processing-job/pipelines-y9651a2m5kif-tf2workflowbatchscor-86bae7jmtv'}}},
 {'StepName': 'TF2WorkflowTrain',
  'StartTime': datetime.datetime(2021, 1, 16, 5, 58, 0, 988000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 1, 16, 6, 0, 50, 890000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metada

In [31]:
report_path = f"{step_batch.outputs[0].destination}/score-report.txt"
!aws s3 cp {report_path} ./score-report.txt && cat score-report.txt

download: s3://sagemaker-us-west-2-867719205611/tf-2-workflow-batch-2021-01-16-05-51-32-810/output/batch/score-report.txt to ./score-report.txt
Test MSE : 25.488441467285156