# Задание по много поточности:

Вам необходимо проанализировать википедию на предмет того, какие слова в каждой из частей речи встречаются чаще. Вы хотите реализовать это в несколько потоков.

Запросы к википедии можно осуществлять с помощью библиотеки wikipedia. Для морфологического анализа используйте библиотеку pymorphy2. Чтобы разбить текст на слова можете воспользоваться функцией word_tokenize из библиотеки nltk.

Класс должен иметь функции, приведенные ниже (но может иметь и другие на ваше усмотрение).

### Часть 1

<b>Многопоточной реализация</b>

Задачи делятся на три типа:
<ul>
<li><i>Получение данных</i>:
<ol>
<li>Получение заголовков для страниц википедии - запускает по max_threads функций, которые асинхронно получают заголовки страниц.</li>
<li>Получение конкретных страниц - ждем, пока не появятся новые заголовки, которые не обработаны.
Когда появились - начинаем запрашивать в max_threads функциях конкретные страницы по заголовкам.</li>
</ol>
</li>
<li><i>Обработка данных</i>:
<ol>
<li>Ждем, пока не появятся новые необработанные страницы. Когда появляются, запускаем по max_threads функций для морфологического анализа слов.</li>
<li>Ждем пока не появились обработанные слова. Как только появляется новое слово, сразу же обновляем _stats.</li>
</ol>
</li>
<li><i>Сохранение данных</i>:
<ol>
        Раз в store_every обработанных слов вызывается асинхронно функция dump, которая сохраняет _stats.
</ol>
</li>
</ul>
<b>P. S.</b>

Комментарии специально запутанные, чтобы вы сами придумали архитектуру вызова потоков. Не бойтесь использовать Queue и daemon=True. Запрещается использовать threading.Lock / threading.RLock или другие блокировки.

In [1]:
import wikipedia as wiki
import pymorphy2
from nltk.tokenize import word_tokenize
from collections import Counter
from queue import LifoQueue
from time import time

import threading as td

