**(Make sure you are working in a venv)**

In [1]:
# Install dask in the current Jupyter kernel
!python -m pip install "dask[complete]"



In [2]:
# Install dask-ml in the current Jupyter kernel
!python -m pip install dask-ml

Collecting dask-ml
  Downloading dask_ml-2023.3.24-py3-none-any.whl (148 kB)
Collecting multipledispatch>=0.4.9
  Using cached multipledispatch-0.6.0-py3-none-any.whl (11 kB)
Collecting dask-glm>=0.2.0
  Using cached dask_glm-0.2.0-py2.py3-none-any.whl (12 kB)
Collecting scikit-learn>=1.2.0
  Downloading scikit_learn-1.2.2-cp39-cp39-win_amd64.whl (8.4 MB)

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow 2.10.0 requires protobuf<3.20,>=3.9.2, but you have protobuf 3.20.3 which is incompatible.
tensorflow 2.10.0 requires tensorflow-estimator<2.11,>=2.10.0, but you have tensorflow-estimator 2.12.0 which is incompatible.
tensorboard 2.10.0 requires protobuf<3.20,>=3.9.2, but you have protobuf 3.20.3 which is incompatible.
tensorboard 2.10.0 requires tensorboard-data-server<0.7.0,>=0.6.0, but you have tensorboard-data-server 0.7.0 which is incompatible.


# ⏩ Set-Up Dask (LocalCluster)

In [3]:
from dask.distributed import Client

client = Client()

client

| Client / cluster | |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Status | running |
| Using processes | True |
| Scheduler comm | tcp://127.0.0.1:49996 |
| Started | Just now |
| Workers | 4 |
| Total threads | 8 |
| Total memory | 23.88 GiB |

| Worker comm | Worker dashboard | Nanny | Threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:50024 | http://127.0.0.1:50025/status | tcp://127.0.0.1:49999 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-su9i8ymb |
| tcp://127.0.0.1:50015 | http://127.0.0.1:50017/status | tcp://127.0.0.1:50000 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-mf8828cs |
| tcp://127.0.0.1:50016 | http://127.0.0.1:50018/status | tcp://127.0.0.1:50001 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-55_ur00s |
| tcp://127.0.0.1:50021 | http://127.0.0.1:50022/status | tcp://127.0.0.1:50002 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-e9fz3bf1 |


# ⏩Tests - Dashboard

Open the client's dashboard on localhost to see diagnostics, task status, etc. It helps to keep the dashboard on a second screen so you can see what is happening (in parallel).

## 1️⃣ Hyperparameter Optimization

As we saw in the previous Jupyter mini-project, the values of the *hyperparameters* `C` and `loss` of a `PassiveAggressiveClassifier` have a large impact on the resulting model. The problem of finding the best hyperparameters is called *hyperparameter optimization*. Let's solve it for binary classification on a medium-sized dataset.

### Create Data

In [4]:
from sklearn.datasets import make_classification

# Create data
X, y = make_classification(
    n_samples=100_000,
    n_features=100,
    n_informative=50,
    n_redundant=10,
    flip_y=0.2,
)

### Define Hyperparameters

There are a few ways to find the best *hyperparameters*. One is `GridSearchCV`: as the name implies, it does a brute-force search over every combination in a grid of hyperparameter values, scoring each combination with cross-validation.

In [5]:
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import PassiveAggressiveClassifier

# define the parameter grid
param_grid = {
    'C': [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 0.5, 1, 5, 10, 50, 100, 200],
    'loss' : ['hinge', 'squared_hinge']
}

# create a PAClassifier
pa_clf = PassiveAggressiveClassifier(random_state=1)
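Before running the search, it helps to count how much work the brute-force grid implies. The sketch below enumerates the same `param_grid` with `itertools.product` (conceptually what scikit-learn's `ParameterGrid` does) and multiplies by the number of cross-validation folds. It assumes the default `cv=5` of recent scikit-learn versions, since the cells here do not pass `cv` explicitly.

```python
from itertools import product

# Same grid as the notebook cell above
param_grid = {
    "C": [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 0.5, 1, 5, 10, 50, 100, 200],
    "loss": ["hinge", "squared_hinge"],
}

# Every combination of one value per hyperparameter is one candidate model
keys = list(param_grid)
candidates = [dict(zip(keys, values)) for values in product(*param_grid.values())]

n_folds = 5  # assumption: GridSearchCV's default cv=5
n_fits = len(candidates) * n_folds

print(len(candidates), "candidates")
print(n_fits, "cross-validation fits (plus one final refit on all the data)")
```

These independent fits are the units of work that `n_jobs=-1`, and later the Dask backend, spread across cores.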

### Solve: Classical way

In [6]:
%%time

# n_jobs=-1 tells sklearn to use all processors
grid_search = GridSearchCV(pa_clf, param_grid, scoring='accuracy', n_jobs=-1)
grid_search.fit(X, y)

CPU times: total: 3.12 s
Wall time: 21.7 s


In [7]:
grid_search.best_params_, grid_search.best_score_

({'C': 1e-06, 'loss': 'hinge'}, 0.76282)

### Solve: Dask way

Reminder: Open Dask cluster dashboard (*Status*)

In [8]:
# Sklearn backend
from joblib import parallel_backend

In [9]:
%%time

grid_search = GridSearchCV(pa_clf, param_grid, scoring='accuracy', n_jobs=-1)
# switch joblib's backend (used by sklearn) to Dask (our LocalCluster); scatter sends X and y to the workers up front
with parallel_backend("dask", scatter=[X,y]):
    grid_search.fit(X, y)

CPU times: total: 6.42 s
Wall time: 22.3 s


In [10]:
grid_search.best_params_, grid_search.best_score_

({'C': 1e-06, 'loss': 'hinge'}, 0.76282)

### Close Client

In [25]:
client.close()

## 2️⃣ Incrementally Train Large Datasets

### Create Data

In [12]:
import dask.array as da
from dask_ml.datasets import make_classification # Dask way
from dask.distributed import Client

n, d = 10_000_000, 100

# Generated lazily by Dask; ~8 GB (7.45 GiB) once materialized in memory
X, y = make_classification(
    n_samples=n,
    n_features=d,
    n_informative=50,
    n_redundant=10,
    flip_y=0.2,
    chunks=n // 10  # partitions
)
X

| | Array | Chunk |
|---|---|---|
| Bytes | 7.45 GiB | 762.94 MiB |
| Shape | (10000000, 100) | (1000000, 100) |
| Dask graph | 10 chunks in 1 graph layer | 10 chunks in 1 graph layer |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |
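The "~8 GB" in the comment and the 7.45 GiB in the repr are the same quantity in different units. A dense `float64` array takes 8 bytes per value, so the sizes can be checked by hand; the small helper below is only an illustration of that arithmetic.

```python
BYTES_PER_FLOAT64 = 8

def nbytes(rows, cols):
    """Size in bytes of a dense float64 array of shape (rows, cols)."""
    return rows * cols * BYTES_PER_FLOAT64

total = nbytes(10_000_000, 100)  # the whole X
chunk = nbytes(1_000_000, 100)   # one chunk (chunks=n // 10)

print(f"{total / 1e9:.1f} GB = {total / 2**30:.2f} GiB")
print(f"{chunk / 2**20:.2f} MiB per chunk")
```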


### Split data for training and testing

In [13]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y)
X_train

| | Array | Chunk |
|---|---|---|
| Bytes | 6.71 GiB | 686.65 MiB |
| Shape | (9000000, 100) | (900000, 100) |
| Dask graph | 10 chunks in 33 graph layers | 10 chunks in 33 graph layers |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |
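Notice that `X_train` still has 10 chunks, each with 900,000 of the original 1,000,000 rows. That is consistent with each block being shuffled and split 90/10 on its own, without moving rows between blocks (dask-ml's `train_test_split` has a `blockwise` option for this). The NumPy sketch below illustrates the idea on a scaled-down array; `blockwise_split` is a hypothetical helper and its rounding rule is an assumption, not dask-ml's code.

```python
import numpy as np

def blockwise_split(blocks, test_fraction=0.1, seed=0):
    """Hypothetical helper: shuffle and split each block independently.

    Rows never move between blocks, so every block contributes
    roughly (1 - test_fraction) of its rows to the training set.
    """
    rng = np.random.default_rng(seed)
    train_blocks, test_blocks = [], []
    for block in blocks:
        idx = rng.permutation(len(block))
        n_test = int(round(test_fraction * len(block)))  # rounding rule is an assumption
        test_blocks.append(block[idx[:n_test]])
        train_blocks.append(block[idx[n_test:]])
    return train_blocks, test_blocks

# 10 blocks of 1_000 rows x 3 columns (a scaled-down version of X above)
blocks = [np.arange(1_000 * 3, dtype=float).reshape(1_000, 3) + i for i in range(10)]
train_blocks, test_blocks = blockwise_split(blocks)

print([b.shape for b in train_blocks][:2])  # every training block has 900 rows
```

Splitting within blocks avoids an expensive all-to-all shuffle between workers, at the cost of a less thorough shuffle.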


### Persist data in memory

This dataset can fit in distributed memory (**warning**: it is ~8GB), so we call `dask.persist` to ask Dask to execute the computations above and keep the results in memory. If we are working in situations where our dataset does not fit into memory we should skip this.

In [14]:
from dask import persist

X_train, X_test, y_train, y_test = persist(X_train, X_test, y_train, y_test)

### Precompute classes

We aim to use `partial_fit` (Dask calls it for us), which only ever sees one chunk at a time, and a single chunk may not contain every label. So we precompute the classes (the distinct labels) beforehand and pass them in.

In [15]:
classes = da.unique(y_train).compute()
classes

array([0, 1])
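To see why the classes must be known up front, consider labels that arrive in blocks. The plain-NumPy sketch below (made-up labels) shows that one block alone can miss classes, and that the global set is the union of the per-block distinct labels. It illustrates the concept only and makes no claim about how `da.unique` is implemented internally.

```python
import numpy as np

# Three label blocks; the first block happens to contain only class 0
y_blocks = [np.array([0, 0, 0]), np.array([0, 1, 1]), np.array([2, 1, 0])]

# Distinct labels per block, then the distinct labels of their union
per_block = [np.unique(b) for b in y_blocks]
classes = np.unique(np.concatenate(per_block))

print(per_block[0])  # [0] -> the first chunk alone would miss classes 1 and 2
print(classes)       # [0 1 2]
```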

### Create sklearn model

In [16]:
from sklearn.linear_model import PassiveAggressiveClassifier

# We solved the hyperparameter optimization problem for data generated the same way before :)
pa_clf = PassiveAggressiveClassifier(C=1e-06, loss='hinge')

### Wrap with Dask-ML's Incremental meta-estimator

We now wrap our `PassiveAggressiveClassifier` with the `dask_ml.wrappers.Incremental` meta-estimator. `dask_ml` provides meta-estimators that parallelize and scale out certain tasks that may not be parallelized within scikit-learn itself. `dask_ml.wrappers.Incremental` provides a bridge between Dask and scikit-learn estimators that support the `partial_fit` API: you wrap the underlying estimator in `Incremental`, and Dask-ML sequentially passes each block of a Dask Array to the underlying estimator’s `partial_fit` method.

In [17]:
from dask_ml.wrappers import Incremental

inc = Incremental(pa_clf, scoring='accuracy')

`Incremental` only does data management while leaving the actual algorithm to the underlying sklearn classifier.
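The division of labour can be mimicked without Dask: the wrapper walks the blocks in order and hands each one to `partial_fit`, while the estimator does the learning. The sketch below is a hypothetical stand-in, not dask-ml's implementation. `ToyPassiveAggressive` is a simplified binary PA-I classifier without an intercept (scikit-learn's `PassiveAggressiveClassifier` also fits an intercept), and `incremental_fit` plays the role of `Incremental.fit`.

```python
import numpy as np

class ToyPassiveAggressive:
    """Hypothetical binary PA-I classifier (no intercept) with a partial_fit API."""

    def __init__(self, C=1.0):
        self.C = C
        self.coef_ = None
        self.classes_ = None

    def partial_fit(self, X, y, classes=None):
        if self.coef_ is None:
            # First call: classes must be given up front, as with sklearn's partial_fit
            self.classes_ = np.asarray(classes)
            self.coef_ = np.zeros((1, X.shape[1]))
        w = self.coef_[0]  # a view, so in-place updates change coef_
        signs = np.where(y == self.classes_[1], 1.0, -1.0)
        for x, s in zip(X, signs):
            loss = max(0.0, 1.0 - s * (w @ x))  # hinge loss
            tau = min(self.C, loss / (x @ x))   # PA-I step size
            w += tau * s * x
        return self

    def predict(self, X):
        return np.where(X @ self.coef_[0] > 0, self.classes_[1], self.classes_[0])


def incremental_fit(estimator, X_blocks, y_blocks, classes):
    """One pass over the data: call partial_fit once per block, in order."""
    for X_block, y_block in zip(X_blocks, y_blocks):
        estimator.partial_fit(X_block, y_block, classes=classes)
    return estimator


rng = np.random.default_rng(0)
X = rng.normal(size=(2_000, 5))
y = (X @ np.array([1.0, 2.0, -1.0, 0.5, 0.0]) > 0).astype(int)

# Ten blocks, mimicking a chunked Dask array
X_blocks, y_blocks = np.array_split(X, 10), np.array_split(y, 10)
clf = incremental_fit(ToyPassiveAggressive(C=1.0), X_blocks, y_blocks, classes=np.array([0, 1]))
accuracy = (clf.predict(X) == y).mean()
print("train accuracy:", accuracy)
```

Calling `incremental_fit` again continues from the current weights, which is what the repeated `partial_fit` loop further down relies on.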

### Model training

`Incremental` implements a `fit` method, which performs one pass over the dataset, calling `partial_fit` on each chunk of the Dask array in turn.

In [18]:
inc.fit(X_train, y_train, classes=classes)

In [19]:
inc.score(X_test, y_test)

0.893847

### Pass over the training data many times

Calling `.fit` passes over all chunks of our data once. However, in many cases we may want to pass over the training data many times. To do this, we can call the `Incremental.partial_fit` method in a for loop.

In [20]:
for _ in range(10):
    inc.partial_fit(X_train, y_train, classes=classes)
    print(f"Score: {inc.score(X_test, y_test)}")

Score: 0.893861
Score: 0.893875
Score: 0.893883
Score: 0.893875
Score: 0.893903
Score: 0.893888
Score: 0.893905
Score: 0.893897
Score: 0.893916
Score: 0.893852


In [21]:
inc.predict(X_test) # Predict produces lazy arrays

| | Array | Chunk |
|---|---|---|
| Bytes | 3.81 MiB | 390.62 kiB |
| Shape | (1000000,) | (100000,) |
| Dask graph | 10 chunks in 2 graph layers | 10 chunks in 2 graph layers |
| Data type | int32 numpy.ndarray | int32 numpy.ndarray |


In [22]:
inc.predict(X_test)[:10].compute() # call compute to get results

array([1, 0, 0, 1, 1, 0, 0, 1, 1, 1])

In [23]:
inc.score(X_test, y_test)

0.893852

# ⏩ Diving into Dask's internals


## Components of a cluster

A Dask cluster is composed of three different types of objects:

1. **Scheduler**: A single, centralized scheduler process which responds to requests for computations, maintains relevant state about tasks and workers, and sends tasks to workers to be computed.
2. **Workers**: One or more worker processes which compute tasks and store/serve their results.
3. **Clients**: One or more client objects which are the user-facing entry point to interact with the cluster.

<img src="images/dask-cluster.png"
     width="90%"
     alt="Dask components">

A couple of notes about workers:

- Each worker runs in its own Python process. Each worker process has its own `concurrent.futures.ThreadPoolExecutor`, which it uses to compute tasks in parallel.
- There's actually a fourth cluster object which is often not discussed: the **Nanny**. By default Dask workers are launched and managed by a separate nanny process. This separate process allows workers to restart themselves if we want to use the `Client.restart` method, or to restart workers automatically if they get above a certain memory limit threshold.
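To make the division of labour concrete, here is a deliberately tiny, single-process mental model written with the standard library only. `ToyWorker` and `ToyScheduler` are hypothetical names: each toy worker owns a `ThreadPoolExecutor` and keeps its results, while the toy scheduler assigns tasks round-robin and remembers which worker holds each result. Real Dask workers normally live in separate processes, and the real scheduler is far more sophisticated (it weighs data locality and worker load, and steals work between workers), so treat this only as an illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import cycle


class ToyWorker:
    """Hypothetical stand-in for a Dask worker: a thread pool plus a result store."""

    def __init__(self, name, nthreads=2):
        self.name = name
        self.executor = ThreadPoolExecutor(max_workers=nthreads)
        self.data = {}  # results kept on the worker, keyed by task name

    def run_task(self, key, func, *args):
        future = self.executor.submit(func, *args)
        self.data[key] = future
        return future


class ToyScheduler:
    """Hypothetical central scheduler: assigns each task to a worker round-robin."""

    def __init__(self, workers):
        self.workers = workers
        self._next_worker = cycle(workers)
        self.who_has = {}  # task key -> worker holding the result

    def submit(self, key, func, *args):
        worker = next(self._next_worker)
        self.who_has[key] = worker
        return worker.run_task(key, func, *args)

    def gather(self, key):
        # The "client" asks the scheduler where a result lives, then fetches it
        return self.who_has[key].data[key].result()


workers = [ToyWorker(f"worker-{i}") for i in range(4)]
scheduler = ToyScheduler(workers)
for i in range(8):
    scheduler.submit(f"square-{i}", pow, i, 2)

results = [scheduler.gather(f"square-{i}") for i in range(8)]
print(results)
for w in workers:
    w.executor.shutdown()
```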

### Cluster managers (recommended)

Dask has the notion of cluster manager objects. Cluster managers offer a consistent interface for common activities like adding/removing workers to a cluster, retrieving logs, etc. We can deploy Dask on **YARN**, **Kubernetes**, **AWS**, **Azure**, **GCP** etc.

In [9]:
from dask.distributed import LocalCluster

# Launch a scheduler and 4 workers on my local machine
cluster = LocalCluster()
cluster

| Cluster | |
|---|---|
| Dashboard | http://127.0.0.1:8787/status |
| Status | running |
| Using processes | True |
| Scheduler comm | tcp://127.0.0.1:52066 |
| Started | Just now |
| Workers | 4 |
| Total threads | 8 |
| Total memory | 23.88 GiB |

| Worker comm | Worker dashboard | Nanny | Threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:52092 | http://127.0.0.1:52095/status | tcp://127.0.0.1:52069 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-nbdsj2f7 |
| tcp://127.0.0.1:52091 | http://127.0.0.1:52093/status | tcp://127.0.0.1:52070 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-r6c90pdk |
| tcp://127.0.0.1:52085 | http://127.0.0.1:52086/status | tcp://127.0.0.1:52071 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-hs22i20f |
| tcp://127.0.0.1:52088 | http://127.0.0.1:52089/status | tcp://127.0.0.1:52072 | 2 | 5.97 GiB | C:\Users\miket\AppData\Local\Temp\dask-worker-space\worker-ergvcj2a |


In [10]:
# Retrieve cluster logs
cluster.get_logs()

One of the nice things about `LocalCluster` is that it gives us direct access to the `Scheduler` Python object, which makes it easy to inspect the scheduler directly.

In [11]:
scheduler = cluster.scheduler
scheduler

In [12]:
dict(scheduler.workers) # The state of all the workers

{'tcp://127.0.0.1:52085': <WorkerState 'tcp://127.0.0.1:52085', name: 2, status: running, memory: 0, processing: 0>,
 'tcp://127.0.0.1:52088': <WorkerState 'tcp://127.0.0.1:52088', name: 3, status: running, memory: 0, processing: 0>,
 'tcp://127.0.0.1:52091': <WorkerState 'tcp://127.0.0.1:52091', name: 1, status: running, memory: 0, processing: 0>,
 'tcp://127.0.0.1:52092': <WorkerState 'tcp://127.0.0.1:52092', name: 0, status: running, memory: 0, processing: 0>}

Let's take a look at the `WorkerState` attributes

In [14]:
worker_state = next(iter(scheduler.workers.values()))
worker_state

In [15]:
worker_state.address   # Worker's address

'tcp://127.0.0.1:52085'

In [16]:
worker_state.nthreads   # Number of threads in the worker's `ThreadPoolExecutor`

2

In [17]:
# Dictionary of all tasks which are currently being processed, along with the current duration of the task
worker_state.executing

{}

In [18]:
worker_state.metrics   # Various metrics describing the current state of the worker

{'task_counts': {'executing': 0,
  'long-running': 0,
  'memory': 0,
  'ready': 0,
  'constrained': 0,
  'waiting': 0,
  'fetch': 0,
  'missing': 0,
  'flight': 0,
  'other': 0},
 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}},
 'managed_bytes': 0,
 'spilled_bytes': {'memory': 0, 'disk': 0},
 'transfer': {'incoming_bytes': 0,
  'incoming_count': 0,
  'incoming_count_total': 0,
  'outgoing_bytes': 0,
  'outgoing_count': 0,
  'outgoing_count_total': 0},
 'event_loop_interval': 0.020009611167159734,
 'cpu': 3.1,
 'memory': 81469440,
 'time': 1676744487.020529,
 'host_net_io': {'read_bps': 119.59302493498478,
  'write_bps': 328.8808185712081},
 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}}

In [20]:
cluster.workers # Notice the nanny object

{0: <Nanny: tcp://127.0.0.1:52092, threads: 2>,
 1: <Nanny: tcp://127.0.0.1:52091, threads: 2>,
 2: <Nanny: tcp://127.0.0.1:52085, threads: 2>,
 3: <Nanny: tcp://127.0.0.1:52088, threads: 2>}

In [21]:
worker = next(iter(cluster.workers.values()))
worker

<Nanny: tcp://127.0.0.1:52092, threads: 2>

In [8]:
# Shut down cluster
cluster.close()

# ⏩ Online Distributed ML using Dask

## 1️⃣ Customizing Worker class - Hacky

When we train a model, Dask's approach is to wrap the `estimator` in a wrapper from `dask_ml.wrappers` and let Dask handle the rest: how the training data is distributed (which we can control), what each worker does, etc. Internally, Dask distributes the data to the workers in `chunks` and fits the model on these `chunks` (calling `partial_fit`), but we have little say in this process. It does train the model in a distributed fashion and we get the result. In our use case, however, we want each worker to have its own model. The model can be any sklearn estimator that supports `partial_fit`, for example `SGDClassifier` or `PassiveAggressiveClassifier`. We also want each `Worker` to label the online instances that we send to it. After considering several options, this is the simplest approach we found:

We extend Dask's `Worker` class and let it instantiate sklearn `estimators` of our choice. The following implementation only supports `PassiveAggressiveClassifier`, but it is easy to extend to other models (about 5 lines of code). The idea is simple:

1. The client instructs each worker to `initialize_model`, given the `model_name`, `classes`, `n_features` and `kwargs`. All of these are needed because we intend to use only `partial_fit` (never `fit`), so we have to do some extra work for everything to run smoothly (manually set `classes_`, `coef_`, `intercept_`, etc.).
2. The client sends `chunks` (batches) of data to the workers, which train their models with `model_partial_fit`.
3. Periodically (currently after every round), the workers send their models (to be precise, their `coef_` and `intercept_`) to the client, which combines them (currently with `numpy.mean`) and sends the result back. The workers then synchronize: they set their `coef_` to the `global_coef` and their `intercept_` to the `global_intercept`.
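The combine step works because of the shapes scikit-learn uses for binary linear classifiers: `coef_` is `(1, n_features)` and `intercept_` is `(1,)`. Averaging a list of such arrays along `axis=0` (the "worker" axis) keeps those shapes, so the result can be assigned straight back to a model. The NumPy sketch below, with made-up numbers and no Dask, mirrors the averaging in `gather_and_combine_worker_models` and the synchronization step.

```python
import numpy as np

n_features = 4

# Binary classification: each worker's coef_ is (1, n_features), intercept_ is (1,)
worker_coefs = [np.full((1, n_features), v) for v in (1.0, 2.0, 3.0)]
worker_intercepts = [np.array([v]) for v in (0.0, 0.5, 1.0)]

# Element-wise mean across workers; axis=0 indexes the workers in the stacked list
global_coef = np.mean(worker_coefs, axis=0)
global_intercept = np.mean(worker_intercepts, axis=0)

print(global_coef.shape, global_intercept.shape)  # (1, 4) (1,)
print(global_coef, global_intercept)              # [[2. 2. 2. 2.]] [0.5]

# Synchronization: every worker adopts (its own copy of) the global parameters
worker_coefs = [global_coef.copy() for _ in worker_coefs]
worker_intercepts = [global_intercept.copy() for _ in worker_intercepts]
```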

In [18]:
from distributed import Worker
from sklearn.linear_model import PassiveAggressiveClassifier
import numpy as np


class FedLWorker(Worker):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    
    def initialize_model(self, model_name, classes, n_features, **kwargs):
        """ Initializes the model in worker's memory.
        
        :param model_name: The model's name (only used to decide which model to create)
        :param classes: The distinct y-labels of the whole dataset.
        :param n_features: Number of features in the instances. Used to init coef_ 
        :param kwargs: The initialization keyword-args of the model. ex. C=0.01, loss='hinge'
        """
        if model_name == "PassiveAggressiveClassifier":
            self.model = PassiveAggressiveClassifier(**kwargs)
            
            self.model.classes_ = classes
            
            n_classes = classes.size
            
            # See docs coef_ : ndarray of shape (1, n_features) if n_classes == 2 else (n_classes, n_features)
            if n_classes == 2:
                self.model.coef_ = np.zeros((1, n_features))
            else:
                self.model.coef_ = np.zeros((n_classes, n_features))
            
            # See docs intercept_ : ndarray of shape (1,) if n_classes == 2 else (n_classes,)
            if n_classes == 2:
                self.model.intercept_ = np.zeros(1)
            else:
                self.model.intercept_ = np.zeros(n_classes)
              
    def model_partial_fit(self, X_train, y_train):
        self.model.partial_fit(X_train, y_train)
        
    def get_model_coef(self):
        return self.model.coef_
    
    def get_model_intercept(self):
        return self.model.intercept_
    
    def set_model_coef(self, coef):
        self.model.coef_ = coef
    
    def set_model_intercept(self, intercept):
        self.model.intercept_ = intercept
    
    def predict(self, x):
        return self.model.predict(x)



## 2️⃣ Initialize Cluster - Client

The `worker_class` argument is the class used to instantiate workers. By default, `LocalCluster` (with processes) uses the **Nanny** class, which launches and supervises a regular `Worker`. Here we pass our own worker class, `FedLWorker`, which supports the extra functionality we need. Hence, `worker_class=FedLWorker`.

Adjust `n_workers` and `threads_per_worker` for your environment.

Note: passing `worker_class=FedLWorker` replaces the *Nanny* entirely, so our workers run without a *nanny* (e.g. no automatic restarts).

In [19]:
from dask.distributed import Client, LocalCluster, get_worker

cluster = LocalCluster(n_workers=8, threads_per_worker=1, worker_class=FedLWorker)
client = Client(cluster)

## 3️⃣ Create Data

We create the data the Dask way: the arrays are defined lazily and split into chunks, and they are only materialized in memory when `.compute()` is called.

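We follow the usual convention and split the data into training and testing sets. Then we create batches of data (i.e. `chunks`). Each generated `chunk` holds `batch_size` X-instances and the matching `batch_size` y-labels (the last chunk holds fewer). After the 90/10 train/test split, each training chunk keeps roughly 90% of its rows (3,686 here, see the `X_train` output below). The `X_batches` and `y_batches` are then ready to be distributed to the workers.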

Change `n`, `d` and `batch_size` to your liking, but note that with a small `batch_size` the communication overhead is heavy and learning may take considerably longer. You will see this in the dashboard.
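The chunk counts in the `X_train` repr below can be checked by hand. With `n = 1_000_000` and `batch_size = 4_096` there are `ceil(n / batch_size)` chunks, the last one partial. The per-block rounding of the 90/10 split used here (`n_block - ceil(0.1 * n_block)` training rows) is an assumption chosen because it reproduces the reported 3,686 rows per chunk and 899,902 training rows; it is not taken from the dask-ml source.

```python
import math

n, batch_size = 1_000_000, 4_096

n_chunks = math.ceil(n / batch_size)           # the last chunk is partial
last_chunk = n - (n_chunks - 1) * batch_size
print(n_chunks, last_chunk)                    # 245 576

def train_rows(n_block, test_size_pct=10):
    """Assumed rule: the test part gets ceil(10% of the block), the rest is training."""
    n_test = -(-n_block * test_size_pct // 100)  # integer ceiling division
    return n_block - n_test

rows_per_train_chunk = train_rows(batch_size)
total_train_rows = (n_chunks - 1) * rows_per_train_chunk + train_rows(last_chunk)
print(rows_per_train_chunk, total_train_rows)  # 3686 899902
```

Halving `batch_size` doubles the number of chunks, and therefore the number of `scatter`/`submit` round trips in the training loop.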

Note: Ignore `UserWarning`, it has to do with the scheduler (no problem).

In [20]:
from dask_ml.model_selection import train_test_split
from dask_ml.datasets import make_classification # Dask way
import dask.array as da

# ~800 MB dataset (1_000_000 x 100 float64 values, with the current numbers)
n, d = 1_000_000, 100
#batch_size = 16_384
batch_size = 4_096

# Distinct y-labels in the entire Dataset. We know that for binary classification:
classes = np.array([0,1])

# Generated lazily by Dask (not yet materialized in memory)
X, y = make_classification(
    n_samples=n,
    n_features=d,
    n_informative=50,
    n_redundant=10,
    n_classes=2,
    flip_y=0.05,
    chunks=batch_size  # chunk size; gives ceil(n / batch_size) = 245 chunks
)

# Compute classes using Dask (the proper way)
#classes = da.unique(y).compute() 

# Split into random train and test subsets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

# Each batch is one block (not yet materialized in memory, no .compute() yet)
X_batches = X_train.blocks
y_batches = y_train.blocks



In [21]:
X_train

| | Array | Chunk |
|---|---|---|
| Bytes | 686.57 MiB | 2.81 MiB |
| Shape | (899902, 100) | (3686, 100) |
| Dask graph | 245 chunks in 738 graph layers | 245 chunks in 738 graph layers |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


## 4️⃣ Train in Federated Learning setting (with *stream* of batches)

In Federated Learning, the goal is to train a global model using data that is distributed across multiple devices, without centralizing the data. In this application, there is a central coordinator (`client`) and multiple workers (`FedLWorker`), each holding a portion of the data. Workers train their local models on their own data (batches) and periodically send the updated model parameters to the coordinator, which aggregates them to update the `global_model`. Note that in this application our coordinator (`client`) only tells the workers how to find the data (with *futures*, via `.scatter`). No batch is materialized outside a worker's memory: data is only materialized when a worker calls `.compute()`. This way the basic FL premise of *decentralized* data holds in our application.

More specifically, the idea is the following:
- Initialize a global model on client: `global_model`
- Initialize each worker's local model: `.run` the `worker_initialize_model` function on each worker.
- Iterate over (batch, worker) pairs, one batch per worker, cycling through the workers. Again, note that the batches are not materialized yet (they are Dask chunks).
    - Distribute the batch to the worker with `.scatter` (only the lazy chunk is sent, not computed data).
    - Instruct the worker to train on this batch with `.submit`. The task goes to the scheduler. Since we only care about the side effects of the task (i.e., the worker trains its model) and do not expect the worker to send anything back, we use `fire_and_forget`. The worker materializes the batch with `.compute` itself (no one else has seen the data until now) and trains its model with one `partial_fit` call.
    - Check the round-terminating condition, `round_terminates()`.
        - If **true**, it's time to synchronize: gather each worker's model `coef_` and `intercept_` and combine the models with some scheme (currently just the mean) to get the `global_coef` and `global_intercept`. This happens in `gather_and_combine_worker_models`. Update the `global_model` with these values and distribute them to all the workers (synchronize all worker models).
        - If **false** continue.
- Synchronize one last time. Note: we only update `global_model` using `gather_and_combine_worker_models` as explained before, we don't synchronize the workers one last time, on purpose.
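The control flow of the steps above can be replayed without Dask. `worker_addr_generator` behaves like `itertools.cycle` over the worker addresses (it re-reads them from the scheduler on every cycle), so with 8 workers and 245 batches each worker receives 30 or 31 batches, and a round ends every 50 batches. The skeleton below uses placeholder addresses (an assumption) and counters in place of the Dask calls; it reproduces the `batch_count` of 245 and `round_count` of 4 reported after the training cell. The final synchronization is not counted as a round, matching the notebook.

```python
from collections import Counter
from itertools import cycle

# Placeholder worker addresses (the notebook reads them from client.scheduler_info())
worker_addrs = [f"tcp://127.0.0.1:{port}" for port in range(9000, 9008)]  # 8 workers
n_batches = 245

def round_terminates(batch_count):
    return batch_count % 50 == 0

batch_count, round_count = 0, 0
assigned = Counter()
for _, worker_addr in zip(range(n_batches), cycle(worker_addrs)):
    assigned[worker_addr] += 1   # stands in for scatter + submit + fire_and_forget
    batch_count += 1
    if round_terminates(batch_count):
        round_count += 1         # stands in for gather_and_combine + synchronize

print(batch_count, round_count)        # 245 4
print(sorted(set(assigned.values())))  # [30, 31]
```

The last 45 batches (201 to 245) only reach the global model through the final `gather_and_combine_worker_models` call.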

**TODO**: 
   - IMPORTANT: When round terminating condition is true it means that we must now synchronize. But, tasks still exist in the scheduler that instruct workers to train on (previous) batches. We obviously block inside synchronization (i.e. `.run`) so no more learning tasks will be sent, but the already sent tasks must first run. This means, even though we know that we must synchronize exactly at this moment, workers will continue training (until tasks are complete) before we synchronize. Think about this... might cause problems. (One potential approach: don't use `fire_and_forget`, collect futures in a list and `.cancel` when sync must happen).
   - Use better round terminating condition.
   - When designing the synchronization approach, research ways for workers to inform the client that synchronization must happen (their model drifted) without blocking on `.gather` or `.result`. (Possible solution: publish/subscribe with `Pub`/`Sub`.)
   - *Mean* of the weights (`coef_`) of each model is not suitable in all scenarios. Create a `combine` function to be able to abstract this.
   - Same goes for `intercept_`.
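One way to address the `combine` TODO is to pass the aggregation rule in as a function, applied to both `coef_` and `intercept_`. The sketch below (NumPy only, hypothetical names) keeps today's unweighted mean as the default and adds a FedAvg-style alternative that weights each worker by the number of samples it trained on. Tracking per-worker sample counts is an assumption: `FedLWorker` does not record them today.

```python
import numpy as np

def mean_combine(params, n_samples):
    """Current behaviour: unweighted mean over workers."""
    return np.mean(params, axis=0)

def weighted_combine(params, n_samples):
    """FedAvg-style: weight each worker by how many samples it trained on."""
    weights = np.asarray(n_samples, dtype=float)
    return np.average(np.stack(params), axis=0, weights=weights)

def combine_models(coefs, intercepts, n_samples, combine=mean_combine):
    """Hypothetical pluggable version of the body of gather_and_combine_worker_models."""
    return combine(coefs, n_samples), combine(intercepts, n_samples)

coefs = [np.array([[1.0, 1.0]]), np.array([[3.0, 3.0]])]
intercepts = [np.array([0.0]), np.array([1.0])]
n_samples = [1_000, 3_000]  # the second worker has seen three times as much data

mean_coef, mean_intercept = combine_models(coefs, intercepts, n_samples)
w_coef, w_intercept = combine_models(coefs, intercepts, n_samples, combine=weighted_combine)
print(mean_coef, mean_intercept)  # [[2. 2.]] [0.5]
print(w_coef, w_intercept)        # [[2.5 2.5]] [0.75]
```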

Note1: This cell is re-runnable.

Note2: Open the *Dashboard* in *Status* and run the cell to see that the approach is indeed distributed.

In [22]:
%%time
from dask.distributed import fire_and_forget

# kwargs for our models
kwargs = {'C':1e-06, 'loss':'squared_hinge'}

# Our global model
global_model = PassiveAggressiveClassifier(**kwargs)
global_model.classes_ = classes

        
def worker_initialize_model(dask_worker, model_name, classes, n_features, **kwargs):
    """ Used by Client.run to initialize models inside each worker
    
    :param dask_worker: This variable will be populated with the worker itself.
    :params: See FedLWorker.initialize_model
    """
    dask_worker.initialize_model(model_name, classes, n_features, **kwargs)
    
    
def worker_synchronize_with_global_model(dask_worker, global_model_coef, global_model_intercept):
    """ Synchronize worker with the global model. (coef_ , intercept_) """
    dask_worker.set_model_coef(global_model_coef)
    dask_worker.set_model_intercept(global_model_intercept)

    
def worker_batch_train(X_batch, y_batch):
    """ Train worker's model with this batch -> (X_batch, y_batch) 
    Note: Dask resolves the scattered futures before calling this function, so X_batch and y_batch
    arrive as lazy Dask arrays (chunks), not yet realized in memory.
    """
    # The worker on which this task is running
    worker = get_worker()
    
    # Trigger the computation of the lazy arrays: the worker (us) gets the results in memory
    X_train = X_batch.compute()
    y_train = y_batch.compute()
    
    # Train my model
    worker.model_partial_fit(X_train, y_train)
    
    # IMPORTANT: Free .compute data from worker's memory
    del X_train, y_train


def worker_get_current_model(dask_worker):
    """ returns (coef_ , intercept_) from worker 
    
    :param dask_worker: This variable will be populated with the worker itself. (.run)
    """
    return dask_worker.get_model_coef(), dask_worker.get_model_intercept()


def worker_addr_generator():
    """ Simple generator to yield the address of each worker in a cyclic fashion (forever) """
    while True:
        for worker_addr in client.scheduler_info()['workers']:
            yield worker_addr


def round_terminates(batch_count):
    """ Dummy logic for round termination. To-be extended """
    if batch_count % 50 == 0:
        return True
    return False


def gather_and_combine_worker_models():
    """ Gathers all coef_ and intercept_ from workers, 
    combines them and returns their mean in global_coef, global_intercept
    """
    
    # Gather all the worker's coef_ , intercept_
    result = client.run(worker_get_current_model)

    workers_coefs = []
    workers_intercepts = []

    for _, (worker_coef, worker_intercept) in result.items():
        workers_coefs.append(worker_coef)
        workers_intercepts.append(worker_intercept)

    # Combine all the worker's coefficients
    global_coef = np.mean(workers_coefs, axis=0)

    # Combine all the worker's intercepts
    global_intercept = np.mean(workers_intercepts, axis=0)
    
    del result, workers_coefs, workers_intercepts
    
    return global_coef, global_intercept
    

# Initialize models of Workers

# This calls worker_initialize_model on all currently known workers immediately, blocks until results come back
_ = client.run(
    worker_initialize_model,
    model_name="PassiveAggressiveClassifier",
    classes=classes,
    n_features=d,
    **kwargs
)


# Start rounds

batch_count, round_count = 0, 0

for X_batch, y_batch, worker_addr in zip(X_batches, y_batches, worker_addr_generator()):
    
    # See after the code for specific comment about this
    # 'Send' data (!not data itself!, but a future) to the worker with address 'worker_addr'
    X_batch_scattered = client.scatter(X_batch, workers=worker_addr)
    y_batch_scattered = client.scatter(y_batch, workers=worker_addr)

    future = client.submit(
        worker_batch_train,
        X_batch=X_batch_scattered,
        y_batch=y_batch_scattered,
        workers=worker_addr
    )
    
    fire_and_forget(future)
    
    batch_count += 1
    
    # already sent to scheduler, free up memory
    del future, X_batch_scattered, y_batch_scattered
    
    if round_terminates(batch_count):
        
        # Combine all worker's models and synchronize the global model
        global_model.coef_, global_model.intercept_ = gather_and_combine_worker_models()

        # Synchronize the workers with the global model; client.run blocks until finished
        _ = client.run(
            worker_synchronize_with_global_model,
            global_model_coef=global_model.coef_,
            global_model_intercept=global_model.intercept_,
        )

        round_count += 1


# Synchronize the global model one last time (we don't synchronize the workers again just for simplicity)
global_model.coef_, global_model.intercept_ = gather_and_combine_worker_models()

CPU times: total: 32.7 s
Wall time: 27.6 s


In [23]:
batch_count

245

In [24]:
round_count

4

As a rule of thumb, scatter large data first and pass the resulting future to `submit`:

`future = client.submit(func, big_data)  # bad`

`big_future = client.scatter(big_data)  # good`

`future = client.submit(func, big_future)  # good`

Also (see https://docs.dask.org/en/stable/futures.html):

'Dask will only compute and hold onto results for which there are active futures. In this way, your local variables define what is active in Dask. When a future is garbage collected by your local Python session, Dask will feel free to delete that data or stop ongoing computations that were trying to produce it.' 

### Accuracy test

In [25]:
from sklearn.metrics import accuracy_score

X_test_computed = X_test.compute()
y_test_computed = y_test.compute()

y_pred = global_model.predict(X_test_computed)
accuracy_score(y_test_computed, y_pred)

0.8995983935742972

In [26]:
del y_test_computed
del X_test_computed

## 5️⃣ Online cont...

### Stop Cluster/Client

In [27]:
# Close Cluster/Client
client.close()
cluster.close()