# Fine-tune LLMs using multi-host SageMaker distributed Training with Trainium and HF Optimum Neuron

**SageMaker Studio Kernel**: Python 3 (PyTorch 1.13 Python 3.9 CPU Optimized)  
**Instance**: ml.t3.medium

In this sample we'll use HF Optimum Neuron to fine-tune Llama 2 or Mistral using Trainium and SageMaker distributed training. For that, we need to make sure we correctly configure the code, hyperparameters, environment variables and other settings.

**MAKE SURE**
 1) To split your dataset correctly and use at minimum **FastFile mode** to minimize the IO/Memory Overhead;
 2) To use a memory efficient mechanism, like Pyarrow. Load chunks of the dataset into memory to avoid OOM issues;
 3) To correctly set the envvar **MALLOC_ARENA_MAX** to avoid OOM issues;
 4) **NEVER** use **ShardedByS3Key** to distribute the samples of your dataset.

### Other Tips

**1) If you see the following WARN: 2024-Feb-28 11:37:42.0289 452:647 [0] include/socket.h:541 CCOM WARN Timeout waiting for RX (waited 120 sec) - retrying**, check: https://awsdocs-neuron.readthedocs-hosted.com/en/latest/frameworks/torch/torch-neuronx/training-troubleshooting.html#nccl-warning-nccl-warn-timeout-waiting-for-rx-waited-120-sec-retrying

**2) "file not found" issue when compiling cache files**, check https://awsdocs-neuron.readthedocs-hosted.com/en/latest/frameworks/torch/torch-neuronx/training-troubleshooting.html?highlight=CACHE_DIR#compilation-errors-when-placing-neuroncache-home-directory-on-nfs-efs-fsx-mounted-drive

**3)  ERROR  TDRV:v2_cc_execute                           [nec_dev 22, gid 22] MPMD detected but reload is not supported yet:** this happens when each node receives a different dataset

## 01) Install requirements

In [None]:
%pip install pyarrow datasets

## 02) Download, split and upload the dataset to S3

In [None]:
import sagemaker

# SageMaker session used by the rest of the notebook, and the default S3
# bucket of that session (the dataset files are uploaded to it below).
sess = sagemaker.Session()
bucket = sess.default_bucket()

In [None]:
import os
import io
import pyarrow as pa
from datasets import load_dataset

# Layout of the exported dataset: each Arrow file receives up to
# `max_rows_per_file` non-empty rows, written as record batches of
# `batch_size` rows.
batch_size = 32
max_rows_per_file = batch_size * 32
dataset = load_dataset("wikitext", "wikitext-2-raw-v1")
schema = pa.schema([('text', pa.string())])

for name in ['train', 'test']:
    os.makedirs(f"dataset/{name}", exist_ok=True)
    num_rows = len(dataset.data[name])
    print(f"Num unfiltered rows for {name}: {num_rows}")

    pending = []        # non-empty rows waiting to be written as one record batch
    buffer = None       # in-memory file currently being filled
    ipc_writer = None   # Arrow IPC writer bound to `buffer`
    file_index = 0
    kept_in_file = 0    # non-empty rows stored in the current file

    for position, record in enumerate(dataset[name], start=1):
        content = record['text'].strip()
        is_last = position == num_rows

        # Open a fresh in-memory Arrow file whenever none is active
        if ipc_writer is None:
            buffer = io.BytesIO()
            ipc_writer = pa.ipc.new_file(buffer, schema)
            kept_in_file = 0

        # Blank rows are dropped, but the checks below still run for them
        if content:
            pending.append(content)
            kept_in_file += 1

        # Flush the buffered rows once the batch is full or the split ended
        if is_last or len(pending) == batch_size:
            ipc_writer.write(pa.record_batch([pending], schema))
            pending = []

        # Keep filling the current file until it is full or the split ended
        if not (is_last or kept_in_file == max_rows_per_file):
            continue

        # Finalize the current file and push its bytes to S3
        filename = f"{file_index:010d}.arrow"
        ipc_writer.close()

        print(f"Trying to write file {filename}, rows: {kept_in_file}")
        s3_uri = sess.upload_string_as_file_body(buffer.getvalue(), bucket=bucket, key=f'datasets/wikitest/{name}/{filename}')
        print(s3_uri)

        buffer.close()

        ipc_writer = None
        buffer = None
        file_index += 1

## 03) Training script + requirements.txt

In [None]:
import os
# Local folder that receives the training script and requirements.txt written by the next cells
os.makedirs("src", exist_ok=True)

In [None]:
%%writefile src/train.py
import argparse
import bisect
import glob
import os
import socket

