Skip to content

Commit

Permalink
max_block_size take effect on NonJoinedBlockInputStream (pingcap#…
Browse files Browse the repository at this point in the history
…6663)

ref pingcap#3436

Signed-off-by: ywqzzy <592838129@qq.com>
  • Loading branch information
windtalker authored and ywqzzy committed Feb 13, 2023
1 parent cb073aa commit ac062c6
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 36 deletions.
34 changes: 24 additions & 10 deletions dbms/src/DataStreams/NonJoinedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct AdderNonJoined;
template <typename Mapped>
struct AdderNonJoined<ASTTableJoin::Strictness::Any, Mapped>
{
static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right)
static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right, const void *&, const size_t)
{
for (size_t j = 0; j < num_columns_left; ++j)
/// should fill the key column with key columns from right block
Expand All @@ -48,10 +48,13 @@ struct AdderNonJoined<ASTTableJoin::Strictness::Any, Mapped>
template <typename Mapped>
struct AdderNonJoined<ASTTableJoin::Strictness::All, Mapped>
{
static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right)
static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right, const void *& next_element_in_row_list, const size_t max_row_added)
{
size_t rows_added = 0;
for (auto current = &static_cast<const typename Mapped::Base_t &>(mapped); current != nullptr; current = current->next)
auto current = &static_cast<const typename Mapped::Base_t &>(mapped);
if unlikely (next_element_in_row_list != nullptr)
current = reinterpret_cast<const typename Mapped::Base_t *>(next_element_in_row_list);
for (; rows_added < max_row_added && current != nullptr; current = current->next)
{
for (size_t j = 0; j < num_columns_left; ++j)
/// should fill the key column with key columns from right block
Expand All @@ -62,8 +65,9 @@ struct AdderNonJoined<ASTTableJoin::Strictness::All, Mapped>

for (size_t j = 0; j < num_columns_right; ++j)
columns_right[j]->insertFrom(*current->block->getByPosition(key_num + j).column.get(), current->row_num);
rows_added++;
++rows_added;
}
next_element_in_row_list = current;
return rows_added;
}
};
Expand Down Expand Up @@ -216,7 +220,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,
/// first add rows that is not in the hash table
while (current_not_mapped_row != nullptr)
{
rows_added++;
++rows_added;
for (size_t j = 0; j < num_columns_left; ++j)
/// should fill the key column with key columns from right block
/// but we don't care about the key column now so just insert a default value is ok.
Expand Down Expand Up @@ -249,7 +253,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,
auto it = reinterpret_cast<typename Map::SegmentType::HashTable::const_iterator *>(position.get());
auto end = map.getSegmentTable(current_segment).end();

for (; *it != end || current_segment + step < map.getSegmentSize(); ++(*it))
for (; *it != end || current_segment + step < map.getSegmentSize();)
{
if (*it == end)
{
Expand All @@ -268,15 +272,25 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,
break;
}
if ((*it)->getMapped().getUsed())
{
++(*it);
continue;
}

rows_added += AdderNonJoined<STRICTNESS, typename Map::mapped_type>::add((*it)->getMapped(), key_num, num_columns_left, mutable_columns_left, num_columns_right, mutable_columns_right);

if (rows_added >= max_block_size)
rows_added += AdderNonJoined<STRICTNESS, typename Map::mapped_type>::add((*it)->getMapped(), key_num, num_columns_left, mutable_columns_left, num_columns_right, mutable_columns_right, next_element_in_row_list, max_block_size - rows_added);
assert(rows_added <= max_block_size);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
{
++(*it);
break;
}
else if (next_element_in_row_list == nullptr)
{
/// next_element_in_row_list == nullptr means current row_list is done, so move the iterator
++(*it);
}

if (rows_added == max_block_size)
break;
}
return rows_added;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/NonJoinedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream
MutableColumns columns_right;

std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure
const void * next_element_in_row_list = nullptr;
size_t current_segment = 0;
Join::RowRefList * current_not_mapped_row = nullptr;

Expand Down
74 changes: 48 additions & 26 deletions dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,32 @@ try
ASSERT_EQ(expect[i][j], blocks[j].rows());
}
}
// test non joined data
context.addMockTable("split_test", "t3", {{"a", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {2, 2, 2, 2, 2})});
context.addMockTable("split_test", "t4", {{"a", TiDB::TP::TypeLong}}, {toVec<Int32>("a", {1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3})});
request = context
.scan("split_test", "t3")
.join(context.scan("split_test", "t4"), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.build(context);

expect = {{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
{2, 2, 2, 2, 2, 2, 2, 2, 2, 2},
{7, 7, 6},
{20},
{20},
{20},
{20},
{20}};
for (size_t i = 0; i < block_sizes.size(); ++i)
{
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(block_sizes[i])));
auto blocks = getExecuteStreamsReturnBlocks(request);
ASSERT_EQ(expect[i].size(), blocks.size());
for (size_t j = 0; j < blocks.size(); ++j)
{
ASSERT_EQ(expect[i][j], blocks[j].rows());
}
}
}
CATCH

