# Dask Array

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Макрушин С.В. Лекция "Dask"
* https://docs.dask.org/en/latest/array.html

In [1]:
import dask.array as da
import h5py
import numpy as np
import dask
import pandas as pd

In [2]:
# !pip install graphviz 

In [3]:
# !pip install --user --upgrade dask

In [4]:
dask.__version__

'2021.10.0'

## Задачи для совместного разбора

**1.** Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива. Визуализируйте граф вычисления задачи.

In [5]:
import numpy as np
import h5py


with h5py.File("demo.h5", "w") as hdf:
    hdf.create_dataset('arr', data=np.random.normal(0, 1, size = (1000, 300_000)))

In [6]:
hdf = h5py.File('demo.h5', 'r')
dset = hdf['arr']
arr = da.from_array(dset,) 
#                     chunks=(1000,300_000))
arr

Unnamed: 0,Array,Chunk
Bytes,2.24 GiB,114.44 MiB
Shape,"(1000, 300000)","(1000, 15000)"
Count,21 Tasks,20 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.24 GiB 114.44 MiB Shape (1000, 300000) (1000, 15000) Count 21 Tasks 20 Chunks Type float64 numpy.ndarray",300000  1000,

Unnamed: 0,Array,Chunk
Bytes,2.24 GiB,114.44 MiB
Shape,"(1000, 300000)","(1000, 15000)"
Count,21 Tasks,20 Chunks
Type,float64,numpy.ndarray


In [7]:
%%time
arr.mean().compute()

Wall time: 1.12 s


-0.00015286184753828664

In [8]:
%%time
arr1 = arr *2 
s = arr1.sum()
m = arr1.mean()
dask.compute(s,m)

Wall time: 1.26 s


(-91717.10852297198, -0.00030572369507657327)

