In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import ipyparallel as ipp
# Show the installed ipyparallel version for reference.
ipp.version_info

(6, 2, 4)

In [3]:
from setup_ipyparallel import SetupIpp
from my_package import BuildData, Compute, Collect, runtime, timer, file_content

## Introduction
This notebook and companion packages illustrate a possible use of [ipyparallel](https://ipyparallel.readthedocs.io/en/latest/index.html) with a contrived but simple example.

## Config

In [4]:
# Root directory used by the notebook and by the cluster machines.
shared_dir = '/Users/Olivier/Documents/temp/ipp/demo'

# Folder names for the input data and for the computed outputs.
folder_in, folder_out = 'sample-data', 'sample-output'

# File names; the '{}' placeholder is left unformatted at this point.
file_in_common = 'common.json'
file_in, file_out = 'data-{}.json', 'output-{}.json'

# 8*5 = 40: the count handed to BuildData and the length of the id list.
N = 5 * 8
list_id = list(range(N))

## Data

The input data is random; this is only a demo.

In [5]:
# Generate the demo input files under shared_dir (N data files plus the
# common file, as listed in the log printed below).
data_builder = BuildData(shared_dir, N)
data_builder.run()

create 40 files of integers between 0 and 1000
delete existing folder /Users/Olivier/Documents/temp/ipp/demo/sample-data
create folder /Users/Olivier/Documents/temp/ipp/demo/sample-data
create file common.json (with offset=3) in folder /Users/Olivier/Documents/temp/ipp/demo/sample-data
	common.json
create 40 files in folder /Users/Olivier/Documents/temp/ipp/demo/sample-data
	data-0.json
	data-1.json
	data-2.json
	data-3.json
	data-4.json
	data-5.json
	data-6.json
	data-7.json
	data-8.json
	data-9.json
	data-10.json
	data-11.json
	data-12.json
	data-13.json
	data-14.json
	data-15.json
	data-16.json
	data-17.json
	data-18.json
	data-19.json
	data-20.json
	data-21.json
	data-22.json
	data-23.json
	data-24.json
	data-25.json
	data-26.json
	data-27.json
	data-28.json
	data-29.json
	data-30.json
	data-31.json
	data-32.json
	data-33.json
	data-34.json
	data-35.json
	data-36.json
	data-37.json
	data-38.json
	data-39.json


## Sequential
  
Compute the average of each data chunk sequentially, adding the same offset to each.  
Here we pretend computing the average of a list of numbers is expensive by adding a 1s delay.  
The offset is just there to illustrate how to pass parameters.  
Input files are those created above.  
Output is both returned and saved on disk.  
Then gather all results and compute their average: the final result.  
It should be near 500+offset since the input numbers are random integers between zero and 1000.

In [6]:
# Baseline: handle every id one after another in this kernel and time it.
t0 = timer()
for chunk_id in list_id:
    # 1-based progress counter, kept on a single output line
    print(chunk_id + 1, end=' ')
    job = Compute(
        chunk_id,
        shared_dir, folder_in, folder_out,
        file_in_common, file_in, file_out,
        verbose=False,
    )
    job.setup()
    job.run()
    job.save()
t1 = timer()
runtime(t0, t1)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 
runtime = 42.09 s


In [8]:
# Gather the per-id outputs of the sequential run into the final result.
collector_seq = Collect(list_id, shared_dir, folder_out, file_out, verbose=True)
res_seq = collector_seq.run()
res_seq

collecting 40 files


503.50322500000004

## Go parallel
Now, if you have several cores available on your own or other machines, you can parallelize the computation.  
Hopefully the runtime will be divided by the number of cores.

## Create Cluster
+ Make sure the main and remote machines have access to the shared directory on disk, where the controller writes the cluster info.
+ Copy/paste following commands in terminal - or run script from shared folder
+ Start controller on main machine
+ Start engines on main machine
+ Start engines on remote machine(s)

In [9]:
# Write the controller/engine launch scripts and copy the local package(s)
# into shared_dir (see the log printed below).
cluster_id = 'demo'
ipp_setup = SetupIpp(
    shared_dir,
    cluster_id,
    nb_core=[6, 7, 8],
    local_packages=['my_package'],
)
ipp_setup.run()

create ipyparallel scripts in /Users/Olivier/Documents/temp/ipp/demo
	launch_controller.sh
	launch_engine-6.sh
	launch_engine-7.sh
	launch_engine-8.sh
copy 1 local packages to /Users/Olivier/Documents/temp/ipp/demo
	my_package


## Run ipyparallel scripts
+ **First** launch controller
+ **Then** launch engine scripts on one or more machines

In [10]:
# Nothing to execute here: run the generated launch scripts from a terminal.

## Audit ipp files
+ client file
+ engine file

In [11]:
# Common prefix of the two connection files for this cluster_id.
radical = f'{shared_dir}/profile_default/security/ipcontroller-{cluster_id}'
client_file = radical + '-client.json'
engine_file = radical + '-engine.json'

# Display both files.
# NOTE(review): the printed content includes the cluster "key"; consider
# clearing this cell's output before sharing the notebook.
file_content(client_file)
file_content(engine_file)

/Users/Olivier/Documents/temp/ipp/demo/profile_default/security/ipcontroller-demo-client.json
{
  "ssh": "",
  "interface": "tcp://*",
  "registration": 62873,
  "control": 62874,
  "mux": 62876,
  "task": 62880,
  "task_scheme": "leastload",
  "iopub": 62882,
  "notification": 62884,
  "key": "90a9f3d5-54b0cb9d6a0ca89d108caf2d",
  "location": "Central",
  "pack": "json",
  "unpack": "json",
  "signature_scheme": "hmac-sha256"
}
/Users/Olivier/Documents/temp/ipp/demo/profile_default/security/ipcontroller-demo-engine.json
{
  "ssh": "",
  "interface": "tcp://*",
  "registration": 62873,
  "control": 62875,
  "mux": 62877,
  "hb_ping": 62878,
  "hb_pong": 62879,
  "task": 62881,
  "iopub": 62883,
  "key": "90a9f3d5-54b0cb9d6a0ca89d108caf2d",
  "location": "Central",
  "pack": "json",
  "unpack": "json",
  "signature_scheme": "hmac-sha256"
}


## Init parallel

In [12]:
# Connect to the cluster through the client connection file.
rc = ipp.Client(client_file)
dv = rc[:]                      # direct view
lbv = rc.load_balanced_view()   # load balanced view

# First line: ids known to the client; second line: the view's block flag.
print(rc.ids, lbv.block, sep='\n')

[0, 1, 2, 3, 4, 5, 6]
False


## Push data to engines

In [13]:
# Names that compute() reads as globals, sent through the direct view.
engine_namespace = dict(
    shared_dir=shared_dir,
    folder_in=folder_in,
    folder_out=folder_out,
    file_in_common=file_in_common,
    file_in=file_in,
    file_out=file_out,
)
dv.push(engine_namespace)

<AsyncResult: _push>

## Engine functions

In [15]:
%%px --local

import numpy as np

from my_package import Compute

def compute(k, verbose=False):
    """Run the Compute steps (setup, run, save) for id ``k``.

    Relies on the globals ``shared_dir``, ``folder_in``, ``folder_out``,
    ``file_in_common``, ``file_in`` and ``file_out`` being defined in the
    namespace where the function executes.

    k       -- id forwarded as the first argument of Compute
    verbose -- forwarded to Compute

    Returns the value of ``Compute.run()``; ``save()`` is called before
    returning.
    """
    job = Compute(
        k,
        shared_dir, folder_in, folder_out,
        file_in_common, file_in, file_out,
        verbose=verbose,
    )
    job.setup()
    outcome = job.run()
    job.save()
    return outcome

def wrap_compute(k):
    """Single-argument entry point: call ``compute(k)`` with its defaults."""
    return compute(k)
    

## Local test run

In [16]:
# Try a single id in the local kernel first, with logging switched on.
compute(0, verbose=True)

start setup id=0
start run id=0
end run id=0 - time=1.00
start save id=0


{'base': 3, 'avg': 499.728, 'res': 502.728}

## Parallel run

In [17]:
# Submit one task per id through the load-balanced view.
# chunksize=1: each element of list_id is sent as its own task.
amr = lbv.map_async(wrap_compute, list_id, chunksize=1)
amr.wait_interactive()  # progress display until every task has finished
res = amr.get()

# Compare elapsed wall time with the summed per-task time reported by the
# AsyncResult; the ratio is the observed speed-up.
print('wall time = {:.2f}\tserial time = {:.2f}\tratio = {:.2f}'.format(
    amr.wall_time,
    amr.serial_time,
    amr.serial_time / amr.wall_time)
)

  40/40 tasks finished after    6 s
done
wall time = 6.62	serial time = 43.58	ratio = 6.58


In [18]:
# Average of the 'res' field over the results returned by the parallel run.
np.mean([task_result['res'] for task_result in res])

503.50322500000004

In [19]:
# Collect again after the parallel run and compare with the sequential value.
collector_par = Collect(list_id, shared_dir, folder_out, file_out, verbose=False)
res_par = collector_par.run()
res_par == res_seq

True