# Создание многопоточных приложений

В данной лекции мы посмотрим как работать с многопоточностью.

Иллюстрации к теоретическому введению [здесь](https://drive.google.com/file/d/1TiN8I_UysXw3z9Ylja-e4DgFRtuicYU5/view?usp=sharing).

In [1]:
# Подключаем нужные библиотеки
import multiprocessing 
import threading

import time
import datetime
import re
import requests
from bs4 import BeautifulSoup

import pymorphy2

import queue
import random


Давайте просто попробуем запустить два потока, которые будут выводить каждый свое значение.

Для создания потока используется `threading.Thread`, в параметр `target` передается функция, которая будет выполняться в этом потоке. Для того, чтобы поток начал выполняться, необходимо вызвать функцию `start`. Если необходимо дождаться окончания выполнения потока, вызывается функция `join`. 

In [3]:
def func1():
    var1 = 0
    for i in range(1000):
        var1 += 1
        if (i % 100) == 0:
            print("Func 1 says ", var1)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    var2 = 0
    for i in range(1000):
        var2 += 2
        if (i % 100) == 0:
            print("Func 2 says ", var2)
        time.sleep(random.random()/100+0.003) # засыпаем на случайный промежуток времени.
            
t1 = time.time()

# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr1.start()
pr2.start()
print("I can fly!")

pr1.join()
print("Process 1 is finished")
pr2.join()
print("The work is done")

t2 = time.time()
print("Time estimated ", t2-t1)

Func 1 says  1
Func 1 says  1
I can fly!
Func 1 says  101
Func 1 says  101
Func 1 says  201
Func 1 says  201
Func 1 says  301
Func 1 says  301
Func 1 says  401
Func 1 says  401
Func 1 says  501
Func 1 says  501
Func 1 says  601
Func 1 says  601
Func 1 says  701
Func 1 says  701
Func 1 says  801
Func 1 says  801
Func 1 says  901
Func 1 says  901
Process 1 is finished
The work is done
Time estimated  5.186389923095703


Для создания процесса используется `multiprocessing.Process`, принимающий в параметр `target` функцию, которая будет выполняться в этом потоке. Для того, чтобы процесс начал выполняться, необходимо вызвать функцию `start`. Если необходимо дождаться окончания выполнения потока, вызывается функция `join`. Если процесс не будет переходить в режим ожидания, он выполнится от начала и до конца при вызове функции `start`.

In [4]:
%%time
t1 = time.time()

# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=func1)
pr2=multiprocessing.Process(target=func2)
pr1.start()
pr2.start()
print("I can fly!")

pr1.join()
print("Process 1 is finished")
pr2.join()
print("The work is done")

t2 = time.time()
print("Time estimated ", t2-t1)

Func 2 says  2
Func 1 says  1
I can fly!
Func 1 says  101
Func 2 says  202
Func 1 says  201
Func 1 says  301
Func 2 says  402
Func 1 says  401
Func 2 says  602
Func 1 says  501
Func 1 says  601
Func 2 says  802
Func 1 says  701
Func 1 says  801
Func 2 says  1002
Func 1 says  901
Func 2 says  1202
Process 1 is finished
Func 2 says  1402
Func 2 says  1602
Func 2 says  1802
The work is done
Time estimated  8.116339206695557
CPU times: user 20.8 ms, sys: 43.5 ms, total: 64.3 ms
Wall time: 8.12 s


Теперь попробуем сделать тоже самое, но для одной и той же переменной - две функции будут увеличивать значение одной переменной по очереди.

In [5]:
t1 = time.time()
Loki = 0

def func1():
    global Loki
    for i in range(1000):
        Loki += 1
        if (i % 100) == 0:
            print("Func 1 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    global Loki
    for i in range(1000):
        Loki += 2
        if (i % 100) == 0:
            print("Func 2 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=func1)
pr2=multiprocessing.Process(target=func2)
pr1.start()
pr2.start()

pr1.join()
pr2.join()
t2 = time.time()
print("Time estimated", t2-t1)
print(Loki)

Func 2 says  2
Func 1 says  1
Func 1 says  101
Func 2 says  202
Func 1 says  201
Func 2 says  402
Func 1 says  301
Func 2 says  602
Func 1 says  401
Func 2 says  802
Func 1 says  501
Func 2 says  1002
Func 1 says  601
Func 2 says  1202
Func 1 says  701
Func 2 says  1402
Func 1 says  801
Func 2 says  1602
Func 1 says  901
Func 2 says  1802
Time estimated 5.170403003692627
0


Выглядит так, как будто каждый процесс работает со своей собственной переменной. Да, это так, это ведь независимые процессы, они выполняются каждый на своей копии GIL и комплекте переменных.

Теперь попытаемся сделать всё тоже самое, но на потоках, выполняющихся в рамках одного процесса, но в разных потоках.

In [6]:
t1 = time.time()
Loki = 0

def func1():
    global Loki
    for i in range(1000):
        Loki += 1
        if (i % 100) == 0:
            print("Func 1 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    global Loki
    for i in range(1000):
        Loki += 2
        if (i % 100) == 0:
            print("Func 2 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr3=threading.Thread(target=func2)
pr1.start()
pr2.start()
pr3.start()

pr1.join()
pr2.join()
pr3.join()

t2 = time.time()
print("Time estimated ", t2-t1)
print(Loki)

Func 1 says  1
Func 2 says  3
Func 2 says  5
Func 1 says  495
Func 2 says  497
Func 2 says  512
Func 2 says  996
Func 2 says  1000
Func 1 says  1017
Func 2 says  1479
Func 1 says  1507
Func 2 says  1527
Func 2 says  1980
Func 2 says  2008
Func 1 says  2019
Func 2 says  2449
Func 2 says  2516
Func 1 says  2575
Func 2 says  2929
Func 1 says  3029
Func 2 says  3049
Func 2 says  3456
Func 1 says  3507
Func 2 says  3556
Func 2 says  3931
Func 1 says  4039
Func 2 says  4053
Func 2 says  4461
Func 2 says  4514
Func 1 says  4553
Time estimated  5.13910174369812
5000


In [6]:
t1 = time.time()
Loki = 0

def func1():
    var1 = 0
    for i in range(1000):
        var1 += 1
        if (i % 100) == 0:
            print("Func 1 says ", var1)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    var2 = 0
    for i in range(1000):
        var2 += 2
        if (i % 100) == 0:
            print("Func 2 says ", var2)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr3=threading.Thread(target=func2)
pr1.start()
pr2.start()
pr3.start()

pr1.join()
pr2.join()
pr3.join()

t2 = time.time()
print("Time estimated ", t2-t1)
print(Loki)

Func 1 says  1
Func 2 says  2
Func 2 says  2
Func 2 says  202
Func 2 says  202
Func 1 says  101
Func 2 says  402
Func 1 says  201
Func 2 says  402
Func 2 says  602
Func 2 says  602
Func 1 says  301
Func 2 says  802
Func 2 says  802
Func 1 says  401
Func 2 says  1002
Func 2 says  1002
Func 1 says  501
Func 2 says  1202
Func 2 says  1202
Func 1 says  601
Func 2 says  1402
Func 2 says  1402
Func 1 says  701
Func 2 says  1602
Func 1 says  801
Func 2 says  1602
Func 2 says  1802
Func 2 says  1802
Func 1 says  901
Time estimated  5.212342977523804
0


Когда-то давно, такой код обязательно вызвал бы исключительную ситуацию. Но сейчас GIL сам блокирует нужные переменные, не позволяя разным процессам обращаться к ним одновременно. В итоге всё работает именно так, как хотелось бы.

Помимо этого, обратите внимание, что время выполнения в обоих случаях примерно одинаково. Это связано с тем, что большую часть времени поток дремлет. Подобное поведение характерно для потоков, которые ожидают внешних результатов. Однако если создать высоконагруженный поток, станет очевидно, что потоки являются "легкими потоками", то есть не выполняются параллельно, а синхронизируют свою работу средствами Питона. При этом процессы выполняются в самом деле параллельно.

In [7]:
import numpy as np


In [8]:
%%time

m_size = 200

def mult_matrix():
    print("started")
    m1 = np.random.randn(m_size, m_size)
    m2 = np.random.randn(m_size, m_size)
    m3 = np.zeros((m_size, m_size))
    for i in range(m_size):
        for j in range(m_size):
            s = 0
            for k in range(m_size):
                s += m1[i, k] * m2[k, j]
            m3[i, j] = s
        
    print("finished")
    
mult_matrix()

started
finished
CPU times: user 1.77 s, sys: 0 ns, total: 1.77 s
Wall time: 1.77 s


In [9]:
t1 = time.time()
            
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=mult_matrix)
pr2=threading.Thread(target=mult_matrix)
pr3=threading.Thread(target=mult_matrix)
pr1.start()
pr2.start()
pr3.start()

pr1.join()
pr2.join()
pr3.join()

t2 = time.time()
print("Time estimated ", t2-t1)

started
started
started
finished
finished
finished
Time estimated  5.6652586460113525


In [10]:
t1 = time.time()

# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=mult_matrix)
pr2=multiprocessing.Process(target=mult_matrix)
pr3=multiprocessing.Process(target=mult_matrix)
pr1.start()
pr2.start()
pr3.start()

pr1.join()
pr2.join()
pr3.join()
t2 = time.time()
print("Time estimated", t2-t1)

started
started
started
finished
finished
finished
Time estimated 1.9312376976013184


При помощи объекта типа `Lock` можно блокировать выполнение критических фрагментов кода. Например, если мы пишем в файл в нескольких потоках, мы должны гарантировать, что мы запишем свой фрагмент данных от начала и до конца. При этом другой поток не прервет наш вывод и не выведет свою информацию.

In [11]:
lk = threading.Lock()

Loki = 0

def func1():
    global Loki
    for i in range(1000):
        lk.acquire()
        Loki += 1
        lk.release()
        if (i % 100) == 0:
            print("Func 1 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
def func2():
    global Loki
    for i in range(1000):
        lk.acquire()
        Loki += 2
        lk.release()
        if (i % 100) == 0:
            print("Func 2 says ", Loki)
        time.sleep(random.random()/100) # засыпаем на случайный промежуток времени.
            
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=func1)
pr2=threading.Thread(target=func2)
pr1.start()
pr2.start()

Func 1 says  1
Func 2 says  3
Func 2 says  298
Func 1 says  311
Func 2 says  589
Func 1 says  621
Func 2 says  886
Func 1 says  937
Func 2 says  1170
Func 1 says  1257
Func 2 says  1474
Func 1 says  1559
Func 2 says  1773
Func 1 says  1861
Func 2 says  2074
Func 1 says  2151
Func 2 says  2371
Func 1 says  2461
Func 2 says  2659
Func 1 says  2807


<b> Внимание!!!</b> Вызов двух блокировок подряд вызовет полную блокировку процесса, а также всех процессов, зависящих от данной блокировки.

Также для синхронизации работы потоков могут использоваться события и семафоры.

Событие может быть установлено или сброшено. Если поток пытается установить установленное или сбросить сброшенное событие, он блокируется (но может быть разблокирован по тайм-ауту).

Семафор фактически хранит информацию о нескольких событиях, то есть у семафора есть некоторый объем. Процесс блокируется при запросе ресурса из семафрра, только если тот уже был запрошен n раз и ресурс исчерпан.



In [12]:
# создадим очередь
cntr = 0
eve1 = threading.Event() # Разрешает читать читателю.
eve2 = threading.Event() # Читатель прочитал, писатель может писать.

# У нас будет два потока. 
# Writer будет писать даные в очередь через случайные промежутки времени. 
def writer():
    global cntr
    while cntr<10: #пишем 10 чисел
        res = eve2.wait() # Всё прочитано, можно начинать писать.
        eve2.clear() # Я понял, начинаю писать.
        # Делаем какие-то долгие операции по генерации данных.
        cntr+=1
        print('wrote '+str(cntr)) # отчитываемся о записи.
        eve1.set() # Данные готовы, можно читать.
        time.sleep(random.random()) # засыпаем на случайный промежуток времени.

# Reader в параллельном потоке читает из очереди через случайные промежутки времени.
def reader():
    while cntr<10: # пока прочитанное число меньше 10...
        res = eve1.wait() # А не готовы ли данные для чтения?
        if res:
            print('read '+str(cntr)) # Отчитываемся.
        else:
            print('read failed')
        eve1.clear() # Спасибо, я всё прочёл.
        eve2.set() # Можно начинать писать.

eve2.set() # Вначале писать может писать, а читателю нечего читать.
eve1.clear()
# создаем, а потом запускаем потоки.
pr1=threading.Thread(target=writer)
pr2=threading.Thread(target=reader)
pr1.start()
pr2.start()

# Видно, как потоки работают параллельно, правда?

wrote 1
read 1
wrote 2
read 2
wrote 3
read 3
wrote 4
read 4
wrote 5
read 5
wrote 6
read 6
wrote 7
read 7
wrote 8
read 8
wrote 9
read 9
wrote 10
read 10


Попробуем организовать обмен данными при помощи `multiprocessing.queue`.
Данный класс позволяет обмениваться информацией между потоками. При этом если поток-поставщик данных работает быстрее, то данные будут "складироваться" в очереди, пока потребитель их не заберет.
Размер очереди может быть ограничен. В этом случае операция "положить в очередь" заблокирует поток, если очередь заполнена.
Если очередь пуста, поток, запросивший данные также будет заблокирован при запросе данных. 
Для избежания блокировок можно использовать timeout (через какое время поток будет разблокирован и фцнкция сообщит о неуспехе).


In [13]:
# создадим очередь
cntr=0
exch=multiprocessing.Queue(5)
random.seed()

# У нас будет два потока. 
# Writer будет писать даные в очередь через случайные промежутки времени. 
def writer():
    global cntr
    while cntr<10: #пишем 10 чисел
        exch.put(cntr) # собственно пишем в очередь
        print('wrote '+str(cntr)) # отчитываемся о записи.
        cntr+=1
        time.sleep(random.random()) # засыпаем на случайный промежуток времени.
    exch.put(11) # конец данных.

# Reader в параллельном потоке читает из очереди через случайные промежутки времени.
def reader():
    i=-1
    while i<10: # пока прочитанное число меньше 10...
        i=exch.get() # Собственно читаем из очереди.
        print('read '+str(i)) # Отчитываемся.
        time.sleep(random.random()) # спим

# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=writer)
pr2=multiprocessing.Process(target=reader)
#pr1=threading.Thread(target=writer)
#pr2=threading.Thread(target=reader)
pr1.start()
pr2.start()

# Видно, как потоки работают параллельно, правда?

wrote 0
read 0
wrote 1
read 1
wrote 2
wrote 3
read 2
read 3
wrote 4
read 4
wrote 5
wrote 6
read 5
wrote 7
read 6
read 7
wrote 8read 8

wrote 9read 9

read 11


Теперь попробуем решить следующую задачу.

Необходимо получать новости из нескольких источников. Далее каждая новость токенизируется в отдельном потоке, лемматизируется в еще одном потоке. В конце идет поток, который обрабатывает полученные данные.
Новости также получаются в отдельных потоках.

Для передачи данных используем очередь. 

Для сигнализации завершения работы потоков используем события и семафоры.

Использование очереди, событий и семафоров позволяет синхронизировать операции в потоках. Токенизатор начнет разбор только после того, как очередь получит данные от одного из загрузчиков новостей. Результат будет помещен в другую очередь. Токенизатор "уснет" до тех пор, пока не придет еще одна новость, зато "проснется" лемматизатор.

In [None]:
from lxml import html

In [33]:
# !!!===--- ВНИМАНИЕ!!! ---===!!!
# Код в данной ячейке работает некорректно, так как завершение процессов вызывает взаимную  блокировку!
# Корректный вариант синхронизации показан ниже.


# Функция для загрузки одной новости из Ленты.ру
def getLentaArticle(url):
    """ getLentaArticle gets the body of the article from Lenta.ru"""
    page = requests.get(url)
    tree = html.fromstring(page.text)
    text = '\n'.join([p.text_content() for p in tree.xpath(".//p[contains(@class,'topic-body__content-text')]")])
    
    return text

# Функция загрузки одной новости из N+1.
def getArticleTextNPlus1(adr):
    r = requests.get(adr)
    n_text = re.split("</div>", re.split("</figure>", re.split('</article>',re.split('<article', r.text)[1])[0])[1])[1]    
    return BeautifulSoup(n_text, "lxml").get_text()

# Загрузка новостей из Ленты.ру за некоторый период.
def getLenta(qu, sem):
    curdate = datetime.date(2017, 1, 16)
    findate = datetime.date(2017, 1, 16)
    res = ""

# Загружаем новости до конечной даты.
    while curdate <= findate:
        print('lenta ' + curdate.strftime('%Y/%m/%d'))
        day = requests.get('https://lenta.ru/news/' + curdate.strftime('%Y/%m/%d'))
        body = re.findall('<h3>(.+?)</h3>', day.text)
        links = ['https://lenta.ru' + re.findall('"(.+?)"', x)[0] for x in body]
        for l in links: # идем по всем ссылкам на новости за день.
            qu.put(getLentaArticle(l)) # Полученную новость кладем в очередь.
            time.sleep(0.2)
        curdate += datetime.timedelta(days=1)
    sem.acquire() # Блокируем семафор один раз, по
    print("lenta finished")

# Получаем новости с NPlus1 за заданный промежуток времени, кладем тексты новостей в очередь qu.
# По завершении взводим семафор sem.
def getNplus1(qu, sem):
    curdate = datetime.date(2015, 12, 15)
    findate = datetime.date(2015, 12, 19)

    while curdate <= findate: # Перебираем все дни.
        r = requests.get('https://nplus1.ru/news/'+curdate.strftime('%Y/%m/%d'))
        print('nplus ' + curdate.strftime('%Y/%m/%d'))
        # Берем заголовки и ссылки на новости за этот день.
        refs = [re.split('"', t)[6] for t in re.split('<article class="item item-news item-news', r.text)[1:]]
        for t in refs:
            qu.put(getArticleTextNPlus1("https://nplus1.ru" + t)) # Получаем текст статьи и отправляем в очередь.
            time.sleep(0.2) # Мы этичные хакеры и не стремимся к DDoS.
        curdate += datetime.timedelta(days=1)
    sem.acquire() # Взводим семафор.
    print("nplus1 finished")
    
# Функция токенизирует вход из очереди qu1 и кладет результаты токенизации в очередь qu2.
# Токенизация ведется до тех пор, пока семафор semw не будет взведен максимальное количество раз.
# По завершении токенизации устанавливаем событие evs.
def tokenize(qu1, qu2, semw, evs):  
    c = 0
    # Пытаемся взвести семафор. Если не взведется, значит все потоки завершились (взведением семафора).
    while semw.acquire(False): 
        txt = qu1.get() # Получаем данные из очереди.
        semw.release() # Отпускаем семафор (мы же взвели его в while).
        qu2.put(re.findall("[А-Яа-я]+(-[А-Яа-я]+)?", txt)) # Очень простая токенизация кладет результаты в очередь.
        c += 1
        print('token '+str(c))
    print("tokenization finished")
    evs.set() # Сообщаем о завершении работы установкой события.
    print("tokenization finished")
""" Вот здесь-то и происходит блокировка.
    while semw.acquire(False): блокирует семафор для потоков, поставляющих новости. В самом конце последний 
    поток с новостями заканчивает свою работу и пытается завершиться. Но семафор уже заполнен, и поток переводится
    в состояние ожидания. При этом поток токенизации ждет получения данных txt=qu1.get(), но так как данные не 
    больше поступают, то он тоже блокируется. 
    Один поток ждет данных, чтобы продолжить работу и разблокировать семафор, другой заблокирован семафором и 
    больше не хочет отправлять данные. Клинч! Хорошо хоть, что в конце работы, но такое могло произойти и в середине.
    
    Через одну ячейку приведен код, который исправляет эту ситуацию.
"""

# Поток лемматизации текстов. Токены берутся из очереди qu1, результаты лемматизации кладутся в qu2.
# Так как новости приходят по одной в токенизацию, скорее всего токены будут идти подряд. Но если вдруг у нас появится
# еще один поток для токенизации, они начнут помещаться в очередь вперемешку. Так что такая технология подходит
# только если нас не волнуется порядок слов.
# Данный заканчиваются, когда будет установлено событие evw, сообщаем о своем завершении при помощи события evs.
def lemmatize(qu1, qu2, evw, evs):
    morpho = pymorphy2.MorphAnalyzer() # Создаем морфоанализатор.
    l = []
    c = 0
    while not evw.is_set(): # Если событие установлено - пора завершать работу.
        txt = qu1.get()
        s = []
        for w in txt:
            s += morpho.parse(w)[0]
        qu2.put(s) # Анализируем, кладем результат в очередь.
        c += 1
        print('lemma ' + str(c))
    evs.set() # ЗАвершаем работу.
    print("lemmatization finished")

# Функция имитирует, что она обрабатывает данные из очереди qu. Заершает работу по событию ev.    
def utilize(qu, ev):
    c = 0
    while not ev.is_set(): # Если событие установлено - пора завершать работу.
        t = qu.get()
        #processing
        c += 1
        print('process ' + str(c))
    print('processing finished')

# не надо тормозить при получении данных из очереди,
# надо поставить ожидание если очередь пуста.
# то, что есть - хороший пример на сложности синхронизации. еще очередь поменьше сделай.


In [34]:
# Два потока новостей.
newswirenumber = 2

# Создаем очереди, семафор и события для синхронизации.
textsq = multiprocessing.Queue(100)
tokenq = multiprocessing.Queue(100)
lemmaq = multiprocessing.Queue(100)
newss = multiprocessing.Semaphore(newswirenumber)
tokene = multiprocessing.Event()
lemmae = multiprocessing.Event()

# Запускаем процессы.
lentap = multiprocessing.Process(target=getLenta, args=(textsq, newss,))
nplusp = multiprocessing.Process(target=getNplus1, args=(textsq, newss,))  
tokenp = multiprocessing.Process(target=tokenize, args=(textsq, tokenq, newss, tokene, ))   
lemmap = multiprocessing.Process(target=lemmatize, args=(tokenq, lemmaq, tokene, lemmae, ))   
processp = multiprocessing.Process(target=utilize, args=(lemmaq, lemmae, ))    
    
# Стартуем процессы    
lentap.start()
nplusp.start()
tokenp.start()
lemmap.start()
processp.start()

# Если надо - ждем пока процессы не завершатся.
#lentap.join()
#nplusp.join()
#tokenp.join()
#lemmap.join()
#processp.join()
#print("Everything is allright")                               
                               

lenta 2017/01/16
nplus 2015/12/15
lenta finished
token 1
lemma 1


Process Process-26:
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_493103/1823877807.py", line 108, in utilize
    t = qu.get()
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
  File "/home/edward/.local/lib/python3.9/site-packages/pymorphy2/tagset.py", line 278, in __init__
    self._assert_grammemes_are_known(set(grammemes_tuple))
  File "/home/edward/.local/lib/python3.9/site-packages/pymorphy2/tagset.py", line 394, in _assert_grammemes_are_known
    cls._assert_grammemes_initialized()
  File "/home/edward/.local/lib/python3.9/site-packages/pymorphy2/tagset.py", line 403, in _assert_grammemes_initialized
    raise RuntimeError(msg)
RuntimeError: The class was not properly initialized.


token 2
lemma 2
token 3
lemma 3
token 4
lemma 4
token 5
lemma 5
token 6
lemma 6
token 7
lemma 7
token 8
lemma 8
token 9
lemma 9
token 10
lemma 10
token 11
lemma 11
token 12
lemma 12
token 13
lemma 13
token 14
lemma 14
token 15
lemma 15
token 16
lemma 16
token 17
lemma 17
token 18
lemma 18
token 19
lemma 19
token 20
lemma 20
token 21
lemma 21
token 22
lemma 22
token 23
lemma 23
nplus 2015/12/16
token 24
lemma 24
token 25
lemma 25
token 26
lemma 26
token 27
lemma 27
token 28
lemma 28
token 29
lemma 29
token 30
lemma 30
token 31
lemma 31
token 32
lemma 32
token 33
lemma 33
token 34
lemma 34
token 35
lemma 35
token 36
lemma 36
token 37
lemma 37
nplus 2015/12/17
token 38
lemma 38
token 39
lemma 39
token 40
lemma 40
token 41
lemma 41
token 42
lemma 42
token 43
lemma 43
token 44
lemma 44
token 45
lemma 45
token 46
lemma 46
token 47
lemma 47
token 48
lemma 48
token 49
lemma 49
token 50
lemma 50
token 51
lemma 51
token 52
lemma 52
token 53
lemma 53
token 54
lemma 54
token 55
lemma 55
token 56
l

In [35]:
# Это код работает более корректно, чем тот, что приведен выше.
# Большая часть комментариев опущена, оставлены только те, что помогают понять изменения в логике.

def getLentaArticle(url):
    # Вместо того, что вызывать Ктулху тем, что мы разбираем XML при помощи регулярок,
    # давайте будем использовать мощь BeautifulSoup.
    page = requests.get(url)
    tree = html.fromstring(page.text)
    text = '\n'.join([p.text_content() for p in tree.xpath(".//p[contains(@class,'topic-body__content-text')]")])
    
    return text

def getArticleTextNPlus1(adr):
    r = requests.get(adr)
    n_text = re.split("</div>", re.split("</figure>", re.split('</article>',re.split('<article', r.text)[1])[0])[1])[1]    
    return BeautifulSoup(n_text, "lxml").get_text()

def getLenta(qu, sem):
    curdate = datetime.date(2017, 1, 16)
    findate = datetime.date(2017, 1, 16)
    res=""

    while curdate <= findate:
        print('lenta ' + curdate.strftime('%Y/%m/%d'))
        day = requests.get('https://lenta.ru/news/'+curdate.strftime('%Y/%m/%d'))
        body = re.findall('<h3>(.+?)</h3>', day.text)
        links = ['https://lenta.ru'+re.findall('"(.+?)"', x)[0] for x in body]
        for l in links[:50]:
            qu.put(getLentaArticle(l))
            time.sleep(0.2)
        curdate += datetime.timedelta(days=1)
    sem.acquire()
    print("lenta finished")

def getNplus1(qu, sem):
    curdate = datetime.date(2015, 12, 15)
    findate = datetime.date(2015, 12, 18)

    while curdate <= findate:
        r = requests.get('https://nplus1.ru/news/'+curdate.strftime('%Y/%m/%d'))
        print('nplus ' + curdate.strftime('%Y/%m/%d'))
        refs=[re.split('"', t)[6] for t in re.split('<article class="item item-news item-news', r.text)[1:]]
        for t in refs:
            qu.put(getArticleTextNPlus1("https://nplus1.ru"+t))
            time.sleep(0.2)
        curdate += datetime.timedelta(days=1)
    sem.acquire()
    print("nplus1 finished")
    
# На входе - те же самые очереди, семафор и событие, но используем мы их по-другому.
def tokenize(qu1, qu2, semw, evs):  
    c = 0
    while semw.acquire(False): # Так же пытаемся получить семафор.
        try:
            semw.release() # ! Первым делом отпускаем его, чтобы никого не держать больше.
            txt = qu1.get(timeout=0.1) # Если что-то пойдет не так, нас отпустят через 0,1 секунды.
        except queue.Empty: # Если произошел выход по таймауту, генерируется исключение.
            #print('tokenization waits')
            pass # Данных нет, пойдем посмотрим, может вообще завершаться пора?
        else:
            qu2.put(re.findall("[А-Яа-я]+(-[А-Яа-я]+)?", txt)) # Данные есть - токенизируем и кладем в очередь.
            c += 1
            print('token '+str(c))

    print("tokenization ")
    evs.set()
    print("finished")

def lemmatize(qu1, qu2, evw, evs):
    morpho = pymorphy2.MorphAnalyzer()
    l = []
    c = 0
    while not evw.is_set():
        try:
            txt = qu1.get(timeout=0.1) # Поддерживаем логику таймаута везде на всякий случай.
        except queue.Empty:
            pass
        else:
            s = []
            for w in s:
                s += morpho.parse(w)[0]
            qu2.put(s)
            c += 1
            print('lemma '+str(c))
    evs.set()
    print("lemmatization finished")

def utilize(qu, ev):
    c = 0
    while not ev.is_set(): # Поддерживаем логику таймаута везде на всякий случай.
        try:
            t = qu.get(timeout=0.1)
        except queue.Empty:
            pass
        else:
        #processing
            c += 1
            print('process '+str(c))
    print('processing finished')



In [36]:
# Два потока новостей.
newswirenumber = 2

# Создаем очереди, семафор и события для синхронизации.
textsq = multiprocessing.Queue(100)
tokenq = multiprocessing.Queue(100)
lemmaq = multiprocessing.Queue(100)
newss = multiprocessing.Semaphore(newswirenumber)
tokene = multiprocessing.Event()
lemmae = multiprocessing.Event()

# Запускаем процессы.
lentap = multiprocessing.Process(target=getLenta, args=(textsq, newss,))
nplusp = multiprocessing.Process(target=getNplus1, args=(textsq, newss,))  
tokenp = multiprocessing.Process(target=tokenize, args=(textsq, tokenq, newss, tokene, ))   
lemmap = multiprocessing.Process(target=lemmatize, args=(tokenq, lemmaq, tokene, lemmae, ))   
processp = multiprocessing.Process(target=utilize, args=(lemmaq, lemmae, ))    
    
# Стартуем процессы    
lentap.start()
nplusp.start()
tokenp.start()
lemmap.start()
processp.start()

# Если надо - ждем пока процессы не завершатся.
#lentap.join()
#nplusp.join()
#tokenp.join()
#lemmap.join()
#processp.join()
#print("Everything is allright")                               
                               

lenta 2017/01/16
nplus 2015/12/15
lenta finished
token 1
lemma 1
process 1
token 2lemma 2
process 2

token 3lemma 3process 3


token 4lemma 4
process 4

token 5lemma 5
process 5

token 6lemma 6
process 6

token 7lemma 7
process 7

token 8lemma 8
process 8

token 9lemma 9
process 9

token 10lemma 10
process 10

token 11lemma 11
process 11

token 12lemma 12
process 12

token 13lemma 13
process 13

token 14lemma 14

process 14
token 15lemma 15
process 15

token 16lemma 16
process 16

token 17lemma 17
process 17

token 18lemma 18

process 18
token 19lemma 19

process 19
token 20lemma 20
process 20

token 21lemma 21
process 21

token 22lemma 22
process 22

token 23lemma 23
process 23

nplus 2015/12/16
token 24lemma 24
process 24

token 25lemma 25
process 25

token 26lemma 26
process 26

token 27lemma 27
process 27

token 28lemma 28
process 28

token 29lemma 29
process 29

token 30lemma 30
process 30

token 31lemma 31
process 31

token 32lemma 32
process 32

token 33lemma 33
process 33

toke

In [2]:
from pymorphy2 import MorphAnalyzer

In [4]:
%%time

morpho = MorphAnalyzer()
total_len = 0

with open("data/war_and_peace.txt") as infile:
    for line in infile:
        words = [w[0] for w in re.findall("([А-Яа-я]+(-[А-Яа-я]+)*)", line)]
        for word in words:
            res = morpho.parse(word)
            total_len += len(res)
print(total_len)

1920753
CPU times: user 5.51 s, sys: 17.6 ms, total: 5.53 s
Wall time: 5.53 s


In [4]:
%%time

morpho = MorphAnalyzer()

with open("data/war_and_peace.txt") as infile:
    for line in infile:
        pass

CPU times: user 62.4 ms, sys: 8.03 ms, total: 70.5 ms
Wall time: 69.3 ms


In [5]:
all_lines = []
with open("data/war_and_peace.txt") as infile:
    for line in infile:
        all_lines.append(line)

In [6]:
%%time

total_len = 0

for line in all_lines:
    words = [w[0] for w in re.findall("([А-Яа-я]+(-[А-Яа-я]+)*)", line)]
    for word in words:
        res = morpho.parse(word)
        total_len += len(res)
        
print(total_len)

1920753
CPU times: user 7.13 s, sys: 0 ns, total: 7.13 s
Wall time: 7.13 s


In [12]:
%%time

def read_lines(qu, event):
    cntr = 0
    buffer = []
    with open("data/war_and_peace.txt") as infile:
        for line in infile:
            buffer.append(line)
            if len(buffer) == 100:
                qu.put(buffer)
                buffer = []
    qu.put(buffer)
    event.set()
    
def tag_words(qu, event):
    total_len = 0
    morpho = MorphAnalyzer()

    while not event.is_set() or not qu.empty():
        try:
            lines = qu.get(timeout=0.1) # Поддерживаем логику таймаута везде на всякий случай.
        except queue.Empty:
            pass
        else:
            for line in lines:
                words = [w[0] for w in re.findall("([А-Яа-я]+(-[А-Яа-я]+)*)", line)]
                for word in words:
                    res = morpho.parse(word)        
                    total_len += len(res)
    print(total_len)

wordsq = multiprocessing.Queue(100)
wordse = multiprocessing.Event()
readp = multiprocessing.Process(target=read_lines, args=(wordsq, wordse,))
tagp = multiprocessing.Process(target=tag_words, args=(wordsq, wordse,))  

readp.start()
tagp.start()

readp.join()
tagp.join()


1920753
CPU times: user 0 ns, sys: 7.78 ms, total: 7.78 ms
Wall time: 5.66 s


Хорошее описание объектов Питона для параллельного программирования - [здесь](https://devpractice.ru/python-lesson-23-concurrency-part-2/).