# Параллельное программирование

Алексей Умнов https://www.youtube.com/watch?v=xYV_C4pg1LU  
Слайды доступны по адресу: http://parallels.nsu.ru/~fat/Python/

## Последовательное выполнение

In [1]:
print('Hello, World!')
x = 1
y = 2
print(x + y)

Hello, World!
3


Последовательное выполнение это - Пока одна строка не выполнится, другая не начнется

### Производительность
* Производительность ~ частота процессора
* Ограниение роста частоты
* Есть возможность делать много процессоров

### Блокирующие задачи

#### Ожидание действий
* Исполнение приостанавливается
* Можно было бы что-нибудь делать

#### Примеры
* Пользовательские интерфейсы (ожидание ввода)
* Работа с сетью

### Параллельное выполнение

#### Преимущества:
* Ускорение ~ число процессоров (или ядер процессора)

#### Недостатки:
* Нет "порядка" исполнения:

````python
text = 'Hello, World!'

Поток № 1:                   Поток № 2:
for s in text:               for s in text:
    print(s)                     print(s)

Результат:
HHeelllol,o w,or ld!World!
````

## Многопоточные программы

Несколько потоков.  
Потоки характерны тем, что находятся внутри одного процесса и работают с одной и той же памятью, но при этом они могут исполнятся на разных процессорах (или ядрах процессора):

````
------------------------------------------------------------
| Process                                                  |
|  ------------------------------------------------------  |
|  | Memory                                             |  |
|  |                                                    |  |
|  ------------------------------------------------------  |
|           ^                ^                ^            |
|           |                |                |            |
|     -------------    -------------    -------------      |
|     | Thread1   |    | Thread2   |    | Thread3   |      |
|     |           |    |           |    |           |      |
|     -------------    -------------    -------------      |
|                                                          |
------------------------------------------------------------
````

Подготовка:

In [6]:
def test(part_index, inputs, outputs):
    print('Запущен поток № {}\n'.format(part_index), end='')
    output = inputs[part_index] ** 2 # Эмуляция некоторой задачи
    outputs[part_index] = output     # Заносим результат вычисления

inputs = range(5)
outputs = [ None for i in inputs ]

Как сделать так, чтобы каждый поток запустил функцию `test`? Для этого нужно подключить модуль `threading`:

In [7]:
from threading import Thread

treads = [] # Храним ссылки на потоки

# Создаем потоки и кладем их в список treads
for i in inputs:
    t = Thread(target=test, args=(i, inputs, outputs))
    treads.append(t)

# Запускаем потоки
for i in treads:
    i.start() # Запускается параллельный поток. Цикл for остался выполнятся в главном (main) потоке

# Чтобы главный поток (main) не завершился быстрее параллельно запущенных потоков,
# выполняем ожидание параллельного потока главным потоком:
for i in treads:
    i.join()

print('Результат:', outputs)

Запущен поток № 0
Запущен поток № 1
Запущен поток № 2
Запущен поток № 3
Запущен поток № 4
Результат: [0, 1, 4, 9, 16]


## Наследование от Thread

In [9]:
import time
import random

class TCalc(Thread):

    def __init__(self):
        super().__init__()
        self.sleeptime = random.randint(1,3)

    def run(self):
        self.status = 'started'
        time.sleep(self.sleeptime)
        self.status = 'done'

    def __repr__(self):
        if hasattr(self, 'status'):
            return '{}-{}'.format(self.name, self.status)
        return self.name + ' (id=' + str(id(self)) + ')'



t = TCalc()
t.start()  # Запускаем поток на исполнение

# Теоретически может возникнуть так, что поток может писать что-то в переменную status,
# а в это время главный поток с инструкцией print будет пытаться считать значение этой переменной,
# Эту ситуацию Python успешно разрешает, если вы не пишите одновременно с разных мест в эту переменную.
print('status = {}, is_alive = {}'.format(t.status, t.is_alive()))

print('Параллельный поток уже выполняется, присоединяемся и дожидаемся его завершения')
t.join() # Ожидание выполнения потока. В этом месте главный поток приостанавливается

