Skip to content

Commit

Permalink
Forcibly move S3ClientLock when using it
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jul 8, 2023
1 parent fc23c02 commit 85a0144
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ class S3ClientLock {
S3Client* get() { return client_.get(); }
S3Client* operator->() { return client_.get(); }

S3ClientLock Move() { return std::move(*this); }

protected:
friend class S3ClientHolder;

Expand Down Expand Up @@ -809,15 +811,15 @@ Result<S3ClientLock> S3ClientHolder::Lock() {
}
// Do not hold mutex while taking finalizer lock below.
//
// Note that acquiring a shared_mutex in shared mode may block even if
// it is not acquired in exclusive mode, because of pending writers:
// Acquiring a shared_mutex in shared mode may block even if not already
// acquired in exclusive mode, because of pending writers:
// https://github.com/google/sanitizers/issues/1668#issuecomment-1624985664
// """It is implementation-defined whether the calling thread acquires
// the lock when a writer does not hold the lock and there are writers
// blocked on the lock""".
//
// Therefore, we want to avoid potentially lock ordering issues (GH-36523)
// even when a shared lock is involved.
// Therefore, we want to avoid potential lock ordering issues
// even when a shared lock is involved (GH-36523).
if (!finalizer) {
return ErrorS3Finalized();
}
Expand Down Expand Up @@ -1179,7 +1181,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
req.SetKey(ToAwsString(path_.key));

ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = std::move(client_lock)->HeadObject(req);
auto outcome = client_lock.Move()->HeadObject(req);
if (!outcome.IsSuccess()) {
if (IsNotFound(outcome.GetError())) {
return PathNotFound(path_);
Expand Down Expand Up @@ -1364,7 +1366,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetContentType("application/octet-stream");
}

auto outcome = std::move(client_lock)->CreateMultipartUpload(req);
auto outcome = client_lock.Move()->CreateMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When initiating multiple part upload for key '",
Expand All @@ -1389,7 +1391,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);

auto outcome = std::move(client_lock)->AbortMultipartUpload(req);
auto outcome = client_lock.Move()->AbortMultipartUpload(req);
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
Expand Down Expand Up @@ -1440,7 +1442,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetMultipartUpload(std::move(completed_upload));

auto outcome =
std::move(client_lock)->CompleteMultipartUploadWithErrorFixup(std::move(req));
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
Expand Down Expand Up @@ -1559,7 +1561,7 @@ class ObjectOutputStream final : public io::OutputStream {
if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = std::move(client_lock)->UploadPart(req);
auto outcome = client_lock.Move()->UploadPart(req);
if (!outcome.IsSuccess()) {
return UploadPartError(req, outcome);
} else {
Expand Down Expand Up @@ -1589,7 +1591,7 @@ class ObjectOutputStream final : public io::OutputStream {
state = upload_state_,
part_number = part_number_]() mutable -> Status {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
auto outcome = std::move(client_lock)->UploadPart(req);
auto outcome = client_lock.Move()->UploadPart(req);
HandleUploadOutcome(state, part_number, req, outcome);
return Status::OK();
};
Expand Down Expand Up @@ -1767,8 +1769,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
walker->task_group_->Append([cb]() mutable {
ARROW_ASSIGN_OR_RAISE(auto client_lock, cb.walker->holder_->Lock());
Result<S3Model::ListObjectsV2Outcome> result =
std::move(client_lock)->ListObjectsV2(cb.req);
return cb(result);
client_lock.Move()->ListObjectsV2(cb.req);
return cb(std::move(result));
});
}

Expand Down Expand Up @@ -1868,7 +1870,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));

