In [46]:
# Подключаем нужные библиотеки
import multiprocessing 

import time
import datetime
import re
import requests
from bs4 import BeautifulSoup

import pymorphy2

import queue
import random


Для начала просто попробуем как работает multithreading.queue .
Данный класс позволяет обмениваться информацией между потоками. При этом если поток-поставщик данных работает быстрее, то данные будут "складироваться" в очереди, пока потребитель их не заберет.
Размер очереди может быть ограничен. В этом случае операция "положить в очередь" заблокирует поток, если очередь заполнена.
Если очередь пуста, поток, запросивший данные также будет заблокирован при запросе данных. 
Для избежания блокировок можно использовать timeout (через какое время поток будет разблокирован и фцнкция сообщит о неуспехе).


In [47]:
# создадим очередь
cntr=0
exch=multiprocessing.Queue(5)
random.seed()


In [48]:
# У нас будет два потока. 
# Writer будет писать даные в очередь через случайные промежутки времени. 
def writer():
  global cntr
  while cntr<10: #пишем 10 чисел
    exch.put(cntr) # собственно пишем в очередь
    print('wrote '+str(cntr)) # отчитываемся о записи.
    cntr+=1
    time.sleep(random.random()) # засыпаем на случайный промежуток времени.
  exch.put(11) # конец данных.
  
# Reader в параллельном потоке читает из очереди через случайные промежутки времени.
def reader():
  i=-1
  while i<10: # пока прочитанное число меньше 10...
    i=exch.get() # Собственно читаем из очереди.
    print('read '+str(i)) # Отчитываемся.
    time.sleep(random.random()) # спмм
  
# создаем, а потом запускаем потоки.
pr1=multiprocessing.Process(target=writer)
pr2=multiprocessing.Process(target=reader)
pr1.start()
pr2.start()

# Видно, как потоки работают параллельно, правда?

wrote 0
read 0
wrote 1
read 1
wrote 2
read 2
wrote 3
read 3
wrote 4
wrote 5
read 4
wrote 6
read 5
wrote 7
read 6
wrote 8
read 7
read 8
wrote 9
read 9
read 11


Теперь попробуем решить следующую задачу.
Необходимо получать новости из нескольких источников. Далее каждая новость токенизируется в отдельном потоке, лемматизируется в еще одном потоке. В конце идет поток, который обрабатывает полученные данные.
Новости также получаются в отдельных потоках.

Для передачи данных используем очередь. 
Для сигнализации завершения работы потоков используем события и семафоры.
Событие может быть установлено или сброшено. Если поток пытается установить установленное или сбросить сброшенное событие, он блокируется (но может быть разблокирован по тайм-ауту).
Семафор фактически хранит информацию о нескольких событиях, то есть у семафора есть некоторый объем. Процесс блокируется при запросе ресурса из семафрра, только если тот уже был запрошен n раз и ресурс исчерпан.

Использование очереди, событий и семафоров позволяет синхронизировать операции в потоках. Токенизатор начнет разбор только после того, как очередь получит данные от одного из загрузчиков новостей. Результат будет помещен в другую очередь. Токенизатор "уснет" до тех пор, пока не придет еще одна новость, зато "проснется" лемматизатор.

In [41]:
# !!!===--- ВНИМАНИЕ!!! ---===!!!
# Код в данной ячейке работает некорректно, так как завершение процессов вызывает взаимную  блокировку!
# Корректный вариант синхронизации показан ниже.


# Функция для загрузки одной новости из Ленты.ру
def getLentaArticle(url):
 """ getLentaArticle gets the body of the article from Lenta.ru"""
 r = requests.get(url)
 body=re.findall('<div class="b-text clearfix js-topic__text" itemprop="articleBody">(.+?)</p></div></div><section class="b-topic__socials js-topic__socials">', r.text)
 if len(body)>0:
  return BeautifulSoup(body[0], "lxml").get_text()
 else:
    return "  "

