# Sommes distribuées de matrices avec Dask

Ce notebook montre plusieurs choses :
  * Comment démarrer un cluster Dask depuis un notebook sur TREX
  * Comment utiliser Dask pour paralléliser des appels de fonctions avec *dask.delayed*

## Create cluster

Nous allons démarrer un cluster Dask. Ce cluster sera un PBS cluster qui se connectera au cluster HAL.

### Imports
Pour créer un cluster SLURM, nous allons utiliser le module *dask_jobqueue* . Il permet d'initialiser un cluster en quelques lignes  depuis un notebook.

In [1]:
from dask_jobqueue import SLURMCluster
from distributed import LocalCluster, Client

### Cluster initialisation

Le cluster sera composé de workers Dask, lancé via des jobs SLURM. Chaque job SLURM lancera 4 worker chacun utilisant 2 cpus et 16 GB de mémoire dans SLURM. 

In [14]:
cluster = SLURMCluster(
    # Dask-worker specific keywords
    n_workers=4,  # start 4 workers
    cores=2,  # each worker runs on 2 cores
    memory="16GB",  # each worker uses 16GB memory
    processes=1,  # Number of Python processes to cut up each job
    local_directory="$TMPDIR",  # Location to put temporary data if necessary
    account="supporthpc",
    walltime="01:00:00",
    interface="ib0",
    log_directory="../dask-logs",
    job_extra_directives=[],
    # job_extra_directives=['--qos="cpu_2022_1280"'],)         # qos to use
)
cluster

Perhaps you already have a cluster running?
Hosting the HTTP server on port 46197 instead


0,1
Dashboard: http://10.11.2.29:46197/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.11.2.29:34345,Workers: 0
Dashboard: http://10.11.2.29:46197/status,Total threads: 0
Started: Just now,Total memory: 0 B


Pour le moment, il n'y a pas de worker Dask. En accédant au Dashboard, nous pouvons nos workers.

In [15]:
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.11.2.29:46197/status,

0,1
Dashboard: http://10.11.2.29:46197/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.11.2.29:34345,Workers: 0
Dashboard: http://10.11.2.29:46197/status,Total threads: 0
Started: Just now,Total memory: 0 B


Nous pouvons voir que nous avons dorénavant des workers Dask.

## Compute facets contribution
Nous allons maintenant appliquer des sommes distribuées de matrices avec le cluster que nous venons de créer. Pour cela, nous aurons besoin de *dask.delayed*, qui permet d'indiquer à Dask la fonction que nous souhaitons paralléliser.

In [4]:
import dask.delayed
import dask.array as da
import numpy as np

In [5]:
# List of MNT facets
facets = range(10)

Ici, nous définissons la fonction qui sera parallélisée.

In [6]:
# Fonction that takes a facet and returns a matrix corresponding
# to the controibution of the facet for the image
def my_f(i):
    return i * np.ones((2048 * 8, 2048 * 8), dtype=complex)

Et ensuite, nous définissons un vecteur de fonction *delayed* pour Dask. Lorsque nous utilisons *delayed*, la fonction n'est pas exécutée directement. A la place, Dask en fait un *delayed object* qui permet de tracer les fonctions à exécuter et ses arguments (ici, chaque valeur du vecteur facets).  

In [16]:
lazy_arrays = [dask.delayed(my_f)(i) for i in facets]
lazy_arrays

[Delayed('my_f-67f738fc-525d-40f1-b2d6-f2f95daeccf8'),
 Delayed('my_f-23cd0e27-f766-4f35-964e-b227a61f95ed'),
 Delayed('my_f-656263ea-a81f-4bb3-95ee-9d84ac7355fb'),
 Delayed('my_f-49505daa-b20c-41ac-806b-2752b6b5c31a'),
 Delayed('my_f-1513656b-ae83-47e5-bbf6-13298d2f7405'),
 Delayed('my_f-9bfc5d62-d390-43c5-975a-a7a8f83a5d9d'),
 Delayed('my_f-cab24b9a-9cc0-45db-9a24-79d1776e5da7'),
 Delayed('my_f-4c1ca02b-d4aa-4ae6-9a57-4a6a20e968f7'),
 Delayed('my_f-fd2b9950-ae6d-498f-a9de-6c6d27d32027'),
 Delayed('my_f-6edb0068-2920-4bb2-970f-a7abbf45e14c')]