auto outcome = std::move(client_lock)->HeadBucket(req);
auto outcome = client_lock.Move()->HeadBucket(req);
if (!outcome.IsSuccess()) {
if (!IsNotFound(outcome.GetError())) {
return ErrorToStatus(std::forward_as_tuple(
Expand All @@ -1882,13 +1884,12 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

// Check bucket exists first.
{
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));
auto outcome = client_lock->HeadBucket(req);
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = client_lock.Move()->HeadBucket(req);

if (outcome.IsSuccess()) {
return Status::OK();
Expand Down Expand Up @@ -1918,7 +1919,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetBucket(ToAwsString(bucket));
req.SetCreateBucketConfiguration(config);

auto outcome = client_lock->CreateBucket(req);
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = client_lock.Move()->CreateBucket(req);
if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
"CreateBucket", outcome.GetError());
Expand All @@ -1936,7 +1938,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetBody(std::make_shared<std::stringstream>(""));
return OutcomeToStatus(
std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
"PutObject", client_lock->PutObject(req));
"PutObject", client_lock.Move()->PutObject(req));
}

Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
Expand All @@ -1952,7 +1954,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetKey(ToAwsString(key));
return OutcomeToStatus(
std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
"DeleteObject", client_lock->DeleteObject(req));
"DeleteObject", client_lock.Move()->DeleteObject(req));
}

Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
Expand All @@ -1968,7 +1970,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
src_path.bucket, "' to key '", dest_path.key,
"' in bucket '", dest_path.bucket, "': "),
"CopyObject", client_lock->CopyObject(req));
"CopyObject", client_lock.Move()->CopyObject(req));
}

// On Minio, an empty "directory" doesn't satisfy the same API requests as
Expand Down Expand Up @@ -2007,7 +2009,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetKey(ToAwsString(key));
}

auto outcome = std::move(client_lock)->HeadObject(req);
auto outcome = client_lock.Move()->HeadObject(req);
if (outcome.IsSuccess()) {
return true;
}
Expand Down Expand Up @@ -2040,7 +2042,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetPrefix(ToAwsString(path.key) + kSep);
req.SetDelimiter(Aws::String() + kSep);
req.SetMaxKeys(1);
auto outcome = client_lock->ListObjectsV2(req);
auto outcome = client_lock.Move()->ListObjectsV2(req);
if (outcome.IsSuccess()) {
const S3Model::ListObjectsV2Result& r = outcome.GetResult();
// In some cases, there may be 0 keys but some prefixes
Expand Down Expand Up @@ -2294,7 +2296,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
SubmitIO(io_context_,
[holder = holder_, req = std::move(req), delete_cb]() -> Status {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
return delete_cb(std::move(client_lock)->DeleteObjects(req));
return delete_cb(client_lock.Move()->DeleteObjects(req));
}));
futures.push_back(std::move(fut));
}
Expand Down Expand Up @@ -2360,17 +2362,14 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

Result<std::vector<std::string>> ListBuckets() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

auto outcome = std::move(client_lock)->ListBuckets();
return ProcessListBuckets(outcome);
return ProcessListBuckets(client_lock.Move()->ListBuckets());
}

Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
auto deferred =
[self = shared_from_this()]() mutable -> Result<std::vector<std::string>> {
ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
auto outcome = std::move(client_lock)->ListBuckets();
return self->ProcessListBuckets(outcome);
return self->ProcessListBuckets(client_lock.Move()->ListBuckets());
};
return DeferNotOk(SubmitIO(ctx, std::move(deferred)));
}
Expand Down Expand Up @@ -2462,7 +2461,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(path.bucket));

auto outcome = std::move(client_lock)->HeadBucket(req);
auto outcome = client_lock.Move()->HeadBucket(req);
if (!outcome.IsSuccess()) {
if (!IsNotFound(outcome.GetError())) {
const auto msg = "When getting information for bucket '" + path.bucket + "': ";
Expand All @@ -2482,7 +2481,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));

auto outcome = std::move(client_lock)->HeadObject(req);
auto outcome = client_lock.Move()->HeadObject(req);
if (outcome.IsSuccess()) {
// "File" object found
FileObjectToInfo(outcome.GetResult(), &info);
Expand Down Expand Up @@ -2637,7 +2636,7 @@ Status S3FileSystem::DeleteDir(const std::string& s) {
req.SetBucket(ToAwsString(path.bucket));
return OutcomeToStatus(
std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
"DeleteBucket", client_lock->DeleteBucket(req));
"DeleteBucket", client_lock.Move()->DeleteBucket(req));
} else if (path.key.empty()) {
return Status::IOError("Would delete bucket '", path.bucket, "'. ",
"To delete buckets, enable the allow_bucket_deletion option.");
Expand Down Expand Up @@ -2689,7 +2688,7 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));

auto outcome = std::move(client_lock)->HeadObject(req);
auto outcome = client_lock.Move()->HeadObject(req);
if (!outcome.IsSuccess()) {
if (IsNotFound(outcome.GetError())) {
return PathNotFound(path);
Expand Down

0 comments on commit 85a0144

Please sign in to comment.