In [3]:
import ipyparallel as ipp  # ipyparallel provides the Client class
rc = ipp.Client()          # connect to the controller of an already-running cluster

In [4]:
rc.ids  # list the ids of the engines the client can communicate with


[5, 6, 7]

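The client offers two primary ways to farm out work to the engines. The first is a DirectView, which applies the same work to every engine. The second, a LoadBalancedView created with rc.load_balanced_view(), lets the scheduler hand each task to an available engine. To create a DirectView, slice the client.

With a DirectView you can run code inside each engine's own Python process. The %%px cell magic used below does this: it executes the cell on each engine and shows one result per engine.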
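To make the execution model concrete, here is a toy, single-process model of what a %%px cell does. It is not ipyparallel code: ToyEngines and its execute and results methods are hypothetical names, and each "engine" is just a dictionary standing in for that engine's global namespace. Running the same source in every namespace mirrors how each real engine runs the cell in its own process, so a name defined on one engine is never visible on another.

```python
class ToyEngines:
    """Hypothetical stand-in for a DirectView: one namespace dict per engine."""

    def __init__(self, n_engines):
        # Each toy engine starts with its own isolated global namespace.
        self.namespaces = [{"engine_id": i} for i in range(n_engines)]

    def execute(self, source):
        # Run the same source once in every namespace, roughly what a
        # %%px cell does across the real engine processes.
        for ns in self.namespaces:
            exec(source, ns)

    def results(self, name):
        # Collect the value bound to `name` on every engine, in engine order.
        return [ns[name] for ns in self.namespaces]


engines = ToyEngines(3)
engines.execute("foo = 'bar on engine {}'.format(engine_id)")
print(engines.results("foo"))
# ['bar on engine 0', 'bar on engine 1', 'bar on engine 2']
```

The same source produces a different value on each engine because each namespace holds its own engine_id, just as os.getpid() differs between the real engine processes.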


In [3]:
dv = rc[:]
dv

<DirectView [0, 1, 2, 3,...]>

In [None]:
%%px
import os
os.getpid()

In [None]:
%%px
foo = 'bar on pid {}'.format(os.getpid())

In [None]:
%%px
foo

In [None]:
dv['foo']

In [None]:
# now we can overwrite its value
dv['foo'] = 'bar'
dv['foo']
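Dictionary-style access on a DirectView is shorthand for pulling and pushing a named variable. Reading dv['foo'] returns a list with one value per engine, and assigning to it sends the same value to every engine (the underlying methods are DirectView.pull and DirectView.push). The sketch below models that with plain dictionaries; push_all and pull_all are hypothetical helpers, not ipyparallel functions, and the PIDs are placeholder numbers.

```python
def push_all(namespaces, name, value):
    """Bind `name` to the same value on every toy engine (like dv[name] = value)."""
    for ns in namespaces:
        ns[name] = value


def pull_all(namespaces, name):
    """Read `name` from every toy engine, one entry per engine (like dv[name])."""
    return [ns[name] for ns in namespaces]


# Three toy engines that each computed a different value for foo.
engines = [{"foo": "bar on pid {}".format(pid)} for pid in (101, 102, 103)]
print(pull_all(engines, "foo"))
# ['bar on pid 101', 'bar on pid 102', 'bar on pid 103']

push_all(engines, "foo", "bar")  # overwrite foo on every engine
print(pull_all(engines, "foo"))
# ['bar', 'bar', 'bar']
```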

There are many cases where you don't want the same data on every engine; instead you want to chunk a list and send each chunk to a different engine. The DirectView provides the .scatter and .gather methods for this: scatter splits a sequence across the engines, and gather collects the pieces back into a single list.
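Conceptually, scatter cuts the list into one contiguous chunk per engine and gather concatenates the chunks back in engine order. The pure-Python sketch below reproduces that locally. The names scatter_chunks and gather_chunks are hypothetical, and the rule that the first len(seq) % n chunks get one extra item is an assumption, chosen because it matches the chunk sizes and maxima printed further down for 1000 ids on 7 engines.

```python
def scatter_chunks(seq, n_engines):
    """Split seq into n_engines contiguous chunks whose lengths differ by at most one."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        # The first `extra` chunks absorb the remainder, one item each.
        size = base + (1 if i < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def gather_chunks(chunks):
    """Concatenate the chunks back into one list, in engine order."""
    result = []
    for chunk in chunks:
        result.extend(chunk)
    return result


chunks = scatter_chunks(list(range(1000)), 7)
print([len(c) for c in chunks])  # [143, 143, 143, 143, 143, 143, 142]
print([max(c) for c in chunks])  # [142, 285, 428, 571, 714, 857, 999]
assert gather_chunks(chunks) == list(range(1000))
```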

In [4]:
# start with a list of ids to work on
user_ids = list(range(1000))
dv.scatter('user_id_chunk', user_ids)



<AsyncResult: scatter:finished>

Notice that this call returned almost immediately with an AsyncResult. All the calls used up to now have been blocking and synchronous, but scatter is asynchronous. To turn this scatter into a blocking call, chain .get() onto it.
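The same submit-now, wait-later pattern exists in the standard library's concurrent.futures module, which is handy for experimenting without a cluster. In this sketch submit hands back a Future immediately, much as scatter handed back an AsyncResult, and .result() blocks the way .get() does. The thread pool and slow_square are illustrative choices, not part of ipyparallel.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Simulate work that takes a while to finish.
    time.sleep(0.2)
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() does not wait for the work; it returns a Future right away.
    futures = [pool.submit(slow_square, i) for i in range(4)]
    print("any finished yet?", any(f.done() for f in futures))  # very likely False
    # result() blocks until that task completes, like AsyncResult.get().
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9]
```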

In [5]:
# Each engine now holds a variable with a roughly equal-sized, contiguous chunk of the original list.
dv.scatter('user_id_chunk', user_ids).get()

[None, None, None, None, None, None, None]

In [6]:
%%px
print("Len", len(user_id_chunk))
print("Max", max(user_id_chunk))

[stdout:0] 
Len 143
Max 142
[stdout:1] 
Len 143
Max 285
[stdout:2] 
Len 143
Max 428
[stdout:3] 
Len 143
Max 571
[stdout:4] 
Len 143
Max 714
[stdout:5] 
Len 143
Max 857
[stdout:6] 
Len 142
Max 999


Let's apply a simple function to each chunk. First, define the function on every engine. The --local flag also runs the cell in your local client process, which is very useful for debugging: you can call the function locally before running it on the engines.

In [7]:
%%px --local
def the_most_interesting_transformation_ever(user_id):
    """
    This function is really interesting
    """
    return "ID:{}".format(user_id * 3)

In [8]:
the_most_interesting_transformation_ever(1)

'ID:3'

In [9]:
%%px
transformed_user_ids = list(map(the_most_interesting_transformation_ever, user_id_chunk))

In [10]:
# Now each engine holds its own list of transformed ids.
# gather stitches those per-engine lists back into one list in the local notebook.
all_transformed_user_ids = dv.gather('transformed_user_ids').get()
print(len(all_transformed_user_ids))
print(all_transformed_user_ids[0:10])

1000
['ID:0', 'ID:3', 'ID:6', 'ID:9', 'ID:12', 'ID:15', 'ID:18', 'ID:21', 'ID:24', 'ID:27']
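As a sanity check, the scatter, transform, gather round trip should produce exactly what a plain serial map would. The sketch below simulates seven engines locally with list slicing and compares the gathered result with the serial version. split_evenly is a hypothetical helper, and its chunking rule (contiguous chunks whose sizes differ by at most one) is an assumption that matches the chunk sizes printed earlier.

```python
def the_most_interesting_transformation_ever(user_id):
    return "ID:{}".format(user_id * 3)


def split_evenly(seq, n):
    # Contiguous chunks; the first len(seq) % n chunks get one extra item.
    base, extra = divmod(len(seq), n)
    out, start = [], 0
    for i in range(n):
        end = start + base + (1 if i < extra else 0)
        out.append(seq[start:end])
        start = end
    return out


user_ids = list(range(1000))
# "Scatter": one chunk per simulated engine.
chunks = split_evenly(user_ids, 7)
# Each simulated engine transforms only its own chunk.
per_engine = [list(map(the_most_interesting_transformation_ever, c)) for c in chunks]
# "Gather": concatenate the per-engine lists in engine order.
gathered = [item for part in per_engine for item in part]

serial = list(map(the_most_interesting_transformation_ever, user_ids))
print(gathered == serial)  # True
print(gathered[:3])        # ['ID:0', 'ID:3', 'ID:6']
```

Because the chunks are contiguous and gathered in engine order, the parallel result preserves the original ordering of user_ids.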
