<h1 style="color:green"> Distributed Training with TensorFlow (Keras) </h1>

### Course : DA212 -  MLOps 
Author/TA            : [Thivin Anandh]( https://www.linkedin.com/in/thivinanandh )<br />
Course Instructor : [Prof. Sashikumaar Ganesan](http://cds.iisc.ac.in/faculty/sashi/) 


<h2 style="color:green"> Lecture  Contents </h2>

---
- Need for Distributed Training
- Data vs Model Parallelism
- Distribution Strategies in TensorFlow
    - Synchronous vs Asynchronous Algorithm
    - Allreduce Implementation 
- Mirrored Strategy
- Sample Code for Distributed Training with TensorFlow
- Other popular distributed training frameworks for ML
    - Apache MXNet
    - Horovod 


<h2 style="color:green"> Need for Distributed Training </h2>

---
- Large models with billions of parameters, trained on billions of data points <br />
    E.g.: Google's PaLM language model has 540 billion parameters
- High computing power is commonly available only in distributed environments <br />
    E.g.: the Frontier supercomputer has over 8 million cores
- Distributing the work can significantly reduce the training time of your model
- As model scale increases, performance tends to improve (see the PaLM scaling results below)
<br />
<img src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgLXCWMlipdu0gFF6hsiJHbxg1zSaEkdDWfl-8RakQuW__8RPvlOS9KGIScNCytxT4jz9isnx0GLMwbS1G0Q4WdXzT42GszgfwIIAVX1H3J-43lVWWqcb--q9cPsxCsJFFz2dRfpKgEmLe-xfIyBqQuPq1BPYcK9CtAK1_xnhgvgAAx0GeZmODJxGNMYQ/s1600/image8.gif" width="800px" height="800px"></img>

[Image Credits](https://ai.googleblog.com/2022/04/pathways-language-model-palm-scaling-to.html)

<h2 style="color:green"> Types of Parallelism </h2>

---

### 1. Data Parallelism
- The input data is split across multiple available devices
- All the devices perform the same operations on different subsets of the data
<img src="/content/parallel_1.png"></img>



<h2 style="color:green"> Types of Parallelism </h2>

---
### 2. Model Parallelism
- The model itself is split across multiple devices.
- It is used when the entire model cannot fit into the memory of a single device
<img src="parallel_2.png"></img>

<h2 style="color:green"> Data Parallelism</h2>

---

- Used when the training data is huge compared to the complexity of the model
- Data is split across devices (in batches) 
- The overhead in this kind of parallelism comes from
    - Communication needed to keep the devices in sync
    - Transferring the updated gradient information to all the devices
- There are two main types of updates
    - Synchronous
    - Asynchronous
    

<h2 style="color:green">Doesn't TensorFlow perform Data Parallelisation Automatically?</h2>

---


<h3><center>No..!</center></h3>

- By default, TensorFlow runs the computation on only one GPU (`/GPU:0`), even when more GPUs are available
- We need to provide a distribution strategy to make use of all the GPUs in our system

Let's take a look at what happens if we run the code on a system with two GPUs and no distribution strategy 

<h2 style="color:green">MNIST Images Prediction ( Without Distribution )</h2>

---

#### Import tensorflow and check for GPU support

In [None]:
import warnings
warnings.filterwarnings("ignore")
import tensorflow as tf
import time
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
print("TensorFlow version:", tf.__version__)

Num GPUs Available:  2
TensorFlow version: 2.8.0


#### Load the MNIST dataset and build the model

In [None]:
mnist = tf.keras.datasets.mnist
BATCH_SIZE = 32
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam',
              loss=loss_fn,
              metrics=['accuracy'])
print(f"Steps per Epoch: {int(x_train.shape[0]/BATCH_SIZE)}")

Steps per Epoch: 1875


#### Train the model

In [None]:
start = time.process_time()
model.fit(x_train,y_train, epochs=5,batch_size=BATCH_SIZE)
model.evaluate(x_test,y_test, verbose=2)
print(f"TimeTaken : {time.process_time() - start} seconds")

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
313/313 - 0s - loss: 0.0746 - accuracy: 0.9778 - 288ms/epoch - 921us/step
TimeTaken : 15.414834740999993 seconds
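Note that `time.process_time()` returns the CPU time consumed by the current process (summed over its threads), not elapsed wall-clock time, so it can over- or under-state how long training actually took when work runs on GPUs or in many threads. A minimal sketch for recording both measurements side by side, assuming you want to compare runs by elapsed time; `timed` is a hypothetical helper, not part of TensorFlow:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Measure wall-clock and CPU time spent inside the `with` block."""
    wall_start = time.perf_counter()   # monotonic wall-clock timer
    cpu_start = time.process_time()    # CPU time of this process (all threads)
    result = {}
    try:
        yield result
    finally:
        result["wall"] = time.perf_counter() - wall_start
        result["cpu"] = time.process_time() - cpu_start
        print(f"{label}: wall {result['wall']:.2f} s, CPU {result['cpu']:.2f} s")


# Stand-in workload; in the notebook the body would be
# model.fit(...) followed by model.evaluate(...).
with timed("demo") as t:
    total = sum(i * i for i in range(200_000))
```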


<center><h4>Checking the GPU usage of the system with nvidia-smi</h4></center>
<center><img src="parallel_3_gpu.png"></img></center>

<h4><center>Only one GPU was used during the training process </center></h4>

<h2 style="color:green">Data Parallelisation with TensorFlow </h2>

---

1. We need to provide a strategy for TensorFlow to use multiple GPUs
    - In this example we will use the `MirroredStrategy`
2. Then we need to set the batch size accordingly for multiple GPUs 
3. TensorFlow will then automatically distribute the batches according to the distribution strategy


### Define a Distribution Strategy
We will use `MirroredStrategy` (synchronous distributed training on multiple GPUs on one machine)

In [None]:
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


We can also restrict training to certain GPUs by passing their device names in the `devices` argument, as shown below

In [None]:
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


### Define Batch Size
- The batch size that we give is the global batch size, and it is divided among all the GPUs
    - E.g.: if we give a batch size of 64, each GPU will get 32 images per step
    - Here 32 is called the `per replica batch size`
- A replica is a copy of the model running on one slice of the input data
- Since we have two GPUs, we scale the batch size by the number of GPUs we are going to use
- The number of replicas (here, one per GPU) can be obtained from `strategy.num_replicas_in_sync`
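The batch-size arithmetic can be checked in plain Python. In this sketch the replica count is hard-coded as an assumption standing in for `strategy.num_replicas_in_sync` on a two-GPU machine; the dataset sizes are those of MNIST. The resulting 157 test steps agree with the `157/157` in the evaluation output of the distributed run.

```python
import math

# MNIST sizes, per-replica batch of 32, and two GPUs (one replica per GPU).
NUM_TRAIN, NUM_TEST = 60_000, 10_000
PER_REPLICA_BATCH = 32
NUM_REPLICAS = 2  # assumed value of strategy.num_replicas_in_sync

global_batch = PER_REPLICA_BATCH * NUM_REPLICAS
# The last, partial batch still counts as a step, hence ceil.
train_steps = math.ceil(NUM_TRAIN / global_batch)
test_steps = math.ceil(NUM_TEST / global_batch)

print(global_batch, train_steps, test_steps)  # 64 938 157
```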

In [None]:
GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync 

#### Compile the Model with Distribution Strategy

In [None]:
y_train_cat = tf.keras.utils.to_categorical(y_train,num_classes=10,dtype='float32')
y_test_cat = tf.keras.utils.to_categorical(y_test,num_classes=10,dtype='float32')

## Needed for the Distribution in Data Parallelism
train = tf.data.Dataset.from_tensor_slices((x_train,y_train_cat))
test = tf.data.Dataset.from_tensor_slices((x_test,y_test_cat))

trainData = train.batch(GLOBAL_BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
testData = test.batch(GLOBAL_BATCH_SIZE)

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
trainData = trainData.with_options(options)
testData = testData.with_options(options)


with strategy.scope():
    model_st = tf.keras.models.Sequential([
              tf.keras.layers.Flatten(input_shape=(28, 28)),
              tf.keras.layers.Dense(128, activation='relu'),
              tf.keras.layers.Dropout(0.2),
              tf.keras.layers.Dense(10)
            ])
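    # Dense(10) outputs raw logits, so the loss must apply the softmax itself
    # (from_logits=True); the default from_logits=False treats logits as probabilities
    model_st.compile(optimizer='adam',
                  loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])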

#### Run the Model

In [None]:
start = time.process_time()
model_st.fit(trainData, epochs=5)
model_st.evaluate(testData, verbose=2)
print(f"TimeTaken : {time.process_time() - start} seconds")

Epoch 1/5
INFO:tensorflow:batch_all_reduce: 4 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:batch_all_reduce: 4 all-reduces with algorithm = nccl, num_packs = 1
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
157/157 - 2s - loss: 8.8166 - accuracy: 0.2202 - 2s/epoch - 10ms/step
TimeTaken : 43.73143478200001 seconds


<center><h4>Checking the GPU usage of the system with nvidia-smi</h4></center>
<center><img src="parallel_4_gpu.png"></img></center>

<h4><center>Both GPUs were used during the training process </center></h4>

### Save the model

In [None]:
tf.saved_model.save(model_st,"new")

INFO:tensorflow:Assets written to: new/assets


#### Load the model

In [None]:
with strategy.scope():
    loaded_model = tf.saved_model.load("new")

<h2 style="color:green">MultiWorkerMirroredStrategy</h2>

---

- It is very similar to `MirroredStrategy`
- It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs
- It creates copies of all variables in the model on each device across all workers

<br />

`strategy = tf.distribute.MultiWorkerMirroredStrategy()`

[More Information](https://www.tensorflow.org/api_docs/python/tf/distribute/MultiWorkerMirroredStrategy)
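`MultiWorkerMirroredStrategy` reads the cluster layout from the `TF_CONFIG` environment variable, which has to be set on every machine before the strategy is created. A minimal sketch of building it in Python, assuming a hypothetical two-machine cluster (the host names and port are placeholders, and `make_tf_config` is an illustrative helper):

```python
import json
import os

# Hypothetical two-machine cluster; replace the host:port pairs with real ones.
cluster = {"worker": ["worker0.example.com:12345", "worker1.example.com:12345"]}


def make_tf_config(task_index):
    """Build the TF_CONFIG JSON string for the worker with the given index."""
    return json.dumps({
        "cluster": cluster,
        "task": {"type": "worker", "index": task_index},
    })


# On the first machine (index 0, which by convention acts as the chief):
os.environ["TF_CONFIG"] = make_tf_config(0)
print(os.environ["TF_CONFIG"])
# Each machine then creates tf.distribute.MultiWorkerMirroredStrategy()
# and runs the same training script with its own task index.
```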


<h2 style="color:green">Synchronous Gradient Update </h2>

---
<center><img src="parallel_5_sync.png"></img></center>

- All devices are synchronised before proceeding to the next training step
- The gradients from all the devices have to be aggregated before the model can proceed to the next step <br />
[Image Credits](https://www.youtube.com/watch?v=S1tN9a4Proc&t=597s)
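The synchronous update can be illustrated without any GPUs. The toy sketch below (plain Python, a one-parameter linear model; all names are hypothetical) has each simulated replica compute a gradient on its own shard, waits for all of them, averages the gradients, and applies the same update to every copy, which is conceptually what `MirroredStrategy` does with real devices:

```python
# Toy synchronous data-parallel SGD for a 1-parameter linear model y = w * x.
# Each "device" is just an entry in a Python list; no real hardware is involved.

def grad_mse(w, shard):
    """Gradient of the mean squared error of y = w*x over one shard of (x, y) pairs."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def sync_step(replica_weights, shards, lr):
    # 1. Every replica computes a gradient on its own shard of the batch.
    grads = [grad_mse(w, s) for w, s in zip(replica_weights, shards)]
    # 2. Barrier + all-reduce: wait for all gradients, then average them.
    avg = sum(grads) / len(grads)
    # 3. Every replica applies the identical update, so the copies stay in sync.
    return [w - lr * avg for w in replica_weights]


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # true w = 2
shards = [data[:2], data[2:]]        # split the batch across 2 "devices"
weights = [0.0, 0.0]                 # one model copy per device
for _ in range(20):
    weights = sync_step(weights, shards, lr=0.05)
print(weights)
```

Because the two shards have the same size, the averaged gradient equals the full-batch gradient, so the result matches single-device training on the whole batch.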

<h2 style="color:green">Asynchronous Gradient Update </h2>

---
<center><img src="parallel_19.png"></img></center>

- The parameters and their updates are stored on a global server called the **parameter server**
- Rather than every device updating its own copy of the values, all devices send their updates to the global parameter server (and fetch the latest parameters from it) <br />
[Image Credits](https://www.youtube.com/watch?v=S1tN9a4Proc&t=597s)

## Advantages
- No synchronisation overhead
- Faster than synchronous updates (in terms of communication and waiting time)

## Disadvantages
- Some devices may compute gradients using stale (not yet updated) parameters, which may increase the number of iterations needed to converge

We need to decide the trade-off between these two methods based on the nature of our algorithm
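The staleness problem can be reproduced with a toy parameter server in plain Python. This is an illustrative sketch (the class and the scenario are hypothetical, not TensorFlow's parameter-server implementation): it minimises f(w) = (w - 3)² / 2 and lets one worker push a gradient computed on parameters that are already out of date.

```python
# Toy parameter server with asynchronous updates for f(w) = (w - 3)^2 / 2,
# whose gradient is (w - 3). Workers are simulated one after another.

class ParameterServer:
    def __init__(self, w, lr):
        self.w, self.lr = w, lr

    def pull(self):
        """A worker fetches the current parameters."""
        return self.w

    def push(self, grad):
        """A worker sends a gradient; the server applies it at once (no barrier)."""
        self.w -= self.lr * grad


def grad(w):
    return w - 3.0


ps = ParameterServer(w=0.0, lr=0.8)
w_a = ps.pull()          # worker A reads w = 0.0
w_b = ps.pull()          # worker B also reads w = 0.0
ps.push(grad(w_a))       # A's update lands first: w becomes 2.4
ps.push(grad(w_b))       # B's gradient was computed on the stale w = 0.0
print(ps.w)              # about 4.8: overshoots the optimum w = 3

# Had B pulled the fresh value (2.4) instead, its step would land near 2.88.
```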

<h2 style="color:green"> Gradient Reduction - Updating the Gradients </h2>

---
<img src="parallel_6_syncnew.png" width="700px"></img>
 

<center><h3>How are the gradients updated ? </h3></center>

<center><h3> All-Reduce </h3></center>

<center><img src="parallel_7_allred.png" width="700px"></img></center>

<h2 style="color:green"> A Brief History of Parallel Programming Paradigms </h2>

---


## Reduce
<center><img src="parallel_8_reduce.png" width="550px"></img></center>

## Reduce
<center><img src="parallel_9_red2.png" width="550px"></img></center>

## All Reduce
<center><img src="parallel_10_allred.png" width="550px"></img></center>

## Scatter 

<center><img src="parallel_11_scat.png" width="550px"></img></center>

## Gather
<center><img src="parallel_11_gath.png" width="550px"></img></center>

## All Gather 
<center><img src="parallel_12_allgather.png" width="550px"></img></center>

[image Credits MPI](https://mpitutorial.com/tutorials/)
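To make the pictured operations concrete, here is a plain-Python sketch in which each list element stands for the data held by one process (rank). These are simplified, single-process stand-ins for the MPI collectives; the function names are hypothetical, not the MPI API.

```python
import operator
from functools import reduce as fold


def reduce_op(values, op=operator.add, root=0):
    """Combine one value from every rank; only the root receives the result."""
    result = fold(op, values)
    return [result if rank == root else None for rank in range(len(values))]


def all_reduce(values, op=operator.add):
    """Like reduce, but every rank receives the combined result."""
    result = fold(op, values)
    return [result] * len(values)


def scatter(data, num_ranks):
    """The root splits `data` into equal contiguous chunks; rank i gets chunk i."""
    size = len(data) // num_ranks  # assumes len(data) is divisible by num_ranks
    return [data[i * size:(i + 1) * size] for i in range(num_ranks)]


def gather(values, root=0):
    """Every rank sends its value; only the root ends up with the full list."""
    return [list(values) if rank == root else None for rank in range(len(values))]


def all_gather(values):
    """Like gather, but every rank ends up with the full list."""
    return [list(values) for _ in values]


ranks = [1, 2, 3, 4]
print(reduce_op(ranks))    # [10, None, None, None]
print(all_reduce(ranks))   # [10, 10, 10, 10]
print(scatter([5, 6, 7, 8], 4))
print(gather(ranks))
print(all_gather(ranks))
```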

<h2 style="color:green"> Ring All reduce </h2>

---

It has two main steps 
- Reduce Scatter
- All gather

[image Credits](https://www.youtube.com/watch?v=S1tN9a4Proc&t=597s)

### Reduce Scatter
Each processor sends one chunk of its values to the next processor in the ring for partial reduction
<center><img src="parallel_14_1.png" width="500px"></img></center>

Each processor then sends the partially reduced value to the next processor
<center><img src="parallel_15_2.png" width="500px"></img></center>

At the end of reduce-scatter, each processor holds one fully reduced chunk
<center><img src="parallel_16_3.png" width="500px"></img></center>

### All Gather 
Then each processor sends its fully reduced chunk to the next processor in a ring-like fashion until every processor has the complete result
<center><img src="parallel_17_4.png" width="500px"></img></center>

After this, the reduced (summed) gradients are averaged over the devices, so that every device applies the same update before the next training step
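The two phases can be traced with a small simulation. In this sketch (plain Python, hypothetical function name) each device's gradient is assumed to have exactly N elements, one chunk per device, and the sends of each step are collected before any are applied so that they behave as if simultaneous:

```python
# Simulated ring all-reduce (sum) over N "devices". Each device's gradient
# vector is split into N chunks; all transfers follow the ring d -> d + 1.

def ring_all_reduce(grads):
    n = len(grads)
    # chunks[d][c] is chunk c held by device d (here each chunk is one number).
    chunks = [list(g) for g in grads]
    assert all(len(c) == n for c in chunks), "sketch assumes N chunks of size 1"

    # Phase 1, reduce-scatter: in step s, device d sends chunk (d - s) % n to
    # device d + 1, which adds it to its own copy of that chunk.
    for s in range(n - 1):
        sends = [(d, (d - s) % n, chunks[d][(d - s) % n]) for d in range(n)]
        for d, c, value in sends:              # all sends happen "simultaneously"
            chunks[(d + 1) % n][c] += value
    # Now device d holds the fully reduced chunk (d + 1) % n.

    # Phase 2, all-gather: circulate the finished chunks around the ring.
    for s in range(n - 1):
        sends = [(d, (d + 1 - s) % n, chunks[d][(d + 1 - s) % n]) for d in range(n)]
        for d, c, value in sends:
            chunks[(d + 1) % n][c] = value     # overwrite with the final value
    return chunks


grads = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]  # 3 devices, 3 chunks each
summed = ring_all_reduce(grads)
averaged = [[v / len(grads) for v in dev] for dev in summed]
print(summed)    # [[111, 222, 333], [111, 222, 333], [111, 222, 333]]
```

Each device sends 2(N - 1) chunks of size 1/N of the gradient in total, so the data each device transfers stays nearly constant as devices are added.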

<h2 style="color:green"> Reduction Strategies in Tensorflow </h2>

---
- The reduction is performed via an all-reduce algorithm
- There are various implementations of the all-reduce algorithm
- For `MirroredStrategy` on multiple GPUs, the default is `tf.distribute.NcclAllReduce`, based on the NVIDIA Collective Communication Library (NCCL)
- Other available implementations (subclasses of `tf.distribute.CrossDeviceOps`, passed through the `cross_device_ops` parameter) are
    - `tf.distribute.HierarchicalCopyAllReduce`
    - `tf.distribute.ReductionToOneDevice`
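A minimal sketch of choosing the reduction implementation explicitly through the `cross_device_ops` argument. The fallback to `ReductionToOneDevice` on machines with fewer than two GPUs is an illustrative choice of this sketch, not a TensorFlow requirement:

```python
import tensorflow as tf

# Choose the all-reduce implementation explicitly via `cross_device_ops`.
# Without this argument, MirroredStrategy picks a default (NCCL on multi-GPU hosts).
gpus = tf.config.list_logical_devices("GPU")
if len(gpus) > 1:
    # Reduces along a hierarchy of GPUs and broadcasts back along the same path.
    cross_ops = tf.distribute.HierarchicalCopyAllReduce()
else:
    # Reduces everything on one device first, then broadcasts the result.
    cross_ops = tf.distribute.ReductionToOneDevice()

strategy = tf.distribute.MirroredStrategy(cross_device_ops=cross_ops)
print("Reduction:", type(cross_ops).__name__)
print("Replicas in sync:", strategy.num_replicas_in_sync)
```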
    

<h2 style="color:green"> Model Parallelism </h2>

---
<center><img src="parallel_17.png" width="700px"></img></center>

[image Credits](https://www.youtube.com/watch?v=cdNLtMszdLs&t=383s)

<h2 style="color:green"> Model Parallelism - Pipeline Parallel </h2>

---
<center><img src="parallel_18.png" width="700px"></img></center>

- The layers of the model are distributed across different devices
- All the communication between devices during the forward pass and backpropagation is taken care of automatically by the TF library

[image Credits](https://www.youtube.com/watch?v=cdNLtMszdLs&t=383s)
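Placing layers on different devices alone leaves most devices idle, because each stage must wait for the previous one. Pipeline schemes such as GPipe therefore split each batch into micro-batches. The sketch below only computes such a forward-pass schedule in plain Python; no TensorFlow API is involved and the function name is hypothetical:

```python
# GPipe-style forward schedule: a batch is cut into micro-batches so that
# different pipeline stages (devices) can work at the same time.

def pipeline_schedule(num_stages, num_micro_batches):
    """Return, for each time step, which micro-batch each stage is processing."""
    steps = []
    for t in range(num_stages + num_micro_batches - 1):
        row = []
        for stage in range(num_stages):
            mb = t - stage                     # stage s starts s steps late
            row.append(mb if 0 <= mb < num_micro_batches else None)  # None = idle
        steps.append(row)
    return steps


for t, row in enumerate(pipeline_schedule(num_stages=3, num_micro_batches=4)):
    print(f"t={t}: " + "  ".join("--" if mb is None else f"m{mb}" for mb in row))
```

With S stages and M micro-batches the forward pass takes M + S - 1 time steps instead of M * S, and the idle slots (`--`) at the start and end are the pipeline "bubble".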

<h2 style="color:green"> Model Parallelism - Tensor Parallel </h2>

---
<center><img src="parallel_19_tens.png" width="700px"></img></center>

- Each device works on a fraction of a single piece of data (or of a single large tensor, such as a weight matrix)
- For example: operations such as convolution on a very large image (hundreds of MB, where high resolution is mandatory) can be split so that each device performs the operation on its own part of the image.

[image Credits](https://www.youtube.com/watch?v=cdNLtMszdLs&t=383s)


<center><img src="parallel_20.png" width="700px"></img></center>
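The idea of giving each device one slice of a single large tensor can be illustrated with a matrix product Y = X W in which W is split by columns. This is a plain-Python sketch with hypothetical helper names and tiny matrices, not how a tensor-parallel library is implemented:

```python
# Column-wise tensor parallelism for Y = X @ W, in plain Python.
# W is split by columns across "devices"; each device computes its slice of Y.

def matmul(a, b):
    return [[sum(a[i][k] * b[k][j] for k in range(len(b))) for j in range(len(b[0]))]
            for i in range(len(a))]


def split_columns(w, num_devices):
    """Give device d a contiguous block of W's columns (assumes an even split)."""
    cols = len(w[0]) // num_devices
    return [[row[d * cols:(d + 1) * cols] for row in w] for d in range(num_devices)]


X = [[1, 2], [3, 4]]                   # 2 x 2 input, replicated on every device
W = [[1, 0, 2, -1], [0, 1, 1, 3]]      # 2 x 4 weight matrix to be split
shards = split_columns(W, num_devices=2)
partial = [matmul(X, w_d) for w_d in shards]                   # one per device
Y = [sum((p[i] for p in partial), []) for i in range(len(X))]  # concatenate columns
print(Y == matmul(X, W))  # True
```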


<h2 style="color:green"> Other Distributed ML Frameworks </h2>

---
- Apache MXNet
- Horovod



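Rough numbers from the lecture notes:
- CPU: clock speed of about 3 GHz, with 8 to 16 cores
- GPU: clock speed of about 700 MHz, but with around 2000 cores
- Example: with 1000 training samples and 100 samples per batch, one epoch has 1000 / 100 = 10 steps; each step (step 1, step 2, ...) runs a forward pass and backpropagation on one batch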