In [None]:
OBJECTIVE:
    Implement and Test different parameter options for DATA and MODEL PARALLEL DISTRIBUTED training on SageMaker

BASED ON:
    Introduction to DISTRIBUTED TRAINING at:
    https://towardsdatascience.com/smart-distributed-training-on-amazon-sagemaker-with-smd-part-1-cd296f87a0ee

STEPS & FINDINGS:
    1. SetUp
    2. Prepare CIFAR-10 dataset
    3. Distributed Data Parallel (DDP)
        - Implemented with TF
        - Not heavily tested as more tests of DDP are in different repository
        - There needs to be at least as many input files as instances
            - NEXT: Explore the consequences of that fact...
        - HOROVOD/MPI requires EFA and a specific instance type not available in my private account
        - Also tested on MNIST but ran into issues with the internal structure of the data (mixing two projects); tbc...
    4. Distributed Model Parallel (DMP):
        - Main focus to test implementation of Sharded Data Parallel Distribution (SDPD)
            if SDPD == instance_count * "processes_per_host" then it is ZeRO
            if SDPD < instance_count * "processes_per_host" then it is MiCS
            MiCS = ZeRO implementation by AWS (incomplete but tailored to Cloud Infrastructure)

        - @SM no mixing of SDPD and Pipeline Parallelism
        - @SM no mixing of SDPD and Tensor Parallelism
        - The lower SDPD the faster (but minimally with the given data)
        - The lower Pipeline Parallelism the faster -- possibly due to more efficient comms and parallel data distribution
        - SMD:MP:"sdpd": 1, SMD:MP:"partitions": 1 might equal to simple DDP
        - Other Hyperparameters (Tested Individually):
            "horovod": DEFAULT: False; TEST True  - 'ddp' and 'horovod' cannot be simultaneously enabled. - Slightly Slower
            "ddp": DEFAULT: False; TEST True  - 'ddp' and 'horovod' cannot be simultaneously enabled.
            "tensor_parallel_degree": DEFAULT: 1; TEST 2, 4, 8 - Slight improvement over DDP but increasing level of parallelism does not have impact
            "fp16": DEFAULT: False; TEST True -
            "fp16_params": False, # DEFAULT: False; TEST True --- REMOVED, USE "fp16"
            "bf16": False, # DEFAULT: False; TEST True
            "offload_activations": DEFAULT: False; TEST True - No Impact, but might be required for MEMORY MANAGEMENT
            "shard_optimizer_state": DEFAULT: False; TEST True - No SDPD allowed, but slight improvement over DDP
            "activation_loading_horizon": DEFAULT: 4; TEST 1 - Slight Improvement
            "microbatches" and "batch_size":
                - setting different values for those parameters had a big impact on the training speed
                - results dependent on the level of model parallelism:
                    - sometimes running into MEMORY issues!
                    - needs to be tested and specified individually per training job and take into account the max batch size available to the model

In [None]:
# SET UP
! pip install tensorflow

from sagemaker.tensorflow import TensorFlow
from sagemaker.session import TrainingInput

from sagemaker import get_execution_role

# Value returned by sagemaker's get_execution_role(); it is passed as `role=`
# to the TensorFlow and PyTorch estimators created in the cells below.
role = get_execution_role()
print(role)

In [None]:
# THE DATA IS GENERATED BY THE "DATA GENERATOR" CELL AT THE BOTTOM OF THIS NOTEBOOK!!!

In [None]:
# SET FOR THE TRAINING JOB NAME
# Counter used as the numeric suffix of the DDP training job name. The DDP cell
# increments it (run += 1) right before calling fit(), so the first job started
# after running this cell is named with run + 1.
run = 35

In [None]:
#####################################################################
################ RUN Distributed DATA Parallel (DDP) ################
#####################################################################

