Skip to content

Commit

Permalink
apacheGH-39897: [C++][FS][S3] Ensure AwsInstance::EnsureInitialized
Browse files Browse the repository at this point in the history
… to do initialization exactly once under concurrency (apache#40110)

### Rationale for this change

`FileSystemFromUri` could be called concurrently, and its implicit call to `AwsInstance::EnsureInitialized` will cause segment fault due to data race.

Therefore, make init stage thread-safe for `AwsInstance::EnsureInitialized`.

### What changes are included in this PR?

Serialize calls to S3 initialization to ensure initialization is done exactly once.

### Are these changes tested?

Yes, a test is added for the PyArrow bindings.

### Are there any user-facing changes?

No.

* Closes: apache#39897
* GitHub Issue: apache#39897

Lead-authored-by: tsy <tangsiyang2001@foxmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
2 people authored and thisisnic committed Mar 8, 2024
1 parent b29529a commit e61223b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
15 changes: 10 additions & 5 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2898,12 +2898,16 @@ struct AwsInstance {
if (is_finalized_.load()) {
return Status::Invalid("Attempt to initialize S3 after it has been finalized");
}
if (!is_initialized_.exchange(true)) {
// Not already initialized
bool newly_initialized = false;
// EnsureInitialized() can be called concurrently by FileSystemFromUri,
// therefore we need to serialize initialization (GH-39897).
std::call_once(initialize_flag_, [&]() {
bool was_initialized = is_initialized_.exchange(true);
DCHECK(!was_initialized);
DoInitialize(options);
return true;
}
return false;
newly_initialized = true;
});
return newly_initialized;
}

bool IsInitialized() { return !is_finalized_ && is_initialized_; }
Expand Down Expand Up @@ -2979,6 +2983,7 @@ struct AwsInstance {
Aws::SDKOptions aws_options_;
std::atomic<bool> is_initialized_;
std::atomic<bool> is_finalized_;
std::once_flag initialize_flag_;
};

AwsInstance* GetAwsInstance() {
Expand Down
23 changes: 23 additions & 0 deletions python/pyarrow/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1904,3 +1904,26 @@ def test_s3_finalize_region_resolver():
resolve_s3_region('voltrondata-labs-datasets')
"""
subprocess.check_call([sys.executable, "-c", code])


@pytest.mark.s3
def test_concurrent_s3fs_init():
# GH-39897: lazy concurrent initialization of S3 subsystem should not crash
code = """if 1:
import threading
import pytest
from pyarrow.fs import (FileSystem, S3FileSystem,
ensure_s3_initialized, finalize_s3)
threads = []
fn = lambda: FileSystem.from_uri('s3://mf-nwp-models/README.txt')
for i in range(4):
thread = threading.Thread(target = fn)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
finalize_s3()
"""
subprocess.check_call([sys.executable, "-c", code])

0 comments on commit e61223b

Please sign in to comment.