# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [1]:
import random
import time

def cpu_task():
    return sum([random.randint(0,10) for _ in range(100_000)])

def io_task():
    time.sleep(0.1)
    return random.randint(0,10)

In [2]:
%%time
res = [cpu_task() for _ in range(100)]

CPU times: user 9.71 s, sys: 69.3 ms, total: 9.78 s
Wall time: 9.83 s


In [3]:
%%time
res = [io_task() for _ in range(100)]

CPU times: user 5.7 ms, sys: 3.9 ms, total: 9.6 ms
Wall time: 10.4 s


In [4]:
import dask.delayed
import dask

In [5]:
cpu_task_delayed = dask.delayed(cpu_task)

In [6]:
%%time
res = [cpu_task_delayed() for _ in range(100)]

CPU times: user 7.55 ms, sys: 4.06 ms, total: 11.6 ms
Wall time: 13.4 ms


In [7]:
res[0]

Delayed('cpu_task-b02b79df-306b-4e53-8265-9bd6936172f1')

In [8]:
%%time
r1 = dask.compute(res)

CPU times: user 8.27 s, sys: 97.7 ms, total: 8.37 s
Wall time: 8.36 s


In [9]:
%%time
r2 = dask.compute(res, scheduler='multiprocessing')

CPU times: user 74.1 ms, sys: 40.4 ms, total: 115 ms
Wall time: 5.76 s


In [10]:
io_task_delayed = dask.delayed(io_task)

In [11]:
%%time
r3 = [io_task_delayed() for _ in range(100)]
r4 = dask.compute(r3)

CPU times: user 41.4 ms, sys: 19.1 ms, total: 60.4 ms
Wall time: 2.64 s


In [12]:
%%time
r3 = [io_task_delayed() for _ in range(100)]
r4 = dask.compute(r3, scheduler='multiprocessing')

CPU times: user 121 ms, sys: 58.9 ms, total: 180 ms
Wall time: 3.06 s


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



исходя из заданий ниже, нужно также добавить id

In [13]:
from os import listdir
from bs4 import BeautifulSoup
import dask.delayed
import dask
import dask.bag as db
import dask.dataframe as dd

In [14]:
for file in listdir('data/reviewers_full'):
    print(file)

reviewers_full_1.xml
reviewers_full_0.xml
reviewers_full_2.xml
reviewers_full_3.xml
reviewers_full_4.xml


In [15]:
def read_xml(filename):
    dct_keys = ['id', 'username', 'name', 'sex', 'country', 'mail', 'registered', 'birthdate']

    list_of_dicts = []
    
    with open('./data/reviewers_full/' + filename, 'r', encoding='utf8') as fp:
        data = BeautifulSoup(fp, 'xml')
        for user in data.find('users').find_all('user'):
            dicti = {}
            for i in range(len(dct_keys)):
                dicti[dct_keys[i]] = user.find(dct_keys[i]).text if user.find(dct_keys[i]) else ''
            dicti['name_prefix'] = user.get('prefix') if user.find('country') else ''
            dicti['country_code'] = user.find('country').get('code') if user.find('country') else ''
            list_of_dicts.append(dicti)
    return list_of_dicts

In [16]:
result_lst = read_xml('reviewers_full_1.xml')
result_lst[3]

{'id': '1571577',
 'username': 'debbie79',
 'name': '',
 'sex': 'M',
 'country': 'United States Minor Outlying Islands',
 'mail': '',
 'registered': '2008-01-29',
 'birthdate': '',
 'name_prefix': 'Mr.',
 'country_code': 'UM'}

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [17]:
%%time
result_2 = [read_xml(file) for file in listdir('data/reviewers_full')]

CPU times: user 1min 58s, sys: 1.06 s, total: 1min 59s
Wall time: 2min 5s


In [18]:
%%time
read_xml_delayed = dask.delayed(read_xml)
result_2_delayed = [read_xml_delayed(file) for file in listdir('data/reviewers_full')]
r = dask.compute(result_2_delayed, scheduler='multiprocessing')

CPU times: user 494 ms, sys: 231 ms, total: 725 ms
Wall time: 1min 19s


3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [19]:
@dask.delayed
def read_xml_d(filename):
    dct_keys = ['id', 'username', 'name', 'sex', 'country', 'mail', 'registered', 'birthdate']

    list_of_dicts = []
    
    with open('./data/reviewers_full/' + filename, 'r', encoding='utf8') as fp:
        data = BeautifulSoup(fp, 'xml')
        for user in data.find('users').find_all('user'):
            dicti = {}
            for i in range(len(dct_keys)):
                dicti[dct_keys[i]] = user.find(dct_keys[i]).text if user.find(dct_keys[i]) else ''
            dicti['name_prefix'] = user.get('prefix') if user.find('country') else ''
            dicti['country_code'] = user.find('country').get('code') if user.find('country') else ''
            list_of_dicts.append(dicti)
    return list_of_dicts

In [20]:
reviewers = db.from_delayed([read_xml_d(file) for file in listdir('data/reviewers_full')])

In [21]:
def add_birth_year(x):
    x.update({'birth_year':x['birthdate'][:4]})
    x.update({'id': int(x['id'])})
    return x

