Skip to content

Commit

Permalink
Support commit order deadlocks resolution in dependency replication
Browse files Browse the repository at this point in the history
Summary: This is a port of D28619160 (facebook/mysql-5.6@ce8c474)

Reviewed By: hermanlee

Differential Revision: D28974185

fbshipit-source-id: 4139ad303f8
  • Loading branch information
abhinav04sharma authored and facebook-github-bot committed Jun 9, 2021
1 parent 9fbe1c1 commit d11fc6b
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 9 deletions.
3 changes: 3 additions & 0 deletions mysql-test/include/have_mts_dependency_replication_stmt.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Include guard for tests that only make sense with statement-level dependency
# replication: skips the including test unless the server's global
# mts_dependency_replication setting is 'STMT'.
if (`SELECT @@GLOBAL.mts_dependency_replication != 'STMT'`) {
skip Test needs to run with STMT dependency replication;
}
21 changes: 21 additions & 0 deletions mysql-test/suite/innodb/r/innodb_row_lock_wait_callback.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
create table t1 (a int primary key, b int unique, c int) engine = innodb;
insert into t1 values(1, 1, 1);
insert into t1 values(10, 10, 10);
set @@global.debug = "+d,report_row_lock_wait";
select @@tx_isolation;
@@tx_isolation
REPEATABLE-READ
begin;
delete from t1 where a > 5;
begin;
insert into t1 values(6, 6, 6);
set debug_sync="now wait_for signal.reached";
set debug_sync="now signal signal.done";
set @@global.debug = "-d,report_row_lock_wait";
rollback;
select * from t1;
a b c
1 1 1
6 6 6
10 10 10
drop table t1;
30 changes: 30 additions & 0 deletions mysql-test/suite/innodb/t/innodb_row_lock_wait_callback.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Checks that the row lock wait callback is fired when an InnoDB insert blocks
# on a gap lock held by another connection.
#
# Flow:
#   1. con1 opens a trx and deletes the range a > 5.
#   2. The default connection inserts a = 6 asynchronously and blocks.
#   3. con1 waits for the debug sync signal "signal.reached" and then sends
#      "signal.done" before rolling back, which unblocks the insert.
#
# Needs a debug build with debug sync support.
source include/have_debug.inc;
#source include/have_innodb.inc;
source include/have_debug_sync.inc;

# Two rows (a = 1 and a = 10) so that a = 6 falls into the gap between them.
create table t1 (a int primary key, b int unique, c int) engine = innodb;
insert into t1 values(1, 1, 1);
insert into t1 values(10, 10, 10);
# Enable the report_row_lock_wait debug keyword for the duration of the test.
# NOTE(review): presumably this is what makes the callback emit
# signal.reached and wait for signal.done -- confirm against the server code.
set @@global.debug = "+d,report_row_lock_wait";

# Record the isolation level in the result file.
select @@tx_isolation;

# Blocker: keep a trx open on con1 holding the lock on the range a > 5.
connect (con1, localhost, root);
begin;
delete from t1 where a > 5; # this will take a gap lock

# Waiter: issue the insert with "send" so this script can carry on while the
# statement is blocked; the result is collected later with "reap".
connection default;
begin;
send insert into t1 values(6, 6, 6); # this will block on gap lock

# Synchronise with the blocked insert, then disable the debug keyword and
# release the lock by rolling back the blocker.
connection con1;
set debug_sync="now wait_for signal.reached"; # callback was fired
set debug_sync="now signal signal.done";
set @@global.debug = "-d,report_row_lock_wait";
rollback;
disconnect con1;