Expand Down Expand Up @@ -815,36 +841,34 @@ try

/// case 1, right join without right condition
auto request = context
.scan("outer_join_test", left_table_names[0])
.join(context.scan("outer_join_test", right_table_names[0]), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.scan("outer_join_test", right_table_names[0])
.join(context.scan("outer_join_test", left_table_names[0]), tipb::JoinType::TypeLeftOuterJoin, {col("a")})
.project({fmt::format("{}.a", left_table_names[0]), fmt::format("{}.b", left_table_names[0]), fmt::format("{}.a", right_table_names[0]), fmt::format("{}.b", right_table_names[0])})
.build(context);
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use 1 build concurrency join 1 probe concurrency as the reference
/// use right_table left join left_table as the reference
auto ref_columns = executeStreams(request, original_max_streams);

/// case 1.1 table scan join table scan
for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index)
for (auto & left_table_name : left_table_names)
{
for (size_t right_index = 0; right_index < right_table_names.size(); ++right_index)
for (auto & right_table_name : right_table_names)
{
if (left_index == 0 && right_index == 0)
continue;
request = context
.scan("outer_join_test", left_table_names[left_index])
.join(context.scan("outer_join_test", right_table_names[right_index]), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.scan("outer_join_test", left_table_name)
.join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")})
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
}
}
/// case 1.2 table scan join fine grained exchange receiver
for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index)
for (auto & left_table_name : left_table_names)
{
for (size_t right_index = 0; right_index < right_exchange_receiver_concurrency.size(); ++right_index)
for (size_t exchange_concurrency : right_exchange_receiver_concurrency)
{
size_t exchange_concurrency = right_exchange_receiver_concurrency[right_index];
request = context
.scan("outer_join_test", left_table_names[left_index])
.scan("outer_join_test", left_table_name)
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {}, {}, {}, exchange_concurrency)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
Expand All @@ -853,36 +877,34 @@ try
}
/// case 2, right join with right condition
request = context
.scan("outer_join_test", left_table_names[0])
.join(context.scan("outer_join_test", right_table_names[0]), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_names[0] + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, 0)
.scan("outer_join_test", right_table_names[0])
.join(context.scan("outer_join_test", left_table_names[0]), tipb::JoinType::TypeLeftOuterJoin, {col("a")}, {gt(col(right_table_names[0] + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, {}, 0)
.project({fmt::format("{}.a", left_table_names[0]), fmt::format("{}.b", left_table_names[0]), fmt::format("{}.a", right_table_names[0]), fmt::format("{}.b", right_table_names[0])})
.build(context);
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// use 1 build concurrency join 1 probe concurrency as the reference
/// use right_table left join left_table as the reference
ref_columns = executeStreams(request, original_max_streams);
/// case 2.1 table scan join table scan
for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index)
for (auto & left_table_name : left_table_names)
{
for (size_t right_index = 0; right_index < right_table_names.size(); ++right_index)
for (auto & right_table_name : right_table_names)
{
if (left_index == 0 && right_index == 0)
continue;
request = context
.scan("outer_join_test", left_table_names[left_index])
.join(context.scan("outer_join_test", right_table_names[right_index]), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_names[right_index] + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, 0)
.scan("outer_join_test", left_table_name)
.join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, 0)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
}
}
/// case 2.2 table scan join fine grained exchange receiver
for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index)
for (auto & left_table_name : left_table_names)
{
for (size_t right_index = 0; right_index < right_exchange_receiver_concurrency.size(); ++right_index)
for (size_t exchange_concurrency : right_exchange_receiver_concurrency)
{
size_t exchange_concurrency = right_exchange_receiver_concurrency[right_index];
String exchange_name = fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency);
request = context
.scan("outer_join_test", left_table_names[left_index])
.scan("outer_join_test", left_table_name)
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(exchange_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, exchange_concurrency)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
Expand Down

0 comments on commit ac062c6

Please sign in to comment.