Skip to content

Commit

Permalink
Doc edits
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang committed Mar 28, 2017
1 parent cd41127 commit 46b5e56
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 103 deletions.
114 changes: 58 additions & 56 deletions doc/source/internals-overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ standalone fashion or it can be connect to an existing Ray cluster.
Running Ray standalone
~~~~~~~~~~~~~~~~~~~~~~

Ray can be used in a self-contained manner simply by calling ``ray.init()``
within a script. When the call to ``ray.init()`` happens, all of the relevant
processes are started up. For example, a local scheduler, a global scheduler, an
object store, and object manager, a Redis server, a number of worker processes,
and several more processes.
Ray can be used standalone by calling ``ray.init()`` within a script. When the
call to ``ray.init()`` happens, all of the relevant processes are started.
These include a local scheduler, a global scheduler, an object store and
manager, a Redis server, and a number of worker processes.

When the script exits, these processes will be killed.

Expand All @@ -30,15 +29,15 @@ To connect to an existing Ray cluster, simply pass the argument address of the
Redis server as the ``redis_address=`` keyword argument into ``ray.init``. In
this case, no new processes will be started when ``ray.init`` is called, and
similarly the processes will continue running when the script exits. In this
case, pretty much all of the processes (except for workers that correspond to
actors) are shared between different driver processes.
case, all processes except workers that correspond to actors are shared between
different driver processes.

Defining a remote function
--------------------------

A central component of this system is the **centralized control plane**. This is
implemented using one or more Redis servers. `Redis`_ can be thought of as an
in-memory key-value store.
implemented using one or more Redis servers. `Redis`_ is an in-memory key-value
store.

.. _`Redis`: https://github.com/antirez/redis

Expand All @@ -55,18 +54,17 @@ Now, consider a remote function definition as below.
return x + 1
When the remote function is defined as above, the function is immediately
pickled and stored in a Redis server. You can view the remote functions in the
centralized control plane as below.
pickled, assigned a unique ID, and stored in a Redis server. You can view the
remote functions in the centralized control plane as below.

