# Assessment (cuDF, cuML, Dask)

This assessment covers cuDF, Dask-cuDF, cuML, and Dask-cuML, and is aimed mainly at new learners. It is split into modules with embedded exercises so you can practice the concepts. Concepts are shown in both cuDF and Dask-cuDF syntax, and in both cuML and Dask-cuML syntax, to make the differences easier to understand.

### Quick Recap

### What are these Libraries?

[cuDF](https://github.com/rapidsai/cudf) is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating data. It is the GPU equivalent of pandas.

[Dask](https://dask.org/) is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple.

[Dask-cuDF](https://github.com/rapidsai/dask-cudf) is a library that provides a partitioned, GPU-backed dataframe, using Dask.

[cuML](https://github.com/rapidsai/cuml) is the RAPIDS library that implements machine learning algorithms similar to those in scikit-learn, using CUDA to run on GPUs, with an API that mirrors scikit-learn's as closely as possible.

[Dask-cuML](https://github.com/rapidsai/dask-cuml) is a library that helps you train cuML algorithms and run inference with them across multiple nodes and multiple GPUs.


### When to use cuDF and Dask-cuDF/ cuML and Dask-cuML

If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF and cuML. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF and Dask-cuML.
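
The rule of thumb above can be sketched as a small helper. This is only an illustration: `choose_backend` and its `headroom` parameter are hypothetical names invented here, not part of RAPIDS, and the 50% headroom is an assumed allowance for intermediate results rather than a documented threshold.

```python
GIB = 1024 ** 3


def choose_backend(data_bytes, gpu_memory_bytes, headroom=0.5):
    """Hypothetical helper: suggest single-GPU or distributed libraries.

    `headroom` is an assumed fraction of GPU memory that the raw data may
    occupy, leaving the rest for intermediate results.
    """
    budget = gpu_memory_bytes * headroom
    if data_bytes <= budget:
        return "cuDF / cuML"
    return "Dask-cuDF / Dask-cuML"


# A 4 GiB dataset on a 16 GiB GPU fits comfortably on one device.
print(choose_backend(4 * GIB, 16 * GIB))
# A 40 GiB dataset does not, so the distributed libraries are suggested.
print(choose_backend(40 * GIB, 16 * GIB))
```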




## Here is the list of exercises covered in this assessment:


- <a href='#objcreation'>Creating Dask-CuDF Objects</a><br> This module shows you how to work with Dask-CuDF dataframes, the distributed GPU equivalent of Pandas dataframes, for faster data transactions. It includes creating Dask-CuDF objects, viewing data, selecting data, boolean indexing and dealing with missing data.
- <a href='#operation'>Operations</a><br> Learn how to view descriptive statistics, perform string operations, histogram values, concatenate, join, and group data, and use applymap.
- <a href='#time'>TimeSeries</a><br> Working with time series data in Dask-cuDF.
- <a href='#condatarep'>Converting Data Representations</a><br> Here we will work with converting data representations, including Arrow, pandas and NumPy, that are commonly required in data science pipelines.
- <a href='#datainout'>Getting Data In and Out</a><br> Transferring Dask-cuDF dataframes to and from CSV and Parquet files.
- <a href='#objcreation'>Start Dask Cluster</a><br> We will specify what resources we want to utilize in our GPU cluster for executing the ML algorithms.
- <a href='#viewing'>Define Parameters</a><br> Define the data and model parameters as per your choice for creating data and model.
- <a href='#selection'>Generation of Data</a><br> We will train our model on a custom dataset created using the parameters specified before.
- <a href='#sellabel'>Distribute Data</a><br> As we are working on a multi-GPU setup, we will distribute the data evenly after ensuring that it is well shuffled.
- <a href='#selpos'>Scikit-learn</a><br> Create the scikit-learn implementation of the selection model.
- <a href='#boolean'> CuML</a><br> Convert the Scikit-learn model to CuML and use Dask.
- <a href='#missing'> Accuracy</a><br> Evaluate the performance of both models in terms of accuracy and timing output.


**TODO**: Run the cell below to import `cudf` and `dask_cudf`

In [None]:
import os

import numpy as np
import pandas as pd

#import cudf
import cudf

#import dask_cudf
import dask_cudf

np.random.seed(12)


<a id='objcreation'></a>

Object Creation
---------------

Creating a `cudf.Series` and `dask_cudf.Series`.

In [None]:
s = cudf.Series([1,2,3,None,4])
print(s)

**TODO**: Replace the `FIXME` to create a dask_cudf series from the cudf series `s`, using `npartitions=2`

In [None]:
ds = "FIXME" 
print(ds.compute())

Creating a `cudf.DataFrame` and a `dask_cudf.DataFrame` from a `cudf.DataFrame`.


In [None]:
df = cudf.DataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20))
                    })

In [None]:
ddf = dask_cudf.from_cudf(df, npartitions=2) 
print(ddf.compute())

<a id='viewing'></a><br>

Viewing Data
-------------

Viewing the top rows of a GPU dataframe.

In [None]:
print(df.head(4))

**TODO**: Replace the `FIXME` to view top 4 rows of dask_cudf data frame

In [None]:
print("FIXME")

**TODO**: Replace the `FIXME` to sort the dask_cudf dataframe by column 'b'

In [None]:
print("FIXME".compute())

<a id='selection'></a>

Selection
------------

## Getting

<a id='sellabel'></a>

## Selection by Label

Selecting rows from index 2 to index 5 from columns 'a' and 'b'.

In [None]:
print(df.loc[2:5, ['a', 'b']])

**TODO**: Replace the `FIXME` to select rows from index 2 to index 5 from columns 'a' and 'b'

In [None]:
print("FIXME".compute())

<a id='selpos'></a>

## Selection by Position

Selecting via integers and integer slices, like numpy/pandas. Note that this functionality is not available for Dask-cuDF DataFrames.

**TODO**: Replace the `FIXME` to select the first 3 rows and first 2 columns from the cudf dataframe

In [None]:
print("FIXME")

   a   b
0  0  19
1  1  18
2  2  17


<a id='boolean'></a>

## Boolean Indexing

Selecting values from a `DataFrame` using the `query` API.

**TODO**: Replace the `FIXME` to select the rows where column 'b' is >= 6 using the query API on the cudf dataframe

In [None]:
print("FIXME")  

     a   b   c
0    0  19   0
1    1  18   1
2    2  17   2
3    3  16   3
4    4  15   4
5    5  14   5
6    6  13   6
7    7  12   7
8    8  11   8
9    9  10   9
10  10   9  10
11  11   8  11
12  12   7  12
13  13   6  13


**TODO**: Replace the `FIXME` to select the rows where column 'b' is >= 6 using the query API on the dask-cudf dataframe

In [None]:
print("FIXME".compute())  

     a   b   c
0    0  19   0
1    1  18   1
2    2  17   2
3    3  16   3
4    4  15   4
5    5  14   5
6    6  13   6
7    7  12   7
8    8  11   8
9    9  10   9
10  10   9  10
11  11   8  11
12  12   7  12
13  13   6  13


<a id='missing'></a><br>

Missing Data
------------

Missing data can be replaced by using the `fillna` method.

**TODO**: Replace the `FIXME` to replace null values with 999 in the cudf series

In [None]:
print(s."FIXME")

0      1
1      2
2      3
3    999
4      4
dtype: int64


**TODO**: Replace the `FIXME` to replace null values with 999 in the dask-cudf series

In [None]:
print("FIXME".compute())

0      1
1      2
2      3
3    999
4      4
dtype: int64


<a id='operation'></a><br>

Operations
------------

<a id='stats'></a><br>

## Stats

Calculating descriptive statistics for a `Series`.

In [None]:
print(s.mean(), s.var())

2.5 1.666666666666666


**TODO**: Replace the `FIXME` to calculate the mean and variance for the dask-cudf series

In [None]:
print("FIXME".compute(), "FIXME".compute())

2.5 1.6666666666666667


<a id='applymap'></a><br>

## Applymap

Applying functions to a `Series`. Note that applying user defined functions directly with Dask-cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) to apply a function to each partition of the distributed dataframe.

In [None]:
def add_ten(num):
    return num + 10

print(df['a'].applymap(add_ten))

0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64
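
The idea behind `map_partitions` can be sketched on the CPU with pandas standing in for the GPU libraries. This is not how Dask is implemented; `map_partitions_sketch` is a hypothetical helper written for illustration that splits a series into contiguous chunks, applies the function to each chunk independently, and concatenates the results.

```python
import pandas as pd


def add_ten(num):
    return num + 10


def map_partitions_sketch(series, func, npartitions):
    """Hypothetical CPU stand-in for the per-partition apply pattern."""
    # Ceiling division so every row lands in some chunk.
    size = max(1, -(-len(series) // npartitions))
    # Each contiguous slice plays the role of one Dask partition.
    partitions = [series.iloc[i:i + size] for i in range(0, len(series), size)]
    # Apply the function to each partition, then stitch the results together.
    return pd.concat([func(part) for part in partitions])


a = pd.Series(range(20), name="a")
result = map_partitions_sketch(a, add_ten, npartitions=2)
print(result.tolist())
```

Because the function only ever sees one partition at a time, it must not rely on rows that live in other partitions.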


**TODO**: Replace the `FIXME` to add ten to column 'a' of the dask-cudf dataframe using the `add_ten` function created above (see the note on `map_partitions`)

In [None]:
print("FIXME".compute())

0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64


<a id='histo'></a>

## Histogramming

Counting the number of occurrences of each unique value of a variable.

In [None]:
print(df.a.value_counts())

15    1
6     1
1     1
14    1
2     1
5     1
11    1
7     1
17    1
13    1
8     1
16    1
0     1
10    1
4     1
9     1
19    1
18    1
3     1
12    1
Name: a, dtype: int32


**TODO**: Replace the `FIXME` to count the number of occurrences of each unique value of a variable in the dask-cudf dataframe

In [None]:
print("FIXME".compute())

15    1
6     1
1     1
14    1
2     1
5     1
11    1
7     1
17    1
13    1
8     1
16    1
0     1
10    1
4     1
9     1
19    1
18    1
3     1
12    1
Name: a, dtype: int64


<a id='string'></a><br>

## String Methods

Like pandas, cuDF provides string processing methods in the `str` attribute of `Series`. Full documentation of string methods is a work in progress. Please see the cuDF API documentation for more information.

**TODO**: Replace the `FIXME` to convert all words to lower case for cudf series

In [None]:
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
print("FIXME")

0       a
1       b
2       c
3    aaba
4    baca
5    <NA>
6    caba
7     dog
8     cat
dtype: object


**TODO**: Replace the `FIXME` to convert all words to lower case for dask-cudf series

In [None]:
ds = dask_cudf.from_cudf(s, npartitions=2)
print("FIXME")

0       a
1       b
2       c
3    aaba
4    baca
5    <NA>
6    caba
7     dog
8     cat
dtype: object


<a id='concat'></a><br>

## Concat

Concatenating `Series` and `DataFrames` row-wise.

**TODO**: Replace the `FIXME` to concatenate series with itself for cudf series

In [None]:
s = cudf.Series([1, 2, 3, None, 5])
print("FIXME")

0       1
1       2
2       3
3    <NA>
4       5
0       1
1       2
2       3
3    <NA>
4       5
dtype: int64


**TODO**: Replace the `FIXME` to concatenate series with itself for dask-cudf series

In [None]:
ds2 = dask_cudf.from_cudf(s, npartitions=2)
print("FIXME")

0       1
1       2
2       3
3    <NA>
4       5
0       1
1       2
2       3
3    <NA>
4       5
dtype: int64


<a id='join'></a><br>

## Join

Performing SQL style merges. Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.

**TODO**: Replace the `FIXME` to do a 'left' merge of the cudf dataframes created below, using column 'key'

In [None]:
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]

merged = "FIXME"
print(merged)

  key  vals_a vals_b
0   a    10.0  100.0
1   c    12.0  101.0
2   e    14.0  102.0
3   b    11.0   <NA>
4   d    13.0   <NA>


**TODO**: Replace the `FIXME` to do a 'left' merge of the dask-cudf dataframes created below, using column 'key'

In [None]:
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

merged = "FIXME".compute()
print(merged)

  key  vals_a vals_b
0   a    10.0  100.0
1   c    12.0  101.0
2   b    11.0   <NA>
0   e    14.0  102.0
1   d    13.0   <NA>
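
In the distributed result the index restarts in each partition, so sorting by the index alone may not give a stable order. One hedged way to get a deterministic order is to sort on the key column and rebuild the index. The sketch below uses pandas as a CPU stand-in, with the rows typed in by hand in the order shown above rather than produced by a real merge.

```python
import pandas as pd

# Rows in the arbitrary order a distributed merge might return them,
# including the repeated per-partition index values.
merged_cpu = pd.DataFrame(
    {
        "key": ["a", "c", "b", "e", "d"],
        "vals_a": [10.0, 12.0, 11.0, 14.0, 13.0],
        "vals_b": [100.0, 101.0, None, 102.0, None],
    },
    index=[0, 1, 2, 0, 1],
)

# Sort on the join key, then replace the duplicated index with 0..n-1.
ordered = merged_cpu.sort_values("key").reset_index(drop=True)
print(ordered)
```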


<a id='grouping'></a><br>

## Grouping

Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.

In [None]:
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

Grouping hierarchically then applying the `sum` function to grouped data. We send the result to a pandas dataframe only for printing purposes.

In [None]:
print(df.groupby(['agg_col1', 'agg_col2']).sum().to_pandas())

                    a   b   c
agg_col1 agg_col2            
1        0         54  60  54
0        0         73  60  73
1        1         36  40  36
0        1         27  30  27
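
To make the split-apply-combine steps concrete, here is a plain-Python sketch that rebuilds the same data without any dataframe library and computes the grouped sums by hand. It is an illustration of the paradigm only, not of how cuDF implements `groupby`.

```python
from collections import defaultdict

# Rebuild the rows of df: a and c count up, b counts down.
rows = [
    {
        "a": x,
        "b": 19 - x,
        "c": x,
        "agg_col1": 1 if x % 2 == 0 else 0,
        "agg_col2": 1 if x % 3 == 0 else 0,
    }
    for x in range(20)
]

# Split: bucket the rows by their (agg_col1, agg_col2) key.
groups = defaultdict(list)
for row in rows:
    groups[(row["agg_col1"], row["agg_col2"])].append(row)

# Apply and combine: sum each value column within every bucket.
sums = {
    key: {col: sum(r[col] for r in members) for col in ("a", "b", "c")}
    for key, members in groups.items()
}

for key in sorted(sums):
    print(key, sums[key])
```

The four groups and their sums match the table printed above, although the row order differs because group order is not guaranteed.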


**TODO**: Replace the `FIXME` to group hierarchically using 'agg_col1' and 'agg_col2' and then apply the sum function to the grouped data

In [None]:
"FIXME".compute().to_pandas()

                    a   b   c
agg_col1 agg_col2            
1        1         36  40  36
0        0         73  60  73
1        0         54  60  54
0        1         27  30  27


<a id='tran'></a><br>

## Transpose

Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF.

In [None]:
sample = cudf.DataFrame({'a':[1,2,3], 'b':[4,5,6]})
print(sample)

   a  b
0  1  4
1  2  5
2  3  6


**TODO**: Replace the `FIXME` to transpose sample cudf dataframe

In [None]:
print("FIXME")

   0  1  2
a  1  2  3
b  4  5  6


<a id='time'></a><br>

Time Series
------------


A `DataFrame` supports `datetime` typed columns, which allow users to interact with and filter data based on specific timestamps.

In [None]:
import datetime as dt

date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
print(date_df.query('date <= @search_date'))

        date     value
0 2018-11-20  0.154163
1 2018-11-21  0.740050
2 2018-11-22  0.263315
3 2018-11-23  0.533739


**TODO**: Replace the `FIXME` to get only those rows with date before '2018-11-23' using query api

In [None]:
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
print("FIXME".compute())

<a id='condatarep'></a><br>


Converting Data Representation
--------------------------------

<a id='pandas'></a><br>

## Pandas

Converting a cuDF and Dask-cuDF `DataFrame` to a pandas `DataFrame`.

**TODO**: Replace the `FIXME` to convert cudf dataframe to pandas dataframe

In [None]:
print(df.head()."FIXME")

   a   b  c  agg_col1  agg_col2
0  0  19  0         1         1
1  1  18  1         0         0
2  2  17  2         1         0
3  3  16  3         0         1
4  4  15  4         1         0


**TODO**: Replace the `FIXME` to convert dask-cudf dataframe to pandas dataframe

In [None]:
print("FIXME")

   a   b  c  agg_col1  agg_col2
0  0  19  0         1         1
1  1  18  1         0         0
2  2  17  2         1         0
3  3  16  3         0         1
4  4  15  4         1         0


<a id='arrow'></a><br>   

## Arrow

Converting a cuDF or Dask-cuDF `DataFrame` to a PyArrow `Table`.

**TODO**: Replace the `FIXME` to convert cudf dataframe to PyArrow table

In [None]:
print("FIXME")

pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64


**TODO**: Replace the `FIXME` to convert dask-cudf dataframe to PyArrow table

In [None]:
print("FIXME")

pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64


<a id='datainout'></a><br>

Getting Data In/Out
------------------------


 <a id='csv'></a><br>

## CSV

Writing to a CSV file, by first sending data to a pandas `DataFrame` on the host.

In [None]:
# Create the output directory if it does not already exist
os.makedirs('example_output', exist_ok=True)

df.to_pandas().to_csv('example_output/foo.csv', index=False)

**TODO**: Replace the `FIXME` to write ddf data to csv file

In [None]:
ddf.compute().to_pandas()."FIXME"

Reading from a csv file.

**TODO**: Replace the `FIXME` to read data from the csv file created using the cudf dataframe

In [None]:
df = "FIXME"
print(df)

     a   b   c  agg_col1  agg_col2
0    0  19   0         1         1
1    1  18   1         0         0
2    2  17   2         1         0
3    3  16   3         0         1
4    4  15   4         1         0
5    5  14   5         0         0
6    6  13   6         1         1
7    7  12   7         0         0
8    8  11   8         1         0
9    9  10   9         0         1
10  10   9  10         1         0
11  11   8  11         0         0
12  12   7  12         1         1
13  13   6  13         0         0
14  14   5  14         1         0
15  15   4  15         0         1
16  16   3  16         1         0
17  17   2  17         0         0
18  18   1  18         1         1
19  19   0  19         0         0


**TODO**: Replace the `FIXME` to read data from the csv file created using the dask-cudf dataframe

In [None]:
ddf = "FIXME"
print(ddf.compute())

     a   b   c  agg_col1  agg_col2
0    0  19   0         1         1
1    1  18   1         0         0
2    2  17   2         1         0
3    3  16   3         0         1
4    4  15   4         1         0
5    5  14   5         0         0
6    6  13   6         1         1
7    7  12   7         0         0
8    8  11   8         1         0
9    9  10   9         0         1
10  10   9  10         1         0
11  11   8  11         0         0
12  12   7  12         1         1
13  13   6  13         0         0
14  14   5  14         1         0
15  15   4  15         0         1
16  16   3  16         1         0
17  17   2  17         0         0
18  18   1  18         1         1
19  19   0  19         0         0


Reading all CSV files in a directory into a single `dask_cudf.DataFrame`, using the star wildcard.
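
How a star wildcard selects files can be previewed with the standard library before handing the pattern to a reader. The sketch below uses a temporary directory and made-up file names (`foo.csv`, `foo_dask.csv`, `notes.txt` are assumptions for illustration) and only lists the matches; it does not read them into a dataframe.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # Create two CSV files and one unrelated file.
    for name in ("foo.csv", "foo_dask.csv", "notes.txt"):
        with open(os.path.join(tmp, name), "w") as handle:
            handle.write("a,b\n0,19\n")

    # "*.csv" matches every file in the directory whose name ends in .csv.
    pattern = os.path.join(tmp, "*.csv")
    matched = sorted(os.path.basename(path) for path in glob.glob(pattern))

print(matched)
```

Every matching file contributes its rows to the combined dataframe, so a directory holding two copies of the same data yields each row twice.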

**TODO**: Replace the `FIXME` to read all csv files in a directory into a single dask_cudf.DataFrame using the star wildcard

In [None]:
ddf = "FIXME"
print(ddf.compute())

     a   b   c  agg_col1  agg_col2
0    0  19   0         1         1
1    1  18   1         0         0
2    2  17   2         1         0
3    3  16   3         0         1
4    4  15   4         1         0
5    5  14   5         0         0
6    6  13   6         1         1
7    7  12   7         0         0
8    8  11   8         1         0
9    9  10   9         0         1
10  10   9  10         1         0
11  11   8  11         0         0
12  12   7  12         1         1
13  13   6  13         0         0
14  14   5  14         1         0
15  15   4  15         0         1
16  16   3  16         1         0
17  17   2  17         0         0
18  18   1  18         1         1
19  19   0  19         0         0
0    0  19   0         1         1
1    1  18   1         0         0
2    2  17   2         1         0
3    3  16   3         0         1
4    4  15   4         1         0
5    5  14   5         0         0
6    6  13   6         1         1
7    7  12   7      

In [None]:
import numpy as np
import sklearn

import pandas as pd
import cudf
import cuml

from sklearn.metrics import accuracy_score
from sklearn import model_selection, datasets

from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf

from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from sklearn.ensemble import RandomForestClassifier as sklRF

<a id='objcreation'></a><br>

## Start Dask cluster

**TODO**: Replace the `FIXME` to start Dask's `LocalCUDACluster`

In [None]:
# This will use all GPUs on the local host by default
cluster = "FIXME"
c = Client(cluster)

# Query the client for all connected workers
workers = c.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization

 <a id='viewing'></a><br>
## Define Parameters

In addition to the number of examples, random forest fitting performance depends heavily on the number of columns in a dataset and (especially) on the maximum depth to which trees are allowed to grow. Lower `max_depth` values can greatly speed up fitting, though going too low may reduce accuracy.

In [None]:
# Data parameters
train_size = 100000
test_size = 1000
n_samples = train_size + test_size
n_features = 20

# Random Forest building parameters
max_depth = 12
n_bins = 16
n_trees = 1000
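
A rough way to see why `max_depth` matters so much is to bound the size of a single tree. The sketch below assumes binary trees with the root at depth 0, which is a simplifying assumption about the depth convention; real trees are usually far smaller than this upper bound because splitting stops early on pure or small nodes.

```python
def max_nodes(depth):
    """Upper bound on nodes in a binary tree whose leaves sit at `depth`."""
    return 2 ** (depth + 1) - 1


max_depth = 12
n_trees = 1000

per_tree = max_nodes(max_depth)
print("Upper bound per tree:  ", per_tree)
print("Upper bound for forest:", per_tree * n_trees)

# Each extra level roughly doubles the bound.
for depth in (8, 12, 16):
    print(depth, max_nodes(depth))
```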

<a id='selection'></a><br>

## Generate Data on host

In this case, we generate data on the client (initial process) and pass it to the workers. You could also load data directly onto the workers via, for example, `dask_cudf.read_csv()`. 

In [None]:
X, y = datasets.make_classification(n_samples=n_samples, n_features=n_features,
                                 n_clusters_per_class=1, n_informative=int(n_features / 3),
                                 random_state=123, n_classes=5)
X = X.astype(np.float32)
y = y.astype(np.int32)
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=test_size)

<a id='sellabel'></a><br>

## Distribute data to worker GPUs

**TODO**: Replace the `FIXME` to create dask objects X_test_dask and y_test_dask using GPU workers

In [None]:
n_partitions = n_workers

def distribute(X, y):
    # First convert to cudf (with real data, you would likely load in cuDF format to start)
    X_cudf = cudf.DataFrame.from_pandas(pd.DataFrame(X))
    y_cudf = cudf.Series(y)

    # Partition with Dask
    # In this case, each worker will train on 1/n_partitions fraction of the data
    X_dask = dask_cudf.from_cudf(X_cudf, npartitions=n_partitions)
    y_dask = dask_cudf.from_cudf(y_cudf, npartitions=n_partitions)

    # Persist to cache the data in active memory
    X_dask, y_dask = \
      dask_utils.persist_across_workers(c, [X_dask, y_dask], workers=workers)
    
    return X_dask, y_dask

X_train_dask, y_train_dask = distribute(X_train, y_train)
X_test_dask, y_test_dask = "FIXME"

<a id='selpos'></a><br>

## Build a scikit-learn model (single node)

Dask does not currently have a simple wrapper for scikit-learn's RandomForest, but scikit-learn does offer multi-CPU support via joblib, which we'll use.

**TODO**: Replace the `FIXME` to build and train a scikit-learn RandomForest model using all available CPU cores

In [None]:
%%time

# Use all available CPU cores
skl_model = "FIXME"
skl_model."FIXME"

CPU times: user 6min 16s, sys: 906 ms, total: 6min 17s
Wall time: 13.2 s


RandomForestClassifier(max_depth=12, n_estimators=1000, n_jobs=-1)

<a id='boolean'></a><br>

## Train the distributed cuML model

**TODO**: Replace the `FIXME` to build and train dask-cuml model 

In [None]:
%%time

cuml_model = "FIXME"
cuml_model."FIXME"

wait(cuml_model.rfs) # Allow asynchronous training tasks to finish

CPU times: user 121 ms, sys: 73 ms, total: 194 ms
Wall time: 8.47 s


DoneAndNotDoneFutures(done={<Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-40f62d4d-fa2f-490f-8b89-c459fecfd0bc>}, not_done=set())

<a id='missing'></a><br>

## Predict and check accuracy

**TODO**: Replace the `FIXME` to get predictions for test data

In [None]:
skl_y_pred = "FIXME"
cuml_y_pred = "FIXME"

# Due to randomness in the algorithm, you may see slight variation in accuracies
print("SKLearn accuracy:  ", accuracy_score(y_test, skl_y_pred))
print("CuML accuracy:     ", accuracy_score(y_test, cuml_y_pred))
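
For reference, classification accuracy is the fraction of predictions that equal the true label. The plain-Python sketch below shows that definition on a made-up five-element example; `accuracy` is a hypothetical helper written for illustration, not a replacement for `accuracy_score`.

```python
def accuracy(y_true, y_pred):
    """Fraction of positions where the prediction equals the true label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    matches = sum(1 for truth, pred in zip(y_true, y_pred) if truth == pred)
    return matches / len(y_true)


# Four of the five made-up predictions are correct.
print(accuracy([0, 1, 2, 2, 4], [0, 1, 2, 3, 4]))
```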

# Conclusion

We hope you enjoyed the assessment. Thank you!