From be8892e772738a40eaf324df5081b4fe851f34ef Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 15:57:14 +0200 Subject: [PATCH] ARROW-12040: [C++] Fix potential deadlock in recursive S3 walks From a deadlocked run... ``` #0 0x00007f8a5d48dccd in __lll_lock_wait () from /lib64/libpthread.so.0 #1 0x00007f8a5d486f05 in pthread_mutex_lock () from /lib64/libpthread.so.0 #2 0x00007f8a566e7e89 in arrow::internal::FnOnce::FnImpl >::Callback >::invoke() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #3 0x00007f8a5650efa0 in arrow::FutureImpl::AddCallback(arrow::internal::FnOnce) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #4 0x00007f8a566e67a9 in arrow::fs::(anonymous namespace)::TreeWalker::ListObjectsV2Handler::SpawnListObjectsV2() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #5 0x00007f8a566e723f in arrow::fs::(anonymous namespace)::TreeWalker::WalkChild(std::__cxx11::basic_string, std::allocator >, int) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #6 0x00007f8a566e827d in arrow::internal::FnOnce::FnImpl >::Callback >::invoke() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #7 0x00007f8a5650efa0 in arrow::FutureImpl::AddCallback(arrow::internal::FnOnce) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #8 0x00007f8a566e67a9 in arrow::fs::(anonymous namespace)::TreeWalker::ListObjectsV2Handler::SpawnListObjectsV2() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #9 0x00007f8a566e723f in arrow::fs::(anonymous namespace)::TreeWalker::WalkChild(std::__cxx11::basic_string, std::allocator >, int) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so #10 0x00007f8a566e74b1 in arrow::fs::(anonymous namespace)::TreeWalker::DoWalk() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so ``` The callback `ListObjectsV2Handler` is being called recursively and the mutex is non-reentrant thus deadlock. To fix it I got rid of the mutex on `TreeWalker` by using `arrow::util::internal::TaskGroup` instead of manually tracking the #/status of in-flight requests. Closes #9842 from westonpace/bugfix/arrow-12040 Lead-authored-by: Weston Pace Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- cpp/src/arrow/filesystem/s3fs.cc | 85 +++++++++++++------------------- 1 file changed, 34 insertions(+), 51 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 96d74a9ec0ac7..400442d215606 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -79,11 +79,13 @@ #include "arrow/util/future.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" +#include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" #include "arrow/util/windows_fixup.h" namespace arrow { +using internal::TaskGroup; using internal::Uri; namespace fs { @@ -1142,26 +1144,18 @@ struct TreeWalker : public std::enable_shared_from_this { recursion_handler_(std::move(recursion_handler)) {} private: + std::shared_ptr task_group_; std::mutex mutex_; - Future<> future_; - std::atomic num_in_flight_; Status DoWalk() { - future_ = decltype(future_)::Make(); - num_in_flight_ = 0; + task_group_ = + TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token()); WalkChild(base_dir_, /*nesting_depth=*/0); // When this returns, ListObjectsV2 tasks either have finished or will exit early - return future_.status(); + return task_group_->Finish(); } - bool is_finished() const { return future_.is_finished(); } - - void ListObjectsFinished(Status st) { - const auto in_flight = --num_in_flight_; - if (!st.ok() || !in_flight) { - future_.MarkFinished(std::move(st)); - } - } + bool ok() const { return task_group_->ok(); } struct ListObjectsV2Handler { std::shared_ptr walker; @@ -1169,55 +1163,46 @@ struct TreeWalker : public std::enable_shared_from_this { int32_t nesting_depth; S3Model::ListObjectsV2Request req; - void operator()(const Result& result) { + Status operator()(const Result& result) { // Serialize calls to operation-specific handlers - std::unique_lock guard(walker->mutex_); - if (walker->is_finished()) { + if (!walker->ok()) { // Early exit: avoid executing handlers if DoWalk() returned - return; + return Status::OK(); } if (!result.ok()) { - HandleError(result.status()); - return; + return result.status(); } const auto& outcome = *result; if (!outcome.IsSuccess()) { - Status st = walker->error_handler_(outcome.GetError()); - HandleError(std::move(st)); - return; + { + std::lock_guard guard(walker->mutex_); + return walker->error_handler_(outcome.GetError()); + } } - HandleResult(outcome.GetResult()); + return HandleResult(outcome.GetResult()); } void SpawnListObjectsV2() { - auto walker = this->walker; - auto req = this->req; - auto maybe_fut = walker->io_context_.executor()->Submit( - walker->io_context_.stop_token(), - [walker, req]() { return walker->client_->ListObjectsV2(req); }); - if (!maybe_fut.ok()) { - HandleError(maybe_fut.status()); - return; - } - maybe_fut->AddCallback(*this); + auto cb = *this; + walker->task_group_->Append([cb]() mutable { + Result result = + cb.walker->client_->ListObjectsV2(cb.req); + return cb(result); + }); } - void HandleError(Status status) { walker->ListObjectsFinished(std::move(status)); } - - void HandleResult(const S3Model::ListObjectsV2Result& result) { - bool recurse = result.GetCommonPrefixes().size() > 0; - if (recurse) { - auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1); - if (!maybe_recurse.ok()) { - walker->ListObjectsFinished(maybe_recurse.status()); - return; + Status HandleResult(const S3Model::ListObjectsV2Result& result) { + bool recurse; + { + // Only one thread should be running result_handler_/recursion_handler_ at a time + std::lock_guard guard(walker->mutex_); + recurse = result.GetCommonPrefixes().size() > 0; + if (recurse) { + ARROW_ASSIGN_OR_RAISE(auto maybe_recurse, + walker->recursion_handler_(nesting_depth + 1)); + recurse &= maybe_recurse; } - recurse &= *maybe_recurse; - } - Status st = walker->result_handler_(prefix, result); - if (!st.ok()) { - walker->ListObjectsFinished(std::move(st)); - return; + RETURN_NOT_OK(walker->result_handler_(prefix, result)); } if (recurse) { walker->WalkChildren(result, nesting_depth + 1); @@ -1228,9 +1213,8 @@ struct TreeWalker : public std::enable_shared_from_this { DCHECK(!result.GetNextContinuationToken().empty()); req.SetContinuationToken(result.GetNextContinuationToken()); SpawnListObjectsV2(); - } else { - walker->ListObjectsFinished(Status::OK()); } + return Status::OK(); } void Start() { @@ -1246,7 +1230,6 @@ struct TreeWalker : public std::enable_shared_from_this { void WalkChild(std::string key, int32_t nesting_depth) { ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}}; - ++num_in_flight_; handler.Start(); }