# Создание многопоточных приложений

В данной лекции мы посмотрим как работать с многопоточностью.

In [1]:
# Подключаем нужные библиотеки
import multiprocessing 
import threading

import time
import datetime
import re
import requests
from bs4 import BeautifulSoup

import pymorphy2

import queue
import random


Давайте просто попробуем запустить два потока, которые будут выводить каждый свое значение.

Для создания потока используется `threading.Thread`, в параметр `target` передается функция, которая будет выполняться в этом потоке. Для того, чтобы поток начал выполняться, необходимо вызвать функцию `start`. Если необходимо дождаться окончания выполнения потока, вызывается функция `join`. 

In [2]:
def func1():
    var1 = 0
    for i in range(1000):
        var1 += 1
        if (i % 100) == 0:
            print("Func 1 says ", var1)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    var2 = 0
    for i in range(1000):
        var2 += 2
        if (i % 100) == 0:
            print("Func 2 says ", var2)
        time.sleep(random.random()/100+0.003) # засыпаем на случайный промежуток времени.
            
t1 = time.time()

# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr1.start()
pr2.start()
print("I can fly!")

pr1.join()
print("Process 1 is finished")
pr2.join()
print("The work is done")

t2 = time.time()
print("Time estimated ", t2-t1)

Func 1 says  1
Func 2 says  2
I can fly!
Func 1 says  101
Func 2 says  202
Func 1 says  201
Func 1 says  301
Func 2 says  402
Func 1 says  401
Func 2 says  602
Func 1 says  501
Func 1 says  601
Func 2 says  802
Func 1 says  701
Func 1 says  801
Func 2 says  1002
Func 1 says  901
Func 2 says  1202
Process 1 is finished
Func 2 says  1402
Func 2 says  1602
Func 2 says  1802
The work is done
Time estimated  8.269513845443726


Для создания процесса используется `multiprocessing.Process`, принимающий в параметр `target` функцию, которая будет выполняться в этом потоке. Для того, чтобы процесс начал выполняться, необходимо вызвать функцию `start`. Если необходимо дождаться окончания выполнения потока, вызывается функция `join`. Если процесс не будет переходить в режим ожидания, он выполнится от начала и до конца при вызове функции `start`.

In [3]:
%%time
t1 = time.time()

# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=func1)
pr2=multiprocessing.Process(target=func2)
pr1.start()
pr2.start()
print("I can fly!")

pr1.join()
print("Process 1 is finished")
pr2.join()
print("The work is done")

t2 = time.time()
print("Time estimated ", t2-t1)

Func 1 says  1
Func 2 says  2
I can fly!
Func 1 says  101
Func 2 says  202
Func 1 says  201
Func 1 says  301
Func 2 says  402
Func 1 says  401
Func 1 says  501
Func 2 says  602
Func 1 says  601
Func 2 says  802
Func 1 says  701
Func 1 says  801
Func 2 says  1002
Func 1 says  901
Func 2 says  1202
Process 1 is finished
Func 2 says  1402
Func 2 says  1602
Func 2 says  1802
The work is done
Time estimated  8.348276853561401
CPU times: user 28.1 ms, sys: 23.8 ms, total: 51.9 ms
Wall time: 8.35 s


Теперь попробуем сделать тоже самое, но для одной и той же переменной - две функции будут увеличивать значение одной переменной по очереди.

In [5]:
t1 = time.time()
Loki = 0

