In [None]:
import os
import boto3
import sagemaker

sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()

# The pipeline must run in the same region as the S3 data it reads.
region = boto3.Session().region_name
bucket = "sagemaker-bitty-magazines"  # pipeline steps use S3 storage extensively
model_package_group_name = "BittiModelPackageGroupName"  # enables model versioning
prefix = "sagemaker_pipelines_bitti"
turicreate_logs_path = f"s3://{bucket}/{prefix}/logs"

print(f"Bucket: {bucket}")
print(f"Execution role: {role}")
print(f"SageMaker ver: {sagemaker.__version__}")

### Preprocessing step

Read the YOLO-formatted images and annotations from S3, convert them into Turi Create's `SFrame` format, and save the result back to S3 as a train/test split.

In [4]:
# Folder that the %%writefile cells below write the step scripts into
# (same effect as `mkdir -p`: no error if it already exists).
os.makedirs("bitti_source", exist_ok=True)

In [5]:
%%writefile bitti_source/preprocessing.py
"""Processing step for Turi Create object detection model

Adapted from the official Turi Create object detection walkthrough.
"""
import csv
import logging
import argparse
from pathlib import Path
import pip

# TODO: add EXIF image rotation in the mix - otherwise other people will
#       stub their toes eventually. You don't wanna burn 20 eur training
#       the model where the labels don't actually align.

def _import_or_install(package):
    """Import ``package``, pip-installing it into this interpreter first if missing.

    Turi Create (and possibly numpy) may not be present in the container
    image, so the script installs them on the fly. ``python -m pip`` in a
    subprocess is the supported way to drive pip from a program;
    ``pip.main`` is an internal API.

    Raises:
        subprocess.CalledProcessError: if ``pip install`` fails.
        ImportError: if the package still cannot be imported after installing.
    """
    import importlib
    import subprocess
    import sys

    try:
        return importlib.import_module(package)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        # make the import system notice the freshly installed distribution
        importlib.invalidate_caches()
        return importlib.import_module(package)


# NOTE: installing at import time is still a workaround; baking these
#       packages into a custom container image would be the clean fix.
tc = _import_or_install("turicreate")
np = _import_or_install("numpy")

logging.getLogger().setLevel(logging.INFO)


def _read_yolo_bboxes(input_dir, size_x, size_y, label="bitti"):
    """Turn the YOLO annotation files in ``input_dir`` into pixel-space rows.

    Every ``<name>.jpeg`` in ``input_dir`` is paired with ``<name>.txt``.
    Each non-blank annotation line is read as
    ``<class> <x_cen> <y_cen> <width> <height>`` with coordinates relative
    to the image size; the class id is ignored and every box gets ``label``.

    Returns:
        list of ``[image_file_name, label, x, y, width, height]`` rows with
        coordinates scaled by ``size_x`` / ``size_y`` and rounded to integer
        pixels (``x``/``y`` remain the box centre).
    """
    rows = []
    for image_path in sorted(input_dir.glob("*.jpeg")):
        annotation_path = image_path.with_suffix(".txt")
        if not annotation_path.exists():
            # extract_sframes still keeps such images: the left join below
            # gives them an empty annotation list
            logging.warning("No annotation file for %s; it will have no boxes",
                            image_path.name)
            continue
        with open(annotation_path, newline="") as annotation_file:
            for line in annotation_file:
                fields = line.split()  # tolerant of repeated spaces / CRLF
                if not fields:  # skip blank lines, e.g. a trailing empty line
                    continue
                x_cen, y_cen, width, height = [np.float32(v) for v in fields[1:]]
                rows.append([
                    image_path.name,
                    label,
                    int(round(x_cen*size_x)),
                    int(round(y_cen*size_y)),
                    int(round(width*size_x)),
                    int(round(height*size_y))])
    return rows


