Ноутбук в процессе разработки

**Embarassingly Parallel Problem** - задача,  которую можно легко разбить на составные части (её вычисление не требует комбинирования промежуточных результатов).

Примеры легко параллелизуемых задач:
- вычисление базовых статистик
- построение ансамблей классификаторов (random forest)
- расчет статистик путем многократных симуляций (например, monte-carlo simulation, кросс-валидация или grid search)

Gradient Boosting ансамблирование не является embarassingly parallel задачей, так как каждая итерация зависит от результата предыдущей.

Уровни параллельного выполнения:
0. Sequential
1. Multiple threads (within single process)
2. Multiple processes (on a single machine)
3. Multiple machines (a cluster)

**Поток vs Процесс**

Поток - часть процесса. Он работает в едином адресном пространтсве. Все потоки в рамках одного процесса имеют доступ к ранее созданным переменным.

## Параллельное выполнение в Python (Threading)

Цель потоков - иметь возможность параллельно запустить (синхронно или асинхронно) несколько подзадач. Использование тредов не дает выигрыша по времени выполнения, важна именно параллельность.

Запускается так:

    t = Thread(target = func, args = ())
    t.start()

Второй способ создать поток:

    class myThread(Thread):
        def run()

Можно запускать много потоков:

    ThreadPoolExecutor(max_workers=100)
    pool.map(func(), range(100))

    По умолчанию, все переменные потока создаются в общей памяти процесса. Чтобы 

Можно работать с локальными данными:

    locals = threading.local()
    locals.x = 1

**Barrier** - останавливает выполнение, пока все потоки не дойдут до него

**Event** - потоки подписываются на событие методом wait() и стартуют только когда поток устанавливает его set()

**Timer** - поток, который запускается через n секунд

**Lock** - позволяет выполнять только один поток в один момент времени

    lock = Lock(blocking = True, timeout = -1)
    
    lock.acquire()
    __do_some_work__()
    lock.release()

Если занято, то поток может блокироваться (blocking = True) или просто возвращать False (blocking = False). Также можно настроить, что поток будет стучаться ещё в течение timeout.

**RLock** - Lock, который может много раз блокировать вход в рамках потока. Соотвественно столько же раз он должен сделать release().

**Semaphore** - не позволяет выполнять более n параллельных потоков. Если параметр value = 1, то логика работы аналогична классу Lock.

## Параллельное выполнение в Python (Multiprocessing)
   
    import multiprocessing

- Объект Process() создает новый процесс, в котором выполнеятся функция f():

        p = Process(target=f, args=())
        p.start()
        p.join()
    
    Метод join() говорит, что теперь выполнение синхронное.


- Если предполагается использовать много процессов, удобнее работать сразу с пулом процессов:

        # Создать пул из 25 процессов
        pool = Pool(25)
        
        # в каждом процессе запустить функцию f
        pool.map(f, args)
        
        # запустить f
        pool.async_apply_async()

Для межпроцессного взаимодействия используются три типа объектов:
- **Queue**
    - Создается очередь q = Queue() и передается ссылкой в новый процесс
    - Из основного процесса можно читать то, что процесс туда пишет q.get()
    
    
- **Pipe**
    - при создании пайпа Pipe() создаются две ссылки - одна передается параметром в новый процесс
    - из основного процесса можно читать по второй ссылке


- **Lock**
    - создается Lock()
    - передается параметром в дочерние процессы
    - внутри каждый процесс его запирает, выполняет нужные действия и открывает

Также есть классы для Shared Memory
- **Value** - скаляр
- **Array** - массив значений

Создаются разделяемые переменные
- передаются параметрами в дочерние процессы
- оттуда могут по ссылке изменяться

Класс **Server Manager** позволяет создавать shared объекты произвольного типа.
Их так же можно передавать параметрами в дочерние процессы.

##  Встроенный шедулер в Python

**Sched** - встроенный в питон простенький шедулер

    import sched

При создании шедулера задаётся функция времени (по умолчанию time.time) и функция ожидания (по умолчанию time.sleep)

    Scheduler(time_func=, delay_func=)