# S3 prefix with the training data (CIFAR-10). This points at an already
# existing build, so it might not work or might not perform optimally!
s3_input_location_str = 's3://dis-tr-workshop/cifar10-dataset/'
# Alternative input: already existing MNIST build with 120 shards.
#s3_input_location_str = 's3://mnist-tdrecords/train/120/'

s3_input = TrainingInput(s3_input_location_str, input_mode='FastFile')

# Training using the SMDataParallel distributed training framework.
distribution = {
    'smdistributed': {
        'dataparallel': {
            'enabled': True,
            # NOTE: this setting did not take effect -- tested with 4 and
            # 8 processes still ran.
            'processes_per_host': 8,
        },
    },
}

# Alternative: training using HOROVOD/MPI -- possibly runs only with
# 'ml.p3dn.24xlarge' instances as it needs EFA.
# distribution = {'mpi': {'enabled': True,
#                         'processes_per_host': 8}}

# The supported instance types:
# ('ml.p3.16xlarge', 'ml.p3dn.24xlarge', 'ml.p4d.24xlarge', 'ml.p4de.24xlarge', 'local_gpu')
#
# Instance price notes:
#   'ml.p4d.24xlarge'  = $32.7726/h (the instance type used in the example)
#   'ml.p3.16xlarge'   = $24.48/h
#   'ml.p3dn.24xlarge' = no price provided
#   'ml.p4de.24xlarge' = no price provided
tf_estimator = TensorFlow(
    entry_point='train_tf_DPD.py',
    role=role,
    instance_type='ml.p3.16xlarge',
    instance_count=1,  # InExample: 4
    framework_version='2.9',  # InExample: '2.9.1'
    py_version='py39',
    distribution=distribution,
)

# Bump the counter so each execution of this cell uses a new job name suffix.
run += 1

tf_estimator.fit(s3_input, job_name="train-tf-DPD-" + str(run))


In [None]:
#####################################################################
########################## SUMMARY OF DDP ###########################
#####################################################################

# CIFAR-10
'ml.p3.16xlarge' x1, smdistributed, no @tf.function
EXEC TIME: 149.16670870780945
EXEC TIME: 148.32054686546326

'ml.p3.16xlarge' x1, smdistributed, WITH @tf.function
EXEC TIME: 150.85319781303406

'ml.p3.16xlarge' x2, smdistributed, no @tf.function
requires > 2 files!!!!
EXEC TIME: 175.32054686546326

'ml.p3.16xlarge' x1, HOROVOD/MPI, no @tf.function
ENDS WITH ERROR: EFA device name not set

'ml.p4d.24xlarge' x1, smdistributed, no @tf.function
NO INSTANCE QUOTA FOR THE INSTANCE TYPE

MNIST:
    
'ml.p3.16xlarge' x1, smdistributed, 120 SHARDS, no @tf.function
# POSSIBLY the TFRECORD internal structure does not meet the .py requirements

'ml.p3.16xlarge' x2, smdistributed, 120 SHARDS, no @tf.function
# it seems it started on both instances although input folder name error killed it.


In [None]:
#####################################################################
############### RUN Distributed MODEL Parallel (DMP) ################
#####################################################################

# NOTES:
# "processes_per_host" -- HOW many GPUs per HOST/NODE
# instance_count = 1 -- how many instances to provision
# "sharded_data_parallel_degree" -- level of ZeRO model parallelism:
#     if it == instance_count * "processes_per_host" then it is ZeRO
#     if it is less, then it is MiCS -- ZeRO implementation by AWS
#     (incomplete but tailored to Cloud Infrastructure)

from sagemaker import get_execution_role
# from sagemaker.pytorch.training_compiler.config import TrainingCompilerConfig # ValueError: SageMaker distributed training configuration is currently not compatible with SageMaker Training Compiler.
role = get_execution_role()
print(role)

from sagemaker.pytorch import PyTorch