def extract_sframes(train_split_fraction, size_x, size_y,
                    base_dir="/opt/ml/processing", seed=None):
    """
    Takes in images and YOLO-formatted annotations, converts
    them into Turi Create's SFrame format, and, finally,
    splits and saves training / testing frames as output.

    Note that the input/output folders are controlled by the
    pipeline definition - they are automatically synced with
    their respective S3 destinations.

    Args:
        train_split_fraction: fraction of rows that go to the training split.
        size_x, size_y: image width / height in pixels; all annotations are
            scaled by these, so every image is assumed to share this size.
        base_dir: folder containing ``input``, ``output_train`` and
            ``output_test``; defaults to the SageMaker Processing layout.
        seed: optional seed for the train/test split (``None`` keeps the
            split non-deterministic, as before).
    """
    base_dir = Path(base_dir).resolve()
    input_dir = base_dir/"input"
    output_train_dir = base_dir/"output_train"
    output_test_dir = base_dir/"output_test"

    bbox_data = _read_yolo_bboxes(input_dir, size_x, size_y)

    # yes, we don't really need an extra csv file here, but the tutorial
    # was based around it so I based it around having it... ideally, it
    # should be refactored out of the script
    tmp_csv_fname = "/tmp/sframe.csv"
    with open(tmp_csv_fname, "w", newline="") as csvfile:
        sframe_writer = csv.writer(csvfile, delimiter=',')
        sframe_writer.writerow(["name", "label", "x", "y", "width", "height"])
        sframe_writer.writerows(bbox_data)

    csv_sf = tc.SFrame(tmp_csv_fname)

    def row_to_bbox_coordinates(row):  # tutorial artifact
        return {'x': row['x'], 'width': row['width'],
                'y': row['y'], 'height': row['height']}

    csv_sf['coordinates'] = csv_sf.apply(row_to_bbox_coordinates)
    # delete no longer needed columns
    del csv_sf['x'], csv_sf['y'], csv_sf['width'], csv_sf['height']

    sf_images = tc.image_analysis.load_images(str(input_dir),
                                              recursive=False,
                                              random_order=True)

    # Split path to get filename
    info = sf_images['path'].apply(lambda path: [Path(path).name])

    # Rename columns to 'name'
    info = info.unpack().rename({'X.0': 'name'})

    # Add to our main SFrame
    sf_images = sf_images.add_columns(info)

    # Original path no longer needed
    del sf_images['path']

    # Combine label and coordinates into a bounding box dictionary
    csv_sf = csv_sf.pack_columns(['label', 'coordinates'],
                                 new_column_name='bbox', dtype=dict)

    # Combine bounding boxes of the same 'name' into lists
    sf_annotations = csv_sf.groupby('name', {
            'annotations': tc.aggregate.CONCAT('bbox')})

    # left join: images without any boxes are kept with an empty list
    sf_all = sf_images.join(sf_annotations, on='name', how='left')
    sf_all['annotations'] = sf_all['annotations'].fillna([])

    # Make a train-test split
    sf_train, sf_test = sf_all.random_split(train_split_fraction, seed=seed)

    for out_dir in (output_train_dir, output_test_dir):
        out_dir.mkdir(parents=True, exist_ok=True)
    sf_train.save(str(output_train_dir/'bitti_train.sframe'))
    sf_test.save(str(output_test_dir/'bitti_test.sframe'))


def main(args):
    """Validate the parsed CLI arguments and hand them to ``extract_sframes``.

    Raises:
        RuntimeError: if an image size or the split fraction is missing or
            falsy (e.g. an environment variable that came in empty).
    """
    required_values = (args.image_size_x, args.image_size_y,
                       args.train_split_fraction)
    if any(not value for value in required_values):
        raise RuntimeError("the following arguments are required: "
                           "--image-size-x, --image-size-y,"
                           " --train-split-fraction")

    extract_sframes(
        size_x=args.image_size_x,
        size_y=args.image_size_y,
        train_split_fraction=args.train_split_fraction,
    )


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Convert YOLO-annotated images into train/test SFrames.")

    # every annotation is scaled by these sizes, so all images are assumed
    # to share the same resolution
    parser.add_argument('--image-size-x', type=int,
                        required=False, default=4032,
                        help='Image size along x-axis (in pixels).')
    parser.add_argument('--image-size-y', type=int,
                        required=False, default=3024,
                        help='Image size along y-axis (in pixels).')
    parser.add_argument('--train-split-fraction', type=float,
                        required=False, default=0.9,
                        help='Training data fraction.')

    args = parser.parse_args()
    main(args)

Overwriting bitti_source/preprocessing.py