# Функция загрузки одной новости из N+1.
def getArticleTextNPlus1(adr):
 r = requests.get(adr)
 n_text=re.split("</div>", re.split("</figure>", re.split('</article>',re.split('<article', r.text)[1])[0])[1])[1]    
 return BeautifulSoup(n_text, "lxml").get_text()

# Загрузка новостей из Ленты.ру за некоторый период.
def getLenta(qu, sem):
 curdate=datetime.date(2017, 1, 16)
 findate=datetime.date(2017, 1, 16)
 res=""

# Загружаем новости до конечной даты.
 while curdate<=findate:
  print('lenta '+curdate.strftime('%Y/%m/%d'))
  day = requests.get('https://lenta.ru/news/'+curdate.strftime('%Y/%m/%d'))
  body=re.findall('<h3>(.+?)</h3>', day.text)
  links=['https://lenta.ru'+re.findall('"(.+?)"', x)[0] for x in body]
  for l in links: # идем по всем ссылкам на новости за день.
    qu.put(getLentaArticle(l)) # Полученную новость кладем в очередь.
    time.sleep(0.2)
  curdate+=datetime.timedelta(days=1)
 sem.acquire() # Блокируем семафор один раз, по
 print("lenta finished")

# Получаем новости с NPlus1 за заданный промежуток времени, кладем тексты новостей в очередь qu.
# По завершении взводим семафор sem.
def getNplus1(qu, sem):
 curdate=datetime.date(2015, 12, 15)
 findate=datetime.date(2015, 12, 19)

 while curdate<=findate: # Перебираем все дни.
  r = requests.get('https://nplus1.ru/news/'+curdate.strftime('%Y/%m/%d'))
  print('nplus '+curdate.strftime('%Y/%m/%d'))
  # Берем заголовки и ссылки на новости за этот день.
  refs=[re.split('"', t)[6] for t in re.split('<article class="item item-news item-news', r.text)[1:]]
  for t in refs:
   qu.put(getArticleTextNPlus1("https://nplus1.ru"+t)) # Получаем текст статьи и отправляем в очередь.
   time.sleep(0.2) # Мы этичные хакеры и не стремимся к DDoS.
  curdate+=datetime.timedelta(days=1)
 sem.acquire() # Взводим семафор.
 print("nplus1 finished")
    
# Функция токенизирует вход из очереди qu1 и кладет результаты токенизации в очередь qu2.
# Токенизация ведется до тех пор, пока семафор semw не будет взведен максимальное количество раз.
# По завершении токенизации устанавливаем событие evs.
def tokenize(qu1, qu2, semw, evs):  
    c=0
    # Пытаемся взвести семафор. Если не взведется, значит все потоки завершились (взведением семафора).
    while semw.acquire(False): 
        txt=qu1.get() # Получаем данные из очереди.
        semw.release() # Отпускаем семафор (мы же взвели его в while).
        qu2.put(re.findall("[А-Яа-я]+(-[А-Яа-я]+)?", txt)) # ОЧень простая токенизация кладет результаты в очередь.
        c+=1
        print('token '+str(c))
    print("tokenization finished")
    evs.set() # Сообщаем о завершении работы установкой события.
    print("tokenization finished")
""" Вот здесь-то и происходит блокировка.
    while semw.acquire(False): блокирует семафор для потоков, поставляющих новости. В самом конце последний 
    поток с новостями заканчивает свою работу и пытается завершиться. Но семафор уже заполнен, и поток переводится
    в состояние ожидания. При этом поток токенизации ждет получения данных txt=qu1.get(), но так как данные не 
    больше поступают, то он тоже блокируется. 
    Один поток ждет данных, чтобы продолжить работу и разблокировать семафор, другой заблокирован семафором и 
    больше не хочет отправлять данные. Клинч! Хорошо хоть, что в конце работы, но такое могло произойти и в середине.
    
    Ниже приведен код, который исправляет эту ситуацию.
"""