# Collect the result of the previously blocked insert, show the table
# contents and clean up.
connection default;
reap;
select * from t1;
drop table t1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information.
[connection master]
call mtr.add_suppression("Commit order deadlock between");
create table t1 (a int primary key, b int) engine = innodb;
insert into t1 values(3, 3);
include/sync_slave_sql_with_master.inc
stop slave;
set @@global.debug = "+d,dbug.dep_fake_gap_lock_on_insert";
begin;
insert into t1 values(1, 1);
insert into t1 values(1, 1);
insert into t1 values(2, 2);
update t1 set b = 20 where a = 2;
update t1 set b = 200 where a = 2;
update t1 set b = 30 where a = 3;
update t1 set b = 300 where a = 3;
start slave;
rollback;
include/sync_slave_sql_with_master.inc
select * from t1;
a b
1 1
2 200
3 300
stop slave;
set @@global.debug = "-d,dbug.dep_fake_gap_lock_on_insert";
start slave;
drop table t1;
include/sync_slave_sql_with_master.inc
include/rpl_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--slave_check_before_image_consistency=ON
--slave_parallel_workers=8
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Exercises commit order deadlock resolution with STMT dependency replication.
#
# A local trx on the slave blocks the first replicated insert, creating a gap
# in commit order. Later transactions pile up behind it, either waiting for
# commit order or waiting for their dependencies. Once the blocker is rolled
# back, the first insert runs into the fake gap lock taken by the 2nd trx and
# the test expects exactly one commit order deadlock to be reported
# (Slave_commit_order_deadlocks = 1) and the slave to catch up afterwards.
#
# Needs a debug build and mts_dependency_replication = STMT.
source include/master-slave.inc;
source include/have_mts_dependency_replication_stmt.inc;
source include/have_debug.inc;

# The deadlock is expected, so keep its message from failing the test.
call mtr.add_suppression("Commit order deadlock between");

# Seed the table and make sure the slave has it before replication is stopped.
connection master;
create table t1 (a int primary key, b int) engine = innodb;
insert into t1 values(3, 3);
source include/sync_slave_sql_with_master.inc;

connection slave;
stop slave;
# We'll take a fake gap lock after execution of every insert event
set @@global.debug = "+d,dbug.dep_fake_gap_lock_on_insert";

# Start a trx to block the 1st insert that primary will send to create a gap in
# commit order
begin;
insert into t1 values(1, 1);

# Generate six transactions on the master while the slave is stopped.
connection master;
insert into t1 values(1, 1); # this will be blocked by trx above
insert into t1 values(2, 2); # this will take the fake gap lock and wait for commit order
update t1 set b = 20 where a = 2; # this will wait for above trx due to deps
update t1 set b = 200 where a = 2; # this will wait for above trx due to deps
update t1 set b = 30 where a = 3; # this will wait for commit order
update t1 set b = 300 where a = 3; # this will wait for above trx due to deps

# Restart replication from a second slave connection; the blocking trx stays
# open on connection "slave" until the rollback further down.
connection slave1;
start slave;
# Wait for the 2nd and 5th trx to start waiting for commit ordering
let $wait_condition= SELECT COUNT(*) = 2 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE STATE LIKE "%Waiting for preceding transaction to commit%";
let $wait_timeout= 120;
source include/wait_condition.inc;

# Wait for the 3rd, 4th and 6th trx to wait for dependencies
let $wait_condition= SELECT COUNT(*) = 3 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE STATE LIKE "%Waiting for dependencies to be satisfied%";
let $wait_timeout= 120;
source include/wait_condition.inc;

connection slave;
# unblock 1st insert trx, it'll try to lock fake gap and fire the callback
rollback;

# Wait for 2nd transaction to be retried after receiving the commit order
# deadlock signal
let $wait_condition= SELECT VARIABLE_VALUE = 1 FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Slave_commit_order_deadlocks';
let $wait_timeout= 120;
source include/wait_condition.inc;

# All six transactions must eventually apply on the slave.
connection master;
source include/sync_slave_sql_with_master.inc;

# Record the final table contents on the slave in the result file.
connection slave;
select * from t1;

# Turn the debug keyword off again while replication is stopped.
stop slave;
set @@global.debug = "-d,dbug.dep_fake_gap_lock_on_insert";
start slave;

# Cleanup.
connection master;
drop table t1;
source include/sync_slave_sql_with_master.inc;

source include/rpl_end.inc;
4 changes: 2 additions & 2 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16016,7 +16016,7 @@ void Log_event::prepare_dep(Relay_log_info *rli,
get_type_str(), rli->get_group_master_log_name(),
rli->get_group_master_log_pos());

