WL#15829: Selective offload project changes

This worklog introduces dynamic offload of queries to RAPID in the
following ways:

When the system variable rapid_use_dynamic_offload is 0/false, we fall
back to the normal cost threshold classifier. This also implies that
when use_secondary_engine is set to FORCED, eligible queries go to the
secondary engine regardless of the cost threshold or this classifier.

When rapid_use_dynamic_offload is 1/true, we look for the optimal
execution engine for each query: if the secondary engine is found to be
more optimal, the query is offloaded; otherwise it is sent back to
MySQL. This is handled in the following scenarios:

1. Static Scenario: When there is no change propagation or queue on the
RAPID side, this introduces a decision tree which, with > 85 % precision
in training, predicts which queries should be faster on MySQL and which
should be faster on RAPID, and accepts or rejects queries accordingly.
The decision tree takes around 20-100 microseconds for fast queries,
hence minimal overhead. For bigger queries it introduces an overhead of
up to 700 microseconds (the maximum observed); these queries have long
execution times anyway, so this is not a problem. For very fast queries,
defined here as point selects with cost < 10, dynamic offload is not
applied, since 100 % of these queries (out of 16667 samples) are faster
on MySQL. Additionally, routing these "very fast queries" through
dynamic offload leads to performance regressions due to the 3-phase
optimisation.

2. Dynamic Scenario: When there is change propagation (CP) or queuing on
 RAPID, this worklog introduces dynamic feature normalization to take
 into account the extra catch-up time RAPID needs and, factoring that
 in, attempts to verify whether RAPID is still the best engine for
 execution. If the queue or the CP lag is too long, this mechanism
 progressively starts shifting queries to MySQL, moving gradually
 towards the heavier queries.
The steps in this worklog, with respect to the query lifecycle in a
server with secondary_engine = ON, are described below:

query
   |
Primary Tentatively optimisation -> mysql optimises for Innodb
   |
secondary_engine_pre_prepare_hook -> following Rapid function called:
   |  RapidCachePrimaryInfoAtPrimaryTentativelyStep
   |  If dynamic offload is enabled and query is not "very fast":
   |   This caches features from mysql plan in rapid_statement_context
   |   to be used for dynamic offload.
   |  If dynamic offload is disabled or the query is "very fast":
   |   This function invokes standard mysql cost threshold classifier,
   |   which decides if query needs further RAPID optimisation.
   |
   |
   |-> if returns False, then query proceeds to Innodb for execution
   |-> if returns true, step below is called
   |
 Secondary optimisation -> mysql optimises for RAPID
   |
prepare_secondary_engine -> following Rapid function is called:
   |   RapidPrepareEstimateQueryCosts
   |     In this function, Dynamic offload combines mysql plan features
   |      retrieved from rapid_statement_context
   |     and RAPID info such as rapid base table cardinality,
   |     dict encoding projection, varlen projection size, rapid queue
   |     size to decide if query should be offloaded to RAPID.
   |
   |->if returns True, then query proceeds to Innodb for execution
   |->if returns False, step below is called
   |
optimize_secondary_engine -> following Rapid function is called
   |    RapidOptimize
   |     In this function, Dynamic offload retrieves info from
   |     rapid_statement_context and additionally looks at Change
   |     propagation lag to decide if query should be offloaded to rapid
   |
   |->if returns True, then query proceeds to Innodb for execution
   |->if returns False, then query goes to Rapid Execution.
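
Note that the return values in the diagram above have opposite meanings
per stage: the pre-prepare hook returns true to continue towards RAPID,
while the prepare and optimize stages return true to send the query back
to InnoDB. A minimal sketch of that chain, assuming hypothetical
stand-ins (OffloadHooks, RouteQuery) rather than the real handlerton
callbacks:

```cpp
#include <functional>

enum class Engine { kInnoDB, kRapid };

// Hypothetical stand-ins for the three stages in the diagram. The real
// hooks take THD/LEX arguments; they are omitted here for brevity.
struct OffloadHooks {
  std::function<bool()> pre_prepare;  // true  -> continue to secondary optimisation
  std::function<bool()> prepare;      // true  -> query goes back to InnoDB
  std::function<bool()> optimize;     // true  -> query goes back to InnoDB
};

// Walks the stages in diagram order and stops at the first one that
// rejects the offload.
Engine RouteQuery(const OffloadHooks &hooks) {
  if (!hooks.pre_prepare()) return Engine::kInnoDB;
  if (hooks.prepare()) return Engine::kInnoDB;
  if (hooks.optimize()) return Engine::kInnoDB;
  return Engine::kRapid;
}
```

Only a query that passes all three stages reaches RAPID execution; a
rejection at any stage short-circuits the later ones.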

The following new MySQL error log messages are printed with this WL when
dynamic offload is enabled and the query is not a "very fast query".

1. SelOffload allow decision 1 : as secondary not forced 1 and enable
 var value 1 and transactional enabled 1 and( big shape detected 0
  or small shape detected 1 ) inno: 10737418240 , rpd: 4294967296 ,
   no lh table: 1

   A message such as this shows whether dynamic offload is used to
   classify this query and, if not, why not, by listing each of the
   conditions. 1 = pass, 0 = not pass.

2. myqid=65 Selective offload classifier #1#1#1
    f_mysql_total_ts_nrows <= 2105.5 : 0.173916, f_MySQLCost <=
    68.3899040222168 : 0.028218, f_count_all_base_tables = 0 ,
    f_count_ref_index_ts = 0 ,f_BaseTableSumNrows <= 278177.5 :
    0.173916 are_all_ts_index_ref = true outcome=0

   A line such as this serialises which leg of the decision tree decided
   the outcome of this query: 0 -> back to MySQL, 1 -> keep on RAPID.
   Each leg is uniquely searchable via an identifier such as #1#1#1 here.
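
Since each leg is searchable by its identifier, classifier lines can be
pulled out of the error log mechanically. The sketch below is one
possible way to do that and assumes the line layout matches the single
sample shown above; ClassifierLogEntry and ParseClassifierLine are
hypothetical helper names, not part of this WL.

```cpp
#include <optional>
#include <regex>
#include <string>

// Hypothetical record for one "Selective offload classifier" log line.
struct ClassifierLogEntry {
  std::string leg_id;  // decision tree leg, e.g. "#1#1#1"
  int outcome;         // 0 -> back to MySQL, 1 -> keep on RAPID
};

// Extracts the leg identifier and the outcome from a classifier line.
// Returns std::nullopt when the line does not look like the sample
// format (an assumption based on the one example in this message).
std::optional<ClassifierLogEntry> ParseClassifierLine(const std::string &line) {
  static const std::regex kPattern(
      R"re(Selective offload classifier\s+((?:#\d+)+)[\s\S]*?outcome=([01]))re");
  std::smatch match;
  if (!std::regex_search(line, match, kPattern)) return std::nullopt;
  return ClassifierLogEntry{match[1].str(), std::stoi(match[2].str())};
}
```

Grouping parsed entries by leg_id would then show which legs of the tree
send queries back to MySQL most often.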

This worklog additionally introduces Python scripts that run multiple
queries and multiple DMLs at once through the mysql client, in various
modes such as simulator mode and standard benchmark modes.

By default this WL is enabled, but it will be disabled before release.
This is tracked via BUG#36343189 #no-close.

Perf mode unit tests will be enabled on Jenkins after this WL.
Further cleanup will be done via BUG#36368437 #no-close.

Bugs tackled via this WL: BUG#35738194, Enh#34132523, Bug#36343208

Unrelated bugs fixed: BUG#35987975

Old gerrit review : 25567 (abandoned due to 1000 update limit reached)

Change-Id: Ie5f9fdcd8b55a669d04b389d3aec5f6b33f0fe2e
Keshav Singh authored and dahlerlend committed Mar 7, 2024
1 parent df0167f commit d4471e0
Showing 10 changed files with 166 additions and 37 deletions.
28 changes: 25 additions & 3 deletions sql/handler.h
@@ -2580,9 +2580,13 @@ using se_after_commit_t = void (*)(void *arg);
using se_before_rollback_t = void (*)(void *arg);

/**
- * Notify plugins when a SELECT query was executed. The plugins will be notified
- * only if the query is not considered "fast-running", i.e., its estimated cost
- * is less than the currently configured 'secondary_engine_cost_threshold'.
+ Notify plugins when a SELECT query was executed. The plugins will be notified
+ only if the query is not considered secondary engine relevant, i.e.:
+ 1. for a query with missing secondary_engine_statement_ctx, its estimated cost
+ is greater than the currently configured 'secondary_engine_cost_threshold'
+ 2. for queries with secondary_engine_statement_ctx, wherever
+ secondary_engine_statement_ctx::is_primary_engine_optimal() returns False
+ indicating secondary engine relevance.
*/
using notify_after_select_t = void (*)(THD *thd, SelectExecutedIn executed_in);

@@ -2592,6 +2596,19 @@ using notify_after_select_t = void (*)(THD *thd, SelectExecutedIn executed_in);
using notify_create_table_t = void (*)(struct HA_CREATE_INFO *create_info,
const char *db, const char *table_name);

/**
Secondary engine hook called after PRIMARY_TENTATIVELY optimization is
complete, and decides if secondary engine optimization will be performed, and
comparison of primary engine cost and secondary engine cost will determine
which engine to use for execution.
@param[in] thd current thd.
@return :
@retval true When secondary_engine's prepare hook is to be further called
@retval false When secondary_engine's prepare hook is NOT to be further called
*/
using secondary_engine_pre_prepare_hook_t = bool (*)(THD *thd);

/**
* Notify plugins when a table is dropped.
*/
@@ -2968,6 +2985,11 @@ struct handlerton {
secondary_engine_check_optimizer_request_t
secondary_engine_check_optimizer_request;

/* Pointer to a function that is called at the end of the PRIMARY_TENTATIVELY
* optimization stage, which also decides that the statement should be
* attempted offloaded to a secondary storage engine. */
secondary_engine_pre_prepare_hook_t secondary_engine_pre_prepare_hook;

se_before_commit_t se_before_commit;
se_after_commit_t se_after_commit;
se_before_rollback_t se_before_rollback;
3 changes: 3 additions & 0 deletions sql/sp_instr.cc
@@ -717,6 +717,9 @@ bool sp_lex_instr::validate_lex_and_execute_core(THD *thd, uint *nextp,
thd->set_secondary_engine_optimization(
Secondary_engine_optimization::PRIMARY_TENTATIVELY);

auto scope_guard = create_scope_guard(
[thd] { thd->set_secondary_engine_statement_context(nullptr); });

while (true) {
DBUG_EXECUTE_IF("simulate_bug18831513", { invalidate(); });
if (is_invalid() || (m_lex->has_udf() && !m_first_execution)) {
7 changes: 7 additions & 0 deletions sql/sql_class.cc
@@ -874,6 +874,11 @@ void THD::set_transaction(Transaction_ctx *transaction_ctx) {
m_transaction.reset(transaction_ctx);
}

void THD::set_secondary_engine_statement_context(
std::unique_ptr<Secondary_engine_statement_context> context) {
m_secondary_engine_statement_context = std::move(context);
}

bool THD::set_db(const LEX_CSTRING &new_db) {
bool result;
/*
@@ -1434,6 +1439,8 @@ THD::~THD() {
DBUG_TRACE;
DBUG_PRINT("info", ("THD dtor, this %p", this));

assert(m_secondary_engine_statement_context == nullptr);

if (has_incremented_gtid_automatic_count) {
gtid_state->decrease_gtid_automatic_tagged_count();
}
37 changes: 37 additions & 0 deletions sql/sql_class.h
@@ -921,6 +921,24 @@ using Event_tracking_data =
std::pair<Event_tracking_class, Event_tracking_information *>;
using Event_tracking_data_stack = std::stack<Event_tracking_data>;

/**
Base class for secondary engine statement context objects. Secondary
storage engines may create classes derived from this one which
contain state they need to preserve in lifecycle of this query.
*/
class Secondary_engine_statement_context {
public:
/**
Destructs the secondary engine statement context object. It is
called after the query execution has completed. Secondary engines
may override the destructor in subclasses and add code that
performs cleanup tasks that are needed after query execution.
*/
virtual ~Secondary_engine_statement_context() = default;

virtual bool is_primary_engine_optimal() const { return true; }
};

/**
@class THD
For each client connection we create a separate thread with THD serving as
@@ -1046,6 +1064,12 @@ class THD : public MDL_context_owner,
*/
String m_rewritten_query;

/**
Current query's secondary engine statement context.
*/
std::unique_ptr<Secondary_engine_statement_context>
m_secondary_engine_statement_context;

public:
/* Used to execute base64 coded binlog events in MySQL server */
Relay_log_info *rli_fake;
@@ -1071,6 +1095,18 @@ class THD : public MDL_context_owner,
*/
void rpl_detach_engine_ha_data();

/*
Set secondary_engine_statement_context to new context.
This function assumes existing m_secondary_engine_statement_context is empty,
such that there's only context throughout the query's lifecycle.
*/
void set_secondary_engine_statement_context(
std::unique_ptr<Secondary_engine_statement_context> context);

Secondary_engine_statement_context *secondary_engine_statement_context() {
return m_secondary_engine_statement_context.get();
}

/**
When the thread is a binlog or slave applier it reattaches the engine
ha_data associated with it and memorizes the fact of that.
@@ -1126,6 +1162,7 @@ class THD : public MDL_context_owner,
@sa system_status_var::last_query_cost
*/
double m_current_query_cost;

/**
Current query partial plans.
@sa system_status_var::last_query_partial_plans
5 changes: 1 addition & 4 deletions sql/sql_cmd_ddl_table.cc
@@ -123,10 +123,7 @@ static bool populate_table(THD *thd, LEX *lex) {

if (unit->execute(thd)) return true;

- if (thd->m_current_query_cost >
- thd->variables.secondary_engine_cost_threshold) {
- notify_plugins_after_select(thd, lex->m_sql_cmd);
- }
+ notify_plugins_after_select(thd, lex->m_sql_cmd);

return false;
}
1 change: 1 addition & 0 deletions sql/sql_parse.cc
@@ -2227,6 +2227,7 @@ bool dispatch_command(THD *thd, const COM_DATA *com_data,

thd->bind_parameter_values = nullptr;
thd->bind_parameter_values_count = 0;
thd->set_secondary_engine_statement_context(nullptr);

/* Need to set error to true for graceful shutdown */
if ((thd->lex->sql_command == SQLCOM_SHUTDOWN) &&
3 changes: 3 additions & 0 deletions sql/sql_prepare.cc
@@ -2860,6 +2860,9 @@ bool Prepared_statement::execute_loop(THD *thd, String *expanded_query,
bool error;
bool reprepared_for_types [[maybe_unused]] = false;

auto scope_guard = create_scope_guard(
[thd] { thd->set_secondary_engine_statement_context(nullptr); });

/* Check if we got an error when sending long data */
if (m_arena.get_state() == Query_arena::STMT_ERROR) {
my_message(m_last_errno, m_last_error, MYF(0));
83 changes: 54 additions & 29 deletions sql/sql_select.cc
@@ -874,6 +874,33 @@ void accumulate_statement_cost(const LEX *lex) {
lex->thd->m_current_query_cost = total_cost;
}

namespace {
/**
Gets the secondary storage engine pre prepare hook function, if any. If no
hook is found, this function returns false. If hook function is found, it
returns the return value of the hook. Please refer to
secondary_engine_pre_prepare_hook_t definition for description of its return
value.
*/
bool SecondaryEngineCallPrePrepareHook(THD *thd,
const LEX_CSTRING &secondary_engine) {
handlerton *hton = nullptr;
plugin_ref ref = ha_resolve_by_name(thd, &secondary_engine, false);
if (ref != nullptr) {
hton = plugin_data<handlerton *>(ref);
}

if (hton != nullptr) {
secondary_engine_pre_prepare_hook_t secondary_engine_pre_prepare_hook =
hton->secondary_engine_pre_prepare_hook;
if (secondary_engine_pre_prepare_hook != nullptr) {
return secondary_engine_pre_prepare_hook(thd);
}
}
return false;
}
} // namespace

/**
Checks if a query should be retried using a secondary storage engine.
@@ -894,7 +921,10 @@ static bool retry_with_secondary_engine(THD *thd) {

// Don't retry if there is a property of the statement that prevents use of
// secondary engines.
- if (sql_cmd->eligible_secondary_storage_engine(thd) == nullptr) {
+ const LEX_CSTRING *secondary_engine =
+ thd->lex->m_sql_cmd->eligible_secondary_storage_engine(thd);

+ if (secondary_engine == nullptr) {
sql_cmd->disable_secondary_storage_engine();
return false;
}
@@ -917,33 +947,19 @@ static bool retry_with_secondary_engine(THD *thd) {
return true;
}

- // Only attempt to use the secondary engine if the estimated cost of the query
- // is higher than the specified cost threshold.
- // We allow any query to be executed in the secondary_engine when it involves
- // external tables.
- if (!has_external_table(thd->lex) &&
- (thd->m_current_query_cost <=
- thd->variables.secondary_engine_cost_threshold)) {
- Opt_trace_context *const trace = &thd->opt_trace;
- if (trace->is_started()) {
- const Opt_trace_object wrapper(trace);
- Opt_trace_object oto(trace, "secondary_engine_not_used");
- oto.add_alnum("reason",
- "The estimated query cost does not exceed "
- "secondary_engine_cost_threshold.");
- oto.add("cost", thd->m_current_query_cost);
- oto.add("threshold", thd->variables.secondary_engine_cost_threshold);
- }
- return false;
- }
-
- return true;
+ return SecondaryEngineCallPrePrepareHook(thd, *secondary_engine);
}

bool optimize_secondary_engine(THD *thd) {
if (retry_with_secondary_engine(thd)) {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-do-while)
DBUG_EXECUTE_IF("emulate_user_query_kill", {
thd->get_stmt_da()->set_error_status(thd, ER_QUERY_INTERRUPTED);
return true;
});
thd->get_stmt_da()->reset_diagnostics_area();
thd->get_stmt_da()->set_error_status(thd, ER_PREPARE_FOR_SECONDARY_ENGINE);

return true;
}

@@ -972,6 +988,21 @@ bool optimize_secondary_engine(THD *thd) {
}

void notify_plugins_after_select(THD *thd, const Sql_cmd *cmd) {
/* Return if one of the 2 conditions is true:
* 1. when secondary engine statement context is not present, query cost is
* lower than the secondary than the engine threshold.
* 2. When secondary engine statement context is present, primary engine
* is the better execution engine for this query.
* This prevents calling plugin_foreach for short queries, reducing the
* overhead. */
if (((thd->secondary_engine_statement_context() == nullptr) &&
thd->m_current_query_cost <=
thd->variables.secondary_engine_cost_threshold) ||
((thd->secondary_engine_statement_context() != nullptr) &&
thd->secondary_engine_statement_context()
->is_primary_engine_optimal())) {
return;
}
auto executed_in = (cmd != nullptr && cmd->using_secondary_storage_engine())
? SelectExecutedIn::kSecondaryEngine
: SelectExecutedIn::kPrimaryEngine;
@@ -1027,13 +1058,7 @@ bool Sql_cmd_dml::execute_inner(THD *thd) {
} else {
if (unit->execute(thd)) return true;

- /* Only call the plugin hook if the query cost is higher than the secondary
- * engine threshold. This prevents calling plugin_foreach for short queries,
- * reducing the overhead. */
- if (thd->m_current_query_cost >
- thd->variables.secondary_engine_cost_threshold) {
- notify_plugins_after_select(thd, lex->m_sql_cmd);
- }
+ notify_plugins_after_select(thd, lex->m_sql_cmd);
}

return false;
2 changes: 1 addition & 1 deletion storage/innobase/include/ha_prototypes.h
@@ -472,7 +472,7 @@ ulong thd_parallel_read_threads(THD *thd);

/** Return the maximum buffer size to use for DDL.
@param[in] thd Session instance, or nullptr to query the global
- innodb_parallel_read_threads value.
+ innodb_ddl_buffer_size value.
@return memory upper limit in bytes. */
[[nodiscard]] ulong thd_ddl_buffer_size(THD *thd);

34 changes: 34 additions & 0 deletions storage/secondary_engine_mock/ha_mock.cc
@@ -46,6 +46,7 @@
#include "sql/join_optimizer/access_path.h"
#include "sql/join_optimizer/make_join_hypergraph.h"
#include "sql/join_optimizer/walk_access_paths.h"
#include "sql/opt_trace.h"
#include "sql/sql_class.h"
#include "sql/sql_const.h"
#include "sql/sql_lex.h"
@@ -97,6 +98,11 @@ class LoadedTables {

LoadedTables *loaded_tables{nullptr};

/**
Statement context class for the MOCK engine.
*/
class Mock_statement_context : public Secondary_engine_statement_context {};

/**
Execution context class for the MOCK engine. It allocates some data
on the heap when it is constructed, and frees it when it is
@@ -228,6 +234,33 @@ int ha_mock::unload_table(const char *db_name, const char *table_name,

} // namespace mock

namespace {
bool SecondaryEnginePrePrepareHook(THD *thd) {
if (thd->m_current_query_cost <=
static_cast<double>(thd->variables.secondary_engine_cost_threshold)) {
Opt_trace_context *const trace = &thd->opt_trace;
if (trace->is_started()) {
const Opt_trace_object wrapper(trace);
Opt_trace_object oto(trace, "secondary_engine_not_used");
oto.add_alnum("reason",
"The estimated query cost does not exceed "
"secondary_engine_cost_threshold.");
oto.add("cost", thd->m_current_query_cost);
oto.add("threshold", thd->variables.secondary_engine_cost_threshold);
}
return false;
}

if (thd->secondary_engine_statement_context() == nullptr) {
/* Prepare this query's specific statment context */
std::unique_ptr<Secondary_engine_statement_context> ctx =
std::make_unique<Mock_statement_context>();
thd->set_secondary_engine_statement_context(std::move(ctx));
}
return true;
}
} // namespace

static bool PrepareSecondaryEngine(THD *thd, LEX *lex) {
DBUG_EXECUTE_IF("secondary_engine_mock_prepare_error", {
my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "");
@@ -365,6 +398,7 @@ static int Init(MYSQL_PLUGIN p) {
hton->flags = HTON_IS_SECONDARY_ENGINE;
hton->db_type = DB_TYPE_UNKNOWN;
hton->prepare_secondary_engine = PrepareSecondaryEngine;
hton->secondary_engine_pre_prepare_hook = SecondaryEnginePrePrepareHook;
hton->optimize_secondary_engine = OptimizeSecondaryEngine;
hton->compare_secondary_engine_cost = CompareJoinCost;
hton->secondary_engine_flags =