print('status = {}, is_alive = {}'.format(t.status, t.is_alive()))

status = started, is_alive = True
Параллельный поток уже выполняется, присоединяемся и дожидаемся его завершения
status = done, is_alive = False


## Еще пример потоков с этим классом...

In [15]:
treads = []

# Создаем (инициализируем) потоки
for i in range(10):
    treads.append(TCalc())

print('Потоки созданы:\n', treads, '\n')

# Запускаем потоки
for i in treads:
    i.start()

print('Главным потоком подождем немного...\n')
time.sleep(1.5) # Вводим в сон главный поток
print('Потоки выполняются:\n', treads, '\n') # Какие-то потоки уже выполнились (в статусе done)

# Дожидаемся выполнение потоков
for i in treads:
    i.join()

print('Потоки выполнены:\n', treads)

Потоки созданы:
 [Thread-71 (id=140356783319136), Thread-72 (id=140356783735920), Thread-73 (id=140356783736368), Thread-74 (id=140356783737320), Thread-75 (id=140356783737376), Thread-76 (id=140356783737600), Thread-77 (id=140356783737096), Thread-78 (id=140356783736088), Thread-79 (id=140356783736312), Thread-80 (id=140356783735752)] 

Главным потоком подождем немного...

Потоки выполняются:
 [Thread-71-started, Thread-72-done, Thread-73-started, Thread-74-started, Thread-75-started, Thread-76-done, Thread-77-started, Thread-78-done, Thread-79-done, Thread-80-started] 

Потоки выполнены:
 [Thread-71-done, Thread-72-done, Thread-73-done, Thread-74-done, Thread-75-done, Thread-76-done, Thread-77-done, Thread-78-done, Thread-79-done, Thread-80-done]


## Принудительная остановка потока

* Поток может содержать ресурсы, которые нужно освободить. Например, что-то долго происходит и нужно это прервать принудительно
* Нет специальной функции для остановки. Так как скорее всего после этого вызова программа перейдет в некорректное состояние
* Можно самостоятельно реализовать такую возможность

In [17]:
class KillableThread(Thread):

    def __init__(self):
        super().__init__()
        self.killing = False

    def run(self):
        self.status = 'started'
        for i in range(5,10): # Эмуляция некоторых действий потока, внутри которых условие остановки потока
            if self.killing:
                self.status = 'killed i=' + str(i)
                return # break
            time.sleep(0.2)
        self.status = 'done'

    def kill(self):
        self.killing = True

    def __repr__(self):
        if hasattr(self, 'status'):
            return '{}-{}'.format(self.name, self.status)
        return self.name


threads = []
for i in range(5):
    threads.append(KillableThread())

print('Потоки созданы:\n', threads, '\n')

for i in threads:
    i.start()

print('Потоки запущены:\n', threads, '\n')

kill_thread_id = random.randint(0, len(threads)-1) # Выбираем случайный поток для остановки
print('kill id =', kill_thread_id, '\n')
threads[kill_thread_id].kill()                     # Останавливаем выбранный поток

# Дожидаемся выполнение всех потоков:
for i in threads:
    i.join()

print('Потоки выполнены:\n', threads)

Потоки созданы:
 [Thread-86, Thread-87, Thread-88, Thread-89, Thread-90] 

Потоки запущены:
 [Thread-86-started, Thread-87-started, Thread-88-started, Thread-89-started, Thread-90-started] 

kill id = 3 

Потоки выполнены:
 [Thread-86-done, Thread-87-done, Thread-88-done, Thread-89-killed i=6, Thread-90-done]


## Эксперимент скорости потоков

* CPU-intense вычисления
* 4 процессора

````
| Эксперимент    | Время |
| Обычный запуск | 42с   |
| 2 потока       | 65с   |
| 4 потока       | 80с   |
````

Что происходит? Почему такое время исполнения?

