# Distributed computing on HPC

To scale up a computation, it can be useful to offload the work from a Jupyter
notebook to a remote server such as a high performance computing (HPC) cluster.

First, we need to create an environment with the necessary packages on the cluster,
or add those packages to an existing environment.

## Installation
1. Connect to the remote server with ssh from your local machine:
```
ssh <REMOTE_USER>@<REMOTE_HOST>
```
2. Install Miniconda
```
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
chmod +x Miniconda3-latest-Linux-x86_64.sh
./Miniconda3-latest-Linux-x86_64.sh
conda init tcsh
```
3. Create a conda environment and install Jupyter Lab
```
conda create -n dasktest
conda activate dasktest
conda install jupyterlab nodejs ipywidgets -c conda-forge -y
# optional packages:
# conda install scikit-image matplotlib pandas -y
```
4. Register the Jupyter kernel
```
python -m ipykernel install --user --name dasktest
```

Note: if you already have a working environment (e.g. `myenv`), you only need to
make sure that Jupyter Lab is installed in it and that its kernel is registered:
```
conda activate myenv
conda install jupyterlab nodejs ipywidgets -c conda-forge -y
python -m ipykernel install --user --name myenv

```
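
Before starting Jupyter, it can help to confirm that the environment really contains the expected packages. The sketch below is only an illustration: `missing_packages` is a hypothetical helper (not part of conda or Jupyter), and the list of import names is an assumption to adapt to your own environment. Run it with the Python interpreter of the activated environment.

```python
from importlib.util import find_spec

def missing_packages(names):
    """Return the names from `names` that cannot be imported here."""
    return [name for name in names if find_spec(name) is None]

# import names assumed for the packages installed above
required = ["jupyterlab", "ipykernel", "ipywidgets"]
print("missing:", missing_packages(required))
```

An empty list means that every package was found; any name that is printed still needs to be installed in the active environment.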

## Connecting to a Jupyter notebook running on a remote server
We want to run Jupyter on the server and use it from a local computer. Once a
Python environment is configured on the remote server (see above), we can connect
either directly or through an SSH tunnel.

### Directly
1. Connect to the remote computer
```
ssh <REMOTE_USER>@<REMOTE_HOST>
```
2. Optionally request resources on the cluster. On a SLURM-managed cluster, type
```
srun --partition gpu --pty tcsh -i
```
3. Start Jupyter on the remote computer
```
conda activate dasktest
jupyter lab --no-browser --ip 0.0.0.0
```
4. Copy the link that points to the remote computer, click on 'Jupyter Server: Local' in the Visual Studio Code menu bar, and paste the link in the prompt that appears at the top of the window. You can also open the link in a browser.

### With an SSH tunnel
1. Open an SSH tunnel on the same port as the notebook server by running on the
local machine:
```
ssh -L 8080:localhost:8080 <REMOTE_USER>@<REMOTE_HOST>
```
This command also opens an interactive session on the cluster.
2. Start a jupyter lab server from the environment
```
conda activate dasktest
jupyter lab --no-browser --ip="*" --port 8080
```
3. Connect to the notebook by opening a browser and navigating to
http://localhost:8080, or use the link printed by Jupyter (of the form
http://localhost:8080/lab?token=). Alternatively, you can use the remote server in
Visual Studio Code by clicking on 'Jupyter Server: Local' in the task bar and
pasting the link http://localhost:8080/lab?token= when prompted. The new kernel
will then be visible.
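
The port has to be the same in three places: the `ssh -L` option, the `--port` option of Jupyter, and the local URL. As a minimal sketch of that relationship, the hypothetical helpers below (`tunnel_command` and `notebook_url`, neither of which belongs to any library) derive all of them from one port number. The user and host names are placeholders.

```python
import shlex

def tunnel_command(user, host, port=8080):
    """Build the ssh command forwarding local `port` to the same port on `host`."""
    return shlex.join(["ssh", "-L", f"{port}:localhost:{port}", f"{user}@{host}"])

def notebook_url(port=8080):
    """Local address at which the forwarded Jupyter server is reachable."""
    return f"http://localhost:{port}"

# placeholder user and host names
print(tunnel_command("alice", "cluster.example.org"))
print(notebook_url())
```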

At this point, we have an open terminal connected to the cluster with
Jupyter Lab running. We also have either a web browser tab or a Visual Studio Code window displaying a notebook.
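
If the browser cannot reach the notebook, a quick test from the local machine is to check whether anything answers on the forwarded port. This is a sketch under the assumption that the tunnel uses port 8080 as above; `port_is_open` is a hypothetical helper built only on the standard `socket` module.

```python
import socket

def port_is_open(host="localhost", port=8080, timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print("Jupyter reachable through the tunnel:", port_is_open(port=8080))
```

A result of `False` usually means that either the tunnel or the Jupyter server is not running, or that they use different ports.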

At the end of the session, stop Jupyter Lab by pressing CTRL-C in the terminal
running it or by using File>Shutdown in the Jupyter Lab interface. Then log out
from the terminal to end the session.

## Using Dask distributed
Dask performs parallel and distributed computations in Python using well-known
data structures such as NumPy's ndarray and pandas' DataFrame. Additionally, we can
use [dask-jobqueue](https://jobqueue.dask.org/) to manage the connection to a job
scheduler such as SLURM.

We need to install Dask, dask-jobqueue and the Jupyter Lab extensions on the remote computer:
```
conda activate dasktest
conda install dask distributed dask-jobqueue -c conda-forge
pip install dask_labextension
jupyter labextension install dask-labextension
jupyter labextension install @jupyter-widgets/jupyterlab-manager
```

Open a notebook on the remote computer and create a cluster scheduler:

In [None]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress
cluster = SLURMCluster(
    cores=1,                 # cores for a single job
    memory='64GB',           # memory for a single job
    shebang='#!/usr/bin/env tcsh',
    processes=1,             # number of Python processes per job
    queue='',                # partition name, e.g. cpu, gpu or ml
    local_directory='/ssd',  # local scratch directory used by the workers
    walltime='02:00:00',     # 2 hours of wall time
)
# let Dask start and stop jobs as needed, up to 20 at a time
cluster.adapt(maximum_jobs=20)
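
Since each job reserves its own cores and memory, the upper bound on what the cluster can hold at once is simple arithmetic. The sketch below works it out for the values used above, assuming that adaptive scaling alone controls the number of jobs. `max_footprint` is a hypothetical helper, not a dask-jobqueue function.

```python
def max_footprint(cores_per_job, memory_gb_per_job, maximum_jobs):
    """Upper bound on the resources held when all adaptive jobs are running."""
    return {
        "cores": cores_per_job * maximum_jobs,
        "memory_gb": memory_gb_per_job * maximum_jobs,
    }

# values taken from the SLURMCluster and adapt calls above
print(max_footprint(cores_per_job=1, memory_gb_per_job=64, maximum_jobs=20))
# → {'cores': 20, 'memory_gb': 1280}
```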

Create a client to connect to the scheduler and display it. This prints a link
that you can copy and paste into the Dask extension tab of Jupyter Lab in order
to monitor the active processes.

In [None]:
client = Client(cluster)
client

A typical example is to load the list of files to process into a data frame.
See also the [batch-processing](batch-processing.ipynb) notebook.

In [None]:
import pandas as pd
from pathlib import Path
folder = Path('../data')
# load a pandas data frame listing the files and additional information
exp = pd.read_csv(folder/'experiment.csv')
exp

Using Dask, we can then map each entry to be processed in parallel.

Note that calling `dask.delayed` on a function that itself loads an array with
Dask will load the whole file each time. Here we lazily load the images beforehand
and process them one by one.

In [None]:
import nd2
import dask

# load all images lazily
imgs = [nd2.imread(folder/f, dask=True) for f in exp['filename']]

# process each image
def process_image(img):
    return img.mean(), img.std()    

# create tasks for each file
tsk = [dask.delayed(process_image)(img) for img in imgs]


Scale the cluster (ask for workers)

In [None]:
cluster.scale(100)

You can now check the status of the workers from the command line, for example in
the terminal that opened the tunnel, using the command:
```
squeue -u $USER
```
Note: to free that terminal, suspend the Jupyter server with `Ctrl-z`, type the
previous command, and then bring Jupyter back to the foreground by typing `fg`.

Once the jobs are running (the ST column should display R in the squeue output),
you can launch the tasks.
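
The same check can be done from Python by counting the state codes in the text printed by `squeue`. The sketch below assumes the default `squeue` column layout and simply splits on whitespace. `count_job_states` is a hypothetical helper, and the sample text is made up for illustration only.

```python
from collections import Counter

def count_job_states(squeue_output):
    """Count jobs per state code (column ST) in the text printed by squeue."""
    lines = squeue_output.strip().splitlines()
    st = lines[0].split().index("ST")  # position of the state column
    return Counter(line.split()[st] for line in lines[1:] if line.strip())

# made-up sample in the default squeue layout, for illustration only
sample = """\
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
1001 cpu dask-wor alice R 0:42 1 node01
1002 cpu dask-wor alice R 0:40 1 node02
1003 cpu dask-wor alice PD 0:00 1 (Priority)
"""
print(count_job_states(sample))
```

On the cluster, the input text could come from `subprocess.run(["squeue", "-u", user], capture_output=True, text=True).stdout`, where `user` is your login name.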

In [None]:
# run the tasks; unpacking the list returns one result per task
result = dask.compute(*tsk)
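
`dask.compute` returns a tuple with one entry per positional argument, so the shape of the result depends on whether the task list is passed as a single argument or unpacked. The toy example below illustrates the difference with a trivial `square` function (a stand-in for the image processing). It uses the synchronous scheduler so that it runs without any cluster.

```python
import dask

@dask.delayed
def square(x):
    return x * x

tasks = [square(i) for i in range(3)]

# a list passed as one argument comes back as one list inside a 1-tuple
(as_list,) = dask.compute(tasks, scheduler="synchronous")

# unpacked tasks come back as a tuple with one entry per task
as_tuple = dask.compute(*tasks, scheduler="synchronous")

print(as_list, as_tuple)
```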

If we want to store the results in a pandas data frame, it can be convenient to
map a function over the input list of files.

In [None]:
import dask.dataframe as dd

# define the function to process blocks of the data frame
def process_rows(df):
    """Process rows of the data frame"""
    result = []
    for x in df.itertuples():
        # retrieve the line of the input data frame
        # for example we could open a file and process it
        fname = x.filename
        m = 1.0  # placeholder value, a float to match the declared meta
        # create a data frame, note that values must be lists or you need
        # to pass an index
        result.append(pd.DataFrame({'filename': [fname], 'mean': [m]}))
    return pd.concat(result, ignore_index=True)


# schedule the computations
ddf = dd.from_pandas(exp, chunksize=1).map_partitions(process_rows,
                            meta={'filename':'object', 'mean':'f'})

# compute the values
res = ddf.compute()
# merge the new columns to the original table
exp.merge(res, on='filename')
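
Because `meta` declares the columns and types that the mapped function is expected to return, it can be worth checking the function on a small pandas data frame before scheduling anything. The sketch below does this with plain pandas. `matches_meta` is a hypothetical helper, the toy `process_rows` is a simplified stand-in for the function above, and the file names are made up.

```python
import pandas as pd

def process_rows(df):
    """Toy stand-in for the function above: one output row per input row."""
    return pd.DataFrame({
        "filename": df["filename"].to_list(),
        "mean": [1.0] * len(df),
    })

def matches_meta(frame, meta):
    """Check that `frame` has the columns and dtype kinds named in `meta`."""
    if list(frame.columns) != list(meta):
        return False
    return all(
        frame[col].dtype.kind == pd.api.types.pandas_dtype(dtype).kind
        for col, dtype in meta.items()
    )

# made-up file names, for illustration only
sample = pd.DataFrame({"filename": ["a.nd2", "b.nd2"]})
out = process_rows(sample)
print(matches_meta(out, {"filename": "object", "mean": "f"}))
```

The comparison uses the dtype kind (for example, any float width counts as `f`), which is a deliberately loose check.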

## Batch process files in a folder

In [None]:
import tifffile
import dask
from pathlib import Path
from dask.distributed import Client
import pandas as pd

#Define the function to load and process files
def process_file(filename):
    img = tifffile.imread(filename)
    return img.mean()

client = Client()

# define the folder where the datafiles are
folder = Path('../data/')

# list all tif files
filelist = [f for f in folder.glob('[!.]*.tif')]

# create a task for each file, and start them immediately
tsk = [client.submit(process_file, f) for f in filelist]

# gather the results and store them in a data frame
pd.DataFrame({
    'File name': filelist,
    'Mean intensity': [r.result() for r in tsk]
    })
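
The pattern `'[!.]*.tif'` keeps only the files whose name does not start with a dot, which matters because `Path.glob` does not skip hidden files on its own. The sketch below checks this on a temporary folder. The file names are hypothetical and the files are created empty.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    # hypothetical file names, created empty just to test the pattern
    for name in ["cells.tif", ".cells.tif", "notes.txt"]:
        (folder / name).touch()

    # names that do not start with a dot and end with .tif
    visible = sorted(p.name for p in folder.glob("[!.]*.tif"))
    # the plain pattern also returns the hidden file
    everything = sorted(p.name for p in folder.glob("*.tif"))

print(visible)
print(everything)
```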


We can also define two functions and combine them into a graph of tasks that
will be executed lazily. For this we can use the decorator `dask.delayed`.

In [None]:
import tifffile
import dask
from pathlib import Path
from dask.distributed import Client
import pandas as pd

#Define the function to load files
@dask.delayed
def load_file(filename):    
    img = tifffile.imread(filename)
    return img

#Define the function to process the image
@dask.delayed
def process_image(img):            
    return img.mean()

# define the folder where the datafiles are
folder = Path('../data/')

# list all tif files
filelist = [f for f in folder.glob('[!.]*.tif')]

# create a list of tasks
tsk = [process_image(load_file(f)) for f in filelist]

# compute all the tasks together and store the results in a data frame
pd.DataFrame({
    'File name': filelist,
    'Mean intensity': list(dask.compute(*tsk))
    })
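
To see that a delayed graph really does nothing until it is computed, the toy example below records every function call in a list. The functions `load` and `total` are hypothetical stand-ins for loading and processing an image. The synchronous scheduler is used so that the example runs in a single process without a cluster.

```python
import dask

calls = []  # records which functions actually ran

@dask.delayed
def load(x):
    calls.append("load")
    return [x, x + 1]

@dask.delayed
def total(values):
    calls.append("total")
    return sum(values)

# build the graph: three chains of load followed by total
tasks = [total(load(i)) for i in range(3)]
ran_before_compute = len(calls)  # still 0, nothing has been executed

# execute the whole graph in one call
results = dask.compute(*tasks, scheduler="synchronous")
print(ran_before_compute, len(calls), results)
```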

## Lazy loading

How to read a TIFF file as a lazy Dask array:

In [None]:
import zarr
import tifffile
import dask.array

def tiffileimreaddask(filename):
    store = tifffile.imread(filename, aszarr=True)
    array = dask.array.from_zarr(store)
    return array

img = tiffileimreaddask('../scratch/tmp.tif')
img
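
Once the data is a Dask array, operations such as slicing and reductions stay lazy until `compute` is called. The sketch below illustrates this with a small NumPy array standing in for an image file (an assumption made so that the example needs no TIFF file); the chunking of one frame per chunk is also chosen only for illustration.

```python
import numpy as np
import dask.array as da

# stand-in for an image stack: 4 frames of 8 x 8 pixels, one chunk per frame
stack = da.from_array(np.arange(4 * 8 * 8).reshape(4, 8, 8), chunks=(1, 8, 8))

frame_mean = stack[2].mean()   # still a lazy Dask array, nothing computed yet
value = frame_mean.compute()   # the computation happens here
print(stack.numblocks, value)
```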