<center><H1>Параллелизация вычислений в python</H1></center>

<H4>Обозначения</H4>

⭐ - Простое задание  
🌟 - Сложное задание  
⭐⭐ - Задание на 2 балла  
💫 - Конец задания

<H2>1. Операции Map и Reduce</H2>

В основе распределенных вычислений, как правило, лежат операции **Map** и **Reduce**  
**Map** применяет заданную функцию множество раз к разным входным данным, в результате получается такой же по размеру набор выходных данных  
**Reduce** применяет заданную функцию к набору входных значений и _сокращает_ их до одного выходного  

Python имеет функции `map` и `reduce`, выполняющие эти действия  
Рассмотрим их работу на простом примере  

Но сначала модули! В этой части работы нам понадобятся `random` и `itertools`, а также функция `reduce`

In [3]:
import random
import itertools as it
from functools import reduce

---

Давайте загадаем целое число от 0 до 100 и попросим компьютер его угадать с помощью случайного генератора  

In [4]:
x = 52   # введите свое число

In [5]:
guesses = random.sample(range(101), k=10)    # берем 10 разных случайных чисел из диапазона
guesses

[28, 8, 60, 92, 77, 98, 96, 76, 18, 95]

<br>  

Теперь найдем лучшую попытку компьютера  
Для этого нам понадобится 2 шага:  
1. Оценить разность каждого числа в списке `guesses` с заданным числом `x`
2. Найти попытку с минимальной разностью  

Определим для этих шагов функции `difference` и `min_diff`

In [6]:
def difference(x, y):
    return y, abs(x - y)

In [7]:
def min_diff(p1, p2):
    if p2[1] < p1[1]:
        return p2
    return p1

<br>  

Обратите внимание на особенности функций:  
1. `difference` возвращает не только разность, но и само число `y`
2. `min_diff` сравнивает не весь список разностей, а только 2

Найдем все разности с помощью `map`:

In [8]:
diffs = map(difference, it.repeat(x), guesses)     # применяет функцию difference к наборам аргументов
                                                   # it.repeat(x) всегда возвращает x в качестве первого аргумента
                                                   # guesses содержит значения второго аргумента

diffs = list(diffs)        # map сама по себе не вызывает функцию difference, а возвращает итерируемый объект
                           # вызов difference происходит во время итерации - например, при записи значений в list
diffs

[(28, 24),
 (8, 44),
 (60, 8),
 (92, 40),
 (77, 25),
 (98, 46),
 (96, 44),
 (76, 24),
 (18, 34),
 (95, 43)]

<br>  

Теперь выберем из этих разностей минимальную с помощью `reduce`:

In [9]:
best_guess = reduce(min_diff, diffs)       # reduce работает следующим образом:
                                           # r0 = diffs[0]
                                           # r1 = min_diff(r0, diffs[1])
                                           # r2 = min_diff(r1, diffs[2])
                                           # ...
                                           # return rn

best_guess

(60, 8)

Итак, с помощью `map` и `reduce` мы определили число из списка, ближайшее к заданному  
Теперь особенности функций `difference` и `min_diff` должны быть понятнее:  
1. `difference` возвращает значение `y`, чтобы оно появилось на выходе алгоритма
2. `min_diff` сравнивает только 2 значения, поскольку `reduce` вызывает ее последовательно для пар значений

---

<center>⭐⭐</center>  

Реализуйте вычисление скалярного произведения векторов на основе функций `map` и `reduce`

In [10]:
import numpy as np

# входные вектора
a1 = np.random.random(10)
a2 = np.random.random(10)

In [11]:
# функции для map и reduce (допишите)

def mult(x1, x2):
    return x1 * x2

def add(x1, x2):
    return x1 + x2

In [17]:
multiplies = map(mult, a1, a2)
multiplies = list(multiplies)
multiplies

[np.float64(0.5401655980100144),
 np.float64(0.2578136449790416),
 np.float64(0.14851700159245274),
 np.float64(0.19441867429998713),
 np.float64(0.1779156469698378),
 np.float64(0.14475764413162032),
 np.float64(0.09730640073827963),
 np.float64(0.012023846220070676),
 np.float64(0.1671084251368729),
 np.float64(0.5273345650459877)]

In [18]:
answer = reduce(add, multiplies)

In [22]:
# проверка результата
print(answer == np.dot(a1, a2))    # замените answer на ваш результат вычислений

True


<center>💫</center>

<H2>2. Многопоточное распараллеливание</H2>

Рассмотренные выше примеры едва ли требуют параллелизации  
Необходимость в ней возникает, когда:  
- в памяти не хватает места для данных  
- вычисления выполняются слишком долго  

В этой части работы мы рассмотрим задачу поиска статьи на Википедии по запросу пользователя  

Модули к бою!

In [23]:
import math
from distance.levenshtein import levenshtein      # эта функция измеряет редакционное расстояние между строками

---

Для начала разберемся, как мы будем искать нужную статью  
Найти полное совпадения названия с запросом пользователя - задача тривиальная  
Однако в запросе могут быть опечатки, и хорошая поисковая система умеет их исправлять  

Это значит, что нашей основной задачей будет найти не абсолютное совпадение, а названия статей, которые _похожи_ на запрос  
Здесь нам пригодится _расстояние Левенштейна_ (или _редакционное расстояние_) - количество символов, которые необходимо добавить, убрать или заменить в строке 1, чтобы получить из нее строку 2  
К счастью, код для вычисления этой метрики за нас уже написали

