Skip to content

Commit

Permalink
Add auto spill ut (#8096)
Browse files Browse the repository at this point in the history
ref #7738
  • Loading branch information
windtalker committed Sep 19, 2023
1 parent 158296d commit 32974c1
Show file tree
Hide file tree
Showing 18 changed files with 628 additions and 47 deletions.
59 changes: 31 additions & 28 deletions dbms/src/Common/FailPoint.cpp
Expand Up @@ -125,34 +125,37 @@ namespace DB
M(pause_query_init) \
M(pause_passive_flush_before_persist_region)

#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
M(random_tunnel_wait_timeout_failpoint) \
M(random_tunnel_write_failpoint) \
M(random_tunnel_init_rpc_failure_failpoint) \
M(random_receiver_local_msg_push_failure_failpoint) \
M(random_receiver_sync_msg_push_failure_failpoint) \
M(random_receiver_async_msg_push_failure_failpoint) \
M(random_limit_check_failpoint) \
M(random_join_build_failpoint) \
M(random_join_prob_failpoint) \
M(random_aggregate_create_state_failpoint) \
M(random_aggregate_merge_failpoint) \
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint) \
M(random_pipeline_model_task_run_failpoint) \
M(random_pipeline_model_task_construct_failpoint) \
M(random_pipeline_model_event_schedule_failpoint) \
M(random_pipeline_model_event_finish_failpoint) \
M(random_pipeline_model_operator_run_failpoint) \
M(random_pipeline_model_cancel_failpoint) \
M(random_pipeline_model_execute_prefix_failpoint) \
M(random_pipeline_model_execute_suffix_failpoint) \
M(random_spill_to_disk_failpoint) \
M(random_restore_from_disk_failpoint) \
M(random_exception_when_connect_local_tunnel) \
M(random_exception_when_construct_async_request_handler)
#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
M(random_tunnel_wait_timeout_failpoint) \
M(random_tunnel_write_failpoint) \
M(random_tunnel_init_rpc_failure_failpoint) \
M(random_receiver_local_msg_push_failure_failpoint) \
M(random_receiver_sync_msg_push_failure_failpoint) \
M(random_receiver_async_msg_push_failure_failpoint) \
M(random_limit_check_failpoint) \
M(random_join_build_failpoint) \
M(random_join_prob_failpoint) \
M(random_aggregate_create_state_failpoint) \
M(random_aggregate_merge_failpoint) \
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint) \
M(random_pipeline_model_task_run_failpoint) \
M(random_pipeline_model_task_construct_failpoint) \
M(random_pipeline_model_event_schedule_failpoint) \
M(random_pipeline_model_event_finish_failpoint) \
M(random_pipeline_model_operator_run_failpoint) \
M(random_pipeline_model_cancel_failpoint) \
M(random_pipeline_model_execute_prefix_failpoint) \
M(random_pipeline_model_execute_suffix_failpoint) \
M(random_spill_to_disk_failpoint) \
M(random_restore_from_disk_failpoint) \
M(random_exception_when_connect_local_tunnel) \
M(random_exception_when_construct_async_request_handler) \
M(random_fail_in_resize_callback) \
M(random_marked_for_auto_spill)

namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Expand Up @@ -161,6 +161,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s
, warnings(max_recorded_error_count)
, warning_count(0)
{
query_operator_spill_contexts = std::make_shared<QueryOperatorSpillContexts>(MPPQueryId(0, 0, 0, 0, ""), 100);
initOutputInfo();
}

Expand Down
162 changes: 162 additions & 0 deletions dbms/src/Flash/tests/gtest_auto_spill_aggregation.cpp
@@ -0,0 +1,162 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Interpreters/Context.h>
#include <TestUtils/ColumnGenerator.h>
#include <TestUtils/ExecutorTestUtils.h>
#include <TestUtils/mockExecutor.h>

