Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9000
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
JinheLin authored and JaySon-Huang committed May 22, 2024
1 parent 7f6cd1b commit 20bedd4
Show file tree
Hide file tree
Showing 11 changed files with 553 additions and 21 deletions.
10 changes: 10 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,21 @@ class DeltaValueReader
// We use the result to update DeltaTree.
BlockOrDeletes getPlaceItems(size_t rows_begin, size_t deletes_begin, size_t rows_end, size_t deletes_end);

<<<<<<< HEAD
bool shouldPlace(const DMContext & context,
DeltaIndexPtr my_delta_index,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 max_version);
=======
bool shouldPlace(
const DMContext & context,
size_t placed_rows,
size_t placed_delete_ranges,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 start_ts);
>>>>>>> 8e170090fa (Storages: Fix cloning delta index when there are duplicated tuples (#9000))
};

class DeltaValueInputStream : public IBlockInputStream
Expand Down
47 changes: 45 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,50 @@ size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, s
//
// So here, we should filter out those out-of-range rows.

auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
auto total_delta_rows = delta_snap->getRows();
const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
const auto total_delta_rows = delta_snap->getRows();

<<<<<<< HEAD
auto persisted_files_start = std::min(offset, mem_table_rows_offset);
auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
auto mem_table_start = offset <= mem_table_rows_offset ? 0 : std::min(offset - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
auto mem_table_end = offset + limit <= mem_table_rows_offset ? 0 : std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
=======
const auto persisted_files_start = std::min(offset, mem_table_rows_offset);
const auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
const auto mem_table_start = offset <= mem_table_rows_offset
? 0
: std::min(offset - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
const auto mem_table_end = offset + limit <= mem_table_rows_offset
? 0
: std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
>>>>>>> 8e170090fa (Storages: Fix cloning delta index when there are duplicated tuples (#9000))

size_t actual_read = 0;
if (persisted_files_start < persisted_files_end)
actual_read += persisted_files_reader->readRows(output_cols, persisted_files_start, persisted_files_end - persisted_files_start, range);
if (mem_table_start < mem_table_end)
<<<<<<< HEAD
actual_read += mem_table_reader->readRows(output_cols, mem_table_start, mem_table_end - mem_table_start, range);
=======
{
actual_read += mem_table_reader->readRows( //
output_cols,
mem_table_start,
mem_table_end - mem_table_start,
range,
row_ids);
}

if (row_ids != nullptr)
{
std::transform(
row_ids->cbegin() + persisted_read_rows,
row_ids->cend(),
row_ids->begin() + persisted_read_rows, // write to the same location
[mem_table_rows_offset](UInt32 id) { return id + mem_table_rows_offset; });
}
>>>>>>> 8e170090fa (Storages: Fix cloning delta index when there are duplicated tuples (#9000))

return actual_read;
}
Expand Down Expand Up @@ -137,6 +168,7 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(size_t rows_begin, size_t deletes
return res;
}

<<<<<<< HEAD
bool DeltaValueReader::shouldPlace(const DMContext & context,
DeltaIndexPtr my_delta_index,
const RowKeyRange & segment_range_,
Expand All @@ -146,6 +178,17 @@ bool DeltaValueReader::shouldPlace(const DMContext & context,
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

// Already placed.
=======
bool DeltaValueReader::shouldPlace(
const DMContext & context,
const size_t placed_rows,
const size_t placed_delete_ranges,
const RowKeyRange & segment_range_,
const RowKeyRange & relevant_range,
UInt64 start_ts)
{
// The placed_rows, placed_delete_range already contains the data in delta_snap
>>>>>>> 8e170090fa (Storages: Fix cloning delta index when there are duplicated tuples (#9000))
if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
return false;

Expand Down
28 changes: 16 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ class DeltaIndex
}
}

DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
// Make sure the MVCC view will not be broken by the mismatch of delta index and snapshot:
// - First, make sure the delta index do not place more deletes than `placed_deletes_limit`.
// - Second, make sure the snapshot includes all duplicated tuples in the delta index.
if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
Expand Down Expand Up @@ -186,8 +186,9 @@ class DeltaIndex
{
std::scoped_lock lock(mutex);

if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
&& !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
// not excatly the same
&& (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
{
delta_tree = maybe_advanced.delta_tree;
placed_rows = maybe_advanced.placed_rows;
Expand All @@ -197,14 +198,17 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
/**
* Try to get a clone of current instance.
* Return an empty DeltaIndex if `deletes < this->placed_deletes` because the advanced delta-index will break
* the MVCC view.
*/
DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

return tryCloneInner(updates.front().delete_ranges_offset, &updates);
RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
}
};

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaPlace.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ bool placeInsert(const SkippableBlockInputStreamPtr & stable, //
tuple_id = delta_value_space_offset + (offset + i);

if (dup)
{
delta_tree.addDelete(rid);
delta_tree.setMaxDupTupleID(tuple_id);
}
delta_tree.addInsert(rid, tuple_id);
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ class DeltaTree
size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;
Int64 max_dup_tuple_id = -1;

std::unique_ptr<Allocator> allocator;
size_t bytes = 0;
Expand Down Expand Up @@ -989,6 +990,8 @@ class DeltaTree
size_t numEntries() const { return num_entries; }
size_t numInserts() const { return num_inserts; }
size_t numDeletes() const { return num_deletes; }
Int64 maxDupTupleID() const { return max_dup_tuple_id; }
void setMaxDupTupleID(Int64 tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }

void addDelete(UInt64 rid);
void addInsert(UInt64 rid, UInt64 tuple_id);
Expand All @@ -1005,6 +1008,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
, num_inserts(o.num_inserts)
, num_deletes(o.num_deletes)
, num_entries(o.num_entries)
, max_dup_tuple_id(o.max_dup_tuple_id)
, allocator(std::make_unique<Allocator>())
{
// If exception is thrown before clear copying_nodes, all nodes will be destroyed.
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ extern const char pause_when_reading_from_dt_stream[];

namespace DB::DM
{
namespace tests
{
class DeltaMergeStoreRWTest;
}
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";
Expand Down Expand Up @@ -140,6 +144,15 @@ class UnorderedInputStream : public IProfilingBlockInputStream
LoggerPtr log;
int64_t ref_no;
size_t total_rows = 0;
<<<<<<< HEAD
bool task_pool_added;
=======

// runtime filter
std::vector<RuntimeFilterPtr> runtime_filter_list;
int max_wait_time_ms;

friend class tests::DeltaMergeStoreRWTest;
>>>>>>> 8e170090fa (Storages: Fix cloning delta index when there are duplicated tuples (#9000))
};
} // namespace DB::DM
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1998,7 +1998,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context
UInt64 max_version) const
{
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
// Try to clone from the sahred delta index, if it fails to reuse the shared delta index,
// it will return an empty delta index and we should place it in the following branch.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

Expand All @@ -2014,7 +2015,19 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context
auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus();

// Let's do a fast check, determine whether we need to do place or not.
<<<<<<< HEAD
if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, max_version))
=======
if (!delta_reader->shouldPlace( //
dm_context,
my_placed_rows,
my_placed_deletes,
rowkey_range,
relevant_range,
start_ts))
{
// We can reuse the shared-delta-index
>>>>>>> 8e170090fa (Storages: Fix cloning delta index when there are duplicated tuples (#9000))
return {my_delta_index, false};

CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_PlaceIndexUpdate};
Expand Down
Loading

0 comments on commit 20bedd4

Please sign in to comment.