In [23]:
from threading import Thread
from multiprocessing import Process, Queue, Value, Pipe
from time import sleep, perf_counter
from loguru import logger as log

Процессы и потоки имеют одинаковый интерфейс и могут создаваться либо из функции с помощью аргумента target: ```Process(target=f)```, либо вручную с помощью класса:

In [4]:
class BaseThread(Thread):

    def __init__(self, n: int, running: Value, data_que: Queue):
        super().__init__()
        self.n = n
        self.running = running
        self.que = data_que

    def run(self):
        m = 0
        while self.running.value:
            self.que.put(f'Test {self.n} {m}')
            m += 1
            sleep(1)
        log.success(f'The END {self.n}')

Запуск процесса и потока производится с помощью метода `.start()`

In [5]:
running = Value('B', True)
que = Queue(maxsize=10)
t0 = BaseThread(0, running, que)
t1 = BaseThread(1, running, que)
t0.start()
t1.start()

Оптимальным способом завершения работы процессов / потоков по необходимости является значение в общей памяти `Value`

In [6]:
running.value = False

Считывать данные из процессов и потоков можно с помощью очереди `multiprocessing.Queue`

In [None]:
while True:
    if not que.empty():
        print(que.get())

Если есть набор функций с примерно одинаковым временем выполнения, которые можно запустить параллельно, то удобно пользоваться библиотекой `joblib`, а именно, классом `joblib.Parallel`

In [8]:
def f(x, t=1):
    sleep(t)
    return x ** 2

In [9]:
%time [f(x) for x in range(10)]

Wall time: 10.1 s


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [12]:
from joblib import Parallel, delayed
%time Parallel(n_jobs=10, prefer='threads')(delayed(f)(x) for x in range(10))

Wall time: 1.01 s


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Если функции имет очень разное время выполнения, то удобнее воспользоваться асинхронным пулом `concurrent.futures.Executor`, либо `ProcessPoolExecutor`, либо `ThreadPoolExecutor`

In [27]:
%%time
from concurrent.futures import ThreadPoolExecutor

exec = ThreadPoolExecutor(max_workers=10)
futures = {}
for t in range(1, 11):
    futures[t] = exec.submit(f, t, t)

results = {}
time_start = perf_counter()
while len(results) < len(futures):
    for t in futures:
        fut = futures[t]
        if fut.done() and t not in results:
            results[t] = fut.result()
            log.info(f'Future {t} is done {perf_counter() - time_start:.1g}s')
results

2023-03-02 19:31:12.974 | INFO     | __main__:<module>:15 - Future 1 is done 1s
2023-03-02 19:31:13.976 | INFO     | __main__:<module>:15 - Future 2 is done 2s
2023-03-02 19:31:14.969 | INFO     | __main__:<module>:15 - Future 3 is done 3s
2023-03-02 19:31:15.969 | INFO     | __main__:<module>:15 - Future 4 is done 4s
2023-03-02 19:31:16.981 | INFO     | __main__:<module>:15 - Future 5 is done 5s
2023-03-02 19:31:17.979 | INFO     | __main__:<module>:15 - Future 6 is done 6s
2023-03-02 19:31:18.983 | INFO     | __main__:<module>:15 - Future 7 is done 7s
2023-03-02 19:31:19.984 | INFO     | __main__:<module>:15 - Future 8 is done 8s
2023-03-02 19:31:20.969 | INFO     | __main__:<module>:15 - Future 9 is done 9s
2023-03-02 19:31:21.976 | INFO     | __main__:<module>:15 - Future 10 is done 1e+01s


Wall time: 10 s


{1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81, 10: 100}