# Лабораторная работа 11 — HDF5, Dask (эмуляция) и блочная обработка

Файл: `minutes_n_ingredients_full.hdf5`, dataset: `recipe` (shape: N×3, колонки: `recipe_id`, `minutes`, `n_ingredients`).

In [1]:
import os, time, math, numpy as np, h5py, pandas as pd
from pathlib import Path
try:
    import dask.array as da
    HAS_DASK = True
except Exception as e:
    HAS_DASK = False

DATA_PATH = "minutes_n_ingredients_full.hdf5"
DATASET_NAME = "recipe"
assert os.path.exists(DATA_PATH), "Файл не найден: " + DATA_PATH

# Вспомогательные
def block_iter_indices(n_rows: int, blocksize: int):
    idx = 0
    while idx < n_rows:
        j = min(idx + blocksize, n_rows)
        yield idx, j
        idx = j

def streaming_mean_for_columns_h5(dset, cols, blocksize=100_000):
    n = dset.shape[0]
    sums = np.zeros(len(cols), dtype=np.float64)
    total = 0
    for start, end in block_iter_indices(n, blocksize):
        frag = dset[start:end, cols]
        sums += frag.sum(axis=0)
        total += (end - start)
    return sums / total

def streaming_mean_single_column(dset, col=1, blocksize=100_000):
    n = dset.shape[0]
    s = 0.0
    total = 0
    for start, end in block_iter_indices(n, blocksize):
        frag = dset[start:end, col]
        s += float(frag.sum())
        total += (end - start)
    return s / total

f = h5py.File(DATA_PATH, "r")
dset = f[DATASET_NAME]
dset.shape, dset.dtype

((2231637, 3), dtype('int64'))

## Задание 1
Считайте датасет `recipe` из файла `minutes_n_ingredients_full.hdf5` в виде `dask.array`. Укажите аргумент `chunks=(100_000, 3)` при создании массива. Выведите на экран основную информацию о массиве.

In [3]:
if HAS_DASK:
    darr = da.from_array(dset, chunks=(100_000, 3))
    print(darr)
    print("numblocks:", darr.numblocks)
else:
    print("Dask недоступен — используем блочную обработку (эмуляция chunks).")
    est_blocks = math.ceil(dset.shape[0] / 100_000)
    print(f"Оценочно блоков: {est_blocks} × по оси 0 при blocksize=100_000")


dask.array<array, shape=(2231637, 3), dtype=int64, chunksize=(100000, 3), chunktype=numpy.ndarray>
numblocks: (23, 1)


## Задание 2
Вычислите среднее значение по каждому столбцу, кроме первого. 

In [5]:
if HAS_DASK:
    try:
        means = da.from_array(dset, chunks=(100_000, 3))[:, 1:].mean(axis=0).compute()
    except Exception:
        means = streaming_mean_for_columns_h5(dset, [1, 2], 100_000)
else:
    means = streaming_mean_for_columns_h5(dset, [1, 2], 100_000)
means

array([1004.20805176,    5.4198008 ])

## Задание 3
Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего. 

In [7]:
import pandas as pd, time
tested_blocks = [10_000, 50_000, 100_000, 250_000, 500_000, 1_000_000]
rows = []
for bs in tested_blocks:
    t0 = time.perf_counter()
    m = streaming_mean_for_columns_h5(dset, [1, 2], blocksize=bs)
    dt = time.perf_counter() - t0
    rows.append({"blocksize": bs, "time_sec": dt, "mean_minutes": m[0], "mean_n_ingredients": m[1]})
pd.DataFrame(rows).sort_values("blocksize").reset_index(drop=True)

Unnamed: 0,blocksize,time_sec,mean_minutes,mean_n_ingredients
0,10000,0.046685,1004.208052,5.419801
1,50000,0.040618,1004.208052,5.419801
2,100000,0.045171,1004.208052,5.419801
3,250000,0.056519,1004.208052,5.419801
4,500000,0.038617,1004.208052,5.419801
5,1000000,0.043436,1004.208052,5.419801


In [9]:
import pandas as pd, time
tested_blocks = [10_000, 50_000, 100_000, 250_000, 500_000, 1_000_000]
rows = []
for bs in tested_blocks:
    t0 = time.perf_counter()
    m = da.from_array(dset, chunks=(bs, 3))[:, 1:].mean(axis=0).compute()
    dt = time.perf_counter() - t0
    rows.append({"blocksize": bs, "time_sec": dt, "mean_minutes": m[0], "mean_n_ingredients": m[1]})
pd.DataFrame(rows).sort_values("blocksize").reset_index(drop=True)

Unnamed: 0,blocksize,time_sec,mean_minutes,mean_n_ingredients
0,10000,0.098276,1004.208052,5.419801
1,50000,0.036365,1004.208052,5.419801
2,100000,0.036891,1004.208052,5.419801
3,250000,0.033991,1004.208052,5.419801
4,500000,0.031297,1004.208052,5.419801
5,1000000,0.038371,1004.208052,5.419801


## Задание 4
Выберите рецепты, время выполнения которых меньше медианного значения

In [12]:
minutes = np.array(dset[:, 1], dtype=np.int64)
median_minutes = float(np.median(minutes))
mask = minutes < median_minutes
idx = np.nonzero(mask)[0][:10]
rows = [tuple(dset[i, :]) for i in idx]
import pandas as pd
pd.DataFrame(rows, columns=["recipe_id", "minutes", "n_ingredients"])

Unnamed: 0,recipe_id,minutes,n_ingredients
0,1089012,23,5
1,1428572,0,5
2,1400250,24,1
3,1798295,29,5
4,818815,21,5
5,1121713,11,4
6,1880251,5,3
7,1239263,22,3
8,1071099,20,2
9,1024438,1,7


