Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPP: update the state of building a hash table when createOnce throw exceptions (#4202) #4266

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(segment_merge_after_ingest_packs) \
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
31 changes: 23 additions & 8 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -93,9 +94,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto &elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
for (auto & elem : subqueries_for_sets)
Expand All @@ -115,29 +117,34 @@ void CreatingSetsBlockInputStream::createAll()
}

if (!exception_from_workers.empty())
{
LOG_ERROR(log,
"Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds())
<< " sec with exception and rethrow the first of total " << exception_from_workers.size()
<< " exceptions.");
std::rethrow_exception(exception_from_workers.front());

}
LOG_DEBUG(
log, "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. ");
created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTracker * memory_tracker)
{
Stopwatch watch;
try
{

current_memory_tracker = memory_tracker;
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< std::to_string(mpp_task_id));
Stopwatch watch;

BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand Down Expand Up @@ -194,7 +201,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -235,10 +245,15 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr
LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << ".");
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_ERROR(log,
"task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In "
<< std::to_string(watch.elapsedSeconds()) << " sec. ");
}
}

Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ static bool isCrossJoin(ASTTableJoin::Kind kind)
|| kind == ASTTableJoin::Kind::Cross_Right || kind == ASTTableJoin::Kind::Cross_Anti;
}


Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits,
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, size_t build_concurrency_, const TiDB::TiDBCollators & collators_,
const String & left_filter_column_, const String & right_filter_column_, const String & other_filter_column_,
Expand All @@ -72,7 +71,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
other_condition_ptr(other_condition_ptr_),
original_strictness(strictness),
max_block_size_for_cross_join(max_block_size_),
have_finish_build(true),
build_table_state(BuildTableState::SUCCEED),
log(&Logger::get("Join")),
limits(limits)
{
Expand All @@ -98,10 +97,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setFinishBuildTable(bool finish_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
have_finish_build = finish_;
build_table_state = state_;
build_table_cv.notify_all();
}

Expand Down Expand Up @@ -1517,7 +1516,9 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return have_finish_build; });
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

std::shared_lock lock(rwlock);
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setFinishBuildTable(bool);
enum BuildTableState
{
WAITING,
FAILED,
SUCCEED
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
Expand Down Expand Up @@ -446,7 +452,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
bool have_finish_build;
BuildTableState build_table_state;

Poco::Logger * log;

Expand Down
32 changes: 32 additions & 0 deletions tests/fullstack-test/mpp/mpp_fail.test
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang
=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run)
=> DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel)

## exception during mpp hash build
## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | id | estRows | task | access object | operator info |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | Projection | 0.99 | root | | test.t.id |
## | └─TableReader | 0.99 | root | | data:ExchangeSender |
## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough |
## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | |
## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo |
## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | |
## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo |
## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id |
## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | |
## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, |
## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) |
## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs.
=> DBGInvoke __enable_fail_point(exception_mpp_hash_build)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered.
=> DBGInvoke __disable_fail_point(exception_mpp_hash_build)

# Clean up.
mysql> drop table if exists test.t