# Concurrency & Parallel Programming

## Concurrency (đồng thời)
**Concurrency**: Đồng thời là khi hai hoặc nhiều nhiệm vụ chạy có thể bắt đầu , chạy và hoàn thành trong khoảng thời gian chồng chéo. Hiểu đơn giản thì concurrency = Two queues and one coffee machine.(đa nhiệm trên bộ vi xử lý đơn lõi). 1 CPU thì chỉ có thể chạy một thread, tuy vậy chúng ta có thể xử lý đa luồng dựa vào cơ chế time-slicing để chuyển ngữ cảnh (rất nhanh, nên ta cảm giác nhiều thread đang chạy tại một thời điểm).

### Tại sao app cần xử lý đồng thời ?
- Để giữ cho UI luôn trong trạng thái được đáp ứng.
- Tăng tốc độ xử lý. Tận dụng tối đa vi xử lý có trên máy tính.

![](sync.png)
![](async.png)

### Race condition 
Race condition là một tình huống xảy ra khi nhiều threads cùng truy cập và cùng lúc muốn thay đổi dữ liệu (có thể là 1 biến, 1 vùng memory,...) Vì thuật toán chuyển đổi việc thực thi giữa các threas có thể xảy ra bất cứ lúc nào nên không thể biết được thứ tự của các threas tuy cập và thay đổi dữ liệu đó sẽ dẫn đến giá trị của data sẽ không như mong muốn. 

## Parallelism (song song)
**Parallelism**: Song song là khi các nhiệm vụ chạy cùng một lúc.

Parallel = Two queues and two coffee machines.(đa nhiệm trên bộ vi xử lý đa lõi)

> "Concurrency is not Parallelism"

## Khi nào thì Concurrency có ích ?
Concurrency giải quyết hai vấn đề:
- CPU-bound: chậm do CPU phải tính toán nhiều 
![CPU Bound](cpu-bound.webp)
-> giải quyết bằng cách tìm cách để tính toán được nhiều hơn trong cùng một khoảng thời gian.
![Multiprocessing CPU Bound](multiprocessing-cpu-bound.webp)


- I/O-bound: khiến program chậm do phải đợi i/o từ nguồn ngoài (slower than CPU). Ex: làm việc với file system và network connections. 
![IO Bound](io-bound.webp)
-> giải quyết bằng cách chèn vào thời gian chờ đợi.
![threading](threading.webp)


Thêm concurrency vào program có thể khiến nó trở nên phức tạp, thế nên cân nhắc nếu nó thực sự cần thiết.

## Concurrency Programming with Python 
Có vài cách để chạy "đồng thời" ở trong Python.
- Với `multiprocessing`, Python tạo ra processes mới. Mỗi process được hiểu như một chương trình khác nhau. -> Có thể chạy được đồng thời trên đa nhân. 
- `threading` và `asyncio` đều chạy trên một processor, chỉ là có cách để đẩy nhanh quá trình hơn, dù vậy vẫn được gọi là concurrency. Cả hai đều chạy trên 1 nhân. Với `threading`, sử dụng *pre-emptive multitasking* (phủ đầu) -> OS biết từng thread một và có thể **dừng chúng bất cứ lúc nào** để chạy thread khác. Với `asyncio`, sử dụng *cooperative multitasking*. Các task phải hợp tác với nhau bằng cách thông báo sẵn sàng để switch. 

