diff --git a/aqo--1.5--1.6.sql b/aqo--1.5--1.6.sql index 4101d33d..af2d2986 100644 --- a/aqo--1.5--1.6.sql +++ b/aqo--1.5--1.6.sql @@ -3,9 +3,12 @@ -- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "ALTER EXTENSION aqo UPDATE TO '1.6'" to load this file. \quit +DROP VIEW aqo_queries; + DROP FUNCTION aqo_enable_query; DROP FUNCTION aqo_disable_query; DROP FUNCTION aqo_cleanup; +DROP FUNCTION aqo_queries; CREATE FUNCTION aqo_enable_class(queryid bigint) RETURNS void @@ -30,3 +33,21 @@ AS 'MODULE_PATHNAME', 'aqo_cleanup' LANGUAGE C STRICT VOLATILE; COMMENT ON FUNCTION aqo_cleanup() IS 'Remove unneeded rows from the AQO ML storage'; + +/* + * VIEWs to discover AQO data. + */ +CREATE FUNCTION aqo_queries ( + OUT queryid bigint, + OUT fs bigint, + OUT learn_aqo boolean, + OUT use_aqo boolean, + OUT auto_tuning boolean, + OUT smart_timeout bigint, + OUT count_increase_timeout bigint +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'aqo_queries' +LANGUAGE C STRICT VOLATILE PARALLEL SAFE; + +CREATE VIEW aqo_queries AS SELECT * FROM aqo_queries(); diff --git a/aqo.c b/aqo.c index e38cff93..3eb06508 100644 --- a/aqo.c +++ b/aqo.c @@ -34,6 +34,7 @@ void _PG_init(void); /* Strategy of determining feature space for new queries. */ int aqo_mode = AQO_MODE_CONTROLLED; bool force_collect_stat; +int aqo_statement_timeout; bool aqo_predict_with_few_neighbors; /* @@ -48,6 +49,7 @@ bool aqo_predict_with_few_neighbors; */ bool aqo_show_hash; bool aqo_show_details; +bool change_flex_timeout; /* GUC variables */ static const struct config_enum_entry format_options[] = { @@ -306,6 +308,17 @@ _PG_init(void) NULL, NULL ); + DefineCustomIntVariable("aqo.statement_timeout", + "Time limit on learning.", + NULL, + &aqo_statement_timeout, + 0, + 0, INT_MAX, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); DefineCustomIntVariable("aqo.k_neighbors_threshold", "Set the threshold of number of neighbors for predicting.", diff --git a/aqo.h b/aqo.h index 9418646c..9600b136 100644 --- a/aqo.h +++ b/aqo.h @@ -199,8 +199,15 @@ typedef struct QueryContextData instr_time start_execution_time; double planning_time; + int64 smart_timeout; + int64 count_increase_timeout; } QueryContextData; +/* + * Indicator for using smart statement timeout for query + */ +extern bool change_flex_timeout; + struct StatEntry; extern double predicted_ppi_rows; @@ -250,6 +257,7 @@ extern ExplainOnePlan_hook_type prev_ExplainOnePlan_hook; extern ExplainOneNode_hook_type prev_ExplainOneNode_hook; extern void ppi_hook(ParamPathInfo *ppi); +extern int aqo_statement_timeout; /* Hash functions */ void get_eclasses(List *clauselist, int *nargs, int **args_hash, @@ -298,5 +306,8 @@ extern void selectivity_cache_clear(void); extern bool IsQueryDisabled(void); +extern bool update_query_timeout(uint64 queryid, int64 smart_timeout); +extern double get_mean(double *elems, int nelems); + extern List *cur_classes; #endif diff --git a/auto_tuning.c b/auto_tuning.c index 53016199..a4165609 100644 --- a/auto_tuning.c +++ b/auto_tuning.c @@ -27,17 +27,15 @@ */ double auto_tuning_convergence_error = 0.01; -static double get_mean(double *elems, int nelems); static double get_estimation(double *elems, int nelems); static bool is_stable(double *elems, int nelems); static bool converged_cq(double *elems, int nelems); static bool is_in_infinite_loop_cq(double *elems, int nelems); - /* * Returns mean value of the array of doubles. */ -static double +double get_mean(double *elems, int nelems) { double sum = 0; diff --git a/expected/smart_statement_timeout.out b/expected/smart_statement_timeout.out new file mode 100644 index 00000000..97919a77 --- /dev/null +++ b/expected/smart_statement_timeout.out @@ -0,0 +1,89 @@ +DROP TABLE IF EXISTS a,b CASCADE; +NOTICE: table "a" does not exist, skipping +NOTICE: table "b" does not exist, skipping +CREATE TABLE a (x1 int, x2 int, x3 int); +INSERT INTO a (x1, x2, x3) SELECT mod(ival,4), mod(ival,10), mod(ival,10) FROM generate_series(1,100) As ival; +CREATE TABLE b (y1 int, y2 int, y3 int); +INSERT INTO b (y1, y2, y3) SELECT mod(ival + 1,4), mod(ival + 1,10), mod(ival + 1,10) FROM generate_series(1,100) As ival; +CREATE EXTENSION IF NOT EXISTS aqo; +SET aqo.join_threshold = 0; +SET aqo.mode = 'learn'; +SET aqo.show_details = 'off'; +SET aqo.learn_statement_timeout = 'on'; +SET statement_timeout = 1500; -- [1.5s] +SET aqo.statement_timeout = 500; -- [0.5s] +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 0 +NOTICE: [AQO] Time limit for execution of the statement was increased. Current timeout is 1 + count | count +-------+------- + 62500 | 62500 +(1 row) + +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 1 | 1 +(1 row) + +SET aqo.learn_statement_timeout = 'off'; +SET aqo.statement_timeout = 1000; -- [1s] +INSERT INTO a (x1, x2, x3) SELECT mod(ival,20), mod(ival,10), mod(ival,10) FROM generate_series(1,1000) As ival; +SET aqo.learn_statement_timeout = 'on'; +SET aqo.statement_timeout = 500; -- [0.5s] +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 1 +NOTICE: [AQO] Time limit for execution of the statement was increased. Current timeout is 6 + count | count +--------+-------- + 563300 | 562500 +(1 row) + +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 6 | 2 +(1 row) + +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 6 +NOTICE: [AQO] Time limit for execution of the statement was increased. Current timeout is 63 + count | count +--------+-------- + 563300 | 562500 +(1 row) + +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 63 | 3 +(1 row) + +SET statement_timeout = 100; -- [0.1s] +SET aqo.statement_timeout = 150; +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +NOTICE: [AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is 63 +ERROR: canceling statement due to statement timeout +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + smart_timeout | count_increase_timeout +---------------+------------------------ + 63 | 3 +(1 row) + +SELECT 1 FROM aqo_reset(); + ?column? +---------- + 1 +(1 row) + +DROP TABLE a; +DROP TABLE b; +DROP EXTENSION aqo; diff --git a/postprocessing.c b/postprocessing.c index 165391dd..4af2be74 100644 --- a/postprocessing.c +++ b/postprocessing.c @@ -44,6 +44,8 @@ typedef struct static double cardinality_sum_errors; static int cardinality_num_objects; +static int64 max_timeout_value; +static int64 growth_rate = 3; /* * Store an AQO-related query data into the Query Environment structure. @@ -625,15 +627,46 @@ aqo_timeout_handler(void) ctx.learn = query_context.learn_aqo; ctx.isTimedOut = true; - elog(NOTICE, "[AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data."); + if (aqo_statement_timeout == 0) + elog(NOTICE, "[AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data."); + else + elog(NOTICE, "[AQO] Time limit for execution of the statement was expired. AQO tried to learn on partial data. Timeout is %ld", max_timeout_value); + learnOnPlanState(timeoutCtl.queryDesc->planstate, (void *) &ctx); MemoryContextSwitchTo(oldctx); } +/* + * Function for updating smart statement timeout + */ +static int64 +increase_smart_timeout() +{ + int64 smart_timeout_fin_time = (query_context.smart_timeout + 1) * pow(growth_rate, query_context.count_increase_timeout); + + if (query_context.smart_timeout == max_timeout_value && !update_query_timeout(query_context.query_hash, smart_timeout_fin_time)) + elog(NOTICE, "[AQO] Timeout is not updated!"); + + return smart_timeout_fin_time; +} + static bool set_timeout_if_need(QueryDesc *queryDesc) { - TimestampTz fin_time; + int64 fintime = (int64) get_timeout_finish_time(STATEMENT_TIMEOUT)-1; + + if (aqo_learn_statement_timeout && aqo_statement_timeout > 0) + { + max_timeout_value = Min(query_context.smart_timeout, (int64) aqo_statement_timeout); + if (max_timeout_value > fintime) + { + max_timeout_value = fintime; + } + } + else + { + max_timeout_value = fintime; + } if (IsParallelWorker()) /* @@ -663,8 +696,7 @@ set_timeout_if_need(QueryDesc *queryDesc) else Assert(!get_timeout_active(timeoutCtl.id)); - fin_time = get_timeout_finish_time(STATEMENT_TIMEOUT); - enable_timeout_at(timeoutCtl.id, fin_time - 1); + enable_timeout_at(timeoutCtl.id, (TimestampTz) max_timeout_value); /* Save pointer to queryDesc to use at learning after a timeout interruption. */ timeoutCtl.queryDesc = queryDesc; @@ -720,6 +752,7 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) instr_time endtime; EphemeralNamedRelation enr = get_ENR(queryDesc->queryEnv, PlanStateInfo); MemoryContext oldctx = MemoryContextSwitchTo(AQOLearnMemCtx); + double error = .0; cardinality_sum_errors = 0.; cardinality_num_objects = 0; @@ -778,6 +811,16 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) /* Store all learn data into the AQO service relations. */ if (!query_context.adding_query && query_context.auto_tuning) automatical_query_tuning(query_context.query_hash, stat); + + error = stat->est_error_aqo[stat->cur_stat_slot_aqo-1] - cardinality_sum_errors/(1 + cardinality_num_objects); + + if ( aqo_learn_statement_timeout && aqo_statement_timeout > 0 && error >= 0.1) + { + int64 fintime = increase_smart_timeout(); + elog(NOTICE, "[AQO] Time limit for execution of the statement was increased. Current timeout is %ld", fintime); + } + + pfree(stat); } } diff --git a/preprocessing.c b/preprocessing.c index 7b909bdf..2fcb7037 100644 --- a/preprocessing.c +++ b/preprocessing.c @@ -249,6 +249,8 @@ aqo_planner(Query *parse, elog(ERROR, "unrecognized mode in AQO: %d", aqo_mode); break; } + query_context.count_increase_timeout = 0; + query_context.smart_timeout = 0; } else /* Query class exists in a ML knowledge base. */ { diff --git a/regress_schedule b/regress_schedule index 418e14ec..883b77c5 100644 --- a/regress_schedule +++ b/regress_schedule @@ -12,11 +12,13 @@ test: unsupported test: clean_aqo_data test: parallel_workers test: plancache -# Performance-dependent test. Can be ignored if executes in containers or on slow machines +# Performance-dependent tests. Can be ignored if executes in containers or on slow machines ignore: statement_timeout +ignore: smart_statement_timeout test: statement_timeout test: temp_tables test: top_queries test: relocatable test: look_a_like test: feature_subspace +test: smart_statement_timeout diff --git a/sql/smart_statement_timeout.sql b/sql/smart_statement_timeout.sql new file mode 100644 index 00000000..a0573dee --- /dev/null +++ b/sql/smart_statement_timeout.sql @@ -0,0 +1,45 @@ +DROP TABLE IF EXISTS a,b CASCADE; +CREATE TABLE a (x1 int, x2 int, x3 int); +INSERT INTO a (x1, x2, x3) SELECT mod(ival,4), mod(ival,10), mod(ival,10) FROM generate_series(1,100) As ival; + +CREATE TABLE b (y1 int, y2 int, y3 int); +INSERT INTO b (y1, y2, y3) SELECT mod(ival + 1,4), mod(ival + 1,10), mod(ival + 1,10) FROM generate_series(1,100) As ival; + +CREATE EXTENSION IF NOT EXISTS aqo; +SET aqo.join_threshold = 0; +SET aqo.mode = 'learn'; +SET aqo.show_details = 'off'; +SET aqo.learn_statement_timeout = 'on'; +SET statement_timeout = 1500; -- [1.5s] +SET aqo.statement_timeout = 500; -- [0.5s] + +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + +SET aqo.learn_statement_timeout = 'off'; +SET aqo.statement_timeout = 1000; -- [1s] +INSERT INTO a (x1, x2, x3) SELECT mod(ival,20), mod(ival,10), mod(ival,10) FROM generate_series(1,1000) As ival; +SET aqo.learn_statement_timeout = 'on'; +SET aqo.statement_timeout = 500; -- [0.5s] +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + +SET statement_timeout = 100; -- [0.1s] +SET aqo.statement_timeout = 150; +SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1; +select smart_timeout, count_increase_timeout from aqo_queries, aqo_query_texts + where query_text = 'SELECT count(a.x1),count(B.y1) FROM A a LEFT JOIN B ON a.x1 = B.y1 LEFT JOIN A a1 ON a1.x1 = B.y1;' + and aqo_query_texts.queryid = aqo_queries.queryid limit 1; + +SELECT 1 FROM aqo_reset(); +DROP TABLE a; +DROP TABLE b; +DROP EXTENSION aqo; diff --git a/storage.c b/storage.c index 32446d6c..1796320e 100644 --- a/storage.c +++ b/storage.c @@ -55,7 +55,7 @@ typedef enum { } aqo_data_cols; typedef enum { - AQ_QUERYID = 0, AQ_FS, AQ_LEARN_AQO, AQ_USE_AQO, AQ_AUTO_TUNING, + AQ_QUERYID = 0, AQ_FS, AQ_LEARN_AQO, AQ_USE_AQO, AQ_AUTO_TUNING, AQ_SMART_TIMEOUT, AQ_COUNT_INCREASE_TIMEOUT, AQ_TOTAL_NCOLS } aqo_queries_cols; @@ -1850,6 +1850,8 @@ aqo_queries(PG_FUNCTION_ARGS) values[AQ_LEARN_AQO] = BoolGetDatum(entry->learn_aqo); values[AQ_USE_AQO] = BoolGetDatum(entry->use_aqo); values[AQ_AUTO_TUNING] = BoolGetDatum(entry->auto_tuning); + values[AQ_SMART_TIMEOUT] = Int64GetDatum(entry->smart_timeout); + values[AQ_COUNT_INCREASE_TIMEOUT] = Int64GetDatum(entry->count_increase_timeout); tuplestore_putvalues(tupstore, tupDesc, values, nulls); } @@ -1901,6 +1903,8 @@ aqo_queries_store(uint64 queryid, entry->learn_aqo = learn_aqo; entry->use_aqo = use_aqo; entry->auto_tuning = auto_tuning; + entry->smart_timeout = 0; + entry->count_increase_timeout = 0; aqo_state->queries_changed = true; LWLockRelease(&aqo_state->queries_lock); @@ -2016,11 +2020,57 @@ aqo_queries_find(uint64 queryid, QueryContextData *ctx) ctx->learn_aqo = entry->learn_aqo; ctx->use_aqo = entry->use_aqo; ctx->auto_tuning = entry->auto_tuning; + ctx->smart_timeout = entry->smart_timeout; + ctx->count_increase_timeout = entry->count_increase_timeout; } LWLockRelease(&aqo_state->queries_lock); return found; } +/* + * Function for update and save value of smart statement timeout + * for query in aqu_queries table + */ +bool +update_query_timeout(uint64 queryid, int64 smart_timeout) +{ + QueriesEntry *entry; + bool found; + bool tblOverflow; + HASHACTION action; + + Assert(queries_htab); + + /* Guard for default feature space */ + Assert(queryid != 0); + + LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE); + + /* Check hash table overflow */ + tblOverflow = hash_get_num_entries(queries_htab) < fs_max_items ? false : true; + action = tblOverflow ? HASH_FIND : HASH_ENTER; + + entry = (QueriesEntry *) hash_search(queries_htab, &queryid, action, + &found); + + /* Initialize entry on first usage */ + if (!found && action == HASH_FIND) + { + /* + * Hash table is full. To avoid possible problems - don't try to add + * more, just exit + */ + LWLockRelease(&aqo_state->queries_lock); + return false; + } + + entry->smart_timeout = smart_timeout; + entry->count_increase_timeout = entry->count_increase_timeout + 1; + + LWLockRelease(&aqo_state->queries_lock); + return true; +} + /* * Update AQO preferences for a given queryid value. * if incoming param is null - leave it unchanged. diff --git a/storage.h b/storage.h index 0e7745e1..bbed7b51 100644 --- a/storage.h +++ b/storage.h @@ -80,6 +80,9 @@ typedef struct QueriesEntry bool learn_aqo; bool use_aqo; bool auto_tuning; + + int64 smart_timeout; + int64 count_increase_timeout; } QueriesEntry; extern int querytext_max_size;