Skip to content

Commit

Permalink
Bug#27839644 GLOBAL STATUS VARIABLES DRIFT AFTER ROLLBACK
Browse files Browse the repository at this point in the history
This patch addresses a problem where the status counters for a
closing connection may increase after they have been added to
the global status totals, and then decrease when the associated
THD is deleted. This temporary bump in the counters can confuse
monitoring processes.

To fix the problem, add_to_status() is moved to the end of
THD::release_resources(), ensuring that status counter
increments will be added to the global status totals.
  • Loading branch information
Christopher Powers committed Dec 2, 2018
1 parent b17a06f commit acdf282
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 45 deletions.
18 changes: 16 additions & 2 deletions sql/handler.cc
Expand Up @@ -1351,6 +1351,7 @@ int ha_prepare(THD *thd) {

while (ha_info) {
handlerton *ht = ha_info->ht();
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_prepare_count++;
if (ht->prepare) {
DBUG_EXECUTE_IF("simulate_xa_failure_prepare", {
Expand Down Expand Up @@ -1814,6 +1815,7 @@ int ha_commit_low(THD *thd, bool all, bool run_after_commit) {
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_commit_count++;
ha_info_next = ha_info->next();
if (restore_backup_ha_data) reattach_engine_ha_data_to_thd(thd, ht);
Expand Down Expand Up @@ -1874,6 +1876,7 @@ int ha_rollback_low(THD *thd, bool all) {
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_rollback_count++;
ha_info_next = ha_info->next();
if (restore_backup_ha_data) reattach_engine_ha_data_to_thd(thd, ht);
Expand Down Expand Up @@ -2036,6 +2039,7 @@ int ha_commit_attachable(THD *thd) {
DBUG_ASSERT(false);
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_commit_count++;
ha_info_next = ha_info->next();

Expand Down Expand Up @@ -2124,6 +2128,7 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) {
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_savepoint_rollback_count++;
if (ht->prepare == 0) trn_ctx->set_no_2pc(trx_scope, true);
}
Expand All @@ -2142,6 +2147,7 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) {
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_rollback_count++;
ha_info_next = ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */
Expand Down Expand Up @@ -2180,6 +2186,7 @@ int ha_prepare_low(THD *thd, bool all) {
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_prepare_count++;
}
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
Expand Down Expand Up @@ -2220,6 +2227,7 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv) {
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_savepoint_count++;
}
/*
Expand Down Expand Up @@ -5460,7 +5468,10 @@ static int ha_discover(THD *thd, const char *db, const char *name,
&args))
error = 0;

if (!error) thd->status_var.ha_discover_count++;
if (!error) {
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.ha_discover_count++;
}
DBUG_RETURN(error);
}

Expand Down Expand Up @@ -6294,7 +6305,10 @@ int DsMrr_impl::dsmrr_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,

is_mrr_assoc = !(mode & HA_MRR_NO_ASSOCIATION);

if (is_mrr_assoc) table->in_use->status_var.ha_multi_range_read_init_count++;
if (is_mrr_assoc) {
DBUG_ASSERT(!thd->status_var_aggregated);
table->in_use->status_var.ha_multi_range_read_init_count++;
}

rowids_buf_end = buf->buffer_end;
elem_size = h->ref_length + (int)is_mrr_assoc * sizeof(void *);
Expand Down
13 changes: 7 additions & 6 deletions sql/mysqld.cc
Expand Up @@ -1478,7 +1478,8 @@ ulong sql_rnd_with_mutex() {
return tmp;
}

struct System_status_var *get_thd_status_var(THD *thd) {
struct System_status_var *get_thd_status_var(THD *thd, bool *aggregated) {
*aggregated = thd->status_var_aggregated;
return &thd->status_var;
}

Expand Down Expand Up @@ -10128,11 +10129,11 @@ class Reset_thd_status : public Do_THD_Impl {
public:
Reset_thd_status() {}
virtual void operator()(THD *thd) {
/*
Add thread's status variabes to global status
and reset thread's status variables.
*/
add_to_status(&global_status_var, &thd->status_var, true);
/* Update the global status if not done so already. */
if (!thd->status_var_aggregated) {
add_to_status(&global_status_var, &thd->status_var);
}
reset_system_status_vars(&thd->status_var);
}
};

Expand Down
2 changes: 1 addition & 1 deletion sql/mysqld.h
Expand Up @@ -120,7 +120,7 @@ void refresh_status();
bool is_secure_file_path(const char *path);
ulong sql_rnd_with_mutex();

struct System_status_var *get_thd_status_var(THD *thd);
struct System_status_var *get_thd_status_var(THD *thd, bool *aggregated);

// These are needed for unit testing.
void set_remaining_args(int argc, char **argv);
Expand Down
2 changes: 2 additions & 0 deletions sql/rpl_master.cc
Expand Up @@ -913,6 +913,7 @@ bool com_binlog_dump(THD *thd, char *packet, size_t packet_length) {
const uchar *packet_position = (uchar *)packet;
size_t packet_bytes_todo = packet_length;

DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.com_other++;
thd->enable_slow_log = opt_log_slow_admin_statements;
if (check_global_access(thd, REPL_SLAVE_ACL)) DBUG_RETURN(false);
Expand Down Expand Up @@ -963,6 +964,7 @@ bool com_binlog_dump_gtid(THD *thd, char *packet, size_t packet_length) {
NULL /*no sid_lock because this is a completely local object*/);
Gtid_set slave_gtid_executed(&sid_map);

DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.com_other++;
thd->enable_slow_log = opt_log_slow_admin_statements;
if (check_global_access(thd, REPL_SLAVE_ACL)) DBUG_RETURN(false);
Expand Down
37 changes: 23 additions & 14 deletions sql/sql_class.cc
Expand Up @@ -846,7 +846,8 @@ void THD::set_new_thread_id() {

void THD::cleanup_connection(void) {
mysql_mutex_lock(&LOCK_status);
add_to_status(&global_status_var, &status_var, true);
add_to_status(&global_status_var, &status_var);
reset_system_status_vars(&status_var);
mysql_mutex_unlock(&LOCK_status);

cleanup();
Expand Down Expand Up @@ -998,18 +999,6 @@ void THD::release_resources() {

Global_THD_manager::get_instance()->release_thread_id(m_thread_id);

mysql_mutex_lock(&LOCK_status);
add_to_status(&global_status_var, &status_var, false);
/*
Status queries after this point should not aggregate THD::status_var
since the values has been added to global_status_var.
The status values are not reset so that they can still be read
by performance schema.
*/
status_var_aggregated = true;

mysql_mutex_unlock(&LOCK_status);

/* Ensure that no one is using THD */
mysql_mutex_lock(&LOCK_thd_data);
mysql_mutex_lock(&LOCK_query_plan);
Expand Down Expand Up @@ -1057,6 +1046,13 @@ void THD::release_resources() {

if (current_thd == this) restore_globals();

mysql_mutex_lock(&LOCK_status);
/* Add thread status to the global totals. */
add_to_status(&global_status_var, &status_var);
/* Ensure that the thread status is not re-aggregated to the global totals. */
status_var_aggregated = true;
mysql_mutex_unlock(&LOCK_status);

m_release_resources_done = true;
}

Expand Down Expand Up @@ -1196,8 +1192,10 @@ void THD::awake(THD::killed_state state_to_set) {
/* Interrupt target waiting inside a storage engine. */
if (state_to_set != THD::NOT_KILLED) ha_kill_connection(this);

if (state_to_set == THD::KILL_TIMEOUT)
if (state_to_set == THD::KILL_TIMEOUT) {
DBUG_ASSERT(!status_var_aggregated);
status_var.max_execution_time_exceeded++;
}

/* Broadcast a condition to kick the target if it is waiting on it. */
if (is_killable) {
Expand Down Expand Up @@ -2040,69 +2038,79 @@ void THD::inc_examined_row_count(ha_rows count) {
}

void THD::inc_status_created_tmp_disk_tables() {
DBUG_ASSERT(!status_var_aggregated);
status_var.created_tmp_disk_tables++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_created_tmp_disk_tables)(m_statement_psi, 1);
#endif
}

void THD::inc_status_created_tmp_tables() {
DBUG_ASSERT(!status_var_aggregated);
status_var.created_tmp_tables++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_created_tmp_tables)(m_statement_psi, 1);
#endif
}

void THD::inc_status_select_full_join() {
DBUG_ASSERT(!status_var_aggregated);
status_var.select_full_join_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_select_full_join)(m_statement_psi, 1);
#endif
}

void THD::inc_status_select_full_range_join() {
DBUG_ASSERT(!status_var_aggregated);
status_var.select_full_range_join_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_select_full_range_join)(m_statement_psi, 1);
#endif
}

void THD::inc_status_select_range() {
DBUG_ASSERT(!status_var_aggregated);
status_var.select_range_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_select_range)(m_statement_psi, 1);
#endif
}

void THD::inc_status_select_range_check() {
DBUG_ASSERT(!status_var_aggregated);
status_var.select_range_check_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_select_range_check)(m_statement_psi, 1);
#endif
}

void THD::inc_status_select_scan() {
DBUG_ASSERT(!status_var_aggregated);
status_var.select_scan_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_select_scan)(m_statement_psi, 1);
#endif
}

void THD::inc_status_sort_merge_passes() {
DBUG_ASSERT(!status_var_aggregated);
status_var.filesort_merge_passes++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_sort_merge_passes)(m_statement_psi, 1);
#endif
}

void THD::inc_status_sort_range() {
DBUG_ASSERT(!status_var_aggregated);
status_var.filesort_range_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_sort_range)(m_statement_psi, 1);
#endif
}

void THD::inc_status_sort_rows(ha_rows count) {
DBUG_ASSERT(!status_var_aggregated);
status_var.filesort_rows += count;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_sort_rows)
Expand All @@ -2111,6 +2119,7 @@ void THD::inc_status_sort_rows(ha_rows count) {
}

void THD::inc_status_sort_scan() {
DBUG_ASSERT(!status_var_aggregated);
status_var.filesort_scan_count++;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
PSI_STATEMENT_CALL(inc_statement_sort_scan)(m_statement_psi, 1);
Expand Down
1 change: 1 addition & 0 deletions sql/sql_class.h
Expand Up @@ -960,6 +960,7 @@ class THD : public MDL_context_owner,
actually reports the previous query, not itself.
*/
void save_current_query_costs() {
DBUG_ASSERT(!status_var_aggregated);
status_var.last_query_cost = m_current_query_cost;
status_var.last_query_partial_plans = m_current_query_partial_plans;
}
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_show.cc
Expand Up @@ -2642,7 +2642,7 @@ class Add_status : public Do_THD_Impl {
Add_status(System_status_var *value) : m_stat_var(value) {}
virtual void operator()(THD *thd) {
if (!thd->status_var_aggregated)
add_to_status(m_stat_var, &thd->status_var, false);
add_to_status(m_stat_var, &thd->status_var);
}

private:
Expand Down
21 changes: 13 additions & 8 deletions sql/system_variables.cc
@@ -1,4 +1,4 @@
/* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
Expand Down Expand Up @@ -31,16 +31,14 @@
add_to_status()
to_var add to this array
from_var from this array
reset_from_var if true, then memset from_var variable with 0
NOTES
This function assumes that all variables are longlong/ulonglong.
If this assumption will change, then we have to explictely add
the other variables after the while loop
*/

void add_to_status(System_status_var *to_var, System_status_var *from_var,
bool reset_from_var) {
void add_to_status(System_status_var *to_var, System_status_var *from_var) {
int c;
ulonglong *end = (ulonglong *)((uchar *)to_var +
offsetof(System_status_var, LAST_STATUS_VAR) +
Expand All @@ -53,10 +51,6 @@ void add_to_status(System_status_var *to_var, System_status_var *from_var,

for (c = 0; c < SQLCOM_END; c++)
to_var->com_stat[(uint)c] += from_var->com_stat[(uint)c];

if (reset_from_var) {
memset(from_var, 0, sizeof(*from_var));
}
}

/*
Expand Down Expand Up @@ -89,3 +83,14 @@ void add_diff_to_status(System_status_var *to_var, System_status_var *from_var,
to_var->com_stat[(uint)c] +=
from_var->com_stat[(uint)c] - dec_var->com_stat[(uint)c];
}

/*
Reset a block of status variables.
SYNOPSIS
reset_system_status_vars
status_vars Struct of status variables to reset
*/
void reset_system_status_vars(System_status_var *status_vars) {
memset(status_vars, 0, sizeof(*status_vars));
}
5 changes: 3 additions & 2 deletions sql/system_variables.h
Expand Up @@ -477,7 +477,8 @@ const int COUNT_GLOBAL_STATUS_VARS =
void add_diff_to_status(System_status_var *to_var, System_status_var *from_var,
System_status_var *dec_var);

void add_to_status(System_status_var *to_var, System_status_var *from_var,
bool reset_from_var);
void add_to_status(System_status_var *to_var, System_status_var *from_var);

void reset_system_status_vars(System_status_var *status_vars);

#endif // SYSTEM_VARIABLES_INCLUDED
6 changes: 1 addition & 5 deletions storage/perfschema/pfs_host.cc
Expand Up @@ -271,11 +271,7 @@ void PFS_host::aggregate_memory(bool alive) {
}

void PFS_host::aggregate_status() {
/*
Aggregate STATUS_BY_HOST to:
- GLOBAL_STATUS
*/
m_status_stats.aggregate_to(&global_status_var);
/* No parent to aggregate to, clean the stats */
m_status_stats.reset();
}

Expand Down
7 changes: 6 additions & 1 deletion storage/perfschema/pfs_instr.cc
Expand Up @@ -1695,12 +1695,17 @@ void aggregate_all_memory(bool alive, PFS_memory_shared_stat *from_array,
void aggregate_thread_status(PFS_thread *thread, PFS_account *safe_account,
PFS_user *safe_user, PFS_host *safe_host) {
THD *thd = thread->m_thd;
bool aggregated = false;

if (thd == NULL) {
return;
}

System_status_var *status_var = get_thd_status_var(thd);
System_status_var *status_var = get_thd_status_var(thd, &aggregated);
if (unlikely(aggregated)) {
/* THD is being closed, status has already been aggregated. */
return;
}

if (likely(safe_account != NULL)) {
safe_account->aggregate_status_stats(status_var);
Expand Down

0 comments on commit acdf282

Please sign in to comment.