[Tune] pyarrow.fs hangs indefinitely while writing checkpoint file #26802

Closed
gjoliver opened this issue Jul 20, 2022 · 7 comments

@gjoliver
Member

What happened + What you expected to happen

This can be reliably reproduced when writing checkpoints to GCS (not sure about S3).
The problem usually shows up after we have continuously written about 200 RLlib checkpoints, i.e. after 12 to 24 hours of runtime.
There was one case where the job unblocked itself after 48 hours without any intervention.

I was able to attach to the Ray node and grab a stack trace:

Thread 371 (idle): "MainThread"
    pthread_cond_wait@@GLIBC_2.3.2 (libpthread-2.27.so)
    std::condition_variable::wait (condition_variable.cc:55)
    arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
    arrow::fs::CopyFiles (pyarrow/libarrow.so.600)
    arrow::fs::CopyFiles (pyarrow/libarrow.so.600)
    _copy_files_selector (pyarrow/_fs.cpython-37m-x86_64-linux-gnu.so)
    copy_files (pyarrow/fs.py:246)
    upload_to_uri (ray/air/_internal/remote_storage.py:173)
    to_uri (ray/air/checkpoint.py:557)
    <lambda> (ray/tune/trainable/trainable.py:507)
    retry_fn (ray/tune/utils/util.py:132)
    _maybe_save_to_cloud (ray/tune/trainable/trainable.py:510)
    _resume_span (ray/util/tracing/tracing_helper.py:466)
    save (ray/tune/trainable/trainable.py:490)
    _resume_span (ray/util/tracing/tracing_helper.py:466)
    actor_method_executor (ray/_private/function_manager.py:674)
    function_executor (ray/_raylet.so)
    _raylet_task_execution_handler (ray/_raylet.so)
    std::_Function_handler<ray::Status(ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > > const&, std::vector<ray::rpc::ObjectReference, std::allocator<ray::rpc::ObjectReference> > const&, std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, std::string const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*, std::shared_ptr<ray::LocalMemoryBuffer>&, bool*, std::vector<ray::ConcurrencyGroup, std::allocator<ray::ConcurrencyGroup> > const&, std::string), ray::Status (*)(ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > > const&, std::vector<ray::rpc::ObjectReference, std::allocator<ray::rpc::ObjectReference> > const&, std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, std::string, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*, std::shared_ptr<ray::LocalMemoryBuffer>&, bool*, std::vector<ray::ConcurrencyGroup, std::allocator<ray::ConcurrencyGroup> > const&, std::string)>::_M_invoke (ray/_raylet.so)
    ray::core::CoreWorker::ExecuteTask (ray/_raylet.so)
    std::_Function_handler<ray::Status(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*, google::protobuf::RepeatedPtrField<ray::rpc::ObjectReferenceCount>*, bool*), std::_Bind<ray::Status (ray::core::CoreWorker(ray::core::CoreWorker*, std::_Placeholder<1>, std::_Placeholder<2>, std::_Placeholder<3>, std::_Placeholder<4>, std::_Placeholder<5>)::*)(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*, google::protobuf::RepeatedPtrField<ray::rpc::ObjectReferenceCount>*, bool*)> >::_M_invoke (ray/_raylet.so)
    ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}::operator() const (ray/_raylet.so)
    std::_Function_handler<void (std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>), ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}>::_M_invoke (ray/_raylet.so)
    ray::core::InboundRequest::Accept (ray/_raylet.so)
    ray::core::ActorSchedulingQueue::ScheduleRequests (ray/_raylet.so)
    ray::core::ActorSchedulingQueue::Add (ray/_raylet.so)
    ray::core::CoreWorkerDirectTaskReceiver::HandleTask (ray/_raylet.so)
    std::_Function_handler<void (), ray::core::CoreWorker::HandlePushTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda()#1}>::_M_invoke (ray/_raylet.so)
    EventTracker::RecordExecution (ray/_raylet.so)
    std::_Function_handler<void (), instrumented_io_context::post(std::function<void ()>, std::string)::{lambda()#1}>::_M_invoke (ray/_raylet.so)
    boost::asio::detail::completion_handler<std::function<void ()>, boost::asio::io_context::basic_executor_type<std::allocator<void>, (unsigned int)0> >::do_complete (ray/_raylet.so)
    boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
    boost::asio::detail::scheduler::run (ray/_raylet.so)
    boost::asio::io_context::run (ray/_raylet.so)
    ray::core::CoreWorker::RunTaskExecutionLoop (ray/_raylet.so)
    ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop (ray/_raylet.so)
    ray::core::CoreWorkerProcess::RunTaskExecutionLoop (ray/_raylet.so)
    run_task_loop (ray/_raylet.so)
    main_loop (ray/_private/worker.py:754)
    <module> (ray/_private/workers/default_worker.py:237)

I'm pretty sure this was introduced by PR e3bd598.
Per the Tune team, there is a workaround: manually construct a SyncConfig to fall back to the old gsutil-based sync (a sketch follows below).
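
For reference, a minimal sketch of that workaround, assuming the Ray 1.x `tune.SyncConfig` API in which `syncer` accepts a shell command template with `{source}` and `{target}` placeholders (check the docs for your Ray version; the bucket path is a placeholder):

```python
from ray import tune

# Hypothetical bucket path; the command template makes Tune shell out to
# gsutil instead of routing the upload through pyarrow.fs.
sync_config = tune.SyncConfig(
    upload_dir="gs://my-bucket/experiments",
    syncer="gsutil -m rsync -r {source} {target}",
)

tune.run(
    "PPO",  # any RLlib trainable
    sync_config=sync_config,
)
```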

Versions / Dependencies

Since Ray 1.13.0

Reproduction script

Any RLlib job (>1.13.0) that syncs checkpoints to GCS will do; a minimal sketch follows.
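
A minimal sketch of such a job (Ray 1.13-era APIs assumed; the bucket path is a placeholder). With an `upload_dir` and the default syncer, checkpoint uploads go through `pyarrow.fs` / `upload_to_uri`, as in the stack trace above:

```python
from ray import tune

tune.run(
    "PPO",
    config={"env": "CartPole-v1", "num_workers": 1},
    checkpoint_freq=1,                     # keep writing checkpoints continuously
    stop={"training_iteration": 100000},
    sync_config=tune.SyncConfig(upload_dir="gs://my-bucket/repro"),  # placeholder bucket
)
```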

Issue Severity

High: It blocks me from completing my task.

@gjoliver gjoliver added bug Something that is supposed to be working; but isn't tune Tune-related issues triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Jul 20, 2022
@gjoliver gjoliver changed the title [Tune] pyarrow.fs hangs while writing checkpoint file [Tune] pyarrow.fs hangs indefinitely while writing checkpoint file Jul 20, 2022
@richardliaw
Contributor

cc @ericl

@ericl
Contributor

ericl commented Jul 20, 2022

Since AIR is using pyarrow.fs separately, this is not related to e3bd598.

If we are seeing writes hang, then we should either add timeouts around them or try to figure out the root cause of the filesystem hang (it could be a bug in pyarrow).
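
Not Ray's actual implementation, but one way such a timeout could look; `copy_with_timeout` is a hypothetical helper wrapping `pyarrow.fs.copy_files`:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import pyarrow.fs as pafs

def copy_with_timeout(source: str, destination: str, timeout_s: float = 1800.0):
    """Run pyarrow.fs.copy_files in a worker thread and give up after timeout_s.

    Note: the hung thread itself cannot be force-cancelled; this only lets the
    caller raise an error instead of blocking forever, as in the stack trace above.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(pafs.copy_files, source, destination)
    try:
        future.result(timeout=timeout_s)
    except TimeoutError:
        raise RuntimeError(
            f"copy_files({source!r} -> {destination!r}) did not finish within {timeout_s}s"
        )
    finally:
        # wait=False so the caller is not blocked again by a stuck worker thread.
        pool.shutdown(wait=False)
```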

@krfricke
Contributor

The relevant change was introduced here: 1465eaa#diff-e1d889098f6b27e0d88ba206b0689d77c1a320d58697d98933decde97fd3cac8R486

How large are the checkpoints? And how many files?

I agree we should probably add a timeout around it, but I'm wondering how we should configure it. Maybe with an environment variable.
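
If it ends up being an environment variable, the plumbing could be as simple as the following; `TUNE_CHECKPOINT_UPLOAD_TIMEOUT_S` is purely illustrative, not an existing Ray setting:

```python
import os

# Hypothetical variable name, not an actual Ray setting; defaults to 30 minutes.
UPLOAD_TIMEOUT_S = float(os.environ.get("TUNE_CHECKPOINT_UPLOAD_TIMEOUT_S", "1800"))
```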

@xwjiang2010
Contributor

@gjoliver could we get a repro? I think we need to figure out why pyarrow is not working.

@krfricke
Contributor

In July, the latest pyarrow release was 8.0.0, which didn't have native GCS support yet. GCS access therefore went through gcsfs and an FSSpecHandler, which could contribute to the problem. pyarrow 9.0.0 ships native GCS support, so it might resolve the issue (illustrated below).
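
To illustrate the two code paths (purely for context, not part of any Ray change): with pyarrow <= 8.0.0, GCS goes through gcsfs wrapped in an FSSpecHandler, while pyarrow >= 9.0.0 offers a native GcsFileSystem:

```python
import pyarrow.fs as pafs

# pyarrow <= 8.0.0: GCS access is delegated to gcsfs via the fsspec bridge.
import gcsfs
fs_via_fsspec = pafs.PyFileSystem(pafs.FSSpecHandler(gcsfs.GCSFileSystem()))

# pyarrow >= 9.0.0: native GCS filesystem, no fsspec layer involved.
fs_native = pafs.GcsFileSystem()
```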

In any case, I think adding a timeout is a good way to go forward.

@gjoliver
Member Author

The checkpoints are on the order of 100 MB.
A repro will be very hard to provide, since the hang only shows up after ~12 hours of runtime.
A single hardcoded timeout may not be a good idea, since the right value depends on the file sizes.
Any idea how to check whether the copy has made progress within a timeout period, for example?
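
One rough sketch of that progress-based idea (my own illustration, not Ray's eventual fix): run a watchdog alongside the copy that only gives up when the destination has stopped growing for a while, so large checkpoints are not killed by a fixed wall-clock limit.

```python
import time
import pyarrow.fs as pafs

def watch_upload_progress(fs: pafs.FileSystem, dest_dir: str, stall_timeout_s: float = 300.0):
    """Raise if the total size under dest_dir stops growing for stall_timeout_s seconds.

    Intended to run in a watchdog thread next to the copy and be cancelled
    once the copy completes.
    """
    def total_bytes() -> int:
        selector = pafs.FileSelector(dest_dir, recursive=True, allow_not_found=True)
        return sum(info.size or 0 for info in fs.get_file_info(selector))

    last_size, last_change = total_bytes(), time.monotonic()
    while True:
        time.sleep(30)
        size = total_bytes()
        if size != last_size:
            last_size, last_change = size, time.monotonic()
        elif time.monotonic() - last_change > stall_timeout_s:
            raise RuntimeError(
                f"No upload progress on {dest_dir!r} for {stall_timeout_s}s"
            )
```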

@krfricke
Contributor

krfricke commented Sep 6, 2022

Closed by #28155

@krfricke krfricke closed this as completed Sep 6, 2022