def func1():
    global Loki
    for i in range(1000):
        Loki += 1
        if (i % 100) == 0:
            print("Func 1 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    global Loki
    for i in range(1000):
        Loki += 2
        if (i % 100) == 0:
            print("Func 2 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=func1)
pr2=multiprocessing.Process(target=func2)
pr1.start()
pr2.start()

pr1.join()
pr2.join()
t2 = time.time()
print("Time estimated", t2-t1)
print(Loki)

Func 1 says  1
Func 2 says  2
Func 1 says  101
Func 2 says  202
Func 1 says  201
Func 2 says  402
Func 1 says  301
Func 2 says  602
Func 2 says  802
Func 1 says  401
Func 1 says  501
Func 2 says  1002
Func 2 says  1202
Func 1 says  601
Func 2 says  1402
Func 1 says  701
Func 2 says  1602
Func 1 says  801
Func 2 says  1802
Func 1 says  901
Time estimated 5.308660984039307
0


Выглядит так, как будто каждый процесс работает со своей собственной переменной. Да, это так, это ведь независимые процессы, они выполняются каждый на своей копии GIL и комплекте переменных.

Теперь попытаемся сделать всё тоже самое, но на потоках, выполняющихся в рамках одного процесса, но в разных потоках.

In [6]:
t1 = time.time()
Loki = 0

def func1():
    global Loki
    for i in range(1000):
        Loki += 1
        if (i % 100) == 0:
            print("Func 1 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    global Loki
    for i in range(1000):
        Loki += 2
        if (i % 100) == 0:
            print("Func 2 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr3=threading.Thread(target=func2)
pr1.start()
pr2.start()
pr3.start()

pr1.join()
pr2.join()
pr3.join()

t2 = time.time()
print("Time estimated ", t2-t1)
print(Loki)

Func 1 says  1
Func 2 says  3
Func 2 says  5
Func 2 says  492
Func 1 says  493
Func 2 says  514
Func 1 says  963
Func 2 says  1010
Func 2 says  1017
Func 1 says  1449
Func 2 says  1487
Func 2 says  1554
Func 1 says  1929
Func 2 says  1969
Func 2 says  2083
Func 1 says  2439
Func 2 says  2454
Func 2 says  2592
Func 1 says  2935
Func 2 says  2941
Func 2 says  3111
Func 1 says  3419
Func 2 says  3435
Func 2 says  3618
Func 1 says  3917
Func 2 says  3927
Func 2 says  4132
Func 2 says  4406
Func 1 says  4407
Func 2 says  4676
Time estimated  5.2778520584106445
5000


Когда-то давно, такой код обязательно вызвал бы исключительную ситуацию. Но сейчас GIL ам блокирует нужные переменные, не позволяя разным процессам обращаться к ним одновременно. В итоге всё работает именно так, как хотелось бы.

Помимо этого, обратите внимание, что время выполнения в обоих случаях примерно одинаково. Еще в Python 3.6 потоки работали не параллельно, а последовательно внутри одного и того же потока, а переключение осуществлялось не средствами ОС, а самого GIL. Как следствие, применение таких потоков не ускоряло работу программы. При этом процессы работали действительно параллельно. Сейчас потоки в самом деле выполняются параллельно, да и запускаются гораздо быстрее.

При помощи объекта типа `Lock` можно блокировать выполнение критических фрагментов кода. Например, если мы пишем в файл в нескольких потоках, мы должны гарантировать, что мы запишем свой фрагмент данных от начала и до конца. При этом другой поток не прервет наш вывод и не выедет свою информацию.

In [7]:
lk = threading.Lock()

Loki = 0

def func1():
    global Loki
    for i in range(1000):
        lk.acquire()
        Loki += 1
        lk.release()
        if (i % 100) == 0:
            print("Func 1 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    global Loki
    for i in range(1000):
        lk.acquire()
        Loki += 2
        lk.release()
        if (i % 100) == 0:
            print("Func 2 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr1.start()
pr2.start()

Func 1 says  1
Func 2 says  3
Func 1 says  287
Func 2 says  311
Func 1 says  587
Func 2 says  610
Func 1 says  877
Func 2 says  914
Func 1 says  1171
Func 2 says  1219
Func 1 says  1461
Func 2 says  1523
Func 1 says  1789
Func 2 says  1808
Func 1 says  2063
Func 2 says  2120
Func 1 says  2361
Func 2 says  2426
Func 1 says  2643
Func 2 says  2734


<b> Внимание!!!</b> Вызов двух блокировок подряд вызовет полную блокировку процесса, а также всех процессов, зависящих от данной блокировки.

Также для синхронизации работы потоков могут использоваться события и семафоры.

Событие может быть установлено или сброшено. Если поток пытается установить установленное или сбросить сброшенное событие, он блокируется (но может быть разблокирован по тайм-ауту).

Семафор фактически хранит информацию о нескольких событиях, то есть у семафора есть некоторый объем. Процесс блокируется при запросе ресурса из семафрра, только если тот уже был запрошен n раз и ресурс исчерпан.



In [9]:
# создадим очередь
cntr = 0
eve1 = threading.Event() # Разрешает читать читателю.
eve2 = threading.Event() # Читатель прочитал, писатель может писать.

# У нас будет два потока. 
# Writer будет писать даные в очередь через случайные промежутки времени. 
def writer():
    global cntr
    while cntr<10: #пишем 10 чисел
        res = eve2.wait() # Всё прочитано, можно начинать писать.
        eve2.clear() # Я понял, начинаю писать.
        # Делаем какие-то долгие операции по генерации данных.
        cntr+=1
        print('wrote '+str(cntr)) # отчитываемся о записи.
        eve1.set() # Данные готовы, можно читать.
        time.sleep(random.random()) # засыпаем на случайный промежуток времени.

# Reader в параллельном потоке читает из очереди через случайные промежутки времени.
def reader():
    while cntr<10: # пока прочитанное число меньше 10...
        res = eve1.wait() # А не готовы ли данные для чтения?
        if res:
            print('read '+str(cntr)) # Отчитываемся.
        else:
            print('read failed')
        eve1.clear() # Спасибо, я всё прочёл.
        eve2.set() # Можно начинать писать.

eve2.set() # Вначале писать может писать, а читателю нечего читать.
eve1.clear()
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=writer)
pr2=threading.Thread(target=reader)
pr1.start()
pr2.start()

# Видно, как потоки работают параллельно, правда?

wrote 1
read 1
wrote 2
read 2
wrote 3
read 3
wrote 4
read 4
wrote 5
read 5
wrote 6
read 6
wrote 7
read 7
wrote 8
read 8
wrote 9
read 9
wrote 10
read 10


Попробуем организовать обмен данными при помощи `multiprocessing.queue`.
Данный класс позволяет обмениваться информацией между потоками. При этом если поток-поставщик данных работает быстрее, то данные будут "складироваться" в очереди, пока потребитель их не заберет.
Размер очереди может быть ограничен. В этом случае операция "положить в очередь" заблокирует поток, если очередь заполнена.
Если очередь пуста, поток, запросивший данные также будет заблокирован при запросе данных. 
Для избежания блокировок можно использовать timeout (через какое время поток будет разблокирован и фцнкция сообщит о неуспехе).


In [13]:
# создадим очередь
cntr=0
exch=multiprocessing.Queue(5)
random.seed()

# У нас будет два потока. 
# Writer будет писать даные в очередь через случайные промежутки времени. 
def writer():
    global cntr
    while cntr<10: #пишем 10 чисел
        exch.put(cntr) # собственно пишем в очередь
        print('wrote '+str(cntr)) # отчитываемся о записи.
        cntr+=1
        time.sleep(random.random()) # засыпаем на случайный промежуток времени.
    exch.put(11) # конец данных.
  
# Reader в параллельном потоке читает из очереди через случайные промежутки времени.
def reader():
    i=-1
    while i<10: # пока прочитанное число меньше 10...
        i=exch.get() # Собственно читаем из очереди.
        print('read '+str(i)) # Отчитываемся.
        time.sleep(random.random()) # спим
  
# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=writer)
pr2=multiprocessing.Process(target=reader)
pr1.start()
pr2.start()

# Видно, как потоки работают параллельно, правда?

wrote 0
read 0
wrote 1
read 1
wrote 2
wrote 3
wrote 4
read 2
read 3
wrote 5
read 4
read 5
wrote 6
wrote 7
read 6
wrote 8
read 7
wrote 9
read 8
read 9
read 11


Теперь попробуем решить следующую задачу.

Необходимо получать новости из нескольких источников. Далее каждая новость токенизируется в отдельном потоке, лемматизируется в еще одном потоке. В конце идет поток, который обрабатывает полученные данные.
Новости также получаются в отдельных потоках.

Для передачи данных используем очередь. 

Для сигнализации завершения работы потоков используем события и семафоры.

Использование очереди, событий и семафоров позволяет синхронизировать операции в потоках. Токенизатор начнет разбор только после того, как очередь получит данные от одного из загрузчиков новостей. Результат будет помещен в другую очередь. Токенизатор "уснет" до тех пор, пока не придет еще одна новость, зато "проснется" лемматизатор.

In [11]:
# !!!===--- ВНИМАНИЕ!!! ---===!!!
# Код в данной ячейке работает некорректно, так как завершение процессов вызывает взаимную  блокировку!
# Корректный вариант синхронизации показан ниже.


# Функция для загрузки одной новости из Ленты.ру
def getLentaArticle(url):
    """ getLentaArticle gets the body of the article from Lenta.ru"""
    r = requests.get(url)
    body = re.findall('<div class="b-text clearfix js-topic__text" itemprop="articleBody"><p>(.+?)</p><div class="b-box"><i>', r.text)
    if len(body) > 0:
        return BeautifulSoup(body[0], "lxml").get_text()
    else:
        return "  "

# Функция загрузки одной новости из N+1.
def getArticleTextNPlus1(adr):
    r = requests.get(adr)
    n_text = re.split("</div>", re.split("</figure>", re.split('</article>',re.split('<article', r.text)[1])[0])[1])[1]    
    return BeautifulSoup(n_text, "lxml").get_text()

# Загрузка новостей из Ленты.ру за некоторый период.
def getLenta(qu, sem):
    curdate = datetime.date(2017, 1, 16)
    findate = datetime.date(2017, 1, 16)
    res = ""

# Загружаем новости до конечной даты.
    while curdate <= findate:
        print('lenta ' + curdate.strftime('%Y/%m/%d'))
        day = requests.get('https://lenta.ru/news/' + curdate.strftime('%Y/%m/%d'))
        body = re.findall('<h3>(.+?)</h3>', day.text)
        links = ['https://lenta.ru' + re.findall('"(.+?)"', x)[0] for x in body]
        for l in links: # идем по всем ссылкам на новости за день.
            qu.put(getLentaArticle(l)) # Полученную новость кладем в очередь.
            time.sleep(0.2)
        curdate += datetime.timedelta(days=1)
    sem.acquire() # Блокируем семафор один раз, по
    print("lenta finished")

# Получаем новости с NPlus1 за заданный промежуток времени, кладем тексты новостей в очередь qu.
# По завершении взводим семафор sem.
def getNplus1(qu, sem):
    curdate = datetime.date(2015, 12, 15)
    findate = datetime.date(2015, 12, 19)

    while curdate <= findate: # Перебираем все дни.
        r = requests.get('https://nplus1.ru/news/'+curdate.strftime('%Y/%m/%d'))
        print('nplus ' + curdate.strftime('%Y/%m/%d'))
        # Берем заголовки и ссылки на новости за этот день.
        refs = [re.split('"', t)[6] for t in re.split('<article class="item item-news item-news', r.text)[1:]]
        for t in refs:
            qu.put(getArticleTextNPlus1("https://nplus1.ru" + t)) # Получаем текст статьи и отправляем в очередь.
            time.sleep(0.2) # Мы этичные хакеры и не стремимся к DDoS.
        curdate += datetime.timedelta(days=1)
    sem.acquire() # Взводим семафор.
    print("nplus1 finished")
    
# Функция токенизирует вход из очереди qu1 и кладет результаты токенизации в очередь qu2.
# Токенизация ведется до тех пор, пока семафор semw не будет взведен максимальное количество раз.
# По завершении токенизации устанавливаем событие evs.
def tokenize(qu1, qu2, semw, evs):  
    c = 0
    # Пытаемся взвести семафор. Если не взведется, значит все потоки завершились (взведением семафора).
    while semw.acquire(False): 
        txt = qu1.get() # Получаем данные из очереди.
        semw.release() # Отпускаем семафор (мы же взвели его в while).
        qu2.put(re.findall("[А-Яа-я]+(-[А-Яа-я]+)?", txt)) # Очень простая токенизация кладет результаты в очередь.
        c += 1
        print('token '+str(c))
    print("tokenization finished")
    evs.set() # Сообщаем о завершении работы установкой события.
    print("tokenization finished")
""" Вот здесь-то и происходит блокировка.
    while semw.acquire(False): блокирует семафор для потоков, поставляющих новости. В самом конце последний 
    поток с новостями заканчивает свою работу и пытается завершиться. Но семафор уже заполнен, и поток переводится
    в состояние ожидания. При этом поток токенизации ждет получения данных txt=qu1.get(), но так как данные не 
    больше поступают, то он тоже блокируется. 
    Один поток ждет данных, чтобы продолжить работу и разблокировать семафор, другой заблокирован семафором и 
    больше не хочет отправлять данные. Клинч! Хорошо хоть, что в конце работы, но такое могло произойти и в середине.
    
    Через одну ячейку приведен код, который исправляет эту ситуацию.
"""

# Поток лемматизации текстов. Токены берутся из очереди qu1, результаты лемматизации кладутся в qu2.
# Так как новости приходят по одной в токенизацию, скорее всего токены будут идти подряд. Но если вдруг у нас появится
# еще один поток для токенизации, они начнут помещаться в очередь вперемешку. Так что такая технология подходит
# только если нас не волнуется порядок слов.
# Данный заканчиваются, когда будет установлено событие evw, сообщаем о своем завершении при помощи события evs.
def lemmatize(qu1, qu2, evw, evs):
    morpho = pymorphy2.MorphAnalyzer() # Создаем морфоанализатор.
    l = []
    c = 0
    while not evw.is_set(): # Если событие установлено - пора завершать работу.
        txt = qu1.get()
        s = []
        for w in txt:
            s += morpho.parse(w)[0]
        qu2.put(s) # Анализиируем, кладем результат в очередь.
        c += 1
        print('lemma ' + str(c))
    evs.set() # ЗАвершаем работу.
    print("lemmatization finished")

# Функция имитирует, что она обрабатывает данные из очереди qu. Заершает работу по событию ev.    
def utilize(qu, ev):
    c = 0
    while not ev.is_set(): # Если событие установлено - пора завершать работу.
        t = qu.get()
        #processing
        c += 1
        print('process ' + str(c))
    print('processing finished')

# не надо тормозить при получении данных из очереди,
# надо поставить ожидание если очередь пуста.
# то, что есть - хороший пример на сложности синхронизации. еще очередь поменьше сделай.


In [10]:
# Два потока новостей.
newswirenumber = 2

# Создаем очереди, семафор и события для синхронизации.
textsq = multiprocessing.Queue(100)
tokenq = multiprocessing.Queue(100)
lemmaq = multiprocessing.Queue(100)
newss = multiprocessing.Semaphore(newswirenumber)
tokene = multiprocessing.Event()
lemmae = multiprocessing.Event()

# Запускаем процессы.
lentap = multiprocessing.Process(target=getLenta, args=(textsq, newss,))
nplusp = multiprocessing.Process(target=getNplus1, args=(textsq, newss,))  
tokenp = multiprocessing.Process(target=tokenize, args=(textsq, tokenq, newss, tokene, ))   
lemmap = multiprocessing.Process(target=lemmatize, args=(tokenq, lemmaq, tokene, lemmae, ))   
processp = multiprocessing.Process(target=utilize, args=(lemmaq, lemmae, ))    
    
# Стартуем процессы    
lentap.start()
nplusp.start()
tokenp.start()
lemmap.start()
processp.start()

# Если надо - ждем пока процессы не завершатся.
#lentap.join()
#nplusp.join()
#tokenp.join()
#lemmap.join()
#processp.join()
#print("Everything is allright")                               
                               

NameError: name 'getLenta' is not defined

In [43]:
# Это код работает более корректно, чем тот, что приведен выше.
# Большая часть комментариев опущена, оставлены только те, что помогают понять изменения в логике.

def getLentaArticle(url):
    # Вместо того, что вызывать Ктулху тем, что мы разбираем XML при помощи регулярок,
    # давайте будем использовать мощь BeautifulSoup.
    r = requests.get(url)
    
    beau = BeautifulSoup(r.text, "lxml")
    beau_text = beau("p") # Найди-ка мне все фрагменты, отмеченные как параграф.
        
    if len(beau_text) > 0:
        return "\n".join([b.get_text() for b in beau_text])
    else:
        return "  "

def getArticleTextNPlus1(adr):
    r = requests.get(adr)
    n_text = re.split("</div>", re.split("</figure>", re.split('</article>',re.split('<article', r.text)[1])[0])[1])[1]    
    return BeautifulSoup(n_text, "lxml").get_text()

def getLenta(qu, sem):
    curdate = datetime.date(2017, 1, 16)
    findate = datetime.date(2017, 1, 16)
    res=""

    while curdate <= findate:
        print('lenta ' + curdate.strftime('%Y/%m/%d'))
        day = requests.get('https://lenta.ru/news/'+curdate.strftime('%Y/%m/%d'))
        body = re.findall('<h3>(.+?)</h3>', day.text)
        links = ['https://lenta.ru'+re.findall('"(.+?)"', x)[0] for x in body]
        for l in links[:50]:
            qu.put(getLentaArticle(l))
            time.sleep(0.2)
        curdate += datetime.timedelta(days=1)
    sem.acquire()
    print("lenta finished")

def getNplus1(qu, sem):
    curdate = datetime.date(2015, 12, 15)
    findate = datetime.date(2015, 12, 18)

    while curdate <= findate:
        r = requests.get('https://nplus1.ru/news/'+curdate.strftime('%Y/%m/%d'))
        print('nplus ' + curdate.strftime('%Y/%m/%d'))
        refs=[re.split('"', t)[6] for t in re.split('<article class="item item-news item-news', r.text)[1:]]
        for t in refs:
            qu.put(getArticleTextNPlus1("https://nplus1.ru"+t))
            time.sleep(0.2)
        curdate += datetime.timedelta(days=1)
    sem.acquire()
    print("nplus1 finished")
    
# На входе - те же самые очереди, семафор и событие, но используем мы их по-другому.
def tokenize(qu1, qu2, semw, evs):  
    c = 0
    while semw.acquire(False): # Так же пытаемся получить семафор.
        try:
            semw.release() # ! Первым делом отпускаем его, чтобы никого не держать больше.
            txt = qu1.get(timeout=0.1) # Если что-то пойдет не так, нас отпустят через 0,1 секунды.
        except queue.Empty: # Если произршел выход по таймауту, генерируется исключение.
            #print('tokenization waits')
            pass # Данных нет, пойдем посмотрим, может вообще завершаться пора?
        else:
            qu2.put(re.findall("[А-Яа-я]+(-[А-Яа-я]+)?", txt)) # Данные есть - токенизируем и кладем в очередь.
            c += 1
            print('token '+str(c))

    print("tokenization ")
    evs.set()
    print("finished")

def lemmatize(qu1, qu2, evw, evs):
    morpho = pymorphy2.MorphAnalyzer()
    l = []
    c = 0
    while not evw.is_set():
        try:
            txt = qu1.get(timeout=0.1) # Поддерживаем логику таймаута везде на всякий случай.
        except queue.Empty:
            pass
        else:
            s = []
            for w in s:
                s += morpho.parse(w)[0]
            qu2.put(s)
            c += 1
            print('lemma '+str(c))
    evs.set()
    print("lemmatization finished")

def utilize(qu, ev):
    c = 0
    while not ev.is_set(): # Поддерживаем логику таймаута везде на всякий случай.
        try:
            t = qu.get(timeout=0.1)
        except queue.Empty:
            pass
        else:
        #processing
            c += 1
            print('process '+str(c))
    print('processing finished')

