Import packages and set up a cluster

In [None]:
import os

# Import the package
import ipyparallel as ipp

# Get number of cores (for one node)
n_workers = int(os.getenv('SLURM_CPUS_ON_NODE'))

# Create a remote cluster (It only takes one line!)
rc = ipp.Cluster(n=n_workers).start_and_connect_sync()

Then create a *direct view*, which lets you run tasks across all the workers in a simple fashion:

In [None]:
dview = rc[:]

There are two ways to import packages on the engines

In [None]:
# Import via execute
dview.execute('import numpy as np')

# Import via sync_imports
# with dview.sync_imports():
#    import numpy as np

The push command lets you send data to each engine

In [None]:
# Send data to all workers
dview.push(dict(a=1.03234, b=3453))

# Manual approach to send to each worker
for i in range(cpu_count):
  rc[i].push({'num': rc.ids[i]})

Some commands will return an asynchronous object

In [None]:
# Apply and then get
async_object = dview.apply(lambda x: id+x, 27)
print(async_object)
# Get the result
async_object.get()

There are other ways to make sure your code finishes running before moving on

In [None]:
# Can use apply sync
dview.apply_sync(lambda x: num+x, 27)

# Or use blocking for all operations
dview.block=True
dview.apply(lambda x: num+x, 27)

A *load balanced* view assigns tasks to keep all of the workers busy:

In [None]:
# Create a balanced load view
lview = rc.load_balanced_view()

# Cause execution on main process to wait while tasks sent to workers finish
lview.block = True

To calculate $pi$ by Monte Carlo simulation, let's define a function that checks if two points are in the unit circle. Each worker will process a large number of points in a vectorized fashion.


In [None]:
def local_mean(seed):
  rng = np.random.default_rng(seed=seed)
  x = rng.uniform(size = 100000).reshape(-1,2)
  return np.mean(x[:,0]**2 + x[:,1]**2 < 1)


In [None]:
# Execute map
m = 100
pi4 = lview.map(local_mean, range(m))   # Run calculation in parallel
# Estimate pi
print(np.mean(pi4) * 4)