# Distribution settings for the model-parallel run. Each key of "parameters"
# appears exactly once: a dict literal silently keeps only the LAST value of a
# repeated key, which makes toggling a duplicated entry error-prone.
distribution = {
    "mpi": {
        "enabled": True,
        "processes_per_host": 8,  # InExample: 4
        # Use "custom_mpi_options" to pass any custom MPI options you might need.
        # To avoid Docker warnings from contaminating the training logs, the
        # following flag is recommended:
        #"custom_mpi_options": "--mca btl_vader_single_copy_mechanism none"
    },
    #"pytorchxla": {"enabled": True},
    "smdistributed": {
        # Choose one of the supported strategies: ['dataparallel', 'modelparallel']
        #"dataparallel": {"enabled": True},
        "modelparallel": {
            "enabled": True,
            "parameters": {
                # Omitting it gave the error '"partitions" is a required parameter';
                # controls pipeline_parallel_degree.
                "partitions": 1,
                # DEFAULT: False; TEST True. PyTorch specific -- allows mixing data + model parallel.
                # ValueError: 'ddp' and 'horovod' cannot be simultaneously enabled.
                "ddp": True,
                "sharded_data_parallel_degree": 2,  # InExample: 32
                "delayed_parameter_initialization": True,
                "microbatches": 4,  # DEFAULT: 1; TEST 4
                # DEFAULT: False; TEST True
                # ValueError: 'ddp' and 'horovod' cannot be simultaneously enabled.
                "horovod": False,
                "tensor_parallel_degree": 1,  # DEFAULT: 1; TEST 2, ???
                "fp16": False,  # DEFAULT: False; TEST True
                "fp16_params": False,  # DEFAULT: False; TEST True --- deprecated
                "bf16": True,  # DEFAULT: False; TEST True
                "offload_activations": True,  # DEFAULT: False; TEST True
                "shard_optimizer_state": False,  # DEFAULT: False; TEST True
                "activation_loading_horizon": 1,  # DEFAULT: 4; TEST 1, ???
                # DEFAULT: "cluster" ("DPT"); TEST "spread" ("TPD"), permutations of "D,P,T"
                "placement_strategy": "cluster",
            },
        },
    },
}

# Instance price notes:
#   'ml.g4dn.12xlarge' is the instance type used in the example
#   'ml.p4d.24xlarge'  = $32.7726/h
#   'ml.p3.16xlarge'   = $24.48/h
#   'ml.p3dn.24xlarge' = no price provided
#   'ml.p4de.24xlarge' = no price provided
pytorch = PyTorch(entry_point='train_PT_MP.py',
                  role=role,
                  instance_type='ml.p3.16xlarge',
                  instance_count=1,  # InExample: 8
                  framework_version='1.12',
                  py_version='py38',
                  distribution=distribution)

                  #compiler_config = TrainingCompilerConfig()) # ValueError: SageMaker distributed training configuration is currently not compatible with SageMaker Training Compiler.

pytorch.fit()

In [None]:
#####################################################################
########################## SUMMARY OF DMP ###########################
#####################################################################

# CIFAR-10
'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 8, SMD:MP:"partitions": 1, FakeDataset_LEN: 100
average/sum step time: 68.43147036700003

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 8, SMD:MP:"partitions": 2, FakeDataset_LEN: 100
ERROR: Setting config parameter sharded_data_parallel_degree to non-default value 8 requires pipeline_parallel_degree to be set to 1. Found: 2

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 4, SMD:MP:"partitions": 2, FakeDataset_LEN: 100
ERROR: Setting config parameter sharded_data_parallel_degree to non-default value 4 requires pipeline_parallel_degree to be set to 1. Found: 2

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 4, FakeDataset_LEN: 100
ERROR: Setting config parameter sharded_data_parallel_degree to non-default value 4 requires pipeline_parallel_degree to be set to 1. Found: 2