In [6]:
# S3 prefix holding the raw .jpeg images and their YOLO .txt annotations;
# it is the default value of the InputData pipeline parameter.
s3_input = "s3://{}/bitti-data-yolo-format/".format(bucket)

In [27]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.steps import CacheConfig

# Re-use step results for up to 30 days when a step's inputs are unchanged.
cache_config = CacheConfig(enable_caching=True, expire_after="30d")

# --- infrastructure --------------------------------------------------------
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# ml.t3.medium runs out of RAM on eval stage
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.large")

# TODO: get the GPU tested and running
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.9xlarge")

# --- training hyperparameters ---------------------------------------------
training_batch_size = ParameterInteger(name="TrainingBatchSize", default_value=32)
training_max_iterations = ParameterInteger(name="MaxIterations", default_value=300)
training_n_cores = ParameterInteger(name="NumPyLambdaWorkers", default_value=36)

# this will also be used for preprocessing - TC runs on TF now
training_instance_tf_version = ParameterString(name="TrainingInstanceTFVersion", default_value="2.3")

# --- model approval -------------------------------------------------------
# "Approved" is handed to RegisterModel as the approval status; switch the
# default to "PendingManualApproval" to require a manual sign-off instead.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# Minimum mAP the evaluated model has to reach before it is registered/published.
model_approval_map_threshold = ParameterFloat(name="ModelApprovalmAPThreshold", default_value=0.7)

# --- data -----------------------------------------------------------------
input_data = ParameterString(name="InputData", default_value=s3_input)

In [28]:
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


# The image URI has to be concrete when the pipeline is defined, so it is
# resolved from the parameters' *default* values. (Calling str() on a pipeline
# parameter freezes its default in older SDKs and raises in newer ones, so
# `.default_value` is used explicitly.) The image therefore does not follow
# execution-time overrides of these parameters.
preprocessing_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    image_scope="inference",
    region=region,
    version=training_instance_tf_version.default_value,
    py_version="py37",
    instance_type=processing_instance_type.default_value)

sframes_preproessor = ScriptProcessor(
    image_uri=preprocessing_image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="script-sframe-conversion",
    role=role)

step_sframe_process = ProcessingStep(
    name="BittiDataProcessing",
    processor=sframes_preproessor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output_train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output_test")
    ],
    # preprocessing.py reads its settings with argparse, not from environment
    # variables, so they are passed on its command line.
    # TODO: make these pipeline params
    job_arguments=[
        "--image-size-x", "4032",
        "--image-size-y", "3024",
        "--train-split-fraction", "0.9",
    ],
    code="bitti_source/preprocessing.py",
    cache_config=cache_config
)

### Now for the main bit - the training step

In [29]:
%%writefile bitti_source/training.py
"""Training step for Turi Create object detection model

Adapted from the official Turi Create object detection walkthrough.
"""
import os
import logging
import argparse
from pathlib import Path
import pip

def _import_or_install(package):
    """Import ``package``, pip-installing it into this interpreter first if missing.

    Turi Create (and possibly numpy) may not be present in the container
    image, so the script installs them on the fly. ``python -m pip`` in a
    subprocess is the supported way to drive pip from a program;
    ``pip.main`` is an internal API.

    Raises:
        subprocess.CalledProcessError: if ``pip install`` fails.
        ImportError: if the package still cannot be imported after installing.
    """
    import importlib
    import subprocess
    import sys

    try:
        return importlib.import_module(package)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        # make the import system notice the freshly installed distribution
        importlib.invalidate_caches()
        return importlib.import_module(package)


tc = _import_or_install("turicreate")
np = _import_or_install("numpy")

logging.getLogger().setLevel(logging.INFO)


