Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ set(arcticdb_srcs
util/slab_allocator.hpp
util/sparse_utils.hpp
util/storage_lock.hpp
util/reliable_storage_lock.hpp
util/reliable_storage_lock-inl.hpp
util/string_utils.hpp
util/thread_cached_int.hpp
util/timeouts.hpp
Expand Down Expand Up @@ -514,8 +516,7 @@ set(arcticdb_srcs
version/version_core.cpp
version/version_store_api.cpp
version/version_utils.cpp
version/version_map_batch_methods.cpp
)
version/version_map_batch_methods.cpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down Expand Up @@ -946,6 +947,7 @@ if(${TEST})
util/test/test_id_transformation.cpp
util/test/test_key_utils.cpp
util/test/test_ranges_from_future.cpp
util/test/test_reliable_storage_lock.cpp
util/test/test_slab_allocator.cpp
util/test/test_storage_lock.cpp
util/test/test_string_pool.cpp
Expand All @@ -963,8 +965,7 @@ if(${TEST})
version/test/test_version_store.cpp
version/test/version_map_model.hpp
python/python_handlers.cpp
storage/test/common.hpp
)
storage/test/common.hpp)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

Expand Down
13 changes: 13 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ entity::VariantKey write_sync(
return WriteSegmentTask{library_}(std::move(encoded));
}

entity::VariantKey write_if_none_sync(
KeyType key_type,
const StreamId &stream_id,
SegmentInMemory &&segment) override {
util::check(is_ref_key_class(key_type), "Expected ref key type got {}", key_type);
auto encoded = EncodeRefTask{key_type, stream_id, std::move(segment), codec_, encoding_version_}();
return WriteIfNoneTask{library_}(std::move(encoded));
}

bool is_path_valid(const std::string_view path) const override {
return library_->is_path_valid(path);
}
Expand Down Expand Up @@ -271,6 +280,10 @@ bool supports_prefix_matching() const override {
return library_->supports_prefix_matching();
}

bool supports_atomic_writes() const override {
return library_->supports_atomic_writes();
}

std::string key_path(const VariantKey& key) const {
return library_->key_path(key);
}
Expand Down
17 changes: 17 additions & 0 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,23 @@ struct WriteSegmentTask : BaseTask {
}
};

struct WriteIfNoneTask : BaseTask {
std::shared_ptr<storage::Library> lib_;

explicit WriteIfNoneTask(std::shared_ptr<storage::Library> lib) :
lib_(std::move(lib)) {
}

ARCTICDB_MOVE_ONLY_DEFAULT(WriteIfNoneTask)

VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
ARCTICDB_SAMPLE(WriteSegmentTask, 0)
auto k = key_seg.variant_key();
lib_->write_if_none(std::move(key_seg));
return k;
}
};