### PIPELINE PARALLEL SEEMS NOT TO BE SUPPORTED WITH SDPD!!!
'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 2, FakeDataset_LEN: 100
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.4530066869583347, 34.87216048700003

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 4, FakeDataset_LEN: 100
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.5864331667500007, 38.074396002000015

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 8, FakeDataset_LEN: 100
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.142736179499998, 51.425668307999956


'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 4, SMD:MP:"partitions": 1, FakeDataset_LEN: 100
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.6638444754999995, 63.93226741199999

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100 ######## BENCHMARK!!!
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.6146430564166656, 62.75143335399997


# PROBABLY JUST DATA PARALLEL
'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 1, FakeDataset_LEN: 100
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.343366788208333, 32.240802916999996

# ADDITIONAL PARAMS TESTS
# DEFAULT VALUES
microbatches: 1
batch_size=8*8, # InExample: 4

"horovod": False, # DEFAULT: False; TEST True  ---- ValueError: 'ddp' and 'horovod' cannot be simultaneously enabled.
"ddp": True, # DEFAULT: False; TEST True  ---- ValueError: 'ddp' and 'horovod' cannot be simultaneously enabled.
"tensor_parallel_degree": 1, # DEFAULT: 1; TEST 2, ??? --- Setting config parameter sharded_data_parallel_degree to non-default value 2 requires tensor_parallel_degree to be set to 1. Found: 2
"fp16": DEFAULT: False; TEST True - no impact
"fp16_params": DEFAULT: False; TEST True - DEPRECATED, USE "fp16"
"bf16": DEFAULT: False; TEST True - Slight improvement
"offload_activations": DEFAULT: False; TEST True
"shard_optimizer_state": False, # DEFAULT: False; TEST True --- Setting config parameter sharded_data_parallel_degree to non-default value 2 requires shard_optimizer_state to be set to False. Found: True
"activation_loading_horizon": 4 # DEFAULT: 4; TEST 1, ???
                                    
'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, microbatches: 8, batch_size=4*8, # InExample: 4
average/sum step time: 14.583918485999996, 43.75175545799999 --- # WARNING: 1 pytorch allocator cache flushes since last step. 
this happens when there is high memory pressure and is detrimental to performance. if this is happening frequently consider adjusting settings to reduce memory consumption.
If you are unable to make the cache flushes go away consider adding torch.cuda.empty_cache() calls in your training loop to ensure that all ranks flush their caches at the same time

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "horovod": True, "ddp": False
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.7699321410416666, 66.478371385

# tensor_parallel_degree REQUIRES SMD:MP:"sdpd": 1
# Error: Setting config parameter sharded_data_parallel_degree to non-default value 2 requires tensor_parallel_degree to be set to 1. Found: 2
'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "tensor_parallel_degree": 2,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.2903158405, 30.967580171999998

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "tensor_parallel_degree": 4,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.3078571323333321, 31.38857117599997

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "tensor_parallel_degree": 8,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.2856828337083333, 30.856388009


'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "fp16": True,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.761540846416667, 66.27698031400001
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.123599046541668, 26.966377117000036 # SMD:MP:"sdpd": 1

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "bf16": True,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.5171378690416666, 60.411308856999995
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.2476976044583334, 29.944742507 # SMD:MP:"sdpd": 1

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "offload_activations": True,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.7027348693333337, 64.86563686400001
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.3232287461249992, 31.75748990699998 # SMD:MP:"sdpd": 1

#"shard_optimizer_state": True, REQUIRES SMD:MP:"sdpd": 1  
'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 1, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "shard_optimizer_state": True,
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 1.2148715330416664, 29.156916792999994

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, "activation_loading_horizon": 1
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 2.6403854495833343, 63.369250790000024


'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, microbatches: 8, batch_size=4*8, "bf16": True, "offload_activations": True
"activation_loading_horizon": 1
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 24.500388405999995, 73.50116521799998

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, microbatches: 4, batch_size=4*8, "bf16": True, "offload_activations": True
"activation_loading_horizon": 1
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 13.70485049233334, 41.11455147700002

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, microbatches: 4, batch_size=4, "bf16": True, "offload_activations": True
"activation_loading_horizon": 1
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 9.119615185258063, 282.70807074299995 ?!?!?!?!?!?

