In [1]:
import sys
import importlib
import os
import boto3
import json
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow
from datetime import datetime
import os
import pprint
import subprocess

In [2]:
# Set the AWS CLI default-profile region. `region` also selects the S3 bucket
# further down; presumably the boto3/SageMaker session created later reads this
# default profile too — confirm if running outside a SageMaker notebook.
region = "us-west-2"
# Argument list instead of a shell string: no shell is needed, and `region`
# is never re-parsed by /bin/sh.
set_region = ["aws", "configure", "set", "region", region, "--profile", "default"]
_ = subprocess.check_output(set_region)

In [3]:
# IAM role passed to the estimator below; get_execution_role() presumably
# returns the role of the current SageMaker notebook environment — confirm
# when running elsewhere.
role = get_execution_role()

In [4]:
# Cluster shape for the training job.
instance_count = 4
instance_type = "ml.p4d.24xlarge"

# Worker processes per host: 8 on the 8-GPU instance types listed, 4 on the
# 4-GPU ones, the nvidia-smi GPU count in local GPU mode, otherwise 1.
if instance_type in ("ml.p3dn.24xlarge", "ml.p4d.24xlarge", "ml.p3.16xlarge"):
    processes_per_host = 8
elif instance_type in ("ml.p3.8xlarge", "ml.g4dn.12xlarge"):
    processes_per_host = 4
elif instance_type == 'local_gpu':
    # One output line per GPU reported by nvidia-smi.
    local_gpu_count = int(subprocess.check_output("nvidia-smi --query-gpu=name --format=csv,noheader | wc -l", shell=True).decode().strip())
    processes_per_host = local_gpu_count
else:
    processes_per_host = 1

In [5]:
# Job naming: "<user_id>-<dd-mm-YYYY-HH-MM-SS>". date_str is the per-day S3
# prefix; time_str makes launches at least one second apart get distinct names.
user_id = "jbsnyder"
# Read the clock once: two separate now() calls could straddle midnight and
# file the job under a different day than its own timestamp.
launch_time = datetime.now()
date_str = launch_time.strftime("%d-%m-%Y")
time_str = launch_time.strftime("%d-%m-%Y-%H-%M-%S")
job_name = '{}-{}'.format(user_id, time_str)

In [6]:
# S3 layout. The bucket is chosen by region: "-pdx" for us-west-2, otherwise
# the "-iad" bucket.
s3_bucket = "jbsnyder-sagemaker-pdx" if region == "us-west-2" else "jbsnyder-sagemaker-iad"

# S3 keys always use "/", so URIs are built with plain string formatting;
# os.path.join is OS-dependent and would insert backslashes on Windows.
s3_path = f"s3://{s3_bucket}/"

s3_data_dir = f"{s3_path}data/"
imagenet_tfrecord = "imagenet/tfrecord"

channels = {
    'train': f"{s3_data_dir}{imagenet_tfrecord}/train",
    'validation': f"{s3_data_dir}{imagenet_tfrecord}/validation",
}

# Per-day, per-job prefixes for job output and uploaded source code.
output_path = f"{s3_path}sagemaker-output/{date_str}/{job_name}"

code_location = f"{s3_path}sagemaker-code/{date_str}/{job_name}"

In [7]:
# Input mode switch: File when False, Pipe when True (also forwarded to train.py).
pipe_mode = False

# batch_size grows linearly with instance_count (1024 per instance); how
# train.py divides it among workers is not visible here.
batch_size = instance_count * 1024

# Linear learning-rate scaling: 0.01 per 256 samples of batch.
lr = batch_size / 256 * 0.01

# Boolean flags are given as the strings "True"/"False"; presumably train.py
# parses them as strings — confirm.
hyperparameters = {
    # S3 prefixes are passed directly; the in-container channel mount points
    # would be /opt/ml/input/data/train and /opt/ml/input/data/validation.
    "train_data_dir": channels['train'],
    "validation_data_dir": channels['validation'],
    "batch_size": batch_size,
    "num_epochs": 120,
    "model_dir": "/opt/ml/checkpoints",
    "learning_rate": lr,
    # Optimizer and regularization settings.
    "momentum": 0.9,
    "label_smoothing": 0.1,
    "mixup_alpha": 0.2,
    "l2_weight_decay": 2.5e-5,
    # Precision / compiler toggles.
    "fp16": "True",
    "xla": "True",
    "tf32": "True",
    "model": "resnet152v1_d",
    "pipe_mode": str(pipe_mode),
}

