In [None]:
import subprocess
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, progress
import dask_cudf
import cudf
import dask.dataframe as dd

In [None]:
import os
os.environ["PATH"] += os.pathsep + '/usr/bin/dot '

In [None]:
cmd = "hostname --all-ip-addresses"

process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]

cluster = LocalCUDACluster(ip=IPADDR, dashboard_address=':8902')
client = Client(cluster)
client

In [None]:
%%time
df = cudf.read_parquet('covid.gzip')
df = df.dropna()
df.loc[df['sex']=='Male']

В dask-cudf применяются ленивые вычисления. То есть мы что-то делаем только тогда, когда это необходимо, например, если вы просите вернуть какой-то ответ.

In [None]:
%time ddf = dask_cudf.read_parquet('covid.gzip')

In [None]:
ddf

In [None]:
gdf = ddf.dropna()

Можно визуализировать граф, котоырй будет исполняться

In [None]:
gdf.visualize(format='svg')

In [None]:
male_gdf = gdf.loc[gdf['sex']=='Male']
male_gdf.visualize(format='svg')

А что если я хочу, чтобы уже что-то посчиталось предварительно, пока я буду дальше код писать?

In [None]:
copy_male_gdf = male_gdf.copy()
copy_male_gdf.visualize(format='svg')

Строчкой ниже мы попросили посчитать все и положить это в память. Действие особенно полезно при использовании распределенных систем,
поскольку результаты будут храниться в распределенной памяти, а не возвращаться в один локальный процесс

In [None]:
copy_male_gdf = copy_male_gdf.persist()
copy_male_gdf.visualize(format='svg')

Выполнили compute = сложили данные на одну видеокарту и тип данных уже cudf

In [None]:
%time male_gdf.compute()

In [None]:
male_gdf = male_gdf.compute()

In [None]:
male_gdf['age_group'].value_counts() / male_gdf.shape[0]

### В целом все очень похоже на cudf. Но еще пара моментов

In [None]:
from cudf.datasets import randomdata

df = randomdata(nrows=10, dtypes={'a':float, 'b':float}, seed=12)
df.mean()

Перевод из cudf в dask-cudf

In [None]:
ddf = dask_cudf.from_cudf(df, npartitions=2)

In [None]:
type(ddf)

In [None]:
ddf.mean().compute()

В dask-cudf пока нельзя применять метод applymap к отдельному столбцу напрямую, можно только к каждой партиции через map_partitions.

In [None]:
def add_ten(num):
    return num + 10

было

In [None]:
df['a'].applymap(add_ten)

стало

In [None]:
ddf['a'].map_partitions(add_ten).compute()

Транспонировать партицированные данные пока тоже нельзя, придется сначала выполнить медот compute(), но если в одну видеокарту не влезет, то не выйдет (( А еще если есть переменные типа string, то также не выйдет транспонировать

In [None]:
ddf.T

In [None]:
ddf.compute().T

In [None]:
type(ddf.compute())

# Домашнее задание

Сравните среднюю вероятность смерти мужчин и женщин по группам возростов на основе столбца death_ind. Тоже самое проделайте для вероятности госпитализации, преобразовав переменную hosp_yn, как мы сделали с переменной death_yn. Используйте cudf и сохраните результат на диск.
Как и в ноутбуке с cudf, только теперь с несколькими GPU=)