# Домашняя работа

Сделайте mapper и reducer, чтобы посчитать среднее и дисперсию оценок за фильм.

Реализация через цикл (предпогаем, что мы не знаем сколько у нас фильмов):

In [25]:
from pathlib import Path
import json
from functools import reduce
from joblib import Parallel, delayed

In [26]:
%%time
n, mean, M2 = 0, 0.0, 0
for path in Path("les/imdb-user-reviews").glob("**/*"):
    if path.is_file() and path.suffix == ".json":
        with open(path, "r") as f:
            info = json.load(f)
        score = float(info["movieIMDbRating"])
        n += 1
        delta = score - mean
        mean += delta / n
        M2 += delta * (score - mean)

print(mean, (M2 / n) ** (1 / 2))

8.03 1.0517128885774865
CPU times: total: 15.6 ms
Wall time: 9 ms


На основе этого кода соберите mapper и reducer:

In [43]:
def mapper(path):
    # Ваш код
    with open(path, "r") as f:
        info = json.load(f)
    score = float(info["movieIMDbRating"])
    return 1, score, 0.0


def reducer(score_data1, score_data2):
    #  Ваш код
    n1, mean1, M2_1 = score_data1
    n2, mean2, M2_2 = score_data2

    n = n1 + n2
    if n == 0:
        return 0, 0.0, 0.0

    delta = mean2 - mean1
    # delta = (mean2 * n2 - mean1 * n1) / n
    mean = mean1 + delta * n2 / n
    M2 = fsum([M2_1, M2_2, delta**2 * n1 * n2 / n])
    return n, mean, M2

In [44]:
%%time
n, mean, M2 = reduce(
    reducer, map(mapper, Path("les/imdb-user-reviews").rglob("*.json"))
)
print(mean, (M2 / n) ** (1 / 2))

8.03 1.0517128885774865
CPU times: total: 46.9 ms
Wall time: 7 ms


In [45]:
%%time
n, mean, M2 = reduce(
    reducer,
    Parallel(n_jobs=2)(
        delayed(mapper)(path) for path in Path("les/imdb-user-reviews").rglob("*.json")
    ),
)
print(mean, (M2 / n) ** (1 / 2))

8.03 1.0517128885774865
CPU times: total: 46.9 ms
Wall time: 615 ms


**Из приведенных выше результатов можно сделать следующие наблюдения:**

1. **Результаты расчетов**:
   - Среднее значение оценки фильмов: **8.03**
   - Стандартное отклонение: **1.0517128885774865**

2. **Время выполнения**:
   - Вариант с использованием `reduce` и `map`: 
     - CPU время: **46.9 ms**
     - Время на стенке (Wall time): **7 ms**
   - Вариант с использованием `reduce` и `Parallel` из `joblib`:
     - CPU время: **46.9 ms**
     - Время на стенке (Wall time): **615 ms**

3. **Анализ производительности**:
   - Вариант с `reduce` и `map` оказался более эффективным с точки зрения общего времени выполнения (Wall time).
   - Использование параллелизации с помощью `Parallel(n_jobs=2)` из библиотеки `joblib` привело к значительному увеличению времени на стенке (Wall time) до **615 ms** по сравнению с **7 ms** в первом варианте, несмотря на одинаковое CPU время (**46.9 ms**).
   - Это может быть связано с накладными расходами на создание и управление параллельными задачами, которые могут превысить выгоду от параллелизации, особенно на небольших объемах данных.

4. **Рекомендации**:
   - Для данной задачи использование простого `reduce` и `map` кажется более эффективным и быстрым решением.
   - Параллелизация может быть полезной при обработке больших объемов данных или при наличии значительного количества вычислительных задач, но в данном случае, на небольших объемах данных, она оказывается менее эффективной.

Следовательно, для текущей задачи рекомендуется использовать первый вариант с `reduce` и `map` для более быстрого выполнения.



### Увеличим количество данных

In [47]:
%%time
n, mean, M2 = reduce(
    reducer, map(mapper, list(Path("les/imdb-user-reviews").rglob("*.json")) * 100000)
)
print(mean, (M2 / n) ** (1 / 2))

8.03000000000032 1.0517128885774976
CPU times: total: 4min 18s
Wall time: 4min 18s


In [46]:
%%time
n, mean, M2 = reduce(
    reducer,
    Parallel(n_jobs=2)(
        delayed(mapper)(path) for path in Path("les/imdb-user-reviews").rglob("*.json")
    )
    * 100000,
)
print(mean, (M2 / n) ** (1 / 2))

8.03000000000032 1.0517128885774976
CPU times: total: 1.22 s
Wall time: 1.21 s


**Из приведенных выше результатов можно сделать следующие наблюдения:**

1. **Время выполнения**:
   - Вариант с использованием `reduce` и `map` на большом количестве данных (100,000 повторов):
     - CPU время: **4 минуты 18 секунд**
     - Время на стенке (Wall time): **4 минуты 18 секунд**
   - Вариант с использованием `reduce` и `Parallel` из `joblib` на том же количестве данных:
     - CPU время: **1.22 секунды**
     - Время на стенке (Wall time): **1.21 секунды**

2. **Анализ производительности**:
   - В первом варианте с `reduce` и `map` время выполнения значительно увеличилось до более чем 4 минут при обработке 100,000 повторов данных. Это связано с последовательной обработкой большого объема данных.
   - Во втором варианте с `reduce` и `Parallel` время выполнения существенно сократилось до чуть более 1 секунды для тех же 100,000 повторов данных. Это показывает значительное преимущество параллельной обработки при больших объемах данных.

3. **Рекомендации**:
   - Для обработки большого объема данных (таких как 100,000 повторов в данном примере) использование параллелизации с помощью `joblib` оказывается гораздо более эффективным, значительно сокращая как CPU время, так и время на стенке.
   - Параллелизация позволяет существенно ускорить выполнение задач за счет распределения нагрузки между несколькими процессорами, что особенно заметно на больших объемах данных.

Следовательно, для больших объемов данных рекомендуется использовать второй вариант с параллелизацией (`reduce` и `Parallel` из `joblib`), так как он обеспечивает значительное улучшение производительности по сравнению с последовательной обработкой.