def train(train_dir, test_dir, output_dir, batch_size, max_iterations, number_pylambda_workers):
    """Fit a Turi Create object detector, log its test metrics and save it.

    Reads ``bitti_train.sframe`` / ``bitti_test.sframe`` from the train / test
    directories and writes ``bitti.model`` (Turi Create format) plus
    ``bitti.mlmodel`` (Core ML export) into ``output_dir``.
    """
    train_dir, test_dir, output_dir = (
        Path(folder).resolve() for folder in (train_dir, test_dir, output_dir))

    logging.info(f"train_dir is \"{train_dir}\";test_dir is \"{test_dir}\"\n")
    logging.info(f"train_dir contents are {list(train_dir.glob('*'))}")
    logging.info(f"test_dir contents are {list(test_dir.glob('*'))}")

    # size of Turi Create's pylambda worker pool
    tc.config.set_runtime_config('TURI_DEFAULT_NUM_PYLAMBDA_WORKERS', number_pylambda_workers)

    train_sframe = tc.SFrame(str(train_dir / 'bitti_train.sframe'))
    test_sframe = tc.SFrame(str(test_dir / 'bitti_test.sframe'))

    detector = tc.object_detector.create(
        train_sframe, max_iterations=max_iterations, batch_size=batch_size)

    evaluation = detector.evaluate(test_sframe)
    logging.info(evaluation)

    detector.save(str(output_dir / 'bitti.model'))
    detector.export_coreml(str(output_dir / 'bitti.mlmodel'))


def main(args):
    """String-pulling function

    Basically, channels the args in the right places and ensures
    that the required command-line arguments were passed to it.

    Raises:
        RuntimeError: if any required argument (including the model output
            directory, which falls back to SM_MODEL_DIR) is missing or falsy.
    """

    logging.info("Received the following arguments:")
    logging.info(args)

    # a hacky way to make sure env. variables don't come in empty;
    # --model-output is checked too, otherwise a missing SM_MODEL_DIR only
    # surfaces later as an opaque Path(None) TypeError inside train()
    if not all([args.train, args.test, args.model_output, args.batch_size,
                args.max_iterations, args.number_pylambda_workers]):
        raise RuntimeError("the following arguments are required: "
                           "--train, --test, --model-output, --batch-size, "
                           "--max-iterations, --number-pylambda-workers")
    train(train_dir=args.train, test_dir=args.test, output_dir=args.model_output,
          batch_size=args.batch_size, max_iterations=args.max_iterations,
          number_pylambda_workers=args.number_pylambda_workers)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # directory arguments fall back to the SM_* variables SageMaker sets
    parser.add_argument('--train', type=str, required=False, default=os.environ.get('SM_CHANNEL_TRAIN'),
                        help='The directory where the training data is stored.')
    parser.add_argument('--test', type=str, required=False, default=os.environ.get('SM_CHANNEL_TEST'),
                        help='The directory where the test input data is stored.')
    parser.add_argument('--model-output', type=str, default=os.environ.get('SM_MODEL_DIR'),
                        help='The directory where the trained model will be stored.')
    parser.add_argument('--batch-size', type=int,
                        required=False, default=32,
                        help='Training batch size.')
    parser.add_argument('--max-iterations', type=int,
                        required=False, default=300,
                        help='Maximum number of training iterations.')
    # default to the CPU count SageMaker reports in SM_NUM_CPUS; fall back to
    # 36 when it is unset or empty
    parser.add_argument('--number-pylambda-workers', type=int,
                        required=False,
                        default=int(os.environ.get('SM_NUM_CPUS') or 36),
                        help='Value for TURI_DEFAULT_NUM_PYLAMBDA_WORKERS.')
    parser.add_argument('--model_dir', type=str,
                        help="This is the S3 URI for model's file storage and so on."
                             " It appears it always gets passed via SageMaker TrainingStep.")

    args = parser.parse_args()
    main(args)

Overwriting bitti_source/training.py


In [30]:
from sagemaker.tensorflow import TensorFlow


model_path = f"s3://{bucket}/output_model"

# The image URI has to be concrete at definition time, so it is resolved from
# the parameters' *default* values (explicit `.default_value` instead of
# str()-ing a pipeline parameter, which freezes the default in older SDKs and
# raises in newer ones). Overriding TrainingInstanceType at execution time
# therefore still uses the image picked for the default instance type.
image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    image_scope="training",
    region=region,
    version=training_instance_tf_version.default_value,
    py_version="py37",
    instance_type=training_instance_type.default_value)

# Regular expressions are a pain, use the playground here: https://regex101.com/r/kopij0/1
# TODO: works for reporting, but how to publish it via CloudWatch metrics as well?
turicreate_metrics = [{'Name': 'train:loss', 'Regex': r"(?:\| [0-9]+ \| )([0-9]+[.][0-9]+)"}]

