# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* Jesse C. Daniel. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [1]:
import random
import time

def cpu_task():
  lst = [random.randint(0, 10) for _ in range(100_000)]
  return sum(lst)

def io_task():
  time.sleep(0.1)
  return random.randint(0, 10)

In [2]:
%%time
res = [cpu_task() for _ in range(100)]

CPU times: user 6.29 s, sys: 44.3 ms, total: 6.33 s
Wall time: 6.33 s


In [3]:
%%time
res = [io_task() for _ in range(100)]

CPU times: user 10.4 ms, sys: 3.86 ms, total: 14.2 ms
Wall time: 10.3 s


In [4]:
import dask

In [5]:
cpu_task_delayed = dask.delayed(cpu_task)

In [6]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res)

CPU times: user 6.53 s, sys: 100 ms, total: 6.63 s
Wall time: 6.6 s


In [7]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'threading')

CPU times: user 6.52 s, sys: 89.5 ms, total: 6.61 s
Wall time: 6.56 s


In [8]:
%%time
res = [cpu_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 51 ms, sys: 90 ms, total: 141 ms
Wall time: 1.83 s


In [9]:
io_task_delayed = dask.delayed(io_task)

In [10]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'threading')

CPU times: user 41.4 ms, sys: 7.71 ms, total: 49.1 ms
Wall time: 1.37 s


In [11]:
%%time
res = [io_task_delayed() for _ in range(100)]
res_computed = dask.compute(res, scheduler = 'multiprocessing')

CPU times: user 53.9 ms, sys: 75 ms, total: 129 ms
Wall time: 1.98 s


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из каталога `reviewers_full` и по данным этого файла формирует список словарей, содержащих следующие ключи: `id`, `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [12]:
import dask.bag as db
import os
import dask
from bs4 import BeautifulSoup

In [15]:
with open(os.path.join(f'reviewers_full_*.xml')) as f:
        data = BeautifulSoup(f,'xml')

FileNotFoundError: [Errno 2] No such file or directory: 'reviewers_full_*.xml'

In [None]:
reviews = db.read_text('reviews_*.json').map(json.loads)
reviews.take(5)

2. Измерьте время выполнения функции из задания 1 на всех файлах из каталога `reviewers_full`. Ускорьте время выполнения, используя `dask.delayed`.

3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

#### [версия 2]
* Уточнена формулировка задачи 1