diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 244b7c8faed..b053698b2bd 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -871,8 +871,8 @@ folly::Future LocalVersionedEngine::delete_trees_responsibly( auto min_versions = min_versions_for_each_stream(orig_keys_to_delete); for (const auto& min : min_versions) { auto load_param = load_type == LoadType::LOAD_DOWNTO - ? LoadParameter{load_type, static_cast(min.second)} - : LoadParameter{load_type}; + ? LoadParameter{load_type, ToLoad::ANY, static_cast(min.second)} + : LoadParameter{load_type, ToLoad::ANY}; const auto entry = version_map()->check_reload(store(), min.first, load_param, __FUNCTION__); entry_map.try_emplace(std::move(min.first), entry); } diff --git a/cpp/arcticdb/version/test/test_stream_version_data.cpp b/cpp/arcticdb/version/test/test_stream_version_data.cpp index f51934001c6..90f888349e2 100644 --- a/cpp/arcticdb/version/test/test_stream_version_data.cpp +++ b/cpp/arcticdb/version/test/test_stream_version_data.cpp @@ -12,8 +12,9 @@ TEST(StreamVersionData, SpecificVersion) { VersionQuery query_2{SpecificVersionQuery{VersionId(4)}, false}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_DOWNTO); - ASSERT_EQ(stream_version_data.load_param_.load_until_version_, 4); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_DOWNTO); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_, 4); } TEST(StreamVersionData, SpecificVersionReversed) { @@ -24,8 +25,9 @@ TEST(StreamVersionData, SpecificVersionReversed) { VersionQuery query_2{SpecificVersionQuery{VersionId(12)}, false}; stream_version_data.react(query_2); 
ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_DOWNTO); - ASSERT_EQ(stream_version_data.load_param_.load_until_version_, 4); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_DOWNTO); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_, 4); } TEST(StreamVersionData, Timestamp) { @@ -38,8 +40,9 @@ TEST(StreamVersionData, Timestamp) { VersionQuery query_2{TimestampVersionQuery{timestamp(4)}, false}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_FROM_TIME); - ASSERT_EQ(stream_version_data.load_param_.load_from_time_, 4); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_, 4); } TEST(StreamVersionData, TimestampUnordered) { @@ -54,8 +57,9 @@ TEST(StreamVersionData, TimestampUnordered) { VersionQuery query_3{TimestampVersionQuery{timestamp(4)}, false}; stream_version_data.react(query_3); ASSERT_EQ(stream_version_data.count_, 3); - ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_FROM_TIME); - ASSERT_EQ(stream_version_data.load_param_.load_from_time_, 3); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_, 3); } TEST(StreamVersionData, Latest) { @@ -66,8 +70,9 @@ TEST(StreamVersionData, Latest) { VersionQuery query_1{std::monostate{}, false}; stream_version_data.react(query_1); ASSERT_EQ(stream_version_data.count_, 1); - 
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_LATEST_UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_until_version_.has_value(), false); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_LATEST); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false); } TEST(StreamVersionData, SpecificToTimestamp) { @@ -80,9 +85,10 @@ TEST(StreamVersionData, SpecificToTimestamp) { VersionQuery query_2{TimestampVersionQuery{timestamp(3)}, false}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_until_version_.has_value(), false); - ASSERT_EQ(stream_version_data.load_param_.load_from_time_.has_value(), false); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_ALL); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_.has_value(), false); } TEST(StreamVersionData, TimestampToSpecific) { @@ -95,7 +101,8 @@ TEST(StreamVersionData, TimestampToSpecific) { VersionQuery query_2{SpecificVersionQuery{VersionId(12)}, false}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_until_version_.has_value(), false); - ASSERT_EQ(stream_version_data.load_param_.load_from_time_.has_value(), false); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_ALL); + 
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false); + ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_.has_value(), false); } \ No newline at end of file diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 2819796f60f..be5f37c08af 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -209,7 +209,7 @@ TEST(VersionMap, TestLoadsRefAndIteration) { version_map->load_via_iteration(store, id, entry_iteration); auto entry_ref = std::make_shared(); - version_map->load_via_ref_key(store, id, LoadParameter{LoadType::LOAD_ALL}, entry_ref); + version_map->load_via_ref_key(store, id, LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY}, entry_ref); ASSERT_EQ(entry_iteration->head_, entry_ref->head_); ASSERT_EQ(entry_iteration->keys_.size(), entry_ref->keys_.size()); @@ -452,7 +452,7 @@ TEST(VersionMap, FixRefKeyTombstones) { auto key5 = atom_key_with_version(id, 1, 1696590624590123209); version_map->write_version(store, key5, key4); auto key6 = atom_key_with_version(id, 0, 1696590624612743245); - auto entry = version_map->check_reload(store, id, LoadParameter{LoadType::LOAD_LATEST}, __FUNCTION__); + auto entry = version_map->check_reload(store, id, LoadParameter{LoadType::LOAD_LATEST, ToLoad::ANY}, __FUNCTION__); version_map->journal_single_key(store, key5, entry->head_.value()); auto valid = version_map->check_ref_key(store, id); @@ -578,15 +578,13 @@ std::shared_ptr write_two_versions(std::shared_ptrcheck_reload( store, id, - LoadParameter{LoadType::NOT_LOADED}, + LoadParameter{LoadType::NOT_LOADED, ToLoad::ANY}, __FUNCTION__); auto key1 = atom_key_with_version(id, 0, 0); version_map->do_write(store, key1, entry); - write_symbol_ref(store, key1, std::nullopt, entry->head_.value()); auto key2 = 
atom_key_with_version(id, 1, 1); version_map->do_write(store, key2, entry); - write_symbol_ref(store, key2, std::nullopt, entry->head_.value()); return entry; } @@ -596,7 +594,7 @@ void write_alternating_deleted_undeleted(std::shared_ptr store, s auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::NOT_LOADED}, + LoadParameter{LoadType::NOT_LOADED, ToLoad::ANY}, __FUNCTION__); auto key1 = atom_key_with_version(id, 0, 0); @@ -605,24 +603,108 @@ void write_alternating_deleted_undeleted(std::shared_ptr store, s // Write version 0 version_map->do_write(store, key1, entry); - write_symbol_ref(store, key1, std::nullopt, entry->head_.value()); // Tombstone_all on version 0 version_map->delete_all_versions(store, id); // Write version 1 version_map->do_write(store, key2, entry); - write_symbol_ref(store, key2, std::nullopt, entry->head_.value()); // Write version 2 version_map->do_write(store, key3, entry); - write_symbol_ref(store, key3, std::nullopt, entry->head_.value()); // Tombstone version 2 version_map->write_tombstone(store, VersionId{2}, id, entry, timestamp{3}); } -TEST(VersionMap, FollowingVersionChainEndEarly) { +TEST(VersionMap, FollowingVersionChain){ + // Set up the version chain v0(tombstone_all) <- v1 <- v2(tombstoned) + auto store = std::make_shared(); + auto version_map = std::make_shared(); + StreamId id{"test"}; + write_alternating_deleted_undeleted(store, version_map, id); + + auto check_strategy_loads_to = [&](LoadStrategy load_strategy, VersionId should_load_to){ + auto ref_entry = VersionMapEntry{}; + read_symbol_ref(store, id, ref_entry); + auto follow_result = std::make_shared(); + + version_map->follow_version_chain(store, ref_entry, follow_result, load_strategy); + EXPECT_EQ(follow_result->loaded_with_progress_.oldest_loaded_index_version_, VersionId{should_load_to}); + }; + + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(0)}, 0); + 
check_strategy_loads_to(LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(-2)}, 1); + // DOWN_TO will not skip through tombstoned versions even when include_deleted=false + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(-1)}, 2); + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(0)}, 0); + + // FROM_TIME when include_deleted=false will skip through deleted versions to go to the latest undeleted version before the timestamp. + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(10)}, 1); + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(0)}, 0); + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(2)}, 2); + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(0)}, 0); + + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_LATEST, ToLoad::ANY}, 2); + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED}, 1); + + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY}, 0); + check_strategy_loads_to(LoadStrategy{LoadType::LOAD_ALL, ToLoad::UNDELETED}, 0); +} + +TEST(VersionMap, FollowingVersionChainWithCaching){ + ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); + // Set up the version chain v0(tombstone_all) <- v1 <- v2(tombstoned) + auto store = std::make_shared(); + auto version_map = std::make_shared(); + StreamId id{"test"}; + write_alternating_deleted_undeleted(store, version_map, id); + // We create an empty version map after populating the versions + version_map = std::make_shared(); + + auto check_loads_versions = [&](LoadParameter load_param, uint32_t should_load_any, uint32_t should_load_undeleted){ + auto loaded = version_map->check_reload(store, id, load_param, __FUNCTION__); + EXPECT_EQ(loaded->get_indexes(true).size(), should_load_any); 
+ EXPECT_EQ(loaded->get_indexes(false).size(), should_load_undeleted); + }; + + check_loads_versions(LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(-1)}, 1, 0); + // LOAD_FROM_TIME should not be cached by the LOAD_DOWNTO and should reload from storage up to the latest undeleted version, hence loading 2 versions, 1 of which is undeleted. + check_loads_versions(LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(10)}, 2, 1); + // LOAD_LATEST should be cached by the LOAD_FROM_TIME, so we still have the same 2 loaded versions + check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, ToLoad::ANY}, 2, 1); + // This LOAD_FROM_TIME should still use the cached 2 versions + check_loads_versions(LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(1)}, 2, 1); + + // We just get the entry to use for the tombstone and the write + auto entry = version_map->check_reload( + store, + id, + LoadParameter{LoadType::NOT_LOADED, ToLoad::ANY}, + __FUNCTION__); + // We delete the only undeleted key + version_map->write_tombstone(store, VersionId{1}, id, entry, timestamp{4}); + + // LOAD_LATEST should still be cached, but the cached entry now needs to have no undeleted keys + check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, ToLoad::ANY}, 2, 0); + // LOAD_FROM_TIME UNDELETED should no longer be cached even though we used the same request before because the undeleted key it went to got deleted. 
So it will load the entire version chain + check_loads_versions(LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(10)}, 3, 0); + + // We add a new undeleted key + auto key4 = atom_key_with_version(id, 3, 5); + version_map->do_write(store, key4, entry); + + // LOAD_LATEST should still be cached, but the cached entry now needs to have one more undeleted version + check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, ToLoad::ANY}, 4, 1); + + // We delete everything with a tombstone_all + version_map->delete_all_versions(store, id); + + // LOAD_LATEST should still be cached, but now have no undeleted versions + check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, ToLoad::ANY}, 4, 0); +} + +TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; @@ -635,18 +717,30 @@ TEST(VersionMap, FollowingVersionChainEndEarly) { read_symbol_ref(store, id, ref_entry); auto follow_result = std::make_shared(); - for (auto load_params: { - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(0)}, - LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(0)}, - LoadParameter{LoadType::LOAD_UNDELETED}, - LoadParameter{LoadType::LOAD_LATEST_UNDELETED} + for (auto load_strategy: { + LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(0)}, + LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(0)}, + LoadStrategy{LoadType::LOAD_ALL, ToLoad::UNDELETED}, + LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED} }) { follow_result->clear(); - version_map->follow_version_chain(store, ref_entry, follow_result, load_params); - // When loading with any of the specified load params we should end following the version chain early + version_map->follow_version_chain(store, ref_entry, follow_result, load_strategy); + // When loading with any of the specified load strategies with include_deleted=false we should end following the version chain early // 
at version 1 because that's when we encounter the TOMBSTONE_ALL. EXPECT_EQ(follow_result->loaded_with_progress_.oldest_loaded_index_version_, VersionId{1}); } + + for (auto load_strategy: { + LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(0)}, + LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(0)}, + LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY} + }) { + follow_result->clear(); + version_map->follow_version_chain(store, ref_entry, follow_result, load_strategy); + // When loading with any of the specified load strategies with include_deleted=true we should continue to the beginning + // at version 0 even though it was deleted. + EXPECT_EQ(follow_result->loaded_with_progress_.oldest_loaded_index_version_, VersionId{0}); + } } TEST(VersionMap, CacheInvalidation) { @@ -662,7 +756,7 @@ TEST(VersionMap, CacheInvalidation) { // Load to_load inside the clean version map cache clean_version_map->check_reload(store, id, to_load, __FUNCTION__); // Check whether to_check_if_cached is being cached by to_load - EXPECT_EQ(clean_version_map->has_cached_entry(id, to_check_if_cached), expected_outcome); + EXPECT_EQ(clean_version_map->has_cached_entry(id, to_check_if_cached.load_strategy_), expected_outcome); }; auto check_all_caching = [&](const std::vector& to_load, const std::vector& to_check_if_cached, bool expected_result){ @@ -673,8 +767,8 @@ TEST(VersionMap, CacheInvalidation) { } }; - auto load_all_param = LoadParameter{LoadType::LOAD_ALL}; - auto load_all_undeleted_param = LoadParameter{LoadType::LOAD_UNDELETED}; + auto load_all_param = LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}; + auto load_all_undeleted_param = LoadParameter{LoadType::LOAD_ALL, ToLoad::UNDELETED}; check_caching(load_all_param, load_all_undeleted_param, true); check_caching(load_all_undeleted_param, load_all_param, false); @@ -682,26 +776,27 @@ TEST(VersionMap, CacheInvalidation) { std::vector should_load_to_v[num_versions] = { // Different parameters which should all load 
to v0 std::vector{ - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(0)}, - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-3)}, - LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(0)}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(0)}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(-3)}, + LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(0)}, }, // Different parameters which should all load to v1 std::vector{ - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(1)}, - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-2)}, - LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(1)}, - LoadParameter{LoadType::LOAD_FROM_TIME, - static_cast(2)}, // LOAD_FROM_TIME loads up to an undeleted version - LoadParameter{LoadType::LOAD_LATEST_UNDELETED}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(1)}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(-2)}, + LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(1)}, + LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, + static_cast(2)}, // when include_deleted=false LOAD_FROM_TIME searches for an undeleted version + LoadParameter{LoadType::LOAD_LATEST, ToLoad::UNDELETED}, }, // Different parameters which should all load to v2 std::vector{ - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(2)}, - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-1)}, - LoadParameter{LoadType::LOAD_LATEST}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(2)}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(-1)}, + LoadParameter{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(2)}, + LoadParameter{LoadType::LOAD_LATEST, ToLoad::ANY}, } }; @@ -735,24 +830,26 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAfterLoad) { auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(1)}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(1)}, __FUNCTION__); - 
ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_LATEST_UNDELETED})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(1)})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(0)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-1)})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-2)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(1)})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(0)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(-1)})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(-2)})); // When - we delete version 1 and reload version_map->write_tombstone(store, VersionId{1}, id, entry); // Now when the cached version is deleted, we should invalidate the cache for load parameters which look for undeleted. 
- ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_LATEST_UNDELETED})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(1)})); - //TODO: Add more undeleted checks + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::ANY})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::ANY, static_cast(1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(-1)})); - LoadParameter load_param{LoadType::LOAD_LATEST_UNDELETED}; + LoadParameter load_param{LoadType::LOAD_LATEST, ToLoad::UNDELETED}; const auto latest_undeleted_entry = version_map->check_reload(store, id, load_param, __FUNCTION__); // Then - version 0 should be returned @@ -776,30 +873,30 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::LOAD_DOWNTO, static_cast(0)}, + LoadParameter{LoadType::LOAD_DOWNTO, ToLoad::ANY, static_cast(0)}, __FUNCTION__); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_LATEST_UNDELETED})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(0)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_DOWNTO, static_cast(-2)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED})); + 
ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(0)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(-1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(-2)})); // When - we delete version 1 auto tombstone_key = version_map->write_tombstone(store, VersionId{1}, id, entry); // We should not invalidate the cache because the version we loaded to is still undeleted - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_LATEST_UNDELETED})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(0)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(0)})); // When - we delete all versions version_map->write_tombstone_all_key(store, tombstone_key, entry); // We should invalidate cached undeleted checks - ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_LATEST_UNDELETED})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(1)})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadParameter{LoadType::LOAD_FROM_TIME, static_cast(0)})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED})); + ASSERT_FALSE(version_map->has_cached_entry(id, 
LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(1)})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, static_cast(0)})); } #define GTEST_COUT std::cerr << "[ ] [ INFO ]" diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 6910ce21544..26ea10461ef 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -136,7 +136,7 @@ TEST(PythonVersionStore, IterationVsRefWrite) { auto ref_entry = std::make_shared(); version_map->load_via_iteration(mock_store, stream_id, iter_entry); - version_map->load_via_ref_key(mock_store, stream_id, LoadParameter{LoadType::LOAD_ALL}, ref_entry); + version_map->load_via_ref_key(mock_store, stream_id, LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY}, ref_entry); EXPECT_EQ(std::string(iter_entry->head_.value().view()), std::string(ref_entry->head_.value().view())); ASSERT_EQ(iter_entry->keys_.size(), ref_entry->keys_.size()); @@ -151,7 +151,7 @@ TEST(PythonVersionStore, IterationVsRefWrite) { auto ref_entry_compact = std::make_shared(); version_map->load_via_iteration(mock_store, stream_id, iter_entry_compact); - version_map->load_via_ref_key(mock_store, stream_id, LoadParameter{LoadType::LOAD_ALL}, ref_entry_compact); + version_map->load_via_ref_key(mock_store, stream_id, LoadStrategy{LoadType::LOAD_ALL, arcticdb::ToLoad::ANY}, ref_entry_compact); EXPECT_EQ(std::string(iter_entry_compact->head_.value().view()), std::string(ref_entry_compact->head_.value().view())); ASSERT_EQ(iter_entry_compact->keys_.size(), ref_entry_compact->keys_.size()); diff --git a/cpp/arcticdb/version/test/version_backwards_compat.hpp b/cpp/arcticdb/version/test/version_backwards_compat.hpp index a1320526971..93f7a04c902 100644 --- a/cpp/arcticdb/version/test/version_backwards_compat.hpp +++ b/cpp/arcticdb/version/test/version_backwards_compat.hpp @@ -33,7 +33,7 @@ 
std::deque backwards_compat_delete_all_versions( const StreamId& stream_id ) { std::deque output; - auto entry = version_map->check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__); + auto entry = version_map->check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}, __FUNCTION__); auto indexes = entry->get_indexes(false); output.assign(std::begin(indexes), std::end(indexes)); @@ -49,7 +49,7 @@ std::vector backwards_compat_write_and_prune_previous(std::shared_ptr output; - auto entry = version_map->check_reload(store, key.id(), LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__); + auto entry = version_map->check_reload(store, key.id(), LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}, __FUNCTION__); auto old_entry = *entry; entry->clear(); diff --git a/cpp/arcticdb/version/version_functions.hpp b/cpp/arcticdb/version/version_functions.hpp index 9659513f10f..4daf0431a9c 100644 --- a/cpp/arcticdb/version/version_functions.hpp +++ b/cpp/arcticdb/version/version_functions.hpp @@ -24,7 +24,7 @@ inline std::optional get_latest_undeleted_version( const StreamId &stream_id, const pipelines::VersionQuery& version_query) { ARCTICDB_RUNTIME_SAMPLE(GetLatestUndeletedVersion, 0) - LoadParameter load_param{LoadType::LOAD_LATEST_UNDELETED}; + LoadParameter load_param{LoadType::LOAD_LATEST, ToLoad::UNDELETED}; set_load_param_options(load_param, version_query); const auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); return entry->get_first_index(false).first; @@ -36,7 +36,7 @@ inline std::pair, bool> get_latest_version( const StreamId &stream_id, const pipelines::VersionQuery& version_query) { ARCTICDB_SAMPLE(GetLatestVersion, 0) - LoadParameter load_param{LoadType::LOAD_LATEST}; + LoadParameter load_param{LoadType::LOAD_LATEST, ToLoad::ANY}; set_load_param_options(load_param, version_query); auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); return 
entry->get_first_index(true); @@ -49,7 +49,7 @@ inline version_store::UpdateInfo get_latest_undeleted_version_and_next_version_i const StreamId &stream_id, const pipelines::VersionQuery& version_query) { ARCTICDB_SAMPLE(GetLatestUndeletedVersionAndHighestVersionId, 0) - LoadParameter load_param{LoadType::LOAD_LATEST_UNDELETED}; + LoadParameter load_param{LoadType::LOAD_LATEST, ToLoad::UNDELETED}; set_load_param_options(load_param, version_query); auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); auto latest_version = entry->get_first_index(true).first; @@ -65,7 +65,7 @@ inline std::vector get_all_versions( const pipelines::VersionQuery& version_query ) { ARCTICDB_SAMPLE(GetAllVersions, 0) - LoadParameter load_param{LoadType::LOAD_UNDELETED}; + LoadParameter load_param{LoadType::LOAD_ALL, ToLoad::UNDELETED}; set_load_param_options(load_param, version_query); auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); return entry->get_indexes(false); @@ -78,7 +78,7 @@ inline std::optional get_specific_version( SignedVersionId signed_version_id, const pipelines::VersionQuery& version_query, bool include_deleted = false) { - LoadParameter load_param{LoadType::LOAD_DOWNTO, signed_version_id}; + LoadParameter load_param{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, signed_version_id}; auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); set_load_param_options(load_param, version_query); VersionId version_id; @@ -155,7 +155,7 @@ inline std::unordered_map get_all_tombstoned_versions( const std::shared_ptr &version_map, const StreamId &stream_id, const pipelines::VersionQuery& version_query) { - LoadParameter load_param{LoadType::LOAD_ALL}; + LoadParameter load_param{LoadType::LOAD_ALL, ToLoad::ANY}; set_load_param_options(load_param, version_query); auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); std::unordered_map result; @@ -174,7 +174,7 @@ inline 
version_store::TombstoneVersionResult tombstone_version( bool allow_tombstoning_beyond_latest_version=false, const std::optional& creation_ts=std::nullopt) { ARCTICDB_DEBUG(log::version(), "Tombstoning version {} for stream {}", version_id, stream_id); - LoadParameter load_param{LoadType::LOAD_UNDELETED}; + LoadParameter load_param{LoadType::LOAD_ALL, ToLoad::UNDELETED}; set_load_param_options(load_param, version_query); auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); // Might as well do the previous/next version check while we find the required version_id. @@ -238,7 +238,7 @@ inline std::optional load_index_key_from_time( const StreamId &stream_id, timestamp from_time, const pipelines::VersionQuery& version_query) { - LoadParameter load_param{LoadType::LOAD_FROM_TIME, from_time}; + LoadParameter load_param{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, from_time}; set_load_param_options(load_param, version_query); auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); auto indexes = entry->get_indexes(false); @@ -250,7 +250,7 @@ inline std::vector get_index_and_tombstone_keys( const std::shared_ptr &version_map, const StreamId &stream_id, const pipelines::VersionQuery& version_query) { - LoadParameter load_param{LoadType::LOAD_ALL}; + LoadParameter load_param{LoadType::LOAD_ALL, ToLoad::ANY}; set_load_param_options(load_param, version_query); const auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); std::vector res; diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index ae7bcee4ccb..6a17fd68d84 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -145,7 +145,7 @@ class VersionMapImpl { const std::shared_ptr& store, const VersionMapEntry& ref_entry, const std::shared_ptr& entry, - const LoadParameter& load_params) const { + const LoadStrategy& load_strategy) const { auto next_key = 
ref_entry.head_; entry->head_ = ref_entry.head_; @@ -158,7 +158,7 @@ class VersionMapImpl { cached_penultimate_index = ref_entry.keys_[1]; } - if (key_exists_in_ref_entry(load_params, ref_entry, cached_penultimate_index)) { + if (key_exists_in_ref_entry(load_strategy, ref_entry, cached_penultimate_index)) { load_progress = ref_entry.loaded_with_progress_; entry->keys_.push_back(ref_entry.keys_[0]); if(cached_penultimate_index) @@ -170,10 +170,10 @@ class VersionMapImpl { next_key = read_segment_with_keys(seg, entry, load_progress); set_latest_version(entry, latest_version); } while (next_key - && !loaded_until_version_id(load_params, load_progress, latest_version) - && !loaded_until_timestamp(load_params, load_progress) - && load_latest_ongoing(load_params, entry) - && looking_for_undeleted(load_params, entry, load_progress)); + && !loaded_until_version_id(load_strategy, load_progress, latest_version) + && !loaded_until_timestamp(load_strategy, load_progress) + && load_latest_ongoing(load_strategy, entry) + && looking_for_undeleted(load_strategy, entry, load_progress)); } entry->loaded_with_progress_ = load_progress; } @@ -181,9 +181,9 @@ class VersionMapImpl { void load_via_ref_key( std::shared_ptr store, const StreamId& stream_id, - const LoadParameter& load_params, + const LoadStrategy& load_strategy, const std::shared_ptr& entry) { - load_params.validate(); + load_strategy.validate(); static const auto max_trial_config = ConfigsMap::instance()->get_int("VersionMap.MaxReadRefTrials", 2); auto max_trials = max_trial_config; while (true) { @@ -193,7 +193,7 @@ class VersionMapImpl { if (ref_entry.empty()) return; - follow_version_chain(store, ref_entry, entry, load_params); + follow_version_chain(store, ref_entry, entry, load_strategy); break; } catch (const std::exception &err) { if (--max_trials <= 0) { @@ -234,7 +234,7 @@ class VersionMapImpl { } void write_version(std::shared_ptr store, const AtomKey &key, const std::optional& previous_key) { - LoadParameter 
load_param{LoadType::LOAD_LATEST}; + LoadParameter load_param{LoadType::LOAD_LATEST, ToLoad::ANY}; auto entry = check_reload(store, key.id(), load_param, __FUNCTION__); do_write(store, key, entry); @@ -267,7 +267,7 @@ class VersionMapImpl { auto entry = check_reload( store, stream_id, - LoadParameter{LoadType::LOAD_UNDELETED}, + LoadParameter{LoadType::LOAD_ALL, ToLoad::UNDELETED}, __FUNCTION__); auto output = tombstone_from_key_or_all_internal(store, stream_id, first_key_to_tombstone, entry); @@ -281,7 +281,7 @@ class VersionMapImpl { } std::string dump_entry(const std::shared_ptr& store, const StreamId& stream_id) { - const auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__); + const auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}, __FUNCTION__); return entry->dump(); } @@ -293,7 +293,7 @@ class VersionMapImpl { auto entry = check_reload( store, key.id(), - LoadParameter{LoadType::LOAD_UNDELETED}, + LoadParameter{LoadType::LOAD_ALL, ToLoad::UNDELETED}, __FUNCTION__); auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry); @@ -332,7 +332,7 @@ class VersionMapImpl { // This method has no API, and is not tested in the rapidcheck tests, but could easily be enabled there. // It compacts the version map but skips any keys which have been deleted (to free up space). 
ARCTICDB_DEBUG(log::version(), "Version map compacting versions for stream {}", stream_id); - auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__); + auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}, __FUNCTION__); if (!requires_compaction(entry)) return; @@ -448,7 +448,7 @@ class VersionMapImpl { void compact(std::shared_ptr store, const StreamId& stream_id) { ARCTICDB_DEBUG(log::version(), "Version map compacting versions for stream {}", stream_id); - auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__); + auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}, __FUNCTION__); if (entry->empty()) { log::version().warn("Entry is empty in compact"); return; @@ -470,7 +470,7 @@ class VersionMapImpl { void overwrite_symbol_tree( std::shared_ptr store, const StreamId& stream_id, const std::vector& index_keys) { - auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, __FUNCTION__); + auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, ToLoad::ANY}, __FUNCTION__); auto old_entry = *entry; if (!index_keys.empty()) { entry->keys_.assign(std::begin(index_keys), std::end(index_keys)); @@ -494,11 +494,11 @@ class VersionMapImpl { const char* function ARCTICDB_UNUSED) { ARCTICDB_DEBUG(log::version(), "Check reload in function {} for id {}", function, stream_id); - if (has_cached_entry(stream_id, load_param)) { + if (has_cached_entry(stream_id, load_param.load_strategy_)) { return get_entry(stream_id); } - return storage_reload(store, stream_id, load_param, load_param.iterate_on_failure_); + return storage_reload(store, stream_id, load_param.load_strategy_, load_param.iterate_on_failure_); } void do_write( @@ -555,13 +555,13 @@ class VersionMapImpl { * * @param stream_id symbol to check * @param load_param the load type - * @return whether we have a cached entry suitable 
for the load type, so do not need to go to storage + * @return whether we have a cached entry suitable for the load strategy, so do not need to go to storage */ - bool has_cached_entry(const StreamId &stream_id, const LoadParameter& load_param) const { - LoadType requested_load_type = load_param.load_type_; + bool has_cached_entry(const StreamId &stream_id, const LoadStrategy& requested_load_strategy) const { + LoadType requested_load_type = requested_load_strategy.load_type_; util::check(requested_load_type < LoadType::UNKNOWN, "Unexpected load type requested {}", requested_load_type); - load_param.validate(); + requested_load_strategy.validate(); MapType::const_iterator entry_it; if(!find_entry(entry_it, stream_id)) { return false; @@ -579,35 +579,41 @@ class VersionMapImpl { return false; } - LoadType cached_load_type = entry->load_type_; + LoadType cached_load_type = entry->load_strategy_.load_type_; switch (requested_load_type) { case LoadType::NOT_LOADED: return true; case LoadType::LOAD_LATEST: { - // If entry has at least one index we have the latest value cached - auto opt_latest = entry->get_first_index(true).first; - return opt_latest.has_value(); - } - case LoadType::LOAD_LATEST_UNDELETED: { - // If entry has at least one undeleted index we have the latest_undeleted cached - // This check can be slow if we have thousands of deleted versions before the first undeleted. If that is - // ever a problem we can just store a boolean if we have an undeleted version. - auto opt_latest = entry->get_first_index(false).first; + // If entry has at least one (maybe undeleted) index we have the latest value cached + + // This check can be slow if we have thousands of deleted versions before the first undeleted and we're + // looking for an undeleted version. If that is ever a problem we can just store a boolean whether + // we have an undeleted version. 
+ auto opt_latest = entry->get_first_index(requested_load_strategy.should_include_deleted()).first; return opt_latest.has_value(); } case LoadType::LOAD_DOWNTO: // We check whether the oldest loaded version is before or at the requested one - return loaded_as_far_as_version_id(*entry, load_param.load_until_version_.value()); - case LoadType::LOAD_FROM_TIME: - // We check whether the earliest loaded timestamp is before or at the requested on - return entry->loaded_with_progress_.earliest_loaded_undeleted_timestamp_ <= load_param.load_from_time_.value(); - case LoadType::LOAD_UNDELETED: - // We can have all undeleteded versions cached when cache was loaded by either loading all or all undeleted. - return cached_load_type==LoadType::LOAD_ALL || cached_load_type==LoadType::LOAD_UNDELETED; + return loaded_as_far_as_version_id(*entry, requested_load_strategy.load_until_version_.value()); + case LoadType::LOAD_FROM_TIME: { + // We check whether the cached (deleted or undeleted) timestamp is before or at the requested one + auto cached_timestamp = requested_load_strategy.should_include_deleted() ? + entry->loaded_with_progress_.earliest_loaded_timestamp_ : + entry->loaded_with_progress_.earliest_loaded_undeleted_timestamp_; + return cached_timestamp <= requested_load_strategy.load_from_time_.value(); + } case LoadType::LOAD_ALL: - // We can have all versions cached only when cache was loaded by loading all versions. - return cached_load_type==LoadType::LOAD_ALL; + // All versions are cached if the last time we loaded the cache we used a LOAD_ALL strategy. + // Also, either the cached entry must include deleted versions or the request must not ask for deleted versions. + // It is also possible to do other checks e.g. loaded_with_progress.oldest_loaded_index_version, but + // it's hard for these checks to be extensive because for e.g. 
should_include_deleted() = false we can't know + // based on the load progress if we have loaded all undeleted versions, because we don't know the earliest + // undeleted version. + if (cached_load_type==LoadType::LOAD_ALL){ + return entry->load_strategy_.should_include_deleted() || !requested_load_strategy.should_include_deleted(); + } + return false; default: util::raise_rte("Unexpected load type in cache {}", cached_load_type); } @@ -747,7 +753,7 @@ class VersionMapImpl { std::shared_ptr storage_reload( std::shared_ptr store, const StreamId& stream_id, - const LoadParameter& load_param, + const LoadStrategy& load_strategy, bool iterate_on_failure) { /* * Goes to the storage for a given symbol, and recreates the VersionMapEntry from preferably the ref key @@ -760,13 +766,13 @@ class VersionMapImpl { const auto clock_unsync_tolerance = ConfigsMap::instance()->get_int("VersionMap.UnsyncTolerance", DEFAULT_CLOCK_UNSYNC_TOLERANCE); entry->last_reload_time_ = Clock::nanos_since_epoch() - clock_unsync_tolerance; - entry->load_type_ = LoadType::NOT_LOADED; // FUTURE: to make more thread-safe with #368 + entry->load_strategy_ = LoadStrategy{LoadType::NOT_LOADED}; // FUTURE: to make more thread-safe with #368 try { auto temp = std::make_shared(*entry); - load_via_ref_key(store, stream_id, load_param, temp); + load_via_ref_key(store, stream_id, load_strategy, temp); std::swap(*entry, *temp); - entry->load_type_ = load_param.load_type_; + entry->load_strategy_ = load_strategy; } catch (const std::runtime_error &err) { if (iterate_on_failure) { @@ -779,7 +785,7 @@ class VersionMapImpl { } if (iterate_on_failure && entry->empty()) { (void) load_via_iteration(store, stream_id, entry); - entry->load_type_ = LoadType::LOAD_ALL; + entry->load_strategy_ = LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY}; } util::check(entry->keys_.empty() || entry->head_, "Non-empty VersionMapEntry should set head"); @@ -845,7 +851,7 @@ class VersionMapImpl { try { auto entry_ref = 
std::make_shared(); - load_via_ref_key(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, entry_ref); + load_via_ref_key(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY}, entry_ref); entry_ref->validate(); } catch (const std::exception& err) { log::version().warn( @@ -858,7 +864,7 @@ class VersionMapImpl { bool indexes_sorted(const std::shared_ptr& store, const StreamId& stream_id) { auto entry_ref = std::make_shared(); - load_via_ref_key(store, stream_id, LoadParameter{LoadType::LOAD_ALL}, entry_ref); + load_via_ref_key(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, ToLoad::ANY}, entry_ref); auto indexes = entry_ref->get_indexes(true); return std::is_sorted(std::cbegin(indexes), std::cend(indexes), [] (const auto& l, const auto& r) { return l > r; @@ -951,7 +957,7 @@ class VersionMapImpl { entry = check_reload( store, stream_id, - LoadParameter{LoadType::LOAD_UNDELETED}, + LoadParameter{LoadType::LOAD_ALL, ToLoad::UNDELETED}, __FUNCTION__); } diff --git a/cpp/arcticdb/version/version_map_batch_methods.cpp b/cpp/arcticdb/version/version_map_batch_methods.cpp index 6e18165c40b..c759b27a614 100644 --- a/cpp/arcticdb/version/version_map_batch_methods.cpp +++ b/cpp/arcticdb/version/version_map_batch_methods.cpp @@ -20,59 +20,18 @@ void StreamVersionData::react(const pipelines::VersionQuery &version_query) { } void StreamVersionData::do_react(std::monostate) { - if (load_param_.load_type_ == LoadType::NOT_LOADED) - load_param_ = LoadParameter{LoadType::LOAD_LATEST_UNDELETED}; - ++count_; + load_param_.load_strategy_ = union_of_undeleted_strategies(load_param_.load_strategy_, LoadStrategy{LoadType::LOAD_LATEST, ToLoad::UNDELETED}); } void StreamVersionData::do_react(const pipelines::SpecificVersionQuery &specific_version) { ++count_; - switch (load_param_.load_type_) { - case LoadType::NOT_LOADED: - [[fallthrough]]; - case LoadType::LOAD_LATEST_UNDELETED: - load_param_ = LoadParameter{LoadType::LOAD_DOWNTO, specific_version.version_id_}; - break; - 
case LoadType::LOAD_DOWNTO: - util::check(load_param_.load_until_version_.has_value(), - "Expect LOAD_DOWNTO to have version specified"); - if ((specific_version.version_id_ >= 0 && is_positive_version_query(load_param_)) || - (specific_version.version_id_ < 0 && load_param_.load_until_version_.value() < 0)) { - load_param_.load_until_version_ = std::min(load_param_.load_until_version_.value(), specific_version.version_id_); - } else { - load_param_ = LoadParameter{LoadType::LOAD_UNDELETED}; - } - break; - case LoadType::LOAD_FROM_TIME: - [[fallthrough]]; - case LoadType::LOAD_UNDELETED: - load_param_ = LoadParameter{LoadType::LOAD_UNDELETED}; - break; - default:util::raise_rte("Unexpected load state {} applying specific version query", load_param_.load_type_); - } + load_param_.load_strategy_ = union_of_undeleted_strategies(load_param_.load_strategy_, LoadStrategy{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, specific_version.version_id_}); } void StreamVersionData::do_react(const pipelines::TimestampVersionQuery ×tamp_query) { ++count_; - switch (load_param_.load_type_) { - case LoadType::NOT_LOADED: - [[fallthrough]]; - case LoadType::LOAD_LATEST_UNDELETED: - load_param_ = LoadParameter{LoadType::LOAD_FROM_TIME, timestamp_query.timestamp_}; - break; - case LoadType::LOAD_FROM_TIME: - util::check(load_param_.load_from_time_.has_value(), - "Expect LOAD_TO_TIME to have timestamp specified"); - load_param_.load_from_time_ = std::min(load_param_.load_from_time_.value(), timestamp_query.timestamp_); - break; - case LoadType::LOAD_DOWNTO: - [[fallthrough]]; - case LoadType::LOAD_UNDELETED: - load_param_ = LoadParameter{LoadType::LOAD_UNDELETED}; - break; - default:util::raise_rte("Unexpected load state {} applying specific version query", load_param_.load_type_); - } + load_param_.load_strategy_ = union_of_undeleted_strategies(load_param_.load_strategy_, LoadStrategy{LoadType::LOAD_FROM_TIME, ToLoad::UNDELETED, timestamp_query.timestamp_}); } void 
StreamVersionData::do_react(const pipelines::SnapshotVersionQuery &snapshot_query) { diff --git a/cpp/arcticdb/version/version_map_batch_methods.hpp b/cpp/arcticdb/version/version_map_batch_methods.hpp index e6b77b78b20..dfcef91f5ee 100644 --- a/cpp/arcticdb/version/version_map_batch_methods.hpp +++ b/cpp/arcticdb/version/version_map_batch_methods.hpp @@ -64,7 +64,7 @@ inline std::shared_ptr> batch_check_l const std::shared_ptr &version_map, const std::shared_ptr> &symbols) { ARCTICDB_SAMPLE(BatchGetLatestVersion, 0) - const LoadParameter load_param{LoadType::LOAD_LATEST_UNDELETED}; + const LoadParameter load_param{LoadType::LOAD_LATEST, ToLoad::UNDELETED}; auto output = std::make_shared>(); auto mutex = std::make_shared(); @@ -100,7 +100,7 @@ inline std::shared_ptr> batch_get_latest_v const std::vector &stream_ids, bool include_deleted) { ARCTICDB_SAMPLE(BatchGetLatestVersion, 0) - const LoadParameter load_param{include_deleted ? LoadType::LOAD_LATEST : LoadType::LOAD_LATEST_UNDELETED}; + const LoadParameter load_param{LoadType::LOAD_LATEST, include_deleted ? 
ToLoad::ANY : ToLoad::UNDELETED}; auto output = std::make_shared>(); auto mutex = std::make_shared(); @@ -129,7 +129,7 @@ inline std::vector, std::optional vector_fut.push_back(async::submit_io_task(CheckReloadTask{store, version_map, stream_id, - LoadParameter{LoadType::LOAD_LATEST_UNDELETED}}) + LoadParameter{LoadType::LOAD_LATEST, ToLoad::UNDELETED}}) .thenValue([](const std::shared_ptr& entry){ return std::make_pair(entry->get_first_index(false).first, entry->get_first_index(true).first); })); @@ -147,7 +147,7 @@ inline std::vector> batch_get_latest_un vector_fut.push_back(async::submit_io_task(CheckReloadTask{store, version_map, stream_id, - LoadParameter{LoadType::LOAD_LATEST_UNDELETED}}) + LoadParameter{LoadType::LOAD_LATEST, ToLoad::UNDELETED}}) .thenValue([](auto entry){ auto latest_version = entry->get_first_index(true).first; auto latest_undeleted_version = entry->get_first_index(false).first; @@ -194,7 +194,7 @@ inline std::shared_ptr> batch_get_specific MapRandomAccessWrapper wrapper{sym_versions}; submit_tasks_for_range(wrapper, [store, version_map](auto& sym_version) { - LoadParameter load_param{LoadType::LOAD_DOWNTO, static_cast(sym_version.second)}; + LoadParameter load_param{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(sym_version.second)}; return async::submit_io_task(CheckReloadTask{store, version_map, sym_version.first, load_param}); }, [output, option, output_mutex, store, tombstoned_vers, tombstoned_vers_mutex] @@ -247,7 +247,7 @@ inline std::shared_ptr, AtomKe submit_tasks_for_range(wrapper, [store, version_map](auto sym_version) { auto first_version = *std::min_element(std::begin(sym_version.second), std::end(sym_version.second)); - LoadParameter load_param{LoadType::LOAD_DOWNTO, static_cast(first_version)}; + LoadParameter load_param{LoadType::LOAD_DOWNTO, ToLoad::UNDELETED, static_cast(first_version)}; return async::submit_io_task(CheckReloadTask{store, version_map, sym_version.first, load_param}); }, @@ -266,9 +266,13 @@ inline 
std::shared_ptr, AtomKe return output; } + +// [StreamVersionData] is used to combine different [VersionQuery]s for a stream_id into a list of needed snapshots and +// a single [LoadParameter] which will query the union of all version queries. +// It only ever produces load parameters where to_load=UNDELETED. struct StreamVersionData { size_t count_ = 0; - LoadParameter load_param_ = LoadParameter{LoadType::NOT_LOADED}; + LoadParameter load_param_ = LoadParameter{LoadType::NOT_LOADED, ToLoad::UNDELETED}; boost::container::small_vector snapshots_; explicit StreamVersionData(const pipelines::VersionQuery& version_query); diff --git a/cpp/arcticdb/version/version_map_entry.hpp b/cpp/arcticdb/version/version_map_entry.hpp index e06efbc6e99..8d486f927ed 100644 --- a/cpp/arcticdb/version/version_map_entry.hpp +++ b/cpp/arcticdb/version/version_map_entry.hpp @@ -23,10 +23,8 @@ using namespace arcticdb::stream; enum class LoadType : uint32_t { NOT_LOADED = 0, LOAD_LATEST, - LOAD_LATEST_UNDELETED, LOAD_DOWNTO, LOAD_FROM_TIME, - LOAD_UNDELETED, LOAD_ALL, UNKNOWN }; @@ -35,6 +33,12 @@ inline constexpr bool is_partial_load_type(LoadType load_type) { return load_type == LoadType::LOAD_DOWNTO || load_type == LoadType::LOAD_FROM_TIME; } +// Used to specify whether we want to load all or only undeleted versions +enum class ToLoad : uint32_t { + ANY, + UNDELETED +}; + enum class VersionStatus { LIVE, TOMBSTONED, @@ -46,17 +50,16 @@ struct VersionDetails { VersionStatus version_status_; }; -inline constexpr bool is_latest_load_type(LoadType load_type) { - return load_type == LoadType::LOAD_LATEST || load_type == LoadType::LOAD_LATEST_UNDELETED; -} - -struct LoadParameter { - explicit LoadParameter(LoadType load_type) : - load_type_(load_type) { +// The LoadStrategy describes how to load versions from the version chain. It consists of: +// load_type: Describes up to which point in the chain we need to go. 
+// to_load: Whether to include tombstoned versions +struct LoadStrategy { + explicit LoadStrategy(LoadType load_type, ToLoad to_load = ToLoad::ANY) : + load_type_(load_type), to_load_(to_load) { } - LoadParameter(LoadType load_type, int64_t load_from_time_or_until) : - load_type_(load_type) { + LoadStrategy(LoadType load_type, ToLoad to_load, int64_t load_from_time_or_until) : + load_type_(load_type), to_load_(to_load) { switch(load_type_) { case LoadType::LOAD_FROM_TIME: load_from_time_ = load_from_time_or_until; @@ -65,24 +68,98 @@ struct LoadParameter { load_until_version_ = load_from_time_or_until; break; default: - internal::raise("LoadParameter constructor with load_from_time_or_until parameter {} provided invalid load_type {}", + internal::raise("LoadStrategy constructor with load_from_time_or_until parameter {} provided invalid load_type {}", load_from_time_or_until, static_cast(load_type)); } } LoadType load_type_ = LoadType::NOT_LOADED; + ToLoad to_load_ = ToLoad::ANY; std::optional load_until_version_ = std::nullopt; std::optional load_from_time_ = std::nullopt; - bool iterate_on_failure_ = false; + + bool should_include_deleted() const { + switch (to_load_) { + case ToLoad::ANY: + return true; + case ToLoad::UNDELETED: + return false; + default: + util::raise_rte("Invalid to_load: {}", to_load_); + } + } void validate() const { - util::check((load_type_ == LoadType::LOAD_DOWNTO) == load_until_version_.has_value(), + internal::check((load_type_ == LoadType::LOAD_DOWNTO) == load_until_version_.has_value(), "Invalid load parameter: load_type {} with load_util {}", int(load_type_), load_until_version_.value_or(VersionId{})); - util::check((load_type_ == LoadType::LOAD_FROM_TIME) == load_from_time_.has_value(), + internal::check((load_type_ == LoadType::LOAD_FROM_TIME) == load_from_time_.has_value(), "Invalid load parameter: load_type {} with load_from_time_ {}", int(load_type_), load_from_time_.value_or(timestamp{})); } }; + +inline bool 
is_undeleted_strategy_subset(const LoadStrategy& left, const LoadStrategy& right){ + switch (left.load_type_) { + case LoadType::NOT_LOADED: + return true; + case LoadType::LOAD_LATEST: + // LOAD_LATEST is not a subset of LOAD_DOWNTO because LOAD_DOWNTO may not reach the latest undeleted version. + return right.load_type_ != LoadType::NOT_LOADED && right.load_type_ != LoadType::LOAD_DOWNTO; + case LoadType::LOAD_DOWNTO: + if (right.load_type_ == LoadType::LOAD_ALL) { + return true; + } + if (right.load_type_ == LoadType::LOAD_DOWNTO && ((left.load_until_version_.value()>=0) == (right.load_until_version_.value()>=0))) { + // Left is subset of right only when the [load_until]s have same sign and left's version is >= right's version + return left.load_until_version_.value() >= right.load_until_version_.value(); + } + break; + case LoadType::LOAD_FROM_TIME: + if (right.load_type_ == LoadType::LOAD_ALL){ + return true; + } + if (right.load_type_ == LoadType::LOAD_FROM_TIME){ + return left.load_from_time_.value() >= right.load_from_time_.value(); + } + break; + case LoadType::LOAD_ALL: + return right.load_type_ == LoadType::LOAD_ALL; + default: + util::raise_rte("Invalid load type: {}", left.load_type_); + } + return false; +} + +// Returns a strategy which is guaranteed to load all versions requested by left and right. +// Works only on strategies with include_deleted=false. +inline LoadStrategy union_of_undeleted_strategies(const LoadStrategy& left, const LoadStrategy& right){ + internal::check(!left.should_include_deleted(), "Trying to produce a union of undeleted strategies but left strategy includes deleted."); + internal::check(!right.should_include_deleted(), "Trying to produce a union of undeleted strategies but right strategy includes deleted."); + if (is_undeleted_strategy_subset(left, right)){ + return right; + } + if (is_undeleted_strategy_subset(right, left)){ + return left; + } + // If none is subset of the other, then we should load all versions. 
We can't be less conservative because we can't + // know where to load to with strategies which have a different load type. E.g. for LOAD_FROM_TIME and LOAD_DOWNTO + // we can't know where to read to unless we know the version chain. + // A possible workaround for this is to restructure loading the version chain to get a set of LoadStrategies and stop + // searching only when all of them are satisfied. + return LoadStrategy{LoadType::LOAD_ALL, ToLoad::UNDELETED}; +} + +// LoadParameter is just a LoadStrategy and a boolean specified from VersionQuery.iterate_on_failure defaulting to false. +struct LoadParameter { + LoadParameter(const LoadStrategy& load_strategy) : load_strategy_(load_strategy) {} + LoadParameter(LoadType load_type, ToLoad to_load) : load_strategy_(load_type, to_load) {} + LoadParameter(LoadType load_type, ToLoad to_load, int64_t load_from_time_or_until) : + load_strategy_(load_type, to_load, load_from_time_or_until) {} + + LoadStrategy load_strategy_; + bool iterate_on_failure_ = false; +}; + template bool deque_is_unique(std::deque vec) { sort(vec.begin(), vec.end()); @@ -144,9 +221,9 @@ struct VersionMapEntry { VersionMapEntry is all the data we have in-memory about each stream_id in the version map which in its essence is a map of StreamId: VersionMapEntry. It's created from the linked-list-like structure that we have in the storage, where the head_ points to the latest version and keys_ are basically all the index/version keys - loaded in memory in a deque - based on the load_type. + loaded in memory in a deque - based on the load_strategy. - load_type signifies the current state of the in memory structure vs the state on disk, where LOAD_LATEST will + load_strategy signifies the current state of the in memory structure vs the state on disk, where LOAD_LATEST will just load the latest version, and LOAD_ALL loads everything in memory by going through the linked list on disk. 
It also contains a map of version_ids and the tombstone key corresponding to it iff it has been pruned or @@ -162,6 +239,7 @@ struct VersionMapEntry { if (keys_.empty()) return; + // Sorting by creation_ts is safe from clock skew because we don't support parallel writes to the same symbol. std::sort(std::begin(keys_), std::end(keys_), [](const AtomKey &l, const AtomKey &r) { return l.creation_ts() > r.creation_ts(); }); @@ -169,12 +247,12 @@ struct VersionMapEntry { void clear() { head_.reset(); - load_type_ = LoadType::NOT_LOADED; last_reload_time_ = 0; tombstones_.clear(); tombstone_all_.reset(); keys_.clear(); loaded_with_progress_ = LoadProgress{}; + load_strategy_ = LoadStrategy{LoadType::NOT_LOADED}; } bool empty() const { @@ -191,7 +269,7 @@ struct VersionMapEntry { swap(left.last_reload_time_, right.last_reload_time_); swap(left.tombstone_all_, right.tombstone_all_); swap(left.head_, right.head_); - swap(left.load_type_, right.load_type_); + swap(left.load_strategy_, right.load_strategy_); swap(left.loaded_with_progress_, right.loaded_with_progress_); } @@ -365,7 +443,7 @@ struct VersionMapEntry { } std::optional head_; - LoadType load_type_ = LoadType::NOT_LOADED; + LoadStrategy load_strategy_ = LoadStrategy{LoadType::NOT_LOADED }; timestamp last_reload_time_ = 0; LoadProgress loaded_with_progress_; std::deque keys_; @@ -452,24 +530,38 @@ namespace fmt { template constexpr auto parse(ParseContext &ctx) { return ctx.begin(); } - template - auto format(arcticdb::LoadType l, FormatContext &ctx) const { - switch(l) { - case arcticdb::LoadType::NOT_LOADED: - return fmt::format_to(ctx.out(), "NOT_LOADED"); - case arcticdb::LoadType::LOAD_LATEST: - return fmt::format_to(ctx.out(), "LOAD_LATEST"); - case arcticdb::LoadType::LOAD_LATEST_UNDELETED: - return fmt::format_to(ctx.out(), "LOAD_LATEST_UNDELETED"); - case arcticdb::LoadType::LOAD_DOWNTO: - return fmt::format_to(ctx.out(), "LOAD_DOWNTO"); - case arcticdb::LoadType::LOAD_UNDELETED: - return 
fmt::format_to(ctx.out(), "LOAD_UNDELETED"); - case arcticdb::LoadType::LOAD_ALL: - return fmt::format_to(ctx.out(), "LOAD_ALL"); - default: - arcticdb::util::raise_rte("Unrecognized load type {}", int(l)); + template + auto format(arcticdb::LoadType l, FormatContext &ctx) const { + switch (l) { + case arcticdb::LoadType::NOT_LOADED: + return fmt::format_to(ctx.out(), "NOT_LOADED"); + case arcticdb::LoadType::LOAD_LATEST: + return fmt::format_to(ctx.out(), "LOAD_LATEST"); + case arcticdb::LoadType::LOAD_DOWNTO: + return fmt::format_to(ctx.out(), "LOAD_DOWNTO"); + case arcticdb::LoadType::LOAD_ALL: + return fmt::format_to(ctx.out(), "LOAD_ALL"); + default: + arcticdb::util::raise_rte("Unrecognized load type {}", int(l)); + } } - } -}; + }; + + template<> + struct formatter { + template + constexpr auto parse(ParseContext &ctx) { return ctx.begin(); } + + template + auto format(arcticdb::ToLoad l, FormatContext &ctx) const { + switch (l) { + case arcticdb::ToLoad::ANY: + return fmt::format_to(ctx.out(), "ANY"); + case arcticdb::ToLoad::UNDELETED: + return fmt::format_to(ctx.out(), "UNDELETED"); + default: + arcticdb::util::raise_rte("Unrecognized to load {}", int(l)); + } + } + }; } diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index ff4d4d0ad4c..d2e161dac7f 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -941,7 +941,7 @@ void PythonVersionStore::prune_previous_versions(const StreamId& stream_id) { const std::shared_ptr& entry = version_map()->check_reload( store(), stream_id, - LoadParameter{LoadType::LOAD_UNDELETED}, + LoadParameter{LoadType::LOAD_ALL, ToLoad::UNDELETED}, __FUNCTION__); storage::check(!entry->empty(), "Symbol {} is not found", stream_id); auto [latest, deleted] = entry->get_first_index(false); diff --git a/cpp/arcticdb/version/version_utils.hpp b/cpp/arcticdb/version/version_utils.hpp index 6e2f00a9988..77175fcff18 100644 --- 
a/cpp/arcticdb/version/version_utils.hpp +++ b/cpp/arcticdb/version/version_utils.hpp @@ -231,21 +231,21 @@ inline std::optional get_version_id_negative_index(VersionId latest, std::unordered_map get_num_version_entries(const std::shared_ptr &store, size_t batch_size); -inline bool is_positive_version_query(const LoadParameter& load_params) { - return load_params.load_until_version_.value() >= 0; +inline bool is_positive_version_query(const LoadStrategy& load_strategy) { + return load_strategy.load_until_version_.value() >= 0; } -inline bool loaded_until_version_id(const LoadParameter &load_params, const LoadProgress& load_progress, const std::optional& latest_version) { - if (!load_params.load_until_version_) +inline bool loaded_until_version_id(const LoadStrategy &load_strategy, const LoadProgress& load_progress, const std::optional& latest_version) { + if (!load_strategy.load_until_version_) return false; - if (is_positive_version_query(load_params)) { - if (load_progress.oldest_loaded_index_version_ > static_cast(*load_params.load_until_version_)) { + if (is_positive_version_query(load_strategy)) { + if (load_progress.oldest_loaded_index_version_ > static_cast(*load_strategy.load_until_version_)) { return false; } } else { if (latest_version.has_value()) { - if (auto opt_version_id = get_version_id_negative_index(*latest_version, *load_params.load_until_version_); + if (auto opt_version_id = get_version_id_negative_index(*latest_version, *load_strategy.load_until_version_); opt_version_id && load_progress.oldest_loaded_index_version_ > *opt_version_id) { return false; } @@ -256,7 +256,7 @@ inline bool loaded_until_version_id(const LoadParameter &load_params, const Load ARCTICDB_DEBUG(log::version(), "Exiting load downto because loaded to version {} for request {} with {} total versions", load_progress.oldest_loaded_index_version_, - *load_params.load_until_version_, + *load_strategy.load_until_version_, latest_version.value() ); return true; @@ -274,28 +274,32 
@@ static constexpr timestamp nanos_to_seconds(timestamp nanos) { return nanos / timestamp(10000000000); } -inline bool loaded_until_timestamp(const LoadParameter &load_params, const LoadProgress& load_progress) { - if (!load_params.load_from_time_ || load_progress.earliest_loaded_undeleted_timestamp_ > *load_params.load_from_time_) +inline bool loaded_until_timestamp(const LoadStrategy &load_strategy, const LoadProgress& load_progress) { + if (!load_strategy.load_from_time_) + return false; + + auto loaded_deleted_or_undeleted_timestamp = load_strategy.should_include_deleted() ? load_progress.earliest_loaded_timestamp_ : load_progress.earliest_loaded_undeleted_timestamp_; + + if (loaded_deleted_or_undeleted_timestamp > *load_strategy.load_from_time_) return false; ARCTICDB_DEBUG(log::version(), "Exiting load from timestamp because request {} <= {}", - *load_params.load_from_time_, - load_progress.earliest_loaded_undeleted_timestamp_); + loaded_deleted_or_undeleted_timestamp, + *load_strategy.load_from_time_); return true; } -inline bool load_latest_ongoing(const LoadParameter &load_params, const std::shared_ptr &entry) { - if (!(load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED && entry->get_first_index(false).first) && - !(load_params.load_type_ == LoadType::LOAD_LATEST && entry->get_first_index(true).first)) +inline bool load_latest_ongoing(const LoadStrategy &load_strategy, const std::shared_ptr &entry) { + if (!(load_strategy.load_type_ == LoadType::LOAD_LATEST && entry->get_first_index(load_strategy.should_include_deleted()).first)) return true; - ARCTICDB_DEBUG(log::version(), "Exiting because we found a non-deleted index in load latest"); + ARCTICDB_DEBUG(log::version(), "Exiting because we found the latest version with include_deleted: {}", load_strategy.should_include_deleted()); return false; } -inline bool looking_for_undeleted(const LoadParameter& load_params, const std::shared_ptr& entry, const LoadProgress& load_progress) { - 
if(!(load_params.load_type_ == LoadType::LOAD_UNDELETED || load_params.load_type_ == LoadType::LOAD_LATEST_UNDELETED)) { +inline bool looking_for_undeleted(const LoadStrategy& load_strategy, const std::shared_ptr& entry, const LoadProgress& load_progress) { + if(load_strategy.should_include_deleted()) { return true; } @@ -316,25 +320,25 @@ inline bool looking_for_undeleted(const LoadParameter& load_params, const std::s } } -inline bool penultimate_key_contains_required_version_id(const AtomKey& key, const LoadParameter& load_params) { - if(is_positive_version_query(load_params)) { - return key.version_id() <= static_cast(load_params.load_until_version_.value()); +inline bool penultimate_key_contains_required_version_id(const AtomKey& key, const LoadStrategy& load_strategy) { + if(is_positive_version_query(load_strategy)) { + return key.version_id() <= static_cast(load_strategy.load_until_version_.value()); } else { - return *load_params.load_until_version_ == -1; + return *load_strategy.load_until_version_ == -1; } } -inline bool key_exists_in_ref_entry(const LoadParameter& load_params, const VersionMapEntry& ref_entry, std::optional& cached_penultimate_key) { - if (is_latest_load_type(load_params.load_type_) && is_index_key_type(ref_entry.keys_[0].type())) +inline bool key_exists_in_ref_entry(const LoadStrategy& load_strategy, const VersionMapEntry& ref_entry, std::optional& cached_penultimate_key) { + if (load_strategy.load_type_ == LoadType::LOAD_LATEST && is_index_key_type(ref_entry.keys_[0].type())) return true; - if(cached_penultimate_key && is_partial_load_type(load_params.load_type_)) { - load_params.validate(); - if(load_params.load_type_ == LoadType::LOAD_DOWNTO && penultimate_key_contains_required_version_id(*cached_penultimate_key, load_params)) { + if(cached_penultimate_key && is_partial_load_type(load_strategy.load_type_)) { + load_strategy.validate(); + if(load_strategy.load_type_ == LoadType::LOAD_DOWNTO && 
penultimate_key_contains_required_version_id(*cached_penultimate_key, load_strategy)) { return true; } - if(load_params.load_type_ == LoadType::LOAD_FROM_TIME &&cached_penultimate_key->creation_ts() <= load_params.load_from_time_.value()) { + if(load_strategy.load_type_ == LoadType::LOAD_FROM_TIME && cached_penultimate_key->creation_ts() <= load_strategy.load_from_time_.value()) { return true; } }