In [24]:
levenshtein('apple', 'orange')

np.float64(5.0)

In [25]:
levenshtein('luck', 'yucK')

np.float64(2.0)

In [26]:
levenshtein('luck', 'yucK', lowercase=True)

np.float64(1.0)

<br>  

Для упрощения пока представим, что на Википедии есть только 4 статьи с совершенно нормальными названиями, а пользователь - фанат тяжелого метала с дислексией:

In [27]:
article_names = ['Conquest', 'War', 'Famine', 'Death']
request = 'deaht'

<br>  

Как обычно, определим функции для `map` и `reduce`

In [28]:
def mapper(word, ref):
    return ref, levenshtein(word, ref, lowercase=True)

def reducer(current_ref, new_ref):
    if new_ref[1] < current_ref[1]:
        return new_ref
    return current_ref

<br>  

Но не будем забывать, что на этот раз вычисления нужно распараллелить  
Разобьем наш список названий на несколько частей и создадим функцию обработки одной части

In [29]:
def chunkify(refs, N):
    step = int(math.ceil(len(refs) / N))
    inds = list(range(0, len(refs) + step, step))
    for i1, i2 in zip(inds, inds[1:]):
        yield refs[i1:i2]

name_chunks = list(chunkify(article_names, 2))
name_chunks

[['Conquest', 'War'], ['Famine', 'Death']]

In [30]:
def chunk_processer(word, refs):
    mapped = map(mapper, it.repeat(word), refs)
    reduced = reduce(reducer, mapped)
    return reduced

print(chunk_processer(request, name_chunks[0]))
print(chunk_processer(request, name_chunks[1]))

('War', np.float64(4.0))
('Death', np.float64(2.0))


<br>  

Теперь применим `map` уже к функции обработки блока, а к результатам обработки нескольких блоков - функцию `reduce`

In [31]:
mapped = map(chunk_processer, it.repeat(request), name_chunks)
reduced = reduce(reducer, mapped)

reduced

('Death', np.float64(2.0))

<br>  

В итоге задача вычисления редакционных расстояний была разбита на несколько частей  
Однако вычисления все также выполнялись последовательно  

Вызов `map` можно параллелизовать с помощью модуля `multiprocessing`

In [32]:
from multiprocessing.pool import ThreadPool as Pool

N = 2

with Pool(processes=N) as pool:     # создаем менеджер параллельных процессов
    name_chunks = list(chunkify(article_names, N))
    mapped = pool.starmap(chunk_processer, zip(it.repeat(request), name_chunks))     # Pool.map() работает только с функциями одного аргумента, starmap() - с несколькими
    reduced = reduce(reducer, mapped)

print(reduced)

('Death', np.float64(2.0))


<br>  

**Важно!** В интерактивной среде распараллеливание на самом деле **не работает**  
`ThreadPool` создает новые потоки в том же процессе и использует только одно ядро процессора  
Для реального распараллеливания необходимо импортировать `Pool` и работать в скрипте, запускаемом из командной строки  

Поэтому далее будем работать с файлом `pool.py`  
Откройте его и ознакомьтесь с кодом - там уже написан скрипт, выполняющий поиск названия статьи с минимальным редакционным расстоянием до пользовательского запроса  

Откройте консольное окно, перейдите в директорию лабораторной работы и запустите скрипт:  
`python pool.py <any request>`  
_Если запустится другой python и это вызовет проблемы, напишите полный путь:_  
`..\..\dist\pyenv3.8-win64\python pool.py <any request>`

Пока скрипт выполняется, переходите к выполнению заданий  
Ждать завершения работы скрипта не рекомендуется - он работает долго

---

<H4>Итоговое задание</H4>

<center>⭐⭐⭐</center>

Измените функцию `mapper` таким образом, чтобы:  
⭐ для строк с разностью длин больше 2 не вычислялось редакционное расстояние, а сразу возвращалось большое значение (например, 10)  
⭐ проводилось сравнение наборов букв в строках без учета их расположения и при отклонении больше 2 возвращалось большое значение  

Запустите скрипт, сравните время работы  
⭐ Объясните разницу

<center>💫</center>

<center>⭐⭐🌟</center>

⭐⭐ Сделайте сравнение не по полным названиям статей, а по словам в названиях  
Считайте, что пользователь вводит одно слово, а вывести нужно названия статей, которые содержат найденное ближайшее к нему слово  

Все слова в статьях можно найти в файле:  
`data/words in titles.json`  
Каждому слову соотнесен список названий, в которых оно встречается (по индексам)  

Считать json файл можно с помощью модуля `json`:

In [None]:
import json

with open('data/words in titles.json', 'r') as f:
    words = json.load(f)

<br>  

🌟 Используя список названий из `data/wiki titles no special.txt`, самостоятельно сформируйте файл, аналогичный `data/words in titles.json`  
_Сохраните под другим названием, чтобы не потерять оригинал_  

Записать json файл можно с помощью того же модуля `json`:

In [None]:
import json

words = {'a': [1, 2]}

with open('data/words in titles1.json', 'w') as f:
    json.dump(words, f)

<center>💫</center>

<center>🌟🌟</center>

Модифицируйте алгоритм таким образом, чтобы выводились 10 ближайших совпадений, а не одно  

_Подсказка: функцию `reducer` придется пересмотреть_

<center>💫</center>