In [16]:
import dask.dataframe as dd
# Оборачиваем HDF5-датасет в Dask Array (читаем лениво, блоками)
darr = da.from_array(dset, chunks=(100_000, 3))
# Явно задаём meta (dtype всех столбцов int64), чтобы не было ошибок инференции
meta = pd.DataFrame({
        "recipe_id": pd.Series(dtype="int64"),
        "minutes": pd.Series(dtype="int64"),
        "n_ingredients": pd.Series(dtype="int64"),
})
ddf = dd.from_dask_array(darr, columns=meta.columns, meta=meta)
# Скалярная медиана по столбцу minutes
median_minutes = float(ddf["minutes"].quantile(0.5).compute())
# Фильтрация и первые 10 строк
result = ddf.loc[ddf["minutes"] < median_minutes, ["recipe_id", "minutes", "n_ingredients"]].head(10)
result

Unnamed: 0,recipe_id,minutes,n_ingredients
1,1089012,23,5
2,1428572,0,5
3,1400250,24,1
5,1798295,29,5
8,818815,21,5
10,357565,32,6
12,1121713,11,4
16,1880251,5,3
17,1239263,22,3
18,1071099,20,2


## Задание 5
Посчитайте количество каждого из возможных значений кол-ва ингредиентов

In [18]:
n_ing = np.array(dset[:, 2], dtype=np.int64)
u, c = np.unique(n_ing, return_counts=True)
import pandas as pd
pd.DataFrame({"n_ingredients": u, "count": c}).sort_values("n_ingredients").reset_index(drop=True).head(20)

Unnamed: 0,n_ingredients,count
0,1,222071
1,2,224158
2,3,229388
3,4,234948
4,5,240720
5,6,244360
6,7,247181
7,8,246747
8,9,246816
9,10,22430


In [20]:
darr = da.from_array(dset, chunks=(100_000, 3))
ddf = dd.from_dask_array(darr, columns=meta.columns, meta=meta)
# Группировка по числу ингредиентов и подсчёт
counts = (
        ddf.groupby("n_ingredients")
           .size()
           .compute()
           .reset_index(name="count")
           .sort_values("n_ingredients")
           .reset_index(drop=True)
    )
counts.head(20)

Unnamed: 0,n_ingredients,count
0,1,222071
1,2,224158
2,3,229388
3,4,234948
4,5,240720
5,6,244360
6,7,247181
7,8,246747
8,9,246816
9,10,22430


## Задание 6
Найдите максимальную продолжительность рецепта. Ограничьте максимальную продолжительность рецептов сверху значением, равному 75% квантилю.

In [22]:
max_minutes = int(minutes.max())
q75 = float(np.quantile(minutes, 0.75))
clipped_max = float(np.minimum(minutes, q75).max())
max_minutes, q75, clipped_max

(2147483647, 49.0, 49.0)

In [24]:
# Dask Array из HDF5
darr = da.from_array(dset, chunks=(100_000, 3))
# Берём только колонку minutes (вторая колонка)
minutes = darr[:, 1]
# 1. Максимальное значение
max_minutes = int(minutes.max().compute())
# 2. 75%-квантиль
q75 = float(da.percentile(minutes, 75).compute())
# 3. Обрезаем сверху и ищем максимум
minutes_clipped = da.minimum(minutes, q75)
clipped_max = float(minutes_clipped.max().compute())
max_minutes, q75, clipped_max

(2147483647, 49.0, 49.0)

## Задание 7
Создайте массив `dask.array` из 2 чисел, содержащих ваши предпочтения относительно времени выполнения рецепта и кол-ва ингредиентов. Найдите наиболее похожий (в смысле $L_1$) рецепт из имеющихся в датасете.

In [26]:
pref = np.array([30.0, 6.0])
feats = np.stack([minutes.astype(np.float64), n_ing.astype(np.float64)], axis=1)
l1 = np.abs(feats - pref).sum(axis=1)
best_idx = int(np.argmin(l1))
best_row = tuple(dset[best_idx, :])
best_idx, best_row, float(l1[best_idx])

(34, (113179, 30, 6), 0.0)

In [28]:
darr = da.from_array(dset, chunks=(100_000, 3))
# Берём признаки minutes и n_ingredients
feats = darr[:, 1:3].astype("float64")   # shape = (N, 2)
# Предпочтения
pref = da.from_array([30.0, 6.0], chunks=(2,))
# Считаем L1 расстояния: сумма модулей разностей по двум признакам
l1 = da.abs(feats - pref).sum(axis=1)  # shape = (N,)
# Находим индекс минимального расстояния
best_idx = int(l1.argmin().compute())
# Получаем строку из исходного HDF5 по индексу
best_row = tuple(dset[best_idx, :])
# Считаем само минимальное расстояние
best_dist = float(l1[best_idx].compute())
best_idx, best_row, best_dist

(34, (113179, 30, 6), 0.0)

## Задание 8
Работая с исходным файлом в формате `hdf5`, реализуйте алгоритм подсчета среднего значения в блочной форме и вычислите с его помощью среднее значение второго столбца в массиве.

Блочный алгоритм вычислений состоит из двух частей:
1. Загрузка фрагмента за фрагментом данных по `blocksize` элементов и проведение вычислений на этим фрагментом
2. Агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных

Важно: при работе с `h5py` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент

In [34]:
mean_minutes_block = streaming_mean_single_column(dset, col=1, blocksize=100_000)
mean_minutes_numpy = float(minutes.mean())
(mean_minutes_block, mean_minutes_numpy, abs(mean_minutes_block - mean_minutes_numpy))

(1004.2080517575215, 1004.2080517575215, 0.0)

In [36]:
f.close()