# Distributed Data Parallel (DDP)

### 1. How DDP works

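For DDP, the steps are:

  1. create several training pipelines (one process each), each with its own copy of the model and its own shard of the data
  2. run the forward pass in each pipeline
  3. compute the loss and run the backward pass in each pipeline
  4. average the gradients across all pipelines
  5. update each model with the averaged gradients, so that all copies stay identical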
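
The five steps can be sketched with a toy simulation. This is only an illustration under simplifying assumptions: the two "pipelines" run in a single Python process, the model is a single weight `w` in `y = w * x`, and the names `grad_mse`, `shards` and `weights` are made up for the example. No GPU and no torch are involved.

```python
def grad_mse(w, shard):
    """Gradient of mean((w * x - y) ** 2) with respect to w on one data shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # samples of y = 2x
world_size = 2
lr = 0.05

# 1. every pipeline gets its own model copy and its own shard of the data
weights = [0.0] * world_size
shards = [data[rank::world_size] for rank in range(world_size)]

for step in range(50):
    # 2-3. forward pass, loss and backward pass, done independently per pipeline
    grads = [grad_mse(weights[rank], shards[rank]) for rank in range(world_size)]
    # 4. the gradients are averaged and every pipeline receives the same value
    avg_grad = sum(grads) / world_size
    # 5. the identical update on every copy keeps the models in sync
    weights = [w - lr * avg_grad for w in weights]

print(weights)
```

Because every copy starts from the same weight and applies the same averaged gradient, the copies never diverge, which is why DDP only needs to communicate gradients and not the weights themselves.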


### 2. Some Notions

The key notions for understanding DDP are:

  * group: all pipelines (processes) on all GPUs working on one task
  * world_size: the number of pipelines in the group, normally the number of GPUs
  * node: a machine, which can have several GPUs
  * rank (global rank): the index of a training pipeline within the group
  * local_rank: the index of a training pipeline within its node

e.g.

    2 machines with 4 GPUs each, 2 pipelines per machine
    node = 2
    world_size = 4
    each process uses 2 GPUs

                    Node 0                                  Node 1
     ++++++++++++++++++++++++++++++++++++++  +++++++++++++++++++++++++++++++++++++
     +   Train.py      +   Train.py      +   +   Train.py      +  Train.py       +
     + Global Rank 0   + Global Rank 1   +   + Global Rank 2   + Global Rank 3   +
     + Local Rank 0    + Local Rank 1    +   + Local Rank 0    + Local Rank 1    +
     + ------  ------  + ------- ------- +   + ------- ------- + ------- ------- +
     + |     | |     | + |     | |     | +   + |     | |     | + |     | |     | +
     + | GPU | | GPU | + | GPU | | GPU | +   + | GPU | | GPU | + | GPU | | GPU | +
     + |  0  | |  1  | + |  2  | |  3  | +   + |  0  | |  1  | + |  2  | |  3  | +
     + |     | |     | + |     | |     | +   + |     | |     | + |     | |     | +
     + ------- ------- + ------- ------- +   + ------- ------- + ------- ------- +
     +++++++++++++++++++++++++++++++++++++   +++++++++++++++++++++++++++++++++++++
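
The relation between the ranks in the diagram can be written down in a few lines. This is a small sketch that assumes the global ranks are numbered node by node, as in the diagram; the helper `global_rank` and the variable `procs_per_node` are names chosen for the example.

```python
def global_rank(node_rank: int, local_rank: int, procs_per_node: int) -> int:
    """Global rank of a process, assuming ranks are assigned node by node."""
    return node_rank * procs_per_node + local_rank


nodes, procs_per_node = 2, 2
world_size = nodes * procs_per_node

layout = [
    (node, local, global_rank(node, local, procs_per_node))
    for node in range(nodes)
    for local in range(procs_per_node)
]

for node, local, rank in layout:
    print(f"node {node}  local_rank {local}  global rank {rank}")
```

The local rank restarts at 0 on every node, while the global rank is unique in the whole group.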


### 3. Communication

Communication between the nodes and the GPUs lets the pipelines exchange information and coordinate the training. There are 2 major types of communication:

  * point to point: from one pipeline to another

  * collective: between all pipelines of the group; see the diagram of each operation.

    - Scatter: one pipeline splits its data and sends a different piece to each pipeline in the group
    
                                    [t0, t1, t2, t3]                                
                                      ++++++++++                                        
                                      | Rank 0 |                                    
                                      ++++++++++                                    
                                          |                                      
                   --------------  ------   ------- ----------------                 
                  |                |               |                |              
             ++++++++++       ++++++++++       ++++++++++      ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |      | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++
              [t0,]             [t1,]             [t2,]           [t3,]                   


    - Gather: the reverse operation of Scatter; one pipeline collects the data of all pipelines
    
              [t0,]             [t1,]             [t2,]           [t3,]   
             ++++++++++       ++++++++++       ++++++++++      ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |      | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++ 
                  |                |                |                |
                  ----------------  ------   ------- -----------------
                                           |
                                       ++++++++++
                                       | Rank 0 |
                                       ++++++++++
                                     [t0, t1, t2, t3]

    - Reduce: unlike Gather, the collected data are merged into a single value through an operation (e.g. a sum)

              [t0,]             [t1,]             [t2,]           [t3,]   
             ++++++++++       ++++++++++       ++++++++++      ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |      | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++ 
                  |                |                |                |
                  ----------------  ------   ------- -----------------
                                           |
                                       ++++++++++
                                       | Rank 0 |
                                       ++++++++++
                                     [t0 + t1 + t2 + t3]

    - All Reduce: the reduced data are sent to all pipelines
  
                [t0,]             [t1,]             [t2,]           [t3,]   
             ++++++++++       ++++++++++       ++++++++++      ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |      | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++ 
                  |                |                |                |
                  ----------------  ---------------- -----------------
                  |                |                |                |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |      | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++ 
            [t0+t1+t2+t3]    [t0+t1+t2+t3]    [t0+t1+t2+t3]   [t0+t1+t2+t3]

    - Broadcast: one pipeline sends the same data to all the others in the group

                                        [t0, ]                                
                                      ++++++++++                                        
                                      | Rank 0 |                                    
                                      ++++++++++                                    
                                          |                                      
                   --------------  ------   ------- ----------------                 
                  |                |               |                |              
             ++++++++++       ++++++++++       ++++++++++      ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |      | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++      ++++++++++
              [t0,]             [t0,]             [t0,]           [t0,]   

    - All Gather: the gathered data are sent to all pipelines

               [t0,]             [t1,]            [t2,]           [t3,]   
             ++++++++++       ++++++++++       ++++++++++       ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |       | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++       ++++++++++ 
                  |                |                |                |
                  ----------------  ---------------- -----------------
                  |                |                |                |
             ++++++++++       ++++++++++       ++++++++++       ++++++++++  
             | Rank 0 |       | Rank 1 |       | Rank 2 |       | Rank 3 |
             ++++++++++       ++++++++++       ++++++++++       ++++++++++ 
            [t0,t1,t2,t3]    [t0,t1,t2,t3]    [t0,t1,t2,t3]   [t0,t1,t2,t3]
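
The diagrams above can also be read as plain list operations. The sketch below simulates the six collective operations in a single process: a list with one entry per rank stands for "what each rank holds", and `None` means the rank receives nothing. The function names follow the diagrams and are only an illustration; they do not send any data between real processes.

```python
import functools
import operator


def scatter(chunks):
    """The source rank holds one chunk per rank; rank i receives chunk i."""
    return list(chunks)


def gather(per_rank, dst=0):
    """Rank `dst` receives the list of all values; the others receive nothing."""
    return [list(per_rank) if rank == dst else None for rank in range(len(per_rank))]


def reduce(per_rank, op=operator.add, dst=0):
    """Rank `dst` receives all values merged with `op`."""
    merged = functools.reduce(op, per_rank)
    return [merged if rank == dst else None for rank in range(len(per_rank))]


def all_reduce(per_rank, op=operator.add):
    """Every rank receives all values merged with `op`."""
    merged = functools.reduce(op, per_rank)
    return [merged for _ in per_rank]


def broadcast(value, world_size):
    """Every rank receives the same value from the source rank."""
    return [value for _ in range(world_size)]


def all_gather(per_rank):
    """Every rank receives the list of all values."""
    return [list(per_rank) for _ in per_rank]


t = [1, 2, 3, 4]  # t0..t3, one value per rank
for name, result in [
    ("scatter", scatter(t)),
    ("gather", gather(t)),
    ("reduce", reduce(t)),
    ("all_reduce", all_reduce(t)),
    ("broadcast", broadcast(t[0], 4)),
    ("all_gather", all_gather(t)),
]:
    print(f"{name:>10}: {result}")
```

In a real training script the same operations are provided by `torch.distributed` (`scatter`, `gather`, `reduce`, `all_reduce`, `broadcast`, `all_gather`), which work on tensors held by separate processes.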


### 4. Run

DDP starts one process per GPU, which does not fit in a Jupyter notebook, so we put the code in a script and launch it with `torchrun`:
```
torchrun --nproc_per_node=2 ddp_train.py
```

## I. Using torch

We take the same BERT classification model as an example.
Some modifications are needed to use DDP; see the Modifications section and follow the steps.

### code

In [2]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding
from datasets import load_dataset
import evaluate


### 1. Load dataset

In [3]:
ckp = "google-bert/bert-base-uncased"

In [4]:
# load data
data = load_dataset("davidberg/sentiment-reviews")
data

DatasetDict({
    train: Dataset({
        features: ['Unnamed: 0', 'review', 'polarity', 'division'],
        num_rows: 4084
    })
})

In [5]:
# dataset

from torch.utils.data import Dataset

class dataset(Dataset):

    label2id = {"positive": 0, "negative": 1}

    def __init__(self, _data):

        super().__init__()

        self.data = {"review":[], "division":[]}
        for i in range(len(_data["train"]["review"])):
            
            if _data["train"][i]["division"] in dataset.label2id.keys():
                self.data["review"].append(_data["train"][i]["review"])
                self.data["division"].append(_data["train"][i]["division"])
            # else:
            #     print(_data["train"][i]["review"], _data["train"][i]["division"])

    
    def __getitem__(self, index):

        return self.data["review"][index], dataset.label2id.get(self.data["division"][index])
    
    def __len__(self):

        return len(self.data["review"])

In [6]:
# construct dataset

ds = dataset(data)
print(len(ds))

3548


In [7]:
# split dataset

from torch.utils.data import random_split

trainset, validset = random_split(ds, lengths=[0.9, 0.1])

### 2. dataloader

In [8]:
# load tokenizer

tokenizer = AutoTokenizer.from_pretrained(ckp)
tokenizer

BertTokenizerFast(name_or_path='google-bert/bert-base-uncased', vocab_size=30522, model_max_length=512, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	100: AddedToken("[UNK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	101: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	102: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	103: AddedToken("[MASK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
}

In [9]:
# function collate

import torch

def collate_fct(batch):

    texts, labels = [], []

    for item in batch:

        texts.append(item[0])
        labels.append(item[1])

    toks = tokenizer(texts, max_length=512, truncation=True, padding="max_length", return_tensors="pt", add_special_tokens=True)

    toks["labels"] = torch.tensor(labels)

    return toks

In [10]:
# dataloader

from torch.utils.data import DataLoader

trainloader = DataLoader(trainset, batch_size=2, shuffle=True, collate_fn=collate_fct)
validloader = DataLoader(validset, batch_size=32, shuffle=False, collate_fn=collate_fct)

### 3. Load model

In [11]:
# load model
import torch

model = AutoModelForSequenceClassification.from_pretrained(ckp)

if torch.cuda.is_available():
    model = model.cuda()

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at google-bert/bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


### 4. define optimizer

In [12]:
# define optimizer

from torch.optim import Adam

optimizer = Adam(model.parameters(), lr=2e-5)

### 5. define eval function

In [13]:
def evaluate():

    model.eval()

    acc_num = 0
    count = 0

    with torch.inference_mode():

        for batch in validloader:

            if torch.cuda.is_available():
                batch = {k:v.cuda() for k, v in batch.items()}

            output = model(**batch)

            pred = torch.argmax(output.logits, dim=-1)

            count += len(batch["labels"])

            acc_num += (batch["labels"].int() == pred.int()).sum()

    return acc_num /count

### 6. training loop

In [14]:
# train

def train(epoch=3, log_step=100):

    gStep = 0

    for e in range(epoch):

        model.train()

        for batch in trainloader:
            
            if torch.cuda.is_available():
                batch = {k:v.cuda() for k, v in batch.items()}

            optimizer.zero_grad()

            output = model(**batch)

            output.loss.backward()

            optimizer.step()

            if gStep % log_step  == 0:

                print(f"epoch {e+1} / {epoch}: global step: {gStep}, loss: {output.loss.mean().item()}")

            gStep += 1

        acc = evaluate()

        print(f"epoch {e+1} / {epoch}: acc: {acc}")


### 7. train

In [None]:
train()

### Modifications

The steps for distributed training are:

 step1. export the code above (without distributed training) to a .py file. To do so, use the "Export" feature of VS Code to export the notebook as a Python script.

 step2. remove the text and code that are not needed in the script, mostly the titles and the default prints.

 step3. modify the code as follows to adapt it to distributed training.

    A. before anything else, import the distributed module and initialize the backend.
    B. for the loaders, remove shuffle and add a distributed sampler.
    C. send the model to the GPU of the current process (its local rank) and wrap it in DistributedDataParallel. The batch data must be sent to the same GPU in the eval function and in the training loop.

    At this point the main DDP steps are done and the training runs without errors. However, the reporting is still messy (e.g. the loss is shown several times and the accuracy is not correct).
    So some extra steps are needed to coordinate the reporting among the GPUs.

    D. reduce the loss across all GPUs. After this, if we run the training, all GPUs report the same loss value.

    E. report once for all GPUs. Since all losses are now the same, we only need to report the value once. To do so, we report only on rank 0.

    F. compute the final accuracy. Unlike the loss, which can be averaged, the accuracy should be recomputed as the sum of correct predictions over the total number of samples on all GPUs.

    G. fix the data leak. So far, when we train, the accuracy looks too good. This is because every process splits the dataset randomly on its own, so a sample in the validation set of one process can be in the training set of another. To fix this, we fix the split using a random seed.

    H. shuffle. In B, when we added the distributed sampler, we disabled shuffle, which can also cause overfitting. So it is better to reactivate the shuffle, this time through the sampler, and to call its set_epoch method at the start of every epoch so that the order changes between epochs.

 step4. launch the script with:
 ```bash
 $ torchrun --nproc_per_node=2 ddp_train_torch.py
 ```
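
Steps A to H can be put together in a short sketch. It is not the exported script itself: a toy `Linear` model and random tensors stand in for BERT and the review dataset, the function name `main` and the default values of the environment variables are assumptions made so that the sketch can also be tried as a single process on CPU, and the `gloo` backend is used when no GPU is available.

```python
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset, random_split
from torch.utils.data.distributed import DistributedSampler


def main(epochs=2):
    # A. initialize the backend. torchrun sets these variables; the defaults
    #    below are assumptions that only allow a single-process dry run.
    for key, value in {"MASTER_ADDR": "127.0.0.1", "MASTER_PORT": "29500",
                       "RANK": "0", "LOCAL_RANK": "0", "WORLD_SIZE": "1"}.items():
        os.environ.setdefault(key, value)
    use_cuda = torch.cuda.is_available()
    dist.init_process_group(backend="nccl" if use_cuda else "gloo")
    local_rank = int(os.environ["LOCAL_RANK"])
    if use_cuda:
        torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank) if use_cuda else torch.device("cpu")

    # Toy data standing in for the tokenized reviews (same on every process).
    torch.manual_seed(0)
    ds = TensorDataset(torch.randn(64, 8), torch.randint(0, 2, (64,)))
    # G. seeded split: every process gets the same train/validation split
    trainset, validset = random_split(
        ds, [56, 8], generator=torch.Generator().manual_seed(42))

    # B + H. the sampler shards and shuffles; the DataLoader itself does not shuffle
    train_sampler = DistributedSampler(trainset, shuffle=True)
    trainloader = DataLoader(trainset, batch_size=8, sampler=train_sampler)
    validloader = DataLoader(validset, batch_size=8,
                             sampler=DistributedSampler(validset, shuffle=False))

    # C. each process moves its model copy to its own device and wraps it in DDP
    model = DDP(torch.nn.Linear(8, 2).to(device),
                device_ids=[local_rank] if use_cuda else None)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        train_sampler.set_epoch(epoch)  # H. a different order every epoch
        model.train()
        for x, y in trainloader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(x), y)
            loss.backward()  # DDP averages the gradients across processes here
            optimizer.step()

        # D. average the last loss over all processes (sum, then divide)
        loss = loss.detach().clone()
        dist.all_reduce(loss, op=dist.ReduceOp.SUM)
        loss /= dist.get_world_size()

        # F. sum the correct predictions and the sample counts, then divide once
        model.eval()
        stats = torch.zeros(2, device=device)  # [correct, total]
        with torch.no_grad():
            for x, y in validloader:
                x, y = x.to(device), y.to(device)
                stats[0] += (model(x).argmax(dim=-1) == y).sum()
                stats[1] += y.numel()
        dist.all_reduce(stats, op=dist.ReduceOp.SUM)
        acc = (stats[0] / stats[1]).item()

        # E. report once, from rank 0 only
        if dist.get_rank() == 0:
            print(f"epoch {epoch + 1}: loss {loss.item():.4f}, acc {acc:.4f}")

    dist.destroy_process_group()
    return acc
```

To use the sketch as a script, call `main()` at the end of the file and launch the file with `torchrun` as in step 4. The counters in step F are created outside of `torch.no_grad()` on purpose, so that the in-place `all_reduce` can be applied to them afterwards.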


Notes: The script was reformatted to make it more readable, and the default prints of the notebook were removed.

Remarks: There is still a problem with the way the data are distributed in distributed training. The distributed sampler automatically pads the data so that every process gets the same number of samples (e.g. with a total of 49 samples and 2 processes, one sample is repeated so that each process gets 25). The repeated samples are counted twice, which can bias the accuracy computed in the eval function. However, this is not tackled here.
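
The padding in the remark can be reproduced with a few lines of plain Python. The sketch is a simplified model of a distributed sampler without shuffling; the function `shard_indices` is a name invented for the example and the exact padding rule (repeating the first samples) is an assumption about the sampler's behaviour, not a copy of its code.

```python
import math


def shard_indices(num_samples: int, world_size: int, rank: int) -> list[int]:
    """Indices seen by one rank when the index list is padded to a multiple of world_size."""
    indices = list(range(num_samples))
    total = math.ceil(num_samples / world_size) * world_size
    indices += indices[: total - num_samples]  # pad by repeating the first samples
    return indices[rank::world_size]


shards = [shard_indices(49, 2, rank) for rank in range(2)]
sizes = [len(shard) for shard in shards]
duplicates = sorted(set(shards[0]) & set(shards[1]))

print("samples per rank:", sizes)
print("samples seen by both ranks:", duplicates)
```

Every rank gets the same number of samples, but one sample is evaluated twice, so a sum of correct predictions over all ranks is divided by 50 instead of 49.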

## II. Using Transformers

We don't have to modify the training code to use DDP: the Trainer handles it when the script is launched with torchrun. Follow the steps to run it.

### code

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from datasets import load_dataset
import evaluate

ckp = "google-bert/bert-base-uncased"

# load data
data = load_dataset("davidberg/sentiment-reviews")

split_data = data["train"].train_test_split(test_size=0.2)

# load tokenizer
tokenizer = AutoTokenizer.from_pretrained(ckp)

label2id = {"positive": 0, "negative": 1}

# process data

def process(samples):

    _data = {"review":[], "division":[]}
    for i in range(len(samples["review"])):
        if samples["division"][i] in label2id.keys():
            _data["review"].append(samples["review"][i])
            _data["division"].append(samples["division"][i])

    toks = tokenizer(_data["review"], max_length=128, truncation=True, padding="max_length", return_tensors="pt")

    toks["labels"] = [label2id.get(d) for d in _data["division"]]

    return toks

tokenized_data = split_data.map(process, batched=True, remove_columns=split_data["train"].column_names)

# load model

model = AutoModelForSequenceClassification.from_pretrained(ckp)

# metric
acc_fct = evaluate.load("accuracy")
f1_fct = evaluate.load("f1")

def metric(pred):

    preds, refs = pred

    preds = preds.argmax(axis=-1)

    # metrics loaded with evaluate.load are used through their compute method
    acc = acc_fct.compute(predictions=preds, references=refs)
    f1 = f1_fct.compute(predictions=preds, references=refs)
    acc.update(f1)

    return acc


In [None]:
# training
from transformers import DataCollatorWithPadding, TrainingArguments, Trainer

args = TrainingArguments(
    output_dir="./checkpoints",
    per_device_train_batch_size=32,
    gradient_accumulation_steps=32,
    logging_steps=10,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    weight_decay=0.01,
    metric_for_best_model="f1",
    load_best_model_at_end=True
)

trainer = Trainer(
    model=model, 
    args=args,
    train_dataset=tokenized_data["train"],
    eval_dataset=tokenized_data["test"],
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer, padding=True),
    compute_metrics=metric
)


In [None]:
trainer.train()

### Modifications

We just follow steps 1 and 2 of the torch case.

For step 3, we don't need to modify anything for the training. However, the accuracy may again look too good, because the random data split causes a data leak.

So we can do:

    A. fix the split. We split the data using a random seed, e.g. train_test_split(test_size=0.2, seed=42).
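
Why the seed matters can be shown with a small simulation in plain Python. It assumes that each process runs the whole script on its own and therefore makes its own split; the helpers `split` and `leaked` are hypothetical names and only sample indices are used, not the real dataset.

```python
import random


def split(indices, valid_fraction, seed=None):
    """Shuffle and split the indices; seed=None gives a different split on each call."""
    rng = random.Random(seed)
    shuffled = list(indices)
    rng.shuffle(shuffled)
    n_valid = int(len(shuffled) * valid_fraction)
    return set(shuffled[n_valid:]), set(shuffled[:n_valid])  # (train, valid)


def leaked(splits):
    """Samples that are validation data in one process and training data in another."""
    all_train = set().union(*(train for train, _ in splits))
    all_valid = set().union(*(valid for _, valid in splits))
    return all_train & all_valid


indices = range(1000)
# two processes, each making its own split
unseeded = [split(indices, 0.2) for _ in range(2)]
seeded = [split(indices, 0.2, seed=42) for _ in range(2)]

print("leaked samples without seed:", len(leaked(unseeded)))
print("leaked samples with seed:", len(leaked(seeded)))
```

With a shared seed both processes build the same split, so no validation sample is ever used for training by another process.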

And finally, to run it, we do:

```bash
$ torchrun --nproc_per_node=2 ddp_train_transformers.py
```