Skip to content

Commit

Permalink
Storages: Fix cloning delta index when there are duplicated tuples (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 16, 2024
1 parent 29e8094 commit 28b4c22
Show file tree
Hide file tree
Showing 11 changed files with 467 additions and 39 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ class DeltaValueReader

bool shouldPlace(
const DMContext & context,
DeltaIndexPtr my_delta_index,
size_t placed_rows,
size_t placed_delete_ranges,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 max_version);
Expand Down
27 changes: 15 additions & 12 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ size_t DeltaValueReader::readRows(
//
// So here, we should filter out those out-of-range rows.

auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
auto total_delta_rows = delta_snap->getRows();
const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
const auto total_delta_rows = delta_snap->getRows();

auto persisted_files_start = std::min(offset, mem_table_rows_offset);
auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
auto mem_table_start = offset <= mem_table_rows_offset
const auto persisted_files_start = std::min(offset, mem_table_rows_offset);
const auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
const auto mem_table_start = offset <= mem_table_rows_offset
? 0
: std::min(offset - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
auto mem_table_end = offset + limit <= mem_table_rows_offset
const auto mem_table_end = offset + limit <= mem_table_rows_offset
? 0
: std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);

Expand All @@ -147,8 +147,12 @@ size_t DeltaValueReader::readRows(
}
if (mem_table_start < mem_table_end)
{
actual_read += mem_table_reader
->readRows(output_cols, mem_table_start, mem_table_end - mem_table_start, range, row_ids);
actual_read += mem_table_reader->readRows( //
output_cols,
mem_table_start,
mem_table_end - mem_table_start,
range,
row_ids);
}

if (row_ids != nullptr)
Expand Down Expand Up @@ -213,14 +217,13 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(

bool DeltaValueReader::shouldPlace(
const DMContext & context,
DeltaIndexPtr my_delta_index,
const size_t placed_rows,
const size_t placed_delete_ranges,
const RowKeyRange & segment_range_,
const RowKeyRange & relevant_range,
UInt64 max_version)
{
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

// Already placed.
// The placed_rows, placed_delete_range already contains the data in delta_snap
if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
return false;

Expand Down
28 changes: 16 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ class DeltaIndex
}
}

DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
// Make sure the MVCC view will not be broken by the mismatch of delta index and snapshot:
// - First, make sure the delta index do not place more deletes than `placed_deletes_limit`.
// - Second, make sure the snapshot includes all duplicated tuples in the delta index.
if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
Expand Down Expand Up @@ -195,8 +195,9 @@ class DeltaIndex
{
std::scoped_lock lock(mutex);

if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
&& !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
// not excatly the same
&& (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
{
delta_tree = maybe_advanced.delta_tree;
placed_rows = maybe_advanced.placed_rows;
Expand All @@ -206,14 +207,17 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
/**
* Try to get a clone of current instance.
* Return an empty DeltaIndex if `deletes < this->placed_deletes` because the advanced delta-index will break
* the MVCC view.
*/
DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

return tryCloneInner(updates.front().delete_ranges_offset, &updates);
RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
}

const std::optional<Remote::RNDeltaIndexCache::CacheKey> & getRNCacheKey() const { return rn_cache_key; }
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaPlace.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ bool placeInsert(
tuple_id = delta_value_space_offset + (offset + i);

if (dup)
{
delta_tree.addDelete(rid);
delta_tree.setMaxDupTupleID(tuple_id);
}
delta_tree.addInsert(rid, tuple_id);
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ class DeltaTree
size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;
Int64 max_dup_tuple_id = -1;

std::unique_ptr<Allocator> allocator;
size_t bytes = 0;
Expand Down Expand Up @@ -1040,6 +1041,8 @@ class DeltaTree
size_t numEntries() const { return num_entries; }
size_t numInserts() const { return num_inserts; }
size_t numDeletes() const { return num_deletes; }
Int64 maxDupTupleID() const { return max_dup_tuple_id; }
void setMaxDupTupleID(Int64 tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }

void addDelete(UInt64 rid);
void addInsert(UInt64 rid, UInt64 tuple_id);
Expand All @@ -1056,6 +1059,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
, num_inserts(o.num_inserts)
, num_deletes(o.num_deletes)
, num_entries(o.num_entries)
, max_dup_tuple_id(o.max_dup_tuple_id)
, allocator(std::make_unique<Allocator>())
{
// If exception is thrown before clear copying_nodes, all nodes will be destroyed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

namespace DB::DM
{
namespace tests
{
class DeltaMergeStoreRWTest;
}
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";
Expand Down Expand Up @@ -151,5 +155,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
// runtime filter
std::vector<RuntimeFilterPtr> runtime_filter_list;
int max_wait_time_ms;

friend class tests::DeltaMergeStoreRWTest;
};
} // namespace DB::DM
15 changes: 12 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2485,7 +2485,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
{
const auto & stable_snap = segment_snap->stable;
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
// Try to clone from the sahred delta index, if it fails to reuse the shared delta index,
// it will return an empty delta index and we should place it in the following branch.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

Expand All @@ -2501,9 +2502,17 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus();

// Let's do a fast check, determine whether we need to do place or not.
if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, max_version))
if (!delta_reader->shouldPlace( //
dm_context,
my_placed_rows,
my_placed_deletes,
rowkey_range,
relevant_range,
max_version))
{
// We can reuse the shared-delta-index
return {my_delta_index, false};

}
CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_PlaceIndexUpdate};
GET_METRIC(tiflash_storage_subtask_count, type_place_index_update).Increment();
Stopwatch watch;
Expand Down
Loading

0 comments on commit 28b4c22

Please sign in to comment.