From a9f14793193bf6655183ddb5e2244c907f059e3b Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Tue, 12 Nov 2024 13:15:38 +0200 Subject: [PATCH 1/6] Reliable storage lock Introduces a new `ReliableStorageLock` and `ReliableStorageLockGuard` to be used as a slower but more reliable alternative to the existing `StorageLock`. It uses the new If-None-Match atomic put operations in S3. This commit: - Upgrades the aws-sdk-cpp in vcpkg (which needed a few additions because of some problematic dependencies) - Adds `write_if_none_match` capability to `AsyncStore`'s S3 and to `InMemoryStore` - Logic for `ReliableStorageLock` - C++ tests using the `InMemoryStore` Follow up commit will introduce a python integration test with real aws s3. --- cpp/arcticdb/CMakeLists.txt | 5 +- cpp/arcticdb/async/async_store.hpp | 9 + cpp/arcticdb/async/tasks.hpp | 17 ++ cpp/arcticdb/entity/key.cpp | 1 + cpp/arcticdb/entity/key.hpp | 6 +- cpp/arcticdb/storage/azure/azure_storage.hpp | 4 + .../storage/file/mapped_file_storage.hpp | 4 + cpp/arcticdb/storage/library.hpp | 8 + cpp/arcticdb/storage/lmdb/lmdb_storage.hpp | 4 + .../storage/memory/memory_storage.hpp | 4 + cpp/arcticdb/storage/mongo/mongo_storage.hpp | 4 + cpp/arcticdb/storage/s3/detail-inl.hpp | 21 ++ .../storage/s3/nfs_backed_storage.hpp | 4 + cpp/arcticdb/storage/s3/s3_client_wrapper.hpp | 3 +- cpp/arcticdb/storage/s3/s3_mock_client.cpp | 3 +- cpp/arcticdb/storage/s3/s3_mock_client.hpp | 3 +- cpp/arcticdb/storage/s3/s3_real_client.cpp | 6 +- cpp/arcticdb/storage/s3/s3_real_client.hpp | 3 +- cpp/arcticdb/storage/s3/s3_storage.cpp | 4 + cpp/arcticdb/storage/s3/s3_storage.hpp | 2 + cpp/arcticdb/storage/storage.hpp | 6 + cpp/arcticdb/storage/storages.hpp | 4 + cpp/arcticdb/storage/test/in_memory_store.hpp | 16 +- cpp/arcticdb/stream/piloted_clock.cpp | 1 + cpp/arcticdb/stream/piloted_clock.hpp | 9 + cpp/arcticdb/stream/stream_sink.hpp | 5 + .../stream/test/stream_test_common.cpp | 2 - cpp/arcticdb/util/error_code.hpp | 1 + cpp/arcticdb/util/reliable_storage_lock.hpp | 69 ++++++ cpp/arcticdb/util/reliable_storage_lock.tpp | 233 ++++++++++++++++++ cpp/arcticdb/util/storage_lock.hpp | 2 + .../util/test/test_reliable_storage_lock.cpp | 127 ++++++++++ cpp/vcpkg | 2 +- cpp/vcpkg.json | 49 +++- 34 files changed, 621 insertions(+), 20 deletions(-) create mode 100644 cpp/arcticdb/util/reliable_storage_lock.hpp create mode 100644 cpp/arcticdb/util/reliable_storage_lock.tpp create mode 100644 cpp/arcticdb/util/test/test_reliable_storage_lock.cpp diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index f7ca660960..2fca7962bb 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -367,6 +367,7 @@ set(arcticdb_srcs util/slab_allocator.hpp util/sparse_utils.hpp util/storage_lock.hpp + util/reliable_storage_lock.hpp util/string_utils.hpp util/thread_cached_int.hpp util/timeouts.hpp @@ -515,7 +516,7 @@ set(arcticdb_srcs version/version_store_api.cpp version/version_utils.cpp version/version_map_batch_methods.cpp - ) + util/reliable_storage_lock.tpp) add_library(arcticdb_core_object OBJECT ${arcticdb_srcs}) @@ -964,7 +965,7 @@ if(${TEST}) version/test/version_map_model.hpp python/python_handlers.cpp storage/test/common.hpp - ) + util/test/test_reliable_storage_lock.cpp) set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755 diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index bd712b8f06..a292f0aad8 100644 --- 
a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -151,6 +151,15 @@ entity::VariantKey write_sync( return WriteSegmentTask{library_}(std::move(encoded)); } +entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId &stream_id, + SegmentInMemory &&segment) override { + util::check(is_ref_key_class(key_type), "Expected ref key type got {}", key_type); + auto encoded = EncodeRefTask{key_type, stream_id, std::move(segment), codec_, encoding_version_}(); + return WriteIfNoneTask{library_}(std::move(encoded)); +} + bool is_path_valid(const std::string_view path) const override { return library_->is_path_valid(path); } diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index 0663e58baf..f28c5ce2b3 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -187,6 +187,23 @@ struct WriteSegmentTask : BaseTask { } }; +struct WriteIfNoneTask : BaseTask { + std::shared_ptr lib_; + + explicit WriteIfNoneTask(std::shared_ptr lib) : + lib_(std::move(lib)) { + } + + ARCTICDB_MOVE_ONLY_DEFAULT(WriteIfNoneTask) + + VariantKey operator()(storage::KeySegmentPair &&key_seg) const { + ARCTICDB_SAMPLE(WriteSegmentTask, 0) + auto k = key_seg.variant_key(); + lib_->write_if_none(std::move(key_seg)); + return k; + } +}; + struct UpdateSegmentTask : BaseTask { std::shared_ptr lib_; storage::UpdateOpts opts_; diff --git a/cpp/arcticdb/entity/key.cpp b/cpp/arcticdb/entity/key.cpp index 5507b40e3d..599a06d53a 100644 --- a/cpp/arcticdb/entity/key.cpp +++ b/cpp/arcticdb/entity/key.cpp @@ -60,6 +60,7 @@ KeyData get_key_data(KeyType key_type) { STRING_REF(KeyType::APPEND_REF, aref, 'a') STRING_KEY(KeyType::MULTI_KEY, mref, 'm') STRING_REF(KeyType::LOCK, lref, 'x') + STRING_REF(KeyType::SLOW_LOCK, lref, 'x') STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X') STRING_KEY(KeyType::APPEND_DATA, app, 'b') STRING_REF(KeyType::BLOCK_VERSION_REF, bvref, 'R') diff --git a/cpp/arcticdb/entity/key.hpp b/cpp/arcticdb/entity/key.hpp index 90a0db0f6d..e6ac27f974 100644 --- a/cpp/arcticdb/entity/key.hpp +++ b/cpp/arcticdb/entity/key.hpp @@ -182,12 +182,14 @@ enum class KeyType : int { * Used for storing the ids of storages that failed to sync */ REPLICATION_FAIL_INFO = 26, - /* * A reference key storing many versions, used to track state within some of our background jobs. 
*/ BLOCK_VERSION_REF = 27, - + /* + * Used for a list based reliable storage lock + */ + SLOW_LOCK = 27, UNDEFINED }; diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 2c8168bfc0..2b926e5cbe 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -37,6 +37,10 @@ class AzureStorage final : public Storage { protected: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.hpp b/cpp/arcticdb/storage/file/mapped_file_storage.hpp index 6371becd26..44d9fd365a 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.hpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.hpp @@ -36,6 +36,10 @@ class MappedFileStorage final : public SingleFileStorage { void do_write(Composite&& kvs) override; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) override; void do_read(Composite&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override; diff --git a/cpp/arcticdb/storage/library.hpp b/cpp/arcticdb/storage/library.hpp index 33aef916c7..e1a644d1cc 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -90,6 +90,14 @@ class Library { storages_->write(std::move(kvs)); } + void write_if_none(KeySegmentPair&& kv) { + if (open_mode() < OpenMode::WRITE) { + throw LibraryPermissionException(library_path_, open_mode(), "write"); + } + + storages_->write_if_none(std::move(kv)); + } + void update(Composite&& kvs, storage::UpdateOpts opts) { ARCTICDB_SAMPLE(LibraryUpdate, 0) if (open_mode() < OpenMode::WRITE) diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp index b04af485b6..9c6fa838cc 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp @@ -38,6 +38,10 @@ class LmdbStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/memory/memory_storage.hpp b/cpp/arcticdb/storage/memory/memory_storage.hpp index a02dd08269..34eda07a67 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.hpp +++ b/cpp/arcticdb/storage/memory/memory_storage.hpp @@ -29,6 +29,10 @@ namespace arcticdb::storage::memory { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.hpp b/cpp/arcticdb/storage/mongo/mongo_storage.hpp index 7dbe22f999..bf37f5d5da 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.hpp +++ 
b/cpp/arcticdb/storage/mongo/mongo_storage.hpp @@ -31,6 +31,10 @@ class MongoStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index d3503b0a0b..16ee41faa4 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -129,6 +129,27 @@ namespace s3 { }); } + template + void do_write_if_none_impl( + KeySegmentPair &&kv, + const std::string &root_folder, + const std::string &bucket_name, + S3ClientWrapper &s3_client, + KeyBucketizer &&bucketizer) { + ARCTICDB_SAMPLE(S3StorageWriteIfNone, 0) + auto key_type_dir = key_type_folder(root_folder, kv.key_type()); + auto &k = kv.variant_key(); + auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k); + auto &seg = kv.segment(); + + auto put_object_result = s3_client.put_object(s3_object_name, std::move(seg), bucket_name, true); + + if (!put_object_result.is_success()) { + auto& error = put_object_result.get_error(); + raise_s3_exception(error, s3_object_name); + } + } + template void do_update_impl( Composite &&kvs, diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp index d7a3dfac5d..de6267cffb 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp @@ -35,6 +35,10 @@ class NfsBackedStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp index 96c98e13ce..613722934e 100644 --- a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp +++ b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp @@ -57,7 +57,8 @@ class S3ClientWrapper { virtual S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) = 0; + const std::string& bucket_name, + bool if_none_match = false) = 0; virtual S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.cpp b/cpp/arcticdb/storage/s3/s3_mock_client.cpp index a6b33570f5..aec87b8b48 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.cpp @@ -83,7 +83,8 @@ S3Result MockS3Client::get_object( S3Result MockS3Client::put_object( const std::string &s3_object_name, Segment &&segment, - const std::string &bucket_name) { + const std::string &bucket_name, + bool if_none_match[[maybe_unused]]) { auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE); if (maybe_error.has_value()) { return {*maybe_error}; diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.hpp b/cpp/arcticdb/storage/s3/s3_mock_client.hpp index 04c6812218..c7c094da23 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.hpp @@ -61,7 +61,8 @@ class MockS3Client : public 
S3ClientWrapper { S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) override; + const std::string& bucket_name, + bool if_none_match = false) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_real_client.cpp b/cpp/arcticdb/storage/s3/s3_real_client.cpp index b2f74dadc8..9fd7b80ec7 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.cpp @@ -133,12 +133,16 @@ S3Result RealS3Client::get_object( S3Result RealS3Client::put_object( const std::string &s3_object_name, Segment &&segment, - const std::string &bucket_name) { + const std::string &bucket_name, + bool if_none_match) { ARCTICDB_SUBSAMPLE(S3StorageWritePreamble, 0) Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(s3_object_name.c_str()); + if (if_none_match) { + request.SetIfNoneMatch("*"); + } ARCTICDB_RUNTIME_DEBUG(log::storage(), "Set s3 key {}", request.GetKey().c_str()); auto [dst, write_size, buffer] = segment.serialize_header(); diff --git a/cpp/arcticdb/storage/s3/s3_real_client.hpp b/cpp/arcticdb/storage/s3/s3_real_client.hpp index 57bfe87b65..b0e7e0caff 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.hpp @@ -38,7 +38,8 @@ class RealS3Client : public S3ClientWrapper { S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) override; + const std::string& bucket_name, + bool if_none_match = false) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 180b52c332..4f6facb6ce 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -52,6 +52,10 @@ void S3Storage::do_write(Composite&& kvs) { detail::do_write_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } +void S3Storage::do_write_if_none(KeySegmentPair&& kv) { + detail::do_write_if_none_impl(std::move(kv), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); +} + void S3Storage::do_update(Composite&& kvs, UpdateOpts) { detail::do_update_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index 196b9c0b8c..9826b5bde5 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -44,6 +44,8 @@ class S3Storage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv) final; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp index 0291f74ec8..cca4f3902b 100644 --- a/cpp/arcticdb/storage/storage.hpp +++ b/cpp/arcticdb/storage/storage.hpp @@ -107,6 +107,10 @@ class Storage { return write(Composite{std::move(kv)}); } + void write_if_none(KeySegmentPair&& kv) { + return do_write_if_none(std::move(kv)); + } + void update(Composite &&kvs, UpdateOpts opts) { ARCTICDB_SAMPLE(StorageUpdate, 0) return do_update(std::move(kvs), opts); @@ -186,6 +190,8 @@ class Storage { private: virtual void do_write(Composite&& kvs) = 0; + virtual void do_write_if_none(KeySegmentPair&& kv) = 0; + virtual void 
do_update(Composite&& kvs, UpdateOpts opts) = 0; virtual void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) = 0; diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index bd716ef240..0ba38efada 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -46,6 +46,10 @@ class Storages { primary().write(std::move(kvs)); } + void write_if_none(KeySegmentPair&& kv) { + primary().write_if_none(std::move(kv)); + } + void update(Composite&& kvs, storage::UpdateOpts opts) { ARCTICDB_SAMPLE(StoragesUpdate, 0) primary().update(std::move(kvs), opts); diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index d351e19905..e2927fa020 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -144,6 +144,15 @@ namespace arcticdb { return write(key_type, stream_id, std::move(segment)).get(); } + entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId& stream_id, + SegmentInMemory &&segment) override { + auto key = entity::RefKey{stream_id, key_type}; + add_segment(key, std::move(segment), true); + return key; + } + bool is_path_valid(const std::string_view) const override { return true; } @@ -465,10 +474,15 @@ namespace arcticdb { seg_by_atom_key_[key] = std::make_unique(std::move(seg)); } - void add_segment(const RefKey &key, SegmentInMemory &&seg) { + void add_segment(const RefKey &key, SegmentInMemory &&seg, bool if_none_match = false) { StorageFailureSimulator::instance()->go(FailureType::WRITE); std::lock_guard lock{mutex_}; ARCTICDB_DEBUG(log::storage(), "Adding segment with key {}", key); + if (if_none_match) { + if (seg_by_ref_key_.find(key) != seg_by_ref_key_.end()) { + storage::raise("Precondition failed. 
Object is already present."); + } + } seg_by_ref_key_[key] = std::make_unique(std::move(seg)); } diff --git a/cpp/arcticdb/stream/piloted_clock.cpp b/cpp/arcticdb/stream/piloted_clock.cpp index d396cfc6ca..c17530c724 100644 --- a/cpp/arcticdb/stream/piloted_clock.cpp +++ b/cpp/arcticdb/stream/piloted_clock.cpp @@ -2,4 +2,5 @@ namespace arcticdb { std::atomic PilotedClock::time_; +std::atomic PilotedClockNoAutoIncrement::time_; } \ No newline at end of file diff --git a/cpp/arcticdb/stream/piloted_clock.hpp b/cpp/arcticdb/stream/piloted_clock.hpp index f2b79189ac..41da1b5e0f 100644 --- a/cpp/arcticdb/stream/piloted_clock.hpp +++ b/cpp/arcticdb/stream/piloted_clock.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include @@ -14,4 +16,11 @@ struct PilotedClock { } }; +struct PilotedClockNoAutoIncrement { + static std::atomic time_; + static entity::timestamp nanos_since_epoch() { + return PilotedClockNoAutoIncrement::time_; + } +}; + } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/stream/stream_sink.hpp b/cpp/arcticdb/stream/stream_sink.hpp index ed28a611ab..14d1e0e64a 100644 --- a/cpp/arcticdb/stream/stream_sink.hpp +++ b/cpp/arcticdb/stream/stream_sink.hpp @@ -99,6 +99,11 @@ struct StreamSink { const StreamId &stream_id, SegmentInMemory &&segment) = 0; + virtual entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId &stream_id, + SegmentInMemory &&segment) = 0; + [[nodiscard]] virtual folly::Future write_compressed(storage::KeySegmentPair ks) = 0; virtual void write_compressed_sync(storage::KeySegmentPair ks) = 0; diff --git a/cpp/arcticdb/stream/test/stream_test_common.cpp b/cpp/arcticdb/stream/test/stream_test_common.cpp index 3a02c8a639..d5ee8a6ae1 100644 --- a/cpp/arcticdb/stream/test/stream_test_common.cpp +++ b/cpp/arcticdb/stream/test/stream_test_common.cpp @@ -9,6 +9,4 @@ namespace arcticdb { -std::atomic PilotedClock::time_{0}; - } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp index da53b08ab7..ff56ed1d20 100644 --- a/cpp/arcticdb/util/error_code.hpp +++ b/cpp/arcticdb/util/error_code.hpp @@ -80,6 +80,7 @@ inline std::unordered_map get_error_category_names() ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \ ERROR_CODE(5003, E_PERMISSION) \ ERROR_CODE(5004, E_RESOURCE_NOT_FOUND) \ + ERROR_CODE(5005, E_UNSUPPORTED_ATOMIC_OPERATION) \ ERROR_CODE(5010, E_LMDB_MAP_FULL) \ ERROR_CODE(5011, E_UNEXPECTED_LMDB_ERROR) \ ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \ diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp new file mode 100644 index 0000000000..e93b7968b2 --- /dev/null +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -0,0 +1,69 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace arcticdb { + +namespace lock { + +using Epoch = uint64_t; + +// The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to +// provide a slower but more reliable lock than the StorageLock. 
It should be completely consistent unless a process +// holding a lock gets paused for times comparable to the lock timeout. +// The lock follows the algorithm described here: +// https://www.morling.dev/blog/leader-election-with-s3-conditional-writes/ +template +class ReliableStorageLock { +public: + ReliableStorageLock(const std::string& base_name, const std::shared_ptr store, timestamp timeout); + + Epoch retry_until_take_lock() const; + std::optional try_take_lock() const; + std::optional try_extend_lock(Epoch held_lock_epoch) const; + void free_lock(Epoch held_lock_epoch) const; + timestamp timeout() const; +private: + std::optional try_take_next_epoch(const std::vector& existing_locks, std::optional latest) const; + std::pair, std::optional> get_all_locks() const; + timestamp get_expiration(RefKey lock_key) const; + void clear_old_locks(const std::vector& epochs) const; + StreamId get_stream_id(Epoch e) const; + std::string base_name_; + std::shared_ptr store_; + timestamp timeout_; +}; + +// The ReliableStorageLockGuard acquires a ReliableStorageLock on construction and frees it on destruction. While the lock +// is held it periodically extends its timeout in a heartbeating thread. +class ReliableStorageLockGuard { +public: + ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock); + + ~ReliableStorageLockGuard(); +private: + void cleanup_on_lost_lock(); + const ReliableStorageLock<> &lock_; + std::optional aquired_epoch_; + folly::Func on_lost_lock_; + folly::FunctionScheduler extend_lock_heartbeat_; +}; + +} + +} + +#include "arcticdb/util/reliable_storage_lock.tpp" \ No newline at end of file diff --git a/cpp/arcticdb/util/reliable_storage_lock.tpp b/cpp/arcticdb/util/reliable_storage_lock.tpp new file mode 100644 index 0000000000..7bcb163ca7 --- /dev/null +++ b/cpp/arcticdb/util/reliable_storage_lock.tpp @@ -0,0 +1,233 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#include + +#include +#include +#include +#include + +#include +#include + +namespace arcticdb { + +namespace lock { + +const auto SEPARATOR = '*'; +const auto EXTENDS_PER_TIMEOUT = 5u; +const auto KEEP_LAST_N_LOCKS = 5u; + +StreamDescriptor lock_stream_descriptor(const StreamId &stream_id) { + return StreamDescriptor{stream_descriptor( + stream_id, + stream::RowCountIndex(), + {scalar_field(DataType::INT64, "expiration")})}; +} + +SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { + SegmentInMemory output{lock_stream_descriptor(name)}; + output.set_scalar(0, expiration); + output.end_row(); + return output; +} + +template +ReliableStorageLock::ReliableStorageLock(const std::string &base_name, const std::shared_ptr store, timestamp timeout) : + base_name_(base_name), store_(store), timeout_(timeout) { + auto s3_timeout = ConfigsMap::instance()->get_int("S3Storage.RequestTimeoutMs", 200000) * ONE_MILLISECOND; + if (2 * s3_timeout > timeout) { + log::lock().warn( + "Reliable Lock is created with a timeout < twice the s3 timeout. This is not recommended, as it increases the risk for a faulty lock timeout in s3."
+ "Lock timeout: {}, S3 timeout: {}", timeout, s3_timeout); + } +} + +template +timestamp ReliableStorageLock::timeout() const { + return timeout_; +} + +Epoch get_next_epoch(std::optional maybe_prev) { + if (maybe_prev.has_value()) { + return maybe_prev.value() + 1; + } + return 0; +} + +template +StreamId ReliableStorageLock::get_stream_id(Epoch e) const { + return fmt::format("{}{}{}", base_name_, SEPARATOR, e); +} + +Epoch extract_epoch_from_stream_id(const StreamId& stream_id) { + auto string_id = std::get(stream_id); + auto epoch_string = string_id.substr(string_id.find(SEPARATOR)+1, string_id.size()); + return std::stoull(epoch_string); +} + + +template +std::pair, std::optional> ReliableStorageLock::get_all_locks() const { + std::vector epochs; + store_->iterate_type( + KeyType::SLOW_LOCK, + [&epochs](VariantKey &&key){ + auto current_epoch = extract_epoch_from_stream_id(variant_key_id(key)); + epochs.push_back(current_epoch); + }, base_name_ + SEPARATOR); + std::optional latest = epochs.size()==0 ? std::nullopt : std::make_optional<>(*std::max_element(epochs.begin(), epochs.end())); + return {epochs, latest}; +} + +template +timestamp ReliableStorageLock::get_expiration(RefKey lock_key) const { + auto kv = store_->read_sync(lock_key, storage::ReadKeyOpts{}); + return kv.second.template scalar_at(0, 0).value(); +} + +template +void ReliableStorageLock::clear_old_locks(const std::vector& epochs) const { + auto now = ClockType::nanos_since_epoch(); + auto to_delete = std::vector(); + // We only clear locks that have expired more than a timeout (we assume a process can't be paused for more than the timeout) ago. + // We do this to avoid a process mistakenly taking a lock if: + // 1. Process A lists locks and gets [4, 5, 6] + // 2. Process A decides to attempt taking lock 7 + // 3. Process A gets paused + // 4. Process B takes locks 7 and 8 + // 5. Process B decides to clear lock 7 since it's not the latest + // 6. 
Process A succeeds in taking lock 7 + for (auto epoch : epochs) { + auto lock_key = RefKey{get_stream_id(epoch), KeyType::SLOW_LOCK}; + if (get_expiration(lock_key) + timeout_ < now) { + to_delete.emplace_back(lock_key); + } + } + store_->remove_keys_sync(to_delete); +} + +template +std::optional ReliableStorageLock::try_take_lock() const { + auto [existing_locks, latest] = get_all_locks(); + if (latest.has_value()) { + auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::SLOW_LOCK}); + if (expires > ClockType::nanos_since_epoch()) { + // An unexpired lock exists + return std::nullopt; + } + } + return try_take_next_epoch(existing_locks, latest); +} + +template +Epoch ReliableStorageLock::retry_until_take_lock() const { + // We don't use the ExponentialBackoff because we want to be able to wait indefinitely + auto max_wait = std::chrono::duration_cast(std::chrono::nanoseconds(timeout())); + auto min_wait = max_wait / 16; + auto current_wait = min_wait; + std::minstd_rand gen(std::random_device{}()); + std::uniform_real_distribution<> dist(1.0, 1.2); + auto jittered_wait = [&]() { + auto factor = dist(gen); + return current_wait * factor; + }; + + auto aquired_epoch = try_take_lock(); + while (!aquired_epoch.has_value()) { + std::this_thread::sleep_for(jittered_wait()); + current_wait = std::min(current_wait * 2, max_wait); + aquired_epoch = try_take_lock(); + } + return aquired_epoch.value(); +} + +template +std::optional ReliableStorageLock::try_extend_lock(Epoch held_lock_epoch) const { + auto [existing_locks, latest] = get_all_locks(); + util::check(latest.has_value() && latest.value() >= held_lock_epoch, + "We are trying to extend a newer epoch than the existing one in storage. Extend epoch: {}", + held_lock_epoch); + if (latest.value() != held_lock_epoch) { + // We have lost the lock while holding it (most likely due to timeout). + return std::nullopt; + } + return try_take_next_epoch(existing_locks, latest); +} + +template +void ReliableStorageLock::free_lock(Epoch held_lock_epoch) const { + auto [existing_locks, latest_epoch] = get_all_locks(); + util::check(latest_epoch.has_value() && latest_epoch.value() >= held_lock_epoch, + "We are trying to free a newer epoch than the existing one in storage. Free epoch: {}, Existing epoch: {}", + held_lock_epoch, latest_epoch); + if (latest_epoch.value() != held_lock_epoch) { + // Lock is already lost + return; + } + auto lock_stream_id = get_stream_id(held_lock_epoch); + auto expiration = ClockType::nanos_since_epoch(); // Write current time to mark lock as expired as of now + store_->write_sync(KeyType::SLOW_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); +} + +template +std::optional ReliableStorageLock::try_take_next_epoch(const std::vector& existing_locks, std::optional latest) const { + Epoch epoch = get_next_epoch(latest); + auto lock_stream_id = get_stream_id(epoch); + auto expiration = ClockType::nanos_since_epoch() + timeout_; + try { + store_->write_if_none_sync(KeyType::SLOW_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); + } catch (const StorageException& e) { + // There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException. + // Either way it's safe to assume we have failed to acquire the lock in case of a transient S3 error. + // If the error persists we'll appropriately raise in the next attempt to LIST/GET the existing lock and propagate + // the transient error.
+ log::lock().warn("Failed to aquire lock (likely someone aquired it before us): {}", e.what()); + return std::nullopt; + } + // We clear old locks only after aquiring the lock to avoid duplicating the deletion work + clear_old_locks(existing_locks); + return epoch; +} + +ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock) : + lock_(lock), aquired_epoch_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { + aquired_epoch_ = lock_.retry_until_take_lock(); + util::check(aquired_epoch_.has_value(), "We should have waited until we surely aquire a lock"); + // We heartbeat 5 times per lock timeout to extend the lock. + auto hearbeat_frequency = std::chrono::duration_cast( + std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT)); + extend_lock_heartbeat_.addFunction( + [that=this](){ + if (that->aquired_epoch_.has_value()) { + that->aquired_epoch_ = that->lock_.try_extend_lock(that->aquired_epoch_.value()); + if (!that->aquired_epoch_.has_value()) { + // Clean up if we have lost the lock. + that->cleanup_on_lost_lock(); + } + } + }, hearbeat_frequency, "Extend lock", hearbeat_frequency); + extend_lock_heartbeat_.start(); +} + +void ReliableStorageLockGuard::cleanup_on_lost_lock() { + // We do not use shutdown because we don't want to run it from within a FunctionScheduler thread to avoid a deadlock + extend_lock_heartbeat_.cancelAllFunctions(); + on_lost_lock_(); +} + +ReliableStorageLockGuard::~ReliableStorageLockGuard() { + extend_lock_heartbeat_.shutdown(); + if (aquired_epoch_.has_value()) { + lock_.free_lock(aquired_epoch_.value()); + } +} + +} + +} diff --git a/cpp/arcticdb/util/storage_lock.hpp b/cpp/arcticdb/util/storage_lock.hpp index 7fdd45a443..61963e2e5e 100644 --- a/cpp/arcticdb/util/storage_lock.hpp +++ b/cpp/arcticdb/util/storage_lock.hpp @@ -70,6 +70,8 @@ inline std::thread::id get_thread_id() noexcept { return std::this_thread::get_id(); } +// This StorageLock is inherently unreliable. It does not use atomic operations and it is possible for two processes to aquire if the timing is right. +// If you want a reliable alternative which is slower but uses atomic primitives you can look at the `ReliableStorageLock`. template class StorageLock { // 1 Day diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp new file mode 100644 index 0000000000..033081c539 --- /dev/null +++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp @@ -0,0 +1,127 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+ */ + +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace lock; + +// These tests test the actual implementation + +TEST(ReliableStorageLock, SingleThreaded) { + auto store = std::make_shared(); + using Clock = PilotedClockNoAutoIncrement; + // We have 2 locks, one with timeout of 20 and another with a timeout of 10 + ReliableStorageLock lock1{StringId{"test_lock"}, store, 20}; + ReliableStorageLock lock2{StringId{"test_lock"}, store, 10}; + + auto count_locks = [&]() { + auto number_of_lock_keys = 0; + store->iterate_type(KeyType::SLOW_LOCK, [&number_of_lock_keys](VariantKey&& _ [[maybe_unused]]){++number_of_lock_keys;}); + return number_of_lock_keys; + }; + + // We take the first lock at 0 and it should not expire until 20 + Clock::time_ = 0; + ASSERT_EQ(lock1.try_take_lock(), std::optional{0}); + Clock::time_ = 5; + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 10; + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 19; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + + // Once the first lock has expired we can take a new lock with epoch=1 + Clock::time_ = 20; + ASSERT_EQ(lock2.try_take_lock(), std::optional{1}); + ASSERT_EQ(count_locks(), 2); + + // We can extend the lock timeout at 25 to 35 and get an epoch=2 + Clock::time_ = 25; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_extend_lock(1), std::optional{2}); + ASSERT_EQ(count_locks(), 3); + Clock::time_ = 34; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + + // At time 35 the lock with epoch=2 has expired and we can re-aquire the lock + Clock::time_ = 35; + ASSERT_EQ(lock1.try_take_lock(), std::optional{3}); + ASSERT_EQ(count_locks(), 4); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + + // And we can free the lock immediately to allow re-aquiring without waiting for timeout + lock1.free_lock(3); + ASSERT_EQ(lock2.try_take_lock(), std::optional{4}); + // Taking lock2 with timeout=10 means we should clear all locks which have expired before 25. In this case just epoch=0 + ASSERT_EQ(count_locks(), 4); + + // But if we take a lock at 100 all locks would have expired a timeout=10 ago, and we should clear all apart from latest epoch=5 + Clock::time_ = 100; + ASSERT_EQ(lock2.try_take_lock(), std::optional{5}); + ASSERT_EQ(count_locks(), 1); +} + +struct SlowIncrementTask : async::BaseTask { + int& cnt_; + const ReliableStorageLock<>& lock_; + std::chrono::milliseconds sleep_time_; + bool lock_lost_ = false; + + SlowIncrementTask(int& cnt, ReliableStorageLock<>& lock, std::chrono::milliseconds sleep_time) : + cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {} + + void operator()() { + auto guard = ReliableStorageLockGuard(lock_, [that = this](){ + that->lock_lost_ = true; + }); + auto value_before_sleep = cnt_; + // std::cout<<"Taken a lock with "< exec{threads}; + auto store = std::make_shared(); + // Running the test with tighter timeout than the 1000ms timeout causes it to fail occasionally. + // Seemingly because the heartbeating thread might occasionally not run for long periods of time. This problem disappears with larger timouts like 1000ms. 
+ ReliableStorageLock<> lock{StringId{"test_lock"}, store, ONE_SECOND}; + + int counter = 0; + + std::vector> futures; + for(auto i = 0u; i < threads; ++i) { + // We use both fast and slow tasks to test both fast lock frees and lock extensions + auto sleep_time = std::chrono::milliseconds(i%2 * 2000); + futures.emplace_back(exec.addFuture(SlowIncrementTask{counter, lock, sleep_time})); + } + folly::collectAll(futures).get(); + + // If no locks were lost and no races happened each thread will have incremented the counter exactly once + ASSERT_EQ(counter, threads); + + // Also the lock should be free by the end (i.e. we can take a new lock) + ASSERT_EQ(lock.try_take_lock().has_value(), true); +} diff --git a/cpp/vcpkg b/cpp/vcpkg index c82f746672..6e1219d080 160000 --- a/cpp/vcpkg +++ b/cpp/vcpkg @@ -1 +1 @@ -Subproject commit c82f74667287d3dc386bce81e44964370c91a289 +Subproject commit 6e1219d080fc0525f55bf8c8ba57d8ab2ce35422 diff --git a/cpp/vcpkg.json b/cpp/vcpkg.json index b364273b91..5b34a82ad3 100644 --- a/cpp/vcpkg.json +++ b/cpp/vcpkg.json @@ -31,16 +31,53 @@ "features": [ "push", "pull" ] }, "mongo-cxx-driver", - { - "name": "aws-c-io", - "$version reason": "To pick up https://github.com/awslabs/aws-c-io/pull/515" - }, { "name": "aws-sdk-cpp", - "$version reason": "Minimum version in the baseline that works with aws-c-io above.", + "version>=": "1.11.405", + "$version reason": "Version which contains atomic put operations", "default-features": false, "features": [ "s3", "identity-management" ] }, + { + "name": "aws-crt-cpp", + "version>=": "0.28.3" + }, + { + "name": "aws-c-s3", + "version>=": "0.6.6" + }, + { + "name": "aws-c-io", + "version>=": "0.14.18" + }, + { + "name": "aws-c-common", + "version>=": "0.9.28" + }, + { + "name": "aws-c-auth", + "version>=": "0.7.31" + }, + { + "name": "aws-c-cal", + "version>=": "0.7.4" + }, + { + "name": "aws-c-http", + "version>=": "0.8.10" + }, + { + "name": "aws-c-sdkutils", + "version>=": "0.1.19" + }, + { + "name": "aws-c-event-stream", + "version>=": "0.4.3" + }, + { + "name": "aws-checksums", + "version>=": "0.1.20" + }, "boost-dynamic-bitset", "boost-interprocess", "boost-callable-traits", @@ -68,9 +105,7 @@ ], "overrides": [ { "name": "openssl", "version-string": "3.3.0" }, - { "name": "aws-sdk-cpp", "version": "1.11.201" }, { "name": "azure-core-cpp", "version": "1.12.0" }, - { "name": "aws-c-s3", "version": "0.3.24" }, { "name": "benchmark", "version": "1.9.0" }, { "name": "bitmagic", "version": "7.12.3" }, { "name": "boost-algorithm", "version": "1.84.0" }, From b8ce22b7727627e84f7c052c5cc7792cd7799027 Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Fri, 22 Nov 2024 17:27:08 +0200 Subject: [PATCH 2/6] Real S3 storage python tests for ReliableStorageLock Adds a real s3 storage test (currently to be run with persistent storage tests mark) for the lock. 
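For illustration, a minimal sketch of how the bindings added below are expected to be driven from Python; the `lib` fixture and the timeout value are assumptions for the example, mirroring the integration test:

    # Assumed setup: `lib` is a version-store fixture whose `_library` attribute is the underlying storage library.
    from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager

    ONE_SEC_NS = 1_000_000_000
    lock = ReliableStorageLock("example_lock", lib._library, 10 * ONE_SEC_NS)
    manager = ReliableStorageLockManager()

    manager.take_lock_guard(lock)  # blocks with backoff until the lock is acquired
    try:
        pass  # critical section protected across processes
    finally:
        manager.free_lock_guard()  # drops the guard, freeing the lock and stopping the heartbeat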
--- cpp/arcticdb/python/python_module.cpp | 2 +- cpp/arcticdb/toolbox/python_bindings.cpp | 21 +++++++- cpp/arcticdb/toolbox/python_bindings.hpp | 2 +- cpp/arcticdb/util/reliable_storage_lock.hpp | 11 +++++ cpp/arcticdb/util/reliable_storage_lock.tpp | 10 ++++ .../integration/arcticdb/test_storage_lock.py | 49 +++++++++++++++++++ 6 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 python/tests/integration/arcticdb/test_storage_lock.py diff --git a/cpp/arcticdb/python/python_module.cpp b/cpp/arcticdb/python/python_module.cpp index 50321d6f00..474e74bdeb 100644 --- a/cpp/arcticdb/python/python_module.cpp +++ b/cpp/arcticdb/python/python_module.cpp @@ -335,7 +335,7 @@ PYBIND11_MODULE(arcticdb_ext, m) { arcticdb::storage::apy::register_bindings(storage_submodule, base_exception); arcticdb::stream::register_bindings(m); - arcticdb::toolbox::apy::register_bindings(m); + arcticdb::toolbox::apy::register_bindings(m, base_exception); m.def("get_version_string", &arcticdb::get_arcticdb_version_string); diff --git a/cpp/arcticdb/toolbox/python_bindings.cpp b/cpp/arcticdb/toolbox/python_bindings.cpp index 1ec9f124aa..f53b89e07a 100644 --- a/cpp/arcticdb/toolbox/python_bindings.cpp +++ b/cpp/arcticdb/toolbox/python_bindings.cpp @@ -15,10 +15,11 @@ #include #include #include +#include namespace arcticdb::toolbox::apy { -void register_bindings(py::module &m) { +void register_bindings(py::module &m, py::exception& base_exception) { auto tools = m.def_submodule("tools", "Library management tool hooks"); using namespace arcticdb::toolbox::apy; using namespace arcticdb::storage; @@ -67,6 +68,24 @@ void register_bindings(py::module &m) { .def("inspect_env_variable", &LibraryTool::inspect_env_variable) .def_static("read_unaltered_lib_cfg", &LibraryTool::read_unaltered_lib_cfg); + // Reliable storage lock exposed for integration testing. 
It is intended for use in C++ + using namespace arcticdb::lock; + + py::register_exception(tools, "LostReliableLock", base_exception.ptr()); + + py::class_>(tools, "ReliableStorageLock") + .def(py::init<>([](std::string base_name, std::shared_ptr lib, timestamp timeout){ + auto store = version_store::LocalVersionedEngine(lib)._test_get_store(); + return ReliableStorageLock<>(base_name, store, timeout); + })); + + py::class_(tools, "ReliableStorageLockManager") + .def(py::init<>([](){ + return ReliableStorageLockManager(); + })) + .def("take_lock_guard", &ReliableStorageLockManager::take_lock_guard) + .def("free_lock_guard", &ReliableStorageLockManager::free_lock_guard); + // S3 Storage tool using namespace arcticdb::storage::s3; py::class_>(tools, "S3Tool") diff --git a/cpp/arcticdb/toolbox/python_bindings.hpp b/cpp/arcticdb/toolbox/python_bindings.hpp index 63db2eb0da..2975663ce5 100644 --- a/cpp/arcticdb/toolbox/python_bindings.hpp +++ b/cpp/arcticdb/toolbox/python_bindings.hpp @@ -20,7 +20,7 @@ namespace arcticdb::toolbox::apy { namespace py = pybind11; -void register_bindings(py::module &m); +void register_bindings(py::module &m, py::exception& base_exception); } // namespace arcticdb::toolbox::apy diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp index e93b7968b2..a027ca8b29 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.hpp +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -62,6 +62,17 @@ class ReliableStorageLockGuard { folly::FunctionScheduler extend_lock_heartbeat_; }; + +// Only used for python tests +struct LostReliableLock : std::exception {}; +class ReliableStorageLockManager { +public: + void take_lock_guard(const ReliableStorageLock<>& lock); + void free_lock_guard(); +private: + std::optional> guard = std::nullopt; +}; + } } diff --git a/cpp/arcticdb/util/reliable_storage_lock.tpp b/cpp/arcticdb/util/reliable_storage_lock.tpp index 7bcb163ca7..27bb7400e0 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.tpp +++ b/cpp/arcticdb/util/reliable_storage_lock.tpp @@ -228,6 +228,16 @@ ReliableStorageLockGuard::~ReliableStorageLockGuard() { } } +void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { + guard = std::make_shared(lock, [](){ + throw LostReliableLock(); + }); +} + +void ReliableStorageLockManager::free_lock_guard() { + guard = std::nullopt; +} + } } diff --git a/python/tests/integration/arcticdb/test_storage_lock.py b/python/tests/integration/arcticdb/test_storage_lock.py new file mode 100644 index 0000000000..182b87965d --- /dev/null +++ b/python/tests/integration/arcticdb/test_storage_lock.py @@ -0,0 +1,49 @@ +import pandas as pd +import numpy as np +import pytest + +from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager +from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK + +from multiprocessing import Process, set_start_method +set_start_method("fork") # Okay to fork an S3 lib +import time + +from arcticdb.util.test import assert_frame_equal + + +one_sec = 1_000_000_000 + + +def slow_increment_task(lib, symbol, sleep_time, lock_manager, lock): + lock_manager.take_lock_guard(lock) + df = lib.read(symbol).data + df["col"][0] = df["col"][0] + 1 + time.sleep(sleep_time) + lib.write(symbol, df) + lock_manager.free_lock_guard() + + +@pytest.mark.parametrize("num_processes,max_sleep", [(100, 1), (5, 20)]) +@REAL_S3_TESTS_MARK +def test_many_increments(real_s3_version_store, num_processes, max_sleep): + lib = 
real_s3_version_store + init_df = pd.DataFrame({"col": [0]}) + symbol = "counter" + lib.write(symbol, init_df) + lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec) + lock_manager = ReliableStorageLockManager() + + processes = [ + Process(target=slow_increment_task, args=(lib, symbol, 0 if i%2==0 else max_sleep, lock_manager, lock)) + for i in range(num_processes) + ] + for p in processes: + p.start() + + for p in processes: + p.join() + + read_df = lib.read(symbol).data + expected_df = pd.DataFrame({"col": [num_processes]}) + assert_frame_equal(read_df, expected_df) From 75cc2183eb55d8002165fa521d3d9891e522105b Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Tue, 26 Nov 2024 14:17:54 +0000 Subject: [PATCH 3/6] Add a `supports_atomic_writes` predicate to storages. Currently all backends are unsupported apart from S3 (for which only some providers like AWS support it). Unfotrunately it's impossible to differentiate between aws and e.g. vast backends apart from looking at the endpoint which can be subject to rerouting etc. This commit also reworks the Guard to work only with aquired locks. --- cpp/arcticdb/async/async_store.hpp | 4 +++ cpp/arcticdb/storage/azure/azure_storage.hpp | 4 +++ .../storage/file/mapped_file_storage.hpp | 4 +++ cpp/arcticdb/storage/library.hpp | 2 ++ cpp/arcticdb/storage/lmdb/lmdb_storage.hpp | 4 +++ .../storage/memory/memory_storage.hpp | 4 +++ cpp/arcticdb/storage/mongo/mongo_storage.hpp | 4 +++ cpp/arcticdb/storage/python_bindings.cpp | 1 + .../storage/s3/nfs_backed_storage.hpp | 4 +++ cpp/arcticdb/storage/s3/s3_storage.hpp | 7 +++++ cpp/arcticdb/storage/storage.hpp | 6 +++++ cpp/arcticdb/storage/storages.hpp | 4 +++ cpp/arcticdb/storage/test/in_memory_store.hpp | 4 +++ cpp/arcticdb/stream/stream_sink.hpp | 2 ++ cpp/arcticdb/util/reliable_storage_lock.hpp | 7 ++--- cpp/arcticdb/util/reliable_storage_lock.tpp | 27 +++++++++---------- .../util/test/test_reliable_storage_lock.cpp | 3 ++- 17 files changed, 73 insertions(+), 18 deletions(-) diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index a292f0aad8..fed575a817 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -280,6 +280,10 @@ bool supports_prefix_matching() const override { return library_->supports_prefix_matching(); } +bool supports_atomic_writes() const override { + return library_->supports_atomic_writes(); +} + std::string key_path(const VariantKey& key) const { return library_->key_path(key); } diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 2b926e5cbe..1c679cfab6 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -55,6 +55,10 @@ class AzureStorage final : public Storage { return true; } + bool do_supports_atomic_writes() const final { + return false; + } + bool do_fast_delete() final { return false; } diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.hpp b/cpp/arcticdb/storage/file/mapped_file_storage.hpp index 44d9fd365a..ee4b395462 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.hpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.hpp @@ -50,6 +50,10 @@ class MappedFileStorage final : public SingleFileStorage { return false; }; + bool do_supports_atomic_writes() const final { + return false; + } + std::string do_key_path(const VariantKey&) const override { return {}; } bool do_fast_delete() override; diff --git a/cpp/arcticdb/storage/library.hpp 
b/cpp/arcticdb/storage/library.hpp index e1a644d1cc..6b3d58837c 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -163,6 +163,8 @@ class Library { bool supports_prefix_matching() const { return storages_->supports_prefix_matching(); } + bool supports_atomic_writes() const { return storages_->supports_atomic_writes(); } + const LibraryPath &library_path() const { return library_path_; } OpenMode open_mode() const { return storages_->open_mode(); } diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp index 9c6fa838cc..9c8bf9cbdc 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp @@ -52,6 +52,10 @@ class LmdbStorage final : public Storage { return false; }; + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; void cleanup() override; diff --git a/cpp/arcticdb/storage/memory/memory_storage.hpp b/cpp/arcticdb/storage/memory/memory_storage.hpp index 34eda07a67..6cec399494 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.hpp +++ b/cpp/arcticdb/storage/memory/memory_storage.hpp @@ -45,6 +45,10 @@ namespace arcticdb::storage::memory { return false; } + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) final; diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.hpp b/cpp/arcticdb/storage/mongo/mongo_storage.hpp index bf37f5d5da..fe505b469c 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.hpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.hpp @@ -47,6 +47,10 @@ class MongoStorage final : public Storage { return false; } + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final; diff --git a/cpp/arcticdb/storage/python_bindings.cpp b/cpp/arcticdb/storage/python_bindings.cpp index a7a5acfa9d..4a69ad9069 100644 --- a/cpp/arcticdb/storage/python_bindings.cpp +++ b/cpp/arcticdb/storage/python_bindings.cpp @@ -51,6 +51,7 @@ void register_bindings(py::module& storage, py::exception &lock, folly::Func&& on_lost_lock); + ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock); ~ReliableStorageLockGuard(); private: diff --git a/cpp/arcticdb/util/reliable_storage_lock.tpp b/cpp/arcticdb/util/reliable_storage_lock.tpp index 27bb7400e0..96d273b68d 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.tpp +++ b/cpp/arcticdb/util/reliable_storage_lock.tpp @@ -21,16 +21,15 @@ namespace lock { const auto SEPARATOR = '*'; const auto EXTENDS_PER_TIMEOUT = 5u; -const auto KEEP_LAST_N_LOCKS = 5u; -StreamDescriptor lock_stream_descriptor(const StreamId &stream_id) { +inline StreamDescriptor lock_stream_descriptor(const StreamId &stream_id) { return StreamDescriptor{stream_descriptor( stream_id, stream::RowCountIndex(), {scalar_field(DataType::INT64, "expiration")})}; } -SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { +inline SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { SegmentInMemory output{lock_stream_descriptor(name)}; output.set_scalar(0, expiration); output.end_row(); @@ -40,6 +39,7 @@ SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { template 
ReliableStorageLock::ReliableStorageLock(const std::string &base_name, const std::shared_ptr store, timestamp timeout) : base_name_(base_name), store_(store), timeout_(timeout) { + storage::check(store_->supports_atomic_writes(), "Storage does not support atomic writes, so we can't create a lock"); auto s3_timeout = ConfigsMap::instance()->get_int("S3Storage.RequestTimeoutMs", 200000) * ONE_MILLISECOND; if (2 * s3_timeout > timeout) { log::lock().warn( @@ -53,7 +53,7 @@ timestamp ReliableStorageLock::timeout() const { return timeout_; } -Epoch get_next_epoch(std::optional maybe_prev) { +inline Epoch get_next_epoch(std::optional maybe_prev) { if (maybe_prev.has_value()) { return maybe_prev.value() + 1; } @@ -65,13 +65,12 @@ StreamId ReliableStorageLock::get_stream_id(Epoch e) const { return fmt::format("{}{}{}", base_name_, SEPARATOR, e); } -Epoch extract_epoch_from_stream_id(const StreamId& stream_id) { +inline Epoch extract_epoch_from_stream_id(const StreamId& stream_id) { auto string_id = std::get(stream_id); auto epoch_string = string_id.substr(string_id.find(SEPARATOR)+1, string_id.size()); return std::stoull(epoch_string); } - template std::pair, std::optional> ReliableStorageLock::get_all_locks() const { std::vector epochs; @@ -195,10 +194,9 @@ std::optional ReliableStorageLock::try_take_next_epoch(const s return epoch; } -ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock) : +inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock) : lock_(lock), aquired_epoch_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { - aquired_epoch_ = lock_.retry_until_take_lock(); - util::check(aquired_epoch_.has_value(), "We should have waited until we surely aquire a lock"); + aquired_epoch_ = aquired_epoch; // We heartbeat 5 times per lock timeout to extend the lock. 
auto hearbeat_frequency = std::chrono::duration_cast( std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT)); @@ -215,26 +213,27 @@ ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> & extend_lock_heartbeat_.start(); } -void ReliableStorageLockGuard::cleanup_on_lost_lock() { +inline void ReliableStorageLockGuard::cleanup_on_lost_lock() { // We do not use shutdown because we don't want to run it from within a FunctionScheduler thread to avoid a deadlock extend_lock_heartbeat_.cancelAllFunctions(); on_lost_lock_(); } -ReliableStorageLockGuard::~ReliableStorageLockGuard() { +inline ReliableStorageLockGuard::~ReliableStorageLockGuard() { extend_lock_heartbeat_.shutdown(); if (aquired_epoch_.has_value()) { lock_.free_lock(aquired_epoch_.value()); } } -void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { - guard = std::make_shared(lock, [](){ +inline void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { + auto aquired = lock.retry_until_take_lock(); + guard = std::make_shared(lock, aquired, [](){ throw LostReliableLock(); }); } -void ReliableStorageLockManager::free_lock_guard() { +inline void ReliableStorageLockManager::free_lock_guard() { guard = std::nullopt; } diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp index 033081c539..d500c010e0 100644 --- a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp +++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp @@ -82,7 +82,8 @@ struct SlowIncrementTask : async::BaseTask { cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {} void operator()() { - auto guard = ReliableStorageLockGuard(lock_, [that = this](){ + auto aquired = lock_.retry_until_take_lock(); + auto guard = ReliableStorageLockGuard(lock_, aquired, [that = this](){ that->lock_lost_ = true; }); auto value_before_sleep = cnt_; From 87a5e06cede762d110040222d14177173a1a7896 Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Mon, 2 Dec 2024 11:51:20 +0200 Subject: [PATCH 4/6] Better naming for ReliableStorageLock - Addressing review comments on ReliableStorageLock pr - Mostly renames - Longer timeout before lock cleanup - Fixes mamba dependencies --- cpp/arcticdb/CMakeLists.txt | 4 +- cpp/arcticdb/entity/key.cpp | 2 +- cpp/arcticdb/entity/key.hpp | 2 +- cpp/arcticdb/storage/python_bindings.cpp | 2 +- cpp/arcticdb/storage/s3/detail-inl.hpp | 2 +- cpp/arcticdb/storage/s3/s3_client_wrapper.hpp | 7 +- cpp/arcticdb/storage/s3/s3_mock_client.cpp | 7 +- cpp/arcticdb/storage/s3/s3_mock_client.hpp | 2 +- cpp/arcticdb/storage/s3/s3_real_client.cpp | 4 +- cpp/arcticdb/storage/s3/s3_real_client.hpp | 2 +- cpp/arcticdb/stream/piloted_clock.cpp | 1 - cpp/arcticdb/stream/piloted_clock.hpp | 9 -- ...lock.tpp => reliable_storage_lock-inl.hpp} | 127 +++++++++--------- cpp/arcticdb/util/reliable_storage_lock.hpp | 35 ++--- cpp/arcticdb/util/storage_lock.hpp | 2 +- .../util/test/test_reliable_storage_lock.cpp | 30 ++--- cpp/vcpkg.json | 2 +- environment_unix.yml | 2 +- .../integration/arcticdb/test_storage_lock.py | 8 +- 19 files changed, 125 insertions(+), 125 deletions(-) rename cpp/arcticdb/util/{reliable_storage_lock.tpp => reliable_storage_lock-inl.hpp} (57%) diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 2fca7962bb..8048fb0cf7 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -368,6 +368,7 @@ set(arcticdb_srcs util/sparse_utils.hpp util/storage_lock.hpp 
util/reliable_storage_lock.hpp + util/reliable_storage_lock-inl.hpp util/string_utils.hpp util/thread_cached_int.hpp util/timeouts.hpp @@ -515,8 +516,7 @@ set(arcticdb_srcs version/version_core.cpp version/version_store_api.cpp version/version_utils.cpp - version/version_map_batch_methods.cpp - util/reliable_storage_lock.tpp) + version/version_map_batch_methods.cpp) add_library(arcticdb_core_object OBJECT ${arcticdb_srcs}) diff --git a/cpp/arcticdb/entity/key.cpp b/cpp/arcticdb/entity/key.cpp index 599a06d53a..8c31aee12c 100644 --- a/cpp/arcticdb/entity/key.cpp +++ b/cpp/arcticdb/entity/key.cpp @@ -60,7 +60,7 @@ KeyData get_key_data(KeyType key_type) { STRING_REF(KeyType::APPEND_REF, aref, 'a') STRING_KEY(KeyType::MULTI_KEY, mref, 'm') STRING_REF(KeyType::LOCK, lref, 'x') - STRING_REF(KeyType::SLOW_LOCK, lref, 'x') + STRING_REF(KeyType::ATOMIC_LOCK, alref, 'A') STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X') STRING_KEY(KeyType::APPEND_DATA, app, 'b') STRING_REF(KeyType::BLOCK_VERSION_REF, bvref, 'R') diff --git a/cpp/arcticdb/entity/key.hpp b/cpp/arcticdb/entity/key.hpp index e6ac27f974..1d37115586 100644 --- a/cpp/arcticdb/entity/key.hpp +++ b/cpp/arcticdb/entity/key.hpp @@ -189,7 +189,7 @@ enum class KeyType : int { /* * Used for a list based reliable storage lock */ - SLOW_LOCK = 27, + ATOMIC_LOCK = 27, UNDEFINED }; diff --git a/cpp/arcticdb/storage/python_bindings.cpp b/cpp/arcticdb/storage/python_bindings.cpp index 4a69ad9069..1b1e45b059 100644 --- a/cpp/arcticdb/storage/python_bindings.cpp +++ b/cpp/arcticdb/storage/python_bindings.cpp @@ -51,7 +51,7 @@ void register_bindings(py::module& storage, py::exception failed_deletes; }; +enum class PutHeader{ + NONE, + IF_NONE_MATCH +}; + // An abstract class, which is responsible for sending the requests and parsing the responses from S3. // It can be derived as either a real connection to S3 or a mock used for unit tests. 
class S3ClientWrapper { @@ -58,7 +63,7 @@ class S3ClientWrapper { const std::string& s3_object_name, Segment&& segment, const std::string& bucket_name, - bool if_none_match = false) = 0; + PutHeader header = PutHeader::NONE) = 0; virtual S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.cpp b/cpp/arcticdb/storage/s3/s3_mock_client.cpp index aec87b8b48..3d614d5a05 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.cpp @@ -49,6 +49,7 @@ std::optional has_failure_trigger(const std::string& s3_object } const auto not_found_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false)); +const auto precondition_failed_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::UNKNOWN, "Precondition failed", "Precondition failed", false)); S3Result MockS3Client::head_object( const std::string& s3_object_name, @@ -84,12 +85,16 @@ S3Result MockS3Client::put_object( const std::string &s3_object_name, Segment &&segment, const std::string &bucket_name, - bool if_none_match[[maybe_unused]]) { + PutHeader header) { auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE); if (maybe_error.has_value()) { return {*maybe_error}; } + if (header == PutHeader::IF_NONE_MATCH && s3_contents.contains({bucket_name, s3_object_name})) { + return {precondition_failed_error}; + } + s3_contents.insert_or_assign({bucket_name, s3_object_name}, std::move(segment)); return {std::monostate()}; diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.hpp b/cpp/arcticdb/storage/s3/s3_mock_client.hpp index c7c094da23..75561adb91 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.hpp @@ -62,7 +62,7 @@ class MockS3Client : public S3ClientWrapper { const std::string& s3_object_name, Segment&& segment, const std::string& bucket_name, - bool if_none_match = false) override; + PutHeader header = PutHeader::NONE) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_real_client.cpp b/cpp/arcticdb/storage/s3/s3_real_client.cpp index 9fd7b80ec7..2f5d6e79de 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.cpp @@ -134,13 +134,13 @@ S3Result RealS3Client::put_object( const std::string &s3_object_name, Segment &&segment, const std::string &bucket_name, - bool if_none_match) { + PutHeader header) { ARCTICDB_SUBSAMPLE(S3StorageWritePreamble, 0) Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(s3_object_name.c_str()); - if (if_none_match) { + if (header == PutHeader::IF_NONE_MATCH) { request.SetIfNoneMatch("*"); } ARCTICDB_RUNTIME_DEBUG(log::storage(), "Set s3 key {}", request.GetKey().c_str()); diff --git a/cpp/arcticdb/storage/s3/s3_real_client.hpp b/cpp/arcticdb/storage/s3/s3_real_client.hpp index b0e7e0caff..414178e507 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.hpp @@ -39,7 +39,7 @@ class RealS3Client : public S3ClientWrapper { const std::string& s3_object_name, Segment&& segment, const std::string& bucket_name, - bool if_none_match = false) override; + PutHeader header = PutHeader::NONE) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/stream/piloted_clock.cpp b/cpp/arcticdb/stream/piloted_clock.cpp index c17530c724..d396cfc6ca 100644 --- 
a/cpp/arcticdb/stream/piloted_clock.cpp +++ b/cpp/arcticdb/stream/piloted_clock.cpp @@ -2,5 +2,4 @@ namespace arcticdb { std::atomic PilotedClock::time_; -std::atomic PilotedClockNoAutoIncrement::time_; } \ No newline at end of file diff --git a/cpp/arcticdb/stream/piloted_clock.hpp b/cpp/arcticdb/stream/piloted_clock.hpp index 41da1b5e0f..f2b79189ac 100644 --- a/cpp/arcticdb/stream/piloted_clock.hpp +++ b/cpp/arcticdb/stream/piloted_clock.hpp @@ -1,5 +1,3 @@ -#pragma once - #include #include @@ -16,11 +14,4 @@ struct PilotedClock { } }; -struct PilotedClockNoAutoIncrement { - static std::atomic time_; - static entity::timestamp nanos_since_epoch() { - return PilotedClockNoAutoIncrement::time_; - } -}; - } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/util/reliable_storage_lock.tpp b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp similarity index 57% rename from cpp/arcticdb/util/reliable_storage_lock.tpp rename to cpp/arcticdb/util/reliable_storage_lock-inl.hpp index 96d273b68d..3ae517051d 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.tpp +++ b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp @@ -21,12 +21,13 @@ namespace lock { const auto SEPARATOR = '*'; const auto EXTENDS_PER_TIMEOUT = 5u; +const auto REMOVE_AFTER_TIMEOUTS = 10u; inline StreamDescriptor lock_stream_descriptor(const StreamId &stream_id) { - return StreamDescriptor{stream_descriptor( + return stream_descriptor( stream_id, stream::RowCountIndex(), - {scalar_field(DataType::INT64, "expiration")})}; + {scalar_field(DataType::INT64, "expiration")}); } inline SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { @@ -40,12 +41,6 @@ template ReliableStorageLock::ReliableStorageLock(const std::string &base_name, const std::shared_ptr store, timestamp timeout) : base_name_(base_name), store_(store), timeout_(timeout) { storage::check(store_->supports_atomic_writes(), "Storage does not support atomic writes, so we can't create a lock"); - auto s3_timeout = ConfigsMap::instance()->get_int("S3Storage.RequestTimeoutMs", 200000) * ONE_MILLISECOND; - if (2 * s3_timeout > timeout) { - log::lock().warn( - "Reliable Lock is created with a timeout < twice the s3 timeout. This is not recommended, as it increases the risk for a faulty lock timeout in s3." 
- "Lock timeout: {}, S3 timeout: {}", timeout, s3_timeout); - } } template @@ -53,7 +48,7 @@ timestamp ReliableStorageLock::timeout() const { return timeout_; } -inline Epoch get_next_epoch(std::optional maybe_prev) { +inline AcquiredLockId get_next_id(std::optional maybe_prev) { if (maybe_prev.has_value()) { return maybe_prev.value() + 1; } @@ -61,27 +56,27 @@ inline Epoch get_next_epoch(std::optional maybe_prev) { } template -StreamId ReliableStorageLock::get_stream_id(Epoch e) const { - return fmt::format("{}{}{}", base_name_, SEPARATOR, e); +StreamId ReliableStorageLock::get_stream_id(AcquiredLockId lock_id) const { + return fmt::format("{}{}{}", base_name_, SEPARATOR, lock_id); } -inline Epoch extract_epoch_from_stream_id(const StreamId& stream_id) { +inline AcquiredLockId extract_lock_id_from_stream_id(const StreamId& stream_id) { auto string_id = std::get(stream_id); - auto epoch_string = string_id.substr(string_id.find(SEPARATOR)+1, string_id.size()); - return std::stoull(epoch_string); + auto lock_id_string = string_id.substr(string_id.find(SEPARATOR)+1, string_id.size()); + return std::stoull(lock_id_string); } template -std::pair, std::optional> ReliableStorageLock::get_all_locks() const { - std::vector epochs; +std::pair, std::optional> ReliableStorageLock::get_all_locks() const { + std::vector lock_ids; store_->iterate_type( - KeyType::SLOW_LOCK, - [&epochs](VariantKey &&key){ - auto current_epoch = extract_epoch_from_stream_id(variant_key_id(key)); - epochs.push_back(current_epoch); + KeyType::ATOMIC_LOCK, + [&lock_ids](VariantKey &&key) { + auto current_lock_id = extract_lock_id_from_stream_id(variant_key_id(key)); + lock_ids.push_back(current_lock_id); }, base_name_ + SEPARATOR); - std::optional latest = epochs.size()==0 ? std::nullopt : std::make_optional<>(*std::max_element(epochs.begin(), epochs.end())); - return {epochs, latest}; + std::optional latest = lock_ids.size() == 0 ? std::nullopt : std::make_optional<>(*std::max_element(lock_ids.begin(), lock_ids.end())); + return {lock_ids, latest}; } template @@ -91,10 +86,10 @@ timestamp ReliableStorageLock::get_expiration(RefKey lock_key) const } template -void ReliableStorageLock::clear_old_locks(const std::vector& epochs) const { +void ReliableStorageLock::clear_old_locks(const std::vector& lock_ids) const { auto now = ClockType::nanos_since_epoch(); auto to_delete = std::vector(); - // We only clear locks that have expired more than a timeout (we assume a process can't be paused for more than the timeout) ago. + // We only clear locks that have expired more than 10 timeouts (we assume a process can't be paused for more than the timeout) ago. // We do this to avoid a process mistakenly taking a lock if: // 1. Process A lists locks and gets [4, 5, 6] // 2. Process A decides to attempt taking lock 7 @@ -102,9 +97,9 @@ void ReliableStorageLock::clear_old_locks(const std::vector& e // 4. Process B takes locks 7 and 8 // 5. Process B decides to clear lock 7 since it's not the latest // 6. 
Process A succeeds in taking lock 7 - for (auto epoch : epochs) { - auto lock_key = RefKey{get_stream_id(epoch), KeyType::SLOW_LOCK}; - if (get_expiration(lock_key) + timeout_ < now) { + for (auto lock_id : lock_ids) { + auto lock_key = RefKey{get_stream_id(lock_id), KeyType::ATOMIC_LOCK}; + if (get_expiration(lock_key) + REMOVE_AFTER_TIMEOUTS * timeout_ < now) { to_delete.emplace_back(lock_key); } } @@ -112,20 +107,20 @@ void ReliableStorageLock::clear_old_locks(const std::vector& e } template -std::optional ReliableStorageLock::try_take_lock() const { +std::optional ReliableStorageLock::try_take_lock() const { auto [existing_locks, latest] = get_all_locks(); if (latest.has_value()) { - auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::SLOW_LOCK}); + auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::ATOMIC_LOCK}); if (expires > ClockType::nanos_since_epoch()) { // An unexpired lock exists return std::nullopt; } } - return try_take_next_epoch(existing_locks, latest); + return try_take_next_id(existing_locks, latest); } template -Epoch ReliableStorageLock::retry_until_take_lock() const { +AcquiredLockId ReliableStorageLock::retry_until_take_lock() const { // We don't use the ExponentialBackoff because we want to be able to wait indefinitely auto max_wait = std::chrono::duration_cast(std::chrono::nanoseconds(timeout())); auto min_wait = max_wait / 16; @@ -137,74 +132,74 @@ Epoch ReliableStorageLock::retry_until_take_lock() const { return current_wait * factor; }; - auto aquired_epoch = try_take_lock(); - while (!aquired_epoch.has_value()) { + auto acquired_lock = try_take_lock(); + while (!acquired_lock.has_value()) { std::this_thread::sleep_for(jittered_wait()); current_wait = std::min(current_wait * 2, max_wait); - aquired_epoch = try_take_lock(); + acquired_lock = try_take_lock(); } - return aquired_epoch.value(); + return acquired_lock.value(); } template -std::optional ReliableStorageLock::try_extend_lock(Epoch held_lock_epoch) const { +std::optional ReliableStorageLock::try_extend_lock(AcquiredLockId acquired_lock) const { auto [existing_locks, latest] = get_all_locks(); - util::check(latest.has_value() && latest.value() >= held_lock_epoch, - "We are trying to extend a newer epoch than the existing one in storage. Extend epoch: {}", - held_lock_epoch); - if (latest.value() != held_lock_epoch) { + util::check(latest.has_value() && latest.value() >= acquired_lock, + "We are trying to extend a newer lock_id than the existing one in storage. Extend lock_id: {}", + acquired_lock); + if (latest.value() != acquired_lock) { // We have lost the lock while holding it (most likely due to timeout). return std::nullopt; } - return try_take_next_epoch(existing_locks, latest); + return try_take_next_id(existing_locks, latest); } template -void ReliableStorageLock::free_lock(Epoch held_lock_epoch) const { - auto [existing_locks, latest_epoch] = get_all_locks(); - util::check(latest_epoch.has_value() && latest_epoch.value() >= held_lock_epoch, - "We are trying to free a newer epoch than the existing one in storage. Free epoch: {}, Existing epoch: {}", - held_lock_epoch, latest_epoch); - if (latest_epoch.value() != held_lock_epoch) { +void ReliableStorageLock::free_lock(AcquiredLockId acquired_lock) const { + auto [existing_locks, latest_lock_id] = get_all_locks(); + util::check(latest_lock_id.has_value() && latest_lock_id.value() >= acquired_lock, + "We are trying to free a newer lock_id than the existing one in storage. 
Free lock_id: {}, Existing lock_id: {}", + acquired_lock, latest_lock_id); + if (latest_lock_id.value() != acquired_lock) { // Lock is already lost return; } - auto lock_stream_id = get_stream_id(held_lock_epoch); + auto lock_stream_id = get_stream_id(acquired_lock); auto expiration = ClockType::nanos_since_epoch(); // Write current time to mark lock as expired as of now - store_->write_sync(KeyType::SLOW_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); + store_->write_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); } template -std::optional ReliableStorageLock::try_take_next_epoch(const std::vector& existing_locks, std::optional latest) const { - Epoch epoch = get_next_epoch(latest); - auto lock_stream_id = get_stream_id(epoch); +std::optional ReliableStorageLock::try_take_next_id(const std::vector& existing_locks, std::optional latest) const { + AcquiredLockId lock_id = get_next_id(latest); + auto lock_stream_id = get_stream_id(lock_id); auto expiration = ClockType::nanos_since_epoch() + timeout_; try { - store_->write_if_none_sync(KeyType::SLOW_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); + store_->write_if_none_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); } catch (const StorageException& e) { // There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException. - // Either way it's safe to assume we have failed to aquire the lock in case of transient S3 error. + // Either way it's safe to assume we have failed to acquire the lock in case of transient S3 error. // If error persists we'll approprieately raise in the next attempt to LIST/GET the existing lock and propagate // the transient error. - log::lock().warn("Failed to aquire lock (likely someone aquired it before us): {}", e.what()); + log::lock().warn("Failed to acquire lock (likely someone acquired it before us): {}", e.what()); return std::nullopt; } // We clear old locks only after aquiring the lock to avoid duplicating the deletion work clear_old_locks(existing_locks); - return epoch; + return lock_id; } -inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock) : - lock_(lock), aquired_epoch_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { - aquired_epoch_ = aquired_epoch; +inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, folly::Func&& on_lost_lock) : + lock_(lock), acquired_lock_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { + acquired_lock_ = acquired_lock; // We heartbeat 5 times per lock timeout to extend the lock. auto hearbeat_frequency = std::chrono::duration_cast( std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT)); extend_lock_heartbeat_.addFunction( [that=this](){ - if (that->aquired_epoch_.has_value()) { - that->aquired_epoch_ = that->lock_.try_extend_lock(that->aquired_epoch_.value()); - if (!that->aquired_epoch_.has_value()) { + if (that->acquired_lock_.has_value()) { + that->acquired_lock_ = that->lock_.try_extend_lock(that->acquired_lock_.value()); + if (!that->acquired_lock_.has_value()) { // Clean up if we have lost the lock. 
that->cleanup_on_lost_lock(); } @@ -221,14 +216,14 @@ inline void ReliableStorageLockGuard::cleanup_on_lost_lock() { inline ReliableStorageLockGuard::~ReliableStorageLockGuard() { extend_lock_heartbeat_.shutdown(); - if (aquired_epoch_.has_value()) { - lock_.free_lock(aquired_epoch_.value()); + if (acquired_lock_.has_value()) { + lock_.free_lock(acquired_lock_.value()); } } inline void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { - auto aquired = lock.retry_until_take_lock(); - guard = std::make_shared(lock, aquired, [](){ + auto acquired = lock.retry_until_take_lock(); + guard = std::make_shared(lock, acquired, [](){ throw LostReliableLock(); }); } diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp index 56b9f5851b..8008a97f79 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.hpp +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -19,46 +19,49 @@ namespace arcticdb { namespace lock { -using Epoch = uint64_t; +using AcquiredLockId = uint64_t; // The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to -// provide a slower but more reliable lock than the StorageLock. It should be completely consistent unless a process -// holding a lock get's paused for times comparable to the lock timeout. +// provide a more reliable lock than the StorageLock but it requires the backend to support atomic operations. It should +// be completely consistent unless a process holding a lock gets paused for times comparable to the lock timeout. // It lock follows the algorithm described here: // https://www.morling.dev/blog/leader-election-with-s3-conditional-writes/ +// Note that the ReliableStorageLock just provides methods for requesting or extending acquired locks. It doesn't hold any +// information about the acquired locks so far and none of its APIs are re-entrant. Thus the user is responsible for +// protecting and extending the acquired locks (which can be done through the ReliableStorageLockGuard). template class ReliableStorageLock { public: ReliableStorageLock(const std::string& base_name, const std::shared_ptr store, timestamp timeout); - Epoch retry_until_take_lock() const; - std::optional try_take_lock() const; - std::optional try_extend_lock(Epoch held_lock_epoch) const; - void free_lock(Epoch held_lock_epoch) const; + AcquiredLockId retry_until_take_lock() const; + std::optional try_take_lock() const; + std::optional try_extend_lock(AcquiredLockId acquired_lock) const; + void free_lock(AcquiredLockId acquired_lock) const; timestamp timeout() const; private: - std::optional try_take_next_epoch(const std::vector& existing_locks, std::optional latest) const; - std::pair, std::optional> get_all_locks() const; + std::optional try_take_next_id(const std::vector& existing_locks, std::optional latest) const; + std::pair, std::optional> get_all_locks() const; timestamp get_expiration(RefKey lock_key) const; - void clear_old_locks(const std::vector& epochs) const; - StreamId get_stream_id(Epoch e) const; + void clear_old_locks(const std::vector& acquired_locks) const; + StreamId get_stream_id(AcquiredLockId acquired_lock) const; std::string base_name_; std::shared_ptr store_; timestamp timeout_; }; -// The ReliableStorageLockGuard protects an aquired ReliableStorageLock::Epoch and frees it on destruction. While the lock -// is held it periodically extends its timeout in a heartbeating thread. 
If for some reason the lock is lost we get notified +// The ReliableStorageLockGuard protects an AcquiredLockId and frees it on destruction. While the lock is held it +// periodically extends its timeout in a heartbeating thread. If for some reason the lock is lost we get notified // via the on_lock_lost. class ReliableStorageLockGuard { public: - ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock); + ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, folly::Func&& on_lost_lock); ~ReliableStorageLockGuard(); private: void cleanup_on_lost_lock(); const ReliableStorageLock<> &lock_; - std::optional aquired_epoch_; + std::optional acquired_lock_; folly::Func on_lost_lock_; folly::FunctionScheduler extend_lock_heartbeat_; }; @@ -78,4 +81,4 @@ class ReliableStorageLockManager { } -#include "arcticdb/util/reliable_storage_lock.tpp" \ No newline at end of file +#include "arcticdb/util/reliable_storage_lock-inl.hpp" \ No newline at end of file diff --git a/cpp/arcticdb/util/storage_lock.hpp b/cpp/arcticdb/util/storage_lock.hpp index 61963e2e5e..f1b1229f85 100644 --- a/cpp/arcticdb/util/storage_lock.hpp +++ b/cpp/arcticdb/util/storage_lock.hpp @@ -70,7 +70,7 @@ inline std::thread::id get_thread_id() noexcept { return std::this_thread::get_id(); } -// This StorageLock is inherently unreliable. It does not use atomic operations and it is possible for two processes to aquire if the timing is right. +// This StorageLock is inherently unreliable. It does not use atomic operations and it is possible for two processes to acquire if the timing is right. // If you want a reliable alternative which is slower but uses atomic primitives you can look at the `ReliableStorageLock`. 
template class StorageLock { diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp index d500c010e0..4fa555a0b6 100644 --- a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp +++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include using namespace arcticdb; using namespace lock; @@ -20,33 +20,37 @@ using namespace lock; TEST(ReliableStorageLock, SingleThreaded) { auto store = std::make_shared(); - using Clock = PilotedClockNoAutoIncrement; + using Clock = util::ManualClock; // We have 2 locks, one with timeout of 20 and another with a timeout of 10 ReliableStorageLock lock1{StringId{"test_lock"}, store, 20}; ReliableStorageLock lock2{StringId{"test_lock"}, store, 10}; auto count_locks = [&]() { auto number_of_lock_keys = 0; - store->iterate_type(KeyType::SLOW_LOCK, [&number_of_lock_keys](VariantKey&& _ [[maybe_unused]]){++number_of_lock_keys;}); + store->iterate_type(KeyType::ATOMIC_LOCK, [&number_of_lock_keys](VariantKey&& _ [[maybe_unused]]){++number_of_lock_keys;}); return number_of_lock_keys; }; // We take the first lock at 0 and it should not expire until 20 Clock::time_ = 0; ASSERT_EQ(lock1.try_take_lock(), std::optional{0}); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); Clock::time_ = 5; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); ASSERT_EQ(lock2.try_take_lock(), std::nullopt); Clock::time_ = 10; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); ASSERT_EQ(lock2.try_take_lock(), std::nullopt); Clock::time_ = 19; ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); - // Once the first lock has expired we can take a new lock with epoch=1 + // Once the first lock has expired we can take a new lock with lock_id=1 Clock::time_ = 20; ASSERT_EQ(lock2.try_take_lock(), std::optional{1}); ASSERT_EQ(count_locks(), 2); - // We can extend the lock timeout at 25 to 35 and get an epoch=2 + // We can extend the lock timeout at 25 to 35 and get an lock_id=2 Clock::time_ = 25; ASSERT_EQ(lock1.try_take_lock(), std::nullopt); ASSERT_EQ(lock2.try_extend_lock(1), std::optional{2}); @@ -54,7 +58,7 @@ TEST(ReliableStorageLock, SingleThreaded) { Clock::time_ = 34; ASSERT_EQ(lock1.try_take_lock(), std::nullopt); - // At time 35 the lock with epoch=2 has expired and we can re-aquire the lock + // At time 35 the lock with lock_id=2 has expired and we can re-acquire the lock Clock::time_ = 35; ASSERT_EQ(lock1.try_take_lock(), std::optional{3}); ASSERT_EQ(count_locks(), 4); @@ -63,11 +67,10 @@ TEST(ReliableStorageLock, SingleThreaded) { // And we can free the lock immediately to allow re-aquiring without waiting for timeout lock1.free_lock(3); ASSERT_EQ(lock2.try_take_lock(), std::optional{4}); - // Taking lock2 with timeout=10 means we should clear all locks which have expired before 25. 
In this case just epoch=0 - ASSERT_EQ(count_locks(), 4); + ASSERT_EQ(count_locks(), 5); - // But if we take a lock at 100 all locks would have expired a timeout=10 ago, and we should clear all apart from latest epoch=5 - Clock::time_ = 100; + // But if we take a lock at 1000 all locks would have expired a 10xtimeout=100 ago, and we should clear all apart from latest lock_id=5 + Clock::time_ = 1000; ASSERT_EQ(lock2.try_take_lock(), std::optional{5}); ASSERT_EQ(count_locks(), 1); } @@ -82,19 +85,16 @@ struct SlowIncrementTask : async::BaseTask { cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {} void operator()() { - auto aquired = lock_.retry_until_take_lock(); - auto guard = ReliableStorageLockGuard(lock_, aquired, [that = this](){ + auto acquired = lock_.retry_until_take_lock(); + auto guard = ReliableStorageLockGuard(lock_, acquired, [that = this](){ that->lock_lost_ = true; }); auto value_before_sleep = cnt_; - // std::cout<<"Taken a lock with "<=": "1.11.405", "$version reason": "Version which contains atomic put operations", "default-features": false, "features": [ "s3", "identity-management" ] @@ -105,6 +104,7 @@ ], "overrides": [ { "name": "openssl", "version-string": "3.3.0" }, + { "name": "aws-sdk-cpp", "version": "1.11.405" }, { "name": "azure-core-cpp", "version": "1.12.0" }, { "name": "benchmark", "version": "1.9.0" }, { "name": "bitmagic", "version": "7.12.3" }, diff --git a/environment_unix.yml b/environment_unix.yml index 54277bf6e0..5d76d33d7c 100644 --- a/environment_unix.yml +++ b/environment_unix.yml @@ -27,7 +27,7 @@ dependencies: - pybind11 <2.11 - pcre - cyrus-sasl - - aws-sdk-cpp + - aws-sdk-cpp >=1.11.405 - prometheus-cpp - libprotobuf - openssl diff --git a/python/tests/integration/arcticdb/test_storage_lock.py b/python/tests/integration/arcticdb/test_storage_lock.py index 182b87965d..b3294943c0 100644 --- a/python/tests/integration/arcticdb/test_storage_lock.py +++ b/python/tests/integration/arcticdb/test_storage_lock.py @@ -5,11 +5,10 @@ from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK -from multiprocessing import Process, set_start_method -set_start_method("fork") # Okay to fork an S3 lib import time from arcticdb.util.test import assert_frame_equal +from multiprocessing import Process one_sec = 1_000_000_000 @@ -30,6 +29,7 @@ def test_many_increments(real_s3_version_store, num_processes, max_sleep): lib = real_s3_version_store init_df = pd.DataFrame({"col": [0]}) symbol = "counter" + lib.version_store.force_delete_symbol(symbol) lib.write(symbol, init_df) lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec) lock_manager = ReliableStorageLockManager() @@ -44,6 +44,8 @@ def test_many_increments(real_s3_version_store, num_processes, max_sleep): for p in processes: p.join() - read_df = lib.read(symbol).data + vit = lib.read(symbol) + read_df = vit.data expected_df = pd.DataFrame({"col": [num_processes]}) assert_frame_equal(read_df, expected_df) + assert vit.version == num_processes From 63c9bb8a190f82bd7f75f441ff67cac31727dc4d Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Wed, 4 Dec 2024 15:15:43 +0200 Subject: [PATCH 5/6] Allow setting the on_lock_lost after construction This will be useful if we want to construct a ReliableStorageLockGuard but decide later what to do in case a lock is lost. Also changes ReliableStorageLockGuard to hold a copy of the lock instead of just a reference. 
The lock is a cheap object to copy, so it makes it easier to avoid errors where the lock is destructed before its guard. --- cpp/arcticdb/util/reliable_storage_lock-inl.hpp | 14 ++++++++++++-- cpp/arcticdb/util/reliable_storage_lock.hpp | 10 +++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp index 3ae517051d..83e7bf5985 100644 --- a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp +++ b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp @@ -189,7 +189,7 @@ std::optional ReliableStorageLock::try_take_next_id(c return lock_id; } -inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, folly::Func&& on_lost_lock) : +inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, std::optional&& on_lost_lock) : lock_(lock), acquired_lock_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { acquired_lock_ = acquired_lock; // We heartbeat 5 times per lock timeout to extend the lock. @@ -211,7 +211,9 @@ inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageL inline void ReliableStorageLockGuard::cleanup_on_lost_lock() { // We do not use shutdown because we don't want to run it from within a FunctionScheduler thread to avoid a deadlock extend_lock_heartbeat_.cancelAllFunctions(); - on_lost_lock_(); + if (on_lost_lock_.has_value()){ + on_lost_lock_.value()(); + } } inline ReliableStorageLockGuard::~ReliableStorageLockGuard() { @@ -221,6 +223,14 @@ inline ReliableStorageLockGuard::~ReliableStorageLockGuard() { } } +inline void ReliableStorageLockGuard::set_on_lost_lock(folly::Func &&on_lost_lock) { + on_lost_lock_ = std::make_optional(std::move(on_lost_lock)); + if (!acquired_lock_.has_value()) { + // Lock was lost before we set on_lost_lock. Running callback immediately. + on_lost_lock_.value()(); + } +} + inline void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { auto acquired = lock.retry_until_take_lock(); guard = std::make_shared(lock, acquired, [](){ diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp index 8008a97f79..4628a67b2b 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.hpp +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -33,6 +33,7 @@ template class ReliableStorageLock { public: ReliableStorageLock(const std::string& base_name, const std::shared_ptr store, timestamp timeout); + ReliableStorageLock(const ReliableStorageLock& other) = default; AcquiredLockId retry_until_take_lock() const; std::optional try_take_lock() const; @@ -55,14 +56,17 @@ class ReliableStorageLock { // via the on_lock_lost. class ReliableStorageLockGuard { public: - ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, folly::Func&& on_lost_lock); + ReliableStorageLockGuard(const ReliableStorageLock<>& lock, AcquiredLockId acquired_lock, std::optional&& on_lost_lock); ~ReliableStorageLockGuard(); + + // Will immediately trigger [on_lost_lock] if lock is already lost.
+ void set_on_lost_lock(folly::Func&& on_lost_lock); private: void cleanup_on_lost_lock(); - const ReliableStorageLock<> &lock_; + const ReliableStorageLock<> lock_; std::optional acquired_lock_; - folly::Func on_lost_lock_; + std::optional on_lost_lock_; folly::FunctionScheduler extend_lock_heartbeat_; }; From a025892157bbc7a0967510b0ecba57fe97b67bf0 Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Thu, 5 Dec 2024 17:46:35 +0200 Subject: [PATCH 6/6] Denoise Precondition failed logs Because aws-sdk-cpp transforms PreconditionFailed errors into S3Error::Unkown we do string munging on the exception name to identify it. Also fixes some block version ref key conflicts --- cpp/arcticdb/CMakeLists.txt | 4 ++-- cpp/arcticdb/entity/key.hpp | 2 +- cpp/arcticdb/storage/s3/detail-inl.hpp | 4 ++++ cpp/arcticdb/storage/test/in_memory_store.hpp | 2 +- cpp/arcticdb/util/error_code.hpp | 1 + cpp/arcticdb/util/reliable_storage_lock-inl.hpp | 2 +- 6 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 8048fb0cf7..429b8acad4 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -947,6 +947,7 @@ if(${TEST}) util/test/test_id_transformation.cpp util/test/test_key_utils.cpp util/test/test_ranges_from_future.cpp + util/test/test_reliable_storage_lock.cpp util/test/test_slab_allocator.cpp util/test/test_storage_lock.cpp util/test/test_string_pool.cpp @@ -964,8 +965,7 @@ if(${TEST}) version/test/test_version_store.cpp version/test/version_map_model.hpp python/python_handlers.cpp - storage/test/common.hpp - util/test/test_reliable_storage_lock.cpp) + storage/test/common.hpp) set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755 diff --git a/cpp/arcticdb/entity/key.hpp b/cpp/arcticdb/entity/key.hpp index 1d37115586..54b57c8fb9 100644 --- a/cpp/arcticdb/entity/key.hpp +++ b/cpp/arcticdb/entity/key.hpp @@ -189,7 +189,7 @@ enum class KeyType : int { /* * Used for a list based reliable storage lock */ - ATOMIC_LOCK = 27, + ATOMIC_LOCK = 28, UNDEFINED }; diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index 3ceab28259..5e9128e79f 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -70,6 +70,10 @@ namespace s3 { error_message_suffix)); } + if (type == Aws::S3::S3Errors::UNKNOWN && err.GetExceptionName().find("Precondition") != std::string::npos) { + raise(fmt::format("Atomic operation failed: {}", error_message_suffix)); + } + // We create a more detailed error explanation in case of NETWORK_CONNECTION errors to remedy #880. if (type == Aws::S3::S3Errors::NETWORK_CONNECTION) { error_message = fmt::format("Unexpected network error: {} " diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index 255fb4b5eb..da78be3c28 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -484,7 +484,7 @@ namespace arcticdb { ARCTICDB_DEBUG(log::storage(), "Adding segment with key {}", key); if (if_none_match) { if (seg_by_ref_key_.find(key) != seg_by_ref_key_.end()) { - storage::raise("Precondition failed. Object is already present."); + storage::raise("Precondition failed. 
Object is already present."); } } seg_by_ref_key_[key] = std::make_unique(std::move(seg)); diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp index ff56ed1d20..c38484d3e0 100644 --- a/cpp/arcticdb/util/error_code.hpp +++ b/cpp/arcticdb/util/error_code.hpp @@ -85,6 +85,7 @@ inline std::unordered_map get_error_category_names() ERROR_CODE(5011, E_UNEXPECTED_LMDB_ERROR) \ ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \ ERROR_CODE(5021, E_S3_RETRYABLE) \ + ERROR_CODE(5022, E_ATOMIC_OPERATION_FAILED) \ ERROR_CODE(5030, E_UNEXPECTED_AZURE_ERROR) \ ERROR_CODE(5050, E_MONGO_BULK_OP_NO_REPLY) \ ERROR_CODE(5051, E_UNEXPECTED_MONGO_ERROR) \ diff --git a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp index 83e7bf5985..cd564b4bfc 100644 --- a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp +++ b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp @@ -181,7 +181,7 @@ std::optional ReliableStorageLock::try_take_next_id(c // Either way it's safe to assume we have failed to acquire the lock in case of transient S3 error. // If error persists we'll approprieately raise in the next attempt to LIST/GET the existing lock and propagate // the transient error. - log::lock().warn("Failed to acquire lock (likely someone acquired it before us): {}", e.what()); + log::lock().debug("Failed to acquire lock (likely someone acquired it before us): {}", e.what()); return std::nullopt; } // We clear old locks only after aquiring the lock to avoid duplicating the deletion work
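
For reference, the pieces introduced across these commits are intended to compose roughly as follows. This is only an illustrative usage sketch, not part of the patch series itself: the InMemoryStore backend (the one used by the C++ tests), the lock name and the 10 second timeout are placeholder choices, and the lost-lock callback mirrors the one used by ReliableStorageLockManager.

// Illustrative usage sketch only, not patch content; setup values below are placeholders.
#include <arcticdb/storage/test/in_memory_store.hpp>
#include <arcticdb/util/reliable_storage_lock.hpp>

#include <memory>

using namespace arcticdb;
using namespace arcticdb::lock;

void run_exclusive_section() {
    // Any store whose backend reports supports_atomic_writes() works; the in-memory store does.
    auto store = std::make_shared<InMemoryStore>();
    constexpr timestamp one_sec = 1'000'000'000;  // lock timeouts are expressed in nanoseconds
    ReliableStorageLock<> lock{"example_lock", store, 10 * one_sec};

    // Blocks with jittered backoff until an ATOMIC_LOCK ref key is written via write_if_none_sync.
    AcquiredLockId lock_id = lock.retry_until_take_lock();

    // The guard extends the lock EXTENDS_PER_TIMEOUT times per timeout from a heartbeat thread
    // and frees it on destruction; the callback can also be supplied later via set_on_lost_lock.
    ReliableStorageLockGuard guard{lock, lock_id, std::nullopt};
    guard.set_on_lost_lock([]() { throw LostReliableLock(); });

    // ... critical section that must not run concurrently across processes ...
}

ReliableStorageLockManager::take_lock_guard wraps the same acquire-and-guard sequence and throws LostReliableLock from the callback if the lock is ever lost.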