In [22]:
def condition(x):
    if x['birth_year'] > '1980':
        return x

In [23]:
result = reviewers.map(add_birth_year).filter(condition)
result.take(3)

({'id': 394270,
  'username': 'bridgesdennis',
  'name': 'Melissa Vaughn',
  'sex': 'F',
  'country': '',
  'mail': 'carmengonzales@hotmail.com',
  'registered': '',
  'birthdate': '1992-07-28',
  'name_prefix': '',
  'country_code': '',
  'birth_year': '1992'},
 {'id': 512192,
  'username': 'vanessawilson',
  'name': 'Matthew Roach',
  'sex': '',
  'country': '',
  'mail': '',
  'registered': '',
  'birthdate': '1998-08-17',
  'name_prefix': '',
  'country_code': '',
  'birth_year': '1998'},
 {'id': 352465,
  'username': 'cindypierce',
  'name': 'Katherine Coleman',
  'sex': '',
  'country': 'Slovakia (Slovak Republic)',
  'mail': '',
  'registered': '',
  'birthdate': '1988-08-10',
  'name_prefix': None,
  'country_code': None,
  'birth_year': '1988'})

In [24]:
#для проверки,какие даты включены
result.pluck('birth_year').compute()

['1992',
 '1998',
 '1988',
 '1998',
 '1981',
 '2000',
 '2020',
 '1998',
 '2004',
 '1984',
 '1990',
 '1981',
 '2012',
 '2009',
 '1985',
 '2009',
 '1993',
 '2009',
 '1981',
 '1989',
 '2014',
 '2002',
 '1991',
 '2004',
 '1990',
 '1997',
 '2002',
 '1985',
 '2017',
 '2012',
 '2019',
 '1999',
 '2012',
 '2016',
 '2021',
 '2009',
 '2011',
 '1985',
 '2001',
 '2015',
 '2000',
 '2012',
 '1993',
 '2003',
 '1998',
 '1989',
 '2006',
 '1996',
 '2007',
 '1983',
 '2003',
 '2015',
 '1987',
 '2013',
 '1996',
 '1993',
 '1992',
 '1985',
 '1992',
 '2010',
 '1998',
 '2015',
 '1991',
 '2006',
 '2013',
 '1995',
 '1991',
 '1996',
 '1990',
 '2006',
 '1982',
 '2016',
 '1987',
 '2002',
 '2020',
 '2009',
 '2002',
 '1987',
 '2012',
 '2016',
 '2019',
 '1990',
 '2012',
 '2004',
 '2017',
 '1996',
 '2021',
 '1998',
 '1992',
 '2005',
 '1993',
 '1998',
 '1991',
 '2005',
 '2019',
 '1999',
 '1993',
 '2001',
 '1982',
 '1998',
 '2019',
 '2017',
 '2007',
 '2017',
 '1996',
 '2019',
 '2015',
 '2018',
 '2006',
 '1996',
 '1982',
 

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [25]:
df = result.to_dataframe().set_index('id')
df

Unnamed: 0_level_0,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code,birth_year
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1676,object,object,object,object,object,object,object,object,object,object
377695,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
2000443928,...,...,...,...,...,...,...,...,...,...
2002372706,...,...,...,...,...,...,...,...,...,...


5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [27]:
# т.к. номер файла определяет оценку (рейтинг), то беру я только три файла 1-3
dd5 = dd.read_json(['../sem12/data/reviews_full/reviews_' + str(i) + '.json' for i in range(3)])
dd5.head()

Unnamed: 0,user_id,recipe_id,date,review
0,452355,292657,2016-05-08,WOW!!! This is the best. I have never been abl...
1,329304,433404,2006-06-14,This was good but the dressing needed somethin...
2,227932,2008187,1985-11-19,"Very good,it was a hit for my family. I used 6..."
3,171468,270716,2019-05-21,Made for ZWT-8 Family Picks after I saw these ...
4,91392,1159916,1972-09-18,Very nice slaw. I especially like that it does...


In [28]:
dd5_1 = dd5.groupby('user_id').review.count()
dd5_1

Dask Series Structure:
npartitions=1
    int64
      ...
Name: review, dtype: int64
Dask Name: series-groupby-count-agg, 10 tasks

In [29]:
final_df = df.merge(dd5_1.rename('reviews_count').to_frame())
final_df.head()

Unnamed: 0,username,name,sex,country,mail,registered,birthdate,name_prefix,country_code,birth_year,reviews_count
1676,lgeorge,,M,,,,1983-06-24,,,1983,29
1792,qbeard,,F,Guinea,rachel20@hotmail.com,,1986-03-12,,GN,1986,14
1938,adambrown,William Fisher,,New Caledonia,,2019-05-03,1991-11-11,,NC,1991,3
2046,vthompson,Emily Sanford,F,United Arab Emirates,omelendez@yahoo.com,2001-10-30,1981-11-27,,AE,1981,3
2178,michaeljones,Anthony Santiago,M,Haiti,jessicaharris@gmail.com,2013-12-12,1985-08-12,Mr.,HT,1985,20
