Skip to content

Commit

Permalink
Extract include_deleted out of LoadType and allow full cartesian product
Browse files Browse the repository at this point in the history
Introduces a new type LoadStrategy which has the load_type, the
to_load enum, which indicates whether to load ANY or only UNDELETED
versions and the version/time we need to search for.

With this commit we mostly preserve existing behavior with a few
exceptions:
- The newly introduced deleted/undeleted variants of LOAD_DOWNTO and LOAD_FROM_TIME are introduced and are handled appropriately and their behavior is documented in LoadStrategy.
- Abstract away the batch version map merging of LoadStrategies and fix
  a small bug there.
- Fixes #1386 by passing include_deleted=false when looking for as_of
  time or version

Also adds an elaborate FollowingVersionChain and
FollowingVersionChainWithCaching cpp tests
  • Loading branch information
IvoDD committed Apr 18, 2024
1 parent 281195d commit 1275829
Show file tree
Hide file tree
Showing 12 changed files with 422 additions and 253 deletions.
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,8 @@ folly::Future<folly::Unit> LocalVersionedEngine::delete_trees_responsibly(
auto min_versions = min_versions_for_each_stream(orig_keys_to_delete);
for (const auto& min : min_versions) {
auto load_param = load_type == LoadType::LOAD_DOWNTO
? LoadParameter{load_type, static_cast<SignedVersionId>(min.second)}
: LoadParameter{load_type};
? LoadParameter{load_type, ToLoad::ANY, static_cast<SignedVersionId>(min.second)}
: LoadParameter{load_type, ToLoad::ANY};
const auto entry = version_map()->check_reload(store(), min.first, load_param, __FUNCTION__);
entry_map.try_emplace(std::move(min.first), entry);
}
Expand Down
39 changes: 23 additions & 16 deletions cpp/arcticdb/version/test/test_stream_version_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ TEST(StreamVersionData, SpecificVersion) {
VersionQuery query_2{SpecificVersionQuery{VersionId(4)}, false};
stream_version_data.react(query_2);
ASSERT_EQ(stream_version_data.count_, 2);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_DOWNTO);
ASSERT_EQ(stream_version_data.load_param_.load_until_version_, 4);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_DOWNTO);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_, 4);
}

TEST(StreamVersionData, SpecificVersionReversed) {
Expand All @@ -24,8 +25,9 @@ TEST(StreamVersionData, SpecificVersionReversed) {
VersionQuery query_2{SpecificVersionQuery{VersionId(12)}, false};
stream_version_data.react(query_2);
ASSERT_EQ(stream_version_data.count_, 2);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_DOWNTO);
ASSERT_EQ(stream_version_data.load_param_.load_until_version_, 4);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_DOWNTO);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_, 4);
}

TEST(StreamVersionData, Timestamp) {
Expand All @@ -38,8 +40,9 @@ TEST(StreamVersionData, Timestamp) {
VersionQuery query_2{TimestampVersionQuery{timestamp(4)}, false};
stream_version_data.react(query_2);
ASSERT_EQ(stream_version_data.count_, 2);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_FROM_TIME);
ASSERT_EQ(stream_version_data.load_param_.load_from_time_, 4);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_, 4);
}

TEST(StreamVersionData, TimestampUnordered) {
Expand All @@ -54,8 +57,9 @@ TEST(StreamVersionData, TimestampUnordered) {
VersionQuery query_3{TimestampVersionQuery{timestamp(4)}, false};
stream_version_data.react(query_3);
ASSERT_EQ(stream_version_data.count_, 3);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_FROM_TIME);
ASSERT_EQ(stream_version_data.load_param_.load_from_time_, 3);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_, 3);
}

TEST(StreamVersionData, Latest) {
Expand All @@ -66,8 +70,9 @@ TEST(StreamVersionData, Latest) {
VersionQuery query_1{std::monostate{}, false};
stream_version_data.react(query_1);
ASSERT_EQ(stream_version_data.count_, 1);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_LATEST_UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_until_version_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_LATEST);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false);
}

TEST(StreamVersionData, SpecificToTimestamp) {
Expand All @@ -80,9 +85,10 @@ TEST(StreamVersionData, SpecificToTimestamp) {
VersionQuery query_2{TimestampVersionQuery{timestamp(3)}, false};
stream_version_data.react(query_2);
ASSERT_EQ(stream_version_data.count_, 2);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_until_version_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_from_time_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_ALL);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_.has_value(), false);
}

TEST(StreamVersionData, TimestampToSpecific) {
Expand All @@ -95,7 +101,8 @@ TEST(StreamVersionData, TimestampToSpecific) {
VersionQuery query_2{SpecificVersionQuery{VersionId(12)}, false};
stream_version_data.react(query_2);
ASSERT_EQ(stream_version_data.count_, 2);
ASSERT_EQ(stream_version_data.load_param_.load_type_, LoadType::LOAD_UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_until_version_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_from_time_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_ALL);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.to_load_, ToLoad::UNDELETED);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false);
ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_.has_value(), false);
}

0 comments on commit 1275829

Please sign in to comment.