# Using ipyparallel on the cluster

# Introduction

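The package `ipyparallel` provides parallel computing for IPython and is tightly integrated with Jupyter notebooks: the notebook acts as a client that sends work, through a controller, to a set of IPython engines.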

# SLURM setup

The SLURM `batch_job.slurm` script is relatively simple: we request 2 independent tasks, 4 CPUs per task, and 10G of memory per CPU. The tasks may or may not land on the same node.
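Multiplying out the request gives the total allocation. A quick sketch, with variable names that simply mirror the `#SBATCH` options (they are illustrative and copied by hand, not read from the script):

```python
# Values copied by hand from the #SBATCH lines of batch_job.slurm
ntasks = 2            # --ntasks
cpus_per_task = 4     # --cpus-per-task
mem_per_cpu_gb = 10   # --mem-per-cpu=10G

total_cpus = ntasks * cpus_per_task               # CPUs across all tasks
mem_per_task_gb = cpus_per_task * mem_per_cpu_gb  # memory follows the CPU count
total_mem_gb = total_cpus * mem_per_cpu_gb

print(f"{total_cpus} CPUs, {mem_per_task_gb}G per task, {total_mem_gb}G in total")
# 8 CPUs, 40G per task, 80G in total
```

Because `--mem-per-cpu` is a per-CPU limit, raising `--cpus-per-task` also raises the memory each task receives.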

## The SLURM script
Contents of `../batch_job.slurm`:
```bash
#!/bin/bash

#SBATCH --ntasks=2
#SBATCH --cpus-per-task=4
#SBATCH --mem-per-cpu=10G
#SBATCH --time=0-03:00:00

# Sources the appropriate packages and conda environments
source job-env.sh

# Start the controller and engines 
srun -l --multi-prog cluster.conf &


# Runs the application
${APP}
```

## The job environment

Contents of `../job-env.sh`:
```bash
#!/bin/bash

# This file either loads or creates an appropriate conda environment 

NARGS="$#"

case $NARGS in
  (0)
    # Sets a default value for the environment name if not present
    MY_NAME="ipyparallel_env"
    ;;
  (1)
    MY_NAME="$1"
    ;;
  (*)
    (>&2 echo "Error: did not expect more than one argument.")
    (>&2 echo "    (Got $@)")
    exit 1
    ;;
esac
```

```bash
# Loads necessary ACCRE packages
module purge 
module load Anaconda2

# Checks if environment name is valid and, if so, exports name
if [[ "$MY_NAME" =~ ^[0-9A-Za-z_]+$ ]]; then
  export MY_CONDA_ENV=$MY_NAME ;
else
  (>&2 echo "Invalid name $MY_NAME")
  exit 1
fi

# If the conda environment exists, then activates the environment
# else creates the new conda environment with the Makefile
if conda env list | grep -q "^${MY_CONDA_ENV}[[:space:]]"; then
  echo "Found existing conda environment $MY_CONDA_ENV" 
  source activate $MY_CONDA_ENV 
else
  echo "Creating conda environment $MY_CONDA_ENV";
  make env
  source activate $MY_CONDA_ENV 
  make install
  make test
fi
```

```bash
## Set additional environment variables

# The profile directory, $HOME/.ipython/profile_${PROFILE}, should be on a
# network drive; otherwise the JSON connection files written by ipcontroller
# must be copied to each host.

export PROFILE=job_${SLURM_JOB_ID}_$(hostname)
echo "Creating profile ${PROFILE}"
ipython profile create ${PROFILE}

# Creates roles for each task to be run in parallel
LAST_PROC=$(( $SLURM_NTASKS - 1 )) # id of the last process
{
  echo "# This file has been generated by $0"
  echo "0 ./cluster-roles.sh controller"
  echo "1-${LAST_PROC} ./cluster-roles.sh engine"
} > cluster.conf

# Creates output filename including timestamp
OUTFILE=pi_estimate$(date +%Y%m%d_%H%M%S).txt
echo Using output file $OUTFILE

export APP="python echo.py --profile ${PROFILE} -n 1e12 -o $OUTFILE" 
```

## The cluster configuration

From `batch_job.slurm`:

```bash
# Start the controller and engines 
srun -l --multi-prog cluster.conf &
```

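`srun` is SLURM's parallel task launcher; it fills the role that `mpiexec` plays for MPI, starting one process per task. The `-l` option prefixes each line of output with its task number, and the trailing `&` runs the job step in the background so that the batch script can go on to start `${APP}`. The `--multi-prog` option lets different tasks run different programs, as listed in a configuration file, here `cluster.conf`, which was generated by `job-env.sh`.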

Contents of `../cluster.conf`:
```bash
# This file has been generated by /bin/bash
0 ./cluster-roles.sh controller
1-13 ./cluster-roles.sh engine
```
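Each line of a `--multi-prog` file names a task ID or a range of IDs, followed by the program and arguments those tasks run; lines beginning with `#` are comments. The hypothetical helper `make_multiprog_conf` below is a minimal Python sketch of the layout that `job-env.sh` writes (controller on task 0, engines on the rest), handy for checking the file for a given task count:

```python
def make_multiprog_conf(ntasks, generator="job-env.sh"):
    """Build the text of an `srun --multi-prog` file in which task 0 runs
    the controller and tasks 1..ntasks-1 run engines (the job-env.sh layout).
    `generator` only fills in the header comment."""
    if ntasks < 2:
        raise ValueError("need one controller task and at least one engine task")
    last = ntasks - 1
    # Conservative choice in this sketch: a single engine task gets a plain ID
    engine_ids = "1" if last == 1 else f"1-{last}"
    lines = [
        f"# This file has been generated by {generator}",
        "0 ./cluster-roles.sh controller",
        f"{engine_ids} ./cluster-roles.sh engine",
    ]
    return "\n".join(lines) + "\n"


print(make_multiprog_conf(14), end="")
```

With 14 tasks this reproduces the task lines of the `cluster.conf` listing above.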

## Launch controller and engines

Contents of `../cluster-roles.sh`:

```bash
#!/bin/bash
# cluster-roles.sh

if [[ "$#" -ne 1 ]]; then
  echo "USAGE: bash $0 ROLE" 
  exit 1
else
  ROLE=$1
fi

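case $ROLE in
  ("controller")
    echo "Launching controller"
    ipcontroller --ip="*" --profile=${PROFILE}
    ;;
  ("engine")
    # Give the controller a head start to write its connection files
    sleep 5
    echo "Launching engines on $(hostname)"
    ipengine --profile=${PROFILE}
    ;;
  (*)
    echo "Unknown role: $ROLE" >&2
    exit 1
    ;;
esac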
```


# Using the code

## Setup

```python
from ipyparallel import Client
import os
```

## Establish connection
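Create the client. `Client(profile=...)` reads the controller's connection file, `security/ipcontroller-client.json`, from the profile directory named by `$PROFILE`, which must be on a network-mounted drive that both the controller's node and the notebook can see. By default, profile directories live in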

```
$HOME/.ipython/profile_*
```

`$HOME` is a GPFS-mounted network drive on ACCRE.

```python
rc = Client(profile=os.environ['PROFILE'])
```
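Engines register with the controller asynchronously, and `cluster-roles.sh` only sleeps 5 seconds before starting them, so some engines may not be connected yet when the client is created. A minimal sketch of a generic polling helper, `wait_for` (a hypothetical name, not part of ipyparallel); the commented usage relies on `rc.ids`, the list of registered engine IDs, and assumes you know how many engines to expect:

```python
import time


def wait_for(predicate, timeout=60.0, interval=1.0):
    """Call predicate() every `interval` seconds until it returns True.
    Returns True on success, or False once `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Hypothetical usage in the notebook (expected_engines is an assumption):
# wait_for(lambda: len(rc.ids) >= expected_engines, timeout=120)
```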

## Sync imports on each engine

`sync_imports` runs each import statement in its block both locally and on every engine in the view, so that all processes share the same packages. Here, we import the `subprocess` module, which lets Python run shell commands in child processes.

```python
with rc[:].sync_imports():
    import subprocess
```

```
importing subprocess on engine(s)
```


## Magic

The `%px` line magic executes a single line of code on every engine. Here, let's use `subprocess` to find out which nodes our SLURM tasks were assigned to.

```python
%px print(subprocess.check_output("hostname"))
```

```
[stdout:0] b'vmp1250\n'
[stdout:1] b'vmp1250\n'
[stdout:2] b'vmp1251\n'
[stdout:3] b'vmp1251\n'
```
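With more engines, reading hostnames line by line gets tedious. A small sketch that tallies engines per node; `engines_per_host` is a hypothetical helper, and its input is copied from the output above:

```python
from collections import Counter


def engines_per_host(hostnames):
    """Count engines per node, given one hostname per engine
    (str or bytes, possibly with a trailing newline)."""
    cleaned = []
    for h in hostnames:
        if isinstance(h, bytes):
            h = h.decode()
        cleaned.append(h.strip())
    return Counter(cleaned)


# Hostnames as printed by the %px cell above
outputs = [b'vmp1250\n', b'vmp1250\n', b'vmp1251\n', b'vmp1251\n']
print(engines_per_host(outputs))  # Counter({'vmp1250': 2, 'vmp1251': 2})
```

On a live cluster, the list could instead come from `rc[:].apply_sync(socket.gethostname)` (after `import socket`), which runs the function once on every engine and returns the results as a list.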


The `%%px` cell magic executes a whole block of code on every engine. Here, each engine prints its SLURM task ID, `SLURM_PROCID`.

```python
%%px
import os

print(os.environ['SLURM_PROCID'])
```

```
[stdout:0] 0
[stdout:1] 1
[stdout:2] 2
[stdout:3] 3
```


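*With the `cluster.conf` shown above, task 0 runs the controller rather than an engine, so `SLURM_PROCID` 0 would not be expected in this output. The four engines and the IDs printed here therefore appear to come from a run with a different task layout than the one in `batch_job.slurm`.*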

# A practical(?) example

From [Rosetta Code](http://rosettacode.org/wiki/Parallel_Brute_Force#Python)

> Task

> Find, through brute force, the five-letter passwords corresponding with the following SHA-256 hashes:

> 1. 1115dd800feaacefdf481f1f9070374a2a81e27880f187396db67958b207cbad
> 2. 3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b
> 3. 74e1bb62f8dabb8125a58852b63bdf6eaef667cb56ac7f7cdba6d7305c50a22f

> Your program should naively iterate through all possible passwords consisting only of five lower-case ASCII English letters. It should use concurrent or parallel processing, if your language supports that feature. You may calculate SHA-256 hashes by calling a library or through a custom implementation. Print each matching password, along with its SHA-256 hash. 

## Setup

We need to process the data in chunks and keep the load balanced across all the engines, so we use the `Client` method `load_balanced_view`, which hands each task to an available engine rather than assigning work to engines in advance.

```python
lview = rc.load_balanced_view()
```
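To see why balancing matters when tasks take uneven amounts of time, here is a toy model only (it is not ipyparallel's actual scheduler). It contrasts handing each task to whichever worker frees up first with splitting the tasks into fixed contiguous blocks up front; both function names are hypothetical:

```python
import heapq


def least_loaded_makespan(durations, n_workers):
    """Toy load balancer: give each task, in order, to the worker that
    becomes free first. Returns (worker assigned to each task, makespan)."""
    free_at = [(0.0, w) for w in range(n_workers)]  # (time worker is free, worker id)
    heapq.heapify(free_at)
    assignment = []
    for d in durations:
        t, w = heapq.heappop(free_at)
        assignment.append(w)
        heapq.heappush(free_at, (t + d, w))
    return assignment, max(t for t, _ in free_at)


def static_makespan(durations, n_workers):
    """Toy static split: each worker gets one contiguous block of tasks."""
    size = -(-len(durations) // n_workers)  # ceiling division
    return max(sum(durations[i:i + size]) for i in range(0, len(durations), size))


durations = [4, 1, 1, 1, 1]  # one slow task followed by quick ones
print(least_loaded_makespan(durations, 2))  # ([0, 1, 1, 1, 1], 4.0)
print(static_makespan(durations, 2))        # 6
```

The balanced schedule lets one worker chew through the quick tasks while the other handles the slow one; the static split leaves the slow task's block as the bottleneck.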

We also need the `hashlib` library in order to compute the hash for each potential password.

```python
with rc[:].sync_imports():
    from hashlib import sha256
```

```
importing sha256 from hashlib on engine(s)
```


## The worker function

The total number of passwords we need to try is $26^5$, so we pass each integer from $0$ to $26^5 - 1$ to the worker and map it to its corresponding word and hash. In effect, we write each integer in base 26: dividing by $26^4$ gives the first letter as a value from 0 to 25, dividing the remainder by $26^3$ gives the second letter, and so on. Adding $97$, the ASCII code of `a`, turns each value into a lowercase letter. Finally, we compute the SHA-256 digest and return the password only if the digest is one of our targets.
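The mapping can be checked in both directions without a cluster. A minimal sketch with two hypothetical helpers: `serial_to_word` uses the same `divmod` loop as the worker, and `word_to_serial` is its inverse:

```python
A = ord("a")  # 97


def serial_to_word(serial, length=5):
    """Map an integer in [0, 26**length) to its lowercase word (base 26)."""
    letters = []
    divisor = 26 ** (length - 1)
    for _ in range(length):
        digit, serial = divmod(serial, divisor)
        letters.append(chr(A + digit))
        divisor //= 26
    return "".join(letters)


def word_to_serial(word):
    """Inverse of serial_to_word: read the word as a base-26 number."""
    serial = 0
    for ch in word:
        serial = serial * 26 + (ord(ch) - A)
    return serial


print(word_to_serial("apple"))  # 274070
print(serial_to_word(274070))   # apple
```

For example, `apple` corresponds to $15 \cdot 26^3 + 15 \cdot 26^2 + 11 \cdot 26 + 4 = 274{,}070$, since `a` is 0, `p` is 15, `l` is 11, and `e` is 4.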

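```python
def hash_from_serial(serial):
    # `sha256` and `hashes` are looked up in each engine's namespace
    divisor = 26 ** 4  # 456976
    letters = []
    for _ in range(5):
        letter, serial = divmod(serial, divisor)
        letters.append(97 + letter)  # 97 == ord('a')
        divisor //= 26  # integer division keeps divmod working on ints
    digest = sha256(bytes(letters)).hexdigest()
    if digest in hashes:
        password = "".join(chr(x) for x in letters)
        return {digest: password}
    return None  # no match
```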

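`hash_from_serial` looks up `hashes` on the engines, so we define the target digests on every engine with `%%px`:

```python
%%px
hashes = {
    "1115dd800feaacefdf481f1f9070374a2a81e27880f187396db67958b207cbad",
    "3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b",
    "74e1bb62f8dabb8125a58852b63bdf6eaef667cb56ac7f7cdba6d7305c50a22f"
}
```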
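Before launching the full search, it can be worth running the worker on a few known indices in a plain local Python session. A sketch under that assumption: `check_serial` and `target_hashes` are hypothetical local copies of `hash_from_serial` and `hashes`, and index 274,070 is `apple` under the base-26 mapping:

```python
from hashlib import sha256

# Hypothetical local copy of the target digests (same as `hashes` on the engines)
target_hashes = {
    "1115dd800feaacefdf481f1f9070374a2a81e27880f187396db67958b207cbad",
    "3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b",
    "74e1bb62f8dabb8125a58852b63bdf6eaef667cb56ac7f7cdba6d7305c50a22f",
}


def check_serial(serial):
    """Local stand-in for hash_from_serial: {digest: password} on a match, else None."""
    divisor = 26 ** 4
    letters = []
    for _ in range(5):
        letter, serial = divmod(serial, divisor)
        letters.append(97 + letter)  # 97 == ord('a')
        divisor //= 26
    candidate = bytes(letters)
    digest = sha256(candidate).hexdigest()
    if digest in target_hashes:
        return {digest: candidate.decode("ascii")}
    return None


print(check_serial(274070))  # maps the digest of 'apple' to 'apple'
print(check_serial(0))       # 'aaaaa' is not a target: None
```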

Here are some example results of our mapping function applied to the first five candidates, `aaaaa` through `aaaae`. We set the view to blocking, so `map` waits until the engines have finished and returns the results as a list. None of these candidates matches, so each call returns `None`.

```python
lview.block = True
result = lview.map(hash_from_serial, range(5))
result
```

```
[None, None, None, None, None]
```

If we set `block` to `False`, `map` returns immediately with an `AsyncMapResult`, which is, in essence, a future.

```python
lview.block = False
result = lview.map(hash_from_serial, range(5))
result
```

```
<AsyncMapResult: hash_from_serial>
```

We can gather the results from an `AsyncMapResult` with the `get` method.

```python
lview.block = False
result = lview.map(hash_from_serial, range(5)).get()
result
```

```
[None, None, None, None, None]
```
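The blocking and non-blocking styles have a close local analogue in the standard library's `concurrent.futures`. As an analogy only (this runs threads on one machine, not ipyparallel engines, and `square` is a stand-in task), `submit` returns immediately, like a non-blocking `map`, and `result()` waits, like `get`:

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """Stand-in task for the analogy."""
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    # Non-blocking: submit() returns a Future immediately, like map with block=False
    futures = [executor.submit(square, x) for x in range(5)]
    # Blocking: result() waits for each value, like AsyncMapResult.get()
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16]
```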

# Putting it all together

Let's set up our problem and use our mapping function to compute all possible hash digests.

Rather than sending each of the $26^5$ candidates to the engines as a separate task, we send contiguous chunks. The chunk size is simply the number of passwords divided by the number of engines, rounded up.

```python
from math import ceil

num_passwords = 26 ** 5
num_procs = len(rc[:])  # number of engines
chunksize = ceil(num_passwords / num_procs)

assert chunksize * num_procs >= num_passwords
```
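A sketch of the chunk boundaries this produces, using a hypothetical helper `chunk_bounds` and assuming four engines, as in the outputs in this notebook. With four engines the chunk size is $26^5 / 4 = 2{,}970{,}344$, which matches the spans printed by `echo` further down:

```python
def chunk_bounds(n_items, n_workers):
    """Half-open (start, stop) ranges of ceil(n_items / n_workers) items each;
    the last range may be shorter."""
    size = -(-n_items // n_workers)  # integer ceiling division
    return [(s, min(s + size, n_items)) for s in range(0, n_items, size)]


# Assumption: four engines, as in the outputs in this notebook
print(chunk_bounds(26 ** 5, 4))
# [(0, 2970344), (2970344, 5940688), (5940688, 8911032), (8911032, 11881376)]

# Rounding up can leave fewer chunks than workers
print(len(chunk_bounds(9, 4)))  # 3
```

The second call shows a corner case: with 9 items and 4 workers, rounding up gives chunks of 3, so there are only three chunks. The `len(starts) == num_procs` assertion used later therefore holds for the 4-engine run here, but not for every engine count.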

One very nice feature of `AsyncMapResult` is that we can iterate over it like a normal sequence. Iteration yields the results in order, waiting only as long as needed for the next one, so we can process early results while later chunks are still being computed.

Here, we simply filter the results down to those whose hash digests match our unknowns.

```python
lview.block = False
for v in filter(lambda x: x is not None,
                lview.map(hash_from_serial, range(num_passwords), chunksize=chunksize)):
    print(v)
```

```
{'3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b': 'apple'}
{'74e1bb62f8dabb8125a58852b63bdf6eaef667cb56ac7f7cdba6d7305c50a22f': 'mmmmm'}
{'1115dd800feaacefdf481f1f9070374a2a81e27880f187396db67958b207cbad': 'zyzzx'}
```


How long does this take, asynchronously?

```python
%%timeit -n 1 -r 1 lview.block = False
for v in filter(lambda x: x is not None,
                lview.map(hash_from_serial, range(num_passwords), chunksize=chunksize)):
    print(v)
```

```
{'3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b': 'apple'}
{'74e1bb62f8dabb8125a58852b63bdf6eaef667cb56ac7f7cdba6d7305c50a22f': 'mmmmm'}
{'1115dd800feaacefdf481f1f9070374a2a81e27880f187396db67958b207cbad': 'zyzzx'}
1 loop, best of 1: 23.6 s per loop
```


How long does this take with blocking?

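```python
%%timeit -n 1 -r 1 lview.block = True
# With block=True, map returns the complete list of results at once
for v in filter(lambda x: x is not None,
                lview.map(hash_from_serial, range(num_passwords), chunksize=chunksize)):
    print(v)
# Restore non-blocking mode for the cells below
lview.block = False
```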


$26^5 = 11{,}881{,}376 \approx 1.2 \times 10^7$ candidates is not that many, so the overhead of iterating over the asynchronous result as it arrives may be more expensive than simply returning all the results at once.

# Getting Greedy: Multiprocessing within each engine

```python
%px print(os.environ['SLURM_CPUS_PER_TASK'])
```

```
[stdout:0] 4
[stdout:1] 4
[stdout:2] 4
[stdout:3] 4
```


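Each SLURM task was given 4 CPUs, but each engine is a single Python process, so it keeps only one of them busy. To use the rest, we give each engine one contiguous span of the search space and let it split that span over a `multiprocessing.Pool`. A `DirectView` (`rc[:]`) would guarantee that each engine receives exactly one span; the cells below use the load-balanced view, which does not guarantee this but placed one span on each engine in the run shown. As a first check, `echo` reports which task receives which span.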

```python
def echo(s, t):
    # Report which SLURM task handles the span [s, s + t)
    return "{0}: {1} -> {2}".format(os.environ['SLURM_PROCID'], s, s + t)
```

```python
import math

span = math.ceil(num_passwords / num_procs)
starts = range(0, num_passwords, span)
spans = [span] * num_procs

assert starts[-1] + span >= num_passwords
assert len(starts) == num_procs
```

```python
lview.map(echo, starts, spans).get()
```

```
['0: 0 -> 2970344',
 '2: 2970344 -> 5940688',
 '3: 5940688 -> 8911032',
 '1: 8911032 -> 11881376']
```

```python
%%px
# Defined on every engine so that use_pool, which runs on the engines,
# can hand it to a local multiprocessing.Pool
from multiprocessing import Pool
from hashlib import sha256
import os

def hash_from_serial(serial):
    divisor = 26 ** 4  # 456976
    letters = []
    for _ in range(5):
        letter, serial = divmod(serial, divisor)
        letters.append(97 + letter)  # 97 == ord('a')
        divisor //= 26
    digest = sha256(bytes(letters)).hexdigest()
    if digest in hashes:
        password = "".join(chr(x) for x in letters)
        return {digest: password}
    return None
```

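```python
def use_pool(start, span):
    # Split this engine's span over the CPUs SLURM gave its task
    num_cpus = int(os.environ['SLURM_CPUS_PER_TASK'])
    chunksize = span // num_cpus + 1
    # Cover [start, start + span), without running past the last password
    stop = min(start + span, 26 ** 5)
    pwords = dict()
    with Pool(num_cpus) as p:
        results = p.map(hash_from_serial, range(start, stop), chunksize=chunksize)
    for v in filter(lambda x: x is not None, results):
        pwords.update(v)
    return pwords
```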
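`Pool.map` cuts its input into consecutive chunks of `chunksize` items and hands them to the worker processes. A small sketch of the boundaries this produces for a span `range(start, start + span)`, using the hypothetical helper `pool_chunks`; the `+ 1` rounds the chunk size up so that a span yields at most `num_cpus` chunks, one per CPU:

```python
def pool_chunks(start, span, num_cpus):
    """(lo, hi) index ranges of the chunks Pool.map creates for
    range(start, start + span) with chunksize = span // num_cpus + 1."""
    chunksize = span // num_cpus + 1
    stop = start + span
    return [(lo, min(lo + chunksize, stop)) for lo in range(start, stop, chunksize)]


chunks = pool_chunks(2970344, 2970344, 4)  # the second engine's span
print(len(chunks))  # 4
print(chunks[0])    # (2970344, 3712931)
```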

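```python
lview.map(use_pool, starts, spans).get()
```

```
[{'3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b': 'apple'},
 {},
 {},
 {}]
```

```python
passwords = dict()
for entry in filter(lambda x: x is not None, lview.map(use_pool, starts, spans)):
    passwords.update(entry)
print(passwords)
```

```
{'3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b': 'apple'}
```

Only `apple` is found in these outputs because they were produced by a `use_pool` that iterated over `range(start, span)`. That range is empty for every engine except the first, since each later `start` is at least `span`. Iterating over `range(start, start + span)` covers each engine's whole span; `mmmmm` (index 5,703,060) and `zyzzx` (index 11,863,797) then fall in the second and fourth spans.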


```python
import hasher
```

```
ModuleNotFoundError: No module named 'hasher'
```

# Conclusions