Skip to content

Commit

Permalink
Cleanup planner and process utility hooks
Browse files Browse the repository at this point in the history
Make sure parameter names are in lowercase-underscore format. Also,
rename the query tree walker change_table_name_walker() to
hypertable_query_walker() to reflect that it is no long renaming
anything (only setting up state in case of a query on a hypertable).
  • Loading branch information
erimatnor committed Jul 10, 2017
1 parent 84b0338 commit 142f58c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 47 deletions.
57 changes: 27 additions & 30 deletions src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ void _planner_fini(void);
static planner_hook_type prev_planner_hook;
static set_rel_pathlist_hook_type prev_set_rel_pathlist_hook;

typedef struct ChangeTableNameCtx
typedef struct HypertableQueryCtx
{
Query *parse;
Query *parent;
CmdType commandType;
CmdType cmdtype;
Cache *hcache;
Hypertable *hentry;
} ChangeTableNameCtx;
} HypertableQueryCtx;

typedef struct AddPartFuncQualCtx
{
Expand All @@ -55,32 +55,27 @@ typedef struct GlobalPlannerCtx
static GlobalPlannerCtx *global_planner_ctx = NULL;

/*
* Change all main tables to one of the replicas in the parse tree.
*
* Identify queries on a hypertable by walking the query tree. If the query is
* indeed on a hypertable, setup the necessary state and/or make modifications
* to the query tree.
*/
static bool
change_table_name_walker(Node *node, void *context)
hypertable_query_walker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}

if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
ChangeTableNameCtx *ctx = (ChangeTableNameCtx *) context;
RangeTblEntry *rte = (RangeTblEntry *) node;
HypertableQueryCtx *ctx = (HypertableQueryCtx *) context;

if (rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->inh
&& ctx->commandType != CMD_INSERT
)
if (rte->rtekind == RTE_RELATION && rte->inh && ctx->cmdtype != CMD_INSERT)
{
Hypertable *hentry = hypertable_cache_get_entry(ctx->hcache, rangeTableEntry->relid);
Hypertable *hentry = hypertable_cache_get_entry(ctx->hcache, rte->relid);

if (hentry != NULL)
{
ctx->hentry = hentry;
}
}

return false;
Expand All @@ -90,25 +85,26 @@ change_table_name_walker(Node *node, void *context)
if (IsA(node, Query))
{
bool result;
ChangeTableNameCtx *ctx = (ChangeTableNameCtx *) context;
CmdType old = ctx->commandType;
HypertableQueryCtx *ctx = (HypertableQueryCtx *) context;
CmdType old = ctx->cmdtype;
Query *query = (Query *) node;
Query *oldparent = ctx->parent;

/* adjust context */
ctx->commandType = ((Query *) node)->commandType;
ctx->parent = (Query *) node;
ctx->cmdtype = query->commandType;
ctx->parent = query;

result = query_tree_walker(ctx->parent, change_table_name_walker,
result = query_tree_walker(ctx->parent, hypertable_query_walker,
context, QTW_EXAMINE_RTES);

/* restore context */
ctx->commandType = old;
ctx->cmdtype = old;
ctx->parent = oldparent;

return result;
}

return expression_tree_walker(node, change_table_name_walker, context);
return expression_tree_walker(node, hypertable_query_walker, context);
}

/* Returns the partitioning info for a var if the var is a partitioning
Expand Down Expand Up @@ -300,7 +296,7 @@ add_partitioning_func_qual(Query *parse, Cache *hcache, Hypertable *hentry)
}

static PlannedStmt *
timescaledb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
timescaledb_planner(Query *parse, int cursor_opts, ParamListInfo bound_params)
{
PlannedStmt *rv = NULL;
GlobalPlannerCtx gpc;
Expand All @@ -309,15 +305,16 @@ timescaledb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)

if (extension_is_loaded())
{
ChangeTableNameCtx context;
HypertableQueryCtx context;

/* replace call to main table with call to the replica table */
context.hcache = hypertable_cache_pin();
context.parse = parse;
context.parent = parse;
context.commandType = parse->commandType;
context.cmdtype = parse->commandType;
context.hentry = NULL;
change_table_name_walker((Node *) parse, &context);
hypertable_query_walker((Node *) parse, &context);

