diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index f7ca660960..429b8acad4 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -367,6 +367,8 @@ set(arcticdb_srcs util/slab_allocator.hpp util/sparse_utils.hpp util/storage_lock.hpp + util/reliable_storage_lock.hpp + util/reliable_storage_lock-inl.hpp util/string_utils.hpp util/thread_cached_int.hpp util/timeouts.hpp @@ -514,8 +516,7 @@ set(arcticdb_srcs version/version_core.cpp version/version_store_api.cpp version/version_utils.cpp - version/version_map_batch_methods.cpp - ) + version/version_map_batch_methods.cpp) add_library(arcticdb_core_object OBJECT ${arcticdb_srcs}) @@ -946,6 +947,7 @@ if(${TEST}) util/test/test_id_transformation.cpp util/test/test_key_utils.cpp util/test/test_ranges_from_future.cpp + util/test/test_reliable_storage_lock.cpp util/test/test_slab_allocator.cpp util/test/test_storage_lock.cpp util/test/test_string_pool.cpp @@ -963,8 +965,7 @@ if(${TEST}) version/test/test_version_store.cpp version/test/version_map_model.hpp python/python_handlers.cpp - storage/test/common.hpp - ) + storage/test/common.hpp) set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755 diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index bd712b8f06..fed575a817 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -151,6 +151,15 @@ entity::VariantKey write_sync( return WriteSegmentTask{library_}(std::move(encoded)); } +entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId &stream_id, + SegmentInMemory &&segment) override { + util::check(is_ref_key_class(key_type), "Expected ref key type got {}", key_type); + auto encoded = EncodeRefTask{key_type, stream_id, std::move(segment), codec_, encoding_version_}(); + return WriteIfNoneTask{library_}(std::move(encoded)); +} + bool is_path_valid(const std::string_view path) 
const override { return library_->is_path_valid(path); } @@ -271,6 +280,10 @@ bool supports_prefix_matching() const override { return library_->supports_prefix_matching(); } +bool supports_atomic_writes() const override { + return library_->supports_atomic_writes(); +} + std::string key_path(const VariantKey& key) const { return library_->key_path(key); } diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index 0663e58baf..f28c5ce2b3 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -187,6 +187,23 @@ struct WriteSegmentTask : BaseTask { } }; +struct WriteIfNoneTask : BaseTask { + std::shared_ptr lib_; + + explicit WriteIfNoneTask(std::shared_ptr lib) : + lib_(std::move(lib)) { + } + + ARCTICDB_MOVE_ONLY_DEFAULT(WriteIfNoneTask) + + VariantKey operator()(storage::KeySegmentPair &&key_seg) const { + ARCTICDB_SAMPLE(WriteIfNoneTask, 0) + auto k = key_seg.variant_key(); + lib_->write_if_none(std::move(key_seg)); + return k; + } +}; + struct UpdateSegmentTask : BaseTask { std::shared_ptr lib_; storage::UpdateOpts opts_; diff --git a/cpp/arcticdb/entity/key.cpp b/cpp/arcticdb/entity/key.cpp index 5507b40e3d..8c31aee12c 100644 --- a/cpp/arcticdb/entity/key.cpp +++ b/cpp/arcticdb/entity/key.cpp @@ -60,6 +60,7 @@ KeyData get_key_data(KeyType key_type) { STRING_REF(KeyType::APPEND_REF, aref, 'a') STRING_KEY(KeyType::MULTI_KEY, mref, 'm') STRING_REF(KeyType::LOCK, lref, 'x') + STRING_REF(KeyType::ATOMIC_LOCK, alref, 'A') STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X') STRING_KEY(KeyType::APPEND_DATA, app, 'b') STRING_REF(KeyType::BLOCK_VERSION_REF, bvref, 'R') diff --git a/cpp/arcticdb/entity/key.hpp b/cpp/arcticdb/entity/key.hpp index 90a0db0f6d..54b57c8fb9 100644 --- a/cpp/arcticdb/entity/key.hpp +++ b/cpp/arcticdb/entity/key.hpp @@ -182,12 +182,14 @@ enum class KeyType : int { * Used for storing the ids of storages that failed to sync */ REPLICATION_FAIL_INFO = 26, - /* * A reference key storing many versions, used 
to track state within some of our background jobs. */ BLOCK_VERSION_REF = 27, - + /* + * Used for a list based reliable storage lock + */ + ATOMIC_LOCK = 28, UNDEFINED }; diff --git a/cpp/arcticdb/python/python_module.cpp b/cpp/arcticdb/python/python_module.cpp index 50321d6f00..474e74bdeb 100644 --- a/cpp/arcticdb/python/python_module.cpp +++ b/cpp/arcticdb/python/python_module.cpp @@ -335,7 +335,7 @@ PYBIND11_MODULE(arcticdb_ext, m) { arcticdb::storage::apy::register_bindings(storage_submodule, base_exception); arcticdb::stream::register_bindings(m); - arcticdb::toolbox::apy::register_bindings(m); + arcticdb::toolbox::apy::register_bindings(m, base_exception); m.def("get_version_string", &arcticdb::get_arcticdb_version_string); diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 2c8168bfc0..1c679cfab6 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -37,6 +37,10 @@ class AzureStorage final : public Storage { protected: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; @@ -51,6 +55,10 @@ class AzureStorage final : public Storage { return true; } + bool do_supports_atomic_writes() const final { + return false; + } + bool do_fast_delete() final { return false; } diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.hpp b/cpp/arcticdb/storage/file/mapped_file_storage.hpp index 6371becd26..ee4b395462 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.hpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.hpp @@ -36,6 +36,10 @@ class MappedFileStorage final : public SingleFileStorage { void do_write(Composite&& kvs) override; + void 
do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) override; void do_read(Composite&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override; @@ -46,6 +50,10 @@ class MappedFileStorage final : public SingleFileStorage { return false; }; + bool do_supports_atomic_writes() const final { + return false; + } + std::string do_key_path(const VariantKey&) const override { return {}; } bool do_fast_delete() override; diff --git a/cpp/arcticdb/storage/library.hpp b/cpp/arcticdb/storage/library.hpp index 33aef916c7..6b3d58837c 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -90,6 +90,14 @@ class Library { storages_->write(std::move(kvs)); } + void write_if_none(KeySegmentPair&& kv) { + if (open_mode() < OpenMode::WRITE) { + throw LibraryPermissionException(library_path_, open_mode(), "write"); + } + + storages_->write_if_none(std::move(kv)); + } + void update(Composite&& kvs, storage::UpdateOpts opts) { ARCTICDB_SAMPLE(LibraryUpdate, 0) if (open_mode() < OpenMode::WRITE) @@ -155,6 +163,8 @@ class Library { bool supports_prefix_matching() const { return storages_->supports_prefix_matching(); } + bool supports_atomic_writes() const { return storages_->supports_atomic_writes(); } + const LibraryPath &library_path() const { return library_path_; } OpenMode open_mode() const { return storages_->open_mode(); } diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp index b04af485b6..9c8bf9cbdc 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp @@ -38,6 +38,10 @@ class LmdbStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + 
}; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final; @@ -48,6 +52,10 @@ class LmdbStorage final : public Storage { return false; }; + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; void cleanup() override; diff --git a/cpp/arcticdb/storage/memory/memory_storage.hpp b/cpp/arcticdb/storage/memory/memory_storage.hpp index a02dd08269..6cec399494 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.hpp +++ b/cpp/arcticdb/storage/memory/memory_storage.hpp @@ -29,6 +29,10 @@ namespace arcticdb::storage::memory { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; @@ -41,6 +45,10 @@ namespace arcticdb::storage::memory { return false; } + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) final; diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.hpp b/cpp/arcticdb/storage/mongo/mongo_storage.hpp index 7dbe22f999..fe505b469c 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.hpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.hpp @@ -31,6 +31,10 @@ class MongoStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; @@ -43,6 +47,10 @@ class MongoStorage final : 
public Storage { return false; } + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final; diff --git a/cpp/arcticdb/storage/python_bindings.cpp b/cpp/arcticdb/storage/python_bindings.cpp index a7a5acfa9d..1b1e45b059 100644 --- a/cpp/arcticdb/storage/python_bindings.cpp +++ b/cpp/arcticdb/storage/python_bindings.cpp @@ -51,6 +51,7 @@ void register_bindings(py::module& storage, py::exception(fmt::format("Atomic operation failed: {}", error_message_suffix)); + } + // We create a more detailed error explanation in case of NETWORK_CONNECTION errors to remedy #880. if (type == Aws::S3::S3Errors::NETWORK_CONNECTION) { error_message = fmt::format("Unexpected network error: {} " @@ -129,6 +133,27 @@ namespace s3 { }); } + template + void do_write_if_none_impl( + KeySegmentPair &&kv, + const std::string &root_folder, + const std::string &bucket_name, + S3ClientWrapper &s3_client, + KeyBucketizer &&bucketizer) { + ARCTICDB_SAMPLE(S3StorageWriteIfNone, 0) + auto key_type_dir = key_type_folder(root_folder, kv.key_type()); + auto &k = kv.variant_key(); + auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k); + auto &seg = kv.segment(); + + auto put_object_result = s3_client.put_object(s3_object_name, std::move(seg), bucket_name, PutHeader::IF_NONE_MATCH); + + if (!put_object_result.is_success()) { + auto& error = put_object_result.get_error(); + raise_s3_exception(error, s3_object_name); + } + } + template void do_update_impl( Composite &&kvs, diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp index d7a3dfac5d..d9b72f7ed0 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp @@ -35,6 +35,10 @@ class NfsBackedStorage final : public Storage { private: void 
do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; @@ -49,6 +53,10 @@ class NfsBackedStorage final : public Storage { return true; } + bool do_supports_atomic_writes() const final { + return false; + } + bool do_fast_delete() final { return false; } diff --git a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp index 96c98e13ce..830629f44f 100644 --- a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp +++ b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp @@ -46,6 +46,11 @@ struct DeleteOutput{ std::vector failed_deletes; }; +enum class PutHeader{ + NONE, + IF_NONE_MATCH +}; + // An abstract class, which is responsible for sending the requests and parsing the responses from S3. // It can be derived as either a real connection to S3 or a mock used for unit tests. 
class S3ClientWrapper { @@ -57,7 +62,8 @@ class S3ClientWrapper { virtual S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) = 0; + const std::string& bucket_name, + PutHeader header = PutHeader::NONE) = 0; virtual S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.cpp b/cpp/arcticdb/storage/s3/s3_mock_client.cpp index a6b33570f5..3d614d5a05 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.cpp @@ -49,6 +49,7 @@ std::optional has_failure_trigger(const std::string& s3_object } const auto not_found_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false)); +const auto precondition_failed_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::UNKNOWN, "Precondition failed", "Precondition failed", false)); S3Result MockS3Client::head_object( const std::string& s3_object_name, @@ -83,12 +84,17 @@ S3Result MockS3Client::get_object( S3Result MockS3Client::put_object( const std::string &s3_object_name, Segment &&segment, - const std::string &bucket_name) { + const std::string &bucket_name, + PutHeader header) { auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE); if (maybe_error.has_value()) { return {*maybe_error}; } + if (header == PutHeader::IF_NONE_MATCH && s3_contents.contains({bucket_name, s3_object_name})) { + return {precondition_failed_error}; + } + s3_contents.insert_or_assign({bucket_name, s3_object_name}, std::move(segment)); return {std::monostate()}; diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.hpp b/cpp/arcticdb/storage/s3/s3_mock_client.hpp index 04c6812218..75561adb91 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.hpp @@ -61,7 +61,8 @@ class MockS3Client : public S3ClientWrapper { S3Result put_object( const std::string& s3_object_name, Segment&& segment, 
- const std::string& bucket_name) override; + const std::string& bucket_name, + PutHeader header = PutHeader::NONE) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_real_client.cpp b/cpp/arcticdb/storage/s3/s3_real_client.cpp index b2f74dadc8..2f5d6e79de 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.cpp @@ -133,12 +133,16 @@ S3Result RealS3Client::get_object( S3Result RealS3Client::put_object( const std::string &s3_object_name, Segment &&segment, - const std::string &bucket_name) { + const std::string &bucket_name, + PutHeader header) { ARCTICDB_SUBSAMPLE(S3StorageWritePreamble, 0) Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(s3_object_name.c_str()); + if (header == PutHeader::IF_NONE_MATCH) { + request.SetIfNoneMatch("*"); + } ARCTICDB_RUNTIME_DEBUG(log::storage(), "Set s3 key {}", request.GetKey().c_str()); auto [dst, write_size, buffer] = segment.serialize_header(); diff --git a/cpp/arcticdb/storage/s3/s3_real_client.hpp b/cpp/arcticdb/storage/s3/s3_real_client.hpp index 57bfe87b65..414178e507 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.hpp @@ -38,7 +38,8 @@ class RealS3Client : public S3ClientWrapper { S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) override; + const std::string& bucket_name, + PutHeader header = PutHeader::NONE) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 180b52c332..4f6facb6ce 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -52,6 +52,10 @@ void S3Storage::do_write(Composite&& kvs) { detail::do_write_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } +void 
S3Storage::do_write_if_none(KeySegmentPair&& kv) { + detail::do_write_if_none_impl(std::move(kv), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); +} + void S3Storage::do_update(Composite&& kvs, UpdateOpts) { detail::do_update_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index 196b9c0b8c..9be7f11b98 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -44,6 +44,8 @@ class S3Storage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv) final; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; @@ -58,6 +60,13 @@ class S3Storage final : public Storage { return true; } + bool do_supports_atomic_writes() const final { + // There is no way to differentiate whether an s3 backend supports atomic writes. As of Nov 2024 S3 and MinIO + // support atomic If-None-Match and If-Match put operations. Unfortunately if we're running on VAST or PURE + // these would just work like regular PUTs with no way to know. 
+ return true; + }; + bool do_fast_delete() final { return false; } diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp index 0291f74ec8..27abfb5b23 100644 --- a/cpp/arcticdb/storage/storage.hpp +++ b/cpp/arcticdb/storage/storage.hpp @@ -107,6 +107,10 @@ class Storage { return write(Composite{std::move(kv)}); } + void write_if_none(KeySegmentPair&& kv) { + return do_write_if_none(std::move(kv)); + } + void update(Composite &&kvs, UpdateOpts opts) { ARCTICDB_SAMPLE(StorageUpdate, 0) return do_update(std::move(kvs), opts); @@ -148,6 +152,10 @@ class Storage { return do_supports_prefix_matching(); } + bool supports_atomic_writes() const { + return do_supports_atomic_writes(); + } + bool fast_delete() { return do_fast_delete(); } @@ -186,6 +194,8 @@ class Storage { private: virtual void do_write(Composite&& kvs) = 0; + virtual void do_write_if_none(KeySegmentPair&& kv) = 0; + virtual void do_update(Composite&& kvs, UpdateOpts opts) = 0; virtual void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) = 0; @@ -196,6 +206,8 @@ class Storage { virtual bool do_supports_prefix_matching() const = 0; + virtual bool do_supports_atomic_writes() const = 0; + virtual bool do_fast_delete() = 0; // Stop iteration and return true upon the first key k for which visitor(k) is true, return false if no key matches diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index bd716ef240..bc2de2c526 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -46,6 +46,10 @@ class Storages { primary().write(std::move(kvs)); } + void write_if_none(KeySegmentPair&& kv) { + primary().write_if_none(std::move(kv)); + } + void update(Composite&& kvs, storage::UpdateOpts opts) { ARCTICDB_SAMPLE(StoragesUpdate, 0) primary().update(std::move(kvs), opts); @@ -55,6 +59,10 @@ class Storages { return primary().supports_prefix_matching(); } + bool supports_atomic_writes() const { + return 
primary().supports_atomic_writes(); + } + bool fast_delete() { return primary().fast_delete(); } diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index d351e19905..da78be3c28 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -39,6 +39,10 @@ namespace arcticdb { return false; } + bool supports_atomic_writes() const override { + return true; + } + bool fast_delete() override { return false; } @@ -144,6 +148,15 @@ namespace arcticdb { return write(key_type, stream_id, std::move(segment)).get(); } + entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId& stream_id, + SegmentInMemory &&segment) override { + auto key = entity::RefKey{stream_id, key_type}; + add_segment(key, std::move(segment), true); + return key; + } + bool is_path_valid(const std::string_view) const override { return true; } @@ -465,10 +478,15 @@ namespace arcticdb { seg_by_atom_key_[key] = std::make_unique(std::move(seg)); } - void add_segment(const RefKey &key, SegmentInMemory &&seg) { + void add_segment(const RefKey &key, SegmentInMemory &&seg, bool if_none_match = false) { StorageFailureSimulator::instance()->go(FailureType::WRITE); std::lock_guard lock{mutex_}; ARCTICDB_DEBUG(log::storage(), "Adding segment with key {}", key); + if (if_none_match) { + if (seg_by_ref_key_.find(key) != seg_by_ref_key_.end()) { + storage::raise("Precondition failed. 
Object is already present."); + } + } seg_by_ref_key_[key] = std::make_unique(std::move(seg)); } diff --git a/cpp/arcticdb/stream/stream_sink.hpp b/cpp/arcticdb/stream/stream_sink.hpp index ed28a611ab..fe90e26705 100644 --- a/cpp/arcticdb/stream/stream_sink.hpp +++ b/cpp/arcticdb/stream/stream_sink.hpp @@ -99,6 +99,13 @@ struct StreamSink { const StreamId &stream_id, SegmentInMemory &&segment) = 0; + virtual bool supports_atomic_writes() const = 0; + + virtual entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId &stream_id, + SegmentInMemory &&segment) = 0; + [[nodiscard]] virtual folly::Future write_compressed(storage::KeySegmentPair ks) = 0; virtual void write_compressed_sync(storage::KeySegmentPair ks) = 0; diff --git a/cpp/arcticdb/stream/test/stream_test_common.cpp b/cpp/arcticdb/stream/test/stream_test_common.cpp index 3a02c8a639..d5ee8a6ae1 100644 --- a/cpp/arcticdb/stream/test/stream_test_common.cpp +++ b/cpp/arcticdb/stream/test/stream_test_common.cpp @@ -9,6 +9,4 @@ namespace arcticdb { -std::atomic PilotedClock::time_{0}; - } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/toolbox/python_bindings.cpp b/cpp/arcticdb/toolbox/python_bindings.cpp index 1ec9f124aa..f53b89e07a 100644 --- a/cpp/arcticdb/toolbox/python_bindings.cpp +++ b/cpp/arcticdb/toolbox/python_bindings.cpp @@ -15,10 +15,11 @@ #include #include #include +#include namespace arcticdb::toolbox::apy { -void register_bindings(py::module &m) { +void register_bindings(py::module &m, py::exception& base_exception) { auto tools = m.def_submodule("tools", "Library management tool hooks"); using namespace arcticdb::toolbox::apy; using namespace arcticdb::storage; @@ -67,6 +68,24 @@ void register_bindings(py::module &m) { .def("inspect_env_variable", &LibraryTool::inspect_env_variable) .def_static("read_unaltered_lib_cfg", &LibraryTool::read_unaltered_lib_cfg); + // Reliable storage lock exposed for integration testing. 
It is intended for use in C++ + using namespace arcticdb::lock; + + py::register_exception(tools, "LostReliableLock", base_exception.ptr()); + + py::class_>(tools, "ReliableStorageLock") + .def(py::init<>([](std::string base_name, std::shared_ptr lib, timestamp timeout){ + auto store = version_store::LocalVersionedEngine(lib)._test_get_store(); + return ReliableStorageLock<>(base_name, store, timeout); + })); + + py::class_(tools, "ReliableStorageLockManager") + .def(py::init<>([](){ + return ReliableStorageLockManager(); + })) + .def("take_lock_guard", &ReliableStorageLockManager::take_lock_guard) + .def("free_lock_guard", &ReliableStorageLockManager::free_lock_guard); + // S3 Storage tool using namespace arcticdb::storage::s3; py::class_>(tools, "S3Tool") diff --git a/cpp/arcticdb/toolbox/python_bindings.hpp b/cpp/arcticdb/toolbox/python_bindings.hpp index 63db2eb0da..2975663ce5 100644 --- a/cpp/arcticdb/toolbox/python_bindings.hpp +++ b/cpp/arcticdb/toolbox/python_bindings.hpp @@ -20,7 +20,7 @@ namespace arcticdb::toolbox::apy { namespace py = pybind11; -void register_bindings(py::module &m); +void register_bindings(py::module &m, py::exception& base_exception); } // namespace arcticdb::toolbox::apy diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp index da53b08ab7..c38484d3e0 100644 --- a/cpp/arcticdb/util/error_code.hpp +++ b/cpp/arcticdb/util/error_code.hpp @@ -80,10 +80,12 @@ inline std::unordered_map get_error_category_names() ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \ ERROR_CODE(5003, E_PERMISSION) \ ERROR_CODE(5004, E_RESOURCE_NOT_FOUND) \ + ERROR_CODE(5005, E_UNSUPPORTED_ATOMIC_OPERATION) \ ERROR_CODE(5010, E_LMDB_MAP_FULL) \ ERROR_CODE(5011, E_UNEXPECTED_LMDB_ERROR) \ ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \ ERROR_CODE(5021, E_S3_RETRYABLE) \ + ERROR_CODE(5022, E_ATOMIC_OPERATION_FAILED) \ ERROR_CODE(5030, E_UNEXPECTED_AZURE_ERROR) \ ERROR_CODE(5050, E_MONGO_BULK_OP_NO_REPLY) \ ERROR_CODE(5051, E_UNEXPECTED_MONGO_ERROR) \ 
diff --git a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp new file mode 100644 index 0000000000..cd564b4bfc --- /dev/null +++ b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp @@ -0,0 +1,247 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#include + +#include +#include +#include +#include + +#include +#include + +namespace arcticdb { + +namespace lock { + +const auto SEPARATOR = '*'; +const auto EXTENDS_PER_TIMEOUT = 5u; +const auto REMOVE_AFTER_TIMEOUTS = 10u; + +inline StreamDescriptor lock_stream_descriptor(const StreamId &stream_id) { + return stream_descriptor( + stream_id, + stream::RowCountIndex(), + {scalar_field(DataType::INT64, "expiration")}); +} + +inline SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { + SegmentInMemory output{lock_stream_descriptor(name)}; + output.set_scalar(0, expiration); + output.end_row(); + return output; +} + +template +ReliableStorageLock::ReliableStorageLock(const std::string &base_name, const std::shared_ptr store, timestamp timeout) : + base_name_(base_name), store_(store), timeout_(timeout) { + storage::check(store_->supports_atomic_writes(), "Storage does not support atomic writes, so we can't create a lock"); +} + +template +timestamp ReliableStorageLock::timeout() const { + return timeout_; +} + +inline AcquiredLockId get_next_id(std::optional maybe_prev) { + if (maybe_prev.has_value()) { + return maybe_prev.value() + 1; + } + return 0; +} + +template +StreamId ReliableStorageLock::get_stream_id(AcquiredLockId lock_id) const { + return fmt::format("{}{}{}", base_name_, SEPARATOR, lock_id); +} + +inline AcquiredLockId 
extract_lock_id_from_stream_id(const StreamId& stream_id) { + auto string_id = std::get(stream_id); + auto lock_id_string = string_id.substr(string_id.find(SEPARATOR)+1, string_id.size()); + return std::stoull(lock_id_string); +} + +template +std::pair, std::optional> ReliableStorageLock::get_all_locks() const { + std::vector lock_ids; + store_->iterate_type( + KeyType::ATOMIC_LOCK, + [&lock_ids](VariantKey &&key) { + auto current_lock_id = extract_lock_id_from_stream_id(variant_key_id(key)); + lock_ids.push_back(current_lock_id); + }, base_name_ + SEPARATOR); + std::optional latest = lock_ids.size() == 0 ? std::nullopt : std::make_optional<>(*std::max_element(lock_ids.begin(), lock_ids.end())); + return {lock_ids, latest}; +} + +template +timestamp ReliableStorageLock::get_expiration(RefKey lock_key) const { + auto kv = store_->read_sync(lock_key, storage::ReadKeyOpts{}); + return kv.second.template scalar_at(0, 0).value(); +} + +template +void ReliableStorageLock::clear_old_locks(const std::vector& lock_ids) const { + auto now = ClockType::nanos_since_epoch(); + auto to_delete = std::vector(); + // We only clear locks that have expired more than 10 timeouts (we assume a process can't be paused for more than the timeout) ago. + // We do this to avoid a process mistakenly taking a lock if: + // 1. Process A lists locks and gets [4, 5, 6] + // 2. Process A decides to attempt taking lock 7 + // 3. Process A gets paused + // 4. Process B takes locks 7 and 8 + // 5. Process B decides to clear lock 7 since it's not the latest + // 6. 
Process A succeeds in taking lock 7 + for (auto lock_id : lock_ids) { + auto lock_key = RefKey{get_stream_id(lock_id), KeyType::ATOMIC_LOCK}; + if (get_expiration(lock_key) + REMOVE_AFTER_TIMEOUTS * timeout_ < now) { + to_delete.emplace_back(lock_key); + } + } + store_->remove_keys_sync(to_delete); +} + +template +std::optional ReliableStorageLock::try_take_lock() const { + auto [existing_locks, latest] = get_all_locks(); + if (latest.has_value()) { + auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::ATOMIC_LOCK}); + if (expires > ClockType::nanos_since_epoch()) { + // An unexpired lock exists + return std::nullopt; + } + } + return try_take_next_id(existing_locks, latest); +} + +template +AcquiredLockId ReliableStorageLock::retry_until_take_lock() const { + // We don't use the ExponentialBackoff because we want to be able to wait indefinitely + auto max_wait = std::chrono::duration_cast(std::chrono::nanoseconds(timeout())); + auto min_wait = max_wait / 16; + auto current_wait = min_wait; + std::minstd_rand gen(std::random_device{}()); + std::uniform_real_distribution<> dist(1.0, 1.2); + auto jittered_wait = [&]() { + auto factor = dist(gen); + return current_wait * factor; + }; + + auto acquired_lock = try_take_lock(); + while (!acquired_lock.has_value()) { + std::this_thread::sleep_for(jittered_wait()); + current_wait = std::min(current_wait * 2, max_wait); + acquired_lock = try_take_lock(); + } + return acquired_lock.value(); +} + +template +std::optional ReliableStorageLock::try_extend_lock(AcquiredLockId acquired_lock) const { + auto [existing_locks, latest] = get_all_locks(); + util::check(latest.has_value() && latest.value() >= acquired_lock, + "We are trying to extend a newer lock_id than the existing one in storage. Extend lock_id: {}", + acquired_lock); + if (latest.value() != acquired_lock) { + // We have lost the lock while holding it (most likely due to timeout). 
+ return std::nullopt; + } + return try_take_next_id(existing_locks, latest); +} + +template +void ReliableStorageLock::free_lock(AcquiredLockId acquired_lock) const { + auto [existing_locks, latest_lock_id] = get_all_locks(); + util::check(latest_lock_id.has_value() && latest_lock_id.value() >= acquired_lock, + "We are trying to free a newer lock_id than the existing one in storage. Free lock_id: {}, Existing lock_id: {}", + acquired_lock, latest_lock_id); + if (latest_lock_id.value() != acquired_lock) { + // Lock is already lost + return; + } + auto lock_stream_id = get_stream_id(acquired_lock); + auto expiration = ClockType::nanos_since_epoch(); // Write current time to mark lock as expired as of now + store_->write_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); +} + +template +std::optional ReliableStorageLock::try_take_next_id(const std::vector& existing_locks, std::optional latest) const { + AcquiredLockId lock_id = get_next_id(latest); + auto lock_stream_id = get_stream_id(lock_id); + auto expiration = ClockType::nanos_since_epoch() + timeout_; + try { + store_->write_if_none_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); + } catch (const StorageException& e) { + // There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException. + // Either way it's safe to assume we have failed to acquire the lock in case of transient S3 error. + // If error persists we'll appropriately raise in the next attempt to LIST/GET the existing lock and propagate + // the transient error. 
+ log::lock().debug("Failed to acquire lock (likely someone acquired it before us): {}", e.what()); + return std::nullopt; + } + // We clear old locks only after acquiring the lock to avoid duplicating the deletion work + clear_old_locks(existing_locks); + return lock_id; +} + +inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, std::optional&& on_lost_lock) : + lock_(lock), acquired_lock_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { + acquired_lock_ = acquired_lock; + // We heartbeat 5 times per lock timeout to extend the lock. + auto hearbeat_frequency = std::chrono::duration_cast( + std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT)); + extend_lock_heartbeat_.addFunction( + [that=this](){ + if (that->acquired_lock_.has_value()) { + that->acquired_lock_ = that->lock_.try_extend_lock(that->acquired_lock_.value()); + if (!that->acquired_lock_.has_value()) { + // Clean up if we have lost the lock. + that->cleanup_on_lost_lock(); + } + } + }, hearbeat_frequency, "Extend lock", hearbeat_frequency); + extend_lock_heartbeat_.start(); +} + +inline void ReliableStorageLockGuard::cleanup_on_lost_lock() { + // We do not use shutdown because we don't want to run it from within a FunctionScheduler thread to avoid a deadlock + extend_lock_heartbeat_.cancelAllFunctions(); + if (on_lost_lock_.has_value()){ + on_lost_lock_.value()(); + } +} + +inline ReliableStorageLockGuard::~ReliableStorageLockGuard() { + extend_lock_heartbeat_.shutdown(); + if (acquired_lock_.has_value()) { + lock_.free_lock(acquired_lock_.value()); + } +} + +inline void ReliableStorageLockGuard::set_on_lost_lock(folly::Func &&on_lost_lock) { + on_lost_lock_ = std::make_optional(std::move(on_lost_lock)); + if (!acquired_lock_.has_value()) { + // Lock was lost before we set on_lost_lock. Running callback immediately.
+ on_lost_lock_.value()(); + } +} + +inline void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { + auto acquired = lock.retry_until_take_lock(); + guard = std::make_shared(lock, acquired, [](){ + throw LostReliableLock(); + }); +} + +inline void ReliableStorageLockManager::free_lock_guard() { + guard = std::nullopt; +} + +} + +} diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp new file mode 100644 index 0000000000..4628a67b2b --- /dev/null +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -0,0 +1,88 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace arcticdb { + +namespace lock { + +using AcquiredLockId = uint64_t; + +// The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to +// provide a more reliable lock than the StorageLock but it requires the backend to support atomic operations. It should +// be completely consistent unless a process holding a lock gets paused for times comparable to the lock timeout. +// The lock follows the algorithm described here: +// https://www.morling.dev/blog/leader-election-with-s3-conditional-writes/ +// Note that the ReliableStorageLock just provides methods for requesting or extending acquired locks. It doesn't hold any +// information about the acquired locks so far and none of its APIs are re-entrant. Thus the user is responsible for +// protecting and extending the acquired locks (which can be done through the ReliableStorageLockGuard).
+template +class ReliableStorageLock { +public: + ReliableStorageLock(const std::string& base_name, const std::shared_ptr store, timestamp timeout); + ReliableStorageLock(const ReliableStorageLock& other) = default; + + AcquiredLockId retry_until_take_lock() const; + std::optional try_take_lock() const; + std::optional try_extend_lock(AcquiredLockId acquired_lock) const; + void free_lock(AcquiredLockId acquired_lock) const; + timestamp timeout() const; +private: + std::optional try_take_next_id(const std::vector& existing_locks, std::optional latest) const; + std::pair, std::optional> get_all_locks() const; + timestamp get_expiration(RefKey lock_key) const; + void clear_old_locks(const std::vector& acquired_locks) const; + StreamId get_stream_id(AcquiredLockId acquired_lock) const; + std::string base_name_; + std::shared_ptr store_; + timestamp timeout_; +}; + +// The ReliableStorageLockGuard protects an AcquiredLockId and frees it on destruction. While the lock is held it +// periodically extends its timeout in a heartbeating thread. If for some reason the lock is lost we get notified +// via the on_lost_lock. +class ReliableStorageLockGuard { +public: + ReliableStorageLockGuard(const ReliableStorageLock<>& lock, AcquiredLockId acquired_lock, std::optional&& on_lost_lock); + + ~ReliableStorageLockGuard(); + + // Will immediately trigger [on_lost_lock] if lock is already lost.
+ void set_on_lost_lock(folly::Func&& on_lost_lock); +private: + void cleanup_on_lost_lock(); + const ReliableStorageLock<> lock_; + std::optional acquired_lock_; + std::optional on_lost_lock_; + folly::FunctionScheduler extend_lock_heartbeat_; +}; + + +// Only used for python tests +struct LostReliableLock : std::exception {}; +class ReliableStorageLockManager { +public: + void take_lock_guard(const ReliableStorageLock<>& lock); + void free_lock_guard(); +private: + std::optional> guard = std::nullopt; +}; + +} + +} + +#include "arcticdb/util/reliable_storage_lock-inl.hpp" \ No newline at end of file diff --git a/cpp/arcticdb/util/storage_lock.hpp b/cpp/arcticdb/util/storage_lock.hpp index 7fdd45a443..f1b1229f85 100644 --- a/cpp/arcticdb/util/storage_lock.hpp +++ b/cpp/arcticdb/util/storage_lock.hpp @@ -70,6 +70,8 @@ inline std::thread::id get_thread_id() noexcept { return std::this_thread::get_id(); } +// This StorageLock is inherently unreliable. It does not use atomic operations and it is possible for two processes to acquire if the timing is right. +// If you want a reliable alternative which is slower but uses atomic primitives you can look at the `ReliableStorageLock`. template class StorageLock { // 1 Day diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp new file mode 100644 index 0000000000..4fa555a0b6 --- /dev/null +++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp @@ -0,0 +1,128 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+ */ + +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace lock; + +// These tests test the actual implementation + +TEST(ReliableStorageLock, SingleThreaded) { + auto store = std::make_shared(); + using Clock = util::ManualClock; + // We have 2 locks, one with timeout of 20 and another with a timeout of 10 + ReliableStorageLock lock1{StringId{"test_lock"}, store, 20}; + ReliableStorageLock lock2{StringId{"test_lock"}, store, 10}; + + auto count_locks = [&]() { + auto number_of_lock_keys = 0; + store->iterate_type(KeyType::ATOMIC_LOCK, [&number_of_lock_keys](VariantKey&& _ [[maybe_unused]]){++number_of_lock_keys;}); + return number_of_lock_keys; + }; + + // We take the first lock at 0 and it should not expire until 20 + Clock::time_ = 0; + ASSERT_EQ(lock1.try_take_lock(), std::optional{0}); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 5; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 10; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 19; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + + // Once the first lock has expired we can take a new lock with lock_id=1 + Clock::time_ = 20; + ASSERT_EQ(lock2.try_take_lock(), std::optional{1}); + ASSERT_EQ(count_locks(), 2); + + // We can extend the lock timeout at 25 to 35 and get an lock_id=2 + Clock::time_ = 25; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_extend_lock(1), std::optional{2}); + ASSERT_EQ(count_locks(), 3); + Clock::time_ = 34; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + + // At time 35 the lock with lock_id=2 has expired and we can re-acquire the lock + Clock::time_ = 35; + ASSERT_EQ(lock1.try_take_lock(), std::optional{3}); + ASSERT_EQ(count_locks(), 4); + ASSERT_EQ(lock2.try_take_lock(), 
std::nullopt); + + // And we can free the lock immediately to allow re-acquiring without waiting for timeout + lock1.free_lock(3); + ASSERT_EQ(lock2.try_take_lock(), std::optional{4}); + ASSERT_EQ(count_locks(), 5); + + // But if we take a lock at 1000 all locks would have expired more than 10xtimeout=100 ago, and we should clear all apart from latest lock_id=5 + Clock::time_ = 1000; + ASSERT_EQ(lock2.try_take_lock(), std::optional{5}); + ASSERT_EQ(count_locks(), 1); +} + +struct SlowIncrementTask : async::BaseTask { + int& cnt_; + const ReliableStorageLock<>& lock_; + std::chrono::milliseconds sleep_time_; + bool lock_lost_ = false; + + SlowIncrementTask(int& cnt, ReliableStorageLock<>& lock, std::chrono::milliseconds sleep_time) : + cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {} + + void operator()() { + auto acquired = lock_.retry_until_take_lock(); + auto guard = ReliableStorageLockGuard(lock_, acquired, [that = this](){ + that->lock_lost_ = true; + }); + auto value_before_sleep = cnt_; + std::this_thread::sleep_for(sleep_time_); + if (lock_lost_) { + // Return early on a lost lock. We will raise an issue if this happens. + return; + } + cnt_ = value_before_sleep + 1; + } +}; + + +TEST(ReliableStorageLock, StressMultiThreaded) { + // It is hard to use a piloted clock for these tests because the folly::FunctionScheduler we use for the lock + // extensions doesn't support a custom clock. Thus this test will need to run for about 2 minutes. + auto threads = 100u; + folly::FutureExecutor exec{threads}; + auto store = std::make_shared(); + // Running the test with tighter timeout than the 1000ms timeout causes it to fail occasionally. + // Seemingly because the heartbeating thread might occasionally not run for long periods of time. This problem disappears with larger timeouts like 1000ms.
+ ReliableStorageLock<> lock{StringId{"test_lock"}, store, ONE_SECOND}; + + int counter = 0; + + std::vector> futures; + for(auto i = 0u; i < threads; ++i) { + // We use both fast and slow tasks to test both fast lock frees and lock extensions + auto sleep_time = std::chrono::milliseconds(i%2 * 2000); + futures.emplace_back(exec.addFuture(SlowIncrementTask{counter, lock, sleep_time})); + } + folly::collectAll(futures).get(); + + // If no locks were lost and no races happened each thread will have incremented the counter exactly once + ASSERT_EQ(counter, threads); + + // Also the lock should be free by the end (i.e. we can take a new lock) + ASSERT_EQ(lock.try_take_lock().has_value(), true); +} diff --git a/cpp/vcpkg b/cpp/vcpkg index c82f746672..6e1219d080 160000 --- a/cpp/vcpkg +++ b/cpp/vcpkg @@ -1 +1 @@ -Subproject commit c82f74667287d3dc386bce81e44964370c91a289 +Subproject commit 6e1219d080fc0525f55bf8c8ba57d8ab2ce35422 diff --git a/cpp/vcpkg.json b/cpp/vcpkg.json index b364273b91..aad9458c6c 100644 --- a/cpp/vcpkg.json +++ b/cpp/vcpkg.json @@ -31,16 +31,52 @@ "features": [ "push", "pull" ] }, "mongo-cxx-driver", - { - "name": "aws-c-io", - "$version reason": "To pick up https://github.com/awslabs/aws-c-io/pull/515" - }, { "name": "aws-sdk-cpp", - "$version reason": "Minimum version in the baseline that works with aws-c-io above.", + "$version reason": "Version which contains atomic put operations", "default-features": false, "features": [ "s3", "identity-management" ] }, + { + "name": "aws-crt-cpp", + "version>=": "0.28.3" + }, + { + "name": "aws-c-s3", + "version>=": "0.6.6" + }, + { + "name": "aws-c-io", + "version>=": "0.14.18" + }, + { + "name": "aws-c-common", + "version>=": "0.9.28" + }, + { + "name": "aws-c-auth", + "version>=": "0.7.31" + }, + { + "name": "aws-c-cal", + "version>=": "0.7.4" + }, + { + "name": "aws-c-http", + "version>=": "0.8.10" + }, + { + "name": "aws-c-sdkutils", + "version>=": "0.1.19" + }, + { + "name": "aws-c-event-stream", + 
"version>=": "0.4.3" + }, + { + "name": "aws-checksums", + "version>=": "0.1.20" + }, "boost-dynamic-bitset", "boost-interprocess", "boost-callable-traits", @@ -68,9 +104,8 @@ ], "overrides": [ { "name": "openssl", "version-string": "3.3.0" }, - { "name": "aws-sdk-cpp", "version": "1.11.201" }, + { "name": "aws-sdk-cpp", "version": "1.11.405" }, { "name": "azure-core-cpp", "version": "1.12.0" }, - { "name": "aws-c-s3", "version": "0.3.24" }, { "name": "benchmark", "version": "1.9.0" }, { "name": "bitmagic", "version": "7.12.3" }, { "name": "boost-algorithm", "version": "1.84.0" }, diff --git a/environment_unix.yml b/environment_unix.yml index 54277bf6e0..5d76d33d7c 100644 --- a/environment_unix.yml +++ b/environment_unix.yml @@ -27,7 +27,7 @@ dependencies: - pybind11 <2.11 - pcre - cyrus-sasl - - aws-sdk-cpp + - aws-sdk-cpp >=1.11.405 - prometheus-cpp - libprotobuf - openssl diff --git a/python/tests/integration/arcticdb/test_storage_lock.py b/python/tests/integration/arcticdb/test_storage_lock.py new file mode 100644 index 0000000000..b3294943c0 --- /dev/null +++ b/python/tests/integration/arcticdb/test_storage_lock.py @@ -0,0 +1,51 @@ +import pandas as pd +import numpy as np +import pytest + +from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager +from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK + +import time + +from arcticdb.util.test import assert_frame_equal +from multiprocessing import Process + + +one_sec = 1_000_000_000 + + +def slow_increment_task(lib, symbol, sleep_time, lock_manager, lock): + lock_manager.take_lock_guard(lock) + df = lib.read(symbol).data + df["col"][0] = df["col"][0] + 1 + time.sleep(sleep_time) + lib.write(symbol, df) + lock_manager.free_lock_guard() + + +@pytest.mark.parametrize("num_processes,max_sleep", [(100, 1), (5, 20)]) +@REAL_S3_TESTS_MARK +def test_many_increments(real_s3_version_store, num_processes, max_sleep): + lib = real_s3_version_store + init_df = 
pd.DataFrame({"col": [0]}) + symbol = "counter" + lib.version_store.force_delete_symbol(symbol) + lib.write(symbol, init_df) + lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec) + lock_manager = ReliableStorageLockManager() + + processes = [ + Process(target=slow_increment_task, args=(lib, symbol, 0 if i%2==0 else max_sleep, lock_manager, lock)) + for i in range(num_processes) + ] + for p in processes: + p.start() + + for p in processes: + p.join() + + vit = lib.read(symbol) + read_df = vit.data + expected_df = pd.DataFrame({"col": [num_processes]}) + assert_frame_equal(read_df, expected_df) + assert vit.version == num_processes