Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assume fast tombstone is true and fix bug in VersionMapImpl::follow_version_chain #366

Merged
merged 6 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(4004, E_OPERATION_NOT_SUPPORTED_WITH_PICKLED_DATA) \
ERROR_CODE(5000, E_KEY_NOT_FOUND) \
ERROR_CODE(5001, E_DUPLICATE_KEY) \
ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \
ERROR_CODE(6000, E_UNSORTED_DATA) \
ERROR_CODE(7000, E_INVALID_USER_ARGUMENT) \
ERROR_CODE(7001, E_INVALID_DECIMAL_STRING) \
Expand Down
3 changes: 0 additions & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1235,9 +1235,6 @@ void LocalVersionedEngine::configure(const storage::LibraryDescriptor::VariantSt
if(cfg.has_failure_sim()) {
store->set_failure_sim(cfg.failure_sim());
}
if(cfg.write_options().descriptor()->FindFieldByLowercaseName("fast_tombstone_all")) {
version_map->set_fast_tombstone_all(cfg.write_options().fast_tombstone_all());
}
if(cfg.write_options().has_sync_passive()) {
version_map->set_log_changes(cfg.write_options().sync_passive().enabled());
}
Expand Down
24 changes: 1 addition & 23 deletions cpp/arcticdb/version/test/rapidcheck_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,6 @@ struct WriteVersion : rc::state::Command<Model, MapStorePair> {
}
};

template <typename Model>
struct SetUseFastTombstoneAll : rc::state::Command<Model, MapStorePair> {
bool value_;

explicit SetUseFastTombstoneAll(const Model&) ARCTICDB_UNUSED:
value_(*rc::gen::arbitrary<bool>()) {}

void apply(Model &) const override {
}

void run(const Model& , MapStorePair & sut) const override {
sut.map_->set_fast_tombstone_all(value_);
}

void show(std::ostream &os) const override {
os << "SetUseFastTombstoneAll(" << value_ << ")";
}
};

template <typename Model>
struct DeleteAllVersions : rc::state::Command<Model, MapStorePair> {
std::string symbol_;
Expand Down Expand Up @@ -289,9 +270,6 @@ RC_GTEST_PROP(VersionMap, RapidcheckTombstones, ()) {
GetLatestVersion<VersionMapTombstonesModel>,
GetAllVersions<VersionMapTombstonesModel>,
DeleteAllVersions<VersionMapTombstonesModel>,
Compact<VersionMapTombstonesModel>,
SetUseFastTombstoneAll<VersionMapTombstonesModel>>()
//CompactAndRemoveDeleted<VersionMapTombstonesModel>>()

Compact<VersionMapTombstonesModel>>()
);
}
19 changes: 0 additions & 19 deletions cpp/arcticdb/version/test/test_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,25 +316,6 @@ TEST(VersionMap, TombstoneAllTwice) {
// Don't need a check condition, checking validation
}

TEST(VersionMap, TombstoneAllRemoveFastTombstone) {
auto store = std::make_shared<InMemoryStore>();
StreamId id{"test1"};
THREE_SIMPLE_KEYS

auto version_map = std::make_shared<VersionMap>();
version_map->set_fast_tombstone_all(true);
version_map->set_validate(true);
version_map->write_and_prune_previous(store, key1, std::nullopt);
auto maybe_prev = get_latest_version(store, version_map, id, true, false);
version_map->write_and_prune_previous(store, key2, maybe_prev.value());
version_map->set_fast_tombstone_all(false);
vasil-pashov marked this conversation as resolved.
Show resolved Hide resolved
maybe_prev = get_latest_version(store, version_map, id, true, false);
version_map->write_and_prune_previous(store, key3, maybe_prev.value());
auto versions = get_all_versions(store, version_map, id, true, false);
ASSERT_EQ(versions.size(), 1);
ASSERT_EQ(versions[0], key3);
}