'ml.p3.16xlarge' x1, MPI:"pph": 8, SMD:MP:"sdpd": 2, SMD:MP:"partitions": 1, FakeDataset_LEN: 100, microbatches: 4, batch_size=8*8, "bf16": True, "offload_activations": True
"activation_loading_horizon": 1
[1,mpirank:0,algo-1]<stdout>:average/sum step time: 26.91075615799997, 26.91075615799997

In [None]:
######################################################
########## DATA GENERATOR FOR DATA PARALLEL ##########
######################################################

# Downloads the CIFAR-10 data from the web and saves it as TFRecords in the "data-cifar-10" folder

%tb
#     Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#     Licensed under the Apache License, Version 2.0 (the "License").
#     You may not use this file except in compliance with the License.
#     A copy of the License is located at
#
#         https://aws.amazon.com/apache-2-0/
#
#     or in the "license" file accompanying this file. This file is distributed
#     on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#     express or implied. See the License for the specific language governing
#     permissions and limitations under the License.

import argparse
import os
import sys
import tarfile

import tensorflow as tf
from six.moves import cPickle as pickle
from six.moves import xrange  # pylint: disable=redefined-builtin

# import tensorflow_datasets as tfds

CIFAR_FILENAME = "cifar-10-python.tar.gz"
CIFAR_DOWNLOAD_URL = "https://www.cs.toronto.edu/~kriz/" + CIFAR_FILENAME
CIFAR_LOCAL_FOLDER = "cifar-10-batches-py"


def download_and_extract(data_dir):
    """Download the CIFAR-10 archive and extract it.

    A tensorflow_datasets ``DownloadManager`` is created with
    ``data_dir + "/tmp"`` as its download directory and asked to download and
    extract ``CIFAR_DOWNLOAD_URL``.

    Args:
        data_dir: Base directory (string); "/tmp" is appended to it.

    Returns:
        The value returned by ``DownloadManager.download_and_extract``;
        ``main`` uses it as the directory containing the extracted archive.
    """
    # Imported inside the function rather than at module level; presumably
    # because tensorflow_datasets is only installed by install_dependencies().
    import tensorflow_datasets as tfds

    download_dir = data_dir + "/tmp"
    manager = tfds.download.DownloadManager(download_dir=download_dir)
    return manager.download_and_extract(CIFAR_DOWNLOAD_URL)


def _int64_feature(value):
    """Wrap a single value in a ``tf.train.Feature`` holding a one-element ``Int64List``."""
    int64_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int64_list)


def _bytes_feature(value):
    """Wrap a single value in a ``tf.train.Feature`` holding a one-element ``BytesList``."""
    bytes_list = tf.train.BytesList(value=[value])
    return tf.train.Feature(bytes_list=bytes_list)


def _get_file_names():
    """Returns the file names expected to exist in the input_dir."""
    file_names = {}
    file_names["train"] = ["data_batch_%d" % i for i in xrange(1, 5)]
    file_names["validation"] = ["data_batch_5"]
    file_names["eval"] = ["test_batch"]
    return file_names


def read_pickle_from_file(filename):
    """Unpickle the file at ``filename`` and return the loaded object.

    The file is opened in binary mode through ``tf.io.gfile.GFile``. On
    Python 3 the pickle is loaded with ``encoding="bytes"``; on older
    interpreters it is loaded without an encoding argument.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        The unpickled object (``convert_to_tfrecord`` indexes it with the
        bytes keys b"data" and b"labels").
    """
    # NOTE(review): pickle.load can execute arbitrary code from the file, so
    # this must only be used on archives from a trusted source.
    is_py3 = sys.version_info >= (3, 0)
    load_kwargs = {"encoding": "bytes"} if is_py3 else {}
    with tf.io.gfile.GFile(filename, "rb") as handle:
        return pickle.load(handle, **load_kwargs)