.. code-block:: python
TODO: Fill this in.
Each worker process has a separate thread running in the background that listens
for remote functions being added to the centralized control state (again, using
Redis's publish-subscribe functionality). When a new remote function is added,
the thread fetches the pickled remote function, unpickles it, and is then
capable of executing that function.
Each worker process has a separate thread running in the background that
listens for the addition of remote functions to the centralized control state.
When a new remote function is added, the thread fetches the pickled remote
function, unpickles it, and can then execute that function.

Notes and limitations
~~~~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -105,41 +103,41 @@ When a driver or worker invokes a remote function, a number of things happen.
- The IDs or values of the arguments to the function. Python primitives like
integers or short strings will be pickled and included as part of the task
object. Larger or more complex objects will be put into the object store
with a call to ``ray.put`` (this happens under the hood), and the
resulting IDs are included in the task object. Object IDs that are passed
directly as arguments are also included in the task object.
- Object IDs for the return values of the task are generated and included in
the task object.
with an internal call to ``ray.put``, and the resulting IDs are included in
the task object. Object IDs that are passed directly as arguments are also
included in the task object.
- The ID of the task. This is generated uniquely from the above content.
- The IDs for the return values of the task. These are generated uniquely
from the above content.
- The task object is then sent to the local scheduler on the same node as the
driver or worker.
- The local scheduler makes a decision to either schedule the task locally or to
pass the task on to a global scheduler.

- If all of the task's object dependencies are present in the local object
store and their are enough CPU and GPU resources available to execute the
store and there are enough CPU and GPU resources available to execute the
task, then the local scheduler will assign the task to one of its
available workers.
- If those conditions are not met, then the task will likely be passed on to
a global scheduler. This is done by adding the task to the **task table**
which is part of the centralized control state and setting its state to
``WAITING``. The task table can be inspected as follows.
- If those conditions are not met, the task will be passed on to a global
scheduler. This is done by adding the task to the **task table**, which is
part of the centralized control state.
The task table can be inspected as follows.

.. code-block:: python
TODO: Fill this in.
If this happens, a global scheduler will be notified and will assign the
task to a local scheduler (again by putting it back in the task table but
with state ``ASSIGNED``.) The relevant local scheduler will be notified
and will pick up the task.
- Once a local scheduler has gotten the task, it either immediately assigns the
the task to an available worker, or it queues the task and assigns it at a
later time (for example, when a worker becomes available or when an object
dependency for the task becomes available).
- When the task has been assigned to the worker, the worker executes the task
and puts the task's return values into the object store. The object store will
then update the **object table** which is part of the centralized control
state to reflect the fact that it contains the newly created objects. The
A global scheduler will be notified of the update and will assign the task
to a local scheduler by updating the task's state in the task table. The
local scheduler will be notified and pull the task object.
- Once a task has been scheduled to a local scheduler, whether by itself or by
a global scheduler, the local scheduler queues the task for execution. A task
is assigned to a worker when enough resources become available and the object
dependencies are available locally, in first-in, first-out order.
- When the task has been assigned to a worker, the worker executes the task and
puts the task's return values into the object store. The object store will
then update the **object table**, which is part of the centralized control
state, to reflect the fact that it contains the newly created objects. The
object table can be viewed as follows.

.. code-block:: python
Expand All @@ -159,7 +157,10 @@ Notes and limitations
- When an object store on a particular node fills up, it will begin evicting
objects in a least-recently-used manner. If an object that is needed later is
evicted, then the call to ``ray.get`` for that object will initiate the
reconstruction of the object (by replaying the task that created the object).
reconstruction of the object. The local scheduler will attempt to reconstruct
the object by replaying its task lineage.

TODO: Limitations on reconstruction.

Getting an object ID
--------------------
Expand All @@ -171,20 +172,21 @@ Several things happen when a driver or worker calls ``ray.get`` on an object ID.
ray.get(x_id)
- The driver or worker goes to the object store on the same node and requests
the relevant object. Each object store consists of two components, an **object
store**, which is essentially a shared-memory key-value store of immutable
objects, and an **object manager**, which coordinates the transfer of objects
between nodes.

- If the relevant object is not present in the object store, the object
manager checks the object table to see which object stores (if any) have
the object. It then requests the object directly from one of the object
stores that has the object (via the corresponding object manager). If the
object doesn't exist anywhere yet, then the centralized control state will
notify the object manager when the object is created somewhere in the
system.
- Once the object has been transferred to the local object store, the driver or
worker will map the relevant region of memory into its own address space (to
avoid copying the object), and will deserialize the bytes into a Python
object. Note that any numpy arrays that are part of the object will not be
copied.
the relevant object. Each object store consists of two components, a
shared-memory key-value store of immutable objects, and an manager to
coordinate the transfer of objects between nodes.

- If the object is not present in the object store, the manager checks the
object table to see which other object stores, if any, have the object. It
then requests the object directly from one of those object stores, via its
manager. If the object doesn't exist anywhere, then the centralized control
state will notify the requesting manager when the object is created. If the
object doesn't exist anywhere because it has been evicted from all object
stores, the worker will also request reconstruction of the object from the
local scheduler. These checks repeat periodically until the object is
available in the local object store, whether through reconstruction or
through object transfer.
- Once the object is available in the local object store, the driver or worker
will map the relevant region of memory into its own address space (to avoid
copying the object), and will deserialize the bytes into a Python object.
Note that any numpy arrays that are part of the object will not be copied.
102 changes: 55 additions & 47 deletions doc/source/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ the object store. Once an object is placed in the object store, it is immutable.
There are a number of situations in which Ray will place objects in the object
store.

1. When a remote function returns, its return values are stored in the object
1. The return values of a remote function.
store.
2. A call to ``ray.put(x)`` places ``x`` in the object store.
3. When large objects or objects other than simple primitive types are passed as
arguments into remote functions, they will be placed in the object store.

A normal Python object may have pointers all over the place, so to place an
object in the object store or send it between processes, it must first be
converted to a contiguous string of bytes. This process is known as
serialization. The process of turning the string of bytes back into a Python
object is known as deserialization. Serialization and deserialization are often
bottlenecks in distributed computing.
2. ``x`` in a call to ``ray.put(x)``.
3. Large objects or objects other than simple primitive types that are passed
as arguments into remote functions.

A Python object may have an arbitrary number of pointers with arbitrarily deep
nesting. To place an object in the object store or send it between processes,
it must first be converted to a contiguous string of bytes. This process is
known as serialization. The process of converting the string of bytes back into a
Python object is known as deserialization. Serialization and deserialization
are often bottlenecks in distributed computing, if the time needed to compute
on the data is relatively low.

Pickle is one example of a library for serialization and deserialization in
Python.
Expand All @@ -30,24 +31,31 @@ Python.
pickle.dumps([1, 2, 3]) # prints b'\x80\x03]q\x00(K\x01K\x02K\x03e.'
pickle.loads(b'\x80\x03]q\x00(K\x01K\x02K\x03e.') # prints [1, 2, 3]
Pickle (and its variants) are pretty general. They can successfully serialize a
large variety of Python objects. However, for numerical workloads, pickling and
unpickling can be inefficient. For example, when unpickling a list of numpy
arrays, pickle will create completely new arrays in memory. In Ray, when we
deserialize a list of numpy arrays from the object store, we will create a list
of numpy array objects in Python, but each numpy array object is essentially
just a pointer to the relevant location in shared memory. There are some
advantages to this form of serialization.
Pickle (and the variant we use, cloudpickle) is general-purpose. They can
serialize a large variety of Python objects. However, for numerical workloads,
pickling and unpickling can be inefficient. For example, if multiple processes
want to access a Python list of numpy arrays, each process must unpickle the
list and create its own new copies of the arrays. This can lead to high memory
overheads, even when all processes are read-only and could easily share memory.

In Ray, we optimize for numpy arrays by using the `Apache Arrow`_ data format.
When we deserialize a list of numpy arrays from the object store, we still
create a Python list of numpy array objects. However, rather than copy each
numpy array over again, each numpy array object is essentially a pointer to its
address in shared memory. There are some advantages to this form of
serialization.

- Deserialization can be very fast.
- Memory is shared between processes so worker processes can all read the same
data without having to copy it.

.. _`Apache Arrow`: https://arrow.apache.org/

What Objects Does Ray Handle
----------------------------

However, Ray is not currently capable of serializing arbitrary Python objects.
The set of Python objects that Ray can serialize includes the following.
Ray does not currently support serialization of arbitrary Python objects. The
set of Python objects that Ray can serialize includes the following.

1. Primitive types: ints, floats, longs, bools, strings, unicode, and numpy
arrays.
Expand Down Expand Up @@ -97,15 +105,15 @@ This can be addressed by calling ``ray.register_class(Foo)``.
f_id = ray.put(f)
ray.get(f_id) # prints <__main__.Foo at 0x1078128d0>
Under the hood, ``ray.put`` essentially replaces ``f`` with ``f.__dict__``,
which is just the dictionary ``{"a": 1, "b": 2}``. Then during deserialization,
``ray.get`` constructs a new ``Foo`` object from the dictionary of fields.
Under the hood, ``ray.put`` places ``f.__dict__``, the dictionary of attributes
of ``f``, into the object store instead of ``f`` itself. In this case, this is
the dictionary, ``{"a": 1, "b": 2}``. Then during deserialization, ``ray.get``
constructs a new ``Foo`` object from the dictionary of fields.

This naive substitution won't work in all cases. For example if we want to
serialize Python objects of type ``function`` (for example ``f = lambda x: x +
1``), this simple scheme doesn't quite work, and ``ray.register_class(type(f))``
will give an error message. In these cases, we can fall back to pickle (actually
we use cloudpickle).
This naive substitution won't work in all cases. For example, this scheme does
not support Python objects of type ``function`` (e.g., ``f = lambda x: x +
1``). In these cases, the call to ``ray.register_class`` will give an error
message, and you should fall back to pickle.

.. code-block:: python
Expand All @@ -117,34 +125,34 @@ we use cloudpickle).
f_new = ray.get(ray.put(f))
f_new(0) # prints 1
However, it's best to avoid using pickle for efficiency reasons. If you find
yourself needing to pickle certain objects, consider trying to use more
efficient data structures like arrays.
However, it's best to avoid using pickle for the efficiency reasons described
above. If you find yourself needing to pickle certain objects, consider trying
to use more efficient data structures like arrays.

