# Lab 8: Cluster Computing in Dask with RCC and AWS

## A Quick Start of Using Dask Locally

Today we are going to learn about [**Dask**](https://dask.org/), a powerful and comprehensive cluster computing library in Python. To install Dask on your local machine, run the following command:

`python -m pip install "dask[complete]"`

We recommend the `complete` extra here, which also installs Dask's optional dependencies (such as `distributed`, pandas, and the diagnostic dashboard). On the RCC system, where the Python environment is managed by Anaconda, it is highly recommended to create a dedicated environment for Dask. You can run either `conda install dask` or the command above to install Dask in your conda environment.

To use dask on a local single machine (e.g., your laptop), there is nothing else to install. You could simply run the following commands to start a local cluster:

In [1]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()  # scheduler_port=1234
cluster


By default, `LocalCluster` chooses the number of worker processes and the number of threads per worker based on the CPU cores available on the local machine. To adjust the number of workers (processes), use the `scale` method as below.

In [2]:
cluster.scale(4) #your number of processes

Dask provides a convenient dashboard to monitor the usage and status of the workers in the cluster. To open it, go to `localhost:8787` in your browser after launching the cluster. 8787 is the default port of the diagnostic server; you can customize it by specifying the `dashboard_address` argument when setting up the cluster. More options for customizing the local cluster configuration can be found [here](https://docs.dask.org/en/latest/setup/single-distributed.html).


<img src="./dask_worker monitor.png">

To use the resources in the cluster, we connect a client to the cluster's scheduler, which coordinates the computation and the storage of data across the workers. The client object provides functions similar to `pool.map` (`map`) and `pool.apply` (`submit`) in Python's multiprocessing module, as well as functions analogous to the scatter and gather methods in mpi4py for distributing large iterables and collecting results. Below is a simple example of how to use these methods. All the data operations and data types that we will mention in a moment can be used in the same way once the cluster is configured properly, whether on a single machine, in a multi-node system (RCC), or on a cloud computing platform (AWS).

In [3]:
client = Client(cluster)
def digit_sum(x):
    string = str(x)
    return sum([int(c) for c in string])

In [4]:
import numpy as np

# submit single jobs and get the result
result_future = client.submit(digit_sum,1234)
result_future.result()

10

In [5]:
# submit map jobs and get the result
results_future = client.map(digit_sum, np.random.randint(1e+8, 1e+9, int(1e+4)))
results = client.gather(results_future)
results[:10]

[62, 42, 40, 34, 57, 41, 40, 29, 57, 41]

Alternatively, we can first scatter the data to the workers before mapping, which is useful when the elements of the iterable are large. This generally works for Python built-in collections such as lists, tuples, sets, and dictionaries. For NumPy arrays, please use `client.map` directly or `dask.array` to parallelize.

In [6]:
data_future = client.scatter({x:y for x,y in enumerate(np.random.randint(1e+8,1e+9,20))})
data_future

{0: <Future: finished, type: numpy.int64, key: 0>,
 1: <Future: finished, type: numpy.int64, key: 1>,
 2: <Future: finished, type: numpy.int64, key: 2>,
 3: <Future: finished, type: numpy.int64, key: 3>,
 4: <Future: finished, type: numpy.int64, key: 4>,
 5: <Future: finished, type: numpy.int64, key: 5>,
 6: <Future: finished, type: numpy.int64, key: 6>,
 7: <Future: finished, type: numpy.int64, key: 7>,
 8: <Future: finished, type: numpy.int64, key: 8>,
 9: <Future: finished, type: numpy.int64, key: 9>,
 10: <Future: finished, type: numpy.int64, key: 10>,
 11: <Future: finished, type: numpy.int64, key: 11>,
 12: <Future: finished, type: numpy.int64, key: 12>,
 13: <Future: finished, type: numpy.int64, key: 13>,
 14: <Future: finished, type: numpy.int64, key: 14>,
 15: <Future: finished, type: numpy.int64, key: 15>,
 16: <Future: finished, type: numpy.int64, key: 16>,
 17: <Future: finished, type: numpy.int64, key: 17>,
 18: <Future: finished, type: numpy.int64, key: 18>,
 19: <Future:

In [7]:
results_future = client.map(digit_sum, data_future.values())
results = client.gather(results_future)
results[:10]

[31, 51, 43, 49, 41, 28, 39, 54, 42, 52]

From this example, you can see that Dask does not send the computation results back directly but returns future objects instead. A future is a lightweight reference; the result it points to stays in the memory of the worker that computed it until you ask for it (for example, with `result()` or `gather`). An advantage of this mechanism is that it avoids unnecessary communication between processes, which costs extra time. This design suits processing pipelines, where one or more intermediate results are passed from step to step. That is to say, you can pass the futures of intermediate data to another `client.map` call for further computation without manually moving the data back and forth between your local process and the workers.
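To make the pipeline idea concrete, here is a minimal sketch that feeds the futures from one `client.map` call straight into a second one. The helper `is_even` and the input values are made up for illustration, and `processes=False` with `dashboard_address=None` are only chosen to keep the example lightweight (an in-process cluster without a dashboard); they are not settings used elsewhere in this lab.

```python
from dask.distributed import Client

def digit_sum(x):
    """Sum the decimal digits of a non-negative integer."""
    return sum(int(c) for c in str(x))

def is_even(n):
    """Hypothetical second pipeline step."""
    return n % 2 == 0

# In-process cluster without a dashboard, just for this sketch
client = Client(processes=False, dashboard_address=None)

# First step: returns futures, the digit sums stay on the workers
sums = client.map(digit_sum, [1234, 111, 9012])

# Second step: pass the futures directly, no gather in between
flags = client.map(is_even, sums)

# Only the final results are brought back to the local process
result = client.gather(flags)
print(result)  # [True, False, True]

client.close()
```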

Dask also lets you check the address, status, etc. of individual workers in code through `Nanny` objects. To do so, inspect `cluster.workers` and the methods of the objects it contains:

In [8]:
cluster.workers

{0: <Nanny: tcp://127.0.0.1:61531, threads: 2>,
 1: <Nanny: tcp://127.0.0.1:61532, threads: 2>,
 2: <Nanny: tcp://127.0.0.1:61529, threads: 2>,
 3: <Nanny: tcp://127.0.0.1:61530, threads: 2>}

To restart a worker (this kills all jobs currently running on it and clears its memory), run the following. The restarted worker may have a different address:

In [9]:
worker_index = 0
cluster.workers[worker_index].restart(timeout=10)

<coroutine object Nanny.restart at 0x7ff8e50d5200>

In [10]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


To restart the cluster, use `client.restart()`. To shut down the local cluster, use `cluster.close()`. You do not have to shut down the client session manually, since it ends when the Python kernel terminates.
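As a closing comparison for this section, the `submit` and `map` calls used earlier follow the same shape as Python's standard `concurrent.futures` interface. The sketch below uses a thread pool from the standard library rather than Dask, and the input values are made up:

```python
from concurrent.futures import ThreadPoolExecutor

def digit_sum(x):
    """Sum the decimal digits of a non-negative integer."""
    return sum(int(c) for c in str(x))

with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit one job: returns a Future, like client.submit
    single_future = pool.submit(digit_sum, 1234)
    single_result = single_future.result()

    # Map over an iterable: unlike client.map, this yields
    # the results directly rather than futures
    mapped_results = list(pool.map(digit_sum, [111, 2222, 98765]))

print(single_result)   # 10
print(mapped_results)  # [3, 8, 35]
```

The main difference is that Dask's `client.map` returns futures whose results stay on the workers until you gather them.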

## Running Dask in RCC

In the following tutorial, we use the same datasets as in the Spark session to get you familiar with using Dask in a multi-node cluster computing environment, the RCC system. We again use the example of classifying book ratings to show the details of deploying Dask in machine learning tasks.

Ideally, we would read the product review dataset from Amazon S3 using [dask.dataframe.read_parquet](https://docs.dask.org/en/latest/dataframe-api.html?highlight=read_parquet#dask.dataframe.read_parquet). However, the RCC system blocks Internet connections on the compute nodes, which serve as the components of our Dask cluster, so we cannot use Dask to read the external datasets from S3 in parallel on RCC. As a workaround, we have manually downloaded the book review datasets to our class resource directory on RCC ('/project2/macs30123/'), and we can now read the data "locally" on RCC.

To use the nodes of RCC with Dask, we need to install both the dask and dask-jobqueue packages in our conda environment as follows. [dask-jobqueue](https://jobqueue.dask.org/en/latest/) deploys Dask on common job queuing systems like PBS (used in SSD acropolis) and Slurm (used in RCC). It lets us generate sbatch scripts automatically and submit them to the job manager interactively from a Jupyter notebook.

In [None]:
! pip install "dask[complete]" dask-jobqueue --upgrade #conda install dask-jobqueue -c conda-forge

To construct a cluster on the RCC system, we use `SLURMCluster` to define our sbatch scripts. The `queue` argument specifies the partition to which you would like to submit your jobs. The `cores` argument specifies the number of CPUs to use for each job (here, one node), not the total number of CPUs across the cluster. The `memory` argument defines the total memory allocation for a single node, which is divided evenly among the worker processes on that node. The `processes` argument splits the cores of a node into that many worker processes; for most pure-Python jobs it is recommended to give each process one CPU to avoid contention on the [Global Interpreter Lock](https://realpython.com/python-gil/). The `interface` argument defines the network interface linking the nodes, which in our case is the InfiniBand interconnect (`ib0`).

In [None]:
import dask_jobqueue as jobq

cluster = jobq.SLURMCluster(queue='broadwl',cores=10, memory='40GB',processes=10,
                            walltime='01:00:00', interface='ib0', job_extra=['--account=macs30123']
                            #job_extra=['--output=dask_worker.out',
                            #           '--error=dask_worker.err']
                           )

According to the printed information (if there is none, use the default port number 8787), we can open the monitor at `localhost:[port-number]`. No workers are started at this point. You can check the generated sbatch script, which looks just like the sbatch files we have written before. Jobs are only submitted to the queue system when you call the `scale` method, which creates the multi-node cluster needed by our parallel analysis.

In [None]:
print(cluster.job_script())
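To make the `cores`, `memory` and `processes` settings concrete, the arithmetic below works out what each worker process receives. It is only a sketch, assuming that dask-jobqueue divides the cores and memory of a job evenly among its processes; the `jobs` value is the one passed to `cluster.scale` later in this lab.

```python
# Settings copied from the SLURMCluster call in this lab
cores = 10        # CPUs per job
memory_gb = 40    # memory per job, in GB
processes = 10    # worker processes per job
jobs = 2          # number of jobs requested with cluster.scale(jobs=2)

# Assumed even split of a job's resources among its worker processes
threads_per_worker = cores // processes
memory_per_worker_gb = memory_gb / processes
total_workers = jobs * processes

print(threads_per_worker, memory_per_worker_gb, total_workers)  # 1 4.0 20
```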

Creating the client object is an important step: it connects the Dask backend to the cluster you just specified. Otherwise, computations would just run on a local cluster (which is forbidden by RCC).

In [None]:
# allocate 2 nodes to the cluster
cluster.scale(jobs=2)

# connect the local machine to the nodes
from dask.distributed import Client
client = Client(cluster)

Dask was developed in coordination with other widely used Python community projects like NumPy, pandas, and scikit-learn. Like Spark, Dask provides parallelized arrays ([dask.array](https://docs.dask.org/en/latest/array.html)) and dataframes ([dask.dataframe](https://docs.dask.org/en/latest/dataframe.html)) to store and process large datasets. It also includes a parallelized version of general Python collection objects ([dask.bag](https://docs.dask.org/en/latest/bag.html)) for operations like map and filter. You will practice using these Dask data types in courses on DataCamp. In this section, we will primarily use Dask dataframes and arrays to complete the machine learning tasks.

In [None]:
import dask.dataframe as dd

# load the book review data
df = dd.read_csv('/project2/macs30123/AWS_book_reviews/*.csv')

The `*` above is a glob wildcard: it matches every CSV file in the directory, and Dask reads the matching files in parallel into a single dataframe.
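As a quick illustration of what a `*.csv` pattern selects, the sketch below applies the same kind of wildcard with the standard library's `fnmatch`. The file names are hypothetical and are not the actual contents of the class directory.

```python
from fnmatch import fnmatch

# Hypothetical file names, for illustration only
names = ["part-0000.csv", "part-0001.csv", "README.txt", "part-0002.csv.bak"]

# Keep only the names that end in .csv
matched = [name for name in names if fnmatch(name, "*.csv")]
print(matched)  # ['part-0000.csv', 'part-0001.csv']
```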

In [None]:
# take a look
df.head()

In [None]:
df.dtypes

In [None]:
df.shape

In [None]:
df.shape[0].compute()

In [None]:
df.describe().compute()

As the data exploration above shows, a Dask dataframe has many methods analogous to those in [pandas](https://pandas.pydata.org/docs/reference/index.html), so it is easy to use if you are already familiar with pandas. One major difference is that Dask dataframes are evaluated lazily, which is why we call `compute()`: most dataframe operations only build up a task graph, and nothing is executed until `compute()` is called, which then triggers all the previously queued operations like a pipeline. (`head()` is an exception that returns a result immediately.)

### Preprocessing

Next we preprocess the book review dataset to obtain the features and labels for our machine learning task. Intuitively, ID-like attributes are unlikely to provide much useful information, while attributes like total_votes and helpful_votes might tell us more about the ratings of books. We can compute group means with groupby to check these guesses.

In [None]:
# do a groupby operation
df.groupby(by='marketplace').mean().compute()

Here we manually encode the categorical columns vine and verified_purchase. You can also do this in Dask with the [OrdinalEncoder](https://ml.dask.org/modules/generated/dask_ml.preprocessing.OrdinalEncoder.html).

In [None]:
# adding some coded features
df['vine_code'] = df['vine'].apply(lambda x:1 if x=='Y' else 0, meta=('vine', 'int64'))
df['verified_purchase_code'] = df['verified_purchase'].apply(lambda x:1 if x=='Y' else 0, meta=('verified_purchase', 'int64'))

In [None]:
df[['vine_code','verified_purchase_code']].head()

In [None]:
# do a groupby operation
df.groupby(by='star_rating').mean().compute()

Based on the results, our guesses seem to be correct. We will include the numerical features helpful_votes, total_votes, vine_code and verified_purchase_code in our machine learning task.

#### Text Preprocessing

The feature engineering tools available for Dask are covered in [dask_ml](https://ml.dask.org/index.html), which we need to install first. To process text in Dask, see the [vectorizers](https://ml.dask.org/modules/generated/dask_ml.preprocessing.OrdinalEncoder.html) for more information. Note that the results of text processing are dask arrays backed by scipy sparse matrices. If you want to check the values in them, you need to use the toarray() or todense() methods.

In [None]:
!pip install dask_ml --upgrade

In [None]:
# deal with missing values 
df[['review_body','review_headline']]=df[['review_body','review_headline']].fillna(value='empty')

In [None]:
from dask_ml import feature_extraction

vect = feature_extraction.text.HashingVectorizer(stop_words='english',n_features=64)
text_sparse_array = vect.fit_transform(df['review_body'])
text_sparse_array.compute_chunk_sizes()
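To give a feel for why `n_features=64` produces a fixed-width feature vector regardless of vocabulary size, here is a simplified, pure-Python sketch of the hashing idea. It is a stand-in, not the library's implementation: the helper `hash_vectorize` is hypothetical, it uses MD5 only to get a reproducible bucket index, and it skips steps such as stop-word removal and normalization.

```python
import hashlib

def hash_vectorize(text, n_features=64):
    """Count tokens into a fixed number of buckets chosen by a hash."""
    vector = [0] * n_features
    for token in text.lower().split():
        digest = hashlib.md5(token.encode("utf-8")).hexdigest()
        bucket = int(digest, 16) % n_features  # bucket index in [0, n_features)
        vector[bucket] += 1
    return vector

vec = hash_vectorize("great book great story")
print(len(vec))  # 64
print(sum(vec))  # 4
```

Because different words can land in the same bucket, a small `n_features` trades some information for a compact representation.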

In [None]:
df['label'] = df['star_rating'].apply(lambda x:1 if x>3 else 0, meta=('star_rating', 'int64'))
df.head()

In [None]:
df.groupby('label').count().compute()['star_rating']

In [None]:
# Pick the columns we think are useful as features
num_feature_df = df[['helpful_votes','total_votes','vine_code','verified_purchase_code']]

It is useful to do some scaling here to reduce the effect of different attributes having different scales. For the available scaling methods, see [here](https://ml.dask.org/modules/api.html#module-dask_ml.preprocessing).

In [None]:
from dask_ml.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
num_feature_df = scaler.fit_transform(num_feature_df)
num_feature_df.head()
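For reference, min-max scaling maps each column to the range [0, 1] by computing (x - min) / (max - min) per column. A minimal NumPy sketch on made-up values (not the review data):

```python
import numpy as np

# Two made-up feature columns with different scales
votes = np.array([[0.0, 2.0],
                  [5.0, 4.0],
                  [10.0, 10.0]])

col_min = votes.min(axis=0)
col_max = votes.max(axis=0)

# Column-wise min-max scaling
scaled = (votes - col_min) / (col_max - col_min)
print(scaled)
```

After scaling, the smallest value in each column becomes 0 and the largest becomes 1.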

In [None]:
import dask.array
label_array = df['label'].to_dask_array().compute_chunk_sizes()
num_feature_array = num_feature_df.to_dask_array().compute_chunk_sizes()

It is worth mentioning the use of compute_chunk_sizes(). This function computes and stores the actual shape and chunk sizes of the arrays, which is needed for deciding how many samples go into the train and test sets. Before this call, the total sample size of the feature and label arrays is unknown, because the data are distributed over the cluster nodes and have not been counted yet.

### Machine Learning

Finally, we come to the train-test section. Dask does not implement most of its machine learning models itself. Instead, dask_ml provides a parallel interface to the widely used machine learning package [scikit-learn](https://scikit-learn.org/stable/). So the way we run machine learning methods in Dask is quite similar to using scikit-learn.

In [None]:
# use the numerical features
from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test=train_test_split(num_feature_array, 
                                                  label_array,
                                                  train_size=0.7,test_size=0.3,
                                                  random_state=0)

In [None]:
X_train

In [None]:
y_test

In [None]:
# using class weights to mitigate data imbalance
class_counts = df.groupby('label').count().compute()['star_rating']
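The counts above feed the `class_weight` argument of the classifiers in this section. As a small worked example with hypothetical counts (not the real data), the weighting rule used in this lab keeps weight 1 for class 0 and scales class 1 by the ratio of the two counts:

```python
# Hypothetical class counts (label -> number of reviews); not the real data
class_counts = {0: 200, 1: 800}

# Same rule as in this lab: weight 1 for class 0,
# count ratio for class 1
class_weight = {0: 1, 1: class_counts[0] / class_counts[1]}

# With these weights, both classes contribute equally in total
weighted_totals = {label: class_counts[label] * class_weight[label]
                   for label in class_counts}

print(class_weight)     # {0: 1, 1: 0.25}
print(weighted_totals)  # {0: 200, 1: 200.0}
```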

Dask allows you to use many models from sklearn, and here we use the SGD classifier to perform the machine learning task. As you can see from the code, we run a hyperparameter search with validation, which is the part of a machine learning task that benefits most from parallelization. The IncrementalSearchCV we use here is designed for models that have a [partial_fit](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#) method, like the SGDClassifier. These incremental learners can train on batches of data, which fits well with Dask’s blocked data structures. The choice of hyperparameter optimizer also depends on memory constraints and on how the search cost grows with the number of parameters. For details about how to choose the optimizer, see [here](https://ml.dask.org/hyper-parameter-search.html?highlight=memory#scaling-hyperparameter-searches).

In [None]:
from dask_ml.model_selection import IncrementalSearchCV
from sklearn.linear_model import SGDClassifier
import numpy as np
#import joblib

clf = SGDClassifier(class_weight={0:1,1:class_counts[0]/class_counts[1]})
params = {'alpha': np.logspace(-4, 0, num=1000),
          'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
          'average': [True, False]}
#clf.fit(X_train, y_train)
#clf = Incremental(est, scoring='accuracy')
search = IncrementalSearchCV(clf, params)
search.fit(X_train, y_train, classes=[0,1])

In [None]:
# check the best parameter
search.best_estimator_

In [None]:
# check the best validation score found by the search
search.best_score_

In [None]:
# check the test accuracy
search.best_estimator_.score(X_test,y_test)

In [None]:
# plot the confusion matrix
from sklearn.metrics import plot_confusion_matrix
from matplotlib import pyplot as plt
plot_confusion_matrix(search.best_estimator_, X_test, y_test)
plt.show()

This simple model suffers a lot from data imbalance even after we specify class weights, so you may need to find other ways to reduce this effect in your assignments.

In [None]:
# use the text features
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test=train_test_split(text_sparse_array,
                                                  label_array,
                                                  train_size=0.7,test_size=0.3,
                                                  random_state=0)

In [None]:
X_train

In [None]:
from sklearn.linear_model import SGDClassifier
import dask_ml

sgd = SGDClassifier(class_weight={0:1,1:class_counts[0]/class_counts[1]})
clf = dask_ml.wrappers.Incremental(
    sgd, scoring='accuracy',
)
clf.fit(X_train,y_train, classes=[0, 1])

In [None]:
clf.score(X_train,y_train)

In [None]:
clf.score(X_test,y_test)

In [None]:
X_test_array = X_test.compute().toarray()

In [None]:
# plot the confusion matrix
from sklearn.metrics import plot_confusion_matrix
from matplotlib import pyplot as plt
plot_confusion_matrix(clf.estimator_, X_test_array, y_test)
plt.show()

From the results here, we see that both models have drawbacks. Using the numerical (including categorical) features yields a model with high bias, while using only the text-based (review_body) features makes the overall accuracy very low. What about using both sets of features? It's your turn to try! Be aware, however, that the sparse matrices generated by the text vectorizer do not work well with many models, as they differ structurally from ordinary arrays (for example, in shape). So you might need to think about a feasible way to convert the text vectors to normal numpy arrays and reload them into dask (this sounds somewhat inefficient; if you have better ideas, please let us know~).