# Поток лемматизации текстов. Токены берутся из очереди qu1, результаты лемматизации кладутся в qu2.
# Так как новости приходят по одной в токенизацию, скорее всего токены будут идти подряд. Но если вдруг у нас появится
# еще один поток для токенизации, они начнут помещаться в очередь вперемешку. Так что такая технология подходит
# только если нас не волнуется порядок слов.
# Данный заканчиваются, когда будет установлено событие evw, сообщаем о своем завершении при помощи события evs.
def lemmatize(qu1, qu2, evw, evs):
    morpho=pymorphy2.MorphAnalyzer() # Создаем морфоанализатор.
    l=[]
    c=0
    while not evw.is_set(): # Если событие установлено - пора завершать работу.
        txt=qu1.get()
        s=[]
        for w in txt:
            s+=morpho.parse(w)[0]
        qu2.put(s) # Анализиируем, кладем результат в очередь.
        c+=1
        print('lemma '+str(c))
    evs.set() # ЗАвершаем работу.
    print("lemmatization finished")

# Функция имитирует, что она обрабатывает данные из очереди qu. Заершает работу по событию ev.    
def utilize(qu, ev):
    c=0
    while not ev.is_set(): # Если событие установлено - пора завершать работу.
        t=qu.get()
        #processing
        c+=1
        print('process '+str(c))
    print('processing finished')

# не надо тормозить при получении данных из очереди,
# надо поставить ожидание если очередь пуста.
# то, что есть - хороший пример на сложности синхронизации. еще очередь поменьше сделай.


In [44]:
# Два потока новостей.
newswirenumber=2

# Создаем очереди, семафор и события для синхронизации.
textsq=multiprocessing.Queue(100)
tokenq=multiprocessing.Queue(100)
lemmaq=multiprocessing.Queue(100)
newss=multiprocessing.Semaphore(newswirenumber)
tokene=multiprocessing.Event()
lemmae=multiprocessing.Event()

# Запускаем процессы.
lentap=multiprocessing.Process(target=getLenta, args=(textsq, newss,))
nplusp=multiprocessing.Process(target=getNplus1, args=(textsq, newss,))  
tokenp=multiprocessing.Process(target=tokenize, args=(textsq, tokenq, newss, tokene, ))   
lemmap=multiprocessing.Process(target=lemmatize, args=(tokenq, lemmaq, tokene, lemmae, ))   
processp=multiprocessing.Process(target=utilize, args=(lemmaq, lemmae, ))    
    
# Стартуем процессы    
lentap.start()
nplusp.start()
tokenp.start()
lemmap.start()
processp.start()

# Если надо - ждем пока процессы не завершатся.
#lentap.join()
#nplusp.join()
#tokenp.join()
#lemmap.join()
#processp.join()
#print("Everything is allright")                               
                               

lenta 2017/01/16
nplus 2015/12/15
token 1
lemma 1
process 1
token 2
lemma 2
process 2
token 3
lemma 3
process 3
token 4
lemma 4
process 4
token 5
lemma 5
process 5
token 6
lemma 6
process 6
token 7
lemma 7
process 7
token 8
lemma 8
process 8
token 9
lemma 9
process 9
token 10
lemma 10
process 10
token 11
lemma 11
process 11
token 12
lemma 12
process 12
token 13
lemma 13
process 13
token 14
lemma 14
process 14
token 15
lemma 15
process 15
token 16
lemma 16
process 16
token 17
lemma 17
process 17
token 18
lemma 18
process 18
token 19
lemma 19
process 19
token 20
lemma 20
process 20
token 21
lemma 21
process 21
token 22
lemma 22
process 22
token 23
lemma 23
process 23
token 24
lemma 24
process 24
token 25
lemma 25
process 25
token 26
lemma 26
process 26
token 27
lemma 27
process 27
lemma 28
process 28
token 28
token 29
lemma 29
process 29
token 30
lemma 30
process 30
token 31
lemma 31
process 31
token 32
lemma 32
process 32
token 33
lemma 33
process 33
token 34
lemma 34
process 34
token 3

In [43]:
# Это код работает более корректно, чем тот, что приведен выше.
# Больша часть комментариев опущена, оставлены только те, что помогают понять изменения в логике.

