Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Fix cloning delta index when there are duplicated tuples (#9000) #9018

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ class DeltaValueReader

bool shouldPlace(
const DMContext & context,
DeltaIndexPtr my_delta_index,
size_t placed_rows,
size_t placed_delete_ranges,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 max_version);
Expand Down
27 changes: 15 additions & 12 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ size_t DeltaValueReader::readRows(
//
// So here, we should filter out those out-of-range rows.

auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
auto total_delta_rows = delta_snap->getRows();
const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
const auto total_delta_rows = delta_snap->getRows();

auto persisted_files_start = std::min(offset, mem_table_rows_offset);
auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
auto mem_table_start = offset <= mem_table_rows_offset
const auto persisted_files_start = std::min(offset, mem_table_rows_offset);
const auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
const auto mem_table_start = offset <= mem_table_rows_offset
? 0
: std::min(offset - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
auto mem_table_end = offset + limit <= mem_table_rows_offset
const auto mem_table_end = offset + limit <= mem_table_rows_offset
? 0
: std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);

Expand All @@ -147,8 +147,12 @@ size_t DeltaValueReader::readRows(
}
if (mem_table_start < mem_table_end)
{
actual_read += mem_table_reader
->readRows(output_cols, mem_table_start, mem_table_end - mem_table_start, range, row_ids);
actual_read += mem_table_reader->readRows( //
output_cols,
mem_table_start,
mem_table_end - mem_table_start,
range,
row_ids);
}

if (row_ids != nullptr)
Expand Down Expand Up @@ -213,14 +217,13 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(

bool DeltaValueReader::shouldPlace(
const DMContext & context,
DeltaIndexPtr my_delta_index,
const size_t placed_rows,
const size_t placed_delete_ranges,
const RowKeyRange & segment_range_,
const RowKeyRange & relevant_range,
UInt64 max_version)
{
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

// Already placed.
// The placed_rows, placed_delete_range already contains the data in delta_snap
if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
return false;

Expand Down
28 changes: 16 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ class DeltaIndex
}
}

DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
// Make sure the MVCC view will not be broken by the mismatch of delta index and snapshot:
// - First, make sure the delta index do not place more deletes than `placed_deletes_limit`.
// - Second, make sure the snapshot includes all duplicated tuples in the delta index.
if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
Expand Down Expand Up @@ -195,8 +195,9 @@ class DeltaIndex
{
std::scoped_lock lock(mutex);

if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
&& !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
// not excatly the same
&& (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
{
delta_tree = maybe_advanced.delta_tree;
placed_rows = maybe_advanced.placed_rows;
Expand All @@ -206,14 +207,17 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
/**
* Try to get a clone of current instance.
* Return an empty DeltaIndex if `deletes < this->placed_deletes` because the advanced delta-index will break
* the MVCC view.
*/
DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

return tryCloneInner(updates.front().delete_ranges_offset, &updates);
RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
}

const std::optional<Remote::RNDeltaIndexCache::CacheKey> & getRNCacheKey() const { return rn_cache_key; }
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaPlace.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ bool placeInsert(
tuple_id = delta_value_space_offset + (offset + i);

if (dup)
{
delta_tree.addDelete(rid);
delta_tree.setMaxDupTupleID(tuple_id);
}
delta_tree.addInsert(rid, tuple_id);
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ class DeltaTree
size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;
Int64 max_dup_tuple_id = -1;

std::unique_ptr<Allocator> allocator;
size_t bytes = 0;
Expand Down Expand Up @@ -1040,6 +1041,8 @@ class DeltaTree
size_t numEntries() const { return num_entries; }
size_t numInserts() const { return num_inserts; }
size_t numDeletes() const { return num_deletes; }
Int64 maxDupTupleID() const { return max_dup_tuple_id; }
void setMaxDupTupleID(Int64 tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }

void addDelete(UInt64 rid);
void addInsert(UInt64 rid, UInt64 tuple_id);
Expand All @@ -1056,6 +1059,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
, num_inserts(o.num_inserts)
, num_deletes(o.num_deletes)
, num_entries(o.num_entries)
, max_dup_tuple_id(o.max_dup_tuple_id)
, allocator(std::make_unique<Allocator>())
{
// If exception is thrown before clear copying_nodes, all nodes will be destroyed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

namespace DB::DM
{
namespace tests
{
class DeltaMergeStoreRWTest;
}
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";
Expand Down Expand Up @@ -151,5 +155,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
// runtime filter
std::vector<RuntimeFilterPtr> runtime_filter_list;
int max_wait_time_ms;

friend class tests::DeltaMergeStoreRWTest;
};
} // namespace DB::DM
15 changes: 12 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2485,7 +2485,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
{
const auto & stable_snap = segment_snap->stable;
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
// Try to clone from the sahred delta index, if it fails to reuse the shared delta index,
// it will return an empty delta index and we should place it in the following branch.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

Expand All @@ -2501,9 +2502,17 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus();

// Let's do a fast check, determine whether we need to do place or not.
if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, max_version))
if (!delta_reader->shouldPlace( //
dm_context,
my_placed_rows,
my_placed_deletes,
rowkey_range,
relevant_range,
max_version))
{
// We can reuse the shared-delta-index
return {my_delta_index, false};

}
CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_PlaceIndexUpdate};
GET_METRIC(tiflash_storage_subtask_count, type_place_index_update).Increment();
Stopwatch watch;
Expand Down
Loading