In [None]:
## Create conda environment - execute in terminal
'''
conda create -n rapids-0.17 -c rapidsai -c nvidia \
-c conda-forge -c dask -c defaults rapids-blazing=0.17 python=3.7 cudatoolkit=11.0  gcsfs=0.7.1 -y

conda activate rapids-0.17
'''

In [3]:
!git clone https://github.com/remylouisew/rapids_AIP.git

Cloning into 'rapids_AIP'...
remote: Enumerating objects: 155, done.[K
remote: Counting objects: 100% (155/155), done.[K
remote: Compressing objects: 100% (117/117), done.[K
remote: Total 155 (delta 39), reused 131 (delta 27), pack-reused 0[K
Receiving objects: 100% (155/155), 107.19 KiB | 2.19 MiB/s, done.
Resolving deltas: 100% (39/39), done.


In [1]:
import dask_cudf
import cupy as cp
import argparse
import time
import gcsfs
import os, json
import subprocess
import pandas as pd

In [2]:
# Instatiate LocalCUDACluster to assign dask processes to GPUs

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

cluster = LocalCUDACluster()
client = Client(cluster)


In [3]:
# Read dataset into Dask DMatrix from Google Cloud Storage

colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
train_dir='gs://nvidiadask/higgs1/*.csv' #GCS public bucket
df = dask_cudf.read_csv(train_dir, header=None, names=colnames, chunksize=None)

print("Number of partitions is", df.npartitions)

Number of partitions is 10


In [9]:
train_dir='gs://nvidiadask/higgs1/*.csv' #GCS public bucket
colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
start_time = time.time()
df = dask_cudf.read_csv(train_dir, header=None, names=colnames, chunksize=None)
print("[INFO]: ------ Files read in ---",float(time.time() - start_time))

[INFO]: ------ Files read in --- 1.5120880603790283


In [11]:
#REMOVE: TRYING PARQUET

colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
train_dir='gs://nvidiadask/higgsp.csv' #GCS public bucket
df = dask_cudf.read_parquet(train_dir, header=None, names=colnames, chunksize=None)

print("Number of partitions is", df.npartitions)

Number of partitions is 400


In [None]:
# Some basic functions using Dask

%%time
df["key"] = df.feature02.round()
group_means = df.groupby("key").mean().persist()
wait(group_means);

group_means.head()

#group_means.compute() will output the DMatrix as a Pandas DataFrame. A good workflow would be to summarize your data using Dask, then output to pandas for plotting or other pandas functions.


In [None]:
# Now restart the kernal so that you can instatiate a new LocalCUDAcluster, 
# this time one that will spill to host memory when the GPU memory is exceeded

import dask_cudf
import cupy as cp
import argparse
import time
import gcsfs
import dask_cudf
import os, json
import subprocess
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0",
    rmm_pool_size=parse_bytes("15GB"), # This GPU has 16GB of memory
    device_memory_limit=parse_bytes("10GB"),)
client = Client(cluster)
client


In [None]:
# Read in larger dataset (20GB) from GCS

colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
train_dir='gs://nvidiadask/higgs2/*.csv' #GCS public bucket
df = dask_cudf.read_csv(train_dir, header=None, names=colnames, chunksize=None)

print("Number of partitions is", df.npartitions)

# Run the dask functions, which will require nearly double the memory available on the GPU
%%time
df["key"] = df.feature02.round()
group_means = df.groupby("key").mean().persist()
wait(group_means);

group_means.head()

In [None]:
##spin up notebook with rapids xgboost AND use conda install