Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic version map update #1104

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(2003, E_INCOMPATIBLE_INDEX) \
ERROR_CODE(2004, E_WRONG_SHAPE) \
ERROR_CODE(3000, E_NO_SUCH_VERSION) \
ERROR_CODE(3001, E_NO_SYMBOL_DATA) \
joe-iddon marked this conversation as resolved.
Show resolved Hide resolved
ERROR_CODE(3010, E_UNREADABLE_SYMBOL_LIST) \
ERROR_CODE(4000, E_DESCRIPTOR_MISMATCH) \
ERROR_CODE(4001, E_COLUMN_DOESNT_EXIST) \
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
bool via_iteration,
bool sparsify,
bool prune_previous_versions) {
log::version().info("Compacting incomplete symbol {}", stream_id);
log::version().debug("Compacting incomplete symbol {}", stream_id);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{}, ReadOptions{});
auto versioned_item = compact_incomplete_impl(
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/version/test/version_backwards_compat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ std::vector<AtomKey> backwards_compat_write_and_prune_previous(std::shared_ptr<S
auto old_entry = *entry;
entry->clear();
version_map->do_write(store, key, entry);
write_symbol_ref(store, key, std::nullopt, entry->head_.value());
version_map->remove_entry_version_keys(store, old_entry, key.id());
output = old_entry.get_indexes(false);

Expand Down
5 changes: 5 additions & 0 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,11 @@ FrameAndDescriptor read_dataframe_impl(
read_incompletes_to_pipeline(store, pipeline_context, read_query, read_options, false, false, false);
}

if(std::holds_alternative<StreamId>(version_info) && !pipeline_context->incompletes_after_) {
missing_data::raise<ErrorCode::E_NO_SYMBOL_DATA>(
"read_dataframe_impl: read returned no data for symbol {} (found no versions or append data)", pipeline_context->stream_id_);
}

modify_descriptor(pipeline_context, read_options);
generate_filtered_field_descriptors(pipeline_context, read_query.columns);
ARCTICDB_DEBUG(log::version(), "Fetching data to frame");
Expand Down
165 changes: 103 additions & 62 deletions cpp/arcticdb/version/version_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class VersionMapImpl {
auto entry = check_reload(store, key.id(), load_param, __FUNCTION__);

do_write(store, key, entry);
write_symbol_ref(store, key, std::nullopt, entry->head_.value());
if (validate_)
entry->validate();
if(log_changes_)
Expand All @@ -237,9 +238,8 @@ class VersionMapImpl {
const std::shared_ptr<Store>& store,
const AtomKey& previous_key,
const std::shared_ptr<VersionMapEntry>& entry) {
auto tombstone_key = get_tombstone_all_key(previous_key, store->current_timestamp());
entry->try_set_tombstone_all(tombstone_key);
do_write(store, tombstone_key, entry);
auto tombstone_key = write_tombstone_all_key_internal(store, previous_key, entry);
write_symbol_ref(store, tombstone_key, std::nullopt, entry->head_.value());
return tombstone_key;
}

Expand All @@ -253,7 +253,20 @@ class VersionMapImpl {
const StreamId& stream_id,
std::optional<AtomKey> first_key_to_tombstone = std::nullopt
) {
return tombstone_from_key_or_all_internal(store, stream_id, first_key_to_tombstone);
auto entry = check_reload(
store,
stream_id,
LoadParameter{LoadType::LOAD_UNDELETED},
__FUNCTION__);
auto output = tombstone_from_key_or_all_internal(store, stream_id, first_key_to_tombstone, entry);

if (validate_)
entry->validate();

if (entry->head_)
write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());

return output;
}

std::string dump_entry(const std::shared_ptr<Store>& store, const StreamId& stream_id) {
Expand All @@ -263,8 +276,8 @@ class VersionMapImpl {

std::vector<AtomKey> write_and_prune_previous(
std::shared_ptr<Store> store,
const AtomKey &key, const
std::optional<AtomKey>& previous_key) {
const AtomKey &key,
const std::optional<AtomKey>& previous_key) {
ARCTICDB_DEBUG(log::version(), "Version map pruning previous versions for stream {}", key.id());
auto entry = check_reload(
store,
Expand All @@ -274,6 +287,7 @@ class VersionMapImpl {
auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);

do_write(store, key, entry);
write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());

if (log_changes_)
log_write(store, key.id(), key.version_id());
Expand Down Expand Up @@ -304,6 +318,8 @@ class VersionMapImpl {
}

void compact_and_remove_deleted_indexes(std::shared_ptr<Store> store, const StreamId& stream_id) {
// This method has no API, and is not tested in the rapidcheck tests, but could easily be enabled there.
// It compacts the version map but skips any keys which have been deleted (to free up space).
ARCTICDB_DEBUG(log::version(), "Version map compacting versions for stream {}", stream_id);
auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__);
if (!requires_compaction(entry))
Expand All @@ -316,14 +332,14 @@ class VersionMapImpl {
auto new_entry = std::make_shared<VersionMapEntry>();
new_entry->keys_.push_front(*latest_version);

if (const auto first_is_tombstone = entry->get_tombstone_if_any(new_version_id); first_is_tombstone)
if (const auto first_is_tombstone = entry->get_tombstone(new_version_id); first_is_tombstone)
new_entry->keys_.emplace_front(std::move(*first_is_tombstone));

std::advance(latest_version, 1);

for (const auto &key : folly::Range{latest_version, entry->keys_.end()}) {
if (is_index_key_type(key.type())) {
const auto tombstone = entry->is_tombstoned(key);
const auto tombstone = entry->get_tombstone(key.version_id());
if (tombstone) {
if (!store->key_exists(key).get())
ARCTICDB_DEBUG(log::version(), "Removing deleted key {}", key);
Expand All @@ -341,6 +357,7 @@ class VersionMapImpl {
}
}
new_entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, new_entry);
write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
remove_entry_version_keys(store, entry, stream_id);
if (validate_)
new_entry->validate();
Expand Down Expand Up @@ -394,44 +411,6 @@ class VersionMapImpl {
return to_atom(std::move(journal_key_fut).get());
}

std::shared_ptr<VersionMapEntry> compact_entry(
std::shared_ptr<Store> store,
const StreamId& stream_id,
const std::shared_ptr<VersionMapEntry>& entry) {
// For compacting an entry, we compact from the second version key in the chain
// This makes it concurrent safe (when use_tombstones is enabled)
// The first version key is in head and the second version key is first in entry.keys_
if (validate_)
entry->validate();
util::check(entry->head_.value().type() == KeyType::VERSION, "Type of head must be version");
auto new_entry = std::make_shared<VersionMapEntry>(*entry);

auto parent = std::find_if(std::begin(new_entry->keys_), std::end(new_entry->keys_),
[](const auto& k){return k.type() == KeyType ::VERSION;});

// Copy version keys to be removed
std::vector<VariantKey> version_keys_compacted;
std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(version_keys_compacted),
[](const auto& k){return k.type() == KeyType::VERSION;});

// Copy index keys to be compacted
std::vector<AtomKey> index_keys_compacted;
std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(index_keys_compacted),
[](const auto& k){return is_index_or_tombstone(k);});

update_version_key(store, *parent, index_keys_compacted, stream_id);
store->remove_keys(version_keys_compacted).get();

new_entry->keys_.erase(std::remove_if(parent + 1,
std::end(new_entry->keys_),
[](const auto& k){return k.type() == KeyType::VERSION;}),
std::end(new_entry->keys_));

if (validate_)
new_entry->validate();
return new_entry;
}

/** To be run as a stand-alone job only because it calls flush(). */
void compact_if_necessary_stand_alone(const std::shared_ptr<Store>& store, size_t batch_size) {
auto map = get_num_version_entries(store, batch_size);
Expand Down Expand Up @@ -486,6 +465,7 @@ class VersionMapImpl {
entry->keys_.assign(std::begin(index_keys), std::end(index_keys));
auto new_version_id = index_keys[0].version_id();
entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, entry);
joe-iddon marked this conversation as resolved.
Show resolved Hide resolved
write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
if (validate_)
entry->validate();
}
Expand Down Expand Up @@ -522,7 +502,6 @@ class VersionMapImpl {

auto journal_key = to_atom(std::move(journal_single_key(store, key, entry->head_)));
write_to_entry(entry, key, journal_key);
write_symbol_ref(store, key, std::nullopt, journal_key);
}

AtomKey write_tombstone(
Expand All @@ -531,16 +510,8 @@ class VersionMapImpl {
const StreamId& stream_id,
const std::shared_ptr<VersionMapEntry>& entry,
const std::optional<timestamp>& creation_ts=std::nullopt) {
if (validate_)
entry->validate();

auto tombstone = util::variant_match(key, [&stream_id, store, &creation_ts](const auto &k){
return index_to_tombstone(k, stream_id, creation_ts.value_or(store->current_timestamp()));
});
do_write(store, tombstone, entry);
if(log_changes_)
log_tombstone(store, tombstone.id(), tombstone.version_id());

auto tombstone = write_tombstone_internal(store, key, stream_id, entry, creation_ts);
write_symbol_ref(store, tombstone, std::nullopt, entry->head_.value());
return tombstone;
}

Expand Down Expand Up @@ -570,6 +541,44 @@ class VersionMapImpl {
}

private:
std::shared_ptr<VersionMapEntry> compact_entry(
std::shared_ptr<Store> store,
const StreamId& stream_id,
const std::shared_ptr<VersionMapEntry>& entry) {
// For compacting an entry, we compact from the second version key in the chain
// This makes it concurrent safe (when use_tombstones is enabled)
// The first version key is in head and the second version key is first in entry.keys_
if (validate_)
joe-iddon marked this conversation as resolved.
Show resolved Hide resolved
entry->validate();
util::check(entry->head_.value().type() == KeyType::VERSION, "Type of head must be version");
auto new_entry = std::make_shared<VersionMapEntry>(*entry);

auto parent = std::find_if(std::begin(new_entry->keys_), std::end(new_entry->keys_),
[](const auto& k){return k.type() == KeyType ::VERSION;});

// Copy version keys to be removed
std::vector<VariantKey> version_keys_compacted;
std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(version_keys_compacted),
[](const auto& k){return k.type() == KeyType::VERSION;});

// Copy index keys to be compacted
std::vector<AtomKey> index_keys_compacted;
std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(index_keys_compacted),
[](const auto& k){return is_index_or_tombstone(k);});

update_version_key(store, *parent, index_keys_compacted, stream_id);
store->remove_keys(version_keys_compacted).get();

new_entry->keys_.erase(std::remove_if(parent + 1,
std::end(new_entry->keys_),
[](const auto& k){return k.type() == KeyType::VERSION;}),
std::end(new_entry->keys_));

if (validate_)
new_entry->validate();
return new_entry;
}

void write_to_entry(
const std::shared_ptr<VersionMapEntry>& entry,
const AtomKey& key,
Expand Down Expand Up @@ -684,7 +693,6 @@ class VersionMapImpl {
}

version_agg.commit();
write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, journal_key);
return journal_key;
}

Expand Down Expand Up @@ -819,7 +827,8 @@ class VersionMapImpl {
entry->clear();
load_via_iteration(store, stream_id, entry, false);
remove_duplicate_index_keys(entry);
(void)rewrite_entry(store, stream_id, entry);
auto new_entry = rewrite_entry(store, stream_id, entry);
write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
}

void remove_and_rewrite_version_keys(std::shared_ptr<Store> store, const StreamId& stream_id) {
Expand All @@ -829,8 +838,9 @@ class VersionMapImpl {
entry->clear();
load_via_iteration(store, stream_id, entry, true);
remove_duplicate_index_keys(entry);
(void)rewrite_entry(store, stream_id, entry);
auto new_entry = rewrite_entry(store, stream_id, entry);
remove_entry_version_keys(store, old_entry, stream_id);
write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
}

void fix_ref_key(std::shared_ptr<Store> store, const StreamId& stream_id) {
Expand Down Expand Up @@ -879,7 +889,8 @@ class VersionMapImpl {

entry->keys_.insert(std::begin(entry->keys_), std::begin(missing_versions), std::end(missing_versions));
entry->sort();
rewrite_entry(store, stream_id, entry);
auto new_entry = rewrite_entry(store, stream_id, entry);
write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
}

std::shared_ptr<Lock> get_lock_object(const StreamId& stream_id) const {
Expand All @@ -892,6 +903,7 @@ class VersionMapImpl {
util::check(!entry->keys_.empty(), "Can't rewrite empty version journal entry");
auto version_id = entry->keys_[0].version_id();
entry->head_ = std::make_optional(write_entry_to_storage(store, stream_id, version_id, entry));
write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
return entry->head_.value();
}

Expand Down Expand Up @@ -945,14 +957,43 @@ class VersionMapImpl {
const VersionId version_id = latest_version ? latest_version->version_id() : 0;

if (!output.empty()) {
auto tombstone_key = write_tombstone_all_key(store, first_key_to_tombstone.value(), entry);
auto tombstone_key = write_tombstone_all_key_internal(store, first_key_to_tombstone.value(), entry);
if(log_changes_) {
log_tombstone_all(store, stream_id, tombstone_key.version_id());
}
}

return {version_id, std::move(output)};
}

AtomKey write_tombstone_all_key_internal(
const std::shared_ptr<Store>& store,
const AtomKey& previous_key,
const std::shared_ptr<VersionMapEntry>& entry) {
auto tombstone_key = get_tombstone_all_key(previous_key, store->current_timestamp());
entry->try_set_tombstone_all(tombstone_key);
do_write(store, tombstone_key, entry);
return tombstone_key;
}

AtomKey write_tombstone_internal(
std::shared_ptr<Store> store,
const std::variant<AtomKey, VersionId>& key,
const StreamId& stream_id,
const std::shared_ptr<VersionMapEntry>& entry,
const std::optional<timestamp>& creation_ts=std::nullopt) {
if (validate_)
entry->validate();

auto tombstone = util::variant_match(key, [&stream_id, store, &creation_ts](const auto &k){
return index_to_tombstone(k, stream_id, creation_ts.value_or(store->current_timestamp()));
});
do_write(store, tombstone, entry);
if(log_changes_)
log_tombstone(store, tombstone.id(), tombstone.version_id());

return tombstone;
}
};

using VersionMap = VersionMapImpl<>;
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/version_map_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ struct VersionMapEntry {
}

// Below four functions used to return optional<AtomKey> of the tombstone, but copying keys is expensive and only
// one function was actually interested in the key, so they now return bool. See get_tombstone_if_any().
// one function was actually interested in the key, so they now return bool. See get_tombstone().
bool has_individual_tombstone(VersionId version_id) const {
return tombstones_.count(version_id) != 0;
}
Expand All @@ -195,7 +195,7 @@ struct VersionMapEntry {
return is_tombstoned_via_tombstone_all(version_id) || has_individual_tombstone(version_id);
}

std::optional<AtomKey> get_tombstone_if_any(VersionId version_id) {
std::optional<AtomKey> get_tombstone(VersionId version_id) {
if (tombstone_all_ && tombstone_all_->version_id() >= version_id) {
return tombstone_all_;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/version_store_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ using namespace arcticdb::entity;
namespace as = arcticdb::stream;

/**
* PythonVersionStore contains all the Pythonic cruft that isn't portable, as well as non-essential features that are
* PythonVersionStore contains all the Python cruft that isn't portable, as well as non-essential features that are
* part of the backwards-compatibility with Arctic Python but that we think are random/a bad idea and aren't part of
* the main product.
*/
Expand Down
Loading