Import packages and set up a cluster

In [1]:
import os

# Import the package
import ipyparallel as ipp

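# Get the number of cores on this node (fall back to os.cpu_count() outside SLURM)
cpu_count = int(os.getenv('SLURM_CPUS_ON_NODE', os.cpu_count()))

# Start a cluster of engines on this node and connect a client to it (it only takes one line!)
rc = ipp.Cluster(n=cpu_count).start_and_connect_sync()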

Using existing profile dir: '/global/home/users/cth/.ipython/profile_default'
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>

First, create a direct view, which runs the same task on every engine.

In [2]:
dview = rc[:]
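A direct view can be pictured as one Python namespace per engine, with every command sent to all of them. The toy class below is a hypothetical, in-process sketch of that idea (ToyDirectView is an invented name, and this is not how ipyparallel is implemented): execute runs a code string in each namespace, push copies variables into each one, and apply calls a function once per engine with that engine's namespace as its globals. That last point is why a function referring to id can return a different value on each engine.

```python
import builtins
import types


class ToyDirectView:
    """Hypothetical in-process model of a DirectView: one namespace per engine."""

    def __init__(self, engine_ids):
        # Each "engine" gets its own globals dict with access to the builtins
        self.namespaces = {eid: {"__builtins__": builtins} for eid in engine_ids}

    def execute(self, code):
        # Run the same source string in every engine's namespace
        for ns in self.namespaces.values():
            exec(code, ns)

    def push(self, data):
        # Copy the same variables into every engine's namespace
        for ns in self.namespaces.values():
            ns.update(data)

    def apply(self, func, *args):
        # Call func once per engine, looking up its global names in that engine's namespace
        results = []
        for ns in self.namespaces.values():
            remote = types.FunctionType(
                func.__code__, ns, func.__name__, func.__defaults__, func.__closure__
            )
            results.append(remote(*args))
        return results


view = ToyDirectView([0, 1, 2, 3])
view.execute("import math")           # every toy engine now has math in its namespace
view.push({"a": 1.03234, "b": 3453})  # every toy engine gets the same a and b
for eid, ns in view.namespaces.items():
    ns["id"] = eid                    # but each toy engine gets its own id
print(view.apply(lambda x: id + x, 27))  # [27, 28, 29, 30]
```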

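There are two ways to import packages on the engines: run an import statement with execute, or wrap ordinary imports in the sync_imports context manager, which also imports them in the local session.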

In [3]:
# Import via execute
dview.execute('import numpy as np')

# Import via sync_imports
with dview.sync_imports():
    import numpy as np

importing numpy on engine(s)


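The push method copies variables into each engine's namespace. Below, every engine receives the same a and b, and the loop then gives each engine its own id.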

In [4]:
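# Send the same data to every engine
dview.push(dict(a=1.03234, b=3453))
# Give each engine its own id (this name shadows Python's built-in id on the engines)
for engine_id in rc.ids:
    rc[engine_id].push({'id': engine_id})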
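The loop gives each engine a different value. ipyparallel can also split a sequence across engines with dview.scatter, which by default hands each engine a contiguous block; for example, dview.scatter('id', rc.ids, flatten=True) should leave one id per engine, with flatten=True unwrapping the single-element lists. The hypothetical helper below sketches block partitioning in plain Python; the exact way ipyparallel assigns leftover items may differ.

```python
def block_partition(seq, n_parts):
    """Hypothetical helper: split seq into n_parts contiguous blocks.

    Block sizes differ by at most one; the first blocks absorb the remainder.
    """
    q, r = divmod(len(seq), n_parts)
    blocks = []
    start = 0
    for i in range(n_parts):
        # The first r blocks get one extra item
        size = q + (1 if i < r else 0)
        blocks.append(seq[start:start + size])
        start += size
    return blocks


print(block_partition(list(range(10)), 4))  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
print(block_partition([0, 1, 2, 3], 4))     # [[0], [1], [2], [3]]
```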

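By default, methods such as apply return immediately with an AsyncResult object; calling get() on it waits for the engines to finish and returns their results.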

In [5]:
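# apply returns an AsyncResult immediately instead of waiting
ar = dview.apply(lambda x: id + x, 27)
print(ar)
# get() blocks until every engine has finished, then returns their results
ar.get()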

<AsyncResult: <lambda>>


[27, 28, 29, 30]
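An AsyncResult plays the same role as a future: the call returns right away, and get() blocks until the work is done. As a rough standard-library analogy (threads in one process, not ipyparallel engines), concurrent.futures separates submission from retrieval in the same way, with Future.result() standing in for AsyncResult.get() and Future.done() for AsyncResult.ready(). The add_id function is a hypothetical stand-in for the notebook's lambda.

```python
from concurrent.futures import ThreadPoolExecutor

engine_ids = [0, 1, 2, 3]  # stand-ins for the engine ids used above


def add_id(engine_id, x):
    # Same arithmetic as the notebook's lambda: the worker's id plus x
    return engine_id + x


with ThreadPoolExecutor(max_workers=len(engine_ids)) as pool:
    # submit() returns a Future right away, much like dview.apply() returns an AsyncResult
    futures = [pool.submit(add_id, eid, 27) for eid in engine_ids]
    # result() blocks until that task is done, much like ar.get()
    results = [f.result() for f in futures]

print(results)  # [27, 28, 29, 30]
```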

There are two other ways to wait for results before moving on: call apply_sync for a single blocking call, or set block = True so that every call on the view blocks.

In [6]:
# apply_sync blocks until this one call finishes
dview.apply_sync(lambda x: id + x + np.random.rand(2), 27)

# Or make every call on this view block
dview.block = True
dview.apply(lambda x: id + x, 27)

[27, 28, 29, 30]

A load-balanced view sends each task to an available engine, which keeps all of the processors busy.

In [7]:
# Create a load-balanced view
lview = rc.load_balanced_view()

# Make calls block until the tasks sent to the engines finish
lview.block = True
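Where a direct view sends every command to every engine, a load-balanced view hands each task to whichever engine is free. The sketch below imitates that scheduling policy with a shared queue.Queue and threads; load_balanced_map is a hypothetical name, and this is not ipyparallel's actual scheduler. Idle workers pull the next task, and results are written back by index so they come out in input order.

```python
import queue
import threading


def load_balanced_map(func, items, n_workers=4):
    """Hypothetical sketch: idle workers pull tasks from a shared queue."""
    tasks = queue.Queue()
    for index, item in enumerate(items):
        tasks.put((index, item))

    results = [None] * len(items)
    done_by = [None] * len(items)  # which worker ran each task

    def worker(worker_id):
        while True:
            try:
                index, item = tasks.get_nowait()
            except queue.Empty:
                return  # no work left, so this worker stops
            results[index] = func(item)
            done_by[index] = worker_id

    threads = [threading.Thread(target=worker, args=(w,)) for w in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # block until every task has finished, similar to lview.block = True
    return results, done_by


squares, workers = load_balanced_map(lambda v: v * v, range(10))
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Which worker runs which task depends on timing, so the done_by list can differ from run to run even though the results do not.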

We will estimate pi by Monte Carlo sampling. First, define a function that checks whether a point (x, y) lies inside the unit circle.

In [8]:
def uc_check(point):
    # Return 1 if the point (x, y) lies inside the unit circle, else 0
    x, y = point
    return 1 if x ** 2 + y ** 2 < 1 else 0
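Why multiplying by 4 works: if X and Y are independent and uniform on [0, 1], the point (X, Y) falls inside the unit circle with probability equal to the area of the quarter disc that lies in the unit square. Averaging uc_check over n points therefore estimates pi/4, and the binomial standard error indicates how much precision to expect for a given n:

```latex
\begin{aligned}
P\left(X^2 + Y^2 < 1\right) &= \frac{\pi \cdot 1^2 / 4}{1 \cdot 1} = \frac{\pi}{4},
\qquad
\hat{\pi}_n = \frac{4}{n} \sum_{i=1}^{n} \mathbf{1}\left[x_i^2 + y_i^2 < 1\right] \\
\operatorname{SE}\left(\hat{\pi}_n\right) &= 4 \sqrt{\frac{p\,(1 - p)}{n}},
\qquad p = \frac{\pi}{4} \approx 0.785
\end{aligned}
```

For the 5,000 points used below, SE ≈ 0.023; for the 5 × 10^8 points in the last cell, SE ≈ 7.3 × 10^-5. Both estimates in this notebook (3.1344 and 3.141604848) fall within one standard error of pi.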

Now generate 5,000 random points in the unit square (10,000 random numbers reshaped into pairs) and let the load-balanced view distribute them across the engines.

In [9]:
# Generate random numbers
rn = np.random.rand(int(1e4)).reshape(-1, 2)
# A 5000 x 2 array: each row is one (x, y) point
rn

array([[0.91740808, 0.98834548],
       [0.32375984, 0.62232555],
       [0.94115504, 0.85845923],
       ...,
       [0.8258353 , 0.07942718],
       [0.59974559, 0.34505376],
       [0.60311859, 0.4529335 ]])

In [10]:
# Map uc_check over the points in parallel (blocks because lview.block = True)
%time pi4 = lview.map(uc_check, rn)
# The fraction of points inside the circle estimates pi/4
print(np.mean(pi4) * 4)

CPU times: user 6.01 s, sys: 427 ms, total: 6.44 s
Wall time: 7.57 s
3.1344
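By default, LoadBalancedView.map uses a chunksize of 1, so each point becomes its own task and most of the wall time above goes to messaging rather than arithmetic. Batching points into larger tasks amortizes that overhead; in ipyparallel the chunksize argument of map is the setting for this (for example, lview.map(uc_check, rn, chunksize=500)), although the speedup you get will depend on your cluster. The standard-library sketch below (hypothetical helper names, using random instead of NumPy, and running serially) shows the split-count-combine pattern: each chunk returns a hit count, and the counts are summed at the end.

```python
import random


def count_hits(chunk):
    """Count how many (x, y) points in one chunk fall inside the unit circle."""
    return sum(1 for x, y in chunk if x * x + y * y < 1)


def chunked(points, chunk_size):
    """Split a list of points into consecutive chunks of at most chunk_size."""
    return [points[i:i + chunk_size] for i in range(0, len(points), chunk_size)]


def estimate_pi(n_points, chunk_size, seed=0):
    rng = random.Random(seed)  # seeded so runs are reproducible
    points = [(rng.random(), rng.random()) for _ in range(n_points)]
    chunks = chunked(points, chunk_size)
    # In a parallel setting, each chunk would be one task sent to an engine
    hits = sum(count_hits(chunk) for chunk in chunks)
    return 4 * hits / n_points, len(chunks)


estimate, n_tasks = estimate_pi(5000, chunk_size=500)
print(n_tasks)  # 10 tasks instead of 5000
print(estimate)
```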


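Parallel programming isn't always more efficient. Here each point is sent to an engine as a separate task, so communication overhead dwarfs the tiny amount of arithmetic per point. A vectorized NumPy version running in a single process is far faster, even with 100,000 times as many points.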

In [11]:
# Vectorized NumPy version on the same 5,000 points, in a single process
%time (rn[:, 0] ** 2 + rn[:, 1] ** 2 < 1).mean() * 4

CPU times: user 789 µs, sys: 0 ns, total: 789 µs
Wall time: 507 µs


3.1344

In [12]:
# 1e9 float64 values need about 8 GB of memory (5e8 points)
rn2 = np.random.rand(int(1e9)).reshape(-1, 2)
%time (rn2[:, 0] ** 2 + rn2[:, 1] ** 2 < 1).mean() * 4

CPU times: user 2.69 s, sys: 1.69 s, total: 4.38 s
Wall time: 4.38 s


3.141604848
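To put the timings side by side, divide each reported wall time by the number of points it processed (5,000 for the first two runs and 5 × 10^8 for the last, since each point uses two random numbers). This is rough arithmetic on single %time measurements copied from the outputs above, not a benchmark:

```python
# Wall times copied from the %time output in this notebook (seconds), with point counts
runs = {
    "load-balanced map, 5e3 points": (7.57, 5_000),
    "vectorized NumPy, 5e3 points": (507e-6, 5_000),
    "vectorized NumPy, 5e8 points": (4.38, 500_000_000),
}

# Seconds of wall time per point for each run
per_point = {name: wall / n for name, (wall, n) in runs.items()}
for name, cost in per_point.items():
    print(f"{name}: {cost:.2e} s per point")

ratio = per_point["load-balanced map, 5e3 points"] / per_point["vectorized NumPy, 5e8 points"]
print(f"The parallel map costs about {ratio:,.0f}x more per point")
```

The parallel map spends roughly 1.5 ms per point, while the large vectorized run spends under 10 ns per point, a ratio of roughly 170,000.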