**Note:** Another setting where the naive replacement of an object with its
``__dict__`` attribute fails is where an object recursively contains itself (or
multiple objects recursively contain each other). To see more examples of this,
see the section `Notes and Limitations`_.
``__dict__`` attribute fails is recursion, e.g., an object contains itself or
multiple objects contain each other. To see more examples of this, see the
section `Notes and Limitations`_.

Notes and limitations
---------------------

- We currently handle certain patterns incorrectly. For example, a list that
contains two copies of the same list, will be serialized as if the two lists
were distinct.
- We currently handle certain patterns incorrectly, according to Python
semantics. For example, a list that contains two copies of the same list will
be serialized as if the two lists were distinct.

.. code-block:: python
l1 = [0]
l2 = [l1, l1]
l3 = ray.get(ray.put(l2))
l2[0] is l2[1] # This is true.
l3[0] is l3[1] # This is false.
l2[0] is l2[1] # True.
l3[0] is l3[1] # False.
- For reasons similar to the above example, we also do not currently handle
objects that recursively contain themselves (this may be common with certain
graph-like data structures).
objects that recursively contain themselves (this may be common in graph-like
data structures).

.. code-block:: python
Expand All @@ -163,15 +171,15 @@ Notes and limitations
- If you need to pass a custom class into a remote function, you should call
``ray.register_class`` on the class **before defining the remote function**.

- Whenever possible, use numpy arrays. This is generally good advice.
- Whenever possible, use numpy arrays for maximum performance.

Last Resort Workaround
----------------------

If you find cases where Ray doesn't work or does the wrong thing, please `let us
know`_ so we can fix it. In the meantime, you can do your own custom
serialization and deserialization (for example by calling pickle by hand). Or by
writing your own custom serializer and deserializer.
If you find cases where Ray serialization doesn't work or does something
unexpected, please `let us know`_ so we can fix it. In the meantime, you may
have to resort to writing custom serialization and deserialization code (e.g.,
calling pickle by hand).

.. _`let us know`: https://github.com/ray-project/ray/issues

Expand Down

0 comments on commit 46b5e56

Please sign in to comment.