tf_train = TensorFlow(base_job_name='bitti-turicreate-pipelines',
                      entry_point='training.py',
                      source_dir='bitti_source',
                      output_path=model_path,  # don't use model_dir hyperparam!
                      role=role,
                      image_uri=image_uri,
                      # pass the pipeline parameters themselves - int()/str()
                      # would bake in the defaults and ignore the values given
                      # to pipeline.start(parameters=...)
                      hyperparameters={'max-iterations': training_max_iterations,
                                       'batch-size': training_batch_size,
                                       'number-pylambda-workers': training_n_cores
                                      },
                      instance_count=1,
                      instance_type=training_instance_type,
                      metric_definitions=turicreate_metrics,
                      input_mode='File')

In [31]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# TODO: change into Pipe - but that would need additional read f-ions in training.py

# S3 locations of the preprocessing step's "train" / "test" outputs become the
# training job's channels of the same name (read as SM_CHANNEL_TRAIN/TEST).
# content_type="application/octet-stream" was tested and is not needed in File mode.
processing_outputs = step_sframe_process.properties.ProcessingOutputConfig.Outputs

step_train = TrainingStep(
    name="ModelTraining",
    estimator=tf_train,
    inputs={
        channel: TrainingInput(processing_outputs[channel].S3Output.S3Uri, input_mode="File")
        for channel in ("train", "test")
    },
    cache_config=cache_config,
)

### Define the evaluation step

We have trained the model, but we need to validate it too. The first time around, the images' EXIF orientation tags meant that half of them were not rotated correctly relative to their labels. Needless to say, that pushed the mAP score close to zero. That's a good example of why the validation step is needed: only models that *work* should be carried forward.

In [32]:
%%writefile bitti_source/evaluation.py
import json
import logging
from pathlib import Path
import tarfile
import pip
def _import_or_install(package):
    """Import ``package``, pip-installing it into this interpreter first if missing.

    Turi Create may not be present in the container image, so the script
    installs it on the fly. ``python -m pip`` in a subprocess is the
    supported way to drive pip from a program; ``pip.main`` is an internal API.

    Raises:
        subprocess.CalledProcessError: if ``pip install`` fails.
        ImportError: if the package still cannot be imported after installing.
    """
    import importlib
    import subprocess
    import sys

    try:
        return importlib.import_module(package)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        # make the import system notice the freshly installed distribution
        importlib.invalidate_caches()
        return importlib.import_module(package)


tc = _import_or_install("turicreate")


if __name__ == "__main__":
    # The root logger defaults to WARNING, which would silently drop every
    # logging.info call below.
    logging.getLogger().setLevel(logging.INFO)

    # define the folder structure we inherited from SageMaker pipelines
    model_dir = Path("/opt/ml/processing/model").resolve()
    test_dir = Path("/opt/ml/processing/test").resolve()
    output_dir = Path("/opt/ml/processing/evaluation").resolve()

    # load and untar the model file (the archive is this pipeline's own
    # training artifact, so it is treated as trusted)
    model_path = model_dir/"model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = tc.load_model("bitti.model")
    logging.info("Loaded the model from %s", str(model_path))

    # load and score the testing data
    test_path = test_dir/"bitti_test.sframe"
    test_data = tc.SFrame(str(test_path))  # don't pass raw Path instances
    metrics = model.evaluate(test_data)
    logging.info("Evaluating the model on test data: %s", metrics)
    # plain float so json can serialize it regardless of the numeric type returned
    mAP = float(metrics['mean_average_precision_50'])

    # this key layout must match the JsonGet path "regression_metrics.mAP.value"
    # used by the pipeline's approval condition
    report_dict = {"regression_metrics": {"mAP": {"value": mAP}}}

    output_dir.mkdir(parents=True, exist_ok=True)
    evaluation_path = output_dir/"evaluation.json"

    with open(evaluation_path, "w") as f:
        json.dump(report_dict, f)
    if evaluation_path.exists():
        logging.info("Successfully dumped evaluation JSON to `%s`.",
                     str(evaluation_path))
    else:
        logging.error("Failed writing the evaluation file!")
    logging.info("Evaluation script finished. Storing mAP=%.2f into the evaluation report", mAP)

Overwriting bitti_source/evaluation.py


In [33]:
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile


script_eval = ScriptProcessor(
    role=role,
    base_job_name="script-bitti-eval",
    image_uri=image_uri,  # same TensorFlow image as the training job
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
)