import transformers
import pyarrow as pa

from huggingface_hub import login
from torch.utils.data import Dataset, DataLoader
from optimum.neuron import NeuronTrainer as Trainer
from transformers import AutoModelForCausalLM, AutoTokenizer
from optimum.neuron.distributed import lazy_load_for_parallelism
from optimum.neuron import NeuronTrainingArguments as TrainingArguments

if __name__ == '__main__':
    def _str2bool(value):
        '''Convert a command-line value such as "True"/"False" into a real bool.

        `type=bool` must not be used with argparse: bool("False") is True because
        any non-empty string is truthy, so a flag could never be switched off.

        Raises:
            argparse.ArgumentTypeError: if the value is not a recognizable boolean.
        '''
        if isinstance(value, bool):
            return value
        lowered = str(value).strip().lower()
        if lowered in ("true", "t", "yes", "y", "1"):
            return True
        if lowered in ("false", "f", "no", "n", "0"):
            return False
        raise argparse.ArgumentTypeError(f"Expected a boolean value, got {value!r}")

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--epochs", type=int, default=1)
    # Both spellings are accepted and stored in args.max_sen_len. Without the
    # --max_seq_len alias that option would be silently dropped by parse_known_args.
    parser.add_argument("--max_sen_len", "--max_seq_len", dest="max_sen_len", type=int, default=512)
    parser.add_argument("--train_batch_size", type=int, default=4)
    parser.add_argument("--eval_batch_size", type=int, default=4)
    parser.add_argument("--tp_size", type=int, default=8)
    parser.add_argument("--pp_size", type=int, default=1)

    parser.add_argument("--model_id", type=str, required=True)
    parser.add_argument("--zero_1", type=_str2bool, default=True)
    parser.add_argument("--task", type=str, default="")
    parser.add_argument("--collator_class", type=str, default="DefaultDataCollator")
    parser.add_argument("--learning_rate", type=float, default=5e-5)
    parser.add_argument("--weight_decay", type=float, default=0.01)
    parser.add_argument("--bf16", type=_str2bool, default=True)

    # Data, model, and output directories
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR", "output"))
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR", "model"))

    parser.add_argument("--training_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--eval_dir", type=str, default=os.environ.get("SM_CHANNEL_EVAL", None))

    parser.add_argument("--hf_token", type=str, default=None)

    # Unknown arguments are tolerated on purpose: extra options must not abort the run
    args, _ = parser.parse_known_args()

    # Authenticate against the Hugging Face Hub only when a token was provided
    if args.hf_token:
        print("HF token defined. Logging in...")
        login(token=args.hf_token)

    # Custom Dataset that minimizes memory utilization while
    # streaming multiple big files from S3. It expects the files to be
    # prepared the way you see in the initial sections of the original Notebook.
    class ArrowStreamDataset(Dataset):
        '''Dataset that streams batches instead of loading the whole file into memory.

        Every `*.arrow` file found in `file_dir` is memory-mapped and opened as an
        Arrow IPC file. Rows are located through the cumulative number of rows per
        file, so files may hold a different number of record batches. Inside a
        file, every record batch except the last one is assumed to contain exactly
        `batch_size` rows.

        Args:
            file_dir: directory that contains the `.arrow` files.
            batch_size: number of rows of each full record batch.
            max_rows_per_file: kept for backward compatibility and stored on the
                instance; the row lookup no longer depends on it.

        Raises:
            ValueError: if `file_dir` is empty or None.
            FileNotFoundError: if `file_dir` contains no `.arrow` file.
        '''
        def __init__(self, file_dir, batch_size=32, max_rows_per_file=32 * 32):
            if not file_dir:
                raise ValueError("file_dir is not defined, make sure the corresponding input channel exists")
            self.batch_size = batch_size
            self.max_rows_per_file = max_rows_per_file

            # Sorted so the row order is stable across runs and hosts
            data_files = sorted(glob.glob(os.path.join(file_dir, "*.arrow")))
            if not data_files:
                raise FileNotFoundError(f"No .arrow files found in {file_dir}")
            source = [pa.memory_map(f, 'rb') for f in data_files]
            self.data = [pa.ipc.open_file(s) for s in source]
            self.num_batches = [d.num_record_batches for d in self.data]

            # Only the last batch of each file is inspected; a file without
            # batches simply contributes zero rows.
            self.rows_last_batch = [
                d.get_batch(n - 1).num_rows if n > 0 else 0
                for d, n in zip(self.data, self.num_batches)
            ]

            # file_row_ends[i] = number of rows stored in files 0..i (inclusive)
            self.file_row_ends = []
            self.num_rows = 0
            for b, r in zip(self.num_batches, self.rows_last_batch):
                if b > 0:
                    self.num_rows += (b - 1) * batch_size + r
                self.file_row_ends.append(self.num_rows)

        def __len__(self):
            '''Return the total number of rows across all files.'''
            return self.num_rows

        def __getitem__(self, idx):
            '''Return the text stored at global row `idx` (negative indexes allowed).

            Raises:
                IndexError: if `idx` is outside the dataset.
            '''
            if idx < 0:
                idx += self.num_rows
            if not 0 <= idx < self.num_rows:
                raise IndexError(f"Index {idx} is out of range for a dataset with {self.num_rows} rows")

            # First file whose cumulative row count is greater than idx
            file_id = bisect.bisect_right(self.file_row_ends, idx)
            first_row = self.file_row_ends[file_id - 1] if file_id > 0 else 0
            batch_id, row_id = divmod(idx - first_row, self.batch_size)

            batch = self.data[file_id].get_batch(batch_id)
            item = batch.take([row_id]).to_pydict()['text'][0]
            return item

    # Tokenizer of the selected model: the maximum length comes from the
    # command-line arguments and the end-of-sequence token is reused for padding.
    tokenizer = AutoTokenizer.from_pretrained(args.model_id, token=args.hf_token)
    tokenizer.model_max_length = args.max_sen_len
    tokenizer.pad_token = tokenizer.eos_token

    # Custom collator to add labels in the input sample
    def data_collator(examples):
        '''Tokenize a batch of raw texts and attach the labels used for the loss.

        Args:
            examples: list of strings, one per sample of the batch.

        Returns:
            The tokenizer output (PyTorch tensors, padded/truncated to the
            tokenizer max length) with an extra `labels` entry: a copy of
            `input_ids` where padding positions are set to -100.
        '''
        inputs = tokenizer(examples, truncation=True, padding='max_length', return_tensors='pt')
        # Work on a copy so labels and input_ids never share the same storage
        labels = inputs['input_ids'].clone()
        # The pad token is the EOS token, so padding is identified through the
        # attention mask. -100 is the index ignored by the loss, which keeps
        # the padding positions out of it.
        labels[inputs['attention_mask'] == 0] = -100
        inputs['labels'] = labels
        return inputs

    # Datasets backed by the Arrow files of the train / eval directories
    train_dataset = ArrowStreamDataset(args.training_dir)
    eval_dataset = ArrowStreamDataset(args.eval_dir)

    # The same parallelism degrees are used by the training arguments and by
    # the lazy model loading further down.
    parallelism = {
        "tensor_parallel_size": args.tp_size,
        "pipeline_parallel_size": args.pp_size,
    }
    output_dir = args.output_data_dir

    training_args = TrainingArguments(
        # precision, optimizer sharding and parallelism
        zero_1=args.zero_1,
        bf16=args.bf16,
        **parallelism,
        # optimization schedule
        num_train_epochs=args.epochs,
        learning_rate=args.learning_rate,
        weight_decay=args.weight_decay,
        per_device_train_batch_size=args.train_batch_size,
        per_device_eval_batch_size=args.eval_batch_size,
        gradient_accumulation_steps=1,
        eval_accumulation_steps=1,
        evaluation_strategy="epoch",
        # outputs, logging and checkpoints
        output_dir=output_dir,
        overwrite_output_dir=True,
        logging_dir=f"{output_dir}/logs",
        logging_strategy="steps",
        logging_steps=500,
        save_strategy="steps",
        save_steps=1000,
        save_total_limit=1,
        hub_token=args.hf_token,
    )

    # Load the model inside the lazy-loading context configured with the
    # tensor / pipeline parallel sizes.
    with lazy_load_for_parallelism(**parallelism):
        model = AutoModelForCausalLM.from_pretrained(
            args.model_id,
            token=args.hf_token,
            low_cpu_mem_usage=True,
        )

    trainer = Trainer(
        model,
        args=training_args,
        tokenizer=tokenizer,
        data_collator=data_collator,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

    # Train, then store model and tokenizer side by side in the model directory
    trainer.train()
    trainer.save_model(args.model_dir)
    tokenizer.save_pretrained(args.model_dir)

In [None]:
%%writefile src/requirements.txt
--extra-index-url https://pip.repos.neuron.amazonaws.com
optimum-neuron==0.0.21

In [None]:
import os
import re


def _version_tuple(version):
    """Return the first three numeric components of a version string as ints.

    Versions must be compared numerically: as plain strings "2.20.0" sorts
    after "2.146.0", which would wrongly accept an outdated SDK.
    """
    return tuple(int(number) for number in re.findall(r"\d+", version)[:3])


print(sagemaker.__version__)
if _version_tuple(sagemaker.__version__) < (2, 146, 0): print("You need to upgrade or restart the kernel if you already upgraded")

# Execution role, default bucket and region used to launch the training job
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()
region = sess.boto_region_name

## ATTENTION: Copy your HF Access token to the following variable
HF_TOKEN=""

if HF_TOKEN == "": print(" >>> Go to your HF account and get an access token. Set HF_TOKEN to your token if you want to define your own cache repo")
os.makedirs("src", exist_ok=True)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {bucket}")
print(f"sagemaker session region: {region}")

In [None]:
import json
import logging
from sagemaker.pytorch import PyTorch

# Tensor parallel degree, per-device batch size and maximum sequence length
tp_degree = 8
batch_size = 4
max_seq_len = 512

# Mistral consumes more host memory, hence a lower MALLOC_ARENA_MAX than the
# one used for Llama2 (see the commented alternative below).
## Mistral and Llama2 are gated models, that's why we're using a fine-tuned Mistral here
## To use gated models you need to provide a valid HF_TOKEN
ARENA_MAX = 70
model_id = "yam-peleg/Experiment31-7B"

# ATTENTION: To use llama2 you need to pass HF_TOKEN of an account
# with permission to download Llama2 weights, otherwise the training will fail
#ARENA_MAX,model_id=128,"meta-llama/Llama-2-7b-chat-hf"

# the default cache repo points to a public / read-only cache
# You can point it to your own repo, but make sure you properly defined the HF token in the HF_TOKEN (above)
CUSTOM_CACHE_REPO = "aws-neuron/optimum-neuron-cache"

instance_type = 'ml.trn1.32xlarge'

# Hyperparameters handed to the entry point of the training job
hyperparameters = dict(
    epochs=1,
    zero_1=True,
    bf16=True,
    max_seq_len=max_seq_len,
    tp_size=tp_degree,
    pp_size=1,
    eval_batch_size=batch_size,
    train_batch_size=batch_size,
    model_id=model_id,
)
# The token is only forwarded when one was actually provided
if HF_TOKEN is not None and len(HF_TOKEN) > 3:
    hyperparameters["hf_token"] = HF_TOKEN

# Environment variables set inside the training containers
training_environment = {
    # Uncomment the following line to precompile the cache files
    #"RUN_NEURON_PARALLEL_COMPILE": "1",
    "OMP_NUM_THREADS": "1",
    "FI_EFA_FORK_SAFE": "1",
    "FI_EFA_USE_DEVICE_RDMA": "1",
    "FI_PROVIDER": "efa",
    "XLA_DOWNCAST_BF16": "1",
    "NEURON_FUSE_SOFTMAX": "1",
    "NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS": "5",
    "NEURON_RT_STOCHASTIC_ROUNDING_EN": "1",
    "CUSTOM_CACHE_REPO": CUSTOM_CACHE_REPO,
    "MALLOC_ARENA_MAX": str(ARENA_MAX), # required to avoid OOM
    "NEURON_CC_FLAGS": "--model-type=transformer --distribution-strategy=llm-training --enable-saturate-infinity",
}
training_image = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-training-neuronx:1.13.1-neuronx-py310-sdk2.18.1-ubuntu20.04"

print(f"Instance type: {instance_type}\nHyperparameters: {hyperparameters}")
estimator = PyTorch(
    entry_point="train.py", # Specify your train script
    source_dir="src",
    role=role,
    sagemaker_session=sess,
    instance_count=2,
    instance_type=instance_type,
    volume_size=512,
    image_uri=training_image,
    input_mode='FastFile',
    output_path=f"s3://{bucket}/output",
    disable_profiler=True,
    disable_output_compression=True,
    distribution={"torch_distributed": {"enabled": True}},
    environment=training_environment,
    hyperparameters=hyperparameters,
)
estimator.framework_version = '1.13.1' # workaround when using image_uri

In [None]:
from sagemaker.inputs import TrainingInput

# Launch the training job. Every instance receives the full train / eval
# channels (FullyReplicated), streamed in FastFile mode.
# No compression is declared: the uploaded Arrow files are not gzip-compressed,
# and the SDK documents that setting as used in Pipe input mode only.
estimator.fit({
    'train': TrainingInput(
        f"s3://{bucket}/datasets/wikitest/train", distribution='FullyReplicated', input_mode='FastFile'
    ),
    'eval': TrainingInput(
        f"s3://{bucket}/datasets/wikitest/test", distribution='FullyReplicated', input_mode='FastFile'
    )
})