Skip to content

Commit

Permalink
Restore parallel replication async deadlock kill
Browse files Browse the repository at this point in the history
Restore the original code for parallel replication deadlock kill.
When a deadlock kill is detected inside the storage engine, the kill
is not done immediately, to avoid calling back into the storage engine
kill_query method with various lock subsystem mutexes held. Instead the
kill is queued and done later by a slave background thread.

This avoids the need for locking hacks in kill_query, and for the
TokuDB implementation solves a problem with the fine-grained
per-locktree locking order.

With this patch, I was not able so far to reproduce neither the hangs
nor the crash/assertion seen previously, though more tests will be
meeded to say this conclusively.
  • Loading branch information
knielsen committed Aug 24, 2016
1 parent f54e35b commit b256733
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 98 deletions.
11 changes: 11 additions & 0 deletions mysql-test/suite/perfschema/r/threads_mysql.result
Expand Up @@ -44,6 +44,16 @@ processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
name thread/sql/slave_background
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
CREATE TEMPORARY TABLE t1 AS
SELECT thread_id FROM performance_schema.threads
WHERE name LIKE 'thread/sql%';
Expand Down Expand Up @@ -105,4 +115,5 @@ parent_thread_name child_thread_name
thread/sql/event_scheduler thread/sql/event_worker
thread/sql/main thread/sql/one_connection
thread/sql/main thread/sql/signal_handler
thread/sql/main thread/sql/slave_background
thread/sql/one_connection thread/sql/event_scheduler
30 changes: 17 additions & 13 deletions sql/mysqld.cc
Expand Up @@ -382,7 +382,7 @@ static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave;
mysql_cond_t COND_thread_cache;
static mysql_cond_t COND_flush_thread_cache;
mysql_cond_t COND_slave_init;
mysql_cond_t COND_slave_background;
static DYNAMIC_ARRAY all_options;

/* Global variables */
Expand Down Expand Up @@ -720,7 +720,7 @@ mysql_mutex_t
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_init;
LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;

mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
Expand Down Expand Up @@ -902,7 +902,7 @@ PSI_mutex_key key_LOCK_gtid_waiting;

PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_init;
key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;

static PSI_mutex_info all_server_mutexes[]=
Expand Down Expand Up @@ -968,7 +968,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_after_binlog_sync, "LOCK_after_binlog_sync", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_init, "LOCK_slave_init", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
Expand Down Expand Up @@ -1023,7 +1023,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_stop, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_init;
key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;

static PSI_cond_info all_server_conds[]=
Expand Down Expand Up @@ -1073,15 +1073,15 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_init, "COND_slave_init", 0},
{ &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};

PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_init, key_rpl_parallel_thread;
key_thread_slave_background, key_rpl_parallel_thread;

static PSI_thread_info all_server_threads[]=
{
Expand All @@ -1107,7 +1107,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};

Expand Down Expand Up @@ -2268,8 +2268,8 @@ static void clean_up_mutexes()
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_after_binlog_sync);
mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_init);
mysql_cond_destroy(&COND_slave_init);
mysql_mutex_destroy(&LOCK_slave_background);
mysql_cond_destroy(&COND_slave_background);
DBUG_VOID_RETURN;
}

Expand Down Expand Up @@ -4647,9 +4647,9 @@ static int init_thread_environment()
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_init, &LOCK_slave_init,
mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_init, &COND_slave_init, NULL);
mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);

#ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file,
Expand Down Expand Up @@ -10139,6 +10139,8 @@ PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replicatio
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
PSI_stage_info stage_slave_background_process_request= { 0, "Processing requests", 0};
PSI_stage_info stage_slave_background_wait_request= { 0, "Waiting for requests", 0};

#ifdef HAVE_PSI_INTERFACE

Expand Down Expand Up @@ -10268,7 +10270,9 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary,
& stage_master_gtid_wait,
& stage_gtid_wait_other_connection
& stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request
};

PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
Expand Down
10 changes: 6 additions & 4 deletions sql/mysqld.h
Expand Up @@ -341,8 +341,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;

extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
key_rpl_parallel_thread;
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;

extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
Expand Down Expand Up @@ -493,6 +493,8 @@ extern PSI_stage_info stage_waiting_for_rpl_thread_pool;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;

#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
Expand Down Expand Up @@ -561,7 +563,7 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_init;
LOCK_slave_background;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern char* des_key_file;
Expand All @@ -573,7 +575,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count;
extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_init;
extern mysql_cond_t COND_slave_background;
extern int32 thread_running;
extern int32 thread_count, service_thread_count;

Expand Down
2 changes: 0 additions & 2 deletions sql/rpl_parallel.cc
Expand Up @@ -654,7 +654,6 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
Format_description_log_event *description_event= NULL;

do_retry:
fprintf(stderr, "TODO2: retry_event_group(GTID %u-%u-%lu thd %p) retry %lu\n", rgi->current_gtid.domain_id, rgi->current_gtid.server_id, (ulong)rgi->current_gtid.seq_no, thd, retries);

