Skip to content

Commit

Permalink
Fix VersionMapImpl::follow_version_chain so that it does not stop when
Browse files Browse the repository at this point in the history
the first tombstone_all key is reached. We may have tombstone_all key
which deletes versions way down on the chain. Instead we should stop
when the current key version is <= tombstone_all version.
  • Loading branch information
Vasil Pashov authored and Vasil Pashov committed Jun 7, 2023
1 parent b756d5b commit 10876a7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
11 changes: 6 additions & 5 deletions cpp/arcticdb/version/version_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,22 @@ class VersionMapImpl {
const LoadParameter& load_params) const {
auto next_key = ref_entry.head_;
entry->head_ = ref_entry.head_;
auto loaded_until = std::numeric_limits<VersionId>::max();
auto oldest_loaded_index = std::numeric_limits<VersionId>::max();

if ((load_params.load_type_ == LoadType::LOAD_LATEST || load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED ) && is_index_key_type(ref_entry.keys_[0].type())) {
entry->keys_.push_back(ref_entry.keys_[0]);
} else {
do {
auto [key, seg] = store->read_sync(next_key.value());
std::tie(next_key, loaded_until) = read_segment_with_keys(seg, entry);
std::tie(next_key, oldest_loaded_index) = read_segment_with_keys(seg, entry);
} while (next_key
&& need_to_load_further(load_params, loaded_until)
&& need_to_load_further(load_params, oldest_loaded_index)
&& load_latest_ongoing(load_params, entry)
&& looking_for_undeleted(load_params, entry));
&& looking_for_undeleted(load_params, entry)
&& !is_tombstoned_by_tombstone_all(oldest_loaded_index, entry));

if(load_params.load_type_ == LoadType::LOAD_DOWNTO)
entry->loaded_until_ = loaded_until;
entry->loaded_until_ = oldest_loaded_index;
}
}

Expand Down
14 changes: 11 additions & 3 deletions cpp/arcticdb/version/version_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
VersionMapEntry& entry) {
ssize_t row = 0;
std::optional<AtomKey> next;
VersionId oldest_loaded = std::numeric_limits<VersionId>::max();
VersionId oldest_loaded_index = std::numeric_limits<VersionId>::max();
for (; row < ssize_t(seg.row_count()); ++row) {
auto key = read_key_row(seg, row);
ARCTICDB_TRACE(log::version(), "Reading key {}", key);
if (is_index_key_type(key.type())) {
entry.keys_.push_back(key);
oldest_loaded = std::min(oldest_loaded, key.version_id());
oldest_loaded_index = std::min(oldest_loaded_index, key.version_id());
} else if (key.type() == KeyType::TOMBSTONE) {
entry.tombstones_.try_emplace(key.version_id(), key);
entry.keys_.push_back(key);
Expand All @@ -96,7 +96,7 @@ inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
}
}
util::check(row == ssize_t(seg.row_count()), "Unexpected ordering in journal segment");
return std::make_pair(next, oldest_loaded);
return std::make_pair(next, oldest_loaded_index);
}

inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
Expand Down Expand Up @@ -225,6 +225,14 @@ inline bool looking_for_undeleted(const LoadParameter& load_params, const std::s
return false;
}

inline bool is_tombstoned_by_tombstone_all(
const VersionId& oldest_loaded_index,
const std::shared_ptr<VersionMapEntry>& entry)
{
return entry->tombstone_all_
&& entry->tombstone_all_->version_id() >= oldest_loaded_index;
}

void fix_stream_ids_of_index_keys(
const std::shared_ptr<Store> &store,
const StreamId &stream_id,
Expand Down

0 comments on commit 10876a7

Please sign in to comment.