**2.** Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`

In [9]:
%%time
arr_da = da.random.normal(0, 1, size = (1000, 300000))
(arr_da ** 2).sum().compute()

Wall time: 1.77 s


299970443.9540201

## Лабораторная работа 11

__При решении данных задач не подразумевается использования циклов или генераторов Python в ходе работы с пакетами `numpy`, `pandas` и `dask`, если в задании не сказано обратного. Решения задач, в которых для обработки массивов `numpy`, структур `pandas` или структур `dask` используются явные циклы (без согласования с преподавателем), могут быть признаны некорректными и не засчитаны.__

В ходе выполнения все операции вычислений (расчет средних значений, расчет косинусной близости и т.д.) проводятся над `dask.array` и средствами пакета `dask`, если в задании не сказано обратного. Переход от `dask.array` к `numpy.array` или `pd.DataFrame` возможен исключительно для демонстрации результата в конце решения задачи. Если в задаче используются результаты выполнения предыдущих задач, то подразумевается, что вы используете результаты в виде `dask.array` (то есть то, что было получено до вызова `compute`, а не после).

In [1]:
import dask.array as da
import h5py
import numpy as np
import dask
import pandas as pd

**1\.** Считайте датасет `embeddings` из файла `recipe_embeddings.h5` в виде `dask.array`. Выведите на экран основную информацию о массиве: размер, форму, тип, количество и размер сегментов. 

In [2]:
file = h5py.File('C:/Users/micha/Downloads/recipe_embeddings.h5', 'r') 

In [3]:
embeddings_da = da.from_array(file['embeddings'])
embeddings_da

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,119.02 MiB
Shape,"(1200000, 312)","(100000, 312)"
Count,13 Tasks,12 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.39 GiB 119.02 MiB Shape (1200000, 312) (100000, 312) Count 13 Tasks 12 Chunks Type float32 numpy.ndarray",312  1200000,

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,119.02 MiB
Shape,"(1200000, 312)","(100000, 312)"
Count,13 Tasks,12 Chunks
Type,float32,numpy.ndarray


**2\.** Посчитайте и выведите на экран среднее значение всех элементов массива. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего. 

Пусть $M$ - количество строк в массиве, $N$ - количество столбцов в массиве, `chunks=(r,c)`. Сравните несколько вариантов:
* $r=M$, $с \ll N$ , 
* $r \ll M$, $c=N$ 
* $r = M$, $c = N$ 
* значения $r, c$ по умолчанию.

Выберите наиболее оптимальные значения $r$ и  $c$ в смысле скорости вычислений и далее продолжайте работу с ними.

In [4]:
def mean_among_array(chunks:tuple):
    embeddings_da = da.from_array(file['embeddings'], chunks=chunks)
    return embeddings_da.mean()

In [5]:
variants = [(embeddings_da.shape[0], embeddings_da.shape[1]//100),
            (embeddings_da.shape[0]//100, embeddings_da.shape[1]),
            (embeddings_da.shape[0], embeddings_da.shape[1])]
variants

[(1200000, 3), (12000, 312), (1200000, 312)]

$\text{1 вариант: (1200000, 3)}$

In [6]:
%time mean_among_array(variants[0]).compute()

Wall time: 31.1 s


0.0023777562

$\text{2 вариант: (12000, 312)}$

In [7]:
%time mean_among_array(variants[1]).compute()

Wall time: 564 ms


0.0023777566

 $\text{3 вариант: (1200000, 312)}$

In [8]:
%time mean_among_array(variants[2]).compute()

Wall time: 744 ms


0.0023777678

 $\text{4 вариант: defalt}$

In [9]:
%time da.from_array(file['embeddings']).mean().compute()

Wall time: 521 ms


0.0023777566

$\text{Выбираем 2 вариант}$

In [10]:
embeddings_da = da.from_array(file['embeddings'], chunks=variants[1])
embeddings_da

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,14.28 MiB
Shape,"(1200000, 312)","(12000, 312)"
Count,101 Tasks,100 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.39 GiB 14.28 MiB Shape (1200000, 312) (12000, 312) Count 101 Tasks 100 Chunks Type float32 numpy.ndarray",312  1200000,

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,14.28 MiB
Shape,"(1200000, 312)","(12000, 312)"
Count,101 Tasks,100 Chunks
Type,float32,numpy.ndarray


**3\.** Опишите пространство, в котором расположены эмбеддинги, посчитав минимальное и максимальное значение для каждой из координат. Сведите результаты в таблицу `pd.DataFrame`, состоящую из двух строк и 312 столбцов. Задайте индексы строк "min" и "max". Названия столбцов сделайте вида $x_i$. Выведите полученную таблицу на экран.

Решите задачу двумя способами. В первом варианте сделайте два вызова метода `compute` для расчета каждого из векторов максимальных и минимальных значений. Во втором варианте сделайте один вызов функции `dask.compute` для одновременного расчета двух векторов. Сравните время выполнения двух решений.

In [11]:
%%time
min_da = embeddings_da.min(axis=0).compute()
max_da = embeddings_da.max(axis=0).compute()

pd.DataFrame([min_da, max_da],
             columns=['x'+str(i) for i in np.arange(0, embeddings_da.shape[1])],
             index=['min','max'])

Wall time: 1.07 s


Unnamed: 0,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9,...,x302,x303,x304,x305,x306,x307,x308,x309,x310,x311
min,-0.132803,-0.149056,-0.094468,-0.191697,-0.114229,-0.114341,-0.096039,-0.115178,-0.157275,-0.116715,...,-0.103254,-0.122285,-0.149789,-0.127703,-0.094802,-0.11969,-0.141425,-0.123732,-0.081543,-0.227348
max,0.135038,0.076125,0.157854,0.030987,0.101192,0.111774,0.147497,0.173821,0.099808,0.115573,...,0.119518,0.197589,0.113135,0.13649,0.162921,0.099021,0.086653,0.158176,0.166968,0.048967


In [12]:
%%time
min_da = embeddings_da.min(axis=0)
max_da = embeddings_da.max(axis=0)
min_da_max_da = dask.compute(min_da, max_da)

pd.DataFrame([min_da_max_da[0], min_da_max_da[1]],
             columns=['x'+str(i) for i in np.arange(0, embeddings_da.shape[1])],
             index=['min','max'])

Wall time: 577 ms


Unnamed: 0,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9,...,x302,x303,x304,x305,x306,x307,x308,x309,x310,x311
min,-0.132803,-0.149056,-0.094468,-0.191697,-0.114229,-0.114341,-0.096039,-0.115178,-0.157275,-0.116715,...,-0.103254,-0.122285,-0.149789,-0.127703,-0.094802,-0.11969,-0.141425,-0.123732,-0.081543,-0.227348
max,0.135038,0.076125,0.157854,0.030987,0.101192,0.111774,0.147497,0.173821,0.099808,0.115573,...,0.119518,0.197589,0.113135,0.13649,0.162921,0.099021,0.086653,0.158176,0.166968,0.048967


Используя `dask.compute(min, max)` вместо `min.compute()` + `max.compute()`, мы уменьшили скорость выполнения программы в **два раза**

**4\.** Найдите вектор $x \ne x_{256}$ из набора данных, ближайший к вектору $x_{256}$ в смысле метрики $L_1$. Выведите на экран первые 10 координат вектора $x$.

$$d_1(\textbf{x},\textbf{y})=\sum_{k=1}^{n}{|x_k - y_k|}, \textbf{x}, \textbf{y} \in \mathbb{R}^n$$

In [13]:
%%time
l1_sums = da.absolute(embeddings_da[255]-embeddings_da).sum(axis=1) # суммы разностей элементов вектора 256 и всех векторов
l1_argmin = da.ma.masked_where(l1_sums==0,l1_sums).argmin() # маска, "прячущая" элементы, совпадающие с вектором 256 (он сам)

dask.compute(l1_sums[l1_argmin], embeddings_da[l1_argmin][:10])

Wall time: 941 ms


(6.391473,
 array([-0.01873741, -0.07140347,  0.02849776, -0.10885686,  0.03978413,
        -0.00868603,  0.03658793,  0.02858754, -0.07105186, -0.01334546],
       dtype=float32))

In [14]:
da.absolute(embeddings_da[255]-embeddings_da[106329]).sum().compute() # проверка

6.391473

**5\.** Рецепты разбиты на 4 группы. Загрузите маску для разбиения на группы из датасета `mask` из файла `recipe_embeddings.h5` в виде `dask.array`. Для каждой группы посчитайте и выведите на экран максимальное значение  нормы $\ell_1$ векторов рецептов, принадлежащих к этой группе. 

Подсказка: закодируйте маску принадлежности к группе при помощи метода кодирования one-hot encoding и воспользуйтесь механизмом распространения.

$$\ell_1: ||\textbf{x}||_1=\sum_{k=1}^{n}{|x_k|}, \textbf{x} \in \mathbb{R}^n$$

In [15]:
from sklearn.preprocessing import OneHotEncoder

In [16]:
mask_da = da.from_array(file['mask'])
mask_da

Unnamed: 0,Array,Chunk
Bytes,9.16 MiB,9.16 MiB
Shape,"(1200000,)","(1200000,)"
Count,2 Tasks,1 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 9.16 MiB 9.16 MiB Shape (1200000,) (1200000,) Count 2 Tasks 1 Chunks Type int64 numpy.ndarray",1200000  1,

Unnamed: 0,Array,Chunk
Bytes,9.16 MiB,9.16 MiB
Shape,"(1200000,)","(1200000,)"
Count,2 Tasks,1 Chunks
Type,int64,numpy.ndarray


In [17]:
onehot = OneHotEncoder(sparse=True)

final_mask = onehot.fit_transform(mask_da.reshape(-1, 1))

In [18]:
def l1_norm(n):
    return (da.absolute(da.array(final_mask.toarray().T[n][:, np.newaxis]) * embeddings_da)).sum().max()

In [19]:
%%time
zero, one, two, second = map(l1_norm, np.arange(0,4))

dask.compute(zero, one, two, second)

Wall time: 4.8 s


(7239407.216496968, 4826370.75549789, 2292290.1197778326, 120679.39365272256)

**6\.** Пусть $X=[\textbf{x}_1,...\textbf{x}_M]^\top$ - матрица эмбеддингов рецептов размера $M\times N$, $W=[\textbf{w}_1,...,\textbf{w}_N]^\top$ - матрица коэффициентов некоторой модели машинного обучения размера $N\times 4$, $y=[y_1,...,y_M]^\top$ - вектор размера $M$, содержащий номера групп рецептов (метки классов). Тогда задачу классификации можно решить следующим образом: $$\hat{y_i} = argmax_j{<X_{i\cdot}, W_{\cdot j}>}$$ где $A_{i\cdot}$ обозначает $i$ строку матрицы, $A_{\cdot j}$ обозначает $j$ столбец матрицы, $\hat{y_i}$ - прогноз класса для рецепта $i$, $<\cdot, \cdot>$ - скалярное произведение векторов.

Инициализируйте матрицу $W$ случайным образом и получите прогнозы для всех рецептов при помощи этой матрицы и матрицы эмбеддингов. Подсчитайте и выведите на экран значение accuracy на основе полученных прогнозов $\hat{y}$ и правильных ответов $y$.

In [20]:
W = da.random.normal(0, 1, size = (embeddings_da.shape[1], 4))

In [21]:
y_hat = da.argmax(da.tensordot(W.T, embeddings_da.T, axes=1), axis=0)

In [22]:
y_hat

Unnamed: 0,Array,Chunk
Bytes,9.16 MiB,93.75 kiB
Shape,"(1200000,)","(12000,)"
Count,503 Tasks,100 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 9.16 MiB 93.75 kiB Shape (1200000,) (12000,) Count 503 Tasks 100 Chunks Type int64 numpy.ndarray",1200000  1,

Unnamed: 0,Array,Chunk
Bytes,9.16 MiB,93.75 kiB
Shape,"(1200000,)","(12000,)"
Count,503 Tasks,100 Chunks
Type,int64,numpy.ndarray


In [23]:
y = mask_da

$$\text{Accuracy} = \frac{\text{True}}{\text{Total}}$$

In [24]:
accuracy = ((y_hat == y).sum()/y.shape[0])

In [25]:
%time accuracy.compute()

Wall time: 1.63 s


0.257135

`da.argmax(da.tensordot(W.T, embeddings_da[0], axes=1)).compute()` - пример поика максимального значения с 0-ым эмбеддингом

*вывод: 3*

**7\.** Сингулярным разложением (SVD) матрицы $A$ размера $M\times N$ называется разложение вида $A = USV^\top$, где $U$ - матрица размера $M\times N$  ортонормированных векторов произведения $AA^\top$, $V^T$ - транспонированная матрица размера $N\times N$ ортонормированных векторов произведения $A^\top A$, $S$ - диагональная матрица сингулярных значений размера $N\times N$.

SVD может быть использовано для понижения размерности векторов. Для этого от матрицы $U$ оставляют первые $k$ столбцов $U_{\cdot,:k}$, от матрицы $S$ оставляют левый верхний квадрат размера $k\times k$ $S_{:k,:k}$ и вычисляется произведение $\hat{A} = U_{\cdot,:k}S_{:k,:k}$

Выберите эмбеддинги тех рецептов, которые относятся к группе с номеров 3, и уменьшите их размерность до 64 при помощи реализации алгоритма SVD из пакета `dask.array.linalg`. Выведите количество строк и столбцов полученного массива.

Примечание: после отбора рецепта, принадлежащих третьей группе, вызовите у полученного массива метод `compute_chunk_sizes`, чтобы `dask` обновил метаинформацию в этом массиве. 

In [26]:
group_three = embeddings_da[da.ma.getmaskarray(da.ma.masked_not_equal(da.array(final_mask.toarray().T[2]), 0))]

In [27]:
group_three.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,226.14 MiB,2.42 MiB
Shape,"(190000, 312)","(2030, 312)"
Count,405 Tasks,100 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 226.14 MiB 2.42 MiB Shape (190000, 312) (2030, 312) Count 405 Tasks 100 Chunks Type float32 numpy.ndarray",312  190000,

Unnamed: 0,Array,Chunk
Bytes,226.14 MiB,2.42 MiB
Shape,"(190000, 312)","(2030, 312)"
Count,405 Tasks,100 Chunks
Type,float32,numpy.ndarray


In [28]:
group_three.shape

(190000, 312)

In [29]:
u, s, v = da.linalg.svd(da.array(group_three))
# u, s, v = dask.compute(u, s, v)

In [30]:
u.shape, s.shape, v.shape

((190000, 312), (312,), (312, 312))

In [31]:
%%time
emb_da_svd = (s[:63] * u[:,:63]).compute()
emb_da_svd.shape

Wall time: 6.09 s


(190000, 63)

**8\.** Используя эмбеддинги уменьшенной размерности, полученные в задании 6, посчитайте косинусное сходство между каждой парой рецептов третьей группы. Выведите матрицу косинусного сходства на экран.

$${\displaystyle {\text{косинусное сходство}}=S_{C}(A,B)={\mathbf {A} \cdot \mathbf {B} \over \|\mathbf {A} \|\|\mathbf {B} \|}={\frac {\sum \limits _{i=1} ^{n}{A_{i}B_{i}}}{{\sqrt {\sum \limits _{i=1} ^{n}{A_{i}^{2}}}}{\sqrt {\sum \limits _{i=1}^{n}{B_{i}^{2}}}}}}}$$

$${\displaystyle \scriptsize\text{* чем меньше угол между двумя векторами, тем больше сходство двух векторов}}$$

In [32]:
from scipy.spatial.distance import cosine

In [33]:
da.array(emb_da_svd)

Unnamed: 0,Array,Chunk
Bytes,45.66 MiB,45.66 MiB
Shape,"(190000, 63)","(190000, 63)"
Count,1 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 45.66 MiB 45.66 MiB Shape (190000, 63) (190000, 63) Count 1 Tasks 1 Chunks Type float32 numpy.ndarray",63  190000,

Unnamed: 0,Array,Chunk
Bytes,45.66 MiB,45.66 MiB
Shape,"(190000, 63)","(190000, 63)"
Count,1 Tasks,1 Chunks
Type,float32,numpy.ndarray


**9\.** Посчитайте и выведите на экран количество рецептов, для которых рецепт с индексом `242` входит число топ-5 ближайших рецептов в смысле косинусной близости. При поиске топ-5 рецептов для конкретного рецепта считайте, что он сам в это число не входит.

**10\.** Графом называется совокупность двух множеств $G=(V,E)$: множества $V=\{v_1, ..., v_M\}$ узлов и множества ребер $E=\{(v_i, v_j)|v_i\in V, v_j\in V\}$, соединяющих эти узлы. Матрицей смежности невзвешенного графа называется квадратная матрица $A=[a_{ij}]$, в которой ${a_{ij}}$ обозначает количество ребер, соединяющих вершины $i$ и $j$.

Постройте матрицу смежности для графа рецептов на основе матрицы косинусного сходства между каждой парой рецептов. Будем считать, что между двумя рецептами в этом графе существует ребро, если косинусное сходство между двумя этими рецептами не менее 0.85. Петли (ребра из вершины в саму в себя) в графе должны отсутствовать. Посчитайте и выведите на экран количество ребер в данном графе. Проверьте, является ли полученная матрица смежности симметричной.

Примечание: считайте, что два различных рецепта не могут иметь косинусное сходство, равное 1.

**11\.** Работая с исходным файлом в формате `h5`, реализуйте алгоритм подсчета среднего вектора датасета в блочной форме.

Блочный алгоритм вычислений состоит из двух частей:
1. Загрузка фрагмента за фрагментов данных и проведение вычислений над этим фрагментом
2. Агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных

Важно: при работе с `h5` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент. При работе с `h5` вы можете работать с массивами `numpy.array`. Для итерации по сегментам файла допускается использование циклов.

Сравните время и результаты решения работы вашего алгоритма с реализацией поиска среднего вектора из `dask`. 