Nous pouvons voir les fonctions *my_f* dans les objets *Delayed*. Comme nous avions 10 éléments dans *facets*, nous avons donc dix fonctions.  
A partir de ces fonctions dites *lazy* (elles ne s'exécutent pas directement), nous indiquons à Daks d'en faire des vecteurs. Ensuite, nous utilisons *stack* afin d'obtenir un bloc.

In [17]:
arrays = [
    da.from_delayed(
        lazy_array, dtype=complex, shape=(2048 * 8, 2048 * 8)  # for every lazy value
    )
    for lazy_array in lazy_arrays
]

stack = da.stack(arrays, axis=0)
stack

Unnamed: 0,Array,Chunk
Bytes,40.00 GiB,4.00 GiB
Shape,"(10, 16384, 16384)","(1, 16384, 16384)"
Dask graph,10 chunks in 21 graph layers,10 chunks in 21 graph layers
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray
"Array Chunk Bytes 40.00 GiB 4.00 GiB Shape (10, 16384, 16384) (1, 16384, 16384) Dask graph 10 chunks in 21 graph layers Data type complex128 numpy.ndarray",16384  16384  10,

Unnamed: 0,Array,Chunk
Bytes,40.00 GiB,4.00 GiB
Shape,"(10, 16384, 16384)","(1, 16384, 16384)"
Dask graph,10 chunks in 21 graph layers,10 chunks in 21 graph layers
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray


Dask propose une visualisation dans Jupyter qui permet d'avoir une vision du bloc de données. Si nous décidons de redéfinir la taille d'un chunk dans le bloc, Dask se met à jour : 

In [18]:
stack = stack.rechunk((1, 4096, 4096))
stack

Unnamed: 0,Array,Chunk
Bytes,40.00 GiB,256.00 MiB
Shape,"(10, 16384, 16384)","(1, 4096, 4096)"
Dask graph,160 chunks in 22 graph layers,160 chunks in 22 graph layers
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray
"Array Chunk Bytes 40.00 GiB 256.00 MiB Shape (10, 16384, 16384) (1, 4096, 4096) Dask graph 160 chunks in 22 graph layers Data type complex128 numpy.ndarray",16384  16384  10,

Unnamed: 0,Array,Chunk
Bytes,40.00 GiB,256.00 MiB
Shape,"(10, 16384, 16384)","(1, 4096, 4096)"
Dask graph,160 chunks in 22 graph layers,160 chunks in 22 graph layers
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray


Maintenant, nous demandons à Dask d'effectuer une somme sur le premier axe. Il faut par ailleurs noter que pour le moment, aucune fonction n'est exécutée.

In [19]:
stack_sum = stack.sum(axis=0)
stack_sum

Unnamed: 0,Array,Chunk
Bytes,4.00 GiB,256.00 MiB
Shape,"(16384, 16384)","(4096, 4096)"
Dask graph,16 chunks in 25 graph layers,16 chunks in 25 graph layers
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray
"Array Chunk Bytes 4.00 GiB 256.00 MiB Shape (16384, 16384) (4096, 4096) Dask graph 16 chunks in 25 graph layers Data type complex128 numpy.ndarray",16384  16384,

Unnamed: 0,Array,Chunk
Bytes,4.00 GiB,256.00 MiB
Shape,"(16384, 16384)","(4096, 4096)"
Dask graph,16 chunks in 25 graph layers,16 chunks in 25 graph layers
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray


La commande *persist* met les données en mémoire du cluster. Cela provoque l'exécution de la somme de la matrice en tâche de fond. Si nous avons l'accès au dashboard, nous pouvons voir l'exécution de la fonction directement.

In [11]:
stack_sum = stack_sum.persist()
stack_sum

Unnamed: 0,Array,Chunk
Bytes,4.00 GiB,256.00 MiB
Shape,"(16384, 16384)","(4096, 4096)"
Dask graph,16 chunks in 1 graph layer,16 chunks in 1 graph layer
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray
"Array Chunk Bytes 4.00 GiB 256.00 MiB Shape (16384, 16384) (4096, 4096) Dask graph 16 chunks in 1 graph layer Data type complex128 numpy.ndarray",16384  16384,

Unnamed: 0,Array,Chunk
Bytes,4.00 GiB,256.00 MiB
Shape,"(16384, 16384)","(4096, 4096)"
Dask graph,16 chunks in 1 graph layer,16 chunks in 1 graph layer
Data type,complex128 numpy.ndarray,complex128 numpy.ndarray


La commande *compute* provoque l'exécution de la fonction de somme. Mais étant donné que nous avons utilisé la commande *persist* à l'étape précédente, la fonction est déjà en cours ou terminée. Par conséquent, il est possible que cette dernière cellule s'exécute instantanément. 

In [20]:
stack_sum[1, :].compute()

array([45.+0.j, 45.+0.j, 45.+0.j, ..., 45.+0.j, 45.+0.j, 45.+0.j])

In [13]:
client.close()
cluster.close()