Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PGPRO-5340] #44

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions aqo--1.2.sql
Expand Up @@ -128,3 +128,8 @@ DELETE FROM public.aqo_data ad WHERE (ad.fspace_hash = $1);
DELETE FROM public.aqo_query_stat aq WHERE (aq.query_hash = $1);
DELETE FROM public.aqo_query_texts aq WHERE (aq.query_hash = $1);
$func$ LANGUAGE SQL;

CREATE FUNCTION public.aqo_profile_mem_hash(OUT query_hash integer, OUT execution_time numeric(10, 5))
RETURNS SETOF record
AS 'MODULE_PATHNAME', 'aqo_profile_mem_hash'
LANGUAGE C STRICT;
18 changes: 18 additions & 0 deletions aqo.c
Expand Up @@ -41,6 +41,9 @@ bool force_collect_stat;
*/
bool aqo_show_hash;
bool aqo_show_details;
int aqo_profile_mem;

int size_of_log_buffer = 0;

/* GUC variables */
static const struct config_enum_entry format_options[] = {
Expand Down Expand Up @@ -196,6 +199,21 @@ _PG_init(void)
NULL
);

DefineCustomIntVariable(
"aqo.profile_mem",
NULL,
NULL,
&aqo_profile_mem,
-1,
-1,
INT_MAX,
PGC_USERSET,
0,
NULL,
NULL,
NULL
);

prev_planner_hook = planner_hook;
planner_hook = aqo_planner;
prev_ExecutorStart_hook = ExecutorStart_hook;
Expand Down
9 changes: 9 additions & 0 deletions aqo.h
Expand Up @@ -175,6 +175,8 @@ extern int aqo_mode;
extern bool force_collect_stat;
extern bool aqo_show_hash;
extern bool aqo_show_details;
extern int aqo_profile_mem;
extern int size_of_log_buffer;

/*
* It is mostly needed for auto tuning of query. with auto tuning mode aqo
Expand Down Expand Up @@ -219,6 +221,12 @@ typedef struct QueryContextData
double query_planning_time;
} QueryContextData;

typedef struct ProfileMemData
{
int hash;
double execution_time;
} ProfileMemData;

extern double predicted_ppi_rows;
extern double fss_ppi_hash;

Expand Down Expand Up @@ -373,4 +381,5 @@ extern Oid get_aqo_schema(void);
extern void init_lock_tag(LOCKTAG *tag, uint32 key1, uint32 key2);

extern List *cur_classes;

#endif
110 changes: 109 additions & 1 deletion postprocessing.c
Expand Up @@ -25,7 +25,16 @@
#include "optimizer/optimizer.h"
#include "postgres_fdw.h"
#include "utils/queryenvironment.h"
#include "funcapi.h"
#include "miscadmin.h"

static HTAB *profile_mem_queries = NULL;

typedef struct
{
int key;
double time;
} ProfileMemEntry;

typedef struct
{
Expand Down Expand Up @@ -73,6 +82,73 @@ static void StorePlanInternals(QueryDesc *queryDesc);
static bool ExtractFromQueryEnv(QueryDesc *queryDesc);
static void RemoveFromQueryEnv(QueryDesc *queryDesc);

PG_FUNCTION_INFO_V1(aqo_profile_mem_hash);

Datum
aqo_profile_mem_hash(PG_FUNCTION_ARGS)
{
HASH_SEQ_STATUS hash_seq;
ProfileMemEntry *entry;
TupleDesc tupdesc;
HeapTuple tuple;
AttInMetadata *attinmeta;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;

if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not allowed in this context")));

if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");

per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);

tupstore = tuplestore_begin_heap(true, false, work_mem);
attinmeta = TupleDescGetAttInMetadata(tupdesc);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;

MemoryContextSwitchTo(oldcontext);

if (!profile_mem_queries)
{
ReleaseTupleDesc(tupdesc);
tuplestore_donestoring(tupstore);
elog(WARNING, "Hash table 'profile_mem_queries' doesn't exist");
PG_RETURN_VOID();
}

hash_seq_init(&hash_seq, profile_mem_queries);
while (((entry = (ProfileMemEntry *) hash_seq_search(&hash_seq)) != NULL))
{
char **values;

values = (char **) palloc(2 * sizeof(char *));
values[0] = (char *) palloc(16 * sizeof(char));
values[1] = (char *) palloc(16 * sizeof(char));

snprintf(values[0], 16, "%d", entry->key);
snprintf(values[1], 16, "%0.5f", entry->time);

tuple = BuildTupleFromCStrings(attinmeta, values);
tuplestore_puttuple(tupstore, tuple);
}

ReleaseTupleDesc(tupdesc);
tuplestore_donestoring(tupstore);

PG_RETURN_VOID();
}

/*
* This is the critical section: only one runner is allowed to be inside this
Expand Down Expand Up @@ -584,14 +660,18 @@ aqo_ExecutorEnd(QueryDesc *queryDesc)
{
double totaltime;
double cardinality_error;
HASHCTL hash_ctl;
bool found;
QueryStat *stat = NULL;
ProfileMemEntry *pentry;
instr_time endtime;
EphemeralNamedRelation enr = get_ENR(queryDesc->queryEnv, PlanStateInfo);
LOCKTAG tag;

cardinality_sum_errors = 0.;
cardinality_num_objects = 0;


if (!ExtractFromQueryEnv(queryDesc))
/* AQO keep all query-related preferences at the query context.
* It is needed to prevent from possible recursive changes, at
Expand All @@ -600,7 +680,7 @@ aqo_ExecutorEnd(QueryDesc *queryDesc)
* stage for this query.
*/
goto end;