Причина: **GIL** (**G**lobal **I**nterpreter **L**ock) для CPython
* Код ядра CPython некорректен для нескольких потоков (зато быстро работает для одного!)
* GIL не дает одновременно исполняться нескольким потокам ядра
* Исполняющийся в Python поток "захватывает" интерпретатор на короткое время
* После освобождения потока интерпретатором продолжить может другой поток

Зачем нужен GIL?
* Скорость однопоточных программ <--> GIL

Задачи, которые проводят мало времени "внутри ядра" Python:
* Интерфейсы (GUI)
* Работа с сетью
* Ввод/Вывод
* Внешние библиотеки. Например которые реализованы на других языках (NumPy)

## Параллельные процессы

* Для каждого процесса выделяется свой блок памяти
* GIL автоматически исчезает

In [18]:
from multiprocessing import Process

def f(name):
    print('Hello,', name)

p = Process(target=f, args=('World',))
p.start()
p.join()

Hello, World


Обмен данными между процессами возможен через:
* multiprocessing.Queue
* multiprocessing.Pipe

Но более простой способ выглядит использовать `multiprocessing.Pool`:

In [19]:
from multiprocessing import Pool

def calc(value):
    return value ** 2

p = Pool(processes=4)
results = p.map(calc, range(10)) # map - для каждого из значений в range выполнится функция calc

print(results)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


Ограничения `multiprocessing` - Этот код не будет работать, упадет с ошибкой:

In [20]:
from multiprocessing import Pool

def calc_parallel(values):
    def calc(value):
        return value ** 2
    p = Pool(processes=2)
    return p.map(calc, values)

calc_parallel(range(10))

AttributeError: Can't pickle local object 'calc_parallel.<locals>.calc'

Почему? Как работает `multiprocessing`?  
Чтобы передать в другой процесс функцию которую нужно выполнить и значения
1. Запускается второй интерпретатор (процесс ОС)
2. В него импортируется оператором `import` функция `calc` из текущего модуля

Но проблема здесь в том, что функция `calc` находится внутри функции `calc_parallel` и она не доступна оператору `import`! Импортировать возможно только те функции, которые доступны в глобальной области видимости. Нельзя использовать:
* Локальные функции
* `lambda` функции
* Составные выражения и т.д.

Для данных, `multiprocessing` сериализирует объекты в файл, а затем их импортирует в другой процесс. Эти объекты должны быть "pickable".

## Внешние процессы

In [22]:
import os

# Выводит результат работы и возвращает код возврата
os.system('ls')

0

In [23]:
os.system('echo "Hello, World!"')

0

## Модуль subprocess

In [24]:
import subprocess

# Вместо os.system('echo "Hello, World!"'):
subprocess.check_output(['echo', '"Hello, World!"'])

b'"Hello, World!"\n'

Возможно возвращать `Exception`, если `shell` программа завершилась неудачно:

In [26]:
try:
    subprocess.check_output('exit 1', shell=True)
except subprocess.CalledProcessError as e:
    print('Произошло исключение:', e)

Произошло исключение: Command 'exit 1' returned non-zero exit status 1.


Более сложный способ работы с процессами, подпроцессами:

In [27]:
p = subprocess.Popen(['echo', 'hi'])
# Дожидается ответ от процесса и возвращает stdout который получила
print(p.communicate())

(None, None)


PIPE - означает приготовить `stdout` для другого процесса:

In [28]:
p = subprocess.Popen(['echo', 'hi'], stdout=subprocess.PIPE)
print(p.communicate())

(b'hi\n', None)


Передается на `stdin` подпроцесса:

In [32]:
p = subprocess.Popen(['cat'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
print(p.communicate('sample text'.encode('utf8')))

(b'sample text', None)


Pipeline:

In [33]:
p1 = subprocess.Popen(['echo', 'qwerty'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['tr', 'q', 'z'], stdin=p1.stdout, stdout=subprocess.PIPE)

Чтобы это не падало в ошибку, перед тем как вызвать `communicate()` у второго процесса, нужно у первого процесса закрыть `stdout`:

In [34]:
p1.stdout.close()
print(p2.communicate())

(b'zwerty\n', None)