def convert_to_tfrecord(input_files, output_file):
    """Write the entries of several pickled batch files into one TFRecord file.

    Each entry becomes a ``tf.train.Example`` with two features: "image" (the
    bytes of the entry's data row) and "label" (its integer label).

    Args:
        input_files: Iterable of pickle file paths, read in order with
            ``read_pickle_from_file``.
        output_file: Path of the TFRecord file to write.
    """
    print("Generating %s" % output_file)
    with tf.io.TFRecordWriter(output_file) as writer:
        for batch_path in input_files:
            batch = read_pickle_from_file(batch_path)
            images = batch[b"data"]
            labels = batch[b"labels"]

            # The number of labels determines how many entries are written.
            for idx, label in enumerate(labels):
                features = tf.train.Features(
                    feature={
                        "image": _bytes_feature(images[idx].tobytes()),
                        "label": _int64_feature(label),
                    }
                )
                serialized = tf.train.Example(features=features).SerializeToString()
                writer.write(serialized)


def install_dependencies():
    """Upgrade pip and install the pinned tensorflow_datasets release.

    pip is invoked as ``<current interpreter> -m pip`` so the packages land in
    the environment of the interpreter running this code, not in whichever
    ``pip`` executable happens to be first on PATH.

    Installation stays best-effort: a failing pip command is reported on
    stdout but does not raise.
    """
    import sys
    from subprocess import call

    pip_cmd = [sys.executable, "-m", "pip"]
    for pip_args in (
        ["install", "--upgrade", "pip"],
        ["install", "tensorflow_datasets==4.1.0"],
    ):
        status = call(pip_cmd + pip_args)
        if status != 0:
            # Surface the failure instead of silently ignoring the exit code.
            print("WARNING: 'pip %s' exited with status %d" % (" ".join(pip_args), status))


def main(data_dir):
    """Download CIFAR-10 and convert each split into a TFRecord file.

    For every split returned by ``_get_file_names`` the batch files are read
    from ``<extract_dir>/<CIFAR_LOCAL_FOLDER>`` and written to
    ``<data_dir>/<split>/<split>.tfrecords``. Afterwards the
    ``<data_dir>/tmp`` directory is removed.

    Args:
        data_dir: Base output directory (string).
    """
    import shutil

    print("Download from {} and extract.".format(CIFAR_DOWNLOAD_URL))

    extract_dir = download_and_extract(data_dir)
    splits = _get_file_names()
    batches_dir = os.path.join(extract_dir, CIFAR_LOCAL_FOLDER)

    for mode, files in splits.items():
        mode_dir = data_dir + "/" + mode
        input_files = [os.path.join(batches_dir, name) for name in files]
        output_file = os.path.join(mode_dir, mode + ".tfrecords")

        if not os.path.exists(mode_dir):
            os.makedirs(mode_dir)

        # Remove any output left over from a previous run; the file not
        # existing (or not being removable) is deliberately ignored.
        try:
            os.remove(output_file)
        except OSError:
            pass

        # Convert to tf.train.Example and write them to TFRecords.
        convert_to_tfrecord(input_files, output_file)

    print("Done!")

    shutil.rmtree(data_dir + "/tmp")


if __name__ == "__main__":
    # The bare string below holds a disabled argparse command-line interface.
    # It is a no-op string expression, so no arguments are parsed and the
    # output directory is hard-coded in data_dir instead.
    """

    parser = argparse.ArgumentParser()

    parser.add_argument("--data-dir",
                        type=str,
                        default="./data",
                        help="Directory to download and extract CIFAR-10 to.",)

    args = parser.parse_args()
    """
    # Directory passed to main() as data_dir.
    data_dir = "./data-cifar-10"

    # Dependencies are installed before main() runs.
    install_dependencies()
    #"""
    main(data_dir)
    
    #"""    