submode->dep_sync_group = true;
submode->set_dep_sync_group(true);
ev->is_begin_event = true;
}
}
Expand Down Expand Up @@ -16069,7 +16069,7 @@ void Query_log_event::prepare_dep(Relay_log_info *rli,
}
}

submode->dep_sync_group = true;
submode->set_dep_sync_group(true);
}

DBUG_VOID_RETURN;
Expand Down
38 changes: 38 additions & 0 deletions sql/mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,7 @@ double slave_high_priority_lock_wait_timeout_double = 1.0;
ulonglong slave_high_priority_lock_wait_timeout_nsec = 1.0;
std::atomic<ulonglong> slave_high_priority_ddl_executed(0);
std::atomic<ulonglong> slave_high_priority_ddl_killed_connections(0);
std::atomic<ulonglong> slave_commit_order_deadlocks(0);
bool log_datagram = false;
ulong log_datagram_usecs = 0;
int log_datagram_sock = -1;
Expand Down Expand Up @@ -9591,6 +9592,24 @@ static int show_slave_dependency_next_waits(THD *, SHOW_VAR *var, char *buff) {
return 0;
}

/**
  Status callback reporting the num_syncs counter of the default channel's
  dependency scheduler.

  The value is only available when the default channel exists, has a relay
  log info with an active MTS submode, and that submode is the dependency
  scheduler; in every other case the variable is reported as SHOW_UNDEF.

  @param[out] var   Status variable descriptor to fill in.
  @param[out] buff  Buffer that receives the counter value.

  @return Always 0.
*/
static int show_slave_dependency_num_syncs(THD *, SHOW_VAR *var, char *buff) {
  channel_map.rdlock();
  Master_info *default_mi = channel_map.get_default_channel_mi();

  // Evaluated left to right so that no pointer is dereferenced before it has
  // been checked.
  const bool has_dep_scheduler =
      default_mi && default_mi->rli &&
      default_mi->rli->current_mts_submode &&
      is_mts_parallel_type_dependency(default_mi->rli);

  if (!has_dep_scheduler) {
    var->type = SHOW_UNDEF;
  } else {
    auto *dep_scheduler = static_cast<Mts_submode_dependency *>(
        default_mi->rli->current_mts_submode);
    var->type = SHOW_LONGLONG;
    var->value = buff;
    *((ulonglong *)buff) = (ulonglong)dep_scheduler->num_syncs.load();
  }

  channel_map.unlock();
  return 0;
}

/**
After Multisource replication, this function only shows the value
of default channel.
Expand Down Expand Up @@ -9699,6 +9718,21 @@ static int show_slave_last_heartbeat(THD *thd, SHOW_VAR *var, char *buff) {
return 0;
}

/**
  Status callback reporting the global slave_commit_order_deadlocks counter.

  The counter itself is server-wide, but it is only reported when a default
  channel exists; otherwise the variable is reported as SHOW_UNDEF.

  @param[out] var   Status variable descriptor to fill in.
  @param[out] buff  Buffer that receives the counter value.

  @return Always 0.
*/
static int show_slave_commit_order_deadlocks(THD *, SHOW_VAR *var, char *buff) {
  channel_map.rdlock();
  Master_info *mi = channel_map.get_default_channel_mi();

  if (mi) {
    var->type = SHOW_LONGLONG;
    var->value = buff;
    // The counter is a std::atomic<ulonglong>; store it with its own
    // (unsigned) type rather than through a signed longlong pointer, matching
    // the other SHOW_LONGLONG callbacks in this file.
    *((ulonglong *)buff) = slave_commit_order_deadlocks.load();
  } else {
    var->type = SHOW_UNDEF;
  }

  channel_map.unlock();
  return 0;
}