event_count= 0;
err= 0;
Expand Down Expand Up @@ -1173,7 +1172,6 @@ handle_rpl_parallel_thread(void *arg)
signal_error_to_sql_driver_thread(thd, rgi, 1);
}

fprintf(stderr, "TODO1 THD %p new GTID %u-%u-%lu\n", thd, rgi->current_gtid.domain_id, rgi->current_gtid.server_id, (ulong)rgi->current_gtid.seq_no);
}

group_rgi= rgi;
Expand Down
132 changes: 111 additions & 21 deletions sql/slave.cc
Expand Up @@ -283,21 +283,30 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */


static bool slave_init_thread_running;
static bool slave_background_thread_running;
static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;

struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
} *slave_background_kill_list;


pthread_handler_t
handle_slave_init(void *arg __attribute__((unused)))
handle_slave_background(void *arg __attribute__((unused)))
{
THD *thd;
PSI_stage_info old_stage;
bool stop;

my_thread_init();
thd= new THD;
thd->thread_stack= (char*) &thd; /* Set approximate stack start */
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thread_safe_increment32(&service_thread_count);
thd->store_globals();
thd->security_ctx->skip_grants();
Expand All @@ -311,49 +320,127 @@ handle_slave_init(void *arg __attribute__((unused)))
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());

mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_gtid_loaded= true;
mysql_cond_broadcast(&COND_slave_background);

THD_STAGE_INFO(thd, stage_slave_background_process_request);
do
{
slave_background_kill_t *kill_list;

thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
&old_stage);
for (;;)
{
stop= abort_loop || thd->killed || slave_background_thread_stop;
kill_list= slave_background_kill_list;
if (stop || kill_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}

slave_background_kill_list= NULL;
thd->EXIT_COND(&old_stage);

while (kill_list)
{
slave_background_kill_t *p = kill_list;
kill_list= p->next;

mysql_mutex_lock(&p->to_kill->LOCK_thd_data);
p->to_kill->awake(KILL_CONNECTION);
mysql_mutex_unlock(&p->to_kill->LOCK_thd_data);
my_free(p);
}
mysql_mutex_lock(&LOCK_slave_background);
} while (!stop);

slave_background_thread_running= false;
mysql_cond_broadcast(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);

delete thd;
thread_safe_decrement32(&service_thread_count);
signal_thd_deleted();
my_thread_end();

mysql_mutex_lock(&LOCK_slave_init);
slave_init_thread_running= false;
mysql_cond_broadcast(&COND_slave_init);
mysql_mutex_unlock(&LOCK_slave_init);

return 0;
}



void
slave_background_kill_request(THD *to_kill)
{
slave_background_kill_t *p=
(slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
if (p)
{
p->to_kill= to_kill;
to_kill->rgi_slave->killed_for_retry= true;
mysql_mutex_lock(&LOCK_slave_background);
p->next= slave_background_kill_list;
slave_background_kill_list= p;
mysql_mutex_unlock(&LOCK_slave_background);
mysql_cond_signal(&COND_slave_background);
}
}


/*
Start the slave init thread.
Start the slave background thread.
This thread is used to load the GTID state from mysql.gtid_slave_pos at
server start; reading from table requires valid THD, which is otherwise not
available during server init.
This thread is currently used for two purposes:
1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
from table requires valid THD, which is otherwise not available during
server init.
2. To kill worker thread transactions during parallel replication, when a
storage engine attempts to take an errorneous conflicting lock that would
cause a deadlock. Killing is done asynchroneously, as the kill may not
be safe within the context of a callback from inside storage engine
locking code.
*/
static int
run_slave_init_thread()
start_slave_background_thread()
{
pthread_t th;

slave_init_thread_running= true;
if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
handle_slave_init, NULL))
slave_background_thread_running= true;
slave_background_thread_stop= false;
slave_background_thread_gtid_loaded= false;
if (mysql_thread_create(key_thread_slave_background,
&th, &connection_attrib, handle_slave_background,
NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}

mysql_mutex_lock(&LOCK_slave_init);
while (slave_init_thread_running)
mysql_cond_wait(&COND_slave_init, &LOCK_slave_init);
mysql_mutex_unlock(&LOCK_slave_init);
mysql_mutex_lock(&LOCK_slave_background);
while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);

return 0;
}


static void
stop_slave_background_thread()
{
mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_stop= true;
mysql_cond_broadcast(&COND_slave_background);
while (slave_background_thread_running)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
}


/* Initialize slave structures */

int init_slave()
Expand All @@ -365,7 +452,7 @@ int init_slave()
init_slave_psi_keys();
#endif

if (run_slave_init_thread())
if (start_slave_background_thread())
return 1;

if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
Expand Down Expand Up @@ -1004,6 +1091,9 @@ void end_slave()
master_info_index= 0;
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);

stop_slave_background_thread();

global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;
Expand Down
1 change: 1 addition & 0 deletions sql/slave.h
Expand Up @@ -250,6 +250,7 @@ pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);
void slave_background_kill_request(THD *to_kill);

extern bool volatile abort_loop;
extern Master_info *active_mi; /* active_mi for multi-master */
Expand Down

0 comments on commit b256733

Please sign in to comment.