In [8]:
# Training script: entry_point resolved inside source_dir (the current
# directory); both are handed to the TensorFlow estimator.
source_dir, entry_point = ".", "train.py"

In [9]:
# Multi-instance runs use the smdistributed data-parallel config ("smd");
# single-instance runs use an MPI launch ("hvd", presumably Horovod) with one
# process per GPU. To force MPI on multi-instance runs, set
# distribution_type = 'hvd' here.
distribution_type = 'smd' if instance_count > 1 else 'hvd'

if distribution_type == "smd":
    distribution = {"smdistributed": {"dataparallel": {"enabled": True}}}
elif distribution_type == "hvd":
    # Extra mpirun options; `-x VAR=val` exports VAR to every rank.
    custom_mpi_options = [
        '-x TF_CUDNN_USE_AUTOTUNE=0',
        '-x FI_EFA_USE_DEVICE_RDMA=1',
    ]
    distribution = {
        "mpi": {
            "enabled": True,
            "processes_per_host": processes_per_host,
            "custom_mpi_options": " ".join(custom_mpi_options),
        }
    }
else:
    # Fail here, not with a NameError on `distribution` in the estimator cell.
    raise ValueError(
        f"Unknown distribution_type {distribution_type!r}; expected 'smd' or 'hvd'"
    )

In [10]:
# For local-mode instance types ("local" in the name) the S3 output,
# checkpoint, model and code locations are left as None (SDK defaults).
is_local = "local" in instance_type

# Build the ECR URI from `region` instead of hard-coding us-west-2, so the
# image tracks the same region that selects the S3 bucket. Assumes the
# jbsnyder repository exists in every region used — TODO confirm.
training_image_uri = f'427566855058.dkr.ecr.{region}.amazonaws.com/jbsnyder:tf-2.5-cudnn-8.2'

# NOTE(review): the TensorFlow estimator forwards `model_dir` to the script as
# the "model_dir" hyperparameter, which appears to override
# hyperparameters["model_dir"] ("/opt/ml/checkpoints"). Confirm which value
# train.py actually receives.
estimator = TensorFlow(
    entry_point=entry_point,
    source_dir=source_dir,
    image_uri=training_image_uri,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    distribution=distribution,
    output_path=None if is_local else output_path,
    checkpoint_s3_uri=None if is_local else output_path,
    model_dir=None if is_local else output_path,
    hyperparameters=hyperparameters,
    volume_size=500,
    input_mode="Pipe" if pipe_mode else "File",
    disable_profiler=True,
    debugger_hook_config=False,
    code_location=None if is_local else code_location,
)

In [None]:
# In File mode no input channels are attached: the hyperparameters already
# carry the S3 prefixes (train_data_dir / validation_data_dir). Pipe mode
# streams data through input channels, so they are supplied in that case.
estimator.fit(channels if pipe_mode else None, wait=True, job_name=job_name)

2021-07-13 08:37:42 Starting - Starting the training job...
2021-07-13 08:37:45 Starting - Launching requested ML instances......
2021-07-13 08:39:11 Starting - Preparing the instances for training.................................
2021-07-13 08:44:38 Downloading - Downloading input data
2021-07-13 08:44:38 Training - Downloading the training image.....................
2021-07-13 08:48:08 Training - Training image download completed. Training in progress.[36m2021-07-13 08:48:03.727697: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.[0m
[36m2021-07-13 08:48:03.734251: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:105] SageMaker Profiler is not enabled. The timeline writer thread will not be started, future recorded events will be dropped.[0m
[36m2021-07-13 08:48:03.915887: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0[0m
[36m2021-07-13 08:48:04.