High Performance Computing & AI - 2024.10.05

Python MPI Parallel Programming: MPI Basics and Hands-on Examples
===================================================


### Korea Institute of Science and Technology Information (KISTI), 강지훈

***


## MPI Parallelization

### Data decomposition (domain decomposition)

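* **Each core (or process) owns a partition of the data** 
  
  - Cores exchange the data they need through synchronous or asynchronous communication.
  - Since each core computes on its own partition, the computation is parallelized naturally.

    <img src = "figs/fig_1.jpg" height="200">


* **Well suited to MPI parallelization**
* **The programmer must manage all computation and communication: complex but flexible** 
  - Data decomposition, work assignment, data communication, synchronization, etc.
* **The basic parallelization approach for cluster systems**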

### MPI (Message Passing Interface)

<img src = "figs/fig_2.jpg" height="250">


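- **Process vs. processor** 
  
  - MPI assigns work to processes 
  - Processor-to-process mapping is one-to-one (1:1) or one-to-many (1:n)
  

- **Message (= data + envelope)**

  - Which process sends it 
  - Where the data to be sent is located 
  - What data is sent and how much 
  - Which process receives it 
  - Where it will be stored 
  - How much data the receiver should be prepared to accept

- **Tag** 
  
  - Used to match and distinguish messages 
  - Lets the receiver select messages and handle them in a well-defined order 


- **Communicator** 

  - The set of processes that are allowed to communicate with one another 
  - Communicator created by default: MPI.COMM_WORLD


- **Rank** 

  - Identifier of a process within a communicator
  - **Starts at 0**


- **Point-to-point communication** 
  
  - Communication between two processes 
  - Each sending process is paired with exactly one receiving process


- **Collective communication** 

  - Several processes take part in the communication at the same time 
  - Supports one-to-many, many-to-one, and many-to-many patterns 
  - Replaces multiple point-to-point calls with a single collective call 
    - Less error-prone. 
    - Optimized, so generally faster.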
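
To make the envelope and tag rules concrete, here is a small pure-Python model of how a receive request is matched against pending messages. It is a teaching sketch, not mpi4py code: `Message`, `match`, and the `ANY_SOURCE`/`ANY_TAG` constants are hypothetical stand-ins, and the communicator (also part of the real envelope) is omitted for brevity.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for MPI.ANY_SOURCE / MPI.ANY_TAG (values are illustrative)
ANY_SOURCE = -1
ANY_TAG = -1

@dataclass
class Message:
    source: int   # envelope: which rank sent it
    dest: int     # envelope: which rank should receive it
    tag: int      # envelope: user-chosen label
    data: object  # payload

def match(pending, me, source=ANY_SOURCE, tag=ANY_TAG):
    """Remove and return the first pending message addressed to `me`
    whose source and tag match the receive request (wildcards allowed)."""
    for i, msg in enumerate(pending):
        if msg.dest != me:
            continue
        if source not in (ANY_SOURCE, msg.source):
            continue
        if tag not in (ANY_TAG, msg.tag):
            continue
        return pending.pop(i)
    return None  # a real blocking recv would wait here instead of returning

pending = [Message(0, 2, 7, "a"), Message(1, 2, 3, "b")]
print(match(pending, 2, tag=3).data)  # selects the tag-3 message first
```

Scanning the pending list in arrival order loosely mirrors MPI's rule that messages from the same sender with matching envelopes are received in the order they were sent.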

### MPI4py - introduction

- "MPI for Python (mpi4py)" provides Python bindings for the MPI standard
  
- Lets Python programs use many processors on large systems such as workstations, clusters, and supercomputers

- Built on the MPI standard, with an object-oriented interface that closely follows the MPI-2 C++ bindings

- Convenient communication of any picklable Python object

  - Point-to-point communication (send & receive)
  - Collective communication (broadcast, scatter & gather, reductions)

- Fast communication of objects that expose the Python buffer interface (e.g., NumPy arrays and built-in bytes/array objects)

  - Point-to-point communication (blocking/non-blocking/persistent send & receive)
  - Collective communication (broadcast, block/vector scatter & gather, reductions)

- Process groups and communicators

  - Creation of intra/inter communicators
  - Cartesian & graph topologies

- Parallel file I/O

  - Read & write
  - blocking/non-blocking & collective/noncollective
  - individual/shared file pointers & explicit offset


- Dynamic process management

  - spawn & spawn multiple
  - accept/connect
  - name publishing & lookup


- One-sided operations

  - remote memory access (put, get, accumulate)
  - active target synchronization (start/complete & post/wait)
  - passive target synchronization (lock & unlock)

### MPI4py - Installation

- **Requirements**
  - Python 3.6 or above
  - MPI 라이브러리 (MPICH or Open MPI built with shared/dynamic libraries)

- MPI 라이브러리
  - Linux
    - mpich (https://www.mpich.org/)
    - mvapich (https://mvapich.cse.ohio-state.edu/)
    - openmpi (https://www.open-mpi.org/)
  - Windows
    - msmpi (https://learn.microsoft.com/ko-kr/message-passing-interface/microsoft-mpi)



- **Install Miniconda**
  - Official Miniconda website: [link](https://docs.anaconda.com/free/miniconda/) 
  
  
- **Create an environment named `mpi` with `conda`**
  ```shell
  conda create -n mpi python=3.11 numpy mpi4py matplotlib
  ```

- **Activate the `conda` environment**
  ```shell
  conda activate mpi 
  ```

- **Install additional libraries if needed (numpy is already included in the command above)**
  ```shell
  conda install numpy
  ```

- **Run a job**
  ```shell
  mpiexec -np 3 python [FILE_NAME].py
  ```

- **Deactivate the `conda` environment**
  ```shell
  conda deactivate
  ```

### MPI4py - Initialization / Execution

- **Import the library**

  - `from mpi4py import MPI` 

- **No need to initialize or finalize MPI explicitly**

  - Importing the `MPI` module (`from mpi4py import MPI`) initializes MPI automatically
  - `MPI_Finalize()` is called automatically when the Python process exits 

- **Initialize the variables you need**

  - `comm = MPI.COMM_WORLD`
  - `myrank = comm.Get_rank()`
  - `nproc = comm.Get_size()`

- **Parallel execution (np: number of processes)**

  - `mpirun -np 2 python mycode.py`

### MPI4py - mpirun / mpiexec

- **MPI distributions normally come with an implementation-specific execution utility.** 

  - Executes program multiple times (SPMD parallel programming) 
  - Supports multiple nodes 
  - Integrates with batch queueing systems 
  - Some implementations name it `mpiexec` (the launcher name recommended by the MPI standard)
  

- **Examples**

  ```shell
  mpirun -n 4 python script.py                     # on a laptop
  mpirun --host n01,n02,n03,n04 python script.py
  mpirun --hostfile hosts.txt python script.py
  mpirun python script.py                          # with a batch queueing system
  mpiexec -n 4 python script.py                    # on Windows (MS-MPI)
  ```




### MPI4py - Hello world

In [108]:
!mkdir examples

In [None]:
%%writefile ./examples/mpi1.py

from mpi4py import MPI

print("Hello World!")

In [None]:
!mpiexec -np 3 python ./examples/mpi1.py

### MPI4py - Using the Comm class to define communicator

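- **size = MPI.COMM_WORLD.Get_size(): how many processes are there?**
  - Returns the size of the communicator, i.e., the number of processes in it
    - The attribute MPI.COMM_WORLD.size gives the same value directly
    - COMM_WORLD: the default communicator
  - Running `mpiexec -np n python MY_PROGRAM`
    - mpiexec runs MY_PROGRAM as n parallel processes
    - It creates the default communicator, MPI.COMM_WORLD
  - The number of processes requested on the command line becomes the size of the MPI.COMM_WORLD object
- **rank = MPI.COMM_WORLD.Get_rank(): which process am I?**
  - Returns the rank assigned to the calling process in the communicator
    - The attribute MPI.COMM_WORLD.rank gives the same value directly
  - Each MPI process receives a rank between 0 and n-1 when the communicator is created
  - The rank can be used to make each process perform different work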
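
Because every process runs the same program (SPMD), the rank is the only thing that tells processes apart. The sketch below shows two common uses of it: choosing a role and choosing a slice of the work. `role` and `my_items` are hypothetical helpers, and the loop over `rank` only simulates what separate processes would do; in a real run `rank` and `size` would come from `comm.Get_rank()` and `comm.Get_size()`.

```python
def role(rank):
    # Rank 0 conventionally acts as the master/root in these examples.
    return "master" if rank == 0 else "worker"

def my_items(items, rank, size):
    # Cyclic (round-robin) split: rank r takes items r, r + size, r + 2*size, ...
    return items[rank::size]

if __name__ == "__main__":
    items = list(range(10))
    size = 3                    # pretend the job was started with `mpiexec -np 3`
    for rank in range(size):    # serial stand-in for the three processes
        print(rank, role(rank), my_items(items, rank, size))
```

With 10 items and 3 processes, rank 0 would handle `[0, 3, 6, 9]`, rank 1 `[1, 4, 7]`, and rank 2 `[2, 5, 8]`, so every item is processed exactly once.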

In [None]:
%%writefile ./examples/mpi2.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print("Hello World! from process {0} of {1} \n".format(rank, size))

In [None]:
!mpiexec -np 3 python ./examples/mpi2.py

### MPI4py - Point-to-point communication 1

- The most basic communication functions (`send` and `recv`) $\leftarrow$ names start with a lowercase letter

  - `comm.send(obj, dest, tag=0)` 
  - `comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=None)` 
  
  - `tag` can be used as a filter 
  - `dest` must be a rank in the communicator 
  - `source` can be a rank or `MPI.ANY_SOURCE` (wildcard) 
  - `status` is used to retrieve information about the received message 
  - These are blocking operations
  - Basic message-passing process

  <img src = "figs/fig_3.jpg" height="250">
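
The blocking behaviour of `recv` can be pictured with two threads and a queue standing in for two MPI processes. This is a hypothetical model, not mpi4py: `send` never blocks here because the queue is unbounded (like a buffered send), while `recv` blocks until a message is available, much as `comm.recv()` waits in `p2p_1.py`.

```python
import queue
import threading

# One mailbox per simulated rank stands in for the MPI runtime's message queues.
mailboxes = {0: queue.Queue(), 1: queue.Queue()}
received = {}

def send(obj, dest):
    mailboxes[dest].put(obj)       # analogous to comm.send(obj, dest=dest)

def recv(me):
    return mailboxes[me].get()     # analogous to comm.recv(): blocks until a message arrives

def rank0():
    send("Hello, world", dest=1)

def rank1():
    received[1] = recv(1)

# Start the receiver first: it simply waits inside recv() until rank 0 sends.
threads = [threading.Thread(target=rank1), threading.Thread(target=rank0)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("rank 1:", received[1])
```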

### MPI4py - MPI Data Types

- **C**
  
<img src = "figs/fig_4.jpg" height="300">

- **Fortran**
  
<img src = "figs/fig_5.jpg" height="200">


### MPI4py - Point-to-point communication 1: example

In [None]:
%%writefile ./examples/p2p_1.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    msg = "Hello, world"
    comm.send(msg, dest=1)
elif rank == 1:
    s = comm.recv()
    print ("rank %d: %s" % (rank, s))

In [None]:
!mpiexec -np 2 python ./examples/p2p_1.py

### MPI4py - Point-to-point communication 2

- **Communication with buffer-like objects (`Send` and `Recv`) $\leftarrow$ names start with an uppercase letter**
  
  - **Buffer-like objects**
    
    - NumPy arrays (`numpy.ndarray`)  
    - A list or tuple with 2 or 3 elements (or 4 elements for the vector variants)
      - 2 elements: `[data, MPI.DOUBLE]`, where `data` is a NumPy array
      - 3 elements: `[data, n, MPI.DOUBLE]`, which uses only the first `n` elements of `data`
  
  - `comm.Send(buf, dest, tag=0)` 
  - `comm.Recv(buf, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=None)` 
    
  - **Pros**
    - Fast (communication speed on par with MPI in C)
    
  - **Cons**
    
    - Memory-related parameters such as data type, address, and extent must be specified more explicitly
    - The receive buffer must be allocated in advance, and the buffer sizes must be consistent (send buffer size ≤ receive buffer size)
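
The buffer-size rule can be illustrated without MPI. The hypothetical `recv_into` below copies an incoming message into a preallocated buffer the way `Recv` fills `buf`: a message shorter than the buffer leaves the tail untouched, while a longer one is an error (MPI reports it as a truncation error, `MPI_ERR_TRUNCATE`).

```python
def recv_into(recvbuf, message):
    """Simulate Recv's size rule on plain Python lists (illustration only)."""
    if len(message) > len(recvbuf):
        # The real MPI call fails with a truncation error in this situation.
        raise ValueError(
            f"truncated: {len(message)} items sent, buffer holds {len(recvbuf)}")
    recvbuf[:len(message)] = message   # fill the front of the buffer in place
    return len(message)                # number of items actually received

buf = [0.0] * 4                        # preallocated, like np.zeros(4)
count = recv_into(buf, [1.0, 2.0, 3.0])
print(count, buf)                      # the last slot keeps its old value
```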
  

In [None]:
%%writefile ./examples/p2p_2.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# master process
if rank == 0:
    data = np.arange(4.)
    # master process sends data to worker processes by
    # going through the ranks of all worker processes
    for i in range(1, size):
        comm.Send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)

# worker processes
else:
    # initialize the receiving buffer
    data = np.zeros(4)
    # receive data from master process
    comm.Recv(data, source=0, tag=rank)
    print('Process {} received data:'.format(rank), data)

In [None]:
!mpiexec -np 2 python3 ./examples/p2p_2.py

### MPI4py - Point-to-point communication 3

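- **Synchronous communication (blocking communication)**
  
  - The communication does not finish until the message has been received
  - The call returns only after both sides have confirmed that the send and receive are complete

- **Asynchronous communication (non-blocking communication)**

  - The send or receive call returns immediately
  - Works like e-mail: send it and carry on with other work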
  

  <img src = "figs/fig_6.jpg" height="200">

- Communication modes
  
  <img src = "figs/fig_7.jpg" height="300">

#### Avoiding deadlock (or a hung program)

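- **Send & Recv: blocking communication**
  
  - **`comm.Send`**
    - Returns only after the sending process has confirmed that all of the data has been sent
    - The data must have been copied out, either into a buffer or into the receiving process's memory
    - The send buffer must not change until the send completes
  
  - **`comm.Recv`**
    - Returns only after the receiving process has finished receiving the data
    - The receive buffer must not change until the receive completes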

   <img src = "figs/fig_8.jpg" height="200">


#### No deadlock

```python
#!--Exchange messages
if mpirank == 0 :
	comm.Send(a, 1, tag1)
	comm.Recv(b, 1, tag2)
elif mpirank == 1 : 
	comm.Recv(a, 0, tag1)
	comm.Send(b, 0, tag2)
```

<img src = "figs/fig_9.jpg" height="200">


#### Always deadlocks

```python
# !--Exchange messages
if mpirank == 0 :
	comm.Recv(b, 1, tag2)
	comm.Send(a, 1, tag1)

elif mpirank == 1 : 
	comm.Recv(a, 0, tag1)
	comm.Send(b, 0, tag2)

```

<img src = "figs/fig_10.jpg" height="200">

#### Conditional deadlock

```python
# Exchange messages
if mpirank == 0 :
    comm.Send(a, 1, tag1)
    comm.Recv(b, 1, tag2)

elif mpirank == 1 : 
    comm.Send(b, 0, tag2)
    comm.Recv(a, 0, tag1)

```

<img src = "figs/fig_10_1.png" height="200">
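
Whether the conditional-deadlock pattern hangs usually depends on message size. Many MPI implementations send small messages eagerly (buffered, so `Send` returns at once) and large ones with a rendezvous protocol (`Send` waits for the matching receive). The threaded sketch below models that assumption; `exchange`, `eager_limit`, and the timeout are hypothetical, and real thresholds are implementation-specific.

```python
import queue
import threading

def exchange(msg_size, eager_limit, timeout=0.5):
    """Both simulated ranks call send() first and recv() second.
    Messages up to `eager_limit` items are buffered (send returns at once);
    larger ones use a rendezvous (send waits until the receiver takes them)."""
    inbox = {0: queue.Queue(), 1: queue.Queue()}
    results = {}

    def send(dest, payload):
        taken = threading.Event()
        inbox[dest].put((payload, taken))
        if len(payload) <= eager_limit:
            return True                   # eager: buffered, returns immediately
        return taken.wait(timeout)        # rendezvous: wait for the matching recv

    def recv(me):
        payload, taken = inbox[me].get(timeout=timeout)
        taken.set()                       # tell the sender its data was taken
        return payload

    def rank(me):
        other = 1 - me
        if not send(other, [me] * msg_size):
            results[me] = "deadlock"      # timed out: no receive was ever posted
            return
        results[me] = recv(me)

    threads = [threading.Thread(target=rank, args=(r,)) for r in (0, 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(exchange(10, eager_limit=100))            # small message: exchange completes
print(exchange(1000, eager_limit=100))          # large message: both sends stall
```

Reordering one side (rank 0 sends first, rank 1 receives first) would make the exchange safe regardless of `eager_limit`, which is why the "no deadlock" ordering is preferred.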

In [None]:
%%writefile ./examples/deadlock.py 

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
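# Small messages are usually buffered (eager protocol), so this exchange completes.
# With a much larger buf_size both Sends may wait for a matching Recv and hang;
# the threshold depends on the MPI implementation.
buf_size = 100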

a = np.ones(buf_size, dtype = int)
b = np.empty(buf_size, dtype = int)
if rank == 0 :
    comm.Send(a, dest = 1, tag = 11)
    comm.Recv(b, source = 1, tag = 55)
elif rank ==1 :
    comm.Send(a, dest = 0, tag = 55)
    comm.Recv(b, source = 0, tag = 11)

print("Everything okay")

In [None]:
!mpiexec -np 2 python3 ./examples/deadlock.py

#### Ensuring a Program is Safe (No deadlock) 

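- **Treat `comm.Send` as essentially the same as `comm.Ssend`**
  
  - `comm.Ssend` always communicates synchronously; `comm.Send` may behave asynchronously (buffered) depending on the message size and available buffer space, but it can also behave synchronously
  - Write code assuming that `comm.Send` communicates synchronously as well

- **Avoiding deadlock**
  - Carefully order the send/receive calls of the participating processes
  - Treat `comm.Send` as synchronous and do not rely on buffering
  - Use non-blocking communication
  - Use `comm.Sendrecv`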


#### `comm.Sendrecv`

- **`Sendrecv(sendbuf, dest, sendtag=0, recvbuf=None, source=ANY_SOURCE, recvtag=ANY_TAG, status=None)`**

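  - sendbuf (BufSpec) – buffer holding the data to send
  - dest (int) – rank of the destination process
  - sendtag (int) – tag of the outgoing message
  - recvbuf (BufSpec) – buffer that receives the incoming data
  - source (int) – rank of the source process (or `MPI.ANY_SOURCE`)
  - recvtag (int) – tag of the incoming message (or `MPI.ANY_TAG`)
  - status (Optional[Status]) – if given, filled with information about the received message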
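
`sendrecv.py` computes ring neighbors with if-statements; Python's `%` operator gives the same periodic neighbors in one line. The serial sketch below (hypothetical helpers `ring_neighbors` and `ring_shift`) also predicts what each rank receives when every rank calls `Sendrecv(a, inext, ..., b, iprev, ...)` at the same time.

```python
def ring_neighbors(rank, size):
    # Periodic neighbors: same result as the if-statements in sendrecv.py
    return (rank + 1) % size, (rank - 1) % size   # (inext, iprev)

def ring_shift(values):
    """Serial simulation of a ring shift: values[r] is what rank r sends to
    its next neighbor; the returned list[r] is what rank r receives."""
    size = len(values)
    received = [None] * size
    for rank in range(size):
        inext, _ = ring_neighbors(rank, size)
        received[inext] = values[rank]
    return received

print(ring_neighbors(0, 4))     # rank 0 sends to rank 1 and receives from rank 3
print(ring_shift([1, 2, 3]))    # each rank gets its left neighbor's value
```

With 2 processes `inext` and `iprev` are the same rank, so the 2-process run of `sendrecv.py` simply swaps the two arrays.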

#### sendrecv.py: using `comm.Sendrecv`

In [None]:
%%writefile ./examples/sendrecv.py 

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

a = np.zeros(size, dtype = int)
b = np.zeros(size, dtype = int)
a[rank] = rank + 1
inext = rank + 1
iprev = rank - 1

if rank == 0 :
    iprev = size - 1
if rank == size - 1 :
    inext = 0

for i in range(size) :
    if rank == i :
        print('BEFORE : myrank={0}, A = {1}'.format(rank, a))

comm.Sendrecv(a, inext, 77, b, iprev, 77)

for i in range(size) :
    if rank == i :
        print('AFTER  : myrank={0}, B = {1}'.format(rank, b))

In [None]:
!mpiexec -np 2 python3 ./examples/sendrecv.py

### MPI4py - Point-to-point communication 4

#### Non-blocking communications

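- **Two steps of non-blocking communication**
  - Call a non-blocking send/receive function
    - `comm.Isend` or `comm.Irecv`
  - Call a function that waits for completion
    - `request.Wait()` or `MPI.Request.Waitall(requests)`

- **Characteristics**
  - The call returns immediately, so the ordering deadlocks of blocking calls are avoided (a wait can still hang if the matching operation is never posted)
  - Delivery is not guaranteed when the call returns ("send & forget"): do not touch the buffer until the request has completed
  - Work unrelated to the message can be placed between the communication call and the completion check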

- **`request = comm.Isend(… )`**

  - request: request handle


- **`request = comm.Irecv(…)`**

  - request: request handle


- `request.Wait(status=None)`


- `MPI.Request.Waitall(requests, statuses=None)`
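
The request-handle pattern can be mimicked with Python's `concurrent.futures`: submitting a job returns a handle immediately (like `comm.Isend`), unrelated work can run meanwhile, and waiting on the handles corresponds to `request.Wait()` / `MPI.Request.Waitall`. This is only an analogy built from the standard library; `fake_transfer` and `isend` are hypothetical, and MPI does not progress messages with a thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

executor = ThreadPoolExecutor(max_workers=4)   # stands in for background progress

def fake_transfer(payload, delay=0.05):
    time.sleep(delay)                          # pretend the bytes are on the wire
    return payload

def isend(payload):
    # Returns a handle immediately, like `request = comm.Isend(...)`
    return executor.submit(fake_transfer, payload)

requests = [isend(i * 100) for i in range(5)]  # post several operations
partial = sum(x * x for x in range(1000))      # overlapped work that ignores the buffers
wait(requests)                                 # like MPI.Request.Waitall(requests)
results = [r.result() for r in requests]       # like request.Wait() on each handle
executor.shutdown()
print(results, partial)
```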



#### isendrecv.py: Example of Non-blocking communications

In [None]:
%%writefile ./examples/isendrecv.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank() # myrank = comm.rank

data  = np.zeros(100, dtype = float)
value = np.zeros(100, dtype = float)
req_list = []

if rank == 0 :
    for i in range(100) :
        data[i] = i * 100
        req_send = comm.Isend(data[i:i+1], dest = 1, tag = i)
        req_list.append(req_send)
elif rank == 1 :
    for i in range(100) :
        req_recv = comm.Irecv(value[i:i+1] , source = 0, tag = i)
        req_list.append(req_recv)

MPI.Request.Waitall(req_list)

if rank == 0 :
    print("data[99] = {0}\n".format(data[99]))

if rank == 1 :
    print("value[99] = {0}\n".format(value[99]))

In [None]:
!mpiexec -np 2 python ./examples/isendrecv.py

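**Difference between blocking and non-blocking communication**
  - `comm.Send`
    - The call waits until the communication completes and continues only after that, so the data is delivered safely.
    - If the communication never completes (e.g., because of a bug), the program hangs (deadlock).
  - `comm.Isend`
    - The call returns immediately and the communication proceeds in the background.
    - The call itself cannot deadlock, but you do not know when the communication completes, and the data may be overwritten if the buffer is reused too early.
    - You must call `request.Wait()` to wait until the communication has completed.
  - `request.Wait()`
    - A blocking function that waits until the communication associated with the request has fully completed.
    - The communicated data can be used safely once Wait returns. 

**The programmer must decide where to synchronize**
  - For both sends and receives, synchronize explicitly with `request.Wait()` before the point where the communicated data is needed
  - Because Wait is blocking, it can be used to control the program flow

**Why non-blocking communication is useful**
  - Work that does not touch the communicated data can run between the non-blocking call and Wait
  - Overlapping communication with computation runs the communication in the background and can hide part or all of its cost (communication hiding)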

```python
if rank == 0 :
    for i in range(100) :
        data[i] = i * 100
        req_send = comm.Isend(data[i:i+1], dest = 1, tag = i)
        req_list.append(req_send)
elif rank == 1 :
    for i in range(100) :
        req_recv = comm.Irecv(value[i:i+1] , source = 0, tag = i)
        req_list.append(req_recv)

# Computation that does not use `data` or `value` can be placed here

MPI.Request.Waitall(req_list)
```



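### Communicating general Python objects

- **Communication functions whose names start with a lowercase letter can send and receive arbitrary (picklable) Python objects**
  - `Comm.send`, `Comm.recv`, `Comm.sendrecv`, etc.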
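
The lowercase methods work by serializing the object with `pickle`, sending the bytes, and unpickling them on the receiving side. The standalone sketch below shows that round trip, and that an object `pickle` cannot handle (here a lambda) could not be sent this way. The example object is made up for illustration.

```python
import pickle

obj = {"step": 3, "coords": [(0.0, 1.5), (2.0, -1.0)], "label": "run-A"}
wire = pickle.dumps(obj)      # roughly what travels between processes
copy = pickle.loads(wire)     # roughly what the receiver gets back
print(type(wire).__name__, copy == obj, copy is obj)

try:
    pickle.dumps(lambda x: x)  # lambdas are not picklable
except (pickle.PicklingError, AttributeError, TypeError) as err:
    print("not sendable with comm.send:", type(err).__name__)
```

The receiver gets an equal but distinct object, which is why modifying received data never affects the sender's copy.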

In [None]:
%%writefile ./examples/general1.py


from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
a = rank
b = 0

inext = rank + 1
iprev = rank - 1

if rank == 0 :
    iprev = size - 1
if rank == size - 1 :
    inext = 0
for i in range(size) :
    if rank == i :
        print('BEFORE : myrank={0}, A = {1}'.format(rank, a))
b = comm.sendrecv(a, inext, 77, None, iprev, 77)
for i in range(size) :
    if rank == i :
        print('AFTER : myrank={0}, B = {1}'.format(rank, b))


In [None]:
!mpiexec -np 2 python ./examples/general1.py

In [None]:
%%writefile ./examples/general2.py


from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank() 

data = []
value = []
req_list = []
if rank == 0 :
    for i in range(100) :
        data.append(i * 100)
        req_send = comm.isend(data[i], dest = 1, tag = i)
        req_list.append(req_send)
elif rank == 1 :
    for i in range(100) :
        req_recv = comm.irecv( source = 0, tag = i)
        req_list.append(req_recv)
value = MPI.Request.waitall(req_list)

if rank == 0 :
    print("data[99] = {0}\n".format(data[99]))
if rank == 1 :
    print("value[99] = {0}\n".format(value[99]))


In [None]:
!mpiexec -np 2 python ./examples/general2.py

### Example 1 - Computing PI with the Monte Carlo method

  <img src = "figs/fig_10_2.png" >


In [None]:
# Serial code
import numpy as np

SCOPE = 1000000
count = 0

for i in range(SCOPE) :
    x = np.random.rand()
    y = np.random.rand()
    z = (x*x + y*y)**(0.5)
    if z < 1 :
        count += 1

print("Count = %d, Pi = %f"%(count,count/SCOPE*4))

In [None]:
%%writefile ./examples/pi1.py
# Parallel code

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
SCOPE = 1000000
np.random.seed(rank)

mycount = 0
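# Give the first SCOPE % size ranks one extra sample so exactly SCOPE samples are drawn
my_samples = SCOPE // size + (rank < SCOPE % size)
for i in range(my_samples) :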
    x = np.random.rand()
    y = np.random.rand()
    z = (x*x + y*y)**(0.5)
    if z < 1 :
        mycount += 1

if rank == 0 :
    for i in range (1, size) :
        other_count = comm.recv(source=i, tag=10)
        mycount = mycount + other_count
else :
    comm.send(mycount, dest=0, tag=10)
if rank == 0 :
    print('Rank : %d, Count = %d, Pi = %f'%(rank,mycount,mycount/SCOPE*4))


In [None]:
!mpiexec -np 4 python ./examples/pi1.py
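
The structure of `pi1.py` (an independently seeded stream per rank, a local count, then a sum on rank 0) can be checked serially. In the sketch below the ranks are simulated in a loop, `random.Random(rank)` replaces `np.random.seed(rank)`, and the final `sum` plays the role of the receive loop (or of `comm.reduce`); `partial_count` and `estimate_pi` are hypothetical names.

```python
import math
import random

def partial_count(rank, n_samples):
    """Hits inside the quarter circle for one simulated rank."""
    rng = random.Random(rank)         # one independent, reproducible stream per rank
    hits = 0
    for _ in range(n_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1.0:       # same test as z < 1, without the square root
            hits += 1
    return hits

def estimate_pi(total, size):
    # Split `total` samples as evenly as possible (first total % size ranks get one more)
    counts = [total // size + (r < total % size) for r in range(size)]
    hits = sum(partial_count(r, n) for r, n in enumerate(counts))  # the "reduce" step
    return 4.0 * hits / total

print(estimate_pi(200_000, 4), math.pi)
```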

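### MPI4py - Collective communication

- Communication in which all processes of a communicator take part at once
- Built on point-to-point communication, but the API lets the library use efficient algorithms
- Easier to use than point-to-point communication and generally faster

- **Characteristics**

  - Every process in the communicator must call the function
  - Part of the envelope is unnecessary: there is no message tag, and depending on the pattern there may be no distinct sending or receiving process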
    

  <img src = "figs/fig_11.jpg" height="500">
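
The data-movement collectives below can be predicted with a few lines of plain Python. In this serial sketch, `values` is a list indexed by rank holding the object each rank passes in, and each function returns a list indexed by rank holding what each rank gets back. The `sim_*` functions are hypothetical helpers that model the semantics of the lowercase mpi4py collectives, not their implementation.

```python
def sim_bcast(values, root=0):
    # every rank ends up with (a copy of) the root's object
    return [values[root] for _ in values]

def sim_scatter(sendobj):
    # the root's sequence has one entry per rank; rank r receives sendobj[r]
    return [sendobj[r] for r in range(len(sendobj))]

def sim_gather(values, root=0):
    # the root receives a list ordered by rank; the other ranks get None
    return [list(values) if r == root else None for r in range(len(values))]

def sim_allgather(values):
    # like gather, but every rank receives the full list
    return [list(values) for _ in values]

def sim_alltoall(values):
    # values[r][j] is what rank r sends to rank j; rank j receives column j
    size = len(values)
    return [[values[r][j] for r in range(size)] for j in range(size)]

print(sim_alltoall([[0, 1, 2, 3]] * 4))   # what alltoall.py should show on 4 ranks
```

For instance, in `scatter.py` with 4 ranks rank `r` should receive `(r+1)**2`; in `allgather.py` every rank should receive `[0, 1, 4, 9]`; and in `alltoall.py` rank `j` should receive `[j, j, j, j]`.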


#### `comm.bcast(sendobj, root=0)`
  <img src = "figs/fig_12.jpg" height="200">


In [None]:
%%writefile ./examples/bcast.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank

if rank == 0:
    data = {'a': 1, 'b': 2, 'c': 3}
else:
    data = None 
    
data = comm.bcast(data, root=0)
print('rank', rank, data)



In [None]:
!mpiexec -np 4 python3 ./examples/bcast.py

#### `comm.Bcast(buf, root=0)`
- For objects that expose the buffer interface, such as NumPy arrays

In [None]:
%%writefile ./examples/bcast2.py

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()

rank = comm.Get_rank()
ROOT = 0
buf = np.zeros(4, dtype = int)
buf2 = np.zeros(4, dtype = int)
if rank == ROOT :
    buf = np.array([5, 6, 7, 8])
if rank == (size - 1) :
    buf2 = np.array([50, 60, 70, 80])
print('Before : rank = {0}, buf = {1}'.format(rank, buf))
comm.Bcast(buf, ROOT)
comm.Bcast(buf2, size-1)

print('After : rank = {0}, buf = {1}'.format(rank, buf))
print('After : rank = {0}, buf2 = {1}'.format(rank, buf2))


In [None]:
!mpiexec -np 4 python3 ./examples/bcast2.py

#### `recvobj = comm.gather(sendobj, root=0)`
  <img src = "figs/fig_15.jpg" height="200">


In [None]:
%%writefile ./examples/gather1.py

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

recvbuf = comm.gather(rank)
if rank == 0:
    print('Gathered array: {}'.format(recvbuf))

In [None]:
!mpiexec -np 4 python ./examples/gather1.py

#### `comm.Gather(sendbuf, recvbuf, root=0)`
  <img src = "figs/fig_16.jpg" height="200">

In [None]:
%%writefile ./examples/gather2.py

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size ()

local_array = [rank] * 4
print("rank: {}, local_array: {}".format(rank, local_array))
sendbuf = np.array(local_array)

recvbuf = None
if rank == 0:
    recvbuf = np.empty(size*4, dtype=int)

comm.Gather(sendbuf=sendbuf, recvbuf=recvbuf)
if rank == 0:
    print("Gathered array: {}".format(recvbuf))

In [None]:
!mpiexec -np 4 python ./examples/gather2.py

#### `recvobj = comm.scatter(sendobj, root=0)`
  <img src = "figs/fig_18.jpg" height="200">

In [None]:
%%writefile ./examples/scatter.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = [(i+1)**2 for i in range(size)]
else:
    data = None

data = comm.scatter(data, root=0)
assert data == (rank+1)**2
print("rank: {}, scattered: {}".format(rank, data))

In [None]:
!mpiexec -np 4 python ./examples/scatter.py

#### `comm.Scatter(sendbuf, recvbuf, root=0)`

In [None]:
%%writefile ./examples/scatter2.py

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
isend = np.zeros(size, dtype = int)
irecv = np.empty(1, dtype = int)
if rank == 0 :
    isend = np.arange(0, size, dtype = int)
print('sbuf : rank = {0}, isend = {1}'.format(rank, isend))
comm.Scatter(isend, irecv, 0)
print('rbuf : rank = {0}, irecv = {1}'.format(rank, irecv))


In [None]:
!mpiexec -np 4 python ./examples/scatter2.py

#### `recvobj = comm.allgather(sendobj)`
  <img src = "figs/fig_20.jpg" height="200">

In [None]:
%%writefile ./examples/allgather.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = rank**2
data = comm.allgather(data)

print("On rank", rank,"data =", data)

In [None]:
!mpiexec -np 4 python ./examples/allgather.py

#### `comm.Allgather(sendbuf, recvbuf)`


In [None]:
%%writefile ./examples/allgather2.py

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
isend = np.array([rank + 1])
irecv = np.zeros(size, dtype = int)
print('rank = {0}, isend = {1}'.format(rank, isend))
comm.Allgather(isend, irecv)
print('rank = {0}, irecv = {1}'.format(rank, irecv))


In [None]:
!mpiexec -np 4 python ./examples/allgather2.py

#### `recvobj = comm.alltoall(sendobj)`
  <img src = "figs/fig_21.jpg" height="200">

In [None]:
%%writefile ./examples/alltoall.py

from mpi4py import MPI
comm = MPI.COMM_WORLD

rank = comm.Get_rank()
data = range(comm.Get_size())
print("On rank", rank,"original data =", list(data))
data = comm.alltoall(data)

print("On rank", rank,"transposed data =", data)

In [None]:
!mpiexec -np 4 python ./examples/alltoall.py

#### `comm.Alltoall(sendbuf, recvbuf)`

In [None]:
%%writefile ./examples/alltoall2.py

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

isend = np.arange(1 + size * rank, 1 + size * rank + size, dtype = int)
irecv = np.zeros(size, dtype = int)
print('Rank({0}) : isend = {1}'.format(rank, isend))

comm.Alltoall(isend, irecv)
print('Rank({0}) : irecv = {1}'.format(rank, irecv))


In [None]:
!mpiexec -np 4 python ./examples/alltoall2.py

#### `comm.reduce(sendobj, op=MPI.SUM, root=0)`
  <img src = "figs/fig_13.jpg" height="200">


In [None]:
%%writefile ./examples/reduce.py

from mpi4py import MPI
import numpy as np
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank ()
size = comm.Get_size ()

local = random.randint(2, 5)
print("rank: {}, local: {}".format(rank, local))

sum = comm.reduce(local, MPI.SUM, root=0)
if (rank==0):
    print ("sum: ", sum)

In [None]:
!mpiexec -np 4 python ./examples/reduce.py

#### `comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)`

In [None]:
%%writefile ./examples/reduce2.py

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
a = np.zeros(3, dtype = int)
ista = rank * 3
iend = ista + 3
a= np.arange(ista + 1, iend + 1)
sum = a.sum()

print('Rank({0}) : local_sum = {1}'.format(rank, sum))
tsum = np.zeros_like(sum)
comm.Reduce(sum, tsum, MPI.SUM, 0)

if rank == 0 :
    print('Rank({0}) : sum = {1}'.format(rank, tsum))


In [None]:
!mpiexec -np 4 python ./examples/reduce2.py

#### `recvobj = comm.allreduce(sendobj)`
  <img src = "figs/fig_19.jpg" height="200">

In [None]:
%%writefile ./examples/allreduce.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = rank**2
data = comm.allreduce(data)

print("On rank", rank,"data =", data)

In [None]:
!mpiexec -np 3 python ./examples/allreduce.py

#### `comm.Allreduce(sendbuf, recvbuf, op=MPI.SUM)`

In [None]:
%%writefile ./examples/allreduce2.py

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()

rank = comm.Get_rank()
a = np.zeros(3, dtype = int)
ista = rank * 3
iend = ista + 3
a = np.arange(ista + 1, iend + 1)

sum = a.sum()
tsum = np.zeros_like(sum)

comm.Allreduce(sum, tsum, MPI.SUM)

if rank == 1 :
    print('Rank({0}) : sum = {1}'.format(rank, tsum))


In [None]:
!mpiexec -np 3 python ./examples/allreduce2.py

#### `comm.scan(sendobj, op=MPI.SUM)`
  <img src = "figs/fig_14.jpg" height="200">


In [None]:
%%writefile ./examples/scan.py

from mpi4py import MPI
import numpy as np
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank ()
size = comm.Get_size ()

local = random.randint(2, 5)
print("rank: {}, local: {}".format(rank, local))

scan = comm.scan(local, MPI.SUM)
print ("rank:", rank, "sum: ", scan)

In [None]:
!mpiexec -np 4 python ./examples/scan.py

#### `comm.Scan(sendbuf, recvbuf, op=MPI.SUM)`

In [None]:
%%writefile ./examples/scan2.py

from mpi4py import MPI
import numpy as np
import random
comm = MPI.COMM_WORLD

rank = comm.Get_rank()
size = comm.Get_size()

# The buffer-based Scan needs array buffers for both the send and receive side
local = np.array([random.randint(2, 5)], dtype=int)

print("rank: {}, local: {}".format(rank, local))

scan = np.zeros_like(local)
comm.Scan(local, scan, op=MPI.SUM)

print ("rank:", rank, "sum: ", scan)


In [None]:
!mpiexec -np 4 python ./examples/scan2.py

#### `comm.Reduce_scatter(sendbuf, recvbuf, recvcounts=None, op=MPI.SUM)`
  <img src = "figs/fig_16_2.png" height="200">
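
The reduction collectives can be predicted the same way. In this serial sketch `values` (or `sendbufs`) is indexed by rank, the result is indexed by rank, and Python's `operator.add` stands in for `MPI.SUM`. The `sim_*` names are hypothetical, and `sim_reduce_scatter` assumes one result element per rank, as in `rdc_sct.py`.

```python
import operator
from functools import reduce

def sim_reduce(values, op=operator.add, root=0):
    total = reduce(op, values)                   # combine contributions in rank order
    return [total if r == root else None for r in range(len(values))]

def sim_allreduce(values, op=operator.add):
    total = reduce(op, values)
    return [total] * len(values)                 # every rank receives the result

def sim_scan(values, op=operator.add):
    out = []                                     # inclusive prefix: rank r combines ranks 0..r
    for r, v in enumerate(values):
        out.append(v if r == 0 else op(out[-1], v))
    return out

def sim_reduce_scatter(sendbufs, op=operator.add):
    # element-wise reduction across ranks, then element r goes to rank r
    return [reduce(op, column) for column in zip(*sendbufs)]

print(sim_scan([3, 2, 5, 4]))                    # running sums, one per rank
```

For example, `allreduce.py` with 3 processes contributes 0, 1, and 4, so every rank should print 5, and `rdc_sct.py` with 3 processes should leave 3, 6, and 9 on ranks 0, 1, and 2.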

In [None]:
%%writefile ./examples/rdc_sct.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = np.array([1, 2, 3], dtype = int)
recvbuf = np.zeros(1, dtype = int)
RECVBUF = sendbuf * 2

comm.Reduce_scatter(sendbuf, recvbuf, None, MPI.SUM)
print('Rank({0}) : recvbuf = {1}'.format(rank, recvbuf))



In [None]:
!mpiexec -np 3 python ./examples/rdc_sct.py

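### Extensions of collective communication
- Variants ("v" versions)
  - The amount of data sent or received can differ from process to process
  - Gatherv, Scatterv, Allgatherv, Alltoallv

- In-place operation
  - Passing `MPI.IN_PLACE` instead of a buffer lets a single buffer act as both input and output: for most collectives (e.g., Allreduce, or Gather/Reduce at the root) it is passed as the send buffer and the receive buffer is overwritten with the result; for Scatter it is passed as the root's receive buffer

- Maxloc
  - The `MPI.MAXLOC` reduction returns the maximum value together with its location (e.g., the rank that holds it)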



#### `comm.Gatherv(sendbuf, recvbuf=(recvbuf, sendcounts), root=0)`
  <img src = "figs/fig_17.jpg" height="200">

In [None]:
%%writefile ./examples/gatherv.py

import numpy as np
from mpi4py import MPI
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
local_array = [rank] * random.randint(2, 5)
print("rank: {}, local_array: {}".format(rank, local_array))
sendbuf = np.array(local_array)
sendcounts = np.array(comm.gather(len(sendbuf), 0))
recvbuf = None
if rank == 0:
    print("sendcounts: {}, total: {}".format(sendcounts, sum(sendcounts)))
    recvbuf = np.empty(sum(sendcounts), dtype=int)
comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=0)
if rank == 0:
    print("Gathered array: {}".format(recvbuf))

In [None]:
!mpiexec -np 4 python ./examples/gatherv.py

  <img src = "figs/fig_22.jpg" height="200">
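
With `Gatherv` each rank's block lands in the root's `recvbuf` at a displacement equal to the total size of the blocks from lower ranks, i.e., an exclusive prefix sum of `sendcounts`. The serial sketch below computes those displacements and assembles the result; `displacements` and `sim_gatherv` are hypothetical helpers. When only `(recvbuf, sendcounts)` is passed, as in `gatherv.py`, mpi4py is expected to derive contiguous displacements like these; explicit displacements can be supplied with the 4-element vector-variant buffer form mentioned earlier.

```python
from itertools import accumulate

def displacements(counts):
    """Exclusive prefix sum: where each rank's block starts in recvbuf."""
    return [0] + list(accumulate(counts))[:-1]

def sim_gatherv(blocks):
    counts = [len(b) for b in blocks]
    displs = displacements(counts)
    recvbuf = [None] * sum(counts)
    for block, start in zip(blocks, displs):
        recvbuf[start:start + len(block)] = block
    return recvbuf, counts, displs

# Blocks of different lengths, as produced by random.randint(2, 5) in gatherv.py
print(sim_gatherv([[0, 0], [1, 1, 1], [2, 2, 2, 2], [3, 3]]))
```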

### MPI4py - Loop parallelization

- **Loops appear frequently in computational algorithms.** 

  <img src = "figs/fig_23.jpg" height="80">




- **Loop parallelization is essential for most computational problems.**

  <img src = "figs/fig_24.jpg" height="80">


### MPI4py - Block distribution - para_range



  <img src = "figs/fig_25.jpg" height="80">

- **Suppose that dividing `n` by `p` gives quotient `q` and remainder `r`.**
 
  - $n = p\times q + r$ 

- **Processes `0, ..., r-1` are assigned `q+1` iterations each; the remaining processes are assigned `q` iterations.**

  - $n = r(q+1) + (p-r)q$ 
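
The formula translates directly into code: rank `k` starts after `k` blocks of size `q` plus one extra item for each earlier rank among the first `r`. The hypothetical `block_range` below returns a 0-based half-open range; the `para_range` function below computes the same split in 1-based inclusive form, so `para_range(1, n, p, k)` should equal `(start + 1, stop)`.

```python
def block_range(n, p, rank):
    """0-based half-open [start, stop) block of `rank` out of `p`,
    following n = r(q+1) + (p-r)q."""
    q, r = divmod(n, p)
    start = rank * q + min(rank, r)        # earlier ranks with q+1 items add min(rank, r)
    stop = start + q + (1 if rank < r else 0)
    return start, stop

if __name__ == "__main__":
    n, p = 10, 4                           # q = 2, r = 2
    for rank in range(p):
        print(rank, block_range(n, p, rank))
```

For `n = 10, p = 4` this yields `(0, 3), (3, 6), (6, 8), (8, 10)`: the first two ranks get three iterations and the other two get two.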


In [169]:
## para_range functions 

def para_range(n1, n2, size, rank):
    iwork = divmod((n2 - n1 + 1), size)
    ista = rank * iwork[0] + n1 + min(rank, iwork[1])
    iend = ista + iwork[0] - 1
    if iwork[1] > rank :
        iend = iend + 1

    return ista, iend

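def para_range2(n1, n2, size, rank):
    # Same block distribution via a prefix sum over ranks; every rank of
    # MPI.COMM_WORLD must call it, and size/rank must match that communicator
    from mpi4py import MPI
    N = (n2 - n1 + 1) // size + ((n2 - n1 + 1) % size > rank)
    end = MPI.COMM_WORLD.scan(N)   # inclusive prefix sum of the block sizes
    start = end - N                # exclusive prefix sum
    return n1 + start, n1 + end - 1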

### MPI4py - example1 (`map.py`)

In [None]:
%%writefile ./examples/map.py

from mpi4py import MPI
import numpy as np
import math

comm = MPI.COMM_WORLD
rank = comm.Get_rank ()
size = comm.Get_size ()

x = range(20)
m = int(math.ceil(float(len(x)) / size))
x_chunk = x[rank*m:(rank+1)*m]
r_chunk = map(math.sqrt, x_chunk)
r = comm.reduce(list(r_chunk))  # default op MPI.SUM concatenates the per-rank lists on rank 0
if (rank==0):
    print (r)

#serial => print(list(map(math.sqrt, x)))

In [None]:
!mpiexec -np 4 python ./examples/map.py

### MPI4py - example2 (`pi.py`)

  <img src = "figs/fig_26.jpg" height="350">
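
For reference, the quadrature behind `pi_serial.py` and `pi_mpi.py` can be written out. The area under $\sqrt{1-x^2}$ on $[-1,1]$ is the upper half of the unit disk, $\pi/2$, and the code approximates it with a left Riemann sum of width $\Delta x$ (`dx`), using $M$ (`iters`) points:

$$
\pi = 2\int_{-1}^{1}\sqrt{1-x^{2}}\,dx \approx 2\sum_{i=0}^{M-1}\sqrt{1-x_i^{2}}\,\Delta x,
\qquad x_i = -1 + i\,\Delta x,\quad M = \frac{2}{\Delta x}.
$$

In `pi_mpi.py`, rank $k$ sums the contiguous block $i = s_k, \dots, s_k + N_k - 1$, where $N_k$ is its share of the $M$ iterations and $s_k = \sum_{j<k} N_j$ is obtained from `comm.scan(N) - N`; the final `comm.reduce` adds the partial sums, so the result matches the serial sum up to floating-point rounding.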


In [None]:
%%writefile ./examples/pi_serial.py

import math
from time import perf_counter
#starting value of x
x=-1
dx=0.0000001
start_time = perf_counter()
iters=int((1-(-1))/dx)
#the sum of all the areas - start at 0
A=0.
for i in range(iters):
    A=A+math.sqrt(1-x**2)*dx
    x=x+dx
tpi=2*A
error = abs(tpi - math.pi)
end_time = perf_counter()
print ("pi is approximately %.16f, "
    "error is %.16f" % (tpi, error))
print('Elapsed wall clock time = %g seconds.' % (end_time-start_time) )
print(' ')

In [None]:
!python3 ./examples/pi_serial.py

In [None]:
%%writefile ./examples/pi_mpi.py

from mpi4py import MPI
from time import perf_counter
import math
comm=MPI.COMM_WORLD
size=comm.Get_size()
rank=comm.Get_rank()
x=-1
dx=0.0000001
start_time = perf_counter()
iters=int((1-(-1))/dx)
N=iters//size+(iters % size > rank)
start=comm.scan(N)-N
A=0.
x=x+dx*start
for i in range(N):
    A=A+math.sqrt(1-x**2)*dx
    x=x+dx
##
A = comm.reduce(A, op=MPI.SUM, root=0) 
##

end_time = perf_counter()
if rank==0:
    tpi=2*A
    error = abs(tpi - math.pi)
    print ("pi is approximately %.16f, "
        "error is %.16f" % (tpi, error))
    print('Elapsed wall clock time = %g seconds.' % (end_time-start_time) )
    print(' ')

In [None]:
!mpiexec -np 4 python3 ./examples/pi_mpi.py


### MPI4py - example3: vector + vector (`vecadd.py`: serial)

In [None]:
%%writefile ./examples/vecadd.py

from mpi4py import MPI
import numpy as np 

N = 10000000

# initialize a
start_time = MPI.Wtime()
a = np.ones( N )
end_time = MPI.Wtime()
print("Initialize a time: " + str(end_time-start_time))

# initialize b
start_time = MPI.Wtime()
b = np.zeros( N )
for i in range( N ):
    b[i] = 1.0 + i
end_time = MPI.Wtime()
print("Initialize b time: " + str(end_time-start_time))

# add the two arrays
start_time = MPI.Wtime()
for i in range( N ):
    a[i] = a[i] + b[i]
end_time = MPI.Wtime()
print("Add arrays time: " + str(end_time-start_time))

# average the result
start_time = MPI.Wtime()
sum = 0.0

for i in range( N ):
    sum += a[i]
average = sum / N
end_time = MPI.Wtime()

print("Average result time: " + str(end_time-start_time))
print("Average: " + str(average))

In [None]:
!python3 ./examples/vecadd.py
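
A quick hand check makes it easier to validate the parallel versions below. Since `a[i] = 1` and `b[i] = 1 + i`, after the addition `a[i] = 2 + i`, so the average is $2 + (N-1)/2 = (N+3)/2$, i.e. 5000001.5 for `N = 10000000`; the serial and parallel runs should all report this value. The helper names below are hypothetical; `loop_average` repeats the serial computation for a small `N` to confirm the formula.

```python
def expected_average(N):
    # a[i] = 1 + b[i] = 1 + (1 + i) = 2 + i, so the mean is 2 + (N - 1) / 2
    return (N + 3) / 2

def loop_average(N):
    a = [1.0] * N
    b = [1.0 + i for i in range(N)]
    a = [x + y for x, y in zip(a, b)]
    return sum(a) / N

print(loop_average(1000), expected_average(1000), expected_average(10_000_000))
```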

### MPI4py - example3: (`vecadd.py`: parallelization with point-to-point communication)

In [None]:
%%writefile ./examples/vecadd_mpi.py

from mpi4py import MPI
import numpy as np 

## fix me example3.1
## def para_range
def para_range(n1, n2, size, rank) :
    iwork = divmod((n2 - n1 + 1), size)
    ista = rank * iwork[0] + n1 + min(rank, iwork[1])
    iend = ista + iwork[0] - 1
    if iwork[1] > rank :
        iend = iend + 1
    return ista, iend

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
N = 10000000

## fix me example3.1
# compute the workload of each MPI process (rank)
(my_start, my_end) = para_range(0, N-1, size, rank)
workload = my_end - my_start + 1

# initialize a
start_time = MPI.Wtime()
a = np.ones(workload)
#FIX me example3.2 
end_time = MPI.Wtime()
if rank == 0:
    print("Initialize a time: " + str(end_time-start_time))

# initialize b
start_time = MPI.Wtime()
b = np.ones(workload) #FIX me example3.2 
for i in range(workload): #FIX me example3.3
    b[i] = 1.0 + i + my_start
end_time = MPI.Wtime()
if rank == 0:
    print("Initialize b time: " + str(end_time-start_time))

# add the two arrays
start_time = MPI.Wtime()
for i in range(workload): #FIX me example3.3
    a[i] = a[i] + b[i]
end_time = MPI.Wtime()
if rank == 0:
    print("Add arrays time: " + str(end_time-start_time))

# average the result
start_time = MPI.Wtime()
sum = 0.0

for i in range(workload): #FIX me example3.4
    sum += a[i]
#average = sum / N #FIX me example3.5
if rank == 0:
    world_sum = sum
    for i in range( 1, size ):
        sum_np = np.empty( 1 )
        comm.Recv( [sum_np, MPI.DOUBLE], source=i, tag=77 )
        world_sum += sum_np[0]
    average = world_sum / N
else:
    sum_np = np.array( [sum] )
    comm.Send( [sum_np, MPI.DOUBLE], dest=0, tag=77 )

end_time = MPI.Wtime()

if rank == 0:
    print("Average result time: " + str(end_time-start_time))
    print("Average: " + str(average))

- **example3.1: use para_range to compute the workload of each MPI process (rank)** 

    ```python
        def para_range(n1, n2, size, rank) :
            iwork = divmod((n2 - n1 + 1), size)
            ista = rank * iwork[0] + n1 + min(rank, iwork[1])
            iend = ista + iwork[0] - 1
            if iwork[1] > rank :
                iend = iend + 1
            return ista, iend
    ```

    ```python
        # determine the workload of each rank
        (my_start, my_end) = para_range(0, N-1, size, rank)
        workload = my_end - my_start + 1
    ```

- **example3.2: modify the memory initialization of vectors a and b**

    `np.ones(N)` $\rightarrow$ `np.ones(workload)`

- **example3.3: modify the initialization of vector b and the addition of the two vectors**

    `for i in range(N)` $\rightarrow$ `for i in range(workload)`

    `b[i] = 1.0 + i` $\rightarrow$ `b[i] = 1.0 + (i + my_start)`

- **example3.4: modify the summation in "average the result"**

    `for i in range(N)` $\rightarrow$ `for i in range(workload)`

- **example3.5: modify the average computation**

    `average = sum / N` $\rightarrow$ 

    ```python
        if rank == 0:
            world_sum = sum
            for i in range( 1, size ):
                sum_np = np.empty( 1 )
                comm.Recv( [sum_np, MPI.DOUBLE], source=i, tag=77 )
                world_sum += sum_np[0]
            average = world_sum / N
        else:
            sum_np = np.array( [sum] )
            comm.Send( [sum_np, MPI.DOUBLE], dest=0, tag=77 )

    ```

In [None]:
!mpiexec -np 4 python3 ./examples/vecadd_mpi.py

### MPI4py - example3: (`vecadd.py`: parallelization with collective communication)

In [None]:
%%writefile ./examples/vecadd_mpi2.py

from mpi4py import MPI
import numpy as np 

## fix me example3.1
## def para_range
def para_range(n1, n2, size, rank) :
    iwork = divmod((n2 - n1 + 1), size)
    ista = rank * iwork[0] + n1 + min(rank, iwork[1])
    iend = ista + iwork[0] - 1
    if iwork[1] > rank :
        iend = iend + 1
    return ista, iend

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
N = 10000000

## fix me example3.1
# compute the workload of each MPI process (rank)
(my_start, my_end) = para_range(0, N-1, size, rank)
workload = my_end - my_start + 1

# initialize a
start_time = MPI.Wtime()
a = np.ones(workload)
#FIX me example3.2 
end_time = MPI.Wtime()
if rank == 0:
    print("Initialize a time: " + str(end_time-start_time))

# initialize b
start_time = MPI.Wtime()
b = np.ones(workload) #FIX me example3.2 
for i in range(workload): #FIX me example3.3
    b[i] = 1.0 + i + my_start
end_time = MPI.Wtime()
if rank == 0:
    print("Initialize b time: " + str(end_time-start_time))

# add the two arrays
start_time = MPI.Wtime()
for i in range(workload): #FIX me example3.3
    a[i] = a[i] + b[i]
end_time = MPI.Wtime()
if rank == 0:
    print("Add arrays time: " + str(end_time-start_time))

# average the result
start_time = MPI.Wtime()
sum = 0.0

for i in range(workload): #FIX me example3.4
    sum += a[i]

world_sum = comm.reduce(sum, op = MPI.SUM, root = 0)

if rank == 0:
    average = world_sum / N

end_time = MPI.Wtime()

if rank == 0:
    print("Average result time: " + str(end_time-start_time))
    print("Average: " + str(average))


- **Modify the average computation**
     
    ```python
        if rank == 0:
            world_sum = sum
            for i in range( 1, size ):
                sum_np = np.empty( 1 )
                comm.Recv( [sum_np, MPI.DOUBLE], source=i, tag=77 )
                world_sum += sum_np[0]
            average = world_sum / N
        else:
            sum_np = np.array( [sum] )
            comm.Send( [sum_np, MPI.DOUBLE], dest=0, tag=77 )

    ```

    $\rightarrow$

    ```python
        world_sum = comm.reduce(sum, op = MPI.SUM, root = 0)
        if rank == 0:
            average = world_sum / N
    ```
    or

    ```python
        local_sum = np.array( [sum] )
        world_sum = np.zeros( 1 )
        comm.Reduce( [local_sum, MPI.DOUBLE], \
            [world_sum, MPI.DOUBLE], op = MPI.SUM, root = 0 )
        if rank == 0:
            average = world_sum[0] / N
    
    ```    

In [None]:
!mpiexec -np 4 python3 ./examples/vecadd_mpi2.py