Skip to content

Commit

Permalink
apache/arrow GH-36523: [C++] Fix TSan-detected lock ordering issues in S3
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jul 7, 2023
1 parent 6a57a08 commit 69bf7b3
Showing 1 changed file with 59 additions and 46 deletions.
105 changes: 59 additions & 46 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,26 +800,47 @@ class S3ClientFinalizer : public std::enable_shared_from_this<S3ClientFinalizer>
};

Result<S3ClientLock> S3ClientHolder::Lock() {
  // Take a snapshot of the holder's state under its own mutex, then release
  // the mutex *before* acquiring the finalizer's shared lock.
  std::shared_ptr<S3ClientFinalizer> finalizer;
  std::shared_ptr<S3Client> client;
  {
    std::unique_lock lock(mutex_);
    finalizer = finalizer_.lock();
    client = client_;
  }
  // Do not hold mutex while taking finalizer lock below.
  //
  // Note that acquiring a shared_mutex in shared mode may block even if
  // it is not acquired in exclusive mode, because of pending writers:
  // https://github.com/google/sanitizers/issues/1668#issuecomment-1624985664
  // """It is implementation-defined whether the calling thread acquires
  // the lock when a writer does not hold the lock and there are writers
  // blocked on the lock""".
  //
  // Therefore, we want to avoid potential lock ordering issues (GH-36523)
  // even when a shared lock is involved.
  if (!finalizer) {
    // The finalizer already went away: S3 was finalized.
    return ErrorS3Finalized();
  }

  S3ClientLock client_lock;
  // Lock the finalizer before examining it
  client_lock.lock_ = finalizer->LockShared();
  if (finalizer->finalized_) {
    return ErrorS3Finalized();
  }
  // (the client can be cleared only if finalizer->finalized_ is true)
  DCHECK(client) << "inconsistent S3ClientHolder";
  client_lock.client_ = std::move(client);
  return client_lock;
}

void S3ClientHolder::Finalize() {
std::lock_guard lock(mutex_);
client_.reset();
std::shared_ptr<S3Client> client;
{
std::unique_lock lock(mutex_);
client = std::move(client_);
}
// Do not hold mutex while ~S3Client potentially runs
}

std::shared_ptr<S3ClientFinalizer> GetClientFinalizer() {
Expand Down Expand Up @@ -1158,7 +1179,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
req.SetKey(ToAwsString(path_.key));

ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = client_lock->HeadObject(req);
auto outcome = std::move(client_lock)->HeadObject(req);
if (!outcome.IsSuccess()) {
if (IsNotFound(outcome.GetError())) {
return PathNotFound(path_);
Expand Down Expand Up @@ -1343,7 +1364,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetContentType("application/octet-stream");
}

auto outcome = client_lock->CreateMultipartUpload(req);
auto outcome = std::move(client_lock)->CreateMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When initiating multiple part upload for key '",
Expand All @@ -1368,7 +1389,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);

auto outcome = client_lock->AbortMultipartUpload(req);
auto outcome = std::move(client_lock)->AbortMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
Expand Down Expand Up @@ -1418,7 +1439,8 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetUploadId(upload_id_);
req.SetMultipartUpload(std::move(completed_upload));

auto outcome = client_lock->CompleteMultipartUploadWithErrorFixup(std::move(req));
auto outcome =
std::move(client_lock)->CompleteMultipartUploadWithErrorFixup(std::move(req));
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
Expand Down Expand Up @@ -1527,8 +1549,6 @@ class ObjectOutputStream final : public io::OutputStream {

Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
Expand All @@ -1538,7 +1558,8 @@ class ObjectOutputStream final : public io::OutputStream {

if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
auto outcome = client_lock->UploadPart(req);
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = std::move(client_lock)->UploadPart(req);
if (!outcome.IsSuccess()) {
return UploadPartError(req, outcome);
} else {
Expand All @@ -1562,21 +1583,17 @@ class ObjectOutputStream final : public io::OutputStream {
upload_state_->pending_parts_completed = Future<>::Make();
}
}
// XXX This callback returns Aws::Utils::Outcome, it cannot easily call
// `holder->Lock()` which returns arrow::Result.
ARROW_ASSIGN_OR_RAISE(
auto fut,
SubmitIO(io_context_, [client_lock = std::move(client_lock), req]() mutable {
return client_lock->UploadPart(req);
}));

// The closure keeps the buffer and the upload state alive
auto state = upload_state_;
auto part_number = part_number_;
auto handler = [owned_buffer, state, part_number,
req](const Result<S3Model::UploadPartOutcome>& result) -> void {
HandleUploadOutcome(state, part_number, req, result);
auto deferred = [owned_buffer, holder = holder_, req = std::move(req),
state = upload_state_,
part_number = part_number_]() mutable -> Status {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
auto outcome = std::move(client_lock)->UploadPart(req);
HandleUploadOutcome(state, part_number, req, outcome);
return Status::OK();
};
fut.AddCallback(std::move(handler));
RETURN_NOT_OK(SubmitIO(io_context_, std::move(deferred)));
}

++part_number_;
Expand Down Expand Up @@ -1749,7 +1766,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
auto cb = *this;
walker->task_group_->Append([cb]() mutable {
ARROW_ASSIGN_OR_RAISE(auto client_lock, cb.walker->holder_->Lock());
Result<S3Model::ListObjectsV2Outcome> result = client_lock->ListObjectsV2(cb.req);
Result<S3Model::ListObjectsV2Outcome> result =
std::move(client_lock)->ListObjectsV2(cb.req);
return cb(result);
});
}
Expand Down Expand Up @@ -1850,7 +1868,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));