# evaluation.py writes evaluation.json into the "evaluation" output; declaring
# it as a PropertyFile lets later steps read values out of it via JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json")

step_eval = ProcessingStep(
    name="ModelEvaluation",
    processor=script_eval,
    code="bitti_source/evaluation.py",
    inputs=[
        # model.tar.gz produced by the training step
        ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                        destination="/opt/ml/processing/model"),
        # held-out SFrame produced by the preprocessing step
        ProcessingInput(source=step_sframe_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
                        destination="/opt/ml/processing/test"),
    ],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
    property_files=[evaluation_report],
)

In [34]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet


# Passes when the mAP written by evaluation.py is at least the
# ModelApprovalmAPThreshold pipeline parameter.
cond_map = ConditionGreaterThanOrEqualTo(
    left=JsonGet(step=step_eval,
                 property_file=evaluation_report,
                 json_path="regression_metrics.mAP.value"),
    right=model_approval_map_threshold,
)

In [35]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


# S3 prefix of the evaluation step's first (and only) output, i.e. the
# "evaluation" folder that evaluation.py writes evaluation.json into.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{evaluation_output_uri}/evaluation.json",
        content_type="application/json",
    )
)

# Registers the trained artifact as a new version in the model package group.
step_register = RegisterModel(
    name="BittiRegisterModel",
    estimator=tf_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["application/octet-stream"],
    response_types=["application/octet-stream"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

In [36]:
%%writefile bitti_source/publish_to_api.py
import time
import tarfile
import boto3


if __name__ == "__main__":
    archive_path = "/opt/ml/processing/model/model.tar.gz"
    model_file = "bitti.mlmodel"  # Core ML export packed by training.py
    bucket_name = "magazine-monitor"

    # unpack the training artifact into the working directory
    with tarfile.open(archive_path) as archive:
        archive.extractall(path=".")

    # timestamp (UTC) the upload so earlier models are never overwritten
    timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
    mlfilename = f'bitti-{timestamp}.mlmodel'

    target_bucket = boto3.resource('s3').Bucket(bucket_name)
    target_bucket.upload_file(model_file, f"Models/{mlfilename}")

Overwriting bitti_source/publish_to_api.py


In [37]:
script_publish = ScriptProcessor(
    role=role,
    base_job_name="script-bitti-publish",
    image_uri=image_uri,  # same TensorFlow image as the training job
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
)

# Copies the Core ML export from the training artifact to the
# "magazine-monitor" bucket (see publish_to_api.py).
step_publish = ProcessingStep(
    name="PublishViaAPI",
    processor=script_publish,
    code="bitti_source/publish_to_api.py",
    inputs=[ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                            destination="/opt/ml/processing/model")],
)

In [38]:
# Register and publish the model only when cond_map (the mAP threshold) holds;
# otherwise nothing further runs.
step_cond = ConditionStep(
    name="BittymAPcheck",
    conditions=[cond_map],
    if_steps=[step_register, step_publish],
    else_steps=[],
)

In [39]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "BittiPipeline"

# step_register / step_publish are not listed directly: they run as the
# if-branch of step_cond.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count, processing_instance_type,
        training_instance_type, training_batch_size,
        training_max_iterations, training_n_cores,
        training_instance_tf_version, model_approval_status,
        input_data, model_approval_map_threshold,
    ],
    steps=[step_sframe_process, step_train, step_eval, step_cond],
)

In [None]:
# Create the pipeline in SageMaker, or update its definition if it already exists.
pipeline.upsert(role_arn=role)

In [41]:
# Start an execution. No parameters are passed, so every pipeline parameter
# takes its default value.
execution = pipeline.start()

In [23]:
# Test run of the pipeline - we probably don't want to burn resources
# just to see that there was a typo at the end of the training script

# FIXME: doesn't work!

#execution = pipeline.start(
#    parameters=dict(
#        TrainingInstanceType="ml.m5.large",
#        NumPyLambdaWorkers=1,
#        MaxIterations=2))

In [None]:
# Inspect the execution started by pipeline.start() (e.g. its current status).
execution.describe()

In [None]:
# List the steps of this execution.
execution.list_steps()

In [None]:
# Describe the pipeline as registered in SageMaker.
pipeline.describe()