Skip to content

Commit

Permalink
Assume fast tombstone is true and fix bug in VersionMapImpl::follow_v…
Browse files Browse the repository at this point in the history
…ersion_chain (#366)

Resolves #174 

**Changes**

* Remove every branch on the fast tombstone property; the code now assumes
it is always true.
* Fix a bug in follow version chain functionality.

**Bug description**
Until now the code assumed that no live versions can follow a
`tombstone_all` key, so it stopped following the version chain as soon as
it encountered one. That assumption does not hold when
`prune_previous_versions` is used. Consider the following calls:
1. Add version 0 for a symbol `sym`
2. Add version 1 for the symbol `sym`
3. Prune previous versions

This would result in the following version chain:

0. `(key_type: tombstone_all, version_id: 0)`
1. `(key_type: version, version_id: 0)`
2. `(key_type: index, version_id: 1)`
3. `(key_type: version, version_id: 1)`
4. `(key_type: index, version_id: 0)`

Here a live version (version 1) appears after the `tombstone_all` key.

The bug does not appear when using `write + prune` because in that case
we first add the `tombstone_all` key and after that we write the new
version.

**Solution**
If there is a `tombstone_all` key,
`VersionMapImpl::follow_version_chain` needs to keep track of the
oldest index that has been read and stop reading only if
`tombstone_all_->version_id()` is greater than or equal to the `version_id`
of the oldest index.

---------

Co-authored-by: Vasil Pashov <Vasil.Pashov@man.com>
  • Loading branch information
vasil-pashov and Vasil Pashov authored Jun 9, 2023
1 parent 69412a8 commit d454f0e
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 109 deletions.
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(4004, E_OPERATION_NOT_SUPPORTED_WITH_PICKLED_DATA) \
ERROR_CODE(5000, E_KEY_NOT_FOUND) \
ERROR_CODE(5001, E_DUPLICATE_KEY) \
ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \
ERROR_CODE(6000, E_UNSORTED_DATA) \
ERROR_CODE(7000, E_INVALID_USER_ARGUMENT) \
ERROR_CODE(7001, E_INVALID_DECIMAL_STRING) \
Expand Down
3 changes: 0 additions & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1235,9 +1235,6 @@ void LocalVersionedEngine::configure(const storage::LibraryDescriptor::VariantSt
if(cfg.has_failure_sim()) {
store->set_failure_sim(cfg.failure_sim());
}
if(cfg.write_options().descriptor()->FindFieldByLowercaseName("fast_tombstone_all")) {
version_map->set_fast_tombstone_all(cfg.write_options().fast_tombstone_all());
}
if(cfg.write_options().has_sync_passive()) {
version_map->set_log_changes(cfg.write_options().sync_passive().enabled());
}
Expand Down
24 changes: 1 addition & 23 deletions cpp/arcticdb/version/test/rapidcheck_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,6 @@ struct WriteVersion : rc::state::Command<Model, MapStorePair> {
}
};

// Rapidcheck state command that sets the version map's fast-tombstone-all
// flag to an arbitrary boolean drawn when the command is constructed.
// The model is not modified by this command; only the MapStorePair handed
// to run() is touched.
template <typename Model>
struct SetUseFastTombstoneAll : rc::state::Command<Model, MapStorePair> {
    // Value that run() forwards to set_fast_tombstone_all().
    bool value_;

    // The model argument is not consulted; the value comes from rapidcheck's
    // arbitrary<bool> generator.
    explicit SetUseFastTombstoneAll(const Model&) ARCTICDB_UNUSED:
        value_(*rc::gen::arbitrary<bool>()) {}

    // Intentionally empty: the command leaves the model as it is.
    void apply(Model&) const override {}

    // Applies the stored value to the version map held by the pair.
    void run(const Model&, MapStorePair& map_store) const override {
        map_store.map_->set_fast_tombstone_all(value_);
    }

    // Prints the command together with the value it carries.
    void show(std::ostream& out) const override {
        out << "SetUseFastTombstoneAll(";
        out << value_;
        out << ")";
    }
};

template <typename Model>
struct DeleteAllVersions : rc::state::Command<Model, MapStorePair> {
std::string symbol_;
Expand Down Expand Up @@ -289,9 +270,6 @@ RC_GTEST_PROP(VersionMap, RapidcheckTombstones, ()) {
GetLatestVersion<VersionMapTombstonesModel>,
GetAllVersions<VersionMapTombstonesModel>,
DeleteAllVersions<VersionMapTombstonesModel>,
Compact<VersionMapTombstonesModel>,
SetUseFastTombstoneAll<VersionMapTombstonesModel>>()
//CompactAndRemoveDeleted<VersionMapTombstonesModel>>()

Compact<VersionMapTombstonesModel>>()
);
}
19 changes: 0 additions & 19 deletions cpp/arcticdb/version/test/test_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,25 +316,6 @@ TEST(VersionMap, TombstoneAllTwice) {
// Don't need a check condition, checking validation
}

// Writes three keys with write_and_prune_previous, switching the
// fast-tombstone-all flag off between the second and third write, and expects
// that only the third key is returned by get_all_versions afterwards.
TEST(VersionMap, TombstoneAllRemoveFastTombstone) {
auto store = std::make_shared<InMemoryStore>();
StreamId id{"test1"};
// NOTE(review): presumably declares key1, key2 and key3 used below — confirm
// against the macro definition.
THREE_SIMPLE_KEYS

auto version_map = std::make_shared<VersionMap>();
// The first two writes run with fast tombstone-all enabled.
version_map->set_fast_tombstone_all(true);
version_map->set_validate(true);
// First write has no previous version to pass in.
version_map->write_and_prune_previous(store, key1, std::nullopt);
auto maybe_prev = get_latest_version(store, version_map, id, true, false);
version_map->write_and_prune_previous(store, key2, maybe_prev.value());
// Switch the flag off before the third write.
version_map->set_fast_tombstone_all(false);
maybe_prev = get_latest_version(store, version_map, id, true, false);
version_map->write_and_prune_previous(store, key3, maybe_prev.value());
auto versions = get_all_versions(store, version_map, id, true, false);
// Exactly one version is expected to remain, and it must be the last key written.
ASSERT_EQ(versions.size(), 1);
ASSERT_EQ(versions[0], key3);
}

void write_old_style_journal_entry(const AtomKey &key, std::shared_ptr<StreamSink> store) {
IndexAggregator<RowCountIndex> journal_agg(key.id(), [&](auto &&segment) {
store->write(KeyType::VERSION_JOURNAL,
Expand Down
79 changes: 25 additions & 54 deletions cpp/arcticdb/version/version_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class VersionMapImpl {
MapType map_;
bool validate_ = false;
bool log_changes_ = false;
bool fast_tombstone_all_ = false;
std::optional<timestamp> reload_interval_;
mutable std::mutex map_mutex_;
std::shared_ptr<LockTable> lock_table_ = std::make_shared<LockTable>();
Expand All @@ -124,10 +123,6 @@ class VersionMapImpl {
validate_ = value;
}

/**
 * Stores the given value in fast_tombstone_all_.
 * @param value New value of the flag.
 */
void set_fast_tombstone_all(bool value) { fast_tombstone_all_ = value; }

/**
 * Stores the given value in log_changes_.
 * @param value New value of the flag.
 */
void set_log_changes(bool value) { log_changes_ = value; }
Expand All @@ -152,17 +147,21 @@ class VersionMapImpl {
auto next_key = ref_entry.head_;
entry->head_ = ref_entry.head_;
auto loaded_until = std::numeric_limits<VersionId>::max();
VersionId oldest_loaded_index_version = std::numeric_limits<VersionId>::max();

if ((load_params.load_type_ == LoadType::LOAD_LATEST || load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED ) && is_index_key_type(ref_entry.keys_[0].type())) {
if ((load_params.load_type_ == LoadType::LOAD_LATEST || load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED)
&& is_index_key_type(ref_entry.keys_[0].type()))
{
entry->keys_.push_back(ref_entry.keys_[0]);
} else {
do {
auto [key, seg] = store->read_sync(next_key.value());
std::tie(next_key, loaded_until) = read_segment_with_keys(seg, entry);
oldest_loaded_index_version = std::min(oldest_loaded_index_version, loaded_until);
} while (next_key
&& need_to_load_further(load_params, loaded_until)
&& load_latest_ongoing(load_params, entry)
&& looking_for_undeleted(load_params, entry));
&& looking_for_undeleted(load_params, entry, oldest_loaded_index_version));

if(load_params.load_type_ == LoadType::LOAD_DOWNTO)
entry->loaded_until_ = loaded_until;
Expand All @@ -173,7 +172,7 @@ class VersionMapImpl {
std::shared_ptr<Store> store,
const StreamId& stream_id,
const LoadParameter load_params,
const std::shared_ptr<VersionMapEntry>& entry) {
const std::shared_ptr<VersionMapEntry>& entry) {
load_params.validate();
auto max_trials = ConfigsMap::instance()->get_int("VersionMap.MaxReadRefTrials", 2);
while (max_trials--) {
Expand Down Expand Up @@ -242,10 +241,7 @@ class VersionMapImpl {
}

/**
* Tombstone all non-deleted versions of the given stream and do the related housekeeping. It works in two modes:
* - fast_tombstone_all_: Writes a TOMBSTONE_ALL for the latest undeleted version
* - Otherwise: writes individual TOMBSTONES for each version (replaces any previous TOMBSTONE_ALL)
*
* Tombstone all non-deleted versions of the given stream and do the related housekeeping.
* @param first_key_to_tombstone The first key in the version chain that should be tombstoned. When empty
* then the first index key onwards is tombstoned, so the whole chain is tombstoned.
*/
Expand All @@ -267,9 +263,13 @@ class VersionMapImpl {
const AtomKey &key, const
std::optional<AtomKey>& previous_key) {
ARCTICDB_DEBUG(log::version(), "Version map pruning previous versions for stream {}", key.id());
auto load_type = fast_tombstone_all_ ? LoadType::LOAD_UNDELETED : LoadType::LOAD_ALL;
auto entry = check_reload(store, key.id(), LoadParameter{load_type}, true, false,
__FUNCTION__);
auto entry = check_reload(
store,
key.id(),
LoadParameter{LoadType::LOAD_UNDELETED},
true,
false,
__FUNCTION__);
auto result = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);

do_write(store, key, entry);
Expand Down Expand Up @@ -884,9 +884,13 @@ class VersionMapImpl {
std::optional<AtomKey> first_key_to_tombstone = std::nullopt,
std::shared_ptr<VersionMapEntry> entry = nullptr) {
if (!entry) {
auto load_type = fast_tombstone_all_ ? LoadType::LOAD_UNDELETED : LoadType::LOAD_ALL;
entry = check_reload(store, stream_id, LoadParameter{load_type}, true, false,
__FUNCTION__);
entry = check_reload(
store,
stream_id,
LoadParameter{LoadType::LOAD_UNDELETED},
true,
false,
__FUNCTION__);
}
if (!first_key_to_tombstone)
first_key_to_tombstone = entry->get_first_index(false);
Expand All @@ -900,44 +904,11 @@ class VersionMapImpl {
}

if (!output.empty()) {
if(fast_tombstone_all_) {
auto tombstone_key = write_tombstone_all_key(store, first_key_to_tombstone.value(), entry);

if(log_changes_)
log_tombstone_all(store, stream_id, tombstone_key.version_id());

} else {
for (const auto &index : output) {
auto tombstone = write_tombstone(store, index, index.id(), entry);
entry->tombstones_.insert(std::make_pair(index.version_id(), std::move(tombstone)));
}
auto tombstone_key = write_tombstone_all_key(store, first_key_to_tombstone.value(), entry);
if(log_changes_) {
log_tombstone_all(store, stream_id, tombstone_key.version_id());
}
}

// Get rid of tombstone_all key if not set on this library
if(!fast_tombstone_all_ && entry->tombstone_all_) {
auto all_indexes = entry->get_indexes(true);
for(const auto& index_key : all_indexes) {
if(entry->is_tombstoned_via_tombstone_all(index_key.version_id()) &&
!entry->has_individual_tombstone(index_key.version_id())) {
auto tombstone = index_to_tombstone(index_key, index_key.id(), store->current_timestamp());
entry->tombstones_.try_emplace(tombstone.version_id(), tombstone);
entry->keys_.push_front(tombstone);
}
}

remove_entry_version_keys(store, entry, stream_id);
entry->keys_.erase(std::remove_if(
std::begin(entry->keys_),
std::end(entry->keys_),
[](const auto& k){
return k.type() == KeyType::TOMBSTONE_ALL || k.type() == KeyType::VERSION; }),
std::end(entry->keys_));

auto version_id = all_indexes.begin()->version_id();
entry->head_ = write_entry_to_storage(store, stream_id, version_id, entry);
}

return output;
}
};
Expand Down
11 changes: 8 additions & 3 deletions cpp/arcticdb/version/version_store_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,15 @@ void PythonVersionStore::fix_symbol_trees(const std::vector<StreamId>& symbols)

void PythonVersionStore::prune_previous_versions(const StreamId& stream_id) {
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: prune_previous_versions stream_id={}", stream_id);
const auto entry = version_map()->check_reload(store(), stream_id, LoadParameter{LoadType::LOAD_UNDELETED},
true, false, __FUNCTION__);
const std::shared_ptr<VersionMapEntry>& entry = version_map()->check_reload(
store(),
stream_id,
LoadParameter{LoadType::LOAD_UNDELETED},
true,
false,
__FUNCTION__);
storage::check<ErrorCode::E_SYMBOL_NOT_FOUND>(!entry->empty(), "Symbol {} is not found", stream_id);
auto latest = entry->get_first_index(false);
util::check(latest.has_value(), "Cannot prune previous versions for non-existent symbol {}", stream_id);

auto prev_id = get_prev_version_in_entry(entry, latest->version_id());
if (!prev_id) {
Expand Down
28 changes: 21 additions & 7 deletions cpp/arcticdb/version/version_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
VersionMapEntry& entry) {
ssize_t row = 0;
std::optional<AtomKey> next;
VersionId oldest_loaded = std::numeric_limits<VersionId>::max();
VersionId oldest_loaded_index = std::numeric_limits<VersionId>::max();
for (; row < ssize_t(seg.row_count()); ++row) {
auto key = read_key_row(seg, row);
ARCTICDB_TRACE(log::version(), "Reading key {}", key);
if (is_index_key_type(key.type())) {
entry.keys_.push_back(key);
oldest_loaded = std::min(oldest_loaded, key.version_id());
oldest_loaded_index = std::min(oldest_loaded_index, key.version_id());
} else if (key.type() == KeyType::TOMBSTONE) {
entry.tombstones_.try_emplace(key.version_id(), key);
entry.keys_.push_back(key);
Expand All @@ -96,7 +96,7 @@ inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
}
}
util::check(row == ssize_t(seg.row_count()), "Unexpected ordering in journal segment");
return std::make_pair(next, oldest_loaded);
return std::make_pair(next, oldest_loaded_index);
}

inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
Expand Down Expand Up @@ -217,12 +217,26 @@ inline bool load_latest_ongoing(const LoadParameter& load_params, const std::sha
return false;
}

inline bool looking_for_undeleted(const LoadParameter& load_params, const std::shared_ptr<VersionMapEntry>& entry) {
if(!(load_params.load_type_ == LoadType::LOAD_UNDELETED && entry->tombstone_all_))
inline bool looking_for_undeleted(const LoadParameter& load_params, const std::shared_ptr<VersionMapEntry>& entry, const VersionId& oldest_loaded_index) {
if(load_params.load_type_ != LoadType::LOAD_UNDELETED) {
return true;
}

ARCTICDB_DEBUG(log::version(), "Exiting because we have found an undeleted version");
return false;
if(entry->tombstone_all_) {
const bool is_deleted_by_tombstone_all = entry->tombstone_all_->version_id() >= oldest_loaded_index;
if(is_deleted_by_tombstone_all) {
ARCTICDB_DEBUG(
log::version(),
"Exiting because tombstone all key deletes all versions beyond: {} and the oldest loaded index has version: {}",
entry->tombstone_all_->version_id(),
oldest_loaded_index);
return false;
} else {
return true;
}
} else {
return true;
}
}

void fix_stream_ids_of_index_keys(
Expand Down

0 comments on commit d454f0e

Please sign in to comment.