# Multiprocessing sử dụng concurrent.futures

### Lê Ngọc Khả Nhi

Trong bài thực hành ngắn hôm nay, Nhi sẽ hướng dẫn các bạn sử dụng module trong Python để thực hiện tính toán song song trên nhiều CPU (Multiprocessing) nhằm tăng tốc độ của các quy trình phân tích dữ liệu.

In [1]:
import time
import multiprocessing as mp
import numpy as np
import inspect

Giả sử ta có một hàm large_array_mean, với tính năng như sau: nhận vào 2 arguments là giá trị mu, sigma, hàm sẽ tạo một 2D array với kích thước 10000 x 10000 chứa tất cả giá trị ngẫu nhiên từ phân phối Gaussian(mu, sigma), sau đó tính trung bình của mỗi hàng trong array.

Như vậy, đây là một quy trình tính toán khá nặng, hiệu năng về thời gian của nó phụ thuộc vào CPU.

In [2]:
def large_array_mean(mu : float, sigma: float):
    array = np.random.normal(mu, sigma,(10000,10000))
    return array.mean(axis = 1)

Mỗi lượt thi hành của quy trình mất khoảng 2 giây:

In [3]:
start = time.perf_counter()

res = large_array_mean(5., 2.)
print(res)

end = time.perf_counter()

print(f'Thi hành mất {round(end - start, 2)} giây')

[5.02750666 4.98018274 5.0229275  ... 5.01515793 4.98121049 5.00093053]
Thi hành mất 2.53 giây


Giả sử bạn cần thi hành 50 lượt quy trình này, từ 2 list chứa 50 giá trị mu và sigma khác nhau, cách làm thông thường là sử dụng vòng lặp for hoặc hàm map. Tuy nhiên cả 2 cách này đều rất chậm, vì 50 tác vụ được thi hành tuần tự, phải chờ tác vụ trước kết thúc thì tác vụ sau mới được khởi động. Do đó, tổng thời gian thi hành sẽ lên đến 126 giây.

In [4]:
mu_list = list(np.random.uniform(5,100,50))

sigma_list = list(np.random.uniform(2,50,50))

## Sử dụng vòng lặp for

In [5]:
start = time.perf_counter()

results = []
for i,j in zip(mu_list, sigma_list):
    res = large_array_mean(i, j)
    results.append(res)
    
end = time.perf_counter()

print(f'Thi hành mất {round(end - start, 2)} giây')

Thi hành mất 126.52 giây


## Sử dụng hàm map

In [6]:
start = time.perf_counter()

results = [res for res in map(large_array_mean, mu_list, sigma_list)]

end = time.perf_counter()

print(f'Thi hành mất {round(end - start, 2)} giây')

Thi hành mất 111.72 giây


# Multiprocessing 

Ta có thể thực hiện tính toán song song 50 tác vụ trên nhiều CPU một cách rất đơn giản, nhờ vào method ProcessPoolExecutor của module concurrent.futures.

Lưu ý: Không thể dùng concurrent.futures trực tiếp trên jupyter notebook, do đó các bạn cần viết 1 module rời, thí dụ tên là parallel, đặt trong cùng thư mục hiện hành. Nội dung code của module này như sau:

In [39]:
import concurrent.futures as cf
import numpy as np

def large_array_mean(mu : float, sigma: float):
    array = np.random.normal(mu, sigma,(10000,10000))
    return array.mean(axis = 1)

def parallel_func(*args):
    with cf.ProcessPoolExecutor() as pool:
        results = pool.map(large_array_mean, *args)

    return results

Như vậy, ta dùng context manager để tạo 1 pool từ class ProcessPoolExecutor(), sau đó dùng method map của class này theo cùng nguyên tắc như hàm map, với target function là large_array_mean, theo sau là danh sách các arguments.

Ta import module vừa tạo ra vào notebook:

In [7]:
import parallel as para

Lúc này, ta có thể dùng hàm parallel_func trong module vừa tạo ra, cho 2 list mu và sigma, và đóng gói kết quả vào list results bằng list comprehension:

In [8]:
start = time.perf_counter()

results = [r for r in para.parallel_func(mu_list,sigma_list)]

end = time.perf_counter()

print(f'Thi hành mất {round(end - start, 2)} giây')

Thi hành mất 14.58 giây


Như ta thấy, khi áp dụng Multiprocessing, 50 tác vụ được chia ra thi hành song song trên nhiều CPU, do đó rút ngắn đáng kể thời gian xuống chỉ còn 14.5 giây, như vậy nhanh hơn gấp 8-9 lần so với cách làm thông thường.

Bài thực hành kết thúc ở đây, chúc các bạn thí nghiệm vui.