## Ví dụ
Phần code mình đã lược bỏ để tập trung vào kết quả khi sử dụng các phương pháp xử lý đồng thời. Nếu quan tâm về phần code thì các bạn hãy đọc tại [Đây](https://realpython.com/python-concurrency/) hoặc trong file Jupyter Notebook của mình đính ở dưới bài.

### Ví dụ IO-Bound
Bài toán: dùng Python để tạo script download nhiều website về máy tính. Với bài toán này, vấn đề chính là IO-bound gây ra bởi quá trình máy tính kết nối với internet.

Kết quả:
- Synchronous Version: 64.94599461555481 seconds
- `threading` Version: 14.244598388671875 seconds
- `multiprocessing` Version: 16.825629949569702 seconds

Kết quả cho thấy rằng multiprocessing hoạt động không hiệu quả bằng threading.

### Ví dụ CPU-Bound

Bài toán: dùng Python để tạo script tính toán heavy task. Với bài toán này, vấn đề chính là CPU-bound, chúng ta cần tận dụng tối đa hệ thống đa nhân của CPU.

Kết quả:
- Synchronous Version: 9.767674684524536 seconds
- `threading` Versions: 10.26465916633606 seconds
- `multiprocessing` Version: 5.953038454055786 seconds

Kết quả cho thấy rằng multiprocessing hoạt động hiệu quả hơn threading. Phương pháp threading thậm chí còn tệ hơn synchronous do tốn thời gian switch giữa các thread.

## Kết luận
Cần xác định xem có nhất thiết phải dùng concurrency hay không. Nếu có thì xác định là CPU-bound hay I/O Bound.
- Với CPU-bound thì dùng các biện pháp đa nhân.
- Với I/O-bound thì dùng các biện pháp đa luồng.

## Tham khảo
- [Speed Up Your Python Program with Concurrency](https://realpython.com/python-concurrency/)
- https://realpython.com/learning-paths/python-concurrency-parallel-programming/
- https://text.relipasoft.com/2016/12/dong-thoi-khong-phai-la-song-song-concurrency-is-not-parallelism/
- https://viblo.asia/p/concurrency-programming-guide-63vKjpYdl2R
- https://engineering.grokking.org/race-condition-la-gi/

## How to speed up an I/O-Bound program
Ví dụ: download web pages

### Synchronous Version
- Ưu điểm: code dễ hiểu, dễ debug
- Nhược điểm: chậm, tuy nhiên nếu ko chậm quá thì không thành vấn đề.

In [4]:
import requests
import time


def download_site(url, session):
    with session.get(url) as response:
#         print(f"Read {len(response.content)} from {url}")
        return

def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 64.94599461555481 seconds


### `threading` Version
- Ưu điểm: Nhanh
- Nhược điểm: khó fix bug, lỗi xuất hiện random (Tìm hiểu về Race Conditions)

In [6]:
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
    if not getattr(thread_local, "session", None):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
#         print(f"Read {len(response.content)} from {url}")
        return

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 14.244598388671875 seconds


`ThreadPoolExecutor`: Một object để quản lý và thực thi các thread.

Với Python 2 hay các ngôn ngữ khác, thì thường sử dụng các technique khác như: `thread.start()`, `thread.join()` và `Queue`. Tuy nhiên đến Python 3.2 thì đã xuất hiện higher-level abstraction `Executors` để quản lý.



Optional: Để ý sẽ thấy mỗi thread tạo ra một `requests.Session()` riêng. Lý do là bởi `requests.Session()` không thread-safe. Có nhiều strategies để giải quyết vấn đề này. Ở đây sử dụng `threading.local()` để lưu session cho mỗi thread.

Việc chọn lựa số thread (worker) là phép thử sai.

![threading](threading.webp)


### `asyncio` Version
- The event loop kiểm soát how và when từng task chạy dựa trên trạng thái của stack. Mỗi task sẽ có 2 trạng thái: ready và waiting (trên tực tế thì có nhiều hơn). 

-> Task sẽ không bị mất kiểm soát, bị dừng đột ngột -> không cần phải lo việc thread-safe.

`async` và `await`:
- await: lệnh cho task trao quyền kiểm quyển soát lại cho event loop. 
- async: flag để python biết rằng function này sẽ sử dụng await.


In [12]:
# import asyncio
# import time
# import aiohttp


# async def download_site(session, url):
#     async with session.get(url) as response:
# #         print("Read {0} from {1}".format(response.content_length, url))
#         return

# async def download_all_sites(sites):
#     async with aiohttp.ClientSession() as session:
#         tasks = []
#         for url in sites:
#             task = asyncio.ensure_future(download_site(session, url))
#             tasks.append(task)
#         await asyncio.gather(*tasks, return_exceptions=True)

# sites = [
#     "https://www.jython.org",
#     "http://olympus.realpython.org/dice",
# ] * 80
# start_time = time.time()
# asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
# duration = time.time() - start_time
# print(f"Downloaded {len(sites)} sites in {duration} seconds")

print("Downloaded 160 sites in 3.664998769760132 seconds")


Downloaded 160 sites in 3.664998769760132 seconds


### `multiprocessing` Version

`multiprocessing` tạo nhiều instance của Python interpreter để chạy trên từng CPU, chậm hơn nhiều so việc tạo thread. Tuy nhiên với program phù hợp thì nó sẽ tạo sự khác biệt lớn.


In [13]:
import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
#         print(f"{name}:Read {len(response.content)} from {url}")
        return

def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 16.825629949569702 seconds


## How to Speed up a CPU-Bound

### Synchronous Version

In [16]:
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 9.767674684524536 seconds


### `threading` and `asyncio` Versions
There is no waiting time -> It will show it down

In [15]:
import concurrent.futures
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 10.26465916633606 seconds


### `multiprocessing` Version

In [17]:
import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 5.953038454055786 seconds
