<a id="introduction"></a>
## Scaling ETL with Dask cuDF
#### By Paul Hendricks
-------

In this notebook, we will show:

* how to get started with Dask, 
* how to work with cuDF DataFrames using Dask, 
* why we might consider using Dask cuDF DataFrames, and
* how to actually use the Dask cuDF API.

**Table of Contents**

* [Scaling ETL with Dask cuDF](#introduction)
* [Setup](#setup)
* [Introduction to Dask](#dask)
* [Using cuDF DataFrames with Dask](#using)
* [Dask cuDF DataFrames](#daskcudfdataframes)
* [Dask cuDF API](#daskcudfapi)
* [Conclusion](#conclusion)

<a id="setup"></a>
## Setup

This notebook was tested using the following Docker containers:

* `rapidsai/rapidsai:0.6-cuda10.0-devel-ubuntu18.04-gcc7-py3.7` from [DockerHub](https://hub.docker.com/r/rapidsai/rapidsai)
* `rapidsai/rapidsai-nightly:0.6-cuda10.0-devel-ubuntu18.04-gcc7-py3.7` from [DockerHub](https://hub.docker.com/r/rapidsai/rapidsai-nightly)

This notebook was run on an NVIDIA Tesla V100 GPU. Your system may differ, and you may need to modify the code or install packages to run the examples below.

If you think you have found a bug or an error, please file an issue here: https://github.com/rapidsai/notebooks/issues

Before we begin, let's check out our hardware setup by running the `nvidia-smi` command.

In [None]:
!nvidia-smi

Next, let's see what CUDA version we have:

In [None]:
!nvcc --version

<a id="dask"></a>
## Introduction to Dask

Dask is a library that allows for parallelized computing. Written in Python, it allows one to compose complex workflows using large data structures like those found in NumPy, Pandas, and cuDF. In the following examples and notebooks, we'll show how to use Dask with cuDF to accelerate common ETL tasks as well as build and train machine learning models like Linear Regression and XGBoost.

To learn more about Dask, check out the documentation here: http://docs.dask.org/en/latest/

#### Directed Acyclic Graphs (DAGs)

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

#### Client/Workers

Dask operates by creating a cluster composed of a "client" and multiple "workers". The client is responsible for scheduling work; the workers are responsible for actually executing that work. 

Typically, we set the number of workers to be equal to the number of computing resources we have available to us. For CPU-based workflows, this might be the number of cores or threads on that particular machine. For example, we might set `n_workers = 8` if we have 8 CPU cores or threads on our machine that can each operate in parallel. This allows us to take advantage of all of our computing resources and get the most benefit from parallelization.
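
As a small sketch of the rule of thumb above, Python's standard library can report how many logical CPUs the machine exposes. Using that count as a starting point for a CPU-based cluster is an assumption made for illustration; the name `suggested_n_workers` is hypothetical and is not used elsewhere in this notebook.

```python
import os

# os.cpu_count() returns the number of logical CPUs, or None if it cannot be determined
logical_cpus = os.cpu_count() or 1

# hypothetical starting point for a CPU-based cluster: one worker per logical CPU
suggested_n_workers = logical_cpus
print('Suggested number of CPU workers:', suggested_n_workers)
```

The best value depends on the workload, so treat this number as a first guess to tune rather than a fixed rule.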

On a system with one or more GPUs, we usually set the number of workers equal to the number of GPUs available to us. Dask is a first-class citizen in the world of General Purpose GPU computing, and the RAPIDS ecosystem makes it very easy to use Dask with cuDF and XGBoost.

Before we get started with Dask, we need to set up a Local Cluster of workers to execute our work and a Client to coordinate and schedule work for that cluster. As we see below, we can initiate a `cluster` and `client` using only a few lines of code.

In [None]:
import dask; print('Dask Version:', dask.__version__)
from dask.distributed import Client, LocalCluster
import subprocess

# parse the hostname IP address
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
ip_address = str(output.decode()).split()[0]

# create a local cluster with 4 workers
n_workers = 4
cluster = LocalCluster(ip=ip_address, n_workers=n_workers)
client = Client(cluster)

Let's inspect the `client` object to view our current Dask status. We should see the IP address for our Scheduler as well as the number of workers in our Cluster.

In [None]:
# show current Dask status
client

You can also see the status and more information at the Dashboard, found by default at `http://<ip_address>:8787/status`. You can ignore this for now; we'll dive into it in subsequent tutorials.

With our client and workers setup, it's time to execute our first program in parallel. We'll define a function that takes some value `x` and adds 5 to it.

In [None]:
def add_5_to_x(x):
    return x + 5

Next, we'll loop over `range(n_workers)` and create an execution graph with one task per index, where each task passes its index to the function `add_5_to_x`. For example, the task with index 2 will add 5 to it, resulting in the value 7. Dask's scheduler decides which worker runs each task.

In [None]:
from dask import delayed

addition_operations = [delayed(add_5_to_x)(i) for i in range(n_workers)]
addition_operations

The above output shows a list of several `Delayed` objects. An important thing to note is that the workers haven't actually executed anything yet - we're just defining the execution graph for our client to run later. The `delayed` function wraps our function `add_5_to_x` and returns a `Delayed` object. This ensures that the computation is in fact "delayed" - or lazily evaluated - and not executed on the spot, i.e. when we define it.
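
To make the idea of lazy evaluation concrete, here is a toy sketch in plain Python. It is not how Dask implements `delayed`; the `LazyCall` class and the `add_5` helper are hypothetical names invented for this illustration. The object only records a function and its arguments, and nothing runs until `compute()` is called.

```python
class LazyCall:
    """Toy stand-in for a delayed task: stores a function and its arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # resolve any arguments that are themselves lazy, then call the function
        resolved = [a.compute() if isinstance(a, LazyCall) else a for a in self.args]
        return self.func(*resolved)


def add_5(x):
    return x + 5


# building these objects runs nothing; work happens only when compute() is called
lazy_values = [LazyCall(add_5, i) for i in range(4)]
lazy_total = LazyCall(lambda *values: sum(values), *lazy_values)
print(lazy_total.compute())  # prints 26
```

Because each `LazyCall` can hold other `LazyCall` objects as arguments, the objects together form a small graph of dependencies, which is the same shape of structure that Dask builds and then hands to its scheduler.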

Next, let's sum each one of these intermediate results. We can accomplish this by wrapping Python's built-in `sum` function using our `delayed` function and storing this in a variable called `total`.

In [None]:
total = delayed(sum)(addition_operations)
total

Using the `graphviz` library, we can use the `visualize` method of a `Delayed` object to visualize our current graph.

In [None]:
total.visualize()

As we mentioned before, none of these results - intermediate or final - have actually been computed. We can compute them using the `compute` method of our `client`.

In [None]:
import time

addition_futures = client.compute(addition_operations, optimize_graph=False, fifo_timeout="0ms")
total_future = client.compute(total, optimize_graph=False, fifo_timeout="0ms")
time.sleep(1)  # give the workers time to execute the tasks

Let's inspect the output of each call to `client.compute`:

In [None]:
addition_futures

We can see from the above output that our `addition_futures` variable is a list of `Future` objects - not the "actual results" of adding 5 to each of `[0, 1, 2, 3]`. These `Future` objects are a promise that at some point a computation will take place and we will be left with a result. Dask is responsible for fulfilling that promise by delegating the task to the appropriate Dask worker and collecting the result.
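
The same "promise of a result" pattern exists in Python's standard library, which can be a handy way to build intuition. The sketch below uses `concurrent.futures` rather than Dask, so its `Future` class is a different type from the one returned by `client.compute`; the `add_5` helper and the `local_` variable names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def add_5(x):
    return x + 5


with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns immediately with a Future for each task
    local_futures = [executor.submit(add_5, i) for i in range(4)]
    # result() blocks until the corresponding task has finished
    local_results = [f.result() for f in local_futures]

print(local_results)  # prints [5, 6, 7, 8]
```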

Let's take a look at our `total_future` object:

In [None]:
print(total_future)
print(type(total_future))

Again, we see that this is an object of type `Future` as well as metadata about the status of the request (i.e. whether it has finished or not), the type of the result, and a key associated with that operation. To collect and print the result of each of these `Future` objects, we can call the `result()` method.

In [None]:
addition_results = [future.result() for future in addition_futures]
print('Addition Results:', addition_results)

Now we see the results that we want from our addition operations. We can also use the simpler syntax of the `client.gather` method to collect our results.

In [None]:
addition_results = client.gather(addition_futures)
total_result = client.gather(total_future)
print('Addition Results:', addition_results)
print('Total Result:', total_result)

Awesome! We just wrote our first distributed workflow.

To confirm that Dask is truly executing in parallel, let's define a function that sleeps for 1 second and returns the string "Success!". Called serially once per worker, this function should take around 4 seconds in total for our 4 workers.

In [None]:
def sleep_1():
    time.sleep(1)
    return 'Success!'

In [None]:
%%time

for _ in range(n_workers):
    sleep_1()

As expected, our process takes about 4 seconds to run. Now let's execute this same workflow in parallel using Dask.

In [None]:
%%time

# define delayed execution graph
sleep_operations = [delayed(sleep_1)() for _ in range(n_workers)]

# use client to perform computations using execution graph
sleep_futures = client.compute(sleep_operations, optimize_graph=False, fifo_timeout="0ms")

# collect and print results
sleep_results = client.gather(sleep_futures)
print(sleep_results)

Using Dask, we see that this whole process takes a little over a second - each worker is executing in parallel!

<a id="using"></a>
## Using cuDF DataFrames with Dask

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import subprocess

# parse the hostname IP address
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
ip_address = str(output.decode()).split()[0]

# create a local CUDA cluster
cluster = LocalCUDACluster(ip=ip_address)
client = Client(cluster)
client

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
import cudf; print('cuDF Version:', cudf.__version__)
import numpy as np; print('NumPy Version:', np.__version__)


def load_data(n_rows):
    df = cudf.DataFrame()
    #n_rows = 10000
    #df['key'] = np.arange(n_rows)
    df['key'] = np.random.binomial(n=1, p=0.5, size=(n_rows,))
    df['value'] = np.random.normal(size=(n_rows,))
    return df

In [None]:
n_workers = 8
# n_rows = 125000000
n_rows = 12500
# dfs = [delayed(load_data)((i + 1) * 10000) for i in range(n_workers)]
dfs = [delayed(load_data)(n_rows) for i in range(n_workers)]  # load_data requires n_rows
dfs

In [None]:
from dask.delayed import delayed

def head(dataframe):
    return dataframe.head()

In [None]:
from dask.distributed import wait


# dfs = [delayed(load_data)((i + 1) * 10000) for i in range(n_workers)]
dfs = [delayed(load_data)(n_rows) for i in range(n_workers)]
wait(dfs)
dfs = [delayed(head)(d) for d in dfs]

In [None]:
futures = client.compute(dfs)
time.sleep(3)
futures

In [None]:
results = client.gather(futures)
results

In [None]:
print(results[0])

In [None]:
from dask.delayed import delayed


def length(dataframe):
    return dataframe.shape[0]

In [None]:
from dask.distributed import wait


# dfs = [delayed(load_data)((i + 1) * 10000) for i in range(n_workers)]
dfs = [delayed(load_data)(n_rows) for i in range(n_workers)]
wait(dfs)
lengths = [delayed(length)(d) for d in dfs]
number_of_rows = delayed(sum)(lengths)

In [None]:
number_of_rows.visualize()

In [None]:
future = client.compute(number_of_rows)
time.sleep(3)
future

In [None]:
result = client.gather(future)
result

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
from dask.delayed import delayed


def groupby(dataframe):
    return dataframe.groupby('key')['value'].mean()

In [None]:
from dask.distributed import wait


# dfs = [delayed(load_data)((i + 1) * 10000) for i in range(n_workers)]
dfs = [delayed(load_data)(n_rows) for i in range(n_workers)]
wait(dfs)
groupbys = [delayed(groupby)(d) for d in dfs]

In [None]:
dfs_futures = client.compute(dfs)
groupbys_futures = client.compute(groupbys)
import time; time.sleep(3)

In [None]:
persisted_dfs = client.gather(dfs_futures)
persisted_dfs

In [None]:
original_df = cudf.concat(persisted_dfs, ignore_index=True)
print(original_df.head())

In [None]:
print(original_df.groupby('key')['value'].mean())

In [None]:
results = client.gather(groupbys_futures)
results

In [None]:
computed_results = cudf.concat(results, ignore_index=True)
print(computed_results)

In [None]:
foo = cudf.DataFrame({'key': np.array(computed_results.index), 
                      'value': computed_results.to_array()})

In [None]:
print(foo)

In [None]:
print(foo.groupby('key')['value'].mean())

In [None]:
print(foo.groupby('key')['value'].sum())

In [None]:
print(original_df.groupby('key')['value'].mean())
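
One thing the comparison above can surface: averaging per-partition group means does not, in general, reproduce the group means of the full dataset, because a key can appear a different number of times in each partition. The sketch below uses plain Python and made-up numbers (the `partitions` data and the `partial_sums` helper are hypothetical) to show that carrying a sum and a count per key, and dividing once at the end, gives the exact answer.

```python
# two hypothetical partitions of (key, value) rows
partitions = [
    [(0, 1.0), (0, 3.0), (1, 10.0)],
    [(0, 5.0), (1, 20.0), (1, 30.0)],
]


def partial_sums(rows):
    """Return {key: (sum, count)} for one partition."""
    out = {}
    for key, value in rows:
        s, c = out.get(key, (0.0, 0))
        out[key] = (s + value, c + 1)
    return out


partials = [partial_sums(rows) for rows in partitions]

# combine the per-partition (sum, count) pairs, then divide once at the end
combined = {}
for part in partials:
    for key, (s, c) in part.items():
        total_s, total_c = combined.get(key, (0.0, 0))
        combined[key] = (total_s + s, total_c + c)
exact_means = {key: s / c for key, (s, c) in combined.items()}

# naive alternative: average the per-partition means
naive_means = {}
for key in combined:
    per_part = [p[key][0] / p[key][1] for p in partials if key in p]
    naive_means[key] = sum(per_part) / len(per_part)

print(exact_means)  # prints {0: 3.0, 1: 20.0}
print(naive_means)  # prints {0: 3.5, 1: 17.5}
```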

#### Dask cuDF - a simplified API

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
# ddf = dask_cudf.from_cudf()

In [None]:
# print(ddf.groupby('key')['value'].mean())

<a id="daskcudfdataframes"></a>
## Dask cuDF DataFrames

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

#### Reading data

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
import dask_cudf; print('Dask cuDF Version:', dask_cudf.__version__)


# performance_glob = os.path.join(performance_path, 'Performance_*')
# performance_df = dask_cudf.read_csv(performance_glob, delimiter='|', 
#                                     names=list(performance_dtypes.keys()), 
#                                     dtype=list(performance_dtypes.values()))

#### Inspecting a Dask cuDF DataFrame

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
performance_df

In [None]:
type(performance_df)

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
performance_df.npartitions

<a id="daskcudfapi"></a>
## Dask cuDF API

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
performance_df.head()

In [None]:
print(performance_df.head())

In [None]:
# calculate number of rows
performance_df.map_partitions(len).compute().sum()

In [None]:
!cat /datasets/rapids/mortgage/mortgage_2000_1gb/perf/* | wc -l

In [None]:
aggregation = performance_df['loan_age'].mean()
print(aggregation.compute())

In [None]:
%%bash

ls -alh /datasets/rapids/mortgage/mortgage_2000_1gb/perf

In [None]:
from collections import OrderedDict
import cudf; print('cuDF Version:', cudf.__version__)
import dask_cudf; print('Dask cuDF Version:', dask_cudf.__version__)
import utils





In [None]:
import os

base_path = os.path.join('/', 'datasets', 'rapids', 'mortgage', 'mortgage_2000_1gb')
filepath = os.path.join(base_path, 'perf', 'Performance_*')
# filepath = os.path.join(base_path, 'perf', 'Performance_2000Q1.txt_0')

In [None]:
df = load_performance_dataset(filepath)
df

#### Selecting Rows or Columns

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
print(type(df))

In [None]:
# select rows
df_subset = df[0:4]
print(df_subset)

In [None]:
print(type(df_subset))

In [None]:
df_result = df_subset.compute()
print(df_result)

In [None]:
print(type(df_result))

In [None]:
print(df_result.shape)

In [None]:
df.npartitions * 5

In [None]:
# select columns
df_subset = df['loan_id']
print(df_subset)

In [None]:
print(type(df_subset))

In [None]:
print(df_subset.head())
print(type(df_subset.head()))

In [None]:
df_subset = df[['loan_id', 'current_loan_delinquency_status']]
print(df_subset)

In [None]:
print(type(df_subset))

In [None]:
print(df_subset.head())

In [None]:
# select both rows and columns
df_subset = df.loc[0:4, ['loan_id', 'current_loan_delinquency_status']]
print(df_subset)

In [None]:
print(type(df_subset))

In [None]:
df_result = df_subset.compute()
print(df_result)

In [None]:
print(type(df_result))
print(df_result.shape)

#### Dropping Rows or Columns

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
df.map_partitions(len).compute().sum()

In [None]:
# df.drop(0:100, axis=0)

In [None]:
df.map_partitions(len).compute().sum()

In [None]:
df.columns

In [None]:
# df.drop(['loan_age'], axis=1)

In [None]:
df.columns

#### Manipulating Columns

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
df['new_column'] = df['loan_id']

In [None]:
df.columns

In [None]:
print(type(df))

In [None]:
print(df['new_column'].head())

In [None]:
# df.drop(['new_column'], axis=1)

#### Transforming Columns

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
df['mean_loan_age'] = df['loan_age'].mean()

#### Renaming Columns

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
df.columns

In [None]:
# df.columns[9] = 'metropolitan_statistical_area'

In [None]:
df.columns

In [None]:
# df['new_column'] = df['loan_id']
# df.drop('loan_id', axis=1)

In [None]:
df.columns

#### Modifying Data Types

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
df.dtypes

In [None]:
df.dtypes

In [None]:
df.dtypes

#### Working with Missing Values

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
# calculate how many rows in each column have actual values
# # ideal
# column_counts = df.count()
# column_counts

# alternative
column_counts = []
for column in list(df.columns):
    column_count = df[column].count().compute()
    column_counts.append((column, column_count))

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
number_of_rows = df.map_partitions(len).compute().sum()

In [None]:
for column, count in column_counts:
    print(column, ':', (count / number_of_rows) * 100)

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
# # loop over each column in the dataframe and each column's dtype
# for column, data_type in df.dtypes.items():
#     # if the data type is not numeric, cast to int32 and fill with -1
#     if str(data_type) == "category":
#         df[column] = df[column].astype('int32').fillna(-1)

#     # if the data type is numeric, cast to appropriate type and fill with -1
#     if str(data_type) in ['int8', 'int16', 'int32', 'int64', 'float32', 'float64']:
#         df[column] = df[column].fillna(np.dtype(data_type).type(-1))

In [None]:
# df.persist()

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
# calculate how many rows in each column have actual values
# # ideal
# column_counts = df.count()
# column_counts

# alternative
column_counts = []
for column in list(df.columns):
    column_count = df[column].count().compute()
    column_counts.append((column, column_count))

In [None]:
for column, count in column_counts:
    print(column, ':', (count / number_of_rows) * 100)

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

#### Working with Indexes

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

#### Sorting Values

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

#### Merging DataFrames

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
client.restart()

In [None]:
!ls -alh /datasets/rapids/mortgage/mortgage_2000_1gb/acq

In [None]:
dtypes = OrderedDict([
        ('loan_id', 'int64'),
        ('orig_channel', 'category'),
        ('seller_name', 'category'),
        ('orig_interest_rate', 'float64'),
        ('orig_upb', 'int64'),
        ('orig_loan_term', 'int64'),
        ('orig_date', 'date'),
        ('first_pay_date', 'date'),
        ('orig_ltv', 'float64'),
        ('orig_cltv', 'float64'),
        ('num_borrowers', 'float64'),
        ('dti', 'float64'),
        ('borrower_credit_score', 'float64'),
        ('first_home_buyer', 'category'),
        ('loan_purpose', 'category'),
        ('property_type', 'category'),
        ('num_units', 'int64'),
        ('occupancy_status', 'category'),
        ('property_state', 'category'),
        ('zip', 'int64'),
        ('mortgage_insurance_percent', 'float64'),
        ('product_type', 'category'),
        ('coborrow_credit_score', 'float64'),
        ('mortgage_insurance_type', 'float64'),
        ('relocation_mortgage_indicator', 'category')
    ])

In [None]:
filepath = os.path.join(base_path, 'acq', 'Acquisition_2000Q1.txt')
# filepath = os.path.join(base_path, 'perf', 'Acquisition_')
acq_df = dask_cudf.read_csv(filepath, delimiter='|', 
                            names=list(dtypes.keys()), dtype=list(dtypes.values()))

In [None]:
print(acq_df.head())

In [None]:
# calculate number of rows
acq_df.map_partitions(len).compute().sum()

In [None]:
acq_df.dtypes

In [None]:
# acq_df['orig_date'] = acq_df['orig_date'].astype(np.datetime64)
# acq_df['first_pay_date'] = acq_df['first_pay_date'].astype(np.datetime64)

In [None]:
acq_df.dtypes

In [None]:
# acq_df.drop(['orig_date', 'first_pay_date'], axis=1)

In [None]:
acq_df.head()

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

In [None]:
!head /datasets/rapids/mortgage/mortgage_2000_1gb/names.csv

In [None]:
dtypes = OrderedDict([
        ("seller_name", "category"),
        ("new", "category"),
    ])

In [None]:
filepath = os.path.join(base_path, 'names.csv')
# filepath = os.path.join(base_path, 'perf', 'Performance_2000Q1.txt_0')
names_df = dask_cudf.read_csv(filepath, delimiter='|', 
                              names=list(dtypes.keys()), dtype=list(dtypes.values()))

In [None]:
print(names_df.head())

In [None]:
names_df.dtypes

In [None]:
# calculate number of rows
names_df.map_partitions(len).compute().sum()

In [None]:
print(names_df.head())

In [None]:
print(acq_df.head())

In [None]:
print(type(acq_df), type(names_df))

In [None]:
acq_df.dtypes

In [None]:
# acq_df.drop(['orig_date', 'first_pay_date'], axis=1)
subset_columns = [i for i in acq_df.columns if i not in ['orig_date', 'first_pay_date']]
print(subset_columns)

In [None]:
merged_df = acq_df[subset_columns].merge(names_df, how='left', on=['seller_name'])

In [None]:
acq_df.dtypes

In [None]:
merged_df.dtypes

In [None]:
# preview the first rows of the merged result
merged_df.head()
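To make the semantics of `how='left'` concrete, here is a minimal pure-Python sketch of a left join. It is not how cuDF implements the merge. The rows, the `loan_id` column, and the seller names are hypothetical and chosen only to show that every left-hand row survives, with missing values where no match exists.

```python
# Hypothetical toy rows; the real frames are acq_df and names_df.
acq_rows = [
    {"loan_id": 1, "seller_name": "BANK A"},
    {"loan_id": 2, "seller_name": "BANK B"},
    {"loan_id": 3, "seller_name": "BANK C"},
]
names_rows = [
    {"seller_name": "BANK A", "new": "Bank A Corp"},
    {"seller_name": "BANK B", "new": "Bank B Corp"},
]


def left_join(left, right, key, right_columns):
    """Keep every left row; fill right-hand columns with None when no key matches."""
    lookup = {}
    for row in right:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    for row in left:
        matches = lookup.get(row[key])
        if matches is None:
            # No match on the right: keep the left row and pad the right columns.
            joined.append({**row, **{col: None for col in right_columns}})
        else:
            # One output row per matching right-hand row.
            for match in matches:
                joined.append({**row, **match})
    return joined


merged_rows = left_join(acq_rows, names_rows, "seller_name", ["new"])
for row in merged_rows:
    print(row)
```

In this toy data, the row for `BANK C` has no counterpart in `names_rows`, so it is kept with `new` set to `None`.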

#### Concatenating DataFrames

In this section, we split a Dask cuDF DataFrame into its delayed partitions, look up which worker holds each partition, and then concatenate the partitions on each worker with `cudf.concat`.

In [None]:
client.restart()

In [None]:
df_delayed = df.to_delayed()
df_delayed

In [None]:
from dask.delayed import delayed


def head(dataframe):
    return dataframe.head()


dfs = [delayed(head)(d) for d in df_delayed]

In [None]:
client.scheduler_info()

In [None]:
workers = client.scheduler_info()['workers']
print(len(workers))
print(workers)

In [None]:
worker_ids = [worker['id'] for worker in workers.values()]
print(worker_ids)

In [None]:
from dask.distributed import wait

futures = client.compute(dfs)
wait(futures)
futures

In [None]:
# [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]

# partition_worker_map = [(partition, list(client.who_has(partition).values())[0]) for partition in df]
# [client.who_has(partition) for partition in df ]

In [None]:
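# has_what() maps each worker to the keys it holds, so look up the matching futures first
by_key = {f.key: f for f in futures}
held = [[by_key[k] for k in keys if k in by_key] for keys in client.has_what().values()]
concatenations = [delayed(cudf.concat)(parts) for parts in held if parts]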

In [None]:
concatenations

In [None]:
futures = client.compute(concatenations)

In [None]:
# results = [future.result() for future in futures]
results = client.gather(futures)

In [None]:
results[0]

#### Aggregating with Groupbys

In [None]:
client.restart()
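A groupby on a partitioned DataFrame is typically evaluated in two stages: each partition computes partial aggregates, and the partial results are then combined. The sketch below illustrates that idea for a per-group mean using plain Python. It is a conceptual illustration under that assumption, not the Dask cuDF implementation, and the keys and values are hypothetical.

```python
from collections import defaultdict

# Two hypothetical partitions of (group_key, value) pairs.
partitions = [
    [("a", 10.0), ("b", 4.0), ("a", 20.0)],
    [("b", 6.0), ("a", 30.0)],
]


def partial_aggregate(partition):
    """Stage 1, per partition: running [sum, count] for each key."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        acc[key][0] += value
        acc[key][1] += 1
    return dict(acc)


def combine(partials):
    """Stage 2, across partitions: add the partial sums and counts, then divide."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (partial_sum, partial_count) in partial.items():
            total[key][0] += partial_sum
            total[key][1] += partial_count
    return {key: s / n for key, (s, n) in total.items()}


group_means = combine([partial_aggregate(p) for p in partitions])
print(group_means)  # {'a': 20.0, 'b': 5.0}
```

Carrying the sum and the count separately matters: averaging the per-partition means directly would give the wrong answer whenever partitions hold different numbers of rows per group.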

#### One Hot Encoding

In [None]:
client.restart()
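One subtlety of one hot encoding partitioned data is that every partition must agree on the full set of categories. Otherwise, partitions would produce different indicator columns. The following pure-Python sketch shows the idea. It does not use the cuDF API, and the category codes are hypothetical.

```python
# Two hypothetical partitions of a categorical column.
partitions = [
    ["R", "C"],
    ["R", "B"],
]

# Step 1: collect the global, ordered set of categories across all partitions.
categories = sorted({value for partition in partitions for value in partition})


def encode(partition, categories):
    """Step 2: turn each value into a row of 0/1 indicators, one per category."""
    return [[1 if value == cat else 0 for cat in categories] for value in partition]


encoded = [encode(partition, categories) for partition in partitions]

print(categories)  # ['B', 'C', 'R']
print(encoded[0])  # [[0, 0, 1], [0, 1, 0]]
print(encoded[1])  # [[0, 0, 1], [1, 0, 0]]
```

Note that the first partition never sees the value `"B"`, yet its rows still carry a `B` column because the categories were computed globally first.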

#### Custom Operations

Custom Python functions can be applied to each partition by converting the DataFrame into a list of delayed partitions with `to_delayed()` and wrapping the function with `delayed`. Below, we reuse the `head` function defined earlier and apply it to every partition of `performance_df`.

In [None]:
client.restart()

In [None]:
performance_df_delayed = performance_df.to_delayed()
performance_df_delayed

In [None]:
# build one lazy `head` task per partition; nothing runs until client.compute is called
head_dfs = [delayed(head)(d) for d in performance_df_delayed]

In [None]:
futures = client.compute(head_dfs)
wait(futures)  # block until every task has finished instead of sleeping for a fixed time
futures

In [None]:
results = client.gather(futures)
results

In [None]:
print(results[0])

In [None]:
print(results[1])

<a id="conclusion"></a>
## Conclusion

In this notebook, we showed how to get started with Dask, how to work with cuDF DataFrames using Dask, why we might consider using Dask cuDF DataFrames, and how to use the Dask cuDF API.

To learn more about RAPIDS, be sure to check out: 

* [Open Source Website](http://rapids.ai)
* [GitHub](https://github.com/rapidsai/)
* [Press Release](https://nvidianews.nvidia.com/news/nvidia-introduces-rapids-open-source-gpu-acceleration-platform-for-large-scale-data-analytics-and-machine-learning)
* [NVIDIA Blog](https://blogs.nvidia.com/blog/2018/10/10/rapids-data-science-open-source-community/)
* [Developer Blog](https://devblogs.nvidia.com/gpu-accelerated-analytics-rapids/)
* [NVIDIA Data Science Webpage](https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/)