/* note assumes 1 hypertable per query */
if (context.hentry != NULL)
{
Expand All @@ -326,17 +323,17 @@ timescaledb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
}

cache_release(context.hcache);

}

if (prev_planner_hook != NULL)
{
/* Call any earlier hooks */
rv = (prev_planner_hook) (parse, cursorOptions, boundParams);
rv = (prev_planner_hook) (parse, cursor_opts, bound_params);
}
else
{
/* Call the standard planner */
rv = standard_planner(parse, cursorOptions, boundParams);
rv = standard_planner(parse, cursor_opts, bound_params);
}

global_planner_ctx = NULL;
Expand Down
34 changes: 17 additions & 17 deletions src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,57 @@ static ProcessUtility_hook_type prev_ProcessUtility_hook;
/* Calls the default ProcessUtility */
static void
prev_ProcessUtility(Node *parsetree,
const char *queryString,
const char *query_string,
ProcessUtilityContext context,
ParamListInfo params,
DestReceiver *dest,
char *completionTag)
char *completion_tag)
{
if (prev_ProcessUtility_hook != NULL)
{
/* Call any earlier hooks */
(prev_ProcessUtility_hook) (parsetree, queryString, context, params, dest, completionTag);
(prev_ProcessUtility_hook) (parsetree, query_string, context, params, dest, completion_tag);
}
else
{
/* Call the standard */
standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag);
standard_ProcessUtility(parsetree, query_string, context, params, dest, completion_tag);
}
}

/* Hook-intercept for ProcessUtility. Used to make COPY use a temp copy table and */
/* blocking renaming of hypertables. */
/* Hook-intercept for ProcessUtility. */
static void
timescaledb_ProcessUtility(Node *parsetree,
const char *queryString,
const char *query_string,
ProcessUtilityContext context,
ParamListInfo params,
DestReceiver *dest,
char *completionTag)
char *completion_tag)
{
if (!extension_is_loaded())
{
prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag);
prev_ProcessUtility(parsetree, query_string, context, params, dest, completion_tag);
return;
}

/* We don't support renaming hypertables yet so we need to block it */
if (IsA(parsetree, RenameStmt))
{
RenameStmt *renamestmt = (RenameStmt *) parsetree;
Oid relId = RangeVarGetRelid(renamestmt->relation, NoLock, true);
Oid relid = RangeVarGetRelid(renamestmt->relation, NoLock, true);

if (OidIsValid(relId))
if (OidIsValid(relid))
{
Cache *hcache = hypertable_cache_pin();
Hypertable *hentry = hypertable_cache_get_entry(hcache, relId);
Hypertable *hentry = hypertable_cache_get_entry(hcache, relid);

if (hentry != NULL && renamestmt->renameType == OBJECT_TABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Renaming hypertables is not yet supported")));
cache_release(hcache);
}
prev_ProcessUtility((Node *) renamestmt, queryString, context, params, dest, completionTag);
prev_ProcessUtility((Node *) renamestmt, query_string, context, params, dest, completion_tag);
return;
}
if (IsA(parsetree, CopyStmt))
Expand All @@ -80,16 +79,17 @@ timescaledb_ProcessUtility(Node *parsetree,
uint64 processed;

executor_level_enter();
DoCopy((CopyStmt *) parsetree, queryString, &processed);
DoCopy((CopyStmt *) parsetree, query_string, &processed);
executor_level_exit();
processed += executor_get_additional_tuples_processed();
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,

if (completion_tag)
snprintf(completion_tag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
return;
}

prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag);
prev_ProcessUtility(parsetree, query_string, context, params, dest, completion_tag);
}

void
Expand Down

0 comments on commit 142f58c

Please sign in to comment.