/**
Only for default channel. For details, refer to show_slave_running()
*/
Expand Down Expand Up @@ -10287,6 +10321,8 @@ SHOW_VAR status_vars[] = {
SHOW_SCOPE_GLOBAL},
{"Slave_retried_transactions", (char *)&show_slave_retried_trans, SHOW_FUNC,
SHOW_SCOPE_GLOBAL},
{"Slave_commit_order_deadlocks", (char *)&show_slave_commit_order_deadlocks,
SHOW_FUNC, SHOW_SCOPE_GLOBAL},
{"Slave_heartbeat_period", (char *)&show_heartbeat_period, SHOW_FUNC,
SHOW_SCOPE_GLOBAL},
{"Slave_received_heartbeats", (char *)&show_slave_received_heartbeats,
Expand All @@ -10311,6 +10347,8 @@ SHOW_VAR status_vars[] = {
SHOW_FUNC, SHOW_SCOPE_GLOBAL},
{"Slave_dependency_next_waits", (char *)&show_slave_dependency_next_waits,
SHOW_FUNC, SHOW_SCOPE_GLOBAL},
{"Slave_dependency_num_syncs", (char *)&show_slave_dependency_num_syncs,
SHOW_FUNC, SHOW_SCOPE_GLOBAL},
{"Slave_high_priority_ddl_executed",
(char *)&slave_high_priority_ddl_executed, SHOW_LONGLONG, SHOW_SCOPE_ALL},
{"Slave_high_priority_ddl_killed_connections",
Expand Down
1 change: 1 addition & 0 deletions sql/mysqld.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ extern ulonglong slave_high_priority_lock_wait_timeout_nsec;
extern double slave_high_priority_lock_wait_timeout_double;
extern std::atomic<ulonglong> slave_high_priority_ddl_killed_connections;
extern std::atomic<ulonglong> slave_high_priority_ddl_executed;
extern std::atomic<ulonglong> slave_commit_order_deadlocks;

extern ulonglong rbr_unsafe_queries;
extern ulong relay_io_connected;
Expand Down
16 changes: 11 additions & 5 deletions sql/rpl_mts_submode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ void Mts_submode_dependency::add_row_event(
if (unlikely(!rle->get_keys(rli, keylist) ||
keylist.size() + keys_accessed_by_group.size() >
rli->mts_dependency_max_keys)) {
dep_sync_group = true;
set_dep_sync_group(true);
keylist.clear();
keys_accessed_by_group.clear();
return;
Expand Down Expand Up @@ -1293,8 +1293,8 @@ bool Mts_submode_dependency::schedule_dep(Relay_log_info *rli, Log_event *ev) {

// we execute the trx in isolation if num_dbs is greater than one and if
// OVER_MAX_DBS_IN_EVENT_MTS is set
dep_sync_group = dep_sync_group || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS ||
(dbs_accessed_by_group.size() > 1);
set_dep_sync_group(dep_sync_group || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS ||
(dbs_accessed_by_group.size() > 1));
}

if (unlikely(dep_sync_group)) {
Expand Down Expand Up @@ -1335,7 +1335,7 @@ bool Mts_submode_dependency::schedule_dep(Relay_log_info *rli, Log_event *ev) {
// case: this group needs to be executed in isolation
if (unlikely(dep_sync_group && evw->is_end_event)) {
if (wait_for_workers_to_finish(rli) == -1) DBUG_RETURN(false);
dep_sync_group = false;
set_dep_sync_group(false);
}

DBUG_RETURN(true);
Expand Down Expand Up @@ -1390,13 +1390,19 @@ void Mts_submode_dependency::handle_terminal_event(
// If there are conflicts we expect row locks to kick in and in that case
// we'll automatically wait for the conflicting trx to commit.
//
// Note that this is not required for STMT mode since we'll already be
// depending on trxs based on actual row conflicts. So we depend on the end
// event directly.
//
// Why penultimate though? Why not just depend on the conflicting row event?
// This is done to support trx retries. On secondaries we're allowed to
// retry trx on temporary errors like lock wait timeouts. Depending on
// penultimate event allows the trx we depend on to retry execution,
// otherwise we'll end up taking the row lock as soon as the row we depend
// on is executed which can create deadlock if commit ordering is enabled.
auto to_add = prev_event ? prev_event : evw;
auto to_add = rli->mts_dependency_replication == DEP_RPL_TABLE && prev_event
? prev_event
: evw;
register_keys(to_add);

// update rli state
Expand Down
15 changes: 13 additions & 2 deletions sql/rpl_mts_submode.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ class Mts_submode_dependency : public Mts_submode {
/* Set of keys accessed by the group */
std::unordered_set<Dependency_key> keys_accessed_by_group;

bool dep_sync_group = false;

std::shared_ptr<Log_event_wrapper> prev_event;
std::shared_ptr<Log_event_wrapper> current_begin_event;
std::unordered_map<ulonglong, Table_map_log_event *> table_map_events;
Expand All @@ -148,6 +146,14 @@ class Mts_submode_dependency : public Mts_submode {
std::atomic<ulonglong> begin_event_waits{0};
std::atomic<ulonglong> next_event_waits{0};
std::atomic<ulonglong> num_in_flight_trx{0};
std::atomic<ulonglong> num_syncs{0};

bool dep_sync_group = false;

#ifndef DBUG_OFF
std::mutex dep_fake_gap_lock;
Slave_worker *dep_fake_gap_lock_worker = nullptr;
#endif

Mts_submode_dependency() {
type = MTS_PARALLEL_TYPE_DEPENDENCY;
Expand Down Expand Up @@ -181,6 +187,11 @@ class Mts_submode_dependency : public Mts_submode {
return nullptr;
}

/**
  Set the dep_sync_group flag and keep the num_syncs counter up to date.

  num_syncs is incremented only when the flag actually changes from false to
  true. Some callers re-assert the flag while it is already set (for example
  by passing `dep_sync_group || <condition>`); counting every such call would
  report the same sync group more than once.

  @param val  New value of the dep_sync_group flag.
*/
void set_dep_sync_group(bool val) {
  // Count the transition before overwriting the previous value.
  if (val && !dep_sync_group) ++num_syncs;
  dep_sync_group = val;
}

int wait_for_workers_to_finish(Relay_log_info *rli,
MY_ATTRIBUTE((unused))
Slave_worker *ignore = nullptr) {
Expand Down
28 changes: 28 additions & 0 deletions sql/rpl_rli_pdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <unordered_map>
#include <utility>

#include <../include/mysql/service_thd_engine_lock.h>
#include "lex_string.h"
#include "m_string.h"
#include "map_helpers.h"
Expand Down Expand Up @@ -2858,10 +2859,37 @@ int slave_worker_exec_single_job(Slave_worker *worker, Relay_log_info *rli,
error = worker->slave_worker_exec_event(ev);
ev_wrap->set_slave_worker(worker);

DBUG_EXECUTE_IF("dbug.dep_fake_gap_lock_on_insert", {
auto scheduler =
static_cast<Mts_submode_dependency *>(rli->current_mts_submode);
if (ev->get_type_code() == binary_log::WRITE_ROWS_EVENT) {
if (!scheduler->dep_fake_gap_lock.try_lock()) {
thd_report_row_lock_wait(thd,
scheduler->dep_fake_gap_lock_worker->info_thd);
scheduler->dep_fake_gap_lock.lock();
scheduler->dep_fake_gap_lock_worker = worker;
} else {
scheduler->dep_fake_gap_lock_worker = worker;
}
} else if (ev->ends_group() &&
worker == scheduler->dep_fake_gap_lock_worker) {
scheduler->dep_fake_gap_lock_worker = nullptr;
scheduler->dep_fake_gap_lock.unlock();
}
};);

set_timespec_nsec(&worker->ts_exec[1], 0); // pre-exec
worker->stats_exec_time +=
diff_timespec(&worker->ts_exec[1], &worker->ts_exec[0]);
if (error || worker->found_commit_order_deadlock()) {
DBUG_EXECUTE_IF("dbug.dep_fake_gap_lock_on_insert", {
auto scheduler =
static_cast<Mts_submode_dependency *>(rli->current_mts_submode);
if (worker == scheduler->dep_fake_gap_lock_worker) {
scheduler->dep_fake_gap_lock_worker = nullptr;
scheduler->dep_fake_gap_lock.unlock();
}
};);
error = worker->retry_transaction(start_relay_number, start_relay_pos,
ev_wrap->get_event_relay_log_number(),
ev_wrap->get_event_start_pos());
Expand Down

0 comments on commit d11fc6b

Please sign in to comment.