### Running on thousands of AWS GPUs with Amazon SageMaker Model Training

#### Overview: SageMaker Distributed Training Libraries

1. SageMaker Data Parallel (SDP): Data parallel = massive data. Each GPU holds a full replica of the model, and each incoming batch of data is split across the GPUs so that every replica processes a different shard.

2. SageMaker Model Parallel (SMP): Model parallel = massive model. This is for models that are too large to fit in the memory of a single GPU. SMP covers the following:

    1. Automate model partitioning
    
    2. Interleaved pipelined training
    
    3. Managed SageMaker training
    
    4. Clean framework integration
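
The data-parallel idea from item 1 can be illustrated with a toy example in plain Python. This is only a sketch, not how SDP is implemented: the "replicas" are ordinary function calls, the model is a single weight, and the helper names (`shard_batch`, `local_gradient`, `data_parallel_gradient`) are hypothetical. It shows that averaging the gradients computed on equally sized shards gives the same result as computing the gradient on the full batch.

```python
# Toy data-parallel step: every "GPU" holds the same weight, sees a different
# shard of the batch, and the per-shard gradients are averaged (as an all-reduce would do).

def shard_batch(batch, num_replicas):
    """Split a batch into num_replicas contiguous, equally sized shards."""
    if len(batch) % num_replicas != 0:
        raise ValueError("batch size must be divisible by num_replicas")
    shard_size = len(batch) // num_replicas
    return [batch[i * shard_size:(i + 1) * shard_size] for i in range(num_replicas)]


def local_gradient(weight, shard):
    """Gradient of the mean squared error of y = weight * x on one shard."""
    return sum(2 * (weight * x - y) * x for x, y in shard) / len(shard)


def data_parallel_gradient(weight, batch, num_replicas):
    """Average the gradients that each replica computes on its own shard."""
    shards = shard_batch(batch, num_replicas)
    grads = [local_gradient(weight, shard) for shard in shards]
    return sum(grads) / num_replicas


batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # samples of y = 2x
full_grad = local_gradient(0.0, batch)
parallel_grad = data_parallel_gradient(0.0, batch, num_replicas=2)
print(full_grad, parallel_grad)
```

Because the shards have equal size, the averaged gradient matches the full-batch gradient, which is why each replica can apply the same update and stay in sync.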


#### How to use SMP to automate model partitioning?

1. Analyze the model: SMP first analyzes the model's graph structure, parameters, and weights.

2. Run graph partitioning algorithms: After the analysis, a partitioning algorithm divides the model into partitions.

3. Place partitions on devices: The partitions are placed on the GPUs in a pipelined manner.
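
The three steps above can be sketched with a simple heuristic. The notes do not describe the partitioning algorithm that SMP actually runs, so the function below (`partition_layers`, a hypothetical name) is only an assumption for illustration: it takes the parameter count of each layer as the "analysis" result and assigns consecutive layers to devices, moving to the next device once the current one has reached its fair share.

```python
def partition_layers(param_counts, num_devices):
    """Assign consecutive layers to devices, balancing parameter counts greedily."""
    if num_devices < 1:
        raise ValueError("num_devices must be at least 1")
    target = sum(param_counts) / num_devices  # fair share of parameters per device
    partitions = [[] for _ in range(num_devices)]
    device, load = 0, 0
    for layer_idx, count in enumerate(param_counts):
        # Move to the next device once the current one holds its share,
        # unless we are already on the last device.
        if load >= target and device < num_devices - 1:
            device += 1
            load = 0
        partitions[device].append(layer_idx)
        load += count
    return partitions


# Hypothetical per-layer parameter counts (in millions) for a six-layer model.
layer_params = [4, 4, 2, 2, 2, 2]
print(partition_layers(layer_params, num_devices=2))
```

Keeping each partition contiguous matters for pipelining: the output of one partition feeds directly into the next device.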


#### Interleave pipeline execution to stabilize GPU utilization

1. What is pipelining: The model is divided into partitions that are placed on different GPUs. Each incoming batch is split into micro-batches (subsets of the batch), which are processed sequentially. The forward pass computes the activations, and the backward pass computes the gradients.

There are two ways to schedule the micro-batches:

1. Simple: All forward passes run first, and the backward passes start only once every forward pass is done. This can be inefficient, because the activations of every micro-batch have to be kept in memory until the backward passes begin.

2. Interleaved: Backward passes are prioritized and run as soon as they are available. This increases GPU utilization.
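
The difference between the two schedules can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not SMP's actual scheduler: it models a single pipeline stage, and the `warmup` parameter (how many forward passes run before the first backward pass becomes available) is a hypothetical knob introduced here. The simulation counts how many micro-batch activations the stage has to hold at its peak.

```python
def simple_schedule(num_microbatches):
    """All forward passes first, then all backward passes."""
    forwards = [("F", m) for m in range(num_microbatches)]
    backwards = [("B", m) for m in range(num_microbatches)]
    return forwards + backwards


def interleaved_schedule(num_microbatches, warmup):
    """Run `warmup` forward passes, then alternate one backward with one forward."""
    if warmup < 1:
        raise ValueError("warmup must be at least 1")
    schedule = []
    next_fwd, next_bwd = 0, 0
    while next_fwd < min(warmup, num_microbatches):
        schedule.append(("F", next_fwd))
        next_fwd += 1
    while next_bwd < num_microbatches:
        # Backward passes get priority as soon as one is available.
        schedule.append(("B", next_bwd))
        next_bwd += 1
        if next_fwd < num_microbatches:
            schedule.append(("F", next_fwd))
            next_fwd += 1
    return schedule


def peak_stored_activations(schedule):
    """Each forward stores one set of activations; each backward frees one."""
    stored = peak = 0
    for op, _ in schedule:
        stored += 1 if op == "F" else -1
        peak = max(peak, stored)
    return peak


simple_peak = peak_stored_activations(simple_schedule(8))
interleaved_peak = peak_stored_activations(interleaved_schedule(8, warmup=2))
print(simple_peak, interleaved_peak)
```

In this toy model the simple schedule holds the activations of all eight micro-batches at once, while the interleaved schedule never holds more than the warm-up count.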


We will focus on model-parallel training of a large model such as GPT-2 or a comparable model.

## Training a FLAN-T5 with near-linear scaling

The FLAN-T5 model is a non-distributed transformer model, and this notebook and the accompanying scripts show how to set up sharded data parallelism and FlashAttention.

These two features are also compatible with tensor parallelism.

This notebook is accompanied by the following files:

train.py: The entry point script that'll be passed to the SageMaker PyTorch estimator later in this notebook when launching the training job. This script is prepared to run an end-to-end training of the FLAN-T5 model with SMP, settings for sharded data parallelism applied, and implemented with code lines to save, load, and fine-tune the model. You can follow the comments throughout the script to learn where the SMP APIs and code modifications are implemented.


data_pipeline.py: This has data pipeline functions to prepare the training dataset.


learining_rate.py: This has functions for learning rate schedule.


requirements.txt: This lists the dependencies to install, including Hugging Face Transformers.


memory_tracker.py: This has functions to track memory usage.


model_config.py: This has functions to get model configuration information.


sdp_utils.py: This has util functions for sharded data parallelism.


t5_flash_attn.py: This has util functions for implementation of FlashAttention.

#### Step 1: Initialize SageMaker

In [2]:
## Install the SageMaker Python SDK and the SageMaker Experiments library

%pip install --upgrade sagemaker
%pip install sagemaker-experiments

Note: you may need to restart the kernel to use updated packages.


In [3]:
## os is used later to run shell commands (aws s3 cp, rm)
import os

## Importing boto3 - provides a Python API for AWS infrastructure services
import boto3
import sagemaker

## Helper that returns the IAM execution role of this notebook
from sagemaker import get_execution_role

## SageMaker PyTorch estimator, used to launch the training job
from sagemaker.pytorch import PyTorch

## Get the execution role that SageMaker will assume
role = get_execution_role()

print(role, ': SageMaker role')

## Look up the AWS account ID through the STS client
client = boto3.client("sts")
account = client.get_caller_identity()["Account"]

print(account, ': SageMaker account')

## Create a boto3 session, a SageMaker client, and a SageMaker session to work with
session = boto3.session.Session()
sm_boto_client = boto3.client("sagemaker")
sagemaker_session = sagemaker.session.Session(boto_session = session)

## Get the default S3 bucket of this session to store data and artifacts
default_bucket = sagemaker_session.default_bucket()
print()
print(default_bucket, ': sagemaker bucket for this session')

arn:aws:iam::988564344122:role/service-role/AmazonSageMaker-ExecutionRole-20230725T162819 : SageMaker role
988564344122 : SageMaker account

sagemaker-us-east-1-988564344122 : sagemaker bucket for this session


#### Step 2: Downloading and preparing the emotion data

##### Emotion dataset: English Twitter messages, each labeled with one of six emotions (sadness, joy, love, anger, fear, surprise)

In [4]:
## Install the Hugging Face Transformers and Datasets libraries
! pip install --upgrade pip
! pip install -q datasets transformers==4.21.0

In [5]:
## Now that the libraries are installed, we import the dataset helpers
import datasets
from datasets import load_dataset, load_from_disk, load_metric

In [6]:
## Import the SageMaker PyTorch estimator, transformers, and logging
from sagemaker.pytorch import PyTorch
import transformers
import logging

## Now, we will import the transformers built-in classes:

from transformers import(
    AutoModelForCausalLM,
    AutoTokenizer,
)

from transformers.testing_utils import CaptureLogger

## Getting the logger
logger = logging.getLogger(__name__)

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [7]:
## Now we will load the dataset into the training, test and validation datasets

## Given below are the hyperparameters

hyperparameters = {
    "dataset_name": "emotion",
    "dataset_config_name": "split",
    "do_train": True, 
    "do_eval": True, 
    "cache_dir": "tmp",
}


## Now, we will load the raw dataset with all of its splits
raw_generated_data = load_dataset(
    hyperparameters["dataset_name"],
    hyperparameters["dataset_config_name"],
)

Found cached dataset emotion (/root/.cache/huggingface/datasets/emotion/split/1.0.0/cca5efe2dfeb58c1d098e0f9eeb200e9927d889b5a03c67097275dfb5fe463bd)


  0%|          | 0/3 [00:00<?, ?it/s]

#### Once the raw data is loaded, we make sure it has training and validation splits so we can have fun training our model!

In [8]:
if "validation" not in raw_generated_data.keys():
    
    ## The dataset has no validation split, so we create our own
    raw_generated_data["validation"] = load_dataset(
        ## Represents the dataset name and configuration
        hyperparameters["dataset_name"],
        hyperparameters["dataset_config_name"],
        ## Use the first 10% of the training split for validation
        split = "train[:10%]",
        cache_dir = hyperparameters["cache_dir"],
    )
    
    ## Next, rebuild the training split from the remaining data
    raw_generated_data["train"] = load_dataset(
        ## Represents the dataset name and configuration
        hyperparameters["dataset_name"],
        hyperparameters["dataset_config_name"],
        ## Use the remaining 90% of the training split for training
        split = "train[10%:]",
        cache_dir = hyperparameters["cache_dir"],
    )
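
The two slices `train[:10%]` and `train[10%:]` are complementary: together they cover the training split without overlap. A plain-Python analogue on a list makes this visible. This is only an illustration; `split_by_percent` is a hypothetical helper, and it ignores the rounding options that the `datasets` library applies to percent slices.

```python
def split_by_percent(rows, validation_percent):
    """Return (validation_rows, train_rows) split at the given percentage."""
    cut = len(rows) * validation_percent // 100
    return rows[:cut], rows[cut:]


rows = list(range(20))  # stand-in for 20 training examples
validation_rows, train_rows = split_by_percent(rows, 10)
print(len(validation_rows), len(train_rows))
```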

### Loading the tokenizer

For an NLP model to process text, we need a tokenizer that converts our raw data into tokens that the FLAN-T5 model can process. We load it with `AutoTokenizer.from_pretrained()`.

In [9]:
tokenizer_kwargs = {
    "cache_dir": hyperparameters["cache_dir"],
}

## Now, we load the pretrained FLAN-T5 tokenizer

tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-xxl", **tokenizer_kwargs)

#### Preprocessing our data

In [10]:
def tokenize_function(examples):
    tok_logger = transformers.utils.logging.get_logger("transformers.tokenization_utils_base")

    with CaptureLogger(tok_logger) as cl:
        output = tokenizer(examples[text_column_name])
        # clm input could be much much longer than block_size
        if "Token indices sequence length is longer than the" in cl.out:
            tok_logger.warning(
                "^^^^^^^^^^^^^^^^ Please ignore the warning above - this long input will be chunked into smaller bits before being passed to the model."
            )
    return output


# Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size.
def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
    # customize this part to your needs.
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len. This runs for every batch, so `result` is always defined.
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result
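
To see what the grouping step does, here is a toy run on hand-written token IDs. This is a standalone variant for illustration: it takes `block_size` as a parameter instead of reading the notebook's global, and the input batch is made up.

```python
def group_texts(examples, block_size):
    """Concatenate tokenized examples and cut them into block_size chunks."""
    concatenated = {k: sum(examples[k], []) for k in examples}
    total_length = len(concatenated[list(examples.keys())[0]])
    # Drop the tail that does not fill a whole block.
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size
    result = {
        k: [t[i:i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated.items()
    }
    # The labels are a copy of the input IDs, as in the notebook's function.
    result["labels"] = result["input_ids"].copy()
    return result


toy_batch = {
    "input_ids": [[1, 2, 3], [4, 5], [6, 7, 8, 9]],
    "attention_mask": [[1, 1, 1], [1, 1], [1, 1, 1, 1]],
}
grouped = group_texts(toy_batch, block_size=4)
print(grouped["input_ids"])
```

The nine tokens are concatenated, the ninth is dropped because it does not fill a block, and the rest is cut into two chunks of four.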

In [11]:
column_names = raw_generated_data["train"].column_names
text_column_name = "text" if "text" in column_names else column_names[0]

# since this will be pickled to avoid _LazyModule error in Hasher force logger loading before tokenize_function
tok_logger = transformers.utils.logging.get_logger("transformers.tokenization_utils_base")

tokenized_datasets = raw_generated_data.map(
    tokenize_function,
    batched=True,
    num_proc=1,
    remove_columns=column_names,
    desc="Running tokenizer on dataset",
)


## Use the tokenizer's maximum length as the block size, capped at 1024 tokens
block_size = tokenizer.model_max_length
if block_size > 1024:
    logger.warning(
        f"The tokenizer picked seems to have a very large `model_max_length` ({tokenizer.model_max_length}). "
        "Picking 1024 instead."
    )
    block_size = 1024

lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    #     num_proc=args.preprocessing_num_workers,
    desc=f"Grouping texts in chunks of {block_size}",
)

Loading cached processed dataset at /root/.cache/huggingface/datasets/emotion/split/1.0.0/cca5efe2dfeb58c1d098e0f9eeb200e9927d889b5a03c67097275dfb5fe463bd/cache-82a5d15fc6ed88c6.arrow
Loading cached processed dataset at /root/.cache/huggingface/datasets/emotion/split/1.0.0/cca5efe2dfeb58c1d098e0f9eeb200e9927d889b5a03c67097275dfb5fe463bd/cache-b22e9ce7e71ac947.arrow
Loading cached processed dataset at /root/.cache/huggingface/datasets/emotion/split/1.0.0/cca5efe2dfeb58c1d098e0f9eeb200e9927d889b5a03c67097275dfb5fe463bd/cache-8bfd270994d2500d.arrow
Loading cached processed dataset at /root/.cache/huggingface/datasets/emotion/split/1.0.0/cca5efe2dfeb58c1d098e0f9eeb200e9927d889b5a03c67097275dfb5fe463bd/cache-b91d6d5a2bbad197.arrow
Loading cached processed dataset at /root/.cache/huggingface/datasets/emotion/split/1.0.0/cca5efe2dfeb58c1d098e0f9eeb200e9927d889b5a03c67097275dfb5fe463bd/cache-90837055767b34f7.arrow
Loading cached processed dataset at /root/.cache/huggingface/datasets/emotion/sp

In [12]:
## Select the grouped training split
if hyperparameters["do_train"]:
    if "train" not in tokenized_datasets:
        raise ValueError("do_train requires a train dataset")
    train_dataset = lm_datasets["train"]


## Select the grouped validation split
if hyperparameters["do_eval"]:
    if "validation" not in tokenized_datasets:
        raise ValueError("do_eval requires a validation dataset")
    eval_dataset = lm_datasets["validation"]
    
training_dataset_location = None
validation_dataset_location = None


## Write each split to a local JSON file and define its S3 destination
if hyperparameters["do_train"]:
    train_dataset.to_json("./training.json")
    training_dataset_location = "s3://{}/dataset/train/".format(default_bucket)

if hyperparameters["do_eval"]:
    eval_dataset.to_json("./validation.json")
    validation_dataset_location = "s3://{}/dataset/validation/".format(default_bucket)

    
## Copy the JSON files to S3 with the AWS CLI
if training_dataset_location is not None:
    command = "aws s3 cp ./training.json {}".format(training_dataset_location)
    os.system(command)

if validation_dataset_location is not None:
    command = "aws s3 cp ./validation.json {}".format(validation_dataset_location)
    os.system(command)

## Remove the local copies after the upload commands have run
if hyperparameters["do_train"]:
    command = "rm ./training.json"
    os.system(command)

if hyperparameters["do_eval"]:
    command = "rm ./validation.json"
    os.system(command)

    
%store training_dataset_location
%store validation_dataset_location

%store

Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Stored 'training_dataset_location' (str)
Stored 'validation_dataset_location' (str)
Stored variables and their in-db values:
training_dataset_location               -> 's3://sagemaker-us-east-1-988564344122/dataset/tra
validation_dataset_location             -> 's3://sagemaker-us-east-1-988564344122/dataset/val


## Now, get Amazon S3 in place

In [13]:
## Restore the stored S3 locations of the training and validation datasets
%store -r training_dataset_location
%store -r validation_dataset_location

In [14]:
## Represents the S3 locations used by the training job:

## S3 location of the training data
s3_train = training_dataset_location

## S3 location of the validation data
s3_test = validation_dataset_location

## S3 location to store the output artifacts of the training job
## (uses the session's default bucket, which is known to exist)
s3_output_bucket = f"s3://{default_bucket}/smp-tensorparallel-outputdir/"
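
Since several S3 locations are assembled by hand in this notebook, a small helper can keep the URIs consistent. The sketch below uses only the standard library; `s3_uri` and `split_s3_uri` are hypothetical helpers, not part of the SageMaker SDK, and `example-bucket` is a placeholder name.

```python
from urllib.parse import urlparse


def s3_uri(bucket, *key_parts):
    """Build an s3:// prefix URI from a bucket name and key components."""
    key = "/".join(part.strip("/") for part in key_parts if part)
    return f"s3://{bucket}/{key}/" if key else f"s3://{bucket}/"


def split_s3_uri(uri):
    """Split an s3:// URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


train_uri = s3_uri("example-bucket", "dataset", "train")
print(train_uri)
print(split_s3_uri(train_uri))
```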