# Многопоточность в Python

1. [Поток и процесс. Создание нитей и процессов](#process)
2. [Передача данных между потоками при помощи pipe и общей памяти](#gen)
3. [GIL](#GIL)
5. [Асинхронное выполнение потоков](#asynchronous)
6. [Библиотеки threading, multiprocessing и asyncio](#threading)

## 1. Поток и процесс. Создание нитей и процессов <a name="process"></a>

In [1]:
import _thread
import time

def print_time(thread_name, delay):
   count = 0
   while count < 4:
      time.sleep(delay)
      count += 1
      print("%s: %s" % (thread_name, time.ctime(time.time())))

_thread.start_new_thread(print_time, ('Thread #1', 1))
_thread.start_new_thread(print_time, ('Thread #2', 2))

139835142567680

Thread #1: Wed Mar 24 16:04:48 2021
Thread #2: Wed Mar 24 16:04:49 2021Thread #1: Wed Mar 24 16:04:49 2021

Thread #1: Wed Mar 24 16:04:50 2021
Thread #2: Wed Mar 24 16:04:51 2021
Thread #1: Wed Mar 24 16:04:51 2021
Thread #2: Wed Mar 24 16:04:53 2021
Thread #2: Wed Mar 24 16:04:55 2021


### Запуск потока с применением threading

In [2]:
import threading
import time

import sympy

class MyThread(threading.Thread):
    def __init__(self, x):
        threading.Thread.__init__(self)
        self.x = x

    def run(self):
        time.sleep(1)
        print('Is {} prime: {}'.format(self.x, sympy.isprime(self.x)))

In [3]:
my_input = [2, 193, 323, 1327, 433785907]

threads = []

for x in my_input:
    temp_thread = MyThread(x)
    temp_thread.start()
    threads.append(temp_thread)

for thread in threads:
    thread.join()  # wait until the thread terminates

print('done')

Is 2 prime: TrueIs 193 prime: True
Is 1327 prime: True

Is 323 prime: False
Is 433785907 prime: True
done


## 2. Передача данных между потоками при помощи pipe и общей памяти <a name="gen"></a>

### Queue

Queue предоставляет нам механизм взаимодействия потоков между процессами FIFO (первым пришел — первым обслужен).

In [4]:
from multiprocessing import Process, Queue
import random

def func(queue):
    queue.put([42, None, 'hello'])

queue = Queue()
p = Process(target=func, args=(queue,))
p.start()
print(queue.get())

[42, None, 'hello']


### Pipe

Это структура данных, которая используется для связи между процессами в многопроцессорных программах. Функция Pipe() возвращает пару объектов соединения, соединенных каналом, который по умолчанию является дуплексным (двухсторонним). Работа Pipe происходит следующим образом. Pipe возвращает пару объектов соединения, которые представляют два конца канала. У каждого объекта есть два метода — send() и recv() для взаимодействия между процессами.

In [5]:
from multiprocessing import Process, Pipe
import time

def process2(conn):
    conn.send([42, None, 'hello'])
    time.sleep(1)
    print('process2 done')
    conn.close()

parent_conn, child_conn = Pipe()
p = Process(target=process2, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()  # blocks until the process2 terminates

print('main process done')

[42, None, 'hello']
process2 done
main process done


### Менеджер
**Менеджер** — это класс многопроцессорных модулей, который обеспечивает способ координации общей информации между всеми его пользователями. Управляющий объект управляет процессом сервера, который управляет общими объектами и позволяет другим процессам манипулировать ими. Другими словами, менеджеры предоставляют способ создавать данные, которые могут быть разделены между различными процессами.  
Основным свойством менеджера является управление серверным процессом, который управляет общими объектами.  
Другим важным свойством является обновление всех общих объектов, когда какой-либо процесс изменяет их.

In [6]:
import multiprocessing

def insert_record(record, records):
    records.append(record)
    print('{} record added'.format(record))
    
def print_records(records):
    for record in records:
        print("Name: {0}\nScore: {1}".format(record[0], record[1]))

with multiprocessing.Manager() as manager:
    records = manager.list([('Computers', 1), ('Histoty', 5), ('Hindi', 9)])
    new_record = ('English', 3)
    p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
    p2 = multiprocessing.Process(target=print_records, args=(records,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()

('English', 3) record added
Name: Computers
Score: 1
Name: Histoty
Score: 5
Name: Hindi
Score: 9
Name: English
Score: 3


Использование пространства имен для обмена данными между основным процессом и дочерним процессом.

In [7]:
import multiprocessing

def func(using_ns):
    
    using_ns.x += 5
    using_ns.y *= 10

manager = multiprocessing.Manager()
using_ns = manager.Namespace()
using_ns.x = 1
using_ns.y = 1
    
print ('Before:', using_ns)
p = multiprocessing.Process(target=func, args=(using_ns,))
p.start()
p.join()
print ('After:', using_ns)

Before: Namespace(x=1, y=1)
After: Namespace(x=6, y=10)


### Ctypes-Array и Value
Многопроцессорный модуль предоставляет объекты Array и Value для хранения данных в карте общей памяти. Массив — это массив ctypes, выделенный из общей памяти, а Value — объект ctypes, выделенный из общей памяти.

In [8]:
import multiprocessing

def func(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
    
p = multiprocessing.Process(target=func, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]


### Библиотека PyCSP

In [None]:
!pip3 install --user --quiet pycsp

Пример запуска двух процессов параллельно друг другу.

In [9]:
import time

from pycsp.parallel import *

@process
def P1():
   time.sleep(1)
   print('P1 exiting')
    
@process
def P2():
   time.sleep(1)
   print('P2 exiting')
    
Parallel(P1(), P2())
print('done')

P1 exitingP2 exiting

done


## 3. GIL <a name="GIL"></a>

Python **Global Interpreter Lock (GIL)** — это своеобразная блокировка, позволяющая только одному потоку управлять интерпретатором Python. 

Пример кода, демонстрирующий работу переменных подсчёта ссылок:

In [10]:
import os

os.cpu_count()

40

In [11]:
import sys

sys.getswitchinterval()

0.005

In [12]:
import sys

a = []
print(sys.getrefcount(a))

b = a
print(sys.getrefcount(a))

2
3


Несложная CPU-bound программа, которая ведёт обратный отсчёт:

In [13]:
import time
from threading import Thread

n = 50000000

def countdown(n):
    while n > 0:
        n -= 1

start = time.time()
countdown(n)
end = time.time()
delta = end - start
print('Затраченное время:', delta)

Затраченное время: 2.2816755771636963


Реализация обратного отсчёта ведётся в 4 параллельных потоках:

In [14]:
t1 = Thread(target=countdown, args=(n//4,))
t2 = Thread(target=countdown, args=(n//4,))
t3 = Thread(target=countdown, args=(n//4,))
t4 = Thread(target=countdown, args=(n//4,))

start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
delta = end - start
print('Затраченное время:', delta)

Затраченное время: 1.1415107250213623


## 4. Асинхронное выполнение потоков <a name="asynchronous"></a>

Асинхронная программа может одновременно обрабатывать задачи, но ее контекст переключается внутри, а не системным планировщиком.

## 5. Библиотеки threading, multiprocessing и asyncio <a name="threading"></a>

In [15]:
from multiprocessing import Process

def print_func(continent='Asia'):
    print('The name of continent is ', continent)

names = ['America', 'Europe', 'Africa']

procs = []
proc = Process(target=print_func)  # instantiating w/o any argument
procs.append(proc)
proc.start()

# Instantiating process with arguments
for name in names:
    proc = Process(target=print_func, args=(name,))
    procs.append(proc)
    proc.start()

# Complete the processes
for proc in procs:
    proc.join()

The name of continent is  America
The name of continent is  Europe
The name of continent is  Asia
The name of continent is  Africa


In [16]:
def print_name(prefix):
    print('Searching {} prefix...'.format(prefix))
    try: 
        while True:
            # yeild used to create coroutine
            name = (yield)
            if prefix in name:
                print(name)
    except GeneratorExit:
        print('Closing coroutine!')
            
corou = print_name('Dear')
corou.__next__()
corou.send('James')  # nothing
corou.send('Dear James')  # print 'Dear James'
corou.close()

Searching Dear prefix...
Dear James
Closing coroutine!


Пример в котором запускаем 3 асинхронных таска, которые по-отдельности делают запросы к Reddit, извлекают и выводят содержимое JSON. Библиотекф aiohttp гарантирует, что HTTP-запрос будет выполнен асинхронно.

In [None]:
!pip3 install --user --quiet aiohttp

In [17]:
import asyncio  
import json
import signal

import aiohttp 

loop = asyncio.get_event_loop()  
client = aiohttp.ClientSession(loop=loop)

async def get_json(client, url):  
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_top(subreddit, client):  
    data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=3')

    j = json.loads(data1.decode('utf-8'))
    for i in j['data']['children']:
        score = i['data']['score']
        title = i['data']['title']
        link = i['data']['url']
        print(str(score) + ': ' + title + ' (' + link + ')')

    print(f'finished {subreddit} subreddit\n')

def signal_handler(signal, frame):  
    loop.stop()
    client.close()

signal.signal(signal.SIGINT, signal_handler)

asyncio.ensure_future(get_reddit_top('python', client))  
asyncio.ensure_future(get_reddit_top('programming', client))  
asyncio.ensure_future(get_reddit_top('compsci', client))  

<Task pending coro=<get_reddit_top() running at <ipython-input-17-f5d558b0ed6a>:15>>

1452: Linus Torvalds on where Rust will fit into Linux (https://www.zdnet.com/article/linus-torvalds-on-where-rust-will-fit-into-linux/)
998: Free software advocates seek removal of Richard Stallman and entire FSF board (https://arstechnica.com/tech-policy/2021/03/free-software-advocates-seek-removal-of-richard-stallman-and-entire-fsf-board/)
310: TLS 1.0, 1.1 officially deprecated (https://datatracker.ietf.org/doc/rfc8996/)
finished programming subreddit

160: What exactly does a Computer Scientist (with a PhD or Masters) work on, compared to a regular software engineer? (https://www.reddit.com/r/compsci/comments/mbyctv/what_exactly_does_a_computer_scientist_with_a_phd/)
2: [N] China's GPT-3? BAAI Introduces Superscale Intelligence Model 'Wu Dao 1.0' (https://www.reddit.com/r/compsci/comments/mbjgvy/n_chinas_gpt3_baai_introduces_superscale/)
1: Those of you working as researchers in industry, how often are you allowed to publish your work? (https://www.reddit.com/r/compsci/comments/mc

### Библиотека multiprocessing

In [18]:
from multiprocessing import Process
import os
 
def func(number):
    result = number * 2
    proc = os.getpid()
    print('{} doubled to {} by process id: {}'.format(
        number, result, proc))

numbers = [5, 10, 15]
procs = []
    
for index, number in enumerate(numbers):
    proc = Process(target=func, args=(number,))
    procs.append(proc)
    proc.start()
    
for proc in procs:
    proc.join()

5 doubled to 10 by process id: 244888
10 doubled to 20 by process id: 244890
15 doubled to 30 by process id: 244894


In [19]:
from multiprocessing import Process, current_process
import os


def func(number):
    result = number * 2
    proc_name = current_process().name
    print('{} doubled to {} by: {}'.format(number, result, proc_name))

numbers = [5, 10, 15]
procs = []

for index, number in enumerate(numbers):
    proc = Process(target=func, args=(number,))
    procs.append(proc)
    proc.start()
    
proc = Process(target=func, name='example name', args=(2,))
proc.start()
procs.append(proc)
    
for proc in procs:
    proc.join()

5 doubled to 10 by: Process-16
10 doubled to 20 by: Process-17
15 doubled to 30 by: Process-18
2 doubled to 4 by: example name


### Замки (Locks)
Модуль `multiprocessing` поддерживает замки так же, как и модуль `threading`. Все что вам нужно, это импортировать `Lock`, повесить его, сделать что-нибудь и снять его.

In [20]:
from multiprocessing import Lock, Process, Value

def func(store, lock):
    lock.acquire()
    store.value += 1  # safe mode
    lock.release()

store = Value('i', 1)
lock = Lock()

print(store.value)
p = Process(target=func, args=(store, lock))
p.start()
p.join()
print(store.value)

1
2


### Логирование (Logging)

In [21]:
import logging
import multiprocessing

def func(store, lock):
    lock.acquire()
    store.value += 1  # safe mode
    lock.release()

store = multiprocessing.Value('i', 1)
lock = multiprocessing.Lock()

logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
    
p = multiprocessing.Process(target=func, args=(store, lock))
p.start()
p.join()

logger.setLevel(logging.CRITICAL)

[INFO/Process-21] incref failed: [Errno 2] No such file or directory
[INFO/Process-21] child process calling self.run()
[INFO/Process-21] process shutting down
[INFO/Process-21] process exiting with exitcode 0


## Класс Pool
Класс Pool используется для показа пула рабочих процессов. Он включает в себя методы, которые позволяют вам разгружать задачи к рабочим процессам.

In [22]:
from multiprocessing import Pool

def func(number):
    return number * 2

numbers = [5, 10, 20]
pool = Pool(processes=3)
print(pool.map(func, numbers))

[10, 20, 40]


### Связь между процессами

In [23]:
from multiprocessing import Process, Queue

end = -1
 
def func(q, end):
    while True:
        value = q.get()
        processed = value * 2
        print(processed)
        if value == end:
            break

q = Queue()
data = [5, 10, 13, -1]
for item in data:
    q.put(item)
    
p = Process(target=func, args=(q, end))
p.start()
    
q.close()
q.join_thread()

p.join()

10
20
26
-2


### Lambda-функции
**Lambda-функции** — это  анонимные функции, подчиняющиеся более строгому, но более лаконичному синтаксису, чем обычные функции Python.

Функция тождества (identity function), функция, которая возвращает свой аргумент, выражается стандартным определением функции Python с использованием ключевого слова def следующим образом:

In [24]:
lambda x: x

<function __main__.<lambda>(x)>

In [25]:
def identity(x):
    return x

Функция, которая добавляет 1 к аргументу:

In [26]:
lambda x: x + 1

<function __main__.<lambda>(x)>

In [27]:
func = lambda x: x + 1
func(2)

3

In [28]:
func = lambda first, last: f'Full name: {first.title()} {last.title()}'
func('guido', 'van rossum')

'Full name: Guido Van Rossum'

Выражение немедленного вызова функции (IIFE -- Immediately Invoked Function Expression, произносится "iffy").

In [29]:
(lambda x, y: x + y)(2, 3)

5

Лямбда-функция может быть функцией более высокого порядка, принимая функцию (нормальную или лямбда-функцию) в качестве аргумента.

In [30]:
high_ord_func = lambda x, func: x + func(x)
high_ord_func(2, lambda x: x * x)

6