struct UpdateSegmentTask : BaseTask {
std::shared_ptr<storage::Library> lib_;
storage::UpdateOpts opts_;
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/entity/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ KeyData get_key_data(KeyType key_type) {
STRING_REF(KeyType::APPEND_REF, aref, 'a')
STRING_KEY(KeyType::MULTI_KEY, mref, 'm')
STRING_REF(KeyType::LOCK, lref, 'x')
STRING_REF(KeyType::ATOMIC_LOCK, alref, 'A')
STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X')
STRING_KEY(KeyType::APPEND_DATA, app, 'b')
STRING_REF(KeyType::BLOCK_VERSION_REF, bvref, 'R')
Expand Down
6 changes: 4 additions & 2 deletions cpp/arcticdb/entity/key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,14 @@ enum class KeyType : int {
* Used for storing the ids of storages that failed to sync
*/
REPLICATION_FAIL_INFO = 26,

/*
* A reference key storing many versions, used to track state within some of our background jobs.
*/
BLOCK_VERSION_REF = 27,

/*
* Used for a list based reliable storage lock
*/
ATOMIC_LOCK = 28,
UNDEFINED
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/python/python_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ PYBIND11_MODULE(arcticdb_ext, m) {
arcticdb::storage::apy::register_bindings(storage_submodule, base_exception);

arcticdb::stream::register_bindings(m);
arcticdb::toolbox::apy::register_bindings(m);
arcticdb::toolbox::apy::register_bindings(m, base_exception);

m.def("get_version_string", &arcticdb::get_arcticdb_version_string);

Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/azure/azure_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class AzureStorage final : public Storage {
protected:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand All @@ -51,6 +55,10 @@ class AzureStorage final : public Storage {
return true;
}

bool do_supports_atomic_writes() const final {
return false;
}

bool do_fast_delete() final {
return false;
}
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class MappedFileStorage final : public SingleFileStorage {

void do_write(Composite<KeySegmentPair>&& kvs) override;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) override;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;
Expand All @@ -46,6 +50,10 @@ class MappedFileStorage final : public SingleFileStorage {
return false;
};

bool do_supports_atomic_writes() const final {
return false;
}

std::string do_key_path(const VariantKey&) const override { return {}; }

bool do_fast_delete() override;
Expand Down
10 changes: 10 additions & 0 deletions cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ class Library {
storages_->write(std::move(kvs));
}

void write_if_none(KeySegmentPair&& kv) {
if (open_mode() < OpenMode::WRITE) {
throw LibraryPermissionException(library_path_, open_mode(), "write");
}

storages_->write_if_none(std::move(kv));
}

void update(Composite<KeySegmentPair>&& kvs, storage::UpdateOpts opts) {
ARCTICDB_SAMPLE(LibraryUpdate, 0)
if (open_mode() < OpenMode::WRITE)
Expand Down Expand Up @@ -155,6 +163,8 @@ class Library {

bool supports_prefix_matching() const { return storages_->supports_prefix_matching(); }

bool supports_atomic_writes() const { return storages_->supports_atomic_writes(); }

const LibraryPath &library_path() const { return library_path_; }

OpenMode open_mode() const { return storages_->open_mode(); }
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/lmdb/lmdb_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class LmdbStorage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final;
Expand All @@ -48,6 +52,10 @@ class LmdbStorage final : public Storage {
return false;
};

bool do_supports_atomic_writes() const final {
return false;
}

inline bool do_fast_delete() final;

void cleanup() override;
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/memory/memory_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ namespace arcticdb::storage::memory {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand All @@ -41,6 +45,10 @@ namespace arcticdb::storage::memory {
return false;
}

bool do_supports_atomic_writes() const final {
return false;
}

inline bool do_fast_delete() final;

bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) final;
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/mongo/mongo_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class MongoStorage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand All @@ -43,6 +47,10 @@ class MongoStorage final : public Storage {
return false;
}

bool do_supports_atomic_writes() const final {
return false;
}

inline bool do_fast_delete() final;

bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
.value("STORAGE_INFO", KeyType::STORAGE_INFO)
.value("APPEND_REF", KeyType::APPEND_REF)
.value("LOCK", KeyType::LOCK)
.value("SLOW_LOCK", KeyType::ATOMIC_LOCK)
.value("SNAPSHOT_REF", KeyType::SNAPSHOT_REF)
.value("TOMBSTONE", KeyType::TOMBSTONE)
.value("APPEND_DATA", KeyType::APPEND_DATA)
Expand Down
25 changes: 25 additions & 0 deletions cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ namespace s3 {
error_message_suffix));
}

if (type == Aws::S3::S3Errors::UNKNOWN && err.GetExceptionName().find("Precondition") != std::string::npos) {
raise<ErrorCode::E_ATOMIC_OPERATION_FAILED>(fmt::format("Atomic operation failed: {}", error_message_suffix));
}

// We create a more detailed error explanation in case of NETWORK_CONNECTION errors to remedy #880.
if (type == Aws::S3::S3Errors::NETWORK_CONNECTION) {
error_message = fmt::format("Unexpected network error: {} "
Expand Down Expand Up @@ -129,6 +133,27 @@ namespace s3 {
});
}

template<class KeyBucketizer>
void do_write_if_none_impl(
KeySegmentPair &&kv,
const std::string &root_folder,
const std::string &bucket_name,
S3ClientWrapper &s3_client,
KeyBucketizer &&bucketizer) {
ARCTICDB_SAMPLE(S3StorageWriteIfNone, 0)
auto key_type_dir = key_type_folder(root_folder, kv.key_type());
auto &k = kv.variant_key();
auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k);
auto &seg = kv.segment();

auto put_object_result = s3_client.put_object(s3_object_name, std::move(seg), bucket_name, PutHeader::IF_NONE_MATCH);

if (!put_object_result.is_success()) {
auto& error = put_object_result.get_error();
raise_s3_exception(error, s3_object_name);
}
}

template<class KeyBucketizer>
void do_update_impl(
Composite<KeySegmentPair> &&kvs,
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class NfsBackedStorage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand All @@ -49,6 +53,10 @@ class NfsBackedStorage final : public Storage {
return true;
}

bool do_supports_atomic_writes() const final {
return false;
}

bool do_fast_delete() final {
return false;
}
Expand Down
8 changes: 7 additions & 1 deletion cpp/arcticdb/storage/s3/s3_client_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ struct DeleteOutput{
std::vector<FailedDelete> failed_deletes;
};

enum class PutHeader{
NONE,
IF_NONE_MATCH
};

// An abstract class, which is responsible for sending the requests and parsing the responses from S3.
// It can be derived as either a real connection to S3 or a mock used for unit tests.
class S3ClientWrapper {
Expand All @@ -57,7 +62,8 @@ class S3ClientWrapper {
virtual S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment&& segment,
const std::string& bucket_name) = 0;
const std::string& bucket_name,
PutHeader header = PutHeader::NONE) = 0;

virtual S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
Expand Down
8 changes: 7 additions & 1 deletion cpp/arcticdb/storage/s3/s3_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ std::optional<Aws::S3::S3Error> has_failure_trigger(const std::string& s3_object
}

const auto not_found_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false));
const auto precondition_failed_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::UNKNOWN, "Precondition failed", "Precondition failed", false));

S3Result<std::monostate> MockS3Client::head_object(
const std::string& s3_object_name,
Expand Down Expand Up @@ -83,12 +84,17 @@ S3Result<Segment> MockS3Client::get_object(
S3Result<std::monostate> MockS3Client::put_object(
const std::string &s3_object_name,
Segment &&segment,
const std::string &bucket_name) {
const std::string &bucket_name,
PutHeader header) {
auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE);
if (maybe_error.has_value()) {
return {*maybe_error};
}

if (header == PutHeader::IF_NONE_MATCH && s3_contents.contains({bucket_name, s3_object_name})) {
return {precondition_failed_error};
}

s3_contents.insert_or_assign({bucket_name, s3_object_name}, std::move(segment));

return {std::monostate()};
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/storage/s3/s3_mock_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class MockS3Client : public S3ClientWrapper {
S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment&& segment,
const std::string& bucket_name) override;
const std::string& bucket_name,
PutHeader header = PutHeader::NONE) override;

S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
Expand Down
Loading
Loading