namespace DB
{
namespace FailPoints
{
extern const char random_marked_for_auto_spill[];
extern const char random_fail_in_resize_callback[];
} // namespace FailPoints

namespace tests
{
class AutoSpillAggregationTestRunner : public DB::tests::ExecutorTest
{
public:
void initializeContext() override
{
ExecutorTest::initializeContext();
dag_context_ptr->log = Logger::get("AutoSpillAggTest");
}
};

#define WRAP_FOR_SPILL_TEST_BEGIN \
std::vector<bool> pipeline_bools{false, true}; \
for (auto enable_pipeline : pipeline_bools) \
{ \
enablePipeline(enable_pipeline);

#define WRAP_FOR_SPILL_TEST_END }

/// todo add more tests
TEST_F(AutoSpillAggregationTestRunner, TriggerByRandomMarkForSpill)
try
{
DB::MockColumnInfoVec column_infos{
{"a", TiDB::TP::TypeLongLong},
{"b", TiDB::TP::TypeLongLong},
{"c", TiDB::TP::TypeLongLong},
{"d", TiDB::TP::TypeLongLong},
{"e", TiDB::TP::TypeLongLong}};
ColumnsWithTypeAndName column_datas;
size_t table_rows = 102400;
size_t duplicated_rows = 51200;
UInt64 max_block_size = 500;
size_t original_max_streams = 20;
size_t total_data_size = 0;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(column_infos))
{
ColumnGeneratorOpts opts{
table_rows,
getDataTypeByColumnInfoForComputingLayer(column_info)->getName(),
RANDOM,
column_info.name};
column_datas.push_back(ColumnGenerator::instance().generate(opts));
total_data_size += column_datas.back().column->byteSize();
}
for (auto & column_data : column_datas)
column_data.column->assumeMutable()->insertRangeFrom(*column_data.column, 0, duplicated_rows);
context.addMockTable("spill_sort_test", "simple_table", column_infos, column_datas, 8);

auto request = context.scan("spill_sort_test", "simple_table")
.aggregation({Min(col("c")), Max(col("d")), Count(col("e"))}, {col("a"), col("b")})
.build(context);
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// disable spill
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_memory_usage", Field(static_cast<UInt64>(0)));
enablePipeline(false);
auto ref_columns = executeStreams(request, original_max_streams);
/// enable spill
DB::FailPointHelper::enableRandomFailPoint(DB::FailPoints::random_marked_for_auto_spill, 0.5);
WRAP_FOR_SPILL_TEST_BEGIN
context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
context.context->setSetting("max_memory_usage", Field(static_cast<UInt64>(total_data_size * 1000)));
context.context->setSetting("auto_memory_revoke_trigger_threshold", Field(0.7));
/// don't use `executeAndAssertColumnsEqual` since it takes too long to run
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreamsWithMemoryTracker(request, original_max_streams));
WRAP_FOR_SPILL_TEST_END
DB::FailPointHelper::disableFailPoint(DB::FailPoints::random_marked_for_auto_spill);
}
CATCH

TEST_F(AutoSpillAggregationTestRunner, TriggerByRandomFailInResizeCallback)
try
{
DB::MockColumnInfoVec column_infos{
{"a", TiDB::TP::TypeLongLong},
{"b", TiDB::TP::TypeLongLong},
{"c", TiDB::TP::TypeLongLong},
{"d", TiDB::TP::TypeLongLong},
{"e", TiDB::TP::TypeLongLong}};
ColumnsWithTypeAndName column_datas;
size_t table_rows = 1024000;
size_t duplicated_rows = 51200;
UInt64 max_block_size = 500;
size_t original_max_streams = 20;
size_t total_data_size = 0;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(column_infos))
{
ColumnGeneratorOpts opts{
table_rows,
getDataTypeByColumnInfoForComputingLayer(column_info)->getName(),
RANDOM,
column_info.name};
column_datas.push_back(ColumnGenerator::instance().generate(opts));
total_data_size += column_datas.back().column->byteSize();
}
for (auto & column_data : column_datas)
column_data.column->assumeMutable()->insertRangeFrom(*column_data.column, 0, duplicated_rows);
context.addMockTable("spill_sort_test", "simple_table", column_infos, column_datas, 8);

auto request = context.scan("spill_sort_test", "simple_table")
.aggregation({Min(col("c")), Max(col("d")), Count(col("e"))}, {col("a"), col("b")})
.build(context);
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// disable spill
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_memory_usage", Field(static_cast<UInt64>(0)));
enablePipeline(false);
auto ref_columns = executeStreams(request, original_max_streams);
/// enable spill
DB::FailPointHelper::enableRandomFailPoint(DB::FailPoints::random_fail_in_resize_callback, 0.5);
WRAP_FOR_SPILL_TEST_BEGIN
context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
context.context->setSetting("max_memory_usage", Field(static_cast<UInt64>(total_data_size * 1000)));
context.context->setSetting("auto_memory_revoke_trigger_threshold", Field(0.7));
/// don't use `executeAndAssertColumnsEqual` since it takes too long to run
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreamsWithMemoryTracker(request, original_max_streams));
WRAP_FOR_SPILL_TEST_END
DB::FailPointHelper::disableFailPoint(DB::FailPoints::random_fail_in_resize_callback);
}
CATCH

#undef WRAP_FOR_SPILL_TEST_BEGIN
#undef WRAP_FOR_SPILL_TEST_END
#undef WRAP_FOR_AGG_PARTIAL_BLOCK_START
#undef WRAP_FOR_AGG_PARTIAL_BLOCK_END

} // namespace tests
} // namespace DB

0 comments on commit 32974c1

Please sign in to comment.