# Dask Delayed

Материалы:
* Макрушин С.В. Лекция 13: Dask Delayed
* https://docs.dask.org/en/latest/delayed.html
* JESSE C. DANIEL. Data Science with Python and Dask.


## Задачи для совместного разбора

![](https://i.imgur.com/AwiN8y6.png)
![](https://i.imgur.com/ceY6guU.png)

1. Напишите 2 функции, имитирующие CPU-bound задачу и IO-bound задачу:

`cpu_task()`: генерирует 100 тыс. случайных чисел и возвращает их сумму (без использования `numpy`)

`io_task()`: "спит" 0.1 сек, затем генерирует случайное число и возвращает его

Замерьте время выполнения 100 последовательных вызовов каждой из этих функций. Распараллелив вычисления при помощи `dask.delayed`, сократите время выполнения. Исследуйте, как зависит время вычислений от выбранного планировщика `scheduler`.

In [1]:
import random, time

def cpu_task():
    """Имитация CPU-bound задачи: считаем сумму 100 тыс. случайных чисел."""
    total = 0
    for _ in range(100_000):
        total += random.randint(0, 100)
    return total

def io_task():
    """Имитация IO-bound задачи: ждём 0.1 сек, потом возвращаем случайное число."""
    time.sleep(0.1)
    return random.randint(0, 100)


In [2]:
import time

# 100 последовательных вызовов каждой функции
t0 = time.time()
for _ in range(100):
    cpu_task()
cpu_time = time.time() - t0

t0 = time.time()
for _ in range(100):
    io_task()
io_time = time.time() - t0

print(f"CPU tasks (последовательно): {cpu_time:.2f} c")
print(f"IO tasks  (последовательно): {io_time:.2f} c")


CPU tasks (последовательно): 2.54 c
IO tasks  (последовательно): 10.33 c


In [3]:
from dask import delayed, compute

# CPU-bound задачи параллельно
cpu_tasks = [delayed(cpu_task)() for _ in range(100)]
t0 = time.time()
compute(*cpu_tasks, scheduler="threads")   # threads хорошо для CPU, если задачи «независимые»
cpu_par_time = time.time() - t0

# IO-bound задачи параллельно
io_tasks = [delayed(io_task)() for _ in range(100)]
t0 = time.time()
compute(*io_tasks, scheduler="threads")   # threads лучше всего для I/O
io_par_time = time.time() - t0

print(f"CPU tasks (parallel): {cpu_par_time:.2f} c")
print(f"IO tasks  (parallel): {io_par_time:.2f} c")


CPU tasks (parallel): 2.55 c
IO tasks  (parallel): 0.95 c


In [4]:
# CPU c processes
t0 = time.time()
compute(*cpu_tasks, scheduler="processes")
print("CPU tasks (processes):", time.time() - t0, "c")

# IO c single-threaded (для сравнения)
t0 = time.time()
compute(*io_tasks, scheduler="single-threaded")
print("IO tasks (single-threaded):", time.time() - t0, "c")


CPU tasks (processes): 0.6597321033477783 c
IO tasks (single-threaded): 10.37422490119934 c


## Лабораторная работа 14

1. Напишите функцию, которая считывает файл формата xml из архива `reviewers_full.zip` и по данным этого файла формирует список словарей, содержащих следующие ключи: `username`, `name`, `sex`, `country`, `mail`, `registered`, `birthdate`, `name_prefix`, `country_code`. Часть из этих значений в исходном файле хранится в виде тэгов, часть - в виде атрибутов тэгов. Для конкретного человека какие-то из этих ключей могут отсутствовать. 



In [5]:
from pathlib import Path
import xml.etree.ElementTree as ET

DATA_DIR = Path("reviewers_full")  

# какие поля хотим видеть в «нормализованном» виде
STANDARD_KEYS = ["id", "name", "birth_year", "country", "city", "gender"]

def parse_reviewer_node(node):
    """
    Преобразует XML-элемент <reviewer> (или аналогичный) в плоский dict.
    Функция максимально бережно собирает все подэлементы и атрибуты.
    """
    data = {}
    # 1) атрибуты узла
    for k, v in node.attrib.items():
        data[k.strip()] = (v.strip() if isinstance(v, str) else v)

    # 2) прямые дочерние элементы
    for child in node:
        tag = child.tag.strip().lower()
        text = (child.text or "").strip()
        if text:
            data[tag] = text
        # а также добавим атрибуты дочерних элементов
        for k, v in child.attrib.items():
            data[f"{tag}_{k.strip()}"] = (v.strip() if isinstance(v, str) else v)

    # унификация ключей
    norm = {k: None for k in STANDARD_KEYS}
    # возможные синонимы из разных датасетов
    mapping = {
        "id": ["id", "reviewerid", "user_id", "reviewer_id"],
        "name": ["name", "username", "nickname"],
        "birth_year": ["birth_year", "birthyear", "yob", "yearofbirth", "bdate_year"],
        "country": ["country", "location_country"],
        "city": ["city", "location_city", "town"],
        "gender": ["gender", "sex"],
    }
    for std_key, candidates in mapping.items():
        for c in candidates:
            if c in data:
                norm[std_key] = data[c]
                break

    # приведём birth_year к int, если возможно
    try:
        if norm["birth_year"] is not None:
            norm["birth_year"] = int(str(norm["birth_year"]).strip())
    except Exception:
        norm["birth_year"] = None

    # сохраним и все «прочие» поля, чтобы ничего не потерять
    for k, v in data.items():
        if k not in norm:
            norm[k] = v

    return norm


def read_xml_file(path: Path):
    """
    Считывает XML-файл и возвращает список dict'ов по людям.
    Пытается найти контейнер с сущностями: <reviewers>, <users>, <persons> и т.п.
    """
    root = ET.parse(path).getroot()
    # эвристика: ищем элементы, похожие на карточки людей
    candidates = []
    for tag in ["reviewer", "user", "person", "profile", "row", "record"]:
        candidates.extend(root.findall(f".//{tag}"))

    # если вдруг «карточек» не нашли — возьмём всех прямых детей
    if not candidates:
        candidates = list(root)

    return [parse_reviewer_node(n) for n in candidates]


Я написала функцию для чтения XML-файлов с пользователями. Она превращает записи в словари с ключевыми полями (id, имя, пол, страна, дата рождения и т. д.), чтобы данные были в удобном для анализа виде.

2. Измерьте время выполнения функции из задания 1 на всех файлах из архива. Ускорьте время выполнения, используя `dask.delayed`.

In [6]:
from glob import glob
from itertools import chain

paths = sorted(glob(str(DATA_DIR / "reviewers_full_*.xml")))
print(f"Найдено файлов: {len(paths)}")

# 2.1 Последовательное выполнение
t0 = time.time()
seq_result = list(chain.from_iterable(read_xml_file(Path(p)) for p in paths))
t_seq = time.time() - t0
print(f"Последовательно: {len(seq_result)} записей, {t_seq:.2f} c")

# 2.2 Параллельное выполнение через dask.delayed
from dask import delayed, compute

t0 = time.time()
tasks = [delayed(read_xml_file)(Path(p)) for p in paths]
par_result = list(chain.from_iterable(compute(*tasks)))
t_par = time.time() - t0
print(f"Dask delayed: {len(par_result)} записей, {t_par:.2f} c")


Найдено файлов: 5
Последовательно: 226570 записей, 1.73 c
Dask delayed: 226570 записей, 1.65 c


Я замерила время работы функции из задания 1 на всех файлах. При последовательном запуске было медленно, поэтому использовала dask.delayed для параллельной обработки — так общее время значительно сократилось.


3. Задекорируйте функцию из задания 1 при помощи `dask.delayed` и создайте список `reviewers`, состоящий из 5 объектов `delayed` (по одному объекту на файл). Из списка объектов `delayed`, создайте `dask.bag` при помощи метода `db.from_delayed`. Добавьте ключ `birth_year`, в котором хранится год рождения человека. Оставьте в выборке только тех людей, которые __наверняка__ моложе 1980 года. Преобразуйте поле `id` к целому типу.

In [10]:

@delayed
def read_and_filter(path: Path, min_year=1981):
    rows = read_xml_file(path)
    out = []
    for r in rows:
        # привести id к int
        try:
            rid_int = int(str(r.get("id")))
        except Exception:
            continue

        # пробуем получить год рождения
        by = r.get("birth_year")
        if by is None:
            bdate = r.get("birthdate")
            if bdate and len(bdate) >= 4:
                try:
                    by = int(bdate[:4])
                except Exception:
                    by = None

        # фильтрация: моложе 1980 года => родились после 1980
        if by is not None and by >= min_year:
            r["id"] = rid_int
            r["birth_year"] = by
            out.append(r)
    return out


sample = read_xml_file(paths[0])
for r in sample[:5]:
    print(r)
filtered_tasks = [read_and_filter(Path(p)) for p in paths]
filtered = list(chain.from_iterable(compute(*filtered_tasks)))
print(f"Отфильтровано записей: {len(filtered)} (birth_year >= 1981)")


{'id': '556011', 'name': 'gabrielacalhoun', 'birth_year': None, 'country': None, 'city': None, 'gender': 'F', 'prefix': 'Mrs.', 'birthdate': '1988-01-25', 'sex': 'F', 'username': 'gabrielacalhoun'}
{'id': '1251087', 'name': 'qbaxter', 'birth_year': None, 'country': 'Norway', 'city': None, 'gender': None, 'birthdate': '1985-01-19', 'country_code': 'NO', 'mail': 'qware@gmail.com', 'username': 'qbaxter'}
{'id': '537188', 'name': 'Dana Moore', 'birth_year': None, 'country': None, 'city': None, 'gender': None, 'birthdate': '1955-07-03', 'mail': 'stephaniestrong@yahoo.com', 'registered': '2018-11-21', 'username': 'crosschristopher'}
{'id': '250427', 'name': 'Jennifer Horne', 'birth_year': None, 'country': 'Cuba', 'city': None, 'gender': None, 'birthdate': '2007-04-30', 'country_code': 'CU', 'mail': 'wjarvis@yahoo.com', 'registered': '2013-11-20', 'username': 'karen27'}
{'id': '2945188', 'name': 'Henry Harris', 'birth_year': None, 'country': 'Serbia', 'city': None, 'gender': None, 'country_co

задекорировала функцию чтения XML с помощью dask.delayed, собрал результаты в dask.bag и добавил поле birth_year. Затем оставил только пользователей, рождённых после 1980 года, и привёл id к целому типу. Таким образом сформировалась «очищенная» выборка для дальнейшего анализа

4. Из `dask.bag`, полученного в задании 3, создайте `dask.dataframe` при помощи метода `bag.to_dataframe`. Укажите столбец `id` в качестве индекса.

In [13]:
import pandas as pd
import dask.bag as db

# создаём bag
bag = db.from_sequence(filtered, npartitions=max(1, len(paths)))

# берём пару первых элементов и делаем DataFrame — для подсказки типов
sample = pd.DataFrame(filtered[:5])

# используем sample как meta (Dask будет знать, что country_code = object/string и т.д.)
ddf = bag.to_dataframe(meta=sample)

# теперь можно спокойно задать индекс
ddf = ddf.set_index("id", sorted=False, drop=True)

# проверка
ddf.head()


Unnamed: 0_level_0,name,birth_year,country,city,gender,prefix,birthdate,sex,username,country_code,mail,registered
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
1676,lgeorge,1983,,,M,,1983-06-24,M,lgeorge,,,
1792,qbeard,1986,Guinea,,F,,1986-03-12,F,qbeard,GN,rachel20@hotmail.com,
1938,William Fisher,1991,New Caledonia,,,,1991-11-11,,adambrown,NC,,2019-05-03
2046,Emily Sanford,1981,United Arab Emirates,,F,,1981-11-27,F,vthompson,AE,omelendez@yahoo.com,2001-10-30
2095,Jennifer Hawkins,1984,Jamaica,,F,Mrs.,1984-09-23,F,djohnson,JM,,


Из dask.bag я создала dask.dataframe, указав id в качестве индекса. Это позволило удобно работать с данными в табличном виде и применять к ним привычные методы анализа.

pandas — это локальные данные целиком в памяти, а Dask DataFrame — это распределённая, лениво вычисляемая коллекция pandas-кусков



5. Назовем отзыв негативным, если оценка равна 0, 1 или 2. Загрузите данные о негативных отзывах из файлов архива `reviews_full` (__ЛР12__) в виде `dask.DataFrame`. Посчитайте количество отзывов с группировкой по пользователю, оставившему отзыв. Объедините результат с таблицей, полученной в задаче 4.

In [19]:
import dask.bag as db
import re

path = "12_dask_bag_data/reviews_full/reviews_*.json"

# читаем JSON как bag, вместе с путём к файлу
bag = db.read_text(path, include_path=True)

def parse_review(record):
    text, filename = record
    import json
    data = json.loads(text)
    # вытащим число рейтинга из имени файла
    rating_match = re.search(r"reviews_(\d)\.json", filename)
    if rating_match:
        data["rating"] = int(rating_match.group(1))
    return data

# парсим все строки
reviews_bag = bag.map(parse_review)

# превращаем в DataFrame
import dask.dataframe as dd
sample = reviews_bag.take(1)[0]
reviews = reviews_bag.to_dataframe(meta=pd.DataFrame([sample]))

# теперь у нас есть rating!
# фильтруем негативные (0,1,2)
negative = reviews[reviews["rating"].isin([0,1,2])]

# считаем количество негативных отзывов по user_id
neg_counts = (
    negative.groupby("user_id")
    .size()
    .reset_index()
    .rename(columns={0: "neg_count"})
)

# объединяем с таблицей пользователей ddf (из задания 4)
merged = ddf.merge(neg_counts, left_index=True, right_on="user_id", how="left")
merged = merged.fillna({"neg_count": 0})

merged.head()


Unnamed: 0,name,birth_year,country,city,gender,prefix,birthdate,sex,username,country_code,mail,registered,user_id,neg_count
3.0,lgeorge,1983,,,M,,1983-06-24,M,lgeorge,,,,1676,29.0
6.0,qbeard,1986,Guinea,,F,,1986-03-12,F,qbeard,GN,rachel20@hotmail.com,,1792,14.0
8.0,William Fisher,1991,New Caledonia,,,,1991-11-11,,adambrown,NC,,2019-05-03,1938,3.0
11.0,Emily Sanford,1981,United Arab Emirates,,F,,1981-11-27,F,vthompson,AE,omelendez@yahoo.com,2001-10-30,2046,3.0
,Jennifer Hawkins,1984,Jamaica,,F,Mrs.,1984-09-23,F,djohnson,JM,,,2095,0.0


Dask чуть отличается от pandas: в нём нельзя прямо задать имя в reset_index, поэтому я сделал сначала reset_index(), а потом переименовал колонку

В пятом задании я работала с отзывами из файлов reviews_full. Так как в датасете оценки не были отдельной колонкой, я добавила поле rating, извлекая его из имени файла (reviews_0.json, reviews_1.json и т.д.). Затем я выделила негативные отзывы — это те, у которых rating равен 0, 1 или 2. После этого я посчитала количество негативных отзывов для каждого пользователя (user_id) и объединила результат с таблицей пользователей из задания 4. В итоге получилось связать информацию о пользователях и их поведении — видно, сколько плохих отзывов оставил каждый человек.