In [2]:
class WikiReader(object):
    """
    Класс для работы с википедией.
    Собирает статисткику по словам каждой части речи в статьях википедии.

    Parameters
    ----------
    morphs: list
        Части речи, которые хотим исследовать. Слова другой части речи не включаются в статистику.
    
    page_per_req: int
        Количество случайных названий страниц, запрашиваемых за один раз у википедии.
    
    max_threads_per_job: int
        Количество потоков, запускаемых другим потоком демоном (можно не использовать, если получится).
    
    max_words: int
        Количество слов для обработки.
    
    store_every: int
        Как часто сохранять данные на диск. Каждые store_every слов вызывается функция dump.
    
    store_path: str
        Куда сохранять данные.
    
    Attributes
    ----------
    _stats: <your code here>
        Структура данных (возможно встроенная), позволяющая хранить для каждой части речи список слов с их количеством.
        Необходимо, чтобы получение (изменение) статистики (текущего количества) для каждой пары
        <часть речи, слово> происходило за O(1).
            
    """
    def __init__(self,
                 morphs=['NOUN', 'ADJF', 'VERB'],
                 page_per_req=4,
                 max_threads_per_job=2,
                 max_words=10000,
                 store_every=1000,
                 store_path="stats.txt"):
        self._stats = Counter()
        
        self._titles = LifoQueue()
        self._pages = LifoQueue()
        self._words = LifoQueue()
        
        self._morph = pymorphy2.MorphAnalyzer()
        self._morphs = morphs
        
        self._page_per_req = page_per_req
        self._max_threads_per_job = max_threads_per_job
        self._max_words = max_words
        
        self._store_every = store_every
        self._store_path = store_path
        
        wiki.set_lang('ru')
    
    def run(self):        
        self._words_count = 0
        self._stop = False
        
        title_gens = [td.Thread(target=self._gen_titles, daemon=True) for _ in range(self._max_threads_per_job)]
        page_gens = [td.Thread(target=self._gen_pages, daemon=True) for _ in range(self._max_threads_per_job)]
        word_gens = [td.Thread(target=self._gen_words, daemon=True) for _ in range(self._max_threads_per_job)]
        word_procs = [td.Thread(target=self._process_words, daemon=True) for _ in range(self._max_threads_per_job)]
        
        print('starting', self._max_threads_per_job * 4, 'threads')
        
        start = time()
        
        for thread in title_gens:
            thread.start()
            
        for thread in page_gens:
            thread.start()
            
        for thread in word_gens:
            thread.start()
            
        for thread in word_procs:
            thread.start()
                
        print('waiting for threads:\ntitle jobs: ', end='')
                
        for thread in title_gens:
            thread.join()
            print('.', end='')
        print(' terminated\npage jobs: ', end='')
            
        for thread in page_gens:
            thread.join()
            print('.', end='')
        print(' terminated\nword jobs: ', end='')
            
        for thread in word_gens:
            thread.join()
            print('.', end='')
        print(' terminated\nprocessing jobs: ', end='')
        
        for thread in word_procs:
            thread.join()
            print('.', end='')
        print(' terminated')
        
        start = time() - start
        print('wall time: {}s'.format(start))
        
    def dump(self, path=None):
        if path is None:
            fout = open(self._store_path, 'w')
        else:
            fout = open(path, 'w')
        
        for i in self._stats:
            print('{}: {}'.format(i, self._stats[i]), file=fout)
        
        fout.close()
    
    def load(self, path=None):
        if path is None:
            fin = open(self._store_path)
        else:
            fin = open(path)
            
        cnt = {}
        for i in fin.readlines():
            k, v = i.split()
            cnt[k] = v
            
        fin.close()
        
        return cnt
    
    def _get_word_grammeme(self, word):
        return self._morph.parse(word)[0].tag.POS
    
    def _gen_titles(self):
        while not self._stop:
            titles = wiki.random(pages=self._page_per_req)
            for title in titles:
                self._titles.put(title)
        
    def _gen_pages(self):
        while not self._stop:
            title = self._titles.get()
            self._titles.task_done()
            
            try:
                page = wiki.page(title=title)
            except:
                continue
            
            self._pages.put(page)
            
    def _gen_words(self):
        while not self._stop:
            page = self._pages.get()
            self._pages.task_done()
            
            for word in word_tokenize(page.content):
                self._words.put(word)
                
    def _process_words(self):
        global start
        while not self._stop:
            word = self._words.get()
            if word is not None:
                self._words_count += 1
                if self._words_count % self._store_every == 0:
                    self.dump()
                    
                if self._words_count == self._max_words:
                    self._stop = True
                    
                gram = self._get_word_grammeme(word)
                if gram not in self._morphs:
                    self._stats['REST'] += 1
                else:
                    self._stats[gram] += 1
                    
            self._words.task_done()

In [3]:
reader = WikiReader(max_threads_per_job=1)
reader.run()

starting 8 threads
waiting for threads:
title jobs: 



 BeautifulSoup(YOUR_MARKUP})

to this:

 BeautifulSoup(YOUR_MARKUP, "lxml")

  markup_type=markup_type))


.. terminated
page jobs: .. terminated
word jobs: .. terminated
processing jobs: .. terminated
wall time: 17.270789861679077s


In [4]:
cnt = reader.load()
for i in cnt.items():
    print(*i)

REST: 5245
NOUN: 3162
ADJF: 1058
VERB: 506


In [5]:
reader = WikiReader(max_threads_per_job=2)
reader.run()

starting 4 threads
waiting for threads:
title jobs: 



 BeautifulSoup(YOUR_MARKUP})

to this:

 BeautifulSoup(YOUR_MARKUP, "lxml")

  markup_type=markup_type))


. terminated
page jobs: . terminated
word jobs: . terminated
processing jobs: . terminated
wall time: 34.072606563568115s


In [6]:
cnt = reader.load()
for i in cnt.items():
    print(*i)

REST: 5028
NOUN: 3330
ADJF: 1064
VERB: 577
