From 090d4b6f55292c18cb0278fe05f2d71f139e758e Mon Sep 17 00:00:00 2001 From: Alena Rybakina Date: Mon, 11 Jul 2022 11:54:01 +0300 Subject: [PATCH] Add a smart statement timeout for AQO learning on special queries through manual retraining. AQO evaluates whether the query has been executed long enough by comparing the integral error value with a fixed value (0.1); also, if the integral error has not changed compared to previous iterations, the smart statement timeout value is increased. Besides, the smart statement timeout value is not increased once the limit value, namely the statement timeout, has been reached. The initial smart_statement_timeout value is the aqo statement timeout value or 0. The smart statement timeout value and the number of its increases are saved in aqo_queries. --- aqo--1.5--1.6.sql | 21 +++++++ aqo.c | 13 ++++ aqo.h | 11 ++++ auto_tuning.c | 4 +- expected/smart_statement_timeout.out | 89 ++++++++++++++++++++++++++++ postprocessing.c | 51 ++++++++++++++-- preprocessing.c | 2 + regress_schedule | 4 +- sql/smart_statement_timeout.sql | 45 ++++++++++++++ storage.c | 52 +++++++++++++++- storage.h | 3 + 11 files changed, 286 insertions(+), 9 deletions(-) create mode 100644 expected/smart_statement_timeout.out create mode 100644 sql/smart_statement_timeout.sql diff --git a/aqo--1.5--1.6.sql b/aqo--1.5--1.6.sql index 4101d33d..af2d2986 100644 --- a/aqo--1.5--1.6.sql +++ b/aqo--1.5--1.6.sql @@ -3,9 +3,12 @@ -- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "ALTER EXTENSION aqo UPDATE TO '1.6'" to load this file. \quit +DROP VIEW aqo_queries; + DROP FUNCTION aqo_enable_query; DROP FUNCTION aqo_disable_query; DROP FUNCTION aqo_cleanup; +DROP FUNCTION aqo_queries; CREATE FUNCTION aqo_enable_class(queryid bigint) RETURNS void @@ -30,3 +33,21 @@ AS 'MODULE_PATHNAME', 'aqo_cleanup' LANGUAGE C STRICT VOLATILE; COMMENT ON FUNCTION aqo_cleanup() IS 'Remove unneeded rows from the AQO ML storage'; + +/* + * VIEWs to discover AQO data. 
+ */ +CREATE FUNCTION aqo_queries ( + OUT queryid bigint, + OUT fs bigint, + OUT learn_aqo boolean, + OUT use_aqo boolean, + OUT auto_tuning boolean, + OUT smart_timeout bigint, + OUT count_increase_timeout bigint +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'aqo_queries' +LANGUAGE C STRICT VOLATILE PARALLEL SAFE; + +CREATE VIEW aqo_queries AS SELECT * FROM aqo_queries(); diff --git a/aqo.c b/aqo.c index fab93494..22aebb59 100644 --- a/aqo.c +++ b/aqo.c @@ -34,6 +34,7 @@ void _PG_init(void); /* Strategy of determining feature space for new queries. */ int aqo_mode = AQO_MODE_CONTROLLED; bool force_collect_stat; +int aqo_statement_timeout; /* * Show special info in EXPLAIN mode. @@ -47,6 +48,7 @@ bool force_collect_stat; */ bool aqo_show_hash; bool aqo_show_details; +bool change_flex_timeout; /* GUC variables */ static const struct config_enum_entry format_options[] = { @@ -305,6 +307,17 @@ _PG_init(void) NULL, NULL ); + DefineCustomIntVariable("aqo.statement_timeout", + "Time limit on learning.", + NULL, + &aqo_statement_timeout, + 0, + 0, INT_MAX, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = aqo_init_shmem; diff --git a/aqo.h b/aqo.h index 0a373147..aa391e22 100644 --- a/aqo.h +++ b/aqo.h @@ -199,8 +199,15 @@ typedef struct QueryContextData instr_time start_execution_time; double planning_time; + int64 smart_timeout; + int64 count_increase_timeout; } QueryContextData; +/* + * Indicator for using smart statement timeout for query + */ +extern bool change_flex_timeout; + struct StatEntry; extern double predicted_ppi_rows; @@ -249,6 +256,7 @@ extern ExplainOnePlan_hook_type prev_ExplainOnePlan_hook; extern ExplainOneNode_hook_type prev_ExplainOneNode_hook; extern void ppi_hook(ParamPathInfo *ppi); +extern int aqo_statement_timeout; /* Hash functions */ void get_eclasses(List *clauselist, int *nargs, int **args_hash, @@ -297,5 +305,8 @@ extern void selectivity_cache_clear(void); extern bool 
IsQueryDisabled(void); +extern bool update_query_timeout(uint64 queryid, int64 smart_timeout); +extern double get_mean(double *elems, int nelems); + extern List *cur_classes; #endif diff --git a/auto_tuning.c b/auto_tuning.c index 53016199..a4165609 100644 --- a/auto_tuning.c +++ b/auto_tuning.c @@ -27,17 +27,15 @@ */ double auto_tuning_convergence_error = 0.01; -static double get_mean(double *elems, int nelems); static double get_estimation(double *elems, int nelems); static bool is_stable(double *elems, int nelems); static bool converged_cq(double *elems, int nelems); static bool is_in_infinite_loop_cq(double *elems, int nelems); - /* * Returns mean value of the array of doubles. */ -static double +double get_mean(double *elems, int nelems) { double sum = 0; diff --git a/expected/smart_statement_timeout.out b/expected/smart_statement_timeout.out new file mode 100644 index 00000000..97919a77 --- /dev/null +++ b/expected/smart_statement_timeout.out @@ -0,0 +1,89 @@ +DROP TABLE IF EXISTS a,b CASCADE; +NOTICE: table "a" does not exist, skipping +NOTICE: table "b" does not exist, skipping +CREATE TABLE a (x1 int, x2 int, x3 int); +INSERT INTO a (x1, x2, x3) SELECT mod(ival,4), mod(ival,10), mod(ival,10) FROM generate_series(1,100) As ival; +CREATE TABLE b (y1 int, y2 int, y3 int); +INSERT INTO b (y1, y2, y3) SELECT mod(ival + 1,4), mod(ival + 1,10), mod(ival + 1,10) FROM generate_series(1,100) As ival; +CREATE EXTENSION IF NOT EXISTS aqo; +SET aqo.join_threshold = 0; +SET aqo.mode = 'learn'; +SET aqo.show_details = 'off'; +SET aqo.learn_statement_timeout = 'on'; +SET statement_timeout = 1500; -- [1.5s] +SET aqo.statement_timeout = 500; -- [0.5s] +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 0 +NOTICE: [AQO] Time limit for execution of the statement was increased. 
Current timeout is 1 + count | count +-------+------- + 62500 | 62500 +(1 row) + +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 1 | 1 +(1 row) + +SET aqo.learn_statement_timeout = 'off'; +SET aqo.statement_timeout = 1000; -- [1s] +INSERT INTO a (x1, x2, x3) SELECT mod(ival,20), mod(ival,10), mod(ival,10) FROM generate_series(1,1000) As ival; +SET aqo.learn_statement_timeout = 'on'; +SET aqo.statement_timeout = 500; -- [0.5s] +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 1 +NOTICE: [AQO] Time limit for execution of the statement was increased. Current timeout is 6 + count | count +--------+-------- + 563300 | 562500 +(1 row) + +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 6 | 2 +(1 row) + +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 6 +NOTICE: [AQO] Time limit for execution of the statement was increased. 
Current timeout is 63 + count | count +--------+-------- + 563300 | 562500 +(1 row) + +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 63 | 3 +(1 row) + +SET statement_timeout = 100; -- [0.1s] +SET aqo.statement_timeout = 150; +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 63 +ERROR: canceling statement due to statement timeout +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 63 | 3 +(1 row) + +SELECT 1 FROM aqo_reset(); + ?column? +---------- + 1 +(1 row) + +DROP TABLE a; +DROP TABLE b; +DROP EXTENSION aqo; diff --git a/postprocessing.c b/postprocessing.c index 165391dd..4af2be74 100644 --- a/postprocessing.c +++ b/postprocessing.c @@ -44,6 +44,8 @@ typedef struct static double cardinality_sum_errors; static int cardinality_num_objects; +static int64 max_timeout_value; +static int64 growth_rate = 3; /* * Store an AQO-related query data into the Query Environment structure. @@ -625,15 +627,46 @@ aqo_timeout_handler(void) ctx.learn = query_context.learn_aqo; ctx.isTimedOut = true; - elog(NOTICE, "[AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data."); + if (aqo_statement_timeout == 0) + elog(NOTICE, "[AQO] Time limit for execution of the statement was expired. 
AQO tried to learn on partial data."); + else + elog(NOTICE, "[AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is " INT64_FORMAT, max_timeout_value); + learnOnPlanState(timeoutCtl.queryDesc->planstate, (void *) &ctx); MemoryContextSwitchTo(oldctx); } +/* + * Compute the next smart timeout as (smart_timeout + 1) * growth_rate^count_increase_timeout; it is stored only when smart_timeout was the limit in effect + */ +static int64 +increase_smart_timeout(void) +{ + int64 smart_timeout_fin_time = (query_context.smart_timeout + 1) * pow(growth_rate, query_context.count_increase_timeout); + + if (query_context.smart_timeout == max_timeout_value && !update_query_timeout(query_context.query_hash, smart_timeout_fin_time)) + elog(NOTICE, "[AQO] Timeout is not updated!"); + + return smart_timeout_fin_time; +} + static bool set_timeout_if_need(QueryDesc *queryDesc) { - TimestampTz fin_time; + int64 fintime = (int64) get_timeout_finish_time(STATEMENT_TIMEOUT)-1; + + if (aqo_learn_statement_timeout && aqo_statement_timeout > 0) + { + max_timeout_value = Min(query_context.smart_timeout, (int64) aqo_statement_timeout); + if (max_timeout_value > fintime) + { + max_timeout_value = fintime; + } + } + else + { + max_timeout_value = fintime; + } if (IsParallelWorker()) /* @@ -663,8 +696,7 @@ set_timeout_if_need(QueryDesc *queryDesc) else Assert(!get_timeout_active(timeoutCtl.id)); - fin_time = get_timeout_finish_time(STATEMENT_TIMEOUT); - enable_timeout_at(timeoutCtl.id, fin_time - 1); + enable_timeout_at(timeoutCtl.id, (TimestampTz) max_timeout_value); /* Save pointer to queryDesc to use at learning after a timeout interruption. 
*/ timeoutCtl.queryDesc = queryDesc; @@ -720,6 +752,7 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) instr_time endtime; EphemeralNamedRelation enr = get_ENR(queryDesc->queryEnv, PlanStateInfo); MemoryContext oldctx = MemoryContextSwitchTo(AQOLearnMemCtx); + double error = .0; cardinality_sum_errors = 0.; cardinality_num_objects = 0; @@ -778,6 +811,16 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) /* Store all learn data into the AQO service relations. */ if (!query_context.adding_query && query_context.auto_tuning) automatical_query_tuning(query_context.query_hash, stat); + + error = stat->est_error_aqo[stat->cur_stat_slot_aqo-1] - cardinality_sum_errors/(1 + cardinality_num_objects); + + if (aqo_learn_statement_timeout && aqo_statement_timeout > 0 && error >= 0.1) + { + int64 fintime = increase_smart_timeout(); + elog(NOTICE, "[AQO] Time limit for execution of the statement was increased. Current timeout is " INT64_FORMAT, fintime); + } + + pfree(stat); } } diff --git a/preprocessing.c b/preprocessing.c index 7b909bdf..2fcb7037 100644 --- a/preprocessing.c +++ b/preprocessing.c @@ -249,6 +249,8 @@ aqo_planner(Query *parse, elog(ERROR, "unrecognized mode in AQO: %d", aqo_mode); break; } + query_context.count_increase_timeout = 0; + query_context.smart_timeout = 0; } else /* Query class exists in a ML knowledge base. */ { diff --git a/regress_schedule b/regress_schedule index 418e14ec..883b77c5 100644 --- a/regress_schedule +++ b/regress_schedule @@ -12,11 +12,13 @@ test: unsupported test: clean_aqo_data test: parallel_workers test: plancache -# Performance-dependent test. Can be ignored if executes in containers or on slow machines +# Performance-dependent tests. 
Can be ignored if executes in containers or on slow machines ignore: statement_timeout +ignore: smart_statement_timeout test: statement_timeout test: temp_tables test: top_queries test: relocatable test: look_a_like test: feature_subspace +test: smart_statement_timeout diff --git a/sql/smart_statement_timeout.sql b/sql/smart_statement_timeout.sql new file mode 100644 index 00000000..a0573dee --- /dev/null +++ b/sql/smart_statement_timeout.sql @@ -0,0 +1,45 @@ +DROP TABLE IF EXISTS a,b CASCADE; +CREATE TABLE a (x1 int, x2 int, x3 int); +INSERT INTO a (x1, x2, x3) SELECT mod(ival,4), mod(ival,10), mod(ival,10) FROM generate_series(1,100) As ival; + +CREATE TABLE b (y1 int, y2 int, y3 int); +INSERT INTO b (y1, y2, y3) SELECT mod(ival + 1,4), mod(ival + 1,10), mod(ival + 1,10) FROM generate_series(1,100) As ival; + +CREATE EXTENSION IF NOT EXISTS aqo; +SET aqo.join_threshold = 0; +SET aqo.mode = 'learn'; +SET aqo.show_details = 'off'; +SET aqo.learn_statement_timeout = 'on'; +SET statement_timeout = 1500; -- [1.5s] +SET aqo.statement_timeout = 500; -- [0.5s] + +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + +SET aqo.learn_statement_timeout = 'off'; +SET aqo.statement_timeout = 1000; -- [1s] +INSERT INTO a (x1, x2, x3) SELECT mod(ival,20), mod(ival,10), mod(ival,10) FROM generate_series(1,1000) As ival; +SET aqo.learn_statement_timeout = 'on'; +SET aqo.statement_timeout = 500; -- [0.5s] +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT 
JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + +SET statement_timeout = 100; -- [0.1s] +SET aqo.statement_timeout = 150; +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + +SELECT 1 FROM aqo_reset(); +DROP TABLE a; +DROP TABLE b; +DROP EXTENSION aqo; diff --git a/storage.c b/storage.c index fcbe5569..8591dc01 100644 --- a/storage.c +++ b/storage.c @@ -55,7 +55,7 @@ typedef enum { } aqo_data_cols; typedef enum { - AQ_QUERYID = 0, AQ_FS, AQ_LEARN_AQO, AQ_USE_AQO, AQ_AUTO_TUNING, + AQ_QUERYID = 0, AQ_FS, AQ_LEARN_AQO, AQ_USE_AQO, AQ_AUTO_TUNING, AQ_SMART_TIMEOUT, AQ_COUNT_INCREASE_TIMEOUT, AQ_TOTAL_NCOLS } aqo_queries_cols; @@ -1800,6 +1800,8 @@ aqo_queries(PG_FUNCTION_ARGS) values[AQ_LEARN_AQO] = BoolGetDatum(entry->learn_aqo); values[AQ_USE_AQO] = BoolGetDatum(entry->use_aqo); values[AQ_AUTO_TUNING] = BoolGetDatum(entry->auto_tuning); + values[AQ_SMART_TIMEOUT] = Int64GetDatum(entry->smart_timeout); + values[AQ_COUNT_INCREASE_TIMEOUT] = Int64GetDatum(entry->count_increase_timeout); tuplestore_putvalues(tupstore, tupDesc, values, nulls); } @@ -1851,6 +1853,8 @@ aqo_queries_store(uint64 queryid, entry->learn_aqo = learn_aqo; entry->use_aqo = use_aqo; entry->auto_tuning = auto_tuning; + entry->smart_timeout = 0; + entry->count_increase_timeout = 0; 
aqo_state->queries_changed = true; LWLockRelease(&aqo_state->queries_lock); @@ -1966,11 +1970,57 @@ aqo_queries_find(uint64 queryid, QueryContextData *ctx) ctx->learn_aqo = entry->learn_aqo; ctx->use_aqo = entry->use_aqo; ctx->auto_tuning = entry->auto_tuning; + ctx->smart_timeout = entry->smart_timeout; + ctx->count_increase_timeout = entry->count_increase_timeout; } LWLockRelease(&aqo_state->queries_lock); return found; } +/* + * Save a new smart statement timeout value for an existing query class in + * the aqo_queries hash table and increment its count_increase_timeout. + * + * Returns false, changing nothing, if there is no entry for the queryid. + */ +bool +update_query_timeout(uint64 queryid, int64 smart_timeout) +{ + QueriesEntry *entry; + bool found; + + Assert(queries_htab); + + /* Guard for default feature space */ + Assert(queryid != 0); + + LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE); + + /* + * Look up only: HASH_ENTER would return a new entry whose other fields + * and counter have never been initialized. + */ + entry = (QueriesEntry *) hash_search(queries_htab, &queryid, HASH_FIND, + &found); + if (!found) + { + LWLockRelease(&aqo_state->queries_lock); + return false; + } + + entry->smart_timeout = smart_timeout; + entry->count_increase_timeout = entry->count_increase_timeout + 1; + + /* + * Flag the table as changed, the same way aqo_queries_store() does + * after it modifies an entry. + */ + aqo_state->queries_changed = true; + + LWLockRelease(&aqo_state->queries_lock); + return true; +} + /* * Update AQO preferences for a given queryid value. * if incoming param is null - leave it unchanged. diff --git a/storage.h b/storage.h index 94891c5d..e43315ae 100644 --- a/storage.h +++ b/storage.h @@ -80,6 +80,9 @@ typedef struct QueriesEntry bool learn_aqo; bool use_aqo; bool auto_tuning; + + int64 smart_timeout; + int64 count_increase_timeout; } QueriesEntry; extern int querytext_max_size;