def getLentaArticle(url):
 """ getLentaArticle gets the body of the article from Lenta.ru"""
 r = requests.get(url)
 body=re.findall('<div class="b-text clearfix js-topic__text" itemprop="articleBody">(.+?)</p></div></div><section class="b-topic__socials js-topic__socials">', r.text)
 if len(body)>0:
  return BeautifulSoup(body[0], "lxml").get_text()
 else:
    return "  "

def getArticleTextNPlus1(adr):
 r = requests.get(adr)
 n_text=re.split("</div>", re.split("</figure>", re.split('</article>',re.split('<article', r.text)[1])[0])[1])[1]    
 return BeautifulSoup(n_text, "lxml").get_text()

def getLenta(qu, sem):
 curdate=datetime.date(2017, 1, 16)
 findate=datetime.date(2017, 1, 16)
 res=""

 while curdate<=findate:
  print('lenta '+curdate.strftime('%Y/%m/%d'))
  day = requests.get('https://lenta.ru/news/'+curdate.strftime('%Y/%m/%d'))
  body=re.findall('<h3>(.+?)</h3>', day.text)
  links=['https://lenta.ru'+re.findall('"(.+?)"', x)[0] for x in body]
  for l in links[:50]:
    qu.put(getLentaArticle(l))
    time.sleep(0.2)
  curdate+=datetime.timedelta(days=1)
 sem.acquire()
 print("lenta finished")

def getNplus1(qu, sem):
 curdate=datetime.date(2015, 12, 15)
 findate=datetime.date(2015, 12, 18)

 while curdate<=findate:
  r = requests.get('https://nplus1.ru/news/'+curdate.strftime('%Y/%m/%d'))
  print('nplus '+curdate.strftime('%Y/%m/%d'))
  refs=[re.split('"', t)[6] for t in re.split('<article class="item item-news item-news', r.text)[1:]]
  for t in refs:
   qu.put(getArticleTextNPlus1("https://nplus1.ru"+t))
   time.sleep(0.2)
  curdate+=datetime.timedelta(days=1)
 sem.acquire()
 print("nplus1 finished")
    
# На входе - те же самые очереди, семафор и событие, но используем мы их по-другому.
def tokenize(qu1, qu2, semw, evs):  
    c=0
    while semw.acquire(False): # Так же пытаемся получить семафор.
        try:
            semw.release() # ! Первым делом отпускаем его, чтобы никого не держать больше.
            txt=qu1.get(timeout=0.1) # Если что-то пойдет не так, нас отпустят через 0,1 секунды.
        except queue.Empty: # Если произршел выход по таймауту, генерируется исключение.
            #print('tokenization waits')
            pass # Данных нет, пойдем посмотрим, может вообще завершаться пора?
        else:
            qu2.put(re.findall("[А-Яа-я]+(-[А-Яа-я]+)?", txt)) # Данные есть - токенизируем и кладем в очередь.
            c+=1
            print('token '+str(c))

    print("tokenization ")
    evs.set()
    print("finished")

def lemmatize(qu1, qu2, evw, evs):
    morpho=pymorphy2.MorphAnalyzer()
    l=[]
    c=0
    while not evw.is_set():
        try:
            txt=qu1.get(timeout=0.1) # Поддерживаем логику таймаута везде на всякий случай.
        except queue.Empty:
            pass
        else:
            s=[]
            for w in s:
                s+=morpho.parse(w)[0]
            qu2.put(s)
            c+=1
            print('lemma '+str(c))
    evs.set()
    print("lemmatization finished")

def utilize(qu, ev):
    c=0
    while not ev.is_set(): # Поддерживаем логику таймаута везде на всякий случай.
        try:
            t=qu.get(timeout=0.1)
        except queue.Empty:
            pass
        else:
        #processing
            c+=1
            print('process '+str(c))
    print('processing finished')

# не надо тормозить при получении данных из очереди,
# надо поставить ожидание если очередь пуста.
# то, что есть - хороший пример на сложности синхронизации. еще очередь поменьше сделай.

