# Домашняя работа

Сделайте mapper и reducer, чтобы посчитать среднее и дисперсию оценок за фильм.

Реализация через цикл (предпогаем, что мы не знаем сколько у нас фильмов):

In [1]:
!pip install opendatasets



In [1]:
import opendatasets as od
import pandas as pd
import time
from pathlib import Path

In [2]:
dataset_path = Path('imdb-user-reviews', 'song_lyrics.csv')
if not dataset_path.is_file():
    od.download('https://www.kaggle.com/datasets/sadmadlad/imdb-user-reviews')

Skipping, found downloaded files in ".\imdb-user-reviews" (use force=True to force download)


In [3]:
import json
from functools import reduce


n, mean, M2 = 0, 0.0, 0
for path in Path('imdb-user-reviews').glob('**/*'):
    if path.is_file() and path.suffix == '.json':
        with open(path, 'r') as f:
            info = json.load(f)
        score = float(info['movieIMDbRating'])
        n += 1
        delta = score - mean
        mean += delta / n
        M2 += delta * (score - mean)

print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865


In [71]:
def mapper(path):
    score = 0
    if path.is_file() and path.suffix == '.json':
        with open(path, 'r') as f:
            info = json.load(f)
        return(1,float(info['movieIMDbRating']),None)
    return None


def reducer(score1, score2):
    if score1 is None and score2 is None:
        return None
    elif score1 is None:
        return score2
    elif score2 is None:
        return score1
    else:
        scores = []
        if score1[2] == None:
            n, mean, M2 = 0, 0.0, 0
            scores.append(score1[1])
        else:
            n, mean, M2 = score1
        scores.append(score2[1])
        
        for score in scores:
            n += 1
            delta = score - mean
            mean += delta / n
            M2 += delta * (score - mean)
        return n, mean, M2

In [72]:
print(*map(mapper, Path('imdb-user-reviews').glob('**/*')))

None None None None None None None None None None (1, 8.4, None) None (1, 8.8, None) None (1, 7.4, None) None (1, 8.4, None) None (1, 5.2, None) None (1, 8.9, None) None (1, 8.3, None) None (1, 8.0, None) None (1, 9.0, None) None (1, 7.9, None) None


In [76]:
%%time
n, mean, M2 = reduce(reducer, map(mapper, Path('imdb-user-reviews').glob('**/*')))
print(n, mean, (M2 / n) ** (1/2))

10 8.03 1.0517128885774865
CPU times: total: 0 ns
Wall time: 4.99 ms


# Parallel

In [159]:
from joblib import Parallel, delayed

In [160]:
%%time
N = 2
n, mean, M2 = reduce(reducer, Parallel(n_jobs=N)(delayed(mapper)(path) for path in Path('imdb-user-reviews').glob('**/*')))
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: total: 31.2 ms
Wall time: 936 ms


In [161]:
%%time
N = 8
n, mean, M2 = reduce(reducer, Parallel(n_jobs=N)(delayed(mapper)(path) for path in Path('imdb-user-reviews').glob('**/*')))
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: total: 46.9 ms
Wall time: 1.95 s


In [162]:
%%time
N = 16
n, mean, M2 = reduce(reducer, Parallel(n_jobs=N)(delayed(mapper)(path) for path in Path('imdb-user-reviews').glob('**/*')))
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: total: 46.9 ms
Wall time: 145 ms


# Chunkify

In [163]:
import multiprocessing
multiprocessing.cpu_count()

4

In [191]:
from itertools import tee      
def split_gen(gen):
    gen_a, gen_b = tee(gen, 2)
    return (a for a, b in gen_a), (b for a, b in gen_b)


def chunkify(list_of_strings, number_of_chunks=30):
    step = len(list_of_strings) // number_of_chunks
    if step != 0:
        for i in range(0, len(list_of_strings), step):
            yield list_of_strings[i : i + step]
    else:
        yield list_of_strings  # Генераторы
        

def chunks_mapper(chunk):
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    return reduce(reducer, mapped_chunk)


In [193]:
%%time
data_chunks = split_gen(Path('imdb-user-reviews').glob('**/*'))
for i in data_chunks:
    print(*i)
#step 1:
mapped = map(chunks_mapper, data_chunks)

#step 2:
reduced = reduce(reducer, mapped)
print(reduced)

(<generator object split_gen.<locals>.<genexpr> at 0x000001F69F3E96D0>, <generator object split_gen.<locals>.<genexpr> at 0x000001F69F3E9270>)


TypeError: cannot unpack non-iterable WindowsPath object