Commit b87c06d

Merge branch 'master' of https://github.com/ray-project/ray

* 'master' of https://github.com/ray-project/ray:
  [rllib] Fix broken link in docs (ray-project#1967)
  [DataFrame] Sample implement (ray-project#1954)
  [DataFrame] Implement Inter-DataFrame operations (ray-project#1937)
  remove UniqueIDHasher (ray-project#1957)
  [rllib] Add DDPG documentation, rename DDPG2 <=> DDPG (ray-project#1946)
  updates (ray-project#1958)
  Pin Cython in autoscaler development example. (ray-project#1951)
  Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
  [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
  Remove smart_open install. (ray-project#1943)
  [DataFrame] Fully implement append, concat and join (ray-project#1932)
  [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
  [DataFrame] Implementing write methods (ray-project#1918)
  [rllib] arr[end] was excluded when end is not None (ray-project#1931)
  [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
royf committed May 1, 2018
2 parents 4e7129b + b55f4a7 commit b87c06d
Showing 84 changed files with 3,577 additions and 2,210 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -13,6 +13,7 @@
/src/thirdparty/catapult/
/src/thirdparty/flatbuffers/
/src/thirdparty/parquet-cpp
/thirdparty/pkg/

# Files generated by flatc should be ignored
/src/common/format/*.py
@@ -143,3 +144,6 @@ build

# Pytest Cache
**/.pytest_cache

# Vscode
.vscode/
1 change: 0 additions & 1 deletion doc/source/conf.py
@@ -32,7 +32,6 @@
"tensorflow.python",
"tensorflow.python.client",
"tensorflow.python.util",
"smart_open",
"ray.local_scheduler",
"ray.plasma",
"ray.core.generated.TaskInfo",
2 changes: 2 additions & 0 deletions doc/source/policy-optimizers.rst
@@ -20,6 +20,8 @@ Example of constructing and using a policy optimizer `(link to full example) <ht
print("optimizer stats", optimizer.stats())
print("local evaluator stats", optimizer.local_evaluator.stats())
Read more about policy optimizers in this post: `Distributed Policy Optimizers for Scalable and Reproducible Deep RL <https://rise.cs.berkeley.edu/blog/distributed-policy-optimizers-for-scalable-and-reproducible-deep-rl/>`__.

Here are the steps for using an RLlib policy optimizer with an existing algorithm.

1. Implement the `Policy evaluator interface <rllib-dev.html#policy-evaluators-and-optimizers>`__.
28 changes: 16 additions & 12 deletions doc/source/rllib.rst
@@ -7,19 +7,19 @@ You can find the code for RLlib `here on GitHub <https://github.com/ray-project/

RLlib's policy optimizers serve as the basis for RLlib's reference algorithms, which include:

- `Proximal Policy Optimization (PPO) <https://arxiv.org/abs/1707.06347>`__ which
is a proximal variant of `TRPO <https://arxiv.org/abs/1502.05477>`__.
- Proximal Policy Optimization (`PPO <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ppo>`__) which is a proximal variant of `TRPO <https://arxiv.org/abs/1502.05477>`__.

- `The Asynchronous Advantage Actor-Critic (A3C) <https://arxiv.org/abs/1602.01783>`__.
- Policy Gradients (`PG <https://github.com/ray-project/ray/tree/master/python/ray/rllib/pg>`__).

- `Deep Q Networks (DQN) <https://arxiv.org/abs/1312.5602>`__.
- Asynchronous Advantage Actor-Critic (`A3C <https://github.com/ray-project/ray/tree/master/python/ray/rllib/a3c>`__).

- `Ape-X Distributed Prioritized Experience Replay <https://arxiv.org/abs/1803.00933>`__.
- Deep Q Networks (`DQN <https://github.com/ray-project/ray/tree/master/python/ray/rllib/dqn>`__).

- Evolution Strategies, as described in `this
paper <https://arxiv.org/abs/1703.03864>`__. Our implementation
is adapted from
`here <https://github.com/openai/evolution-strategies-starter>`__.
- Deep Deterministic Policy Gradients (`DDPG <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg>`__, `DDPG2 <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg2>`__).

- Ape-X Distributed Prioritized Experience Replay, including both `DQN <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/apex.py>`__ and `DDPG <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ddpg/apex.py>`__ variants.

- Evolution Strategies (`ES <https://github.com/ray-project/ray/tree/master/python/ray/rllib/es>`__), as described in `this paper <https://arxiv.org/abs/1703.03864>`__.

These algorithms can be run on any `OpenAI Gym MDP <https://github.com/openai/gym>`__,
including custom ones written and registered by the user.
@@ -76,18 +76,22 @@ The ``train.py`` script has a number of options you can show by running
The most important options are for choosing the environment
with ``--env`` (any OpenAI gym environment including ones registered by the user
can be used) and for choosing the algorithm with ``--run``
(available options are ``PPO``, ``A3C``, ``ES``, ``DQN`` and ``APEX``).
(available options are ``PPO``, ``PG``, ``A3C``, ``ES``, ``DDPG``, ``DDPG2``, ``DQN``, ``APEX``, and ``APEX_DDPG2``).
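For instance, a hypothetical invocation that combines both flags (the environment, algorithm, and worker count below are illustrative, not prescribed by this diff)::

    python ray/python/ray/rllib/train.py --env CartPole-v0 --run PPO --config '{"num_workers": 2}'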

Specifying Parameters
~~~~~~~~~~~~~~~~~~~~~

Each algorithm has specific hyperparameters that can be set with ``--config`` - see the
``DEFAULT_CONFIG`` variable in
`PPO <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ppo/ppo.py>`__,
`PG <https://github.com/ray-project/ray/blob/master/python/ray/rllib/pg/pg.py>`__,
`A3C <https://github.com/ray-project/ray/blob/master/python/ray/rllib/a3c/a3c.py>`__,
`ES <https://github.com/ray-project/ray/blob/master/python/ray/rllib/es/es.py>`__,
`DQN <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/dqn.py>`__ and
`APEX <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/apex.py>`__.
`DQN <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/dqn.py>`__,
`DDPG <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ddpg/ddpg.py>`__,
`DDPG2 <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ddpg2/ddpg.py>`__,
`APEX <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/apex.py>`__, and
`APEX_DDPG2 <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ddpg2/apex.py>`__.

In an example below, we train A3C by specifying 8 workers through the config flag.

To use a custom environment, you can register a function that creates the env to refer to it by name. The contents of the ``env_config`` agent config field will be passed to that function to allow the environment to be configured. The return type should be an OpenAI gym.Env.
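A minimal sketch of that pattern, assuming the ``ray.tune.registry.register_env`` hook (``env_creator`` and ``my_env`` are illustrative names)::

    import gym

    from ray.tune.registry import register_env

    def env_creator(env_config):
        # ``env_config`` is the agent's env_config field described above.
        return gym.make("CartPole-v0")  # stand-in for a real custom gym.Env

    register_env("my_env", env_creator)  # then select it with --env my_env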
2 changes: 1 addition & 1 deletion docker/examples/Dockerfile
@@ -3,6 +3,6 @@
FROM ray-project/deploy
RUN conda install -y -c conda-forge tensorflow
RUN apt-get install -y zlib1g-dev
RUN pip install gym[atari] opencv-python==3.2.0.8 smart_open
RUN pip install gym[atari] opencv-python==3.2.0.8
RUN pip install --upgrade git+git://github.com/hyperopt/hyperopt.git
# RUN conda install -y -q pytorch torchvision -c soumith
2 changes: 2 additions & 0 deletions python/build-wheel-macos.sh
@@ -53,6 +53,8 @@ for ((i=0; i<${#PY_VERSIONS[@]}; ++i)); do
popd

pushd python
# Setuptools on CentOS is too old to install arrow 0.9.0, so we upgrade it.
$PIP_CMD install --upgrade setuptools
# Install setuptools_scm because otherwise when building the wheel for
# Python 3.6, we see an error.
$PIP_CMD install -q setuptools_scm
2 changes: 1 addition & 1 deletion python/ray/autoscaler/autoscaler.py
@@ -224,7 +224,7 @@ def __init__(self,
max_concurrent_launches=AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
max_failures=AUTOSCALER_MAX_NUM_FAILURES,
process_runner=subprocess,
verbose_updates=False,
verbose_updates=True,
node_updater_cls=NodeUpdaterProcess,
update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S):
self.config_path = config_path
2 changes: 1 addition & 1 deletion python/ray/autoscaler/aws/development-example.yaml
@@ -94,7 +94,7 @@ setup_commands:
- echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
# Build Ray.
- git clone https://github.com/ray-project/ray || true
- pip install boto3==1.4.8
- pip install boto3==1.4.8 cython==0.27.3
- cd ray/python; pip install -e . --verbose

# Custom commands that will be run on the head node after common setup.
12 changes: 8 additions & 4 deletions python/ray/dataframe/__init__.py
@@ -3,7 +3,7 @@
from __future__ import print_function

import pandas as pd
from pandas import eval
from pandas import (eval, Panel, date_range, MultiIndex)
import threading

pd_version = pd.__version__
@@ -30,11 +30,15 @@ def get_npartitions():
# because they depend on npartitions.
from .dataframe import DataFrame # noqa: 402
from .series import Series # noqa: 402
from .io import (read_csv, read_parquet) # noqa: 402
from .concat import concat # noqa: 402
from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402
read_clipboard, read_excel, read_hdf, read_feather, # noqa: 402
read_msgpack, read_stata, read_sas, read_pickle, # noqa: 402
read_sql) # noqa: 402
from .concat import concat # noqa: 402

__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval"
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
"Panel", "date_range", "MultiIndex"
]

try:
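These re-exports keep ``import ray.dataframe`` usable as a drop-in for pandas. A brief usage sketch, assuming a local ``data.csv`` (``read_csv`` here returns a Ray DataFrame rather than a pandas one):

    import ray.dataframe as pd  # drop-in for ``import pandas as pd``

    df = pd.read_csv("data.csv")                    # distributed DataFrame
    dates = pd.date_range("2018-01-01", periods=4)  # plain pandas re-export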
191 changes: 117 additions & 74 deletions python/ray/dataframe/concat.py
@@ -1,90 +1,133 @@
import pandas as pd
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas
import numpy as np
from .dataframe import DataFrame as rdf
from .utils import (
from_pandas,
_deploy_func)
from functools import reduce
from .dataframe import DataFrame
from .utils import _reindex_helper


def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
keys=None, levels=None, names=None, verify_integrity=False,
copy=True):

def _concat(frame1, frame2):
# Check type on objects
# Case 1: Both are Pandas DF
if isinstance(frame1, pd.DataFrame) and \
isinstance(frame2, pd.DataFrame):

return pd.concat((frame1, frame2), axis, join, join_axes,
if keys is not None:
objs = [objs[k] for k in keys]
else:
objs = list(objs)

if len(objs) == 0:
raise ValueError("No objects to concatenate")

objs = [obj for obj in objs if obj is not None]

if len(objs) == 0:
raise ValueError("All objects passed were None")

try:
type_check = next(obj for obj in objs
if not isinstance(obj, (pandas.Series,
pandas.DataFrame,
DataFrame)))
except StopIteration:
type_check = None
if type_check is not None:
raise ValueError("cannot concatenate object of type \"{0}\"; only "
"pandas.Series, pandas.DataFrame, "
"and ray.dataframe.DataFrame objs are "
"valid", type(type_check))

all_series = all([isinstance(obj, pandas.Series)
for obj in objs])
if all_series:
return pandas.concat(objs, axis, join, join_axes,
ignore_index, keys, levels, names,
verify_integrity, copy)

if not (isinstance(frame1, rdf) and
isinstance(frame2, rdf)) and join == 'inner':
raise NotImplementedError(
"Obj as dicts not implemented. To contribute to "
"Pandas on Ray, please visit github.com/ray-project/ray."
)

# Case 2: Both are different types
if isinstance(frame1, pd.DataFrame):
frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1)
if isinstance(frame2, pd.DataFrame):
frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1)

# Case 3: Both are Ray DF
if isinstance(frame1, rdf) and \
isinstance(frame2, rdf):

new_columns = frame1.columns.join(frame2.columns, how=join)

def _reindex_helper(pdf, old_columns, join):
pdf.columns = old_columns
if join == 'outer':
pdf = pdf.reindex(columns=new_columns)
else:
pdf = pdf[new_columns]
pdf.columns = pd.RangeIndex(len(new_columns))

return pdf

f1_columns, f2_columns = frame1.columns, frame2.columns
new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p,
f1_columns, join), part) for
part in frame1._row_partitions]
new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p,
f2_columns, join), part) for
part in frame2._row_partitions]

return rdf(row_partitions=new_f1 + new_f2, columns=new_columns,
index=frame1.index.append(frame2.index))

# (TODO) Group all the pandas dataframes

if isinstance(objs, dict):
raise NotImplementedError(
"Obj as dicts not implemented. To contribute to "
"Pandas on Ray, please visit github.com/ray-project/ray."
)
"Pandas on Ray, please visit github.com/ray-project/ray.")

axis = pd.DataFrame()._get_axis_number(axis)
if axis == 1:
raise NotImplementedError(
"Concat not implemented for axis=1. To contribute to "
"Pandas on Ray, please visit github.com/ray-project/ray."
)

all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs])
if all_pd:
result = pd.concat(objs, axis, join, join_axes,
ignore_index, keys, levels, names,
verify_integrity, copy)
else:
result = reduce(_concat, objs)
axis = pandas.DataFrame()._get_axis_number(axis)

if isinstance(result, pd.DataFrame):
return from_pandas(result, len(result) / 2**16 + 1)
if join not in ['inner', 'outer']:
raise ValueError("Only can inner (intersect) or outer (union) join the"
" other axis")

return result
# We need this in a list because we use it later.
all_index, all_columns = list(zip(*[(obj.index, obj.columns)
for obj in objs]))

def series_to_df(series, columns):
df = pandas.DataFrame(series)
df.columns = columns
return DataFrame(df)

# Pandas puts all of the Series in a single column named 0. This is
# true regardless of the existence of another column named 0 in the
# concat.
if axis == 0:
objs = [series_to_df(obj, [0])
if isinstance(obj, pandas.Series) else obj for obj in objs]
else:
# Pandas starts the count at 0 so this will increment the names as
# long as there's a new nameless Series being added.
def name_incrementer(i):
val = i[0]
i[0] += 1
return val

i = [0]
objs = [series_to_df(obj, [obj.name] if obj.name is not None
else [name_incrementer(i)])
if isinstance(obj, pandas.Series) else obj for obj in objs]

# Using concat on the columns and index is fast because they're empty,
# and it forces the error checking. It also puts the columns in the
# correct order for us.
final_index = \
pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
axis=axis, join=join, join_axes=join_axes,
ignore_index=ignore_index, keys=keys, levels=levels,
names=names, verify_integrity=verify_integrity,
copy=False).index
final_columns = \
pandas.concat([pandas.DataFrame(columns=col)
for col in all_columns],
axis=axis, join=join, join_axes=join_axes,
ignore_index=ignore_index, keys=keys, levels=levels,
names=names, verify_integrity=verify_integrity,
copy=False).columns

# Put all of the DataFrames into Ray format
# TODO just partition the DataFrames instead of building a new Ray DF.
objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame,
pandas.Series)) else obj
for obj in objs]

# Here we reuse all_columns/index so we don't have to materialize objects
# from remote memory built in the previous line. In the future, we won't be
# building new DataFrames, rather just partitioning the DataFrames.
if axis == 0:
new_blocks = np.array([_reindex_helper._submit(
args=tuple([all_columns[i], final_columns, axis,
len(objs[0]._block_partitions)] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions))
for i in range(len(objs))
for part in objs[i]._block_partitions])
else:
# Transposing the columns is necessary because the remote task treats
# everything like rows and returns in row-major format. Luckily, this
# operation is cheap in numpy.
new_blocks = np.array([_reindex_helper._submit(
args=tuple([all_index[i], final_index, axis,
len(objs[0]._block_partitions.T)] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions.T))
for i in range(len(objs))
for part in objs[i]._block_partitions.T]).T

return DataFrame(block_partitions=new_blocks,
columns=final_columns,
index=final_index)
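A short usage sketch of the rewritten ``concat``, hedged in that it only exercises the paths shown above (the pandas inputs are wrapped into Ray DataFrames by the function itself):

    import pandas
    import ray.dataframe as rdf

    left = pandas.DataFrame({"a": [1, 2]})
    right = pandas.DataFrame({"a": [3, 4], "b": [5, 6]})

    # join="outer" keeps the union of columns; missing entries become NaN.
    result = rdf.concat([left, right], axis=0, join="outer")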
