Commit

Initial fix

joe-iddon committed Nov 27, 2023
1 parent fea52fc commit 79ca969

Showing 7 changed files with 114 additions and 66 deletions.
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp

@@ -65,6 +65,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
     ERROR_CODE(2003, E_INCOMPATIBLE_INDEX) \
     ERROR_CODE(2004, E_WRONG_SHAPE) \
     ERROR_CODE(3000, E_NO_SUCH_VERSION) \
+    ERROR_CODE(3001, E_NO_SYMBOL_DATA) \
     ERROR_CODE(3010, E_UNREADABLE_SYMBOL_LIST) \
     ERROR_CODE(4000, E_DESCRIPTOR_MISMATCH) \
     ERROR_CODE(4001, E_COLUMN_DOESNT_EXIST) \
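
The trailing backslashes show these ERROR_CODE rows form an X-macro list, from which the enum and its metadata are typically generated, so adding a row is all that is needed to mint the new code. A minimal, self-contained sketch of that expansion technique, using stand-in names rather than the library's actual definitions:

#include <cstdint>

// Simplified stand-in for the real list in error_code.hpp.
#define ARCTIC_ERROR_CODES \
    ERROR_CODE(3000, E_NO_SUCH_VERSION) \
    ERROR_CODE(3001, E_NO_SYMBOL_DATA)

// Expand the list once as enumerators: each ERROR_CODE(code, name) row
// becomes "name = code".
enum class ErrorCode : std::uint32_t {
#define ERROR_CODE(code, name) name = code,
    ARCTIC_ERROR_CODES
#undef ERROR_CODE
};

static_assert(static_cast<std::uint32_t>(ErrorCode::E_NO_SYMBOL_DATA) == 3001);

The surrounding entries (E_NO_SUCH_VERSION, E_UNREADABLE_SYMBOL_LIST) suggest the 3xxx block is the missing-data category, which is where the new code is consumed below.
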
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/local_versioned_engine.cpp

@@ -1011,7 +1011,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
         bool via_iteration,
         bool sparsify,
         bool prune_previous_versions) {
-    log::version().info("Compacting incomplete symbol {}", stream_id);
+    log::version().debug("Compacting incomplete symbol {}", stream_id);
 
     auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{}, ReadOptions{});
     auto versioned_item = compact_incomplete_impl(
1 change: 1 addition & 0 deletions cpp/arcticdb/version/test/version_backwards_compat.hpp

@@ -36,6 +36,7 @@ std::vector<AtomKey> backwards_compat_write_and_prune_previous(std::shared_ptr<S
     auto old_entry = *entry;
    entry->clear();
    version_map->do_write(store, key, entry);
+    write_symbol_ref(store, key, std::nullopt, entry->head_.value());
    version_map->remove_entry_version_keys(store, old_entry, key.id());
    output = old_entry.get_indexes(false);
 
5 changes: 5 additions & 0 deletions cpp/arcticdb/version/version_core.cpp

@@ -1078,6 +1078,11 @@ FrameAndDescriptor read_dataframe_impl(
         read_incompletes_to_pipeline(store, pipeline_context, read_query, read_options, false, false, false);
     }
 
+    if(std::holds_alternative<StreamId>(version_info) && !pipeline_context->incompletes_after_) {
+        missing_data::raise<ErrorCode::E_NO_SYMBOL_DATA>(
+            "read_dataframe_impl: read returned no data for symbol {} (found no versions or append data)", pipeline_context->stream_id_);
+    }
+
     modify_descriptor(pipeline_context, read_options);
     generate_filtered_field_descriptors(pipeline_context, read_query.columns);
     ARCTICDB_DEBUG(log::version(), "Fetching data to frame");
165 changes: 103 additions & 62 deletions cpp/arcticdb/version/version_map.hpp

@@ -227,6 +227,7 @@ class VersionMapImpl {
         auto entry = check_reload(store, key.id(), load_param, __FUNCTION__);
 
         do_write(store, key, entry);
+        write_symbol_ref(store, key, std::nullopt, entry->head_.value());
         if (validate_)
             entry->validate();
         if(log_changes_)
@@ -237,9 +238,8 @@ class VersionMapImpl {
             const std::shared_ptr<Store>& store,
             const AtomKey& previous_key,
             const std::shared_ptr<VersionMapEntry>& entry) {
-        auto tombstone_key = get_tombstone_all_key(previous_key, store->current_timestamp());
-        entry->try_set_tombstone_all(tombstone_key);
-        do_write(store, tombstone_key, entry);
+        auto tombstone_key = write_tombstone_all_key_internal(store, previous_key, entry);
+        write_symbol_ref(store, tombstone_key, std::nullopt, entry->head_.value());
         return tombstone_key;
     }
 
@@ -253,7 +253,20 @@ class VersionMapImpl {
             const StreamId& stream_id,
             std::optional<AtomKey> first_key_to_tombstone = std::nullopt
             ) {
-        return tombstone_from_key_or_all_internal(store, stream_id, first_key_to_tombstone);
+        auto entry = check_reload(
+            store,
+            stream_id,
+            LoadParameter{LoadType::LOAD_UNDELETED},
+            __FUNCTION__);
+        auto output = tombstone_from_key_or_all_internal(store, stream_id, first_key_to_tombstone, entry);
+
+        if (validate_)
+            entry->validate();
+
+        if (entry->head_)
+            write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
+
+        return output;
     }
 
     std::string dump_entry(const std::shared_ptr<Store>& store, const StreamId& stream_id) {
@@ -263,8 +276,8 @@ class VersionMapImpl {
 
     std::vector<AtomKey> write_and_prune_previous(
             std::shared_ptr<Store> store,
-            const AtomKey &key, const
-            std::optional<AtomKey>& previous_key) {
+            const AtomKey &key,
+            const std::optional<AtomKey>& previous_key) {
         ARCTICDB_DEBUG(log::version(), "Version map pruning previous versions for stream {}", key.id());
         auto entry = check_reload(
             store,
@@ -274,6 +287,7 @@ class VersionMapImpl {
         auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);
 
         do_write(store, key, entry);
+        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
 
         if (log_changes_)
             log_write(store, key.id(), key.version_id());
@@ -304,6 +318,8 @@ class VersionMapImpl {
     }
 
     void compact_and_remove_deleted_indexes(std::shared_ptr<Store> store, const StreamId& stream_id) {
+        // This method has no API, and is not tested in the rapidcheck tests, but could easily be enabled there.
+        // It compacts the version map but skips any keys which have been deleted (to free up space).
         ARCTICDB_DEBUG(log::version(), "Version map compacting versions for stream {}", stream_id);
         auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__);
         if (!requires_compaction(entry))
@@ -316,14 +332,14 @@ class VersionMapImpl {
         auto new_entry = std::make_shared<VersionMapEntry>();
         new_entry->keys_.push_front(*latest_version);
 
-        if (const auto first_is_tombstone = entry->get_tombstone_if_any(new_version_id); first_is_tombstone)
+        if (const auto first_is_tombstone = entry->get_tombstone(new_version_id); first_is_tombstone)
             new_entry->keys_.emplace_front(std::move(*first_is_tombstone));
 
         std::advance(latest_version, 1);
 
         for (const auto &key : folly::Range{latest_version, entry->keys_.end()}) {
             if (is_index_key_type(key.type())) {
-                const auto tombstone = entry->is_tombstoned(key);
+                const auto tombstone = entry->get_tombstone(key.version_id());
                 if (tombstone) {
                     if (!store->key_exists(key).get())
                         ARCTICDB_DEBUG(log::version(), "Removing deleted key {}", key);
@@ -341,6 +357,7 @@ class VersionMapImpl {
             }
         }
         new_entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, new_entry);
+        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
         remove_entry_version_keys(store, entry, stream_id);
         if (validate_)
             new_entry->validate();
@@ -394,44 +411,6 @@ class VersionMapImpl {
         return to_atom(std::move(journal_key_fut).get());
     }
 
-    std::shared_ptr<VersionMapEntry> compact_entry(
-            std::shared_ptr<Store> store,
-            const StreamId& stream_id,
-            const std::shared_ptr<VersionMapEntry>& entry) {
-        // For compacting an entry, we compact from the second version key in the chain
-        // This makes it concurrent safe (when use_tombstones is enabled)
-        // The first version key is in head and the second version key is first in entry.keys_
-        if (validate_)
-            entry->validate();
-        util::check(entry->head_.value().type() == KeyType::VERSION, "Type of head must be version");
-        auto new_entry = std::make_shared<VersionMapEntry>(*entry);
-
-        auto parent = std::find_if(std::begin(new_entry->keys_), std::end(new_entry->keys_),
-                                   [](const auto& k){return k.type() == KeyType ::VERSION;});
-
-        // Copy version keys to be removed
-        std::vector<VariantKey> version_keys_compacted;
-        std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(version_keys_compacted),
-                     [](const auto& k){return k.type() == KeyType::VERSION;});
-
-        // Copy index keys to be compacted
-        std::vector<AtomKey> index_keys_compacted;
-        std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(index_keys_compacted),
-                     [](const auto& k){return is_index_or_tombstone(k);});
-
-        update_version_key(store, *parent, index_keys_compacted, stream_id);
-        store->remove_keys(version_keys_compacted).get();
-
-        new_entry->keys_.erase(std::remove_if(parent + 1,
-                                              std::end(new_entry->keys_),
-                                              [](const auto& k){return k.type() == KeyType::VERSION;}),
-                               std::end(new_entry->keys_));
-
-        if (validate_)
-            new_entry->validate();
-        return new_entry;
-    }
-
     /** To be run as a stand-alone job only because it calls flush(). */
     void compact_if_necessary_stand_alone(const std::shared_ptr<Store>& store, size_t batch_size) {
         auto map = get_num_version_entries(store, batch_size);
@@ -486,6 +465,7 @@ class VersionMapImpl {
         entry->keys_.assign(std::begin(index_keys), std::end(index_keys));
         auto new_version_id = index_keys[0].version_id();
         entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, entry);
+        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
         if (validate_)
             entry->validate();
     }
@@ -521,7 +501,6 @@ class VersionMapImpl {
 
         auto journal_key = to_atom(std::move(journal_single_key(store, key, entry->head_)));
         write_to_entry(entry, key, journal_key);
-        write_symbol_ref(store, key, std::nullopt, journal_key);
     }
 
     AtomKey write_tombstone(
@@ -530,16 +509,8 @@ class VersionMapImpl {
             const StreamId& stream_id,
             const std::shared_ptr<VersionMapEntry>& entry,
             const std::optional<timestamp>& creation_ts=std::nullopt) {
-        if (validate_)
-            entry->validate();
-
-        auto tombstone = util::variant_match(key, [&stream_id, store, &creation_ts](const auto &k){
-            return index_to_tombstone(k, stream_id, creation_ts.value_or(store->current_timestamp()));
-        });
-        do_write(store, tombstone, entry);
-        if(log_changes_)
-            log_tombstone(store, tombstone.id(), tombstone.version_id());
-
+        auto tombstone = write_tombstone_internal(store, key, stream_id, entry, creation_ts);
+        write_symbol_ref(store, tombstone, std::nullopt, entry->head_.value());
         return tombstone;
     }
 
@@ -569,6 +540,44 @@ class VersionMapImpl {
     }
 
 private:
+    std::shared_ptr<VersionMapEntry> compact_entry(
+            std::shared_ptr<Store> store,
+            const StreamId& stream_id,
+            const std::shared_ptr<VersionMapEntry>& entry) {
+        // For compacting an entry, we compact from the second version key in the chain
+        // This makes it concurrent safe (when use_tombstones is enabled)
+        // The first version key is in head and the second version key is first in entry.keys_
+        if (validate_)
+            entry->validate();
+        util::check(entry->head_.value().type() == KeyType::VERSION, "Type of head must be version");
+        auto new_entry = std::make_shared<VersionMapEntry>(*entry);
+
+        auto parent = std::find_if(std::begin(new_entry->keys_), std::end(new_entry->keys_),
+                                   [](const auto& k){return k.type() == KeyType ::VERSION;});
+
+        // Copy version keys to be removed
+        std::vector<VariantKey> version_keys_compacted;
+        std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(version_keys_compacted),
+                     [](const auto& k){return k.type() == KeyType::VERSION;});
+
+        // Copy index keys to be compacted
+        std::vector<AtomKey> index_keys_compacted;
+        std::copy_if(parent + 1, std::end(new_entry->keys_), std::back_inserter(index_keys_compacted),
+                     [](const auto& k){return is_index_or_tombstone(k);});
+
+        update_version_key(store, *parent, index_keys_compacted, stream_id);
+        store->remove_keys(version_keys_compacted).get();
+
+        new_entry->keys_.erase(std::remove_if(parent + 1,
+                                              std::end(new_entry->keys_),
+                                              [](const auto& k){return k.type() == KeyType::VERSION;}),
+                               std::end(new_entry->keys_));
+
+        if (validate_)
+            new_entry->validate();
+        return new_entry;
+    }
+
     void write_to_entry(
             const std::shared_ptr<VersionMapEntry>& entry,
             const AtomKey& key,
@@ -683,7 +692,6 @@ class VersionMapImpl {
         }
 
         version_agg.commit();
-        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, journal_key);
         return journal_key;
     }
 
@@ -818,7 +826,8 @@ class VersionMapImpl {
         entry->clear();
         load_via_iteration(store, stream_id, entry, false);
         remove_duplicate_index_keys(entry);
-        (void)rewrite_entry(store, stream_id, entry);
+        auto new_entry = rewrite_entry(store, stream_id, entry);
+        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
     }
 
     void remove_and_rewrite_version_keys(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -828,8 +837,9 @@ class VersionMapImpl {
         entry->clear();
         load_via_iteration(store, stream_id, entry, true);
         remove_duplicate_index_keys(entry);
-        (void)rewrite_entry(store, stream_id, entry);
+        auto new_entry = rewrite_entry(store, stream_id, entry);
         remove_entry_version_keys(store, old_entry, stream_id);
+        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
     }
 
     void fix_ref_key(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -878,7 +888,8 @@ class VersionMapImpl {
 
         entry->keys_.insert(std::begin(entry->keys_), std::begin(missing_versions), std::end(missing_versions));
         entry->sort();
-        rewrite_entry(store, stream_id, entry);
+        auto new_entry = rewrite_entry(store, stream_id, entry);
+        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
     }
 
     std::shared_ptr<Lock> get_lock_object(const StreamId& stream_id) const {
@@ -891,6 +902,7 @@ class VersionMapImpl {
         util::check(!entry->keys_.empty(), "Can't rewrite empty version journal entry");
         auto version_id = entry->keys_[0].version_id();
         entry->head_ = std::make_optional(write_entry_to_storage(store, stream_id, version_id, entry));
+        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
         return entry->head_.value();
     }
 
@@ -944,14 +956,43 @@ class VersionMapImpl {
         const VersionId version_id = latest_version ? latest_version->version_id() : 0;
 
         if (!output.empty()) {
-            auto tombstone_key = write_tombstone_all_key(store, first_key_to_tombstone.value(), entry);
+            auto tombstone_key = write_tombstone_all_key_internal(store, first_key_to_tombstone.value(), entry);
             if(log_changes_) {
                 log_tombstone_all(store, stream_id, tombstone_key.version_id());
             }
         }
 
         return {version_id, std::move(output)};
     }
+
+    AtomKey write_tombstone_all_key_internal(
+            const std::shared_ptr<Store>& store,
+            const AtomKey& previous_key,
+            const std::shared_ptr<VersionMapEntry>& entry) {
+        auto tombstone_key = get_tombstone_all_key(previous_key, store->current_timestamp());
+        entry->try_set_tombstone_all(tombstone_key);
+        do_write(store, tombstone_key, entry);
+        return tombstone_key;
+    }
+
+    AtomKey write_tombstone_internal(
+            std::shared_ptr<Store> store,
+            const std::variant<AtomKey, VersionId>& key,
+            const StreamId& stream_id,
+            const std::shared_ptr<VersionMapEntry>& entry,
+            const std::optional<timestamp>& creation_ts=std::nullopt) {
+        if (validate_)
+            entry->validate();
+
+        auto tombstone = util::variant_match(key, [&stream_id, store, &creation_ts](const auto &k){
+            return index_to_tombstone(k, stream_id, creation_ts.value_or(store->current_timestamp()));
+        });
+        do_write(store, tombstone, entry);
+        if(log_changes_)
+            log_tombstone(store, tombstone.id(), tombstone.version_id());
+
+        return tombstone;
+    }
 };
 
 using VersionMap = VersionMapImpl<>;
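
Taken together, the version_map.hpp changes move ref-key maintenance out of the journalling helpers (the journal_single_key caller and the version_agg.commit() path, where the two deletions above sit) and into every operation that installs a new chain head. A self-contained sketch of the resulting invariant, with toy stand-ins for the real types (do_write and write_symbol_ref here are simplified placeholders, not the actual implementations):

#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>

// Toy stand-ins for the real Store, AtomKey and VersionMapEntry types, only
// to make the ordering of the two writes concrete.
struct AtomKey { std::string id; std::uint64_t version_id; };
struct Store {};
struct VersionMapEntry {
    std::optional<AtomKey> head_;
    std::vector<AtomKey> keys_;
};

// Stand-in for do_write: journal the key and make it the chain head.
void do_write(const std::shared_ptr<Store>&, const AtomKey& key,
              const std::shared_ptr<VersionMapEntry>& entry) {
    entry->keys_.insert(entry->keys_.begin(), key);
    entry->head_ = key;
}

// Stand-in for write_symbol_ref: persist a per-symbol ref key that points
// readers at the current chain head (storage elided).
void write_symbol_ref(const std::shared_ptr<Store>&, const AtomKey&,
                      std::optional<AtomKey>, const AtomKey&) {}

// The invariant enforced at every call site in the diff above: whenever a
// new head is journalled, the symbol ref is refreshed in the same operation.
void append_version_and_update_ref(const std::shared_ptr<Store>& store,
                                   const AtomKey& key,
                                   const std::shared_ptr<VersionMapEntry>& entry) {
    do_write(store, key, entry);
    write_symbol_ref(store, key, std::nullopt, entry->head_.value());
}
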
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/version_map_entry.hpp

@@ -178,7 +178,7 @@ struct VersionMapEntry {
     }
 
     // Below four functions used to return optional<AtomKey> of the tombstone, but copying keys is expensive and only
-    // one function was actually interested in the key, so they now return bool. See get_tombstone_if_any().
+    // one function was actually interested in the key, so they now return bool. See get_tombstone().
     bool has_individual_tombstone(VersionId version_id) const {
         return tombstones_.count(version_id) != 0;
     }
@@ -195,7 +195,7 @@ struct VersionMapEntry {
         return is_tombstoned_via_tombstone_all(version_id) || has_individual_tombstone(version_id);
     }
 
-    std::optional<AtomKey> get_tombstone_if_any(VersionId version_id) {
+    std::optional<AtomKey> get_tombstone(VersionId version_id) {
         if (tombstone_all_ && tombstone_all_->version_id() >= version_id) {
             return tombstone_all_;
         }
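
The renamed get_tombstone() unifies the two deletion mechanisms visible above: a tombstone-all key covers every version at or below its version id, while tombstones_ holds per-version entries. A simplified, self-contained sketch of that lookup order (toy types; the real entry stores full AtomKeys):

#include <cstdint>
#include <optional>
#include <unordered_map>

using VersionId = std::uint64_t;

struct TombstoneIndex {
    std::optional<VersionId> tombstone_all_;               // covers all versions <= this id
    std::unordered_map<VersionId, VersionId> tombstones_;  // per-version tombstones

    std::optional<VersionId> get_tombstone(VersionId version_id) const {
        // A tombstone-all key at version N deletes every version <= N.
        if (tombstone_all_ && *tombstone_all_ >= version_id)
            return tombstone_all_;
        // Otherwise fall back to an individual tombstone, if any.
        if (auto it = tombstones_.find(version_id); it != tombstones_.end())
            return it->second;
        return std::nullopt;
    }
};
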
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/version_store_api.hpp

@@ -44,7 +44,7 @@ using namespace arcticdb::entity;
 namespace as = arcticdb::stream;
 
 /**
- * PythonVersionStore contains all the Pythonic cruft that isn't portable, as well as non-essential features that are
+ * PythonVersionStore contains all the Python cruft that isn't portable, as well as non-essential features that are
  * part of the backwards-compatibility with Arctic Python but that we think are random/a bad idea and aren't part of
  * the main product.
 */
