# <center>An introduction to parallel programming and parallel training</center>

Our computing resources have grown: we now often have multiple machines, each with multiple GPUs. Training a neural network on only one GPU does not make full use of this hardware. In this note we will look at the mechanism behind training on multiple GPUs. Much of the information on the internet is incomplete and simply copy-pastes code to make things work; it is more important to understand what is happening behind the scenes. Along the way we will also get familiar with multiprocessing, which can help speed up scientific research in general.

We will begin with the basic concepts of parallel programming, then talk about different setups for training neural networks on multiple GPUs. The main topics are as follows:
- Threads and processes
- Problems that might happen in parallel programming
    - Race conditions
    - Random number generators
    - Overhead caused by the number of workers
- Some examples of parallel programming
- Parallel training
    - Data Parallel
    - Model Parallel

### 1. Threads and Processes

We will start with parallel programming. What we want to achieve is to run multiple tasks at the same time, so that hopefully we gain performance. The pictures below show the basic idea.

Eight people buying tickets from one machine:
<p align="center">
<img src="resources/1/one_queue.png" alt="drawing" width="500" >
</p>

But if we have two machines, four people can queue at each machine.

<p align="center">
<img src="resources/1/two_queue.png" alt="drawing" width="500" >
</p>

Modern computers generally have multiple cores, each of which can run tasks independently. The basic idea of parallel programming is to distribute tasks across the cores so that we make full use of the computing power. This is actually an extremely complicated topic, and many unexpected things can happen. The goal of this note is to provide basic concepts and terminology; for a deeper understanding, you can search online or read the book **Modern Operating Systems**.

You may have already come across the two words **thread** and **process**; in Python they are managed with the `threading` and `multiprocessing` modules. You can find many abstract definitions of them, but I prefer to explain them with a figure.

<p align="center">
<img src="resources/1/processes_thread.png" alt="drawing" width="1000" >
</p>

- Each block represents a process. Each process has a unique PID (process id) assigned by the operating system.
- Processes can form a hierarchical structure.
- Each process has exactly one parent process and may have zero or more child processes.
- Several processes can be grouped together to form a process group.
- Each process has its own resources, for example variables and files. A program in one process can access another process's resources only through Inter-Process Communication (IPC).
- Each process contains one or more threads. You can think of a thread as a sequence of code that is running or waiting to run.
- Threads in a process can access the resources of that process directly.

Next let's examine some of the above points through some demonstrations.

Let's run this [file](./demos/pid_gid_ppid.py) and then take a look at the system monitor. The program launches a Python process with a single thread inside it, and we can also check the corresponding ids.
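The sketch below is a minimal stand-in for such a script, not the linked file itself. It prints the ids the operating system assigns and uses only the standard library. `describe_current_process` is a hypothetical helper. `os.getpgid` exists only on POSIX systems, which is why the code checks for it.

```python
import os
import threading

def describe_current_process():
    """Return the ids the operating system assigned to the calling process."""
    info = {
        "pid": os.getpid(),                   # id of this process
        "ppid": os.getppid(),                 # id of its parent process
        "threads": threading.active_count(),  # Python threads alive in this process
    }
    # Process groups are a POSIX concept; os.getpgid does not exist on Windows.
    if hasattr(os, "getpgid"):
        info["pgid"] = os.getpgid(0)          # 0 means "the calling process"
    return info

print(describe_current_process())
```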

This [file](demos/create_process.py) shows how to start a new process programmatically. The main process is the entry point of the program; inside it we start a new process `p1` that runs the function `f`.

Run this [file](demos/mp_resources.py) to see that resources are not shared between processes. When we start a new process from the main process, its variables are copied, so changes made in the subprocess do not affect the original process.
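The linked demo is not reproduced here. The minimal stand-in below shows the same effect: the child appends to its own copy of a list, and the parent's list stays unchanged. It assumes the POSIX-only `'fork'` start method used elsewhere in this note, and `append_in_child` is a hypothetical name.

```python
import multiprocessing

data = [1, 2, 3]

def append_in_child(values):
    """Runs in the child process: modifies the child's *copy* of the list."""
    values.append(4)
    print("child sees: ", values)

# 'fork' (POSIX only) is the start method used in the other cells of this note.
ctx = multiprocessing.get_context("fork")
p = ctx.Process(target=append_in_child, args=(data,))
p.start()
p.join()

print("parent sees:", data)  # the parent's list is unchanged: [1, 2, 3]
```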
<p align="center">
<img src="resources/1/sub_process_resources.png" alt="drawing" width="200" >
</p>

This [file](demos/create_threads.py) shows how to start a new thread. In the system monitor we can confirm that we have created only one process, which has two threads. [This file](demos/thread_resources.py) shows that threads in a process can modify that process's resources directly.
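To contrast with the process example, here is a minimal sketch (not the linked file) in which a second thread appends to a list owned by the main thread. Both threads live in the same process, so the main thread sees the change. `append_in_thread` is a hypothetical name.

```python
import threading

data = [1, 2, 3]

def append_in_thread(values):
    """Runs in a second thread of the *same* process, so it mutates the shared list."""
    values.append(4)

t = threading.Thread(target=append_in_thread, args=(data,))
t.start()
t.join()

print(data)  # [1, 2, 3, 4]: the main thread sees the change
```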


Next, let's talk about the difference between multi-processing and multi-threading. First, note that creating threads is much faster than creating processes. [This folder](demos/speed_test_mp_thread) contains two test files that create 5000 threads or 5000 processes. Creating the threads takes about 0.3 s, while creating the processes takes about 13 s, a huge difference. For parallel programming we could always create processes instead of threads, since every process contains at least one thread. However, creation time and memory usage both matter, so we should know exactly how threads and processes differ and use the right one at the right time.
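A small, self-contained version of such a speed test might look like the sketch below. It is not the code in the linked folder. It creates only 100 workers so that it runs quickly, uses the POSIX-only `'fork'` start method, and includes start and join time in the measurement. Absolute numbers depend heavily on the machine and operating system.

```python
import multiprocessing
import threading
import time

def do_nothing():
    pass

def time_creation(make_worker, n):
    """Start and join n workers built by make_worker(); return the elapsed seconds."""
    start = time.perf_counter()
    workers = [make_worker() for _ in range(n)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.perf_counter() - start

ctx = multiprocessing.get_context("fork")  # POSIX only, as elsewhere in this note
N = 100  # far fewer than 5000 so the sketch finishes quickly

thread_time = time_creation(lambda: threading.Thread(target=do_nothing), N)
process_time = time_creation(lambda: ctx.Process(target=do_nothing), N)
print(f"{N} threads:   {thread_time:.3f}s")
print(f"{N} processes: {process_time:.3f}s")
```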

The short answer: for scientific computing, as far as I can think of, we should use multiple processes.

The long answer: multi-threading is used for IO-bound operations, and multi-processing is used for CPU-bound operations (at least in Python with the GIL). What does this mean?

IO-bound operations: operations whose speed is limited by input/output, for example waiting for user input, waiting for an HTTP response, the `time.sleep` function, or waiting for the GPU to return results.

CPU-bound operations: operations that need to be computed by the CPU, such as arithmetic (+ - * /) and matrix operations.

**Multi-threading cannot speed up CPU-bound pure-Python code.** This is important: it is the main difference between threads and processes.

Let's see the following example where we generate random numbers.

In [None]:
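import random
import time

def f(n):
    # CPU-bound work: draw n random numbers in pure Python
    for _ in range(n):
        random.random()

NUM = 100_000_000

start = time.perf_counter()
f(NUM)
print(f'Single thread: {time.perf_counter() - start:.1f}s')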


Let's try creating two threads, each doing half of the job.

In [None]:
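import random
import threading
import time

def f(n):
    # CPU-bound work: draw n random numbers in pure Python
    for _ in range(n):
        random.random()

NUM = 100_000_000

start = time.perf_counter()

# Two threads, each doing half of the work
t1 = threading.Thread(target=f, args=(NUM // 2, ))
t2 = threading.Thread(target=f, args=(NUM // 2, ))

t1.start()
t2.start()

t1.join()
t2.join()

print(f'Two threads: {time.perf_counter() - start:.1f}s')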

The running time is about the same. Next, let's try using multiple processes.

In [None]:
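import multiprocessing
import random
import time
try:
    # 'fork' (POSIX only) lets child processes reuse the functions defined in this notebook
    multiprocessing.set_start_method('fork')
except Exception:
    pass


def f(n):
    # CPU-bound work: draw n random numbers in pure Python
    for _ in range(n):
        random.random()

NUM = 100_000_000

start = time.perf_counter()

# Two processes, each doing half of the work
p1 = multiprocessing.Process(target=f, args=(NUM // 2, ))
p2 = multiprocessing.Process(target=f, args=(NUM // 2, ))

p1.start()
p2.start()

p1.join()
p2.join()

print(f'Two processes: {time.perf_counter() - start:.1f}s')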

This time the run takes only about half as long. Why does this happen? It is because of a "powerful" feature of (standard C)Python called the Global Interpreter Lock (GIL), which allows only one thread to execute Python code at any time.

<p align="center">
<img src="resources/1/mp_vs_thread.png" alt="drawing" width="700" >
</p>

As the figure above shows, only one thread can run at any time. Thus, using threads will **NOT** make pure-Python computation faster.

You may wonder: if only one thread can run at a time, how can threads speed up a program at all? The main reason is the difference between IO operations and CPU operations. When a thread enters an IO operation, the CPU would otherwise sit idle. The operating system therefore leaves that thread waiting, runs another thread, and resumes the first thread once the IO operation has finished. In other words, the IO operation keeps going even though the CPU is not working on that thread.

If we create 2 threads that each sleep for 1 second, the total running time is still about 1 second.

In [None]:
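import time
import threading

def f():
    # IO-like wait: the thread is idle and does not hold the GIL while sleeping
    time.sleep(1)

start = time.perf_counter()

t1 = threading.Thread(target=f)
t2 = threading.Thread(target=f)

t1.start()
t2.start()

t1.join()
t2.join()

print(f'Two sleeping threads: {time.perf_counter() - start:.1f}s')  # roughly 1s, not 2s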

This figure shows how the routine above actually runs. When a thread begins to sleep, the CPU is idle and the thread is not actually "running". Thus, the sleeps in the two threads happen at the same time.
<p align="center">
<img src="resources/1/thread_sleep.png" alt="drawing" width="700" >
</p>


### 2. Problems that might happen in parallel programming

#### 2.1 Race condition

This is a classic topic in parallel programming: errors may happen when a resource is accessed by more than one thread at the same time. In the following code, the final value of x should be 0. It can instead come out as some other, seemingly random number; how often this shows up depends on the Python version and on timing.
 

In [None]:
import threading

x = 0

def increment():
    global x
    for _ in range(1000000):
        x = x + 1

def decrement():
    global x
    for _ in range(1000000):
        x = x - 1

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=decrement)

t1.start()
t2.start()

t1.join()
t2.join()

print(f'The final value of x is {x}')

The disassembled bytecode below shows why this can happen. An assignment such as `x = x + 5` is not a single operation; it consists of several steps.

In [None]:
import dis

def f(x):
    x=x+5

dis.dis(f)

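1. load x onto the stack (`LOAD_FAST`)
2. load the constant 5 onto the stack (`LOAD_CONST`)
3. add the two values on top of the stack (`BINARY_ADD`, or `BINARY_OP` in newer Python versions)
4. store the result back into x (`STORE_FAST`)

(The exact opcode names vary between Python versions.)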

If another thread runs in the middle of these steps, before the first thread has finished them, the result can be wrong. To avoid this, we need locks, which ensure that one thread finishes these steps before another thread starts them. Locking is a complicated topic; luckily, in scientific computing we rarely need it.
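Here is a minimal sketch of the lock-based fix for the race-condition example, using `threading.Lock` from the standard library. The loop count is reduced to keep the example quick. Acquiring the lock on every iteration is slow, so real code would hold it around as little work as possible, or avoid shared state altogether.

```python
import threading

x = 0
lock = threading.Lock()

def increment(n):
    global x
    for _ in range(n):
        with lock:        # only one thread at a time may run the read-add-write below
            x = x + 1

def decrement(n):
    global x
    for _ in range(n):
        with lock:
            x = x - 1

N = 100_000
t1 = threading.Thread(target=increment, args=(N,))
t2 = threading.Thread(target=decrement, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()

print(f'The final value of x is {x}')  # always 0 with the lock
```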

#### 2.2 Random number generators

Let's first start two processes that each generate 5 random numbers.

In [None]:
import multiprocessing
import numpy as np
try:
    multiprocessing.set_start_method('fork')
except Exception:
    pass


def f():
    # Uses NumPy's global random state, which each forked child inherits from the parent
    print(np.random.random(5))
    print()

p1 = multiprocessing.Process(target=f)
p2 = multiprocessing.Process(target=f)

p1.start()
p2.start()

p1.join()
p2.join()

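Because the processes are created with `fork`, each child starts with a copy of the parent's NumPy random state. The children then generate numbers independently from identical states, so the results are the same. This is a useful property; we will see later that it helps keep neural network training consistent across processes.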

We only need to remember to set a different seed in each subprocess when we actually want them to generate different random numbers, for example when we generate data with multiple processes.
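One way to give each subprocess its own random stream is to seed each worker with both its worker id and a common root seed. The streams then differ across workers, but the whole run stays reproducible. This sketch assumes NumPy's `Generator` API is available. `ROOT_SEED` and `worker` are hypothetical names, and the POSIX-only `'fork'` start method matches the cells above.

```python
import multiprocessing
import numpy as np

ROOT_SEED = 12345  # hypothetical experiment-wide seed

def worker(worker_id):
    """Draw 5 numbers from a generator seeded with (worker_id, ROOT_SEED)."""
    # Mixing the worker id into the seed gives each worker its own stream,
    # while rerunning with the same ROOT_SEED reproduces every stream.
    rng = np.random.default_rng([worker_id, ROOT_SEED])
    return rng.random(5)

ctx = multiprocessing.get_context("fork")  # POSIX only, as elsewhere in this note
with ctx.Pool(2) as pool:
    results = pool.map(worker, range(2))

for r in results:
    print(r)
```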

#### 2.3 Overhead caused by the number of workers

Overhead is the extra time spent creating processes, joining processes, and combining the results from different processes. For example, suppose we want to generate just two random numbers and create two processes that each generate one. The total time will definitely be longer than using a single process to generate both. This extra time is called overhead.

In [None]:
import random
import time

def f(n):
    for _ in range(n):
        random.random()

NUM = 2

start = time.perf_counter()
f(NUM)
print(f'One process: {time.perf_counter() - start:.6f}s')

In [None]:
import multiprocessing
import random
import time
try:
    multiprocessing.set_start_method('fork')
except Exception:
    pass

def f(n):
    for _ in range(n):
        random.random()

NUM = 2

start = time.perf_counter()

# Two processes that each generate one random number
p1 = multiprocessing.Process(target=f, args=(NUM // 2, ))
p2 = multiprocessing.Process(target=f, args=(NUM // 2, ))

p1.start()
p2.start()

p1.join()
p2.join()

print(f'Two processes: {time.perf_counter() - start:.6f}s')

Overhead always exists in parallel programming. We only benefit from parallelism when the overhead is small compared to the actual running time of the program. For example, when we want to generate 100,000,000 random numbers, splitting the work across several processes does speed things up.

The next question is: how many processes should we split the work into?

Basically, each CPU core can run one process at a time. We do **NOT** need to worry about assigning processes to cores, because the operating system's scheduler handles this. Suppose we have 8 cores: if we start eight processes, they will be distributed to different cores and run, so ideally we would get an 8x speedup. In reality the performance will be lower, as the figure below shows.

<p align="center">
<img src="resources/2/num_of_cpu.png" alt="drawing" width="900" >
</p>

The yellow line is the ideal $\displaystyle \frac 1 x$ curve, where the x-axis is the number of processes. Up to 8 processes, the running time decreases, but it stays above the ideal time. However, splitting the work into more than 8 processes results in a longer run time. This is because the computer running this task has only 8 cores. With more than 8 processes, some processes must wait for others to finish, which makes the total running time longer. Let's check the number of cores on this computer.
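To reproduce a curve like this on your own machine, you could time the same random-number task for several process counts. The sketch below shows one possible way to set up such a measurement; it is not the script used for the figure. `time_with_workers` is a hypothetical helper, and the `'fork'` start method limits it to POSIX systems.

```python
import multiprocessing
import random
import time

def f(n):
    # CPU-bound work: draw n random numbers in pure Python
    for _ in range(n):
        random.random()

def time_with_workers(total, workers):
    """Split `total` draws evenly over `workers` processes; return wall-clock seconds.

    The measured time deliberately includes creating and joining the pool,
    i.e. the overhead discussed above.
    """
    ctx = multiprocessing.get_context("fork")  # POSIX only, as in the cells above
    chunks = [total // workers] * workers      # assumes total is divisible by workers
    start = time.perf_counter()
    with ctx.Pool(workers) as pool:
        pool.map(f, chunks, chunksize=1)
    return time.perf_counter() - start

TOTAL = 2_000_000  # much smaller than 100_000_000 so the sketch runs quickly
timings = {k: time_with_workers(TOTAL, k) for k in (1, 2, 4, 8)}
for k, t in timings.items():
    print(f"{k} process(es): {t:.3f}s")
```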

In [None]:
import psutil
print(f'We have {psutil.cpu_count(logical=True)} logical cores')

print(f'We have {psutil.cpu_count(logical=False)} physical cores')


What do physical cores and logical cores mean? Logical cores come from a technology called hyper-threading, which allows each physical core to run two threads at the same time, with a number of limitations and drawbacks. Thus, for CPU-bound multiprocessing in scientific computing, we should usually count physical cores rather than logical cores (in most cases; this is hardware- and OS-dependent).

### 3. Some examples of parallel programming

Here are some examples where parallel programming can be used.

#### 3.1 Matrix multiplication

Suppose we want to multiply two large matrices $A$ and $B$. We can split them into smaller blocks and compute each block of the product in a separate subprocess.
$$
\begin{align}
AB & = \begin{pmatrix}
A_{11} & A_{12} \\
A_{21} & A_{22}
\end{pmatrix}\begin{pmatrix}
B_{11} & B_{12} \\
B_{21} & B_{22}
\end{pmatrix}\\
& = \begin{pmatrix}
A_{11}B_{11}+A_{12}B_{21} & A_{11}B_{12}+A_{12}B_{22} \\
A_{21}B_{11}+A_{22}B_{21} & A_{21}B_{12}+A_{22}B_{22}
\end{pmatrix}
\end{align}
$$
In this way we can distribute the computation to subprocesses and combine the results when all parts have finished.

(This is just an illustration. Do not do this in real applications, because matrix multiplication is already highly optimized.)
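Here is a sketch of the 2x2 block scheme above. Each output block $C_{ij} = A_{i1}B_{1j} + A_{i2}B_{2j}$ is computed in a worker process, and the four blocks are then reassembled with `np.block`. It assumes NumPy and the POSIX-only `'fork'` start method used elsewhere in this note. The helper names `split2` and `block_product` are hypothetical.

```python
import multiprocessing
import numpy as np

def block_product(A_row, B_col):
    """Compute one output block C_ij = A_i1 @ B_1j + A_i2 @ B_2j."""
    return sum(a @ b for a, b in zip(A_row, B_col))

def split2(M):
    """Split a matrix with even dimensions into a 2x2 grid of blocks."""
    r, c = M.shape[0] // 2, M.shape[1] // 2
    return [[M[:r, :c], M[:r, c:]], [M[r:, :c], M[r:, c:]]]

rng = np.random.default_rng(0)
A = rng.random((200, 300))
B = rng.random((300, 100))
Ab, Bb = split2(A), split2(B)

# One task per output block (i, j): row i of A's blocks and column j of B's blocks.
tasks = [(Ab[i], [Bb[0][j], Bb[1][j]]) for i in range(2) for j in range(2)]

ctx = multiprocessing.get_context("fork")  # POSIX only, like the rest of this note
with ctx.Pool(4) as pool:
    blocks = pool.starmap(block_product, tasks)

# Reassemble the four blocks (returned in row-major order) into the full product.
C = np.block([[blocks[0], blocks[1]], [blocks[2], blocks[3]]])
print(np.allclose(C, A @ B))  # True
```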

#### 3.2 Generating experiment data

When we generate machine learning data ourselves, it is usually a CPU-bound task and may benefit from parallelization. For example, in the following code we solve a batch of ODEs.

In [1]:
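from scipy import integrate
import numpy as np

def f(t, x):
    return -x*t - x**2 + np.sin(x)

def run(n):
    """Solve n ODEs with random initial conditions and return the trajectories."""
    # A fresh generator seeded from OS entropy on every call, so workers forked
    # from the same parent do not all reuse the parent's random state.
    rng = np.random.default_rng()
    res = []
    for _ in range(n):
        solution = integrate.RK45(f, t0=0, y0=rng.random(10), t_bound=10)
        t_values = []
        y_values = []
        while solution.status != 'finished':
            solution.step()
            t_values.append(solution.t)
            y_values.append(solution.y)

        data = tuple(zip(t_values, y_values))
        res.append(data)
    return res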

Running it 10,000 times on a single core takes around 32 s.

In [2]:
run(10_000);

In [None]:
import multiprocessing
try:
    multiprocessing.set_start_method('fork')
except Exception:
    pass

n = 10_000

cpus = 8

# Each of the 8 workers solves n // cpus ODEs.
with multiprocessing.Pool(cpus) as pool:
    res = pool.map(run, [n//cpus]*cpus, chunksize=1)

res = sum(res, start=[]) # Combine the results.

Using a multiprocessing pool to distribute the work over 8 cores takes about 6 s. Note that the results from the subprocesses are returned as a list, so we need a reduce step to combine them, and that step is problem dependent. Here we just need to concatenate all the results.

Whether multiprocessing can speed up data generation is problem and hardware dependent, and it is hard to decide without running some test examples. If you do not see any improvement, here are a few possible reasons. You may need a smaller number of workers. Alternatively, the data generation may not be CPU-bound, with the time spent instead on writing data to memory. In that case, running the program with multiprocessing may even make it slower.

### 4. Parallel training
#### 4.1 Data Parallel

Next, let's consider how parallel programming can be applied to neural network training. Neural network training is computationally intensive, so we want to fully utilize all our computing resources to speed it up. Training a neural network on one GPU is usually easy, but using multiple GPUs is not as straightforward. We first need to divide the task into subprocesses, and then combine the results when they are finished. These two high-level steps are the central part of data parallelism.

Let's first investigate how we can divide a training process into several subprocesses. Generally we want to minimize the empirical loss which can be written as

$$
\begin{align}
\Phi_i(\theta) = L(f_\theta(x_i), y_i) , \qquad \Phi(\theta) = \frac 1 B \sum_{i=1}^B\Phi_i(\theta),
\end{align}
$$
where $L$ is the loss function and $B$ is the batch size. We want to minimize $\Phi(\theta)$ on batches of data $X_B$ (data of size $B$). The general procedure works as follows. First, we do a forward pass $f_\theta(x_i)$ on $X_B$ and compute the losses $\Phi_i(\theta)$. Then we do a backward pass to compute the gradient $\displaystyle\nabla \Phi(\theta)= \frac 1 B \sum_{i=1}^B\nabla\Phi_i(\theta)$ of the loss. Finally, we pass the computed gradient to the optimizer to update the weights.


<p align="center">
<img src="resources/4/train.png" alt="drawing" width="200" >
</p>



The simplest way to divide the work is to split the data $X_B$ into smaller portions. For example, let's split it across 2 processes.

<p align="center">
<img src="resources/4/train_parallel.png" alt="drawing" width="600" >
</p>

We divide the data $X_B$ into two equally sized subsets $X_{B_1}$ and $X_{B_2}$, and each process does its forward and backward pass on a separate GPU.
We need to wait until both processes finish the backward pass, at which point the gradients are ready. At this stage the two processes hold different gradients. Before updating the parameters, we combine them by averaging. For equally sized subsets, the average equals the gradient on the whole batch, $\nabla\Phi(\theta)$, so both processes now hold the same gradient. The processes also start from the same weights (same seed). Updating the parameters on the two GPUs separately therefore gives the same updated $\theta^*$ on both GPUs, and we do not need to synchronize the weights across GPUs, which can be time-consuming.

This is the high-level procedure of data parallelism. A real implementation involves many details, so it is recommended to use packages that have already done all the hard work for us.
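A small single-process NumPy simulation can check two claims: averaging the per-GPU gradients reproduces the full-batch gradient, and identical replicas stay identical after the update. This is only an illustration under stated assumptions. A linear model with mean squared error stands in for $f_\theta$ and $L$, the two "GPUs" are just two halves of an array, and the combination step is a plain average rather than real inter-GPU communication.

```python
import numpy as np

rng = np.random.default_rng(0)
B, d = 64, 5                       # batch size and number of parameters
X = rng.normal(size=(B, d))        # one batch X_B
y = rng.normal(size=B)
theta = rng.normal(size=d)         # identical initial weights on every "GPU"

def grad(theta, X, y):
    """Gradient of the mean squared error (1/n) * sum_i (x_i . theta - y_i)^2."""
    return 2.0 / len(y) * X.T @ (X @ theta - y)

# Each of the two replicas sees only its half of the batch.
halves = [(X[: B // 2], y[: B // 2]), (X[B // 2 :], y[B // 2 :])]
local_grads = [grad(theta, Xk, yk) for Xk, yk in halves]

# Combination step: average the local gradients so every replica holds the same value.
g = sum(local_grads) / len(local_grads)
print(np.allclose(g, grad(theta, X, y)))      # True: equals the full-batch gradient

# Same start + same gradient + same SGD rule => replicas stay identical without syncing weights.
lr = 0.1
replicas = [theta.copy(), theta.copy()]
replicas = [w - lr * g for w in replicas]
print(np.allclose(replicas[0], replicas[1]))  # True
```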

You may have come across data parallel (DP) and distributed data parallel (DDP). The difference is that DP uses multiple threads to drive the GPUs, while DDP uses multiple processes. DP is easy to use but is limited to one machine. DDP can combine multiple machines, but it takes more work to set up.

#### 4.2 Model Parallel
What if we have a model so large that it cannot fit into one GPU? Model parallelism is a technique that can help in this situation. For a sequential model, we can place different layers on separate devices.

<p align="center">
<img src="resources/4/model_parallel.png" alt="drawing" width="400" >
</p>

This is rather easy compared to data parallelism: we just need to move the corresponding layers and intermediate results to the target device. In this way we can fit a large model onto several GPUs and train it. Let's take a look at the following diagram.
<p align="center">
<img src="resources/4/model_parallel1.png" alt="drawing" width="700" >
</p>

The x-axis is time and the y-axis is the device. Each coloured bar means the device is computing, and empty spaces mean the device is idle. Since the model is sequential, GPU 1 has to compute first and then pass its result to GPU 2. While GPU 2 is computing, GPU 1 is idle until the results are ready for it to do the backward pass. After GPU 1 finishes its backward pass, all gradients are available and we update the parameters. The drawback of this method is that GPU 1 sits idle while waiting for results from other devices. We can improve this with pipeline parallelism.

<p align="center">
<img src="resources/4/pipline_parallel.png" alt="drawing" width="700" >
</p>

The two figures above are drawn to the same scale, and we can see that the second schedule reduces the run time. The idea is simple: we again divide the data into small portions, two in the figure. When $F_0$ finishes on GPU 1, GPU 2 can begin its forward pass on it while GPU 1 does the forward pass for $F_1$ at the same time. In this way we reduce the idle time of the GPUs and make training faster.
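The gain from pipelining can be estimated with a toy schedule. The sketch below assumes that every stage takes the same time, that with $M$ micro-batches each one takes $1/M$ of the full-batch time, and that passing activations between GPUs is free. It covers only the forward pass; the backward pass is the mirror image. All function names are hypothetical.

```python
def pipeline_schedule(num_stages, num_micro):
    """Forward-pass schedule as (time_slot, stage, micro_batch) triples.

    Micro-batch m can run on stage s once stage s-1 has processed it and
    stage s has processed micro-batch m-1, so it runs in slot m + s.
    """
    return [(m + s, s, m) for s in range(num_stages) for m in range(num_micro)]

def forward_time(num_stages, num_micro, full_batch_time=1.0):
    """Forward time when one stage needs full_batch_time for a whole batch."""
    num_slots = 1 + max(t for t, _, _ in pipeline_schedule(num_stages, num_micro))
    return num_slots * full_batch_time / num_micro

def show(num_stages, num_micro):
    """Print which micro-batch each GPU works on in every time slot ('.' = idle)."""
    num_slots = num_micro + num_stages - 1
    for s in range(num_stages):
        row = ["  ."] * num_slots
        for t, stage, m in pipeline_schedule(num_stages, num_micro):
            if stage == s:
                row[t] = f" F{m}"
        print(f"GPU {s + 1}:" + "".join(row))

show(2, 2)
print(forward_time(2, 1), forward_time(2, 2))  # 2.0 1.5
```

With 2 GPUs and no splitting, the forward pass takes 2 time units, the same as the sequential case. Splitting the batch into 2 micro-batches brings this down to 1.5. More micro-batches shrink the idle "bubble" further: `forward_time(4, 8)` gives 1.375 instead of 4.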