auto outcome = client_lock->HeadBucket(req);
auto outcome = std::move(client_lock)->HeadBucket(req);
if (!outcome.IsSuccess()) {
if (!IsNotFound(outcome.GetError())) {
return ErrorToStatus(std::forward_as_tuple(
Expand Down Expand Up @@ -1989,7 +2007,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetKey(ToAwsString(key));
}

auto outcome = client_lock->HeadObject(req);
auto outcome = std::move(client_lock)->HeadObject(req);
if (outcome.IsSuccess()) {
return true;
}
Expand Down Expand Up @@ -2233,8 +2251,6 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Delete multiple objects at once
Future<> DeleteObjectsAsync(const std::string& bucket,
const std::vector<std::string>& keys) {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

struct DeleteCallback {
std::string bucket;

Expand Down Expand Up @@ -2278,7 +2294,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
SubmitIO(io_context_,
[holder = holder_, req = std::move(req), delete_cb]() -> Status {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
return delete_cb(client_lock->DeleteObjects(req));
return delete_cb(std::move(client_lock)->DeleteObjects(req));
}));
futures.push_back(std::move(fut));
}
Expand Down Expand Up @@ -2345,21 +2361,18 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
Result<std::vector<std::string>> ListBuckets() {
  ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

  // Move the client lock into the call so it is released as soon as the
  // request completes, instead of lingering for the rest of the scope.
  auto outcome = std::move(client_lock)->ListBuckets();
  return ProcessListBuckets(outcome);
}

Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
  // Acquire the client lock inside the deferred task, not here: holding a
  // S3ClientLock across SubmitIO would keep the finalizer's shared lock
  // alive while waiting for the IO thread (lock ordering issue, GH-36523).
  // The closure keeps `this` alive via shared_from_this().
  auto deferred =
      [self = shared_from_this()]() mutable -> Result<std::vector<std::string>> {
    ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
    auto outcome = std::move(client_lock)->ListBuckets();
    return self->ProcessListBuckets(outcome);
  };
  return DeferNotOk(SubmitIO(ctx, std::move(deferred)));
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
Expand Down Expand Up @@ -2449,7 +2462,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(path.bucket));

auto outcome = client_lock->HeadBucket(req);
auto outcome = std::move(client_lock)->HeadBucket(req);
if (!outcome.IsSuccess()) {
if (!IsNotFound(outcome.GetError())) {
const auto msg = "When getting information for bucket '" + path.bucket + "': ";
Expand All @@ -2469,7 +2482,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));

auto outcome = client_lock->HeadObject(req);
auto outcome = std::move(client_lock)->HeadObject(req);
if (outcome.IsSuccess()) {
// "File" object found
FileObjectToInfo(outcome.GetResult(), &info);
Expand Down Expand Up @@ -2676,7 +2689,7 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));

auto outcome = client_lock->HeadObject(req);
auto outcome = std::move(client_lock)->HeadObject(req);
if (!outcome.IsSuccess()) {
if (IsNotFound(outcome.GetError())) {
return PathNotFound(path);
Expand Down

0 comments on commit 69bf7b3

Please sign in to comment.