void write_old_style_journal_entry(const AtomKey &key, std::shared_ptr<StreamSink> store) {
IndexAggregator<RowCountIndex> journal_agg(key.id(), [&](auto &&segment) {
store->write(KeyType::VERSION_JOURNAL,
Expand Down
79 changes: 25 additions & 54 deletions cpp/arcticdb/version/version_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class VersionMapImpl {
MapType map_;
bool validate_ = false;
bool log_changes_ = false;
bool fast_tombstone_all_ = false;
std::optional<timestamp> reload_interval_;
mutable std::mutex map_mutex_;
std::shared_ptr<LockTable> lock_table_ = std::make_shared<LockTable>();
Expand All @@ -124,10 +123,6 @@ class VersionMapImpl {
validate_ = value;
}

void set_fast_tombstone_all(bool value) {
fast_tombstone_all_ = value;
}

void set_log_changes(bool value) {
log_changes_ = value;
}
Expand All @@ -152,17 +147,21 @@ class VersionMapImpl {
auto next_key = ref_entry.head_;
entry->head_ = ref_entry.head_;
auto loaded_until = std::numeric_limits<VersionId>::max();
VersionId oldest_loaded_index_version = std::numeric_limits<VersionId>::max();

if ((load_params.load_type_ == LoadType::LOAD_LATEST || load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED ) && is_index_key_type(ref_entry.keys_[0].type())) {
if ((load_params.load_type_ == LoadType::LOAD_LATEST || load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED)
&& is_index_key_type(ref_entry.keys_[0].type()))
{
entry->keys_.push_back(ref_entry.keys_[0]);
} else {
do {
auto [key, seg] = store->read_sync(next_key.value());
std::tie(next_key, loaded_until) = read_segment_with_keys(seg, entry);
oldest_loaded_index_version = std::min(oldest_loaded_index_version, loaded_until);
vasil-pashov marked this conversation as resolved.
Show resolved Hide resolved
} while (next_key
&& need_to_load_further(load_params, loaded_until)
&& load_latest_ongoing(load_params, entry)
&& looking_for_undeleted(load_params, entry));
&& looking_for_undeleted(load_params, entry, oldest_loaded_index_version));

if(load_params.load_type_ == LoadType::LOAD_DOWNTO)
entry->loaded_until_ = loaded_until;
Expand All @@ -173,7 +172,7 @@ class VersionMapImpl {
std::shared_ptr<Store> store,
const StreamId& stream_id,
const LoadParameter load_params,
const std::shared_ptr<VersionMapEntry>& entry) {
const std::shared_ptr<VersionMapEntry>& entry) {
load_params.validate();
auto max_trials = ConfigsMap::instance()->get_int("VersionMap.MaxReadRefTrials", 2);
while (max_trials--) {
Expand Down Expand Up @@ -242,10 +241,7 @@ class VersionMapImpl {
}

/**
* Tombstone all non-deleted versions of the given stream and do the related housekeeping. It works in two modes:
* - fast_tombstone_all_: Writes a TOMBSTONE_ALL for the latest undeleted version
* - Otherwise: writes individual TOMBSTONES for each version (replaces any previous TOMBSTONE_ALL)
*
* Tombstone all non-deleted versions of the given stream and do the related housekeeping.
* @param first_key_to_tombstone The first key in the version chain that should be tombstoned. When empty
* then the first index key onwards is tombstoned, so the whole chain is tombstoned.
*/
Expand All @@ -267,9 +263,13 @@ class VersionMapImpl {
const AtomKey &key, const
std::optional<AtomKey>& previous_key) {
ARCTICDB_DEBUG(log::version(), "Version map pruning previous versions for stream {}", key.id());
auto load_type = fast_tombstone_all_ ? LoadType::LOAD_UNDELETED : LoadType::LOAD_ALL;
auto entry = check_reload(store, key.id(), LoadParameter{load_type}, true, false,
__FUNCTION__);
auto entry = check_reload(
store,
key.id(),
LoadParameter{LoadType::LOAD_UNDELETED},
true,
false,
__FUNCTION__);
auto result = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);

do_write(store, key, entry);
Expand Down Expand Up @@ -884,9 +884,13 @@ class VersionMapImpl {
std::optional<AtomKey> first_key_to_tombstone = std::nullopt,
std::shared_ptr<VersionMapEntry> entry = nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am starting to wonder if the first_key_to_tombstone argument is safe before this change even @willdealtry

Calls using it basically assumes the version map has not changed without checking the timeout.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the case where we're writing version 5 with prune previous, we always want to tombstone from version 4 backwards, otherwise we might end up deleting the version we just wrote.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, that's what it was for.

Would you agree that, ideally, we should check_reload and then locate the version key in the VersionMapEntry again? (I.e. not accepting the entry argument.)

(Not that we should make that change in this PR

if (!entry) {
auto load_type = fast_tombstone_all_ ? LoadType::LOAD_UNDELETED : LoadType::LOAD_ALL;
entry = check_reload(store, stream_id, LoadParameter{load_type}, true, false,
__FUNCTION__);
entry = check_reload(
store,
stream_id,
LoadParameter{LoadType::LOAD_UNDELETED},
true,
false,
__FUNCTION__);
}
if (!first_key_to_tombstone)
first_key_to_tombstone = entry->get_first_index(false);
Expand All @@ -900,44 +904,11 @@ class VersionMapImpl {
}

if (!output.empty()) {
if(fast_tombstone_all_) {
auto tombstone_key = write_tombstone_all_key(store, first_key_to_tombstone.value(), entry);

if(log_changes_)
log_tombstone_all(store, stream_id, tombstone_key.version_id());

} else {
for (const auto &index : output) {
auto tombstone = write_tombstone(store, index, index.id(), entry);
entry->tombstones_.insert(std::make_pair(index.version_id(), std::move(tombstone)));
}
auto tombstone_key = write_tombstone_all_key(store, first_key_to_tombstone.value(), entry);
if(log_changes_) {
log_tombstone_all(store, stream_id, tombstone_key.version_id());
}
}

// Get rid of tombstone_all key if not set on this library
if(!fast_tombstone_all_ && entry->tombstone_all_) {
auto all_indexes = entry->get_indexes(true);
for(const auto& index_key : all_indexes) {
if(entry->is_tombstoned_via_tombstone_all(index_key.version_id()) &&
!entry->has_individual_tombstone(index_key.version_id())) {
auto tombstone = index_to_tombstone(index_key, index_key.id(), store->current_timestamp());
entry->tombstones_.try_emplace(tombstone.version_id(), tombstone);
entry->keys_.push_front(tombstone);
}
}

remove_entry_version_keys(store, entry, stream_id);
entry->keys_.erase(std::remove_if(
std::begin(entry->keys_),
std::end(entry->keys_),
[](const auto& k){
return k.type() == KeyType::TOMBSTONE_ALL || k.type() == KeyType::VERSION; }),
std::end(entry->keys_));

auto version_id = all_indexes.begin()->version_id();
entry->head_ = write_entry_to_storage(store, stream_id, version_id, entry);
}

return output;
}
};
Expand Down
11 changes: 8 additions & 3 deletions cpp/arcticdb/version/version_store_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,15 @@ void PythonVersionStore::fix_symbol_trees(const std::vector<StreamId>& symbols)

void PythonVersionStore::prune_previous_versions(const StreamId& stream_id) {
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: prune_previous_versions stream_id={}", stream_id);
const auto entry = version_map()->check_reload(store(), stream_id, LoadParameter{LoadType::LOAD_UNDELETED},
true, false, __FUNCTION__);
const std::shared_ptr<VersionMapEntry>& entry = version_map()->check_reload(
store(),
stream_id,
LoadParameter{LoadType::LOAD_UNDELETED},
true,
false,
__FUNCTION__);
storage::check<ErrorCode::E_SYMBOL_NOT_FOUND>(!entry->empty(), "Symbol {} is not found", stream_id);
auto latest = entry->get_first_index(false);
util::check(latest.has_value(), "Cannot prune previous versions for non-existent symbol {}", stream_id);

auto prev_id = get_prev_version_in_entry(entry, latest->version_id());
if (!prev_id) {
Expand Down
28 changes: 21 additions & 7 deletions cpp/arcticdb/version/version_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
VersionMapEntry& entry) {
ssize_t row = 0;
std::optional<AtomKey> next;
VersionId oldest_loaded = std::numeric_limits<VersionId>::max();
VersionId oldest_loaded_index = std::numeric_limits<VersionId>::max();
for (; row < ssize_t(seg.row_count()); ++row) {
auto key = read_key_row(seg, row);
ARCTICDB_TRACE(log::version(), "Reading key {}", key);
if (is_index_key_type(key.type())) {
entry.keys_.push_back(key);
oldest_loaded = std::min(oldest_loaded, key.version_id());
oldest_loaded_index = std::min(oldest_loaded_index, key.version_id());
} else if (key.type() == KeyType::TOMBSTONE) {
entry.tombstones_.try_emplace(key.version_id(), key);
entry.keys_.push_back(key);
Expand All @@ -96,7 +96,7 @@ inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
}
}
util::check(row == ssize_t(seg.row_count()), "Unexpected ordering in journal segment");
return std::make_pair(next, oldest_loaded);
return std::make_pair(next, oldest_loaded_index);
}

inline std::pair<std::optional<AtomKey>, VersionId> read_segment_with_keys(
Expand Down Expand Up @@ -217,12 +217,26 @@ inline bool load_latest_ongoing(const LoadParameter& load_params, const std::sha
return false;
}

inline bool looking_for_undeleted(const LoadParameter& load_params, const std::shared_ptr<VersionMapEntry>& entry) {
if(!(load_params.load_type_ == LoadType::LOAD_UNDELETED && entry->tombstone_all_))
inline bool looking_for_undeleted(const LoadParameter& load_params, const std::shared_ptr<VersionMapEntry>& entry, const VersionId& oldest_loaded_index) {
qc00 marked this conversation as resolved.
Show resolved Hide resolved
if(load_params.load_type_ != LoadType::LOAD_UNDELETED) {
return true;
}

ARCTICDB_DEBUG(log::version(), "Exiting because we have found an undeleted version");
return false;
if(entry->tombstone_all_) {
const bool is_deleted_by_tombstone_all = entry->tombstone_all_->version_id() >= oldest_loaded_index;
if(is_deleted_by_tombstone_all) {
ARCTICDB_DEBUG(
log::version(),
"Exiting because tombstone all key deletes all versions beyond: {} and the oldest loaded index has version: {}",
entry->tombstone_all_->version_id(),
oldest_loaded_index);
return false;
} else {
return true;
}
} else {
return true;
}
}

void fix_stream_ids_of_index_keys(
Expand Down