[Client][Dask] Dask Errors on Ray Client #16743

Closed
matthewdeng opened this issue Jun 29, 2021 · 31 comments · Fixed by ray-project/xgboost_ray#150
Labels: bug (Something that is supposed to be working; but isn't), P1 (Issue that should be fixed within a few weeks)

@matthewdeng
Contributor

What is the problem?

When running a simple Dask/XGBoost script using Ray Client, computing the length of the Dask DataFrame fails with the following error:

TypeError: 'ray._raylet.ObjectRef' object is not subscriptable

Ray version and other system information (Python version, TensorFlow version, OS):

(ray) ~/workspace/scratch/dask python --version
Python 3.8.10
(ray) ~/workspace/scratch/dask pip list | grep -E 'ray|dask|xgboost'
dask                                   2021.6.2
ray                                    2.0.0.dev0
ray-shuffling-data-loader              0.1.0
xgboost                                1.4.2
xgboost-ray                            0.1.0

Reproduction (REQUIRED)

  1. Run ray start --head on the local machine.
  2. Create the Python script dask-xgboost.py:
import dask
import dask.dataframe as dd
import ray
from ray.util.dask import ray_dask_get
from xgboost_ray import RayDMatrix, RayParams, train

ray.client().connect()

S3_PATH = "s3://ray-ci-higgs/simpleHIGGS.csv"

dask.config.set(scheduler=ray_dask_get)
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
data = dd.read_csv(S3_PATH, names=colnames)
df_train = data
df_train = df_train.persist()
dtrain = RayDMatrix(df_train, label="label", columns=colnames)

evals_result = {}
config = {"tree_method": "hist", "eval_metric": ["logloss", "error"]}
train(
    params=config,
    dtrain=dtrain,
    evals_result=evals_result,
    ray_params=RayParams(
        max_actor_restarts=1, num_actors=4, cpus_per_actor=2),
    num_boost_round=100)
  3. Execute the script:
(ray) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py
Traceback (most recent call last):
  File "dask-xgboost.py", line 20, in <module>
    train(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1110, in train
    bst, train_evals_result, train_additional_results = ray.get(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 61, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
types.RayTaskError(TypeError): ray::_wrapped() (pid=51862, ip=192.168.0.57)
  File "python/ray/_raylet.pyx", line 535, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1102, in _wrapped
    bst = train(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1196, in train
    dtrain.load_data(ray_params.num_actors)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 741, in load_data
    refs, self.n = self.loader.load_data(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 317, in load_data
    data_source = self.get_data_source()
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 301, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/dataframe/core.py", line 564, in __len__
    return self.reduction(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable
  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.
@matthewdeng matthewdeng added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Jun 29, 2021
@ijrsvt
Contributor

ijrsvt commented Jun 29, 2021

Is this a duplicate of #16406?

@clarkzinzow
Contributor

@ijrsvt I think that it might be!

@ijrsvt ijrsvt self-assigned this Jun 29, 2021
@ijrsvt
Contributor

ijrsvt commented Jun 29, 2021

@matthewdeng One workaround is to use the single-threaded scheduler:

def ray_dask_get_sync(dsk, keys, **kwargs):
    """
    A synchronous Dask-Ray scheduler. This scheduler will send top-level
    (non-inlined) Dask tasks to a Ray cluster for execution. The scheduler will
    wait for the tasks to finish executing, fetch the results, and repackage
    them into the appropriate Dask collections. This particular scheduler
    submits Ray tasks synchronously, which can be useful for debugging.

    This can be passed directly to `dask.compute()`, as the scheduler:

    >>> dask.compute(obj, scheduler=ray_dask_get_sync)

    You can override the currently active global Dask-Ray callbacks (e.g.
    supplied via a context manager):

    >>> dask.compute(
            obj,
            scheduler=ray_dask_get_sync,
            ray_callbacks=some_ray_dask_callbacks,
        )

    Args:
        dsk (Dict): Dask graph, represented as a task DAG dictionary.
        keys (List[str]): List of Dask graph keys whose values we wish to
            compute and return.

    Returns:
        Computed values corresponding to the provided keys.
    """
    ray_callbacks = kwargs.pop("ray_callbacks", None)
    persist = kwargs.pop("ray_persist", False)

    with local_ray_callbacks(ray_callbacks) as ray_callbacks:
        # Unpack the Ray-specific callbacks.
        (
            ray_presubmit_cbs,
            ray_postsubmit_cbs,
            ray_pretask_cbs,
            ray_posttask_cbs,
            ray_postsubmit_all_cbs,
            ray_finish_cbs,
        ) = unpack_ray_callbacks(ray_callbacks)
        # NOTE: We hijack Dask's `get_async` function, injecting a different
        # task executor.
        object_refs = get_async(
            _apply_async_wrapper(
                apply_sync,
                _rayify_task_wrapper,
                ray_presubmit_cbs,
                ray_postsubmit_cbs,
                ray_pretask_cbs,
                ray_posttask_cbs,
            ),
            1,
            dsk,
            keys,
            **kwargs,
        )
        if ray_postsubmit_all_cbs is not None:
            for cb in ray_postsubmit_all_cbs:
                cb(object_refs, dsk)
        # NOTE: We explicitly delete the Dask graph here so object references
        # are garbage-collected before this function returns, i.e. before all
        # Ray tasks are done. Otherwise, no intermediate objects will be
        # cleaned up until all Ray tasks are done.
        del dsk
        if persist:
            result = object_refs
        else:
            result = ray_get_unpack(object_refs)
        if ray_finish_cbs is not None:
            for cb in ray_finish_cbs:
                cb(result)

        return result

@matthewdeng
Contributor Author

@ijrsvt I tried running the same script with the latest version of Ray, which includes #16731, but it unfortunately still runs into the same error.

(python37) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py
Traceback (most recent call last):
  File "dask-xgboost.py", line 26, in <module>
    num_boost_round=100)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1160, in train
    **kwargs,
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 81, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
types.RayTaskError(TypeError): ray::_wrapped() (pid=59038, ip=192.168.0.57)
  File "python/ray/_raylet.pyx", line 534, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1146, in _wrapped
    **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1249, in train
    dtrain.load_data(ray_params.num_actors)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 746, in load_data
    self.num_actors, self.sharding, rank=rank)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 319, in load_data
    data_source = self.get_data_source()
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 303, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/dataframe/core.py", line 565, in __len__
    len, np.sum, token="len", meta=int, split_every=False
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/threaded.py", line 87, in get
    **kwargs
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable
(python37) ~/workspace/scratch/dask python -c "import ray; print(ray.__commit__)"
113ed2a07c0c8b98c406bd7173daedbf160985b6

I also tried setting the scheduler to ray_dask_get_sync, but I'm running into an error. Do I need to somehow import it from dask?

(python37) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py
Traceback (most recent call last):
  File "dask-xgboost.py", line 11, in <module>
    dask.config.set(scheduler=ray_dask_get_sync)
NameError: name 'ray_dask_get_sync' is not defined

@clarkzinzow
Contributor

@matthewdeng ray_dask_get_sync should be importable from ray.util.dask:

from ray.util.dask import ray_dask_get_sync

@matthewdeng
Contributor Author

@clarkzinzow haha whoops, totally forgot that I had originally imported ray_dask_get in the Python script, thanks for catching that. I updated it to use ray_dask_get_sync and am getting the same original error, which makes me think this may be a different issue from #16406?

(python37) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py  
Traceback (most recent call last):
  File "dask-xgboost.py", line 26, in <module>
    num_boost_round=100)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1160, in train
    **kwargs,
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 81, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
types.RayTaskError(TypeError): ray::_wrapped() (pid=66381, ip=192.168.0.57)
  File "python/ray/_raylet.pyx", line 534, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1146, in _wrapped
    **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1249, in train
    dtrain.load_data(ray_params.num_actors)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 746, in load_data
    self.num_actors, self.sharding, rank=rank)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 319, in load_data
    data_source = self.get_data_source()
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 303, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/dataframe/core.py", line 565, in __len__
    len, np.sum, token="len", meta=int, split_every=False
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/threaded.py", line 87, in get
    **kwargs
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable

@richardliaw richardliaw added P1 Issue that should be fixed within a few weeks and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Jul 1, 2021
@richardliaw richardliaw added this to the Serverless Autoscaling milestone Jul 1, 2021
@clarkzinzow
Contributor

It looks like the Dask-on-Ray data source for XGBoost-on-Ray hardcodes the multithreaded Dask-on-Ray scheduler here when persisting the collection, so this could still be the same bug. Separately, the len(data) call triggers a Dask computation that will use a non-Dask-on-Ray scheduler, which could also be the cause. Maybe the Dask data source for XGBoost-on-Ray should eagerly set a global config for the scheduler, e.g. dask.config.set(scheduler=ray_dask_get_sync). cc @krfricke

@ijrsvt
Contributor

ijrsvt commented Jul 1, 2021

@matthewdeng I'm able to reproduce this!

@ijrsvt
Contributor

ijrsvt commented Jul 1, 2021

Actually, this might be distinct from #16406, because neither an ObjectRef nor a ClientObjectRef is subscriptable!

I'm not able to follow where this attempt to subscript is coming from :(

@ijrsvt
Contributor

ijrsvt commented Jul 6, 2021

@clarkzinzow do you have any insight into how to debug this further?

@clarkzinzow
Contributor

clarkzinzow commented Jul 6, 2021

My guess is that df_train.persist() in the user script is inlining Ray futures in the Dask collection, which XGBoost-on-Ray will try to use without the Dask-on-Ray scheduler since the dask.config.set(scheduler=ray_dask_get) in the user script isn't propagated to the server. As I noted above, we can guard against this by modifying the XGBoost-on-Ray data source to eagerly set the scheduler to Dask-on-Ray globally.
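clarkzinzow's hypothesis can be modeled without Ray or Dask. In the toy sketch below (not Dask's or Ray's actual code), `persist()` has left a handle in the graph where a partition used to be, much as `ray_dask_get_sync` above returns `object_refs` instead of values when `ray_persist` is set. An executor that resolves handles computes the length; a plain executor passes the handle straight into an indexing task and fails with the same kind of `TypeError` as the report. `FakeObjectRef`, `execute`, the graph keys, the dict standing in for a pandas partition, and the `getitem` column selection are all hypothetical; the thread does not pin down which Dask operation performs the subscript.

```python
import operator


class FakeObjectRef:
    """Hypothetical stand-in for a Ray future: a handle to data, not the data."""

    def __init__(self, key, store):
        self.key = key
        self._store = store

    def resolve(self):
        return self._store[self.key]


def execute(graph, key, ray_aware):
    """Evaluate `key` in a Dask-style dict graph of (func, *args) tuples."""
    task = graph[key]
    if isinstance(task, FakeObjectRef):
        # Only a Ray-aware scheduler knows to swap the handle for its value.
        return task.resolve() if ray_aware else task
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Arguments naming other graph keys are computed recursively;
        # anything else (e.g. a column name) is passed through unchanged.
        values = [
            execute(graph, a, ray_aware) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        return func(*values)
    return task


# The "object store" holds the real partition (a dict standing in for a
# pandas DataFrame); persist() left only a handle to it in the graph.
store = {"part-0": {"label": [1, 0, 1], "feature-01": [0.5, 0.1, 0.9]}}
persisted_graph = {
    "part-0": FakeObjectRef("part-0", store),
    "col-0": (operator.getitem, "part-0", "label"),  # select one column
    "len-0": (len, "col-0"),
}

print(execute(persisted_graph, "len-0", ray_aware=True))  # 3

try:
    execute(persisted_graph, "len-0", ray_aware=False)
except TypeError as exc:
    print(exc)  # 'FakeObjectRef' object is not subscriptable
```

In this model there are two ways out: resolve the handles (run the graph with a Ray-aware scheduler in the process that computes it) or never put handles into the graph (skip `persist()`), which line up with the two workarounds proposed in the thread.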

@matthewdeng As a workaround, can you try removing the line that persists the training data from the script?

df_train = df_train.persist()

@ijrsvt
Contributor

ijrsvt commented Jul 6, 2021

@matthewdeng I just tried removing that, and it appears to work fine.

@ijrsvt
Contributor

ijrsvt commented Jul 6, 2021

@matthewdeng another workaround is to do the following and keep the 'persist' call:

import ray

def _set_dask_config():
    import dask
    from ray.util.dask import ray_dask_get
    dask.config.set(scheduler=ray_dask_get)

_set_dask_config()  # Set it locally
# Set it on the server; ray.get() waits until the remote task has actually run.
ray.get(ray.remote(_set_dask_config).remote())

@matthewdeng
Contributor Author

Nice, after removing persist() I was able to get the original example working! However, when trying a more complex multi-node case (with Anyscale Connect), I started running into a new issue. Note that this issue does not occur every time; some runs have finished training successfully.

Traceback (most recent call last):
  File "workloads/dask_xgboost_test.py", line 56, in <module>
    test()
  File "workloads/dask_xgboost_test.py", line 42, in test
    evals=evallist)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1160, in train
    **kwargs,
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 81, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 244, in _get
    err = cloudpickle.loads(data.error)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/botocore/exceptions.py", line 28, in _exception_from_packed_args
    return exception_cls(*args, **kwargs)
TypeError: __init__() missing 1 required positional argument: 'cause'

With BETTER_EXCEPTIONS=1:

  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1160, in train
    **kwargs,
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 81, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
                   │    │               │       └ {}
                   │    │               └ (ClientObjectRef(9363106c127248dbffffffffffffffffffffffff0100000001000000),)
                   │    └ <function get at 0x7ff9e0558320>
                   └ <ray.util.client.RayAPIStub object at 0x7ff9d02c2bd0>
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
           │               │             └ None
           │               └ ClientObjectRef(9363106c127248dbffffffffffffffffffffffff0100000001000000)
           └ <ray.util.client.api.ClientAPI object at 0x7ff9802c1c90>
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
    │     │         │        └ 2
    │     │         └ ClientObjectRef(9363106c127248dbffffffffffffffffffffffff0100000001000000)
    │     └ <ray.util.client.worker.Worker object at 0x7ff9907f0250>
    └ None
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 244, in _get
    err = cloudpickle.loads(data.error)
          │                 └ error: "\200\005\225\016\035\000\000\000\000\000\000\214\023botocore.exceptions\224\214\033_exception_from_packed_args\224\223\2...
          └ <module 'ray.cloudpickle' from '/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/cloudpickle/__init__.py'>
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/botocore/exceptions.py", line 28, in _exception_from_packed_args
    return exception_cls(*args, **kwargs)
           │              │       └ {}
           │              └ ()
           └ <class 'types.RayTaskError(NoCredentialsError)'>
TypeError: __init__() missing 1 required positional argument: 'cause'

@ijrsvt
Contributor

ijrsvt commented Jul 7, 2021

@matthewdeng Sorry for the delay, but if you reproduce this, can you print the output of ray_client_server.err?

@matthewdeng
Contributor Author

@ijrsvt thanks for making the deserialization change; I now see that it's a credential error:

Failed to deserialize b'\x80\x05\x95\xd9\x1b\x00\x00\x00\x00\x00\x00\x8c\x13botocore.exceptions\x94\x8c\x1b_exception_from_packed_args\x94\x93\x94\x8c\x1bray.cloudpickle.cloudpickle\x94\x8c\x14_make_skeleton_class\x94\x93\x94(\x8c\x08builtins\x94\x8c\x04type\x94\x93\x94\x8c RayTaskError(NoCredentialsError)\x94\x8c\x0eray.exceptions\x94\x8c\x0cRayTaskError\x94\x93\x94h\x00\x8c\x12NoCredentialsError\x94\x93\x94\x86\x94}\x94\x8c 1fa82322773e4ffab54101bc8cec2a53\x94Nt\x94R\x94\x8c ray.cloudpickle.cloudpickle_fast\x94\x8c\x0f_class_setstate\x94\x93\x94h\x13}\x94(\x8c\x08__init__\x94h\x03\x8c\r_builtin_type\x94\x93\x94\x8c
LambdaType\x94\x85\x94R\x94(h\x1a\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x02K\x02KSC\x12|\x01|\x00_\x00|\x01f\x01|\x00_\x01d\x00S\x00\x94N\x85\x94\x8c\x05cause\x94\x8c\x04args\x94\x86\x94\x8c\x04self\x94h#\x86\x94\x8cA/home/ray/anaconda3/lib/python3.7/site-packages/ray/exceptions.py\x94h\x18KqC\x04\x00\x01\x06\x04\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94\x8c\x03ray\x94\x8c\x08__name__\x94h
\x8c\x08__file__\x94\x8cA/home/ray/anaconda3/lib/python3.7/site-packages/ray/exceptions.py\x94uNNNt\x94R\x94h\x14\x8c\x12_function_setstate\x94\x93\x94h3}\x94}\x94(h/h\x18\x8c\x0c__qualname__\x94\x8c6RayTaskError.as_instanceof_cause.<locals>.cls.__init__\x94\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c
__module__\x94h
\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0\x8c\x0b__getattr__\x94h\x1d(h (K\x02K\x00K\x02K\x03KSC\x0ct\x00|\x00j\x01|\x01\x83\x02S\x00\x94N\x85\x94\x8c\x07getattr\x94h#\x86\x94h&\x8c\x04name\x94\x86\x94h(hFKxC\x02\x00\x01\x94))t\x94R\x94h,NNNt\x94R\x94h5hQ}\x94}\x94(h/hFh8\x8c9RayTaskError.as_instanceof_cause.<locals>.cls.__getattr__\x94h:}\x94h<Nh=Nh>h
h?Nh@NhA]\x94hC}\x94u\x86\x94\x86R0\x8c\x07__str__\x94h\x1d(h (K\x01K\x00K\x01K\x01K\x13C\x04\x88\x00S\x00\x94N\x85\x94)h&\x85\x94h(hYK{C\x02\x00\x01\x94\x8c\terror_msg\x94\x85\x94)t\x94R\x94h,NNh\x03\x8c\x10_make_empty_cell\x94\x93\x94)R\x94\x85\x94t\x94R\x94h5hg}\x94}\x94(h/hYh8\x8c5RayTaskError.as_instanceof_cause.<locals>.cls.__str__\x94h:}\x94h<Nh=Nh>h
h?Nh@h\x03\x8c
_make_cell\x94\x93\x94Xp\x16\x00\x00\x1b[36mray::_wrapped()\x1b[39m (pid=401, ip=172.31.15.219)
  File "python/ray/_raylet.pyx", line 534, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1146, in _wrapped
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost_ray/main.py", line 1249, in train
    dtrain.load_data(ray_params.num_actors)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 745, in load_data
    self.num_actors, self.sharding, rank=rank)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 318, in load_data
    data_source = self.get_data_source()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 302, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/dataframe/core.py", line 565, in __len__
    len, np.sum, token="len", meta=int, split_every=False
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/base.py", line 286, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/threaded.py", line 87, in get
    **kwargs
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/core.py", line 115, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/core.py", line 115, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/ray/anaconda3/lib/python3.7/site-packages/dask/bytes/core.py", line 169, in read_block_from_file
    with copy.copy(lazy_file) as f:
  File "/home/ray/anaconda3/lib/python3.7/site-packages/fsspec/core.py", line 102, in __enter__
    f = self.fs.open(self.path, mode=mode)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/fsspec/spec.py", line 982, in open
    **kwargs,
  File "/home/ray/anaconda3/lib/python3.7/site-packages/s3fs/core.py", line 545, in _open
    requester_pays=requester_pays,
  File "/home/ray/anaconda3/lib/python3.7/site-packages/s3fs/core.py", line 1824, in __init__
    s3, path, mode, block_size, autocommit=autocommit, cache_type=cache_type
  File "/home/ray/anaconda3/lib/python3.7/site-packages/fsspec/spec.py", line 1304, in __init__
    self.details = fs.info(path)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/fsspec/asyn.py", line 88, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/fsspec/asyn.py", line 69, in sync
    raise result[0]
  File "/home/ray/anaconda3/lib/python3.7/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/home/ray/anaconda3/lib/python3.7/site-packages/s3fs/core.py", line 1064, in _info
    out = await self._simple_info(path)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/s3fs/core.py", line 984, in _simple_info
    **self.req_kw,
  File "/home/ray/anaconda3/lib/python3.7/site-packages/s3fs/core.py", line 265, in _call_s3
    raise translate_boto_error(err)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/s3fs/core.py", line 246, in _call_s3
    out = await method(**additional_kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/client.py", line 141, in _make_api_call
    operation_model, request_dict, request_context)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/client.py", line 160, in _make_request
    return await self._endpoint.make_request(operation_model, request_dict)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/endpoint.py", line 85, in _send_request
    request = await self.create_request(request_dict, operation_model)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/endpoint.py", line 79, in create_request
    operation_name=operation_model.name)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/hooks.py", line 27, in _emit
    response = await handler(**kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/signers.py", line 16, in handler
    return await self.sign(operation_name, request)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/aiobotocore/signers.py", line 63, in sign
    auth.add_auth(request)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/botocore/auth.py", line 357, in add_auth
    raise NoCredentialsError
botocore.exceptions.NoCredentialsError: Unable to locate credentials
Traceback (most recent call last):
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 244, in _get
    err = cloudpickle.loads(data.error)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/botocore/exceptions.py", line 28, in _exception_from_packed_args
    return exception_cls(*args, **kwargs)
TypeError: __init__() missing 1 required positional argument: 'cause'

This is from running the latest commit in https://github.com/ray-project/ray/pull/16518/files. A few quick observations:

  1. This is coming from the head node (pid=401, ip=172.31.15.219).
  2. This only happens when loading a large dataset. Running the same script with --smoke-test succeeds.
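The second traceback above fails inside cloudpickle.loads on the client. That suggests its TypeError is a deserialization problem rather than a Dask one: the server's NoCredentialsError was sent over, but the client could not rebuild it. The sketch below assumes the usual cause of that symptom, an exception whose constructor needs an argument that its pickled form does not carry. It uses only the standard library, and BrokenError/FixedError are hypothetical classes, not botocore's actual definitions.

```python
import pickle


class BrokenError(Exception):
    """Requires `cause` but only passes `msg` to Exception.__init__, so
    self.args == (msg,) and unpickling calls BrokenError(msg), which fails."""

    def __init__(self, msg, cause):
        super().__init__(msg)
        self.cause = cause


class FixedError(Exception):
    """Passes every constructor argument to Exception.__init__, so
    self.args carries everything needed to rebuild the instance."""

    def __init__(self, msg, cause):
        super().__init__(msg, cause)
        self.msg = msg
        self.cause = cause


def roundtrip(exc):
    """Serialize and deserialize an exception, as a client/server hop would."""
    return pickle.loads(pickle.dumps(exc))


if __name__ == "__main__":
    try:
        roundtrip(BrokenError("Unable to locate credentials", cause=None))
    except TypeError as err:
        # The message names the missing 'cause' argument.
        print("BrokenError failed to unpickle:", err)

    restored = roundtrip(FixedError("Unable to locate credentials", cause=None))
    print("FixedError restored:", restored.msg, restored.cause)
```

The fix in this sketch is simply to make self.args complete; a library could instead define a custom __reduce__ that returns all constructor arguments.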

@sasha-s
Contributor

sasha-s commented Jul 21, 2021

I am unable to reproduce the original problem (TypeError: 'ray._raylet.ObjectRef' object is not subscriptable) running locally on Darwin, using

dask                     2021.7.0
ray                      2.0.0.dev0          /Users/sashasobol/code/ray/python
xgboost                  1.4.2
xgboost-ray              0.1.2

nor with

dask                     2021.6.2
ray                      2.0.0.dev0          /Users/sashasobol/code/ray/python
xgboost                  1.4.2
xgboost-ray              0.1.0

df_train = df_train.persist() succeeds and everything finishes just fine.

The other problem (with creds) is still there.
As a workaround I downloaded the simpleHIGGS.csv from S3 and pointed the script to the local copy.

@ijrsvt
Contributor

ijrsvt commented Jul 21, 2021

@matthewdeng Just to confirm: the main initial issue is resolved by removing persist() (or by specifying the Dask scheduler locally).

Would it be okay to close this and open a follow-up about the credentials issue?

@matthewdeng
Contributor Author

@ijrsvt I'm okay with opening a separate issue to track the credentials issue, but would it be reasonable to keep this one open since I don't think we've actually fixed the root cause of the original scheduler issue?

@sasha-s
Contributor

sasha-s commented Jul 22, 2021

I cannot reproduce the original issue.

The problem with credentials is separate: AWS environment variables are not propagated properly.
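One possible workaround for the unpropagated variables, sketched here as an assumption rather than a fix adopted in this thread, is to collect the standard AWS variables on the client and forward them through Ray's runtime_env "env_vars" field. aws_env_vars is a hypothetical helper, and whether runtime_env env_vars is honored over Ray Client depends on the Ray version in use.

```python
import os
from typing import Dict, Mapping, Optional

# Standard environment variables read by boto3/botocore (and therefore s3fs).
AWS_ENV_KEYS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
    "AWS_DEFAULT_REGION",
)


def aws_env_vars(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: return the AWS credential variables that are set
    in `environ` (default: this process's environment) so they can be
    forwarded to remote Ray workers."""
    environ = os.environ if environ is None else environ
    return {key: environ[key] for key in AWS_ENV_KEYS if key in environ}


# Usage sketch (assumes a Ray version whose runtime_env supports "env_vars"
# and a Ray Client server listening on the default port 10001):
#
#   import ray
#   ray.init("ray://<head-node-host>:10001",
#            runtime_env={"env_vars": aws_env_vars()})
```

Forwarding only the variables that are actually set avoids overwriting credentials that the cluster nodes may already have from an instance role.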

@ijrsvt
Contributor

ijrsvt commented Jul 22, 2021

@sasha-s I think it only arose when the Client Server was on a remote machine.

@matthewdeng The solution is to do dask.config.set(scheduler=ray_dask_get) on the server.

I guess there is a possible follow-up for this:

Maybe the Dask data source for XGBoost-on-Ray should eagerly set a global config for the scheduler, e.g. dask.config.set(scheduler=ray_dask_get_sync). cc @krfricke

@richardliaw
Contributor

@ijrsvt what's the current status on this?
@sasha-s did you try reproducing with a ray cluster using ray.init() from your laptop?

@sasha-s
Contributor

sasha-s commented Aug 5, 2021

@richardliaw I was not able to reproduce it.
There is a separate issue https://github.com/anyscale/product/issues/5879 for the credentials problem.

@ijrsvt
Contributor

ijrsvt commented Aug 9, 2021

@richardliaw @matthewdeng Can we close this issue, since the initial problem is solved by calling dask.config.set(scheduler=ray_dask_get) on the server?

@AmeerHajAli assigned sasha-s and ijrsvt and unassigned ijrsvt on Aug 9, 2021
@matthewdeng
Contributor Author

Actually, @Yard1, is this suggestion something you can help with on the xgboost_ray side?

@Yard1
Member

Yard1 commented Aug 10, 2021

I'll take a look!

@ijrsvt
Contributor

ijrsvt commented Aug 23, 2021

@Yard1 Any updates on trying to implement the suggestion?

@Yard1
Member

Yard1 commented Aug 23, 2021

We are still trying to figure out the best way to take care of this. Ideally, this would be handled on the client side.

@ijrsvt
Contributor

ijrsvt commented Aug 23, 2021

@Yard1, would ensuring that all calls to the Dask class methods first call

import dask
from ray.util.dask import ray_dask_get
dask.config.set(scheduler=ray_dask_get)

be okay?
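The proposal above, calling dask.config.set(scheduler=ray_dask_get) at the start of every Dask data-source method so the setting takes effect in whichever process runs the method, could be sketched as a class decorator. This is a hypothetical illustration rather than xgboost_ray's code: call_first and ExampleDaskSource are made-up names, and the setup function is passed in as a plain callable so the sketch runs without Ray or Dask installed.

```python
import functools
import inspect
from typing import Any, Callable, Type, TypeVar

T = TypeVar("T")


def _wrap(func: Callable[..., Any], setup: Callable[[], None]) -> Callable[..., Any]:
    """Return a wrapper that runs setup() and then the original function."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        setup()  # hypothetically: dask.config.set(scheduler=ray_dask_get)
        return func(*args, **kwargs)

    return wrapper


def call_first(setup: Callable[[], None]) -> Callable[[Type[T]], Type[T]]:
    """Class decorator: every public method defined on the class (plain,
    classmethod or staticmethod) calls setup() before running its body."""

    def decorate(cls: Type[T]) -> Type[T]:
        for name, attr in list(vars(cls).items()):
            if name.startswith("_"):
                continue  # leave dunder and private helpers untouched
            if isinstance(attr, (classmethod, staticmethod)):
                # Wrap the underlying function, then re-apply the same
                # descriptor type so binding behaves as before.
                setattr(cls, name, type(attr)(_wrap(attr.__func__, setup)))
            elif inspect.isfunction(attr):
                setattr(cls, name, _wrap(attr, setup))
        return cls

    return decorate


if __name__ == "__main__":
    calls = []

    @call_first(lambda: calls.append("scheduler set"))
    class ExampleDaskSource:  # hypothetical stand-in for a Dask data source
        @staticmethod
        def count(parts):
            return len(parts)

        @classmethod
        def load(cls, parts):
            return f"{cls.__name__} loaded {len(parts)} partitions"

    print(ExampleDaskSource.count([1, 2]), ExampleDaskSource.load([1, 2, 3]))
    print(calls)
```

Re-running the setup on every call keeps the sketch simple; since setting the same scheduler repeatedly has the same effect as setting it once, the main cost is a small per-call overhead.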

@Yard1
Member

Yard1 commented Aug 23, 2021

Yeah that should work actually. I'd be happy to test this!

@AmeerHajAli
Contributor

@ijrsvt , thanks for closing the loop on this one!
