Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Fix cloning delta index when there are duplicated tuples (#9000) #9017

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, s
//
// So here, we should filter out those out-of-range rows.

auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
auto total_delta_rows = delta_snap->getRows();
const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
const auto total_delta_rows = delta_snap->getRows();

auto persisted_files_start = std::min(offset, mem_table_rows_offset);
auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
Expand Down
28 changes: 16 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ class DeltaIndex
}
}

DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
// Make sure the MVCC view will not be broken by the mismatch of delta index and snapshot:
// - First, make sure the delta index does not place more deletes than `placed_deletes_limit`.
// - Second, make sure the snapshot includes all duplicated tuples in the delta index.
if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
Expand Down Expand Up @@ -186,8 +186,9 @@ class DeltaIndex
{
std::scoped_lock lock(mutex);

if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
&& !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
// not exactly the same
&& (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
{
delta_tree = maybe_advanced.delta_tree;
placed_rows = maybe_advanced.placed_rows;
Expand All @@ -197,14 +198,17 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
/**
* Try to get a clone of current instance for a snapshot that contains `rows` rows and `deletes` delete ranges.
*
* The existing delta index is reused only when both conditions checked by `tryCloneInner` hold:
* - `this->placed_deletes <= deletes`: a delta index that has placed more delete ranges than the snapshot
*   contains is more advanced than the snapshot and will break the MVCC view.
* - `delta_tree->maxDupTupleID() < rows`: the snapshot includes all duplicated tuples that have been placed
*   in the delta index.
*
* Return an empty DeltaIndex if either condition fails; the caller is then expected to place the
* delta index for its own snapshot.
*/
DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

return tryCloneInner(updates.front().delete_ranges_offset, &updates);
RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
}
};

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaPlace.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ bool placeInsert(const SkippableBlockInputStreamPtr & stable, //
tuple_id = delta_value_space_offset + (offset + i);

if (dup)
{
delta_tree.addDelete(rid);
delta_tree.setMaxDupTupleID(tuple_id);
}
delta_tree.addInsert(rid, tuple_id);
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ class DeltaTree
size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;
Int64 max_dup_tuple_id = -1;

std::unique_ptr<Allocator> allocator;
size_t bytes = 0;
Expand Down Expand Up @@ -1016,6 +1017,8 @@ class DeltaTree
{
return num_deletes;
}
/// Largest tuple id recorded so far through `setMaxDupTupleID`
/// (the member keeps its declared initial value when nothing has been recorded).
Int64 maxDupTupleID() const
{
    return max_dup_tuple_id;
}

/// Record `tuple_id` as a duplicated tuple. The stored value only ever grows:
/// a smaller or equal id leaves it untouched.
void setMaxDupTupleID(Int64 tuple_id)
{
    if (tuple_id > max_dup_tuple_id)
        max_dup_tuple_id = tuple_id;
}

void addDelete(UInt64 rid);
void addInsert(UInt64 rid, UInt64 tuple_id);
Expand All @@ -1032,6 +1035,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
, num_inserts(o.num_inserts)
, num_deletes(o.num_deletes)
, num_entries(o.num_entries)
, max_dup_tuple_id(o.max_dup_tuple_id)
, allocator(std::make_unique<Allocator>())
{
// If exception is thrown before clear copying_nodes, all nodes will be destroyed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ extern const char pause_when_reading_from_dt_stream[];

namespace DB::DM
{
// Forward declaration of the test fixture, so that UnorderedInputStream can name it in its
// `friend class tests::DeltaMergeStoreRWTest;` declaration below.
namespace tests
{
class DeltaMergeStoreRWTest;
}
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";
Expand Down Expand Up @@ -134,5 +138,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
LoggerPtr log;
int64_t ref_no;
bool task_pool_added;

friend class tests::DeltaMergeStoreRWTest;
};
} // namespace DB::DM
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2229,7 +2229,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context
UInt64 max_version) const
{
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
// Try to clone from the shared delta index. If the shared delta index cannot be reused,
// an empty delta index is returned and we should place it in the following branch.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

Expand Down
180 changes: 180 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <algorithm>
#include <future>
#include <iterator>
#include <memory>
#include <random>

namespace DB
Expand Down Expand Up @@ -3470,6 +3471,185 @@ try
CATCH


/// Scenario for https://github.com/pingcap/tiflash/issues/8845.
///
/// A snapshot that was created before a duplicated (handle, version) write must not reuse a shared
/// delta index that has already been placed past that snapshot. The test checks, step by step:
/// 1. a snapshot older than the duplicated tuples gets an empty clone and still reads correct data;
/// 2. a fresh snapshot can reuse the shared delta index;
/// 3. later writes (with and without new duplicates) advance the placed status as expected.
void DeltaMergeStoreRWTest::dupHandleVersionAndDeltaIndexAdvancedThanSnapshot()
{
    auto table_column_defines = DMTestEnv::getDefaultColumns();
    store = reload(table_column_defines);

    // Build a write block for [beg, end) with timestamp `ts` and sanity-check its row count.
    auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
        auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
        block.checkNumberOfRows();
        return block;
    };

    // Write [beg, end) with timestamp `ts` into the store.
    auto write_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
        auto block = create_block(beg, end, ts);
        store->write(*db_context, db_context->getSettingsRef(), block);
    };

    // Create a single read stream over the whole key range. Creating the stream takes the snapshot;
    // nothing is read until the stream is consumed.
    auto create_stream = [&]() {
        return store->read(
            *db_context,
            db_context->getSettingsRef(),
            store->getTableColumns(),
            {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
            /* num_streams= */ 1,
            /* start_ts= */ std::numeric_limits<UInt64>::max(),
            EMPTY_FILTER,
            TRACING_NAME,
            /* keep_order= */ false,
            /* is_fast_scan= */ false,
            DEFAULT_BLOCK_SIZE)[0];
    };

    // Drain the stream and return the total number of rows it produced.
    auto count_rows = [](BlockInputStreamPtr stream) {
        std::size_t count = 0;
        stream->readPrefix();
        for (;;)
        {
            auto block = stream->read();
            if (!block)
            {
                break;
            }
            count += block.rows();
        }
        stream->readSuffix();
        return count;
    };

    // Reach into the stream's task pool and return its only segment read task.
    auto get_seg_read_task = [&](BlockInputStreamPtr stream) {
        auto unordered_stream = std::dynamic_pointer_cast<UnorderedInputStream>(stream);
        // `dynamic_pointer_cast` yields an empty pointer when the stream is of another type;
        // fail with a clear check instead of dereferencing a null pointer.
        RUNTIME_CHECK(unordered_stream != nullptr);
        const auto & tasks = unordered_stream->task_pool->getTasks();
        RUNTIME_CHECK(tasks.size() == 1, tasks.size());
        return tasks.begin()->second;
    };

    // Clone the shared delta index in the same way `Segment::ensurePlace` does:
    // with the rows and deletes of the task's delta snapshot.
    auto clone_delta_index = [](SegmentReadTaskPtr seg_read_task) {
        auto delta_snap = seg_read_task->read_snapshot->delta;
        return delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
    };

    // Verify the placed status and the max duplicated tuple id of a delta index.
    // NOTE: a failing ASSERT_* here only returns from this lambda, not from the whole test.
    auto check_delta_index
        = [](DeltaIndexPtr delta_index, size_t expect_rows, size_t expect_deletes, Int64 expect_max_dup_tuple_id) {
              auto [placed_rows, placed_deletes] = delta_index->getPlacedStatus();
              ASSERT_EQ(placed_rows, expect_rows);
              ASSERT_EQ(placed_deletes, expect_deletes);
              ASSERT_EQ(delta_index->getDeltaTree()->maxDupTupleID(), expect_max_dup_tuple_id);
          };

    // Run `Segment::ensurePlace` for the task's snapshot, reading only the handle and version columns.
    auto ensure_place = [&](SegmentReadTaskPtr seg_read_task) {
        auto pk_ver_col_defs = std::make_shared<ColumnDefines>(
            ColumnDefines{getExtraHandleColumnDefine(dm_context->is_common_handle), getVersionColumnDefine()});
        auto delta_reader = std::make_shared<DeltaValueReader>(
            *dm_context,
            seg_read_task->read_snapshot->delta,
            pk_ver_col_defs,
            RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
        return seg_read_task->segment->ensurePlace(
            *dm_context,
            seg_read_task->read_snapshot->stable,
            delta_reader,
            {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
            std::numeric_limits<UInt64>::max());
    };

    // Write [0, 128) with ts 1 for initializing stable.
    write_block(0, 128, 1);
    store->mergeDeltaAll(*db_context);

    // Write [50, 60) with ts 2 for initializing delta.
    write_block(50, 60, 2);

    // Scan table normally.
    {
        auto stream = create_stream();
        auto count = count_rows(stream);
        ASSERT_EQ(count, 128);
    }

    // The snapshot does not include all the duplicated tuples of the delta index.
    // This snapshot should rebuild delta index for itself.
    // https://github.com/pingcap/tiflash/issues/8845
    {
        // Create snapshot but not place index
        auto stream1 = create_stream();

        // !!!Duplicated!!!: Write [50, 60) with ts 2
        write_block(50, 60, 2);

        // Place index with newest data.
        auto stream2 = create_stream();
        auto count2 = count_rows(stream2);
        ASSERT_EQ(count2, 128);

        // stream1 should not reuse delta index of stream2

        // Check cloning delta index
        {
            auto seg_read_task = get_seg_read_task(stream1);

            // Shared delta index has been placed to the newest by `count_rows(stream2)`:
            // 20 delta rows (two writes of 10 rows), and 19 is the max duplicated tuple id.
            auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
            check_delta_index(shared_delta_index, 20, 0, 19);

            // Cannot clone delta index because it contains duplicated records in the gap of snapshot and the shared delta index.
            auto cloned_delta_index = clone_delta_index(seg_read_task);
            check_delta_index(cloned_delta_index, 0, 0, -1);
        }
        // Check scanning result of stream1
        auto count1 = count_rows(stream1);
        ASSERT_EQ(count1, count2);
    }

    // Make sure shared delta index can be reused by new snapshot
    {
        auto stream = create_stream();
        auto seg_read_task = get_seg_read_task(stream);
        auto cloned_delta_index = clone_delta_index(seg_read_task);
        check_delta_index(cloned_delta_index, 20, 0, 19);
    }

    // The snapshot includes all the duplicated tuples of the delta index.
    // Delta index can be reused safely.
    {
        // [70, 80) does not duplicate any delta row, so the max duplicated tuple id stays at 19.
        write_block(70, 80, 2);
        auto stream = create_stream();
        auto seg_read_task = get_seg_read_task(stream);
        auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
        check_delta_index(shared_delta_index, 20, 0, 19);
        auto cloned_delta_index = clone_delta_index(seg_read_task);
        check_delta_index(cloned_delta_index, 20, 0, 19);
        auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
        ASSERT_TRUE(fully_indexed);
        check_delta_index(placed_delta_index, 30, 0, 19);
        auto count = count_rows(stream);
        ASSERT_EQ(count, 128);
    }

    {
        // [75, 85) overlaps [70, 80) with the same ts, so placing it records new duplicated tuples
        // and the expected max duplicated tuple id moves from 19 to 34.
        write_block(75, 85, 2);
        auto stream = create_stream();
        auto seg_read_task = get_seg_read_task(stream);
        auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
        check_delta_index(shared_delta_index, 30, 0, 19);
        auto cloned_delta_index = clone_delta_index(seg_read_task);
        check_delta_index(cloned_delta_index, 30, 0, 19);
        auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
        ASSERT_TRUE(fully_indexed);
        check_delta_index(placed_delta_index, 40, 0, 34);
        auto count = count_rows(stream);
        ASSERT_EQ(count, 128);
    }
}

// Parameterized gtest entry point: runs the scenario implemented in
// DeltaMergeStoreRWTest::dupHandleVersionAndDeltaIndexAdvancedThanSnapshot().
// NOTE(review): the body is a function-try-block closed by the project's CATCH macro, whose
// definition is not visible here; presumably it turns escaped exceptions into test failures. Confirm.
TEST_P(DeltaMergeStoreRWTest, DupHandleVersionAndDeltaIndexAdvancedThanSnapshot)
try
{
dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class DeltaMergeStoreRWTest
{
TiFlashStorageTestBasic::SetUp();
store = reload();
dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
}

DeltaMergeStorePtr
Expand Down Expand Up @@ -180,8 +181,11 @@ class DeltaMergeStoreRWTest
protected:
TestMode mode;
DeltaMergeStorePtr store;
DMContextPtr dm_context;

constexpr static const char * TRACING_NAME = "DeltaMergeStoreRWTest";

void dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
};
} // namespace tests
} // namespace DM
Expand Down
Loading