njoins = (enr != NULL) ? *(int *) enr->reldata : -1;

Assert(!IsParallelWorker());
Expand Down Expand Up @@ -630,6 +710,32 @@ aqo_ExecutorEnd(QueryDesc *queryDesc)
(uint32) query_context.fspace_hash);
LockAcquire(&tag, ExclusiveLock, false, false);

if (aqo_profile_mem > 0)
{
if (profile_mem_queries == NULL)
{
hash_ctl.keysize = sizeof(int);
hash_ctl.entrysize = sizeof(ProfileMemEntry);

profile_mem_queries = ShmemInitHash("aqo_profile_mem_queries", aqo_profile_mem, aqo_profile_mem, &hash_ctl, HASH_ELEM | HASH_BLOBS);
}

INSTR_TIME_SET_CURRENT(endtime);
INSTR_TIME_SUBTRACT(endtime, query_context.query_starttime);
totaltime = INSTR_TIME_GET_DOUBLE(endtime);

pentry = (ProfileMemEntry *) hash_search(profile_mem_queries, &query_context.query_hash, HASH_ENTER, &found);

if (found)
{
pentry->time += totaltime - query_context.query_planning_time;
}
else
{
pentry->time = totaltime - query_context.query_planning_time;
}
}

if (query_context.collect_stat)
{
INSTR_TIME_SET_CURRENT(endtime);
Expand All @@ -646,6 +752,7 @@ aqo_ExecutorEnd(QueryDesc *queryDesc)
{
/* Calculate AQO statistics. */
if (query_context.use_aqo)
{
/* For the case, when query executed with AQO predictions. */
update_query_stat_row(stat->execution_time_with_aqo,
&stat->execution_time_with_aqo_size,
Expand All @@ -657,6 +764,7 @@ aqo_ExecutorEnd(QueryDesc *queryDesc)
totaltime - query_context.query_planning_time,
cardinality_error,
&stat->executions_with_aqo);
}
else
/* For the case, when query executed without AQO predictions. */
update_query_stat_row(stat->execution_time_without_aqo,
Expand Down