Затем добавляются задачи:

    event_id1 = s.enter(delay=10, priority=1, action=func1())
    event_id2 = s.enter(delay=15, priority=2, action=func2())
    event_id3 = s.enter(delay=20, priority=1, action=func3())

Можно убрать какую-то задачу:

    s.cancel(event_id3)

И затем все это запускается на выполнение:

    s.run()

Посмотреть актуальную очередь можно так:

    s.queue()

## Очереди (Queue)

Бесконечная очередь,  в которую можно из параллельных потоков добавлять и забирать элементы. При подключении обеспечитвается блокировка.

Помимо FIFO варианта (по умолчанию), есть ещё LIFO (стек) и PriorityQueue.

    q = Queue()

Аналог collections.deque, но там без блокировки.

Хорошо иллюстрируется на примере реализации кординатора задач и рабочих потоков.

Опишем поток, который 
1. подключается к очереди и ждёт задачу:
        q.get() 
2. выполняет задачу
        do_work()
3. и оповещает очередь, что сделал задачу:
        q.task_done()

In [None]:
def worker():
        while True:
            item = q.get()
            if item is None:
                break
            do_work(item)
            q.task_done()

Запустим 20 рабочих потоков, каждый пытается подключиться к очереди

In [None]:
q = queue.Queue()
    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

В основном потоке запуливаем данные в очередь, чтобы потоки начали работать

In [None]:
for item in source():
    q.put(item)

Ждем, когда все потоки свои задачи выполнят и шлем им сигнал остановки

In [None]:
q.join()

for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

## Запуск внешних процессов (Subprocess)

Первый способ запуска шелл-команд в питоне

    import os
    os.system("ls")

Не выводит результат, нет контроля над выводом. Лучше использовать subprocess.

    import subprocess
    subprocess.run("ls")

А ещё лучше Popen - конструктор, конструктор который позволяет 
- перенаправлеть ввод/вывод
- передавать длинные строки параметров
- делать интерактивные сессии

## Тип Futures

    import concurrent.futures

Здесь вводятся два важных класса:
- ThreadPoolExecutor
- ProcessPoolExeutor

Оба асинхронно запускают произвольный набор потоков или процессов. 

Есть два метода запуска:

- submit(func, args) - запускает один поток/процесс
- map(func, args) - запускает несколько потоков/процессов для списка аргументов

Кроме того, вводится **Future** - специальный тип, ссылка на асинхронно запущенный поток или процесса.

submit() возвращает Future, а map() итератор над обхектами типа Future

Атрибуты запущенного процесса:
- result() - пытаемся прочитать результат выполнения. Ждем timeout секунд, если результата пока нету, выводим Exception.
- cancel() - отменить процесс/поток
- running() - проверяем, что выполняется
- done() - проверяем, что завершен

wait(fs) - выводим, кто завершен, кто выполняется
as_completed(fs) - итератор над будущими значениями

-----

# Библиотека Joblib



Joblib is a library for simple parallel programming __primarily developed and used by the Scikit Learn community__. As of version 0.10.0 it contains a plugin mechanism to allow Joblib code to use other parallel frameworks to execute computations

**Цель библиотеки:** конвертация питоновского кода в пайплайны - наборы преобразований, которые вычисляются как можно реже.

### Основные функции

##### 1) Параллелизация

Шаблон использования Joblib:

    parallel = Parallel(n_jobs=-1, backend='loky')
    input_generator = delayed(func)(partition) for partition in inputs
    parallel(input_generator)

Вторая строчка - это <a href=https://www.python.org/dev/peps/pep-0289/>generator expression</a>. Аналог list compregension, но без скобок - считается во время выполнения.

delayed() - обертка для функции
    
Основные API функции в Joblib - это фронт, есть ешё бэкенд - конкретный способ параллелизации.

- Loky
- Multithreading
- Multiprcoessing

Плюс можно регистрировать любые свои бэкенды.

Например, можно поставить Dask backend, тогда появляется возможность запускать параллельные sklearn модели на кластере.

