# Dask Array

Материалы:
* Макрушин С.В. Лекция 11: Dask
* https://docs.dask.org/en/latest/array.html
* JESSE C. DANIEL. Data Science with Python and Dask. 

## Задачи для совместного разбора

1. Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива.

2. Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`

3. Визуализируйте граф вычислений для задачи 2.

## Лабораторная работа 11

In [1]:
import dask.array as da
import h5py
import numpy as np

1. Считайте датасет `recipe` из файла `minutes_n_ingredients_full.hdf5` в виде `dask.array`. Укажите аргумент `chunks=(100_000, 3)` при создании массива. Выведите на экран основную информацию о массиве.

In [2]:
f = h5py.File('minutes_n_ingredients_full.hdf5')
d = f['recipe']
recipes = da.from_array(d, (100_000, 3))

In [3]:
recipes

Unnamed: 0,Array,Chunk
Bytes,51.08 MiB,2.29 MiB
Shape,"(2231637, 3)","(100000, 3)"
Count,24 Tasks,23 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 51.08 MiB 2.29 MiB Shape (2231637, 3) (100000, 3) Count 24 Tasks 23 Chunks Type int64 numpy.ndarray",3  2231637,

Unnamed: 0,Array,Chunk
Bytes,51.08 MiB,2.29 MiB
Shape,"(2231637, 3)","(100000, 3)"
Count,24 Tasks,23 Chunks
Type,int64,numpy.ndarray


In [11]:
recipes[1].compute()

array([1089012,      23,       5], dtype=int64)

|id|minutes|n_ingridients|
|---|---|---|
|1089012|23|5|

2. Вычислите среднее значение по каждому столбцу, кроме первого. 

In [4]:
print(recipes[:, 1].mean().compute())
print(recipes[:, 2].mean().compute())

1004.2080517575215
5.419800800936711


3. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего. 

In [5]:
recipes.shape

(2231637, 3)

In [67]:
from time import time

times = []
for i in [10_000, 25_000, 50_000, 100_000, 250_000, 500_000, 1_000_000]:
    r = da.from_array(d, (i, 3))
    start = time()
    r[:, 1].mean().compute()
    times.append((i, time() - start))
times

[(10000, 0.517362117767334),
 (25000, 0.08109331130981445),
 (50000, 0.056220054626464844),
 (100000, 0.049894094467163086),
 (250000, 0.05480194091796875),
 (500000, 0.04646778106689453),
 (1000000, 0.08265352249145508)]

4. Выберите рецепты, время выполнения которых меньше медианного значения

In [13]:
median = da.median(recipes[:, 1], axis=0).compute()
median

32.0

In [14]:
recipes[recipes[:, 1] < median, :].compute()

array([[1089012,      23,       5],
       [1428572,       0,       5],
       [1400250,      24,       1],
       ...,
       [1029131,      19,       4],
       [1700703,       1,       1],
       [ 713836,       0,       9]], dtype=int64)

5. Посчитайте количество каждого из возможных значений кол-ва ингредиентов

In [16]:
import pandas as pd

In [69]:
values, counts = da.unique(recipes[:, 2], return_counts=True)
pd.DataFrame({"value": values.compute(), "count": counts.compute()})

Unnamed: 0,value,count
0,1,222071
1,2,224158
2,3,229388
3,4,234948
4,5,240720
5,6,244360
6,7,247181
7,8,246747
8,43,342064


6. Найдите максимальную продолжительность рецепта. Ограничьте максимальную продолжительность рецептов сверху значением, равному 75% квантилю.

In [70]:
mx = da.max(recipes[:, 1]).compute()
mx

2147483647

In [71]:
q75 = recipes.to_dask_dataframe()[1].quantile(0.75).compute()
q75

49.0

In [74]:
recipes[recipes[:, 1] > q75, 1] = q75

In [75]:
recipes[recipes[:, 1] > q75, 1].compute()

array([], dtype=int64)

7. Создайте массив `dask.array` из 2 чисел, содержащих ваши предпочтения относительно времени выполнения рецепта и кол-ва ингредиентов. Найдите наиболее похожий (в смысле $L_1$) рецепт из имеющихся в датасете.

In [41]:
a = da.from_array([15, 2])

In [42]:
recipes[da.fabs((a - recipes[:, 1:])).sum(axis=1).argmin()].compute()

array([1511513,      15,       2], dtype=int64)

8. Работая с исходным файлом в формате `hdf5`, реализуйте алгоритм подсчета среднего значения в блочной форме и вычислите с его помощью среднее значение второго столбца в массиве.

Блочный алгоритм вычислений состоит из двух частей:
1. загрузка фрагмента за фрагментом данных по `blocksize` элементов и проведение вычислений на этим фрагментом;
2. агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных.

Важно: при работе с `h5py` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент

In [62]:
f = h5py.File('minutes_n_ingredients_full.hdf5')
ds = f['recipe']

In [63]:
def mean(dataset, blocksize, column):
    sum_size = []
    for i in range(0, dataset.shape[0] + 1, blocksize):
        end = (i + blocksize if (i + blocksize <= dataset.shape[0] + 1) else dataset.shape[0] + 1)
        sum_size.append((dataset[i:end, column].sum(), end - i))
    return sum([a[0] for a in sum_size]) / dataset.shape[0]

In [64]:
%time
mean(ds, 500_000, 1)

Wall time: 0 ns


1004.2080517575215

In [61]:
%time
recipes[:, 1].mean().compute()

Wall time: 0 ns


1004.2080517575215

In [1]:
import dask.array as da
x = da.random.random((10_000, 1_000), chunks=(1000, 1000))
x

Unnamed: 0,Array,Chunk
Bytes,76.29 MiB,7.63 MiB
Shape,"(10000, 1000)","(1000, 1000)"
Count,10 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 76.29 MiB 7.63 MiB Shape (10000, 1000) (1000, 1000) Count 10 Tasks 10 Chunks Type float64 numpy.ndarray",1000  10000,

Unnamed: 0,Array,Chunk
Bytes,76.29 MiB,7.63 MiB
Shape,"(10000, 1000)","(1000, 1000)"
Count,10 Tasks,10 Chunks
Type,float64,numpy.ndarray


In [21]:
rn = da.random.random_integers(0, 10,(100, 1_000))
rn

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,390.62 kiB
Shape,"(100, 1000)","(100, 1000)"
Count,1 Tasks,1 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 390.62 kiB 390.62 kiB Shape (100, 1000) (100, 1000) Count 1 Tasks 1 Chunks Type int32 numpy.ndarray",1000  100,

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,390.62 kiB
Shape,"(100, 1000)","(100, 1000)"
Count,1 Tasks,1 Chunks
Type,int32,numpy.ndarray


In [22]:
a = rn.mean(0).compute()

In [23]:
sum(a > 6)

1

In [24]:
sum(i>6 for i in a)

1