##### 2) Кэширование результатов
    
    func = Memory.cache(func)
    func(1) # здесь кэшируется
    func(1) # здесь берется закэшированный результат

##### 3) Сериалзиация

По аналогии с pickle есть методы 

    dump()
    load()
    
но работает эффективнее.

##### Интеграция в Scikit-Learn

Joblib включен в Scikit-Learn и испрользуется в реализации некоторых методов. Например, можно посмотреть его использование в методе fit() алгоритма model_selection.GridSeachCV.

-----

# Библиотека DASK

Более глобальная библиотека. Цель та же.

The single machine scheduler is more useful to individuals (more people have personal laptops than have access to clusters) and probably accounts for 80+% of the use of Dask today. On the other hand, the distributed machine scheduler is more useful to larger organizations like universities, research labs, or private companies.

Dask DataFrame - распределенная версия Pandas датафрейма. Частично поддерживает API.

Dask относится к Python примерно так же как PySpark к Python.

Dask.Delayed - обертка для функции, которая указывает, что функция должна выполняться в lazy-режиме. На вход подается либо константа, либо другая delayed-функция.

Futures - результаты работы стратовавших background процессов, значения которых известны только после выполнения.

In [28]:
from dask import delayed, compute
from time import sleep
import random

In [32]:
# задаем функцию, которую нужно применить
def process(x):
    y = 0
    for x in range(1000000):
        y = y - x*x + random.randint(0,10000) * random.randint(0,10000)
    return y+1

# задаем входы
inputs = range(10)

# формирваем список функций-оберток
values = [delayed(process)(x) for x in inputs]

Для начала попробуем выполнить последовательно

In [33]:
%time results = [process(x) for x in inputs]

CPU times: user 59.8 s, sys: 356 ms, total: 1min
Wall time: 1min 1s


Запускаем выпонение в несколько потоков

In [34]:
%time results = compute(*values, scheduler='processes')

CPU times: user 69.9 ms, sys: 39.2 ms, total: 109 ms
Wall time: 36.1 s


In [35]:
%time results = compute(*values, scheduler='threads')

CPU times: user 58.9 s, sys: 735 ms, total: 59.7 s
Wall time: 1min


У меня на ноутбуке (4 ядра) получились такие резульаты:

##### Sequential
Wall time: 1min 1s
##### Multithread
Wall time: 1min
##### Multiprocess
Wall time: 36.1 s

## Кейсы применения

### 1.1 Параллельный apply в Pandas

In [13]:
from multiprocessing import Pool
import pandas
import numpy

# Загрузим какой-нибудь датасет
from sklearn.datasets import load_iris
iris = load_iris()
iris = pandas.DataFrame(iris['data'], columns=iris['feature_names'])

# Опишем фукнкцию для обработки данных, которую нужно применить
def func(df_partition):
    df_partition['new_column1'] = 1
    df_partition['new_column2'] = 2
    df_partition['new_column3'] = 3
    return df_partition

# Напишем функцию-обертку, которая внутри будет разбивать на составные части и выполнять параллельно
def parallel_apply(df, func, n_partitions=4):
    
    # Делим датасет на n равных частей
    df_split = numpy.array_split(df, n_partitions)
    
    # Создаем пул процессов
    pool = Pool(n_partitions)
    
    # Запускаем процесс для каждой части с помощью map()
    df = pandas.concat(pool.map(func, df_split))
    
    pool.close()
    
    pool.join()
    
    return df

# Можем применять для любых датасетов
new_df = parallel_apply(iris, func)

Проверяем

In [20]:
print("old dataset shape = {}, new dataset shape{}".format(iris.shape, new_df.shape))

old dataset shape = (150, 4), new dataset shape(150, 5)


### 1.2 Pandarallel

Есть доморощенная бибилотека, в которой реализован этот подход. Только тут он используется не как функция, а как переопределнный метод класса DataFrame - удобно!

Установка:

    pip install pandarallel

In [None]:
from pandarallel import pandarallel
pandarallel.initialize()

def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)