From f51ee8d3ca7fef40c17b8cb86b0c79c85729422f Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 9 Dec 2021 16:24:34 +0000 Subject: [PATCH 01/17] Implementation of aggregation/grouping pushdown The current implementation provides a mechanism for pushing down aggregation and/or grouping queries into the foreign data source. The Python side of the implementation will now receive two new kwargs, `aggs` and `group_clauses`, in which case it should return the corresponding aggregation result. Still left to implement is consulting the Python side whether remote aggregation is possible at all, and if so which aggregation functions are valid. Also missing are some more advanced aggregation cases (aggregating multiple functions, or handling a `HAVING` clause for example). This is to be implemented separately. --- Makefile | 4 +- python/multicorn/__init__.py | 9 +- src/deparse.c | 577 +++++++++++++++++++++++++++++ src/multicorn.c | 691 +++++++++++++++++++++++++++++++++-- src/multicorn.h | 138 ++++++- src/python.c | 39 ++ src/query.c | 16 +- 7 files changed, 1435 insertions(+), 39 deletions(-) create mode 100644 src/deparse.c diff --git a/Makefile b/Makefile index 45243bf05..83ada1064 100755 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ srcdir = . 
MODULE_big = multicorn -OBJS = src/errors.o src/python.o src/query.o src/multicorn.o +OBJS = src/deparse.o src/errors.o src/python.o src/query.o src/multicorn.o DATA = $(filter-out $(wildcard sql/*--*.sql),$(wildcard sql/*.sql)) @@ -19,7 +19,7 @@ directories.stamp: $(OBJS): directories.stamp -install: python_code +install: python_code sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql directories.stamp cp $< $@ diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 72d8ebb75..e47d4b83b 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -280,7 +280,7 @@ def explain(self, quals, columns, sortkeys=None, verbose=False): """ return [] - def execute(self, quals, columns, sortkeys=None): + def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): """Execute a query in the foreign data wrapper. This method is called at the first iteration. @@ -311,8 +311,15 @@ def execute(self, quals, columns, sortkeys=None): You should return AT LEAST those columns when returning a dict. If returning a sequence, every column from the table should be in the sequence. + + Kwargs: sortkeys (list): A list of :class:`SortKey` that the FDW said it can enforce. + aggs (dict): A dictionary mapping aggregation key with function and + column to be used in the aggregation operation. Result should be + returned under the provided aggregation key. + group_clauses (list): A list of columns used in GROUP BY statements. + The result should be returned for each column name provided. Returns: An iterable of python objects which can be converted back to PostgreSQL. 
diff --git a/src/deparse.c b/src/deparse.c new file mode 100644 index 000000000..f8162ce59 --- /dev/null +++ b/src/deparse.c @@ -0,0 +1,577 @@ +/*------------------------------------------------------------------------- + * + * Multicorn Foreign Data Wrapper for PostgreSQL + * + * IDENTIFICATION + * deparse.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "pgtime.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "catalog/pg_aggregate.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_operator.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "nodes/nodeFuncs.h" +#include "nodes/plannodes.h" +#include "optimizer/clauses.h" +#include "optimizer/optimizer.h" +#include "optimizer/tlist.h" +#include "parser/parsetree.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "utils/timestamp.h" +#include "utils/typcache.h" +#include "commands/tablecmds.h" + +#include "multicorn.h" + + +/* + * Local (per-tree-level) context for multicorn_foreign_expr_walker's search. + * This is concerned with identifying collations used in the expression. + */ +typedef enum +{ + FDW_COLLATE_NONE, /* expression is of a noncollatable type */ + FDW_COLLATE_SAFE, /* collation derives from a foreign Var */ + FDW_COLLATE_UNSAFE /* collation derives from something else */ +} FDWCollateState; + + +/* + * Global context for multicorn_foreign_expr_walker's search of an expression tree. + */ +typedef struct foreign_glob_cxt +{ + PlannerInfo *root; /* global planner state */ + RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + + /* + * For join pushdown, only a limited set of operators are allowed to be + * pushed. This flag helps us identify if we are walking through the list + * of join conditions. 
Also true for aggregate relations to restrict + * aggregates for specified list. + */ + bool is_remote_cond; /* true for join or aggregate relations */ + Relids relids; /* relids of base relations in the underlying + * scan */ +} foreign_glob_cxt; + +typedef struct foreign_loc_cxt +{ + Oid collation; /* OID of current collation, if any */ + FDWCollateState state; /* state of current collation choice */ +} foreign_loc_cxt; + +static Value *multicorn_deparse_function_name(Oid funcid); +static void multicorn_deparse_explicit_target_list(List *tlist, + bool is_returning, + List **retrieved_attrs, + deparse_expr_cxt *context); + +/* + * Return true if given object is one of PostgreSQL's built-in objects. + * + * We use FirstBootstrapObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. + * + * Our constraints for dealing with types are tighter than they are for + * functions or operators: we want to accept only types that are in pg_catalog, + * else format_type might incorrectly fail to schema-qualify their names. + * (This could be fixed with some changes to format_type, but for now there's + * no need.) Thus we must exclude information_schema types. + * + * XXX there is a problem with this, which is that the set of built-in + * objects expands over time. Something that is built-in to us might not + * be known to the remote server, if it's of an older version. But keeping + * track of that would be a huge exercise. + */ +static bool +multicorn_is_builtin(Oid oid) +{ + return (oid < FirstBootstrapObjectId); +} + +/* + * Check if expression is safe to execute remotely, and return true if so. + * + * In addition, *outer_cxt is updated with collation information. 
+ * + * We must check that the expression contains only node types we can deparse, + * that all types/functions/operators are safe to send (which we approximate + * as being built-in), and that all collations used in the expression derive + * from Vars of the foreign table. Because of the latter, the logic is + * pretty close to assign_collations_walker() in parse_collate.c, though we + * can assume here that the given expression is valid. + */ +static bool +multicorn_foreign_expr_walker(Node *node, + foreign_glob_cxt *glob_cxt, + foreign_loc_cxt *outer_cxt) +{ + bool check_type = true; + //MulticornPlanState *fpinfo; + foreign_loc_cxt inner_cxt; + Oid collation = InvalidOid; + FDWCollateState state = FDW_COLLATE_NONE; + HeapTuple tuple; + Form_pg_operator form; + + /* Need do nothing for empty subexpressions */ + if (node == NULL) + return true; + + /* May need server info from baserel's fdw_private struct */ + //fpinfo = (MulticornPlanState *) (glob_cxt->foreignrel->fdw_private); + + /* Set up inner_cxt for possible recursion to child nodes */ + inner_cxt.collation = InvalidOid; + inner_cxt.state = FDW_COLLATE_NONE; + switch (nodeTag(node)) + { + case T_Var: + { + Var *var = (Var *) node; + + /* + * If the Var is from the foreign table, we consider its + * collation (if any) safe to use. If it is from another + * table, we treat its collation the same way as we would a + * Param's collation, ie it's not safe for it to have a + * non-default collation. + */ + if (bms_is_member(var->varno, glob_cxt->relids) && + var->varlevelsup == 0) + { + /* Var belongs to foreign table */ + + /* + * System columns (e.g. oid, ctid) should not be sent to + * the remote, since we don't make any effort to ensure + * that local and remote values match (tableoid, in + * particular, almost certainly doesn't match). + */ + if (var->varattno < 0) + return false; + + /* Else check the collation */ + collation = var->varcollid; + state = OidIsValid(collation) ? 
FDW_COLLATE_SAFE : FDW_COLLATE_NONE; + } + else + { + /* Var belongs to some other table */ + collation = var->varcollid; + if (collation == InvalidOid || + collation == DEFAULT_COLLATION_OID) + { + /* + * It's noncollatable, or it's safe to combine with a + * collatable foreign Var, so set state to NONE. + */ + state = FDW_COLLATE_NONE; + } + else + { + /* + * Do not fail right away, since the Var might appear + * in a collation-insensitive context. + */ + state = FDW_COLLATE_UNSAFE; + } + } + } + break; + case T_Aggref: + { + Aggref *agg = (Aggref *) node; + ListCell *lc; + char *opername = NULL; + Oid schema; + + /* get function name and schema */ + tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(agg->aggfnoid)); + if (!HeapTupleIsValid(tuple)) + { + elog(ERROR, "cache lookup failed for function %u", agg->aggfnoid); + } + opername = pstrdup(((Form_pg_proc) GETSTRUCT(tuple))->proname.data); + schema = ((Form_pg_proc) GETSTRUCT(tuple))->pronamespace; + ReleaseSysCache(tuple); + + /* ignore functions in other than the pg_catalog schema */ + if (schema != PG_CATALOG_NAMESPACE) + return false; + + /* TODO: this decision will need to be forwarded to Python */ + if (!(strcmp(opername, "sum") == 0 + || strcmp(opername, "avg") == 0 + || strcmp(opername, "max") == 0 + || strcmp(opername, "min") == 0 + || strcmp(opername, "count") == 0)) + { + return false; + } + + + /* Not safe to pushdown when not in grouping context */ + if (!IS_UPPER_REL(glob_cxt->foreignrel)) + return false; + + /* Only non-split aggregates are pushable. */ + if (agg->aggsplit != AGGSPLIT_SIMPLE) + return false; + + /* + * Recurse to input args. aggdirectargs, aggorder and + * aggdistinct are all present in args, so no need to check + * their shippability explicitly. 
+ */ + foreach(lc, agg->args) + { + Node *n = (Node *) lfirst(lc); + + /* If TargetEntry, extract the expression from it */ + if (IsA(n, TargetEntry)) + { + TargetEntry *tle = (TargetEntry *) n; + + n = (Node *) tle->expr; + } + + if (!multicorn_foreign_expr_walker(n, glob_cxt, &inner_cxt)) + return false; + } + + if (agg->aggorder || agg->aggfilter) + { + return false; + } + + /* + * If aggregate's input collation is not derived from a + * foreign Var, it can't be sent to remote. + */ + if (agg->inputcollid == InvalidOid) + /* OK, inputs are all noncollatable */ ; + else if (inner_cxt.state != FDW_COLLATE_SAFE || + agg->inputcollid != inner_cxt.collation) + return false; + + /* + * Detect whether node is introducing a collation not derived + * from a foreign Var. (If so, we just mark it unsafe for now + * rather than immediately returning false, since the parent + * node might not care.) + */ + collation = agg->aggcollid; + if (collation == InvalidOid) + state = FDW_COLLATE_NONE; + else if (inner_cxt.state == FDW_COLLATE_SAFE && + collation == inner_cxt.collation) + state = FDW_COLLATE_SAFE; + else if (collation == DEFAULT_COLLATION_OID) + state = FDW_COLLATE_NONE; + else + state = FDW_COLLATE_UNSAFE; + } + break; + default: + + /* + * If it's anything else, assume it's unsafe. This list can be + * expanded later, but don't forget to add deparse support below. + */ + return false; + } + + /* + * If result type of given expression is not built-in, it can't be sent to + * remote because it might have incompatible semantics on remote side. + */ + if (check_type && !multicorn_is_builtin(exprType(node))) + return false; + + /* + * Now, merge my collation information into my parent's state. 
+ */ + if (state > outer_cxt->state) + { + /* Override previous parent state */ + outer_cxt->collation = collation; + outer_cxt->state = state; + } + else if (state == outer_cxt->state) + { + /* Merge, or detect error if there's a collation conflict */ + switch (state) + { + case FDW_COLLATE_NONE: + /* Nothing + nothing is still nothing */ + break; + case FDW_COLLATE_SAFE: + if (collation != outer_cxt->collation) + { + /* + * Non-default collation always beats default. + */ + if (outer_cxt->collation == DEFAULT_COLLATION_OID) + { + /* Override previous parent state */ + outer_cxt->collation = collation; + } + else if (collation != DEFAULT_COLLATION_OID) + { + /* + * Conflict; show state as indeterminate. We don't + * want to "return false" right away, since parent + * node might not care about collation. + */ + outer_cxt->state = FDW_COLLATE_UNSAFE; + } + } + break; + case FDW_COLLATE_UNSAFE: + /* We're still conflicted ... */ + break; + } + } + /* It looks OK */ + return true; +} + +/* + * Returns true if given expr is safe to evaluate on the foreign server. + */ +bool +multicorn_is_foreign_expr(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr) +{ + foreign_glob_cxt glob_cxt; + foreign_loc_cxt loc_cxt; + MulticornPlanState *fpinfo = (MulticornPlanState *) (baserel->fdw_private); + + /* + * Check that the expression consists of nodes that are safe to execute + * remotely. + */ + glob_cxt.root = root; + glob_cxt.foreignrel = baserel; + + /* + * For an upper relation, use relids from its underneath scan relation, + * because the upperrel's own relids currently aren't set to anything + * meaningful by the core code. For other relation, use their own relids. 
+ */ + if (IS_UPPER_REL(baserel)) + glob_cxt.relids = fpinfo->outerrel->relids; + else + glob_cxt.relids = baserel->relids; + loc_cxt.collation = InvalidOid; + loc_cxt.state = FDW_COLLATE_NONE; + if (!multicorn_foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt)) + return false; + + /* + * If the expression has a valid collation that does not arise from a + * foreign var, the expression can not be sent over. + */ + if (loc_cxt.state == FDW_COLLATE_UNSAFE) + return false; + + /* + * An expression which includes any mutable functions can't be sent over + * because its result is not stable. For example, sending now() remote + * side could cause confusion from clock offsets. Future versions might + * be able to make this choice with more granularity. (We check this last + * because it requires a lot of expensive catalog lookups.) + */ + if (contain_mutable_functions((Node *) expr)) + return false; + + /* OK to evaluate on the remote server */ + return true; +} + + +/* + * Returns true if given expr is something we'd have to send the value of + * to the foreign server. + * + * This should return true when the expression is a shippable node that + * deparseExpr would add to context->params_list. Note that we don't care + * if the expression *contains* such a node, only whether one appears at top + * level. We need this to detect cases where setrefs.c would recognize a + * false match between an fdw_exprs item (which came from the params_list) + * and an entry in fdw_scan_tlist (which we're considering putting the given + * expression into). 
+ */ +bool +multicorn_is_foreign_param(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr) +{ + if (expr == NULL) + return false; + + switch (nodeTag(expr)) + { + case T_Var: + { + /* It would have to be sent unless it's a foreign Var */ + Var *var = (Var *) expr; + MulticornPlanState *fpinfo = (MulticornPlanState *) (baserel->fdw_private); + Relids relids; + + if (IS_UPPER_REL(baserel)) + relids = fpinfo->outerrel->relids; + else + relids = baserel->relids; + + if (bms_is_member(var->varno, relids) && var->varlevelsup == 0) + return false; /* foreign Var, so not a param */ + else + return true; /* it'd have to be a param */ + break; + } + case T_Param: + /* Params always have to be sent to the foreign server */ + return true; + default: + break; + } + return false; +} + +/* + * Build the targetlist for given relation to be deparsed as SELECT clause. + * + * The output targetlist contains the columns that need to be fetched from the + * foreign server for the given relation. If foreignrel is an upper relation, + * then the output targetlist can also contains expressions to be evaluated on + * foreign server. + */ +List * +multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel) +{ + List *tlist = NIL; + MulticornPlanState *fpinfo = (MulticornPlanState *) foreignrel->fdw_private; + ListCell *lc; + + /* + * For an upper relation, we have already built the target list while + * checking shippability, so just return that. + */ + if (IS_UPPER_REL(foreignrel)) + return fpinfo->grouped_tlist; + + /* + * We require columns specified in foreignrel->reltarget->exprs and those + * required for evaluating the local conditions. 
+ */ + tlist = add_to_flat_tlist(tlist, + pull_var_clause((Node *) foreignrel->reltarget->exprs, + PVC_RECURSE_PLACEHOLDERS)); + foreach(lc, fpinfo->local_conds) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + tlist = add_to_flat_tlist(tlist, + pull_var_clause((Node *) rinfo->clause, + PVC_RECURSE_PLACEHOLDERS)); + } + + return tlist; +} + +/* + * Iterate through the targets and extract relevant information needed to execute + * the aggregation and/or grouping on the remote data source through Python. + * + * NB: Logic here is strongly coupled to multicorn_foreign_grouping_ok(), i.e. + * if there is no ressortgroupref set, we automatically assume the only other + * option is an Aggref node type. Moreover, for the Aggref node type we assume + * only a single element in args (i.e. only aggregations over single columns, + * e.g. sum(column2)), because multicorn_foreign_expr_walker() doesn't handle the + * T_OpExpr case. NOTE(review): linitial() below assumes the Aggref contains a Var; confirm count(*) cannot reach here. + */ +void +multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanState *fpinfo) +{ + ListCell *lc; + TargetEntry *tle; + Var *var; + Value *colname, *function; + Aggref *aggref; + + foreach(lc, tlist) + { + tle = lfirst_node(TargetEntry, lc); + + if (tle->ressortgroupref) + { + /* GROUP BY target */ + var = (Var *) tle->expr; + colname = colnameFromVar(var, root); + + fpinfo->group_clauses = lappend(fpinfo->group_clauses, colname); + fpinfo->upper_rel_targets = lappend(fpinfo->upper_rel_targets, colname); + } + else + { + /* Aggregation target */ + aggref = (Aggref *) tle->expr; + function = multicorn_deparse_function_name(aggref->aggfnoid); + + var = linitial(pull_var_clause((Node *) aggref, + PVC_RECURSE_AGGREGATES | + PVC_RECURSE_PLACEHOLDERS)); + colname = colnameFromVar(var, root); + + StringInfoData agg_key; + initStringInfo(&agg_key); + + appendStringInfoString(&agg_key, strVal(function)); + appendStringInfoString(&agg_key, "_"); + appendStringInfoString(&agg_key, 
strVal(colname)); + + fpinfo->aggs = lappend(fpinfo->aggs, list_make3(makeString(agg_key.data), function, colname)); + fpinfo->upper_rel_targets = lappend(fpinfo->upper_rel_targets, makeString(agg_key.data)); + } + } +} + +/* + * multicorn_deparse_function_name + * Looks up funcid in pg_proc and returns a copy of its name as a String node. + */ +static Value * +multicorn_deparse_function_name(Oid funcid) +{ + HeapTuple proctup; + Form_pg_proc procform; + char *proname; + + proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); + if (!HeapTupleIsValid(proctup)) + elog(ERROR, "cache lookup failed for function %u", funcid); + procform = (Form_pg_proc) GETSTRUCT(proctup); + /* The name lives inside the cache entry, so copy it before releasing the entry */ + proname = pstrdup(NameStr(procform->proname)); + + ReleaseSysCache(proctup); + return makeString(proname); +} diff --git a/src/multicorn.c b/src/multicorn.c index 87c193ea1..55f5ece77 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -12,6 +12,8 @@ #include "optimizer/planmain.h" #include "optimizer/restrictinfo.h" #include "optimizer/clauses.h" +#include "optimizer/optimizer.h" +#include "optimizer/tlist.h" #if PG_VERSION_NUM < 120000 #include "optimizer/var.h" #include "dynloader.h" @@ -108,6 +110,10 @@ static void multicorn_subxact_callback(SubXactEvent event, SubTransactionId mySu SubTransactionId parentSubid, void *arg); #endif +static void multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra + ); + #if PG_VERSION_NUM >= 90500 static List *multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, Oid serverOid); @@ -117,6 +123,15 @@ static bool multicornIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *re static void multicorn_xact_callback(XactEvent event, void *arg); +/* Functions relating to aggregation/grouping pushdown */ +static bool multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, + Node *havingQual); +static void multicorn_add_foreign_grouping_paths(PlannerInfo *root, + RelOptInfo 
*input_rel, + RelOptInfo *grouped_rel, + GroupPathExtraData *extra +); + /* Functions relating to scanning through subrelations */ static bool subscanReadRow(TupleTableSlot *slot, Relation subscanRel, void *subscanState); static void subscanEnd(ForeignScanState *node); @@ -223,6 +238,9 @@ multicorn_handler(PG_FUNCTION_ARGS) fdw_routine->IsForeignScanParallelSafe = multicornIsForeignScanParallelSafe; #endif + /* Support functions for upper relation push-down */ + fdw_routine->GetForeignUpperPaths = multicornGetForeignUpperPaths; + PG_RETURN_POINTER(fdw_routine); } @@ -286,6 +304,10 @@ multicornGetForeignRelSize(PlannerInfo *root, TupleDesc desc; baserel->fdw_private = planstate; + + /* Base foreign tables need to be push down always. */ + planstate->pushdown_safe = true; + planstate->fdw_instance = getInstance(foreigntableid); planstate->foreigntableid = foreigntableid; /* Initialize the conversion info array */ @@ -299,7 +321,7 @@ multicornGetForeignRelSize(PlannerInfo *root, planstate->cinfos = palloc0(sizeof(ConversionInfo *) * planstate->numattrs); - initConversioninfo(planstate->cinfos, attinmeta); + initConversioninfo(planstate->cinfos, attinmeta, NULL); /* * needWholeRow = rel->trigdesc && rel->trigdesc->trig_insert_after_row; * @@ -332,7 +354,21 @@ multicornGetForeignRelSize(PlannerInfo *root, } else { - /* Pull "var" clauses to build an appropriate target list */ + /* + * Pull "var" clauses to build an appropriate target list + * + * baserel->reltarget->exprs can be used to determine which columns need + * to be fetched; but note that it only lists columns that have to be + * emitted by the ForeignScan plan node, not columns that are used in + * qual evaluation but not output by the query. + * + * baserel->baserestrictinfo is particularly interesting, as it contains + * restriction quals (WHERE clauses) that should be used to filter the + * rows to be fetched. 
(The FDW itself is not required to enforce these + * quals, as the core executor can check them instead.) + * + * For more details see: https://www.postgresql.org/docs/12/fdw-planning.html + */ #if PG_VERSION_NUM >= 90600 foreach(lc, extractColumns(baserel->reltarget->exprs, baserel->baserestrictinfo)) #else @@ -346,14 +382,14 @@ multicornGetForeignRelSize(PlannerInfo *root, * Store only a Value node containing the string name of the * column. */ - colname = colnameFromVar(var, root, planstate); + colname = colnameFromVar(var, root); if (colname != NULL && strVal(colname) != NULL) { planstate->target_list = lappend(planstate->target_list, colname); } } } - + /* Extract the restrictions from the plan. */ foreach(lc, baserel->baserestrictinfo) { @@ -467,7 +503,7 @@ multicornGetForeignPaths(PlannerInfo *root, */ static ForeignScan * multicornGetForeignPlan(PlannerInfo *root, - RelOptInfo *baserel, + RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, @@ -477,33 +513,192 @@ multicornGetForeignPlan(PlannerInfo *root, #endif ) { - Index scan_relid = baserel->relid; - MulticornPlanState *planstate = (MulticornPlanState *) baserel->fdw_private; - ListCell *lc; + MulticornPlanState *planstate = (MulticornPlanState *) foreignrel->fdw_private; + Index scan_relid = foreignrel->relid; + List *remote_exprs = NIL; + List *local_exprs = NIL; + List *fdw_scan_tlist = NIL; + List *fdw_recheck_quals = NIL; + ListCell *lc; + #if PG_VERSION_NUM >= 90600 best_path->path.pathtarget->width = planstate->width; #endif - scan_clauses = extract_actual_clauses(scan_clauses, false); + + if (IS_SIMPLE_REL(foreignrel)) + { + /* + * For base relations, set scan_relid as the relid of the relation. + */ + scan_relid = foreignrel->relid; + + /* + * In a base-relation scan, we must apply the given scan_clauses. + * + * Separate the scan_clauses into those that can be executed remotely + * and those that can't. 
baserestrictinfo clauses that were + * previously determined to be safe or unsafe by classifyConditions + * are found in planstate->remote_conds and planstate->local_conds. Anything + * else in the scan_clauses list will be a join clause, which we have + * to check for remote-safety. + * + * Note: the join clauses we see here should be the exact same ones + * previously examined by multicornGetForeignPaths. Possibly it'd be + * worth passing forward the classification work done then, rather + * than repeating it here. + * + * This code must match "extract_actual_clauses(scan_clauses, false)" + * except for the additional decision about remote versus local + * execution. + */ + foreach(lc, scan_clauses) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + /* Ignore any pseudoconstants, they're dealt with elsewhere */ + if (rinfo->pseudoconstant) + continue; + + if (list_member_ptr(planstate->remote_conds, rinfo)) + remote_exprs = lappend(remote_exprs, rinfo->clause); + else if (list_member_ptr(planstate->local_conds, rinfo)) + local_exprs = lappend(local_exprs, rinfo->clause); + else if (multicorn_is_foreign_expr(root, foreignrel, rinfo->clause)) + remote_exprs = lappend(remote_exprs, rinfo->clause); + else + local_exprs = lappend(local_exprs, rinfo->clause); + } + + /* + * For a base-relation scan, we have to support EPQ recheck, which + * should recheck all the remote quals. + */ + fdw_recheck_quals = remote_exprs; + } + else + { + /* + * Join relation or upper relation - set scan_relid to 0. + */ + scan_relid = 0; + + /* + * For a join rel, baserestrictinfo is NIL and we are not considering + * parameterization right now, so there should be no scan_clauses for + * a joinrel or an upper rel either. + */ + Assert(!scan_clauses); + + /* + * Instead we get the conditions to apply from the fdw_private + * structure. 
+ */ + remote_exprs = extract_actual_clauses(planstate->remote_conds, false); + local_exprs = extract_actual_clauses(planstate->local_conds, false); + + /* + * We leave fdw_recheck_quals empty in this case, since we never need + * to apply EPQ recheck clauses. In the case of a joinrel, EPQ + * recheck is handled elsewhere --- see multicornGetForeignJoinPaths(). + * If we're planning an upperrel (ie, remote grouping or aggregation) + * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be + * allowed, and indeed we *can't* put the remote clauses into + * fdw_recheck_quals because the unaggregated Vars won't be available + * locally. + */ + + /* Build the list of columns to be fetched from the foreign server. */ + fdw_scan_tlist = multicorn_build_tlist_to_deparse(foreignrel); + + /* + * Ensure that the outer plan produces a tuple whose descriptor + * matches our scan tuple slot. Also, remove the local conditions + * from outer plan's quals, lest they be evaluated twice, once by the + * local plan and once by the scan. + */ + if (outer_plan) + { + ListCell *lc; + + /* + * Right now, we only consider grouping and aggregation beyond + * joins. Queries involving aggregates or grouping do not require + * EPQ mechanism, hence should not have an outer plan here. + */ + Assert(!IS_UPPER_REL(foreignrel)); + + /* + * First, update the plan's qual list if possible. In some cases + * the quals might be enforced below the topmost plan level, in + * which case we'll fail to remove them; it's not worth working + * harder than this. + */ + foreach(lc, local_exprs) + { + Node *qual = lfirst(lc); + + outer_plan->qual = list_delete(outer_plan->qual, qual); + + /* + * For an inner join the local conditions of foreign scan plan + * can be part of the joinquals as well. (They might also be + * in the mergequals or hashquals, but we can't touch those + * without breaking the plan.) 
+ */ + if (IsA(outer_plan, NestLoop) || + IsA(outer_plan, MergeJoin) || + IsA(outer_plan, HashJoin)) + { + Join *join_plan = (Join *) outer_plan; + + if (join_plan->jointype == JOIN_INNER) + join_plan->joinqual = list_delete(join_plan->joinqual, + qual); + } + } + + /* + * Now fix the subplan's tlist --- this might result in inserting + * a Result node atop the plan tree. + */ + outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist, + best_path->path.parallel_safe); + } + } + + /* Remember remote_exprs for possible use by postgresPlanDirectModify */ + planstate->final_remote_exprs = remote_exprs; + + best_path->path.pathtarget->width = planstate->width; + planstate->pathkeys = (List *) best_path->fdw_private; + + scan_clauses = extract_actual_clauses(scan_clauses, false); /* Extract the quals coming from a parameterized path, if any */ if (best_path->path.param_info) { foreach(lc, scan_clauses) { - extractRestrictions(baserel->relids, (Expr *) lfirst(lc), + extractRestrictions(foreignrel->relids, (Expr *) lfirst(lc), &planstate->qual_list); } } - planstate->pathkeys = (List *) best_path->fdw_private; + + /* Extract data needed for aggregations on the Python side */ + if (IS_UPPER_REL(foreignrel)) + { + multicorn_extract_upper_rel_info(root, fdw_scan_tlist, planstate); + } + return make_foreignscan(tlist, - scan_clauses, + local_exprs, scan_relid, scan_clauses, /* no expressions to evaluate */ serializePlanState(planstate) #if PG_VERSION_NUM >= 90500 - , NULL - , NULL /* All quals are meant to be rechecked */ - , NULL + , fdw_scan_tlist + , fdw_recheck_quals + , outer_plan #endif ); } @@ -542,12 +737,33 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fscan = (ForeignScan *) node->ss.ps.plan; MulticornExecState *execstate; - TupleDesc tupdesc = RelationGetDescr(node->ss.ss_currentRelation); ListCell *lc; - execstate = initializeExecState(fscan->fdw_private); - execstate->values = palloc(sizeof(Datum) * tupdesc->natts); - 
execstate->nulls = palloc(sizeof(bool) * tupdesc->natts); + execstate = initializeExecState(fscan->fdw_private); + + /* + * Get info we'll need for converting data fetched from the foreign server + * into local representation and error reporting during that process. + */ + if (fscan->scan.scanrelid > 0) + { + execstate->rel = node->ss.ss_currentRelation; + execstate->tupdesc = RelationGetDescr(execstate->rel); + initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), NULL); + } + else + { + execstate->rel = NULL; +#if (PG_VERSION_NUM >= 140000) + execstate->tupdesc = multicorn_get_tupdesc_for_join_scan_tuples(node); +#else + execstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; +#endif + initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), execstate->upper_rel_targets); + } + + execstate->values = palloc(sizeof(Datum) * execstate->tupdesc->natts); + execstate->nulls = palloc(sizeof(bool) * execstate->tupdesc->natts); execstate->qual_list = NULL; foreach(lc, fscan->fdw_exprs) { @@ -555,7 +771,6 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) ((Expr *) lfirst(lc)), &execstate->qual_list); } - initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(tupdesc)); execstate->subscanCxt = AllocSetContextCreate( node->ss.ps.state->es_query_cxt, @@ -707,7 +922,7 @@ static List *buildCStoreQualList(AttrNumber *attrMap, int map_length, List *qual return result; } -/* +/* * Debug functions to print TupleDesc structs (stolen from private PG), * not currently used anywhere. 
*/ @@ -728,7 +943,7 @@ static void print_desc(TupleDesc desc) { int i; for (i = 0; i < desc->natts; ++i) - { + { printatt((unsigned) i + 1, TupleDescAttr(desc, i)); } } @@ -766,7 +981,7 @@ static void subscanEnd(ForeignScanState *node) } /* - * Read a single row from the subscan relatio + * Read a single row from the subscan relation */ static bool subscanReadRow(TupleTableSlot *slot, Relation subscanRel, void *subscanState) { @@ -812,8 +1027,8 @@ multicornIterateForeignScan(ForeignScanState *node) MulticornExecState *execstate = node->fdw_state; PyObject *p_value; MemoryContext oldcontext; - - /* The data flow here is kind of complex: we treat strings returned by + + /* The data flow here is kind of complex: we treat strings returned by * Python as relation names to read data from directly instead of getting Python * to return it to us. * @@ -825,9 +1040,9 @@ multicornIterateForeignScan(ForeignScanState *node) * expects the relations to actually exist at query plan time -- for example, * CStore's read planner needs the table's RangeTblEntry to be present in the * planner's simple_rte_array, which we don't want to modify at query execution time. 
- * + * */ - + ExecClearTuple(slot); while (1) { @@ -902,7 +1117,7 @@ multicornIterateForeignScan(ForeignScanState *node) char *relation; Py_ssize_t strlength = 0; - + if (PyString_AsStringAndSize(p_value, &relation, &strlength) < 0) { elog(ERROR, "Could not convert subrelation to string!"); @@ -1121,7 +1336,7 @@ multicornBeginForeignModify(ModifyTableState *mtstate, modstate->buffer = makeStringInfo(); modstate->fdw_instance = getInstance(rel->rd_id); modstate->rowidAttrName = getRowIdColumn(modstate->fdw_instance); - initConversioninfo(modstate->cinfos, TupleDescGetAttInMetadata(desc)); + initConversioninfo(modstate->cinfos, TupleDescGetAttInMetadata(desc), NULL); oldcontext = MemoryContextSwitchTo(TopMemoryContext); MemoryContextSwitchTo(oldcontext); if (ps->ps_ResultTupleSlot) @@ -1130,7 +1345,7 @@ multicornBeginForeignModify(ModifyTableState *mtstate, modstate->resultCinfos = palloc0(sizeof(ConversionInfo *) * resultTupleDesc->natts); - initConversioninfo(modstate->resultCinfos, TupleDescGetAttInMetadata(resultTupleDesc)); + initConversioninfo(modstate->resultCinfos, TupleDescGetAttInMetadata(resultTupleDesc), NULL); } for (i = 0; i < desc->natts; i++) { @@ -1453,6 +1668,413 @@ multicornIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, } #endif +/* + * Merge FDW options from input relations into a new set of options for a join + * or an upper rel. + * + * For a join relation, FDW-specific information about the inner and outer + * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, + * fpinfo_o provides the information for the input relation; fpinfo_i is + * expected to be NULL. + */ +static void +multicorn_merge_fdw_options(MulticornPlanState *fpinfo, + const MulticornPlanState *fpinfo_o, + const MulticornPlanState *fpinfo_i) +{ + /* We must always have fpinfo_o. */ + Assert(fpinfo_o); + + /* fpinfo_i may be NULL, but if present the servers must both match. 
*/ + Assert(!fpinfo_i || + fpinfo_i->server->serverid == fpinfo_o->server->serverid); + + /* + * Copy the server specific FDW options. (For a join, both relations come + * from the same server, so the server options should have the same value + * for both relations.) + */ + fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; + fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; + fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; + fpinfo->fetch_size = fpinfo_o->fetch_size; + + /* Multicorn specific options */ + fpinfo->fdw_instance = fpinfo_o->fdw_instance; + fpinfo->foreigntableid = fpinfo_o->foreigntableid; + fpinfo->numattrs = fpinfo_o->numattrs; + fpinfo->cinfos = fpinfo_o->cinfos; + fpinfo->pathkeys = fpinfo_o->pathkeys; + fpinfo->target_list = fpinfo_o->target_list; + fpinfo->qual_list = fpinfo_o->qual_list; + fpinfo->pathkeys = fpinfo_o->pathkeys; + + /* Merge the table level options from either side of the join. */ + if (fpinfo_i) + { + /* + * We'll prefer to use remote estimates for this join if any table + * from either side of the join is using remote estimates. This is + * most likely going to be preferred since they're already willing to + * pay the price of a round trip to get the remote EXPLAIN. In any + * case it's not entirely clear how we might otherwise handle this + * best. + */ + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || + fpinfo_i->use_remote_estimate; + + /* + * Set fetch size to maximum of the joining sides, since we are + * expecting the rows returned by the join to be proportional to the + * relation sizes. + */ + fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + } +} + +/* + * Assess whether the aggregation, grouping and having operations can be pushed + * down to the foreign server. As a side effect, save information we obtain in + * this function to MulticornPlanState of the input relation. 
+ */ +static bool +multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, + Node *havingQual) +{ + Query *query = root->parse; + MulticornPlanState *fpinfo = (MulticornPlanState *) grouped_rel->fdw_private; + PathTarget *grouping_target = grouped_rel->reltarget; + // Q: Or perhaps like in SQLite FDW `PathTarget *grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];`? + // A: It appears they are the same thing; see also GridDBs sortgrouprefs explanation + MulticornPlanState *ofpinfo; + ListCell *lc; + int i; + List *tlist = NIL; + + /* We currently don't support pushing Grouping Sets. */ + if (query->groupingSets) + return false; + + /* Get the fpinfo of the underlying scan relation. */ + ofpinfo = (MulticornPlanState *) fpinfo->outerrel->fdw_private; + + /* + * If underlying scan relation has any local conditions, those conditions + * are required to be applied before performing aggregation. Hence the + * aggregate cannot be pushed down. + */ + if (ofpinfo->local_conds) + return false; + + /* + * Examine grouping expressions, as well as other expressions we'd need to + * compute, and check whether they are safe to push down to the foreign + * server. All GROUP BY expressions will be part of the grouping target + * and thus there is no need to search for them separately. Add grouping + * expressions into target list which will be passed to foreign server. + * + * A tricky fine point is that we must not put any expression into the + * target list that is just a foreign param (that is, something that + * deparse.c would conclude has to be sent to the foreign server). If we + * do, the expression will also appear in the fdw_exprs list of the plan + * node, and setrefs.c will get confused and decide that the fdw_exprs + * entry is actually a reference to the fdw_scan_tlist entry, resulting in + * a broken plan. Somewhat oddly, it's OK if the expression contains such + * a node, as long as it's not at top level; then no match is possible. 
+ */ + i = 0; + foreach(lc, grouping_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sgref = get_pathtarget_sortgroupref(grouping_target, i); + ListCell *l; + + /* Check whether this expression is part of GROUP BY clause */ + if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) + { + TargetEntry *tle; + + /* + * If any GROUP BY expression is not shippable, then we cannot + * push down aggregation to the foreign server. + */ + if (!multicorn_is_foreign_expr(root, grouped_rel, expr)) + return false; + + /* + * If it would be a foreign param, we can't put it into the tlist, + * so we have to fail. + */ + if (multicorn_is_foreign_param(root, grouped_rel, expr)) + return false; + + /* + * Pushable, so add to tlist. We need to create a TLE for this + * expression and apply the sortgroupref to it. We cannot use + * add_to_flat_tlist() here because that avoids making duplicate + * entries in the tlist. If there are duplicate entries with + * distinct sortgrouprefs, we have to duplicate that situation in + * the output tlist. + */ + tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); + tle->ressortgroupref = sgref; + tlist = lappend(tlist, tle); + } + else + { + /* + * Non-grouping expression we need to compute. Can we ship it + * as-is to the foreign server? + */ + if (multicorn_is_foreign_expr(root, grouped_rel, expr) && + !multicorn_is_foreign_param(root, grouped_rel, expr)) + { + /* Yes, so add to tlist as-is; OK to suppress duplicates */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* Not pushable as a whole; extract its Vars and aggregates */ + List *aggvars; + + aggvars = pull_var_clause((Node *) expr, + PVC_INCLUDE_AGGREGATES); + + /* + * If any aggregate expression is not shippable, then we + * cannot push down aggregation to the foreign server. (We + * don't have to check is_foreign_param, since that certainly + * won't return true for any such expression.) 
+ */ + if (!multicorn_is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + return false; + + /* + * Add aggregates, if any, into the targetlist. Plain Vars + * outside an aggregate can be ignored, because they should be + * either same as some GROUP BY column or part of some GROUP + * BY expression. In either case, they are already part of + * the targetlist and thus no need to add them again. In fact + * including plain Vars in the tlist when they do not match a + * GROUP BY column would cause the foreign server to complain + * that the shipped query is invalid. + */ + foreach(l, aggvars) + { + Expr *expr = (Expr *) lfirst(l); + + if (IsA(expr, Aggref)) + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + i++; + } + + /* + * TODO: Enable HAVING clause pushdowns + */ + if (havingQual) + { + return false; + } + + /* + * If there are any local conditions, pull Vars and aggregates from it and + * check whether they are safe to pushdown or not. + */ + if (fpinfo->local_conds) + { + List *aggvars = NIL; + ListCell *lc; + + foreach(lc, fpinfo->local_conds) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + aggvars = list_concat(aggvars, + pull_var_clause((Node *) rinfo->clause, + PVC_INCLUDE_AGGREGATES)); + } + + foreach(lc, aggvars) + { + Expr *expr = (Expr *) lfirst(lc); + + /* + * If aggregates within local conditions are not safe to push + * down, then we cannot push down the query. Vars are already + * part of GROUP BY clause which are checked above, so no need to + * access them again here. Again, we need not check + * is_foreign_param for a foreign aggregate. 
+ */ + if (IsA(expr, Aggref)) + { + if (!multicorn_is_foreign_expr(root, grouped_rel, expr)) + return false; + + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + + /* Store generated targetlist */ + fpinfo->grouped_tlist = tlist; + + /* Safe to pushdown */ + fpinfo->pushdown_safe = true; + + /* + * If user is willing to estimate cost for a scan using EXPLAIN, he + * intends to estimate scans on that relation more accurately. Then, it + * makes sense to estimate the cost of the grouping on that relation more + * accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate; + + /* Copy startup and tuple cost as is from underneath input rel's fpinfo */ + fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost; + fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost; + + /* + * Set # of retrieved rows and cached relation costs to some negative + * value, so that we can detect when they are set to some sensible values, + * during one (usually the first) of the calls to multicorn_estimate_path_cost_size. + */ + fpinfo->retrieved_rows = -1; + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + + /* + * Set the string describing this grouped relation to be used in EXPLAIN + * output of corresponding ForeignScan. + */ + fpinfo->relation_name = NULL; + + return true; +} + +/* + * multicornGetForeignUpperPaths + * Add paths for post-join operations like aggregation, grouping etc. if + * corresponding operations are safe to push down. + * + * Right now, we only support aggregate, grouping and having clause pushdown. + */ +static void +multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra +) +{ + MulticornPlanState *fpinfo; + + debug_elog("multicorn_fdw : %s", __func__); + + /* + * If input rel is not safe to pushdown, then simply return as we cannot + * perform any post-join operations on the foreign server. 
+ */ + if (!input_rel->fdw_private || + !((MulticornPlanState *) input_rel->fdw_private)->pushdown_safe) + return; + + /* Ignore stages we don't support; and skip any duplicate calls. */ + if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) + return; + + fpinfo = (MulticornPlanState *) palloc0(sizeof(MulticornPlanState)); + fpinfo->pushdown_safe = false; + fpinfo->stage = stage; + output_rel->fdw_private = fpinfo; + + switch (stage) + { + case UPPERREL_GROUP_AGG: + multicorn_add_foreign_grouping_paths(root, input_rel, output_rel, (GroupPathExtraData *) extra); + break; + default: + elog(ERROR, "unexpected upper relation: %d", (int) stage); + break; + } +} + +/* + * multicorn_add_foreign_grouping_paths + * Add foreign path for grouping and/or aggregation. + * + * Given input_rel represents the underlying scan. The paths are added to the + * given grouped_rel. + */ +static void +multicorn_add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + GroupPathExtraData *extra +) +{ + Query *parse = root->parse; + MulticornPlanState *ifpinfo = input_rel->fdw_private; + MulticornPlanState *fpinfo = grouped_rel->fdw_private; + ForeignPath *grouppath; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Nothing to be done, if there is no grouping or aggregation required. */ + if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && + !root->hasHavingQual) + return; + + Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE || + extra->patype == PARTITIONWISE_AGGREGATE_FULL); + + /* save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, shippable extensions + * etc. details from the input relation's fpinfo. 
+ */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + multicorn_merge_fdw_options(fpinfo, ifpinfo, NULL); + + /* + * Assess if it is safe to push down aggregation and grouping. + * + * Use HAVING qual from extra. In case of child partition, it will have + * translated Vars. + */ + if (!multicorn_foreign_grouping_ok(root, grouped_rel, extra->havingQual)) + return; + + /* Use small cost to push down aggregate always */ + rows = width = startup_cost = total_cost = 1; + /* Now update this information in the fpinfo */ + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* Create and add foreign path to the grouping relation. */ + grouppath = create_foreign_upper_path(root, + grouped_rel, + grouped_rel->reltarget, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, + NIL); /* no fdw_private */ + + /* Add generated path into grouped_rel by add_path(). 
*/ + add_path(grouped_rel, (Path *) grouppath); +} /* * "Serialize" a MulticornPlanState, so that it is safe to be carried @@ -1471,6 +2093,12 @@ serializePlanState(MulticornPlanState * state) result = lappend(result, serializeDeparsedSortGroup(state->pathkeys)); + result = lappend(result, state->upper_rel_targets); + + result = lappend(result, state->aggs); + + result = lappend(result, state->group_clauses); + return result; } @@ -1499,5 +2127,8 @@ initializeExecState(void *internalstate) execstate->nulls = palloc(attnum * sizeof(bool)); execstate->subscanRel = NULL; execstate->subscanState = NULL; + execstate->upper_rel_targets = list_nth(values, 4); + execstate->aggs = list_nth(values, 5); + execstate->group_clauses = list_nth(values, 6); return execstate; } diff --git a/src/multicorn.h b/src/multicorn.h index 8b00f98d8..185b79a59 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -60,6 +60,43 @@ typedef struct ConversionInfo } ConversionInfo; +/* + * This enum describes what's kept in the fdw_private list for a ForeignPath. + * We store: + * + * 1) Boolean flag showing if the remote query has the final sort + * 2) Boolean flag showing if the remote query has the LIMIT clause + */ +enum FdwPathPrivateIndex +{ + /* has-final-sort flag (as an integer Value node) */ + FdwPathPrivateHasFinalSort, + /* has-limit flag (as an integer Value node) */ + FdwPathPrivateHasLimit +}; + +/* + * Context for multicorn_deparse_expr + */ +typedef struct deparse_expr_cxt +{ + PlannerInfo *root; /* global planner state */ + RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + RelOptInfo *scanrel; /* the underlying scan relation. Same as + * foreignrel, when that represents a join or + * a base relation. 
*/ + + StringInfo buf; /* output buffer to append to */ + List **params_list; /* exprs that will become remote Params */ + bool can_skip_cast; /* outer function can skip int2/int4/int8/float4/float8 cast */ +} deparse_expr_cxt; + +/* + * FDW-specific planner information kept in RelOptInfo.fdw_private for a + * multicorn foreign table. + * multicornGetForeignJoinPaths creates it for a joinrel (not implemented yet), + * and multicornGetForeignUpperPaths creates it for an upperrel. + */ typedef struct MulticornPlanState { Oid foreigntableid; @@ -78,6 +115,69 @@ typedef struct MulticornPlanState * getRelSize to GetForeignPlan. */ int width; + + /* + * Aggregation and grouping data to be passed to the execution phase. + * See MulticornExecState for more details. + */ + List *upper_rel_targets; + List *aggs; + List *group_clauses; + + /* + * True means that the relation can be pushed down. Always true for simple + * foreign scan. + */ + bool pushdown_safe; + + /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */ + List *remote_conds; + List *local_conds; + + /* Actual remote restriction clauses for scan (sans RestrictInfos) */ + List *final_remote_exprs; + + /* Estimated size and cost for a scan or join. */ + double rows; + Cost startup_cost; + Cost total_cost; + + /* Costs excluding costs for transferring data from the foreign server */ + double retrieved_rows; + Cost rel_startup_cost; + Cost rel_total_cost; + + /* Options extracted from catalogs. */ + bool use_remote_estimate; + Cost fdw_startup_cost; + Cost fdw_tuple_cost; + List *shippable_extensions; /* OIDs of whitelisted extensions */ + + /* Join information */ + RelOptInfo *outerrel; + RelOptInfo *innerrel; + JoinType jointype; + List *joinclauses; + + /* Upper relation information */ + UpperRelationKind stage; + + /* Cached catalog information. 
*/ + ForeignTable *table; + ForeignServer *server; + UserMapping *user; /* only set in use_remote_estimate mode */ + + int fetch_size; /* fetch size for this remote table */ + + /* + * Name of the relation while EXPLAINing ForeignScan. It is used for join + * relations but is set for all relations. For join relation, the name + * indicates which foreign tables are being joined and the join type used. + */ + char *relation_name; + + /* Grouping information */ + List *grouped_tlist; } MulticornPlanState; typedef struct MulticornExecState @@ -105,6 +205,29 @@ typedef struct MulticornExecState TupleTableSlot *subscanSlot; AttrNumber *subscanAttrMap; uint64 tuplesRead; + + Relation rel; /* relcache entry for the foreign table. NULL + * for a foreign join scan. */ + TupleDesc tupdesc; /* tuple descriptor of scan */ + + /* + * List containing targets to be returned from Python in case of aggregations. + * List elements are aggregation keys or group_clauses elements. + */ + List *upper_rel_targets; + /* + * In case the query contains aggregations, the lists below details which + * functions correspond to which columns. + * List elements are themselves Lists of String nodes, denoting agg key, + * operation and column names, respectively. The agg key corresponds to the + * upper_rel_targets list entries. + */ + List *aggs; + /* + * List containing GROUP BY information. + * List elements are column names for grouping. 
+ */ + List *group_clauses; } MulticornExecState; typedef struct MulticornModifyState @@ -158,6 +281,16 @@ typedef struct MulticornDeparsedSortGroup PathKey *key; } MulticornDeparsedSortGroup; +/* deparse.c */ +extern bool multicorn_is_foreign_expr(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr); +extern bool multicorn_is_foreign_param(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr); +extern List *multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel); +extern void multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanState *fpinfo); + /* errors.c */ void errorCheck(void); @@ -201,10 +334,9 @@ void extractRestrictions(Relids base_relids, List **quals); List *extractColumns(List *reltargetlist, List *restrictinfolist); void initConversioninfo(ConversionInfo ** cinfo, - AttInMetadata *attinmeta); + AttInMetadata *attinmeta, List *upper_rel_targets); -Value *colnameFromVar(Var *var, PlannerInfo *root, - MulticornPlanState * state); +Value *colnameFromVar(Var *var, PlannerInfo *root); void computeDeparsedSortGroup(List *deparsed, MulticornPlanState *planstate, List **apply_pathkeys, diff --git a/src/python.c b/src/python.c index 3116ff8f3..0b6544025 100644 --- a/src/python.c +++ b/src/python.c @@ -969,6 +969,45 @@ execute(ForeignScanState *node, ExplainState *es) if(PyList_Size(p_pathkeys) > 0){ PyDict_SetItemString(kwargs, "sortkeys", p_pathkeys); } + if (state->aggs) + { + PyObject *aggs = PyDict_New(); + ListCell *lc_agg; + List *agg_list; + + foreach(lc_agg, state->aggs) + { + PyObject *agg = PyDict_New(); + + agg_list = (List *)lfirst(lc_agg); + PyObject *function = PyUnicode_FromString(strVal(lsecond(agg_list))); + PyObject *column = PyUnicode_FromString(strVal(lthird(agg_list))); + + PyDict_SetItemString(agg, "function", function); + PyDict_SetItemString(agg, "column", column); + PyDict_SetItemString(aggs, strVal(linitial(agg_list)), agg); + Py_DECREF(agg); + Py_DECREF(function); + Py_DECREF(column); + } + + 
PyDict_SetItemString(kwargs, "aggs", aggs); + Py_DECREF(aggs); + } + if (state->group_clauses) + { + PyObject *group_clauses = PyList_New(0); + ListCell *lc_groupc; + + foreach(lc_groupc, state->group_clauses) + { + PyObject *column = PyUnicode_FromString(strVal(lfirst(lc_groupc))); + PyList_Append(group_clauses, column); + } + + PyDict_SetItemString(kwargs, "group_clauses", group_clauses); + Py_DECREF(group_clauses); + } if(es != NULL){ PyObject * verbose; if(es->verbose){ diff --git a/src/query.c b/src/query.c index dd8997e41..60423ff31 100644 --- a/src/query.c +++ b/src/query.c @@ -107,7 +107,7 @@ extractColumns(List *reltargetlist, List *restrictinfolist) * objects back to suitable postgresql data structures. */ void -initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta) +initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta, List *upper_rel_targets) { int i; @@ -117,7 +117,17 @@ initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta) Oid outfuncoid; bool typIsVarlena; + char *attrname = NameStr(attr->attname); + if (upper_rel_targets) + { + /* + * For aggregations/groupings the targets lack attname, so we instead + * refer to the targets through references generated in + * multicorn_extract_upper_rel_info(). + */ + attrname = strVal(list_nth(upper_rel_targets, i)); + } if (!attr->attisdropped) { @@ -130,7 +140,7 @@ initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta) cinfo->atttypmod = attinmeta->atttypmods[i]; cinfo->attioparam = attinmeta->attioparams[i]; cinfo->attinfunc = &attinmeta->attinfuncs[i]; - cinfo->attrname = NameStr(attr->attname); + cinfo->attrname = attrname; cinfo->attnum = i + 1; cinfo->attndims = attr->attndims; cinfo->need_quote = false; @@ -438,7 +448,7 @@ extractClauseFromNullTest(Relids base_relids, * Returns a "Value" node containing the string name of the column from a var. 
*/ Value * -colnameFromVar(Var *var, PlannerInfo *root, MulticornPlanState * planstate) +colnameFromVar(Var *var, PlannerInfo *root) { RangeTblEntry *rte = rte = planner_rt_fetch(var->varno, root); char *attname = get_attname(rte->relid, var->varattno); From 26e0d3e2bf755966ddf3b50feaf649db36d19f81 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 10 Dec 2021 12:18:48 +0000 Subject: [PATCH 02/17] Consult Python FDW instance for upper rel pushdown Add a method to FDW Python instance that provides info on whether the pushdown is supported at all, and if so gives data for more granular decisions (for now only list of aggregation functions). Consult this method in `multicornGetForeignUpperPaths`. --- python/multicorn/__init__.py | 15 +++++++++ src/deparse.c | 19 +++++------- src/multicorn.c | 21 +++++++++++++ src/multicorn.h | 4 +++ src/python.c | 60 +++++++++++++++++++++++++++++++++--- 5 files changed, 103 insertions(+), 16 deletions(-) diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index e47d4b83b..9cd7f2915 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -212,6 +212,21 @@ def can_sort(self, sortkeys): """ return [] + def can_pushdown_upperrel(self): + """ + Method called from the planner to ask the FDW whether it supports upper + relation pushdown (i.e. aggregation, grouping, etc.), and if so return + a data structure with appropriate details. + + The planner consults this method before attempting to push aggregation + or grouping down to the FDW. + + Return: + None if pushdown is not supported, otherwise a dictionary with more + granular planning details (e.g. the supported aggregation functions). + """ + return None + def get_path_keys(self): u""" Method called from the planner to add additional Path to the planner. 
diff --git a/src/deparse.c b/src/deparse.c index f8162ce59..dd8706d2f 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -122,7 +122,7 @@ multicorn_foreign_expr_walker(Node *node, foreign_loc_cxt *outer_cxt) { bool check_type = true; - //MulticornPlanState *fpinfo; + MulticornPlanState *fpinfo; foreign_loc_cxt inner_cxt; Oid collation = InvalidOid; FDWCollateState state = FDW_COLLATE_NONE; @@ -133,8 +133,8 @@ multicorn_foreign_expr_walker(Node *node, if (node == NULL) return true; - /* May need server info from baserel's fdw_private struct */ - //fpinfo = (MulticornPlanState *) (glob_cxt->foreignrel->fdw_private); + /* Needed to assess per-instance FDW shippability properties */ + fpinfo = (MulticornPlanState *) (glob_cxt->foreignrel->fdw_private); /* Set up inner_cxt for possible recursion to child nodes */ inner_cxt.collation = InvalidOid; @@ -215,15 +215,9 @@ multicorn_foreign_expr_walker(Node *node, if (schema != PG_CATALOG_NAMESPACE) return false; - /* TODO: this decision will need to be forwarded to Python */ - if (!(strcmp(opername, "sum") == 0 - || strcmp(opername, "avg") == 0 - || strcmp(opername, "max") == 0 - || strcmp(opername, "min") == 0 - || strcmp(opername, "count") == 0)) - { + /* Make sure the specific function at hand is shippable */ + if (!PySequence_Contains(fpinfo->agg_functions, PyUnicode_FromString(opername))) return false; - } /* Not safe to pushdown when not in grouping context */ @@ -548,6 +542,9 @@ multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanSt appendStringInfoString(agg_key, "_"); appendStringInfoString(agg_key, strVal(colname)); + // TODO: Ensure that there is no possibility of a match between agg_key and + // a colname from a GROUP BY clause. 
+ fpinfo->aggs = lappend(fpinfo->aggs, list_make3(makeString(agg_key->data), function, colname)); fpinfo->upper_rel_targets = lappend(fpinfo->upper_rel_targets, makeString(agg_key->data)); } diff --git a/src/multicorn.c b/src/multicorn.c index 55f5ece77..5483cf74c 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -307,6 +307,7 @@ multicornGetForeignRelSize(PlannerInfo *root, /* Base foreign tables need to be push down always. */ planstate->pushdown_safe = true; + planstate->groupby_supported = false; planstate->fdw_instance = getInstance(foreigntableid); planstate->foreigntableid = foreigntableid; @@ -1795,6 +1796,13 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, { TargetEntry *tle; + /* + * Ensure GROUP BY clauses are shippable at all by the corresponding + * Python FDW instance. + */ + if (!fpinfo->groupby_supported) + return false; + /* * If any GROUP BY expression is not shippable, then we cannot * push down aggregation to the foreign server. @@ -1823,6 +1831,13 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, } else { + /* + * Ensure aggregation functions are shippable at all by the corresponding + * Python FDW instance. + */ + if (!fpinfo->agg_functions) + return false; + /* * Non-grouping expression we need to compute. Can we ship it * as-is to the foreign server? 
@@ -1986,6 +2001,10 @@ multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) return; + /* Check with the Python FDW instance whether it supports pushdown at all */ + if (!canPushdownUpperrel((MulticornPlanState *) input_rel->fdw_private)) + return; + fpinfo = (MulticornPlanState *) palloc0(sizeof(MulticornPlanState)); fpinfo->pushdown_safe = false; fpinfo->stage = stage; @@ -2042,6 +2061,8 @@ multicorn_add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; + fpinfo->groupby_supported = ifpinfo->groupby_supported; + fpinfo->agg_functions = ifpinfo->agg_functions; multicorn_merge_fdw_options(fpinfo, ifpinfo, NULL); /* diff --git a/src/multicorn.h b/src/multicorn.h index 185b79a59..4c73a500b 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -116,6 +116,10 @@ typedef struct MulticornPlanState */ int width; + /* Details about upperrel pushdown fetched from the Python FDW instance */ + bool groupby_supported; + PyObject *agg_functions; + /* * Aggregation and grouping data to be passed to the execution phase. * See MulticornExecState for more details. 
diff --git a/src/python.c b/src/python.c index 0b6544025..e8fd37d97 100644 --- a/src/python.c +++ b/src/python.c @@ -969,7 +969,7 @@ execute(ForeignScanState *node, ExplainState *es) if(PyList_Size(p_pathkeys) > 0){ PyDict_SetItemString(kwargs, "sortkeys", p_pathkeys); } - if (state->aggs) + if (state->aggs) { PyObject *aggs = PyDict_New(); ListCell *lc_agg; @@ -977,11 +977,15 @@ execute(ForeignScanState *node, ExplainState *es) foreach(lc_agg, state->aggs) { - PyObject *agg = PyDict_New(); - + PyObject *agg, + *function, + *column; + + agg = PyDict_New(); + agg_list = (List *)lfirst(lc_agg); - PyObject *function = PyUnicode_FromString(strVal(lsecond(agg_list))); - PyObject *column = PyUnicode_FromString(strVal(lthird(agg_list))); + function = PyUnicode_FromString(strVal(lsecond(agg_list))); + column = PyUnicode_FromString(strVal(lthird(agg_list))); PyDict_SetItemString(agg, "function", function); PyDict_SetItemString(agg, "column", column); @@ -1681,6 +1685,52 @@ canSort(MulticornPlanState * state, List *deparsed) return result; } +/* + * Call the can_pushdown_upperrel method from the python implementation, to + * determine whether upper relations can be pushed down to the corresponding + * data source to begin with. + * + * If yes, then also initialize some fields in MulticornPlanState needed for + * more granular conditional logic for assesing whether the particular query + * is suitable for pushdown. 
+ */ +bool +canPushdownUpperrel(MulticornPlanState * state) +{ + PyObject *fdw_instance = state->fdw_instance, + *p_upperrel_pushdown, + *p_object; + Py_ssize_t i, size; + bool pushdown_upperrel = false; + + p_upperrel_pushdown = PyObject_CallMethod(fdw_instance, "can_pushdown_upperrel", "()"); + errorCheck(); + + if (p_upperrel_pushdown != NULL && p_upperrel_pushdown != Py_None) + { + /* Determine whether the FDW instance supports GROUP BYs */ + p_object = PyMapping_GetItemString(p_upperrel_pushdown, "groupby_supported"); + if (p_object != NULL && p_object != Py_None) + { + state->groupby_supported = PyObject_IsTrue(p_object); + Py_DECREF(p_object); + } + + /* Determine which aggregation functions are supported */ + p_object = PyMapping_GetItemString(p_upperrel_pushdown, "agg_functions"); + if (p_object != NULL && p_object != Py_None) + { + state->agg_functions = PyMapping_Keys(p_object); + Py_DECREF(p_object); + } + + pushdown_upperrel = true; + } + + Py_DECREF(p_upperrel_pushdown); + return pushdown_upperrel; +} + PyObject * tupleTableSlotToPyObject(TupleTableSlot *slot, ConversionInfo ** cinfos) { From 3f969ebc7fce0f166edfe3c3104c45eaa27fc269 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 13 Dec 2021 10:56:31 +0000 Subject: [PATCH 03/17] Add parsing of local and remote conditions in the plan phase Currently the parsing is incomplete for simple WHERE clauses due to the lack of T_OpExpr and T_Const cases in multicorn_foreign_expr_walker. Therefore, all WHERE clauses will be treated as local conditions, and not pushed down. 
--- src/deparse.c | 35 ++++++++++++-- src/multicorn.c | 124 +++++++++--------------------------------------- src/multicorn.h | 6 +++ src/python.c | 1 - 4 files changed, 59 insertions(+), 107 deletions(-) diff --git a/src/deparse.c b/src/deparse.c index dd8706d2f..16876853d 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -80,6 +80,35 @@ static void multicorn_deparse_explicit_target_list(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context); +/* + * Examine each qual clause in input_conds, and classify them into two groups, + * which are returned as two lists: + * - remote_conds contains expressions that can be evaluated remotely + * - local_conds contains expressions that can't be evaluated remotely + */ +void +multicorn_classify_conditions(PlannerInfo *root, + RelOptInfo *baserel, + List *input_conds, + List **remote_conds, + List **local_conds) +{ + ListCell *lc; + + *remote_conds = NIL; + *local_conds = NIL; + + foreach(lc, input_conds) + { + RestrictInfo *ri = lfirst_node(RestrictInfo, lc); + + if (multicorn_is_foreign_expr(root, baserel, ri->clause)) + *remote_conds = lappend(*remote_conds, ri); + else + *local_conds = lappend(*local_conds, ri); + } +} + /* * Return true if given object is one of PostgreSQL's built-in objects. * @@ -127,7 +156,6 @@ multicorn_foreign_expr_walker(Node *node, Oid collation = InvalidOid; FDWCollateState state = FDW_COLLATE_NONE; HeapTuple tuple; - Form_pg_operator form; /* Need do nothing for empty subexpressions */ if (node == NULL) @@ -539,12 +567,9 @@ multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanSt initStringInfo(agg_key); appendStringInfoString(agg_key, strVal(function)); - appendStringInfoString(agg_key, "_"); + appendStringInfoString(agg_key, "."); appendStringInfoString(agg_key, strVal(colname)); - // TODO: Ensure that there is no possibility of a match between agg_key - // a colname from a GROUP BY clause. 
- fpinfo->aggs = lappend(fpinfo->aggs, list_make3(makeString(agg_key->data), function, colname)); fpinfo->upper_rel_targets = lappend(fpinfo->upper_rel_targets, makeString(agg_key->data)); } diff --git a/src/multicorn.c b/src/multicorn.c index 5483cf74c..49657e5e3 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -308,6 +308,7 @@ multicornGetForeignRelSize(PlannerInfo *root, /* Base foreign tables need to be push down always. */ planstate->pushdown_safe = true; planstate->groupby_supported = false; + planstate->agg_functions = NULL; planstate->fdw_instance = getInstance(foreigntableid); planstate->foreigntableid = foreigntableid; @@ -398,6 +399,18 @@ multicornGetForeignRelSize(PlannerInfo *root, &planstate->qual_list); } + + /* + * Identify which baserestrictinfo clauses can be sent to the remote + * server and which can't. + * TODO: for now all WHERE clauses will be classified as local conditions + * since multicorn_foreign_expr_walker lacks T_OpExpr and T_Const cases, + * thus preventing pushdown. When adding support for this make sure to align + * the code in multicorn_extract_upper_rel_info as well. + */ + multicorn_classify_conditions(root, baserel, baserel->baserestrictinfo, + &planstate->remote_conds, &planstate->local_conds); + /* Inject the "rows" and "width" attribute into the baserel */ #if PG_VERSION_NUM >= 90600 getRelSize(planstate, root, &baserel->rows, &baserel->reltarget->width); @@ -515,7 +528,7 @@ multicornGetForeignPlan(PlannerInfo *root, ) { MulticornPlanState *planstate = (MulticornPlanState *) foreignrel->fdw_private; - Index scan_relid = foreignrel->relid; + Index scan_relid; List *remote_exprs = NIL; List *local_exprs = NIL; List *fdw_scan_tlist = NIL; @@ -610,64 +623,9 @@ multicornGetForeignPlan(PlannerInfo *root, /* Build the list of columns to be fetched from the foreign server. 
*/ fdw_scan_tlist = multicorn_build_tlist_to_deparse(foreignrel); - - /* - * Ensure that the outer plan produces a tuple whose descriptor - * matches our scan tuple slot. Also, remove the local conditions - * from outer plan's quals, lest they be evaluated twice, once by the - * local plan and once by the scan. - */ - if (outer_plan) - { - ListCell *lc; - - /* - * Right now, we only consider grouping and aggregation beyond - * joins. Queries involving aggregates or grouping do not require - * EPQ mechanism, hence should not have an outer plan here. - */ - Assert(!IS_UPPER_REL(foreignrel)); - - /* - * First, update the plan's qual list if possible. In some cases - * the quals might be enforced below the topmost plan level, in - * which case we'll fail to remove them; it's not worth working - * harder than this. - */ - foreach(lc, local_exprs) - { - Node *qual = lfirst(lc); - - outer_plan->qual = list_delete(outer_plan->qual, qual); - - /* - * For an inner join the local conditions of foreign scan plan - * can be part of the joinquals as well. (They might also be - * in the mergequals or hashquals, but we can't touch those - * without breaking the plan.) - */ - if (IsA(outer_plan, NestLoop) || - IsA(outer_plan, MergeJoin) || - IsA(outer_plan, HashJoin)) - { - Join *join_plan = (Join *) outer_plan; - - if (join_plan->jointype == JOIN_INNER) - join_plan->joinqual = list_delete(join_plan->joinqual, - qual); - } - } - - /* - * Now fix the subplan's tlist --- this might result in inserting - * a Result node atop the plan tree. 
- */ - outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist, - best_path->path.parallel_safe); - } } - /* Remember remote_exprs for possible use by postgresPlanDirectModify */ + /* Remember remote_exprs for possible use by multicornPlanDirectModify */ planstate->final_remote_exprs = remote_exprs; best_path->path.pathtarget->width = planstate->width; @@ -694,7 +652,7 @@ multicornGetForeignPlan(PlannerInfo *root, return make_foreignscan(tlist, local_exprs, scan_relid, - scan_clauses, /* no expressions to evaluate */ + scan_clauses, serializePlanState(planstate) #if PG_VERSION_NUM >= 90500 , fdw_scan_tlist @@ -1889,53 +1847,17 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, } /* - * TODO: Enable HAVING clause pushdowns + * TODO: Enable HAVING clause pushdowns. + * Note that certain simple HAVING clauses get transformed to WHERE clauses + * internally, so those will be supported. Example is a HAVING clause on a + * column that is also a part of the GROUP BY clause, in which case WHERE + * clause effectively achieves the same thing. */ if (havingQual) { return false; } - /* - * If there are any local conditions, pull Vars and aggregates from it and - * check whether they are safe to pushdown or not. - */ - if (fpinfo->local_conds) - { - List *aggvars = NIL; - ListCell *lc; - - foreach(lc, fpinfo->local_conds) - { - RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); - - aggvars = list_concat(aggvars, - pull_var_clause((Node *) rinfo->clause, - PVC_INCLUDE_AGGREGATES)); - } - - foreach(lc, aggvars) - { - Expr *expr = (Expr *) lfirst(lc); - - /* - * If aggregates within local conditions are not safe to push - * down, then we cannot push down the query. Vars are already - * part of GROUP BY clause which are checked above, so no need to - * access them again here. Again, we need not check - * is_foreign_param for a foreign aggregate. 
- */ - if (IsA(expr, Aggref)) - { - if (!multicorn_is_foreign_expr(root, grouped_rel, expr)) - return false; - - tlist = add_to_flat_tlist(tlist, list_make1(expr)); - } - } - } - - /* Store generated targetlist */ fpinfo->grouped_tlist = tlist; @@ -1987,8 +1909,6 @@ multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, { MulticornPlanState *fpinfo; - debug_elog("multicorn_fdw : %s", __func__); - /* * If input rel is not safe to pushdown, then simply return as we cannot * perform any post-join operations on the foreign server. @@ -2003,7 +1923,9 @@ multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, /* Check with the Python FDW instance whether it supports pushdown at all */ if (!canPushdownUpperrel((MulticornPlanState *) input_rel->fdw_private)) + { return; + } fpinfo = (MulticornPlanState *) palloc0(sizeof(MulticornPlanState)); fpinfo->pushdown_safe = false; diff --git a/src/multicorn.h b/src/multicorn.h index 4c73a500b..c5620da70 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -286,6 +286,11 @@ typedef struct MulticornDeparsedSortGroup } MulticornDeparsedSortGroup; /* deparse.c */ +extern void multicorn_classify_conditions(PlannerInfo *root, + RelOptInfo *baserel, + List *input_conds, + List **remote_conds, + List **local_conds); extern bool multicorn_is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr); @@ -314,6 +319,7 @@ PyObject *tupleTableSlotToPyObject(TupleTableSlot *slot, ConversionInfo ** cin char *getRowIdColumn(PyObject *fdw_instance); PyObject *optionsListToPyDict(List *options); const char *getPythonEncodingName(void); +bool canPushdownUpperrel(MulticornPlanState * state); void getRelSize(MulticornPlanState * state, PlannerInfo *root, diff --git a/src/python.c b/src/python.c index e8fd37d97..66fc12444 100644 --- a/src/python.c +++ b/src/python.c @@ -1700,7 +1700,6 @@ canPushdownUpperrel(MulticornPlanState * state) PyObject *fdw_instance = state->fdw_instance, *p_upperrel_pushdown, 
*p_object; - Py_ssize_t i, size; bool pushdown_upperrel = false; p_upperrel_pushdown = PyObject_CallMethod(fdw_instance, "can_pushdown_upperrel", "()"); From 96744b4f0f2922a3de46a16f90ad600deb12cfb0 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 13 Dec 2021 12:31:22 +0000 Subject: [PATCH 04/17] Remove redundant code and fix compiler warnings --- src/deparse.c | 9 ++----- src/multicorn.c | 71 ++----------------------------------------------- src/multicorn.h | 40 +++++++++++++--------------- 3 files changed, 22 insertions(+), 98 deletions(-) diff --git a/src/deparse.c b/src/deparse.c index 16876853d..eccc8c285 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -75,10 +75,6 @@ typedef struct foreign_loc_cxt } foreign_loc_cxt; static Value *multicorn_deparse_function_name(Oid funcid); -static void multicorn_deparse_explicit_target_list(List *tlist, - bool is_returning, - List **retrieved_attrs, - deparse_expr_cxt *context); /* * Examine each qual clause in input_conds, and classify them into two groups, @@ -538,6 +534,7 @@ multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanSt Var *var; Value *colname, *function; Aggref *aggref; + StringInfo agg_key = makeStringInfo(); foreach(lc, tlist) { @@ -563,9 +560,7 @@ multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanSt PVC_RECURSE_PLACEHOLDERS)); colname = colnameFromVar(var, root); - StringInfo agg_key = makeStringInfo(); initStringInfo(agg_key); - appendStringInfoString(agg_key, strVal(function)); appendStringInfoString(agg_key, "."); appendStringInfoString(agg_key, strVal(colname)); @@ -585,7 +580,7 @@ multicorn_deparse_function_name(Oid funcid) { HeapTuple proctup; Form_pg_proc procform; - const char *proname; + char *proname; proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); if (!HeapTupleIsValid(proctup)) diff --git a/src/multicorn.c b/src/multicorn.c index 49657e5e3..1ee7daa29 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -529,10 +529,7 @@ 
multicornGetForeignPlan(PlannerInfo *root, { MulticornPlanState *planstate = (MulticornPlanState *) foreignrel->fdw_private; Index scan_relid; - List *remote_exprs = NIL; - List *local_exprs = NIL; List *fdw_scan_tlist = NIL; - List *fdw_recheck_quals = NIL; ListCell *lc; #if PG_VERSION_NUM >= 90600 @@ -545,49 +542,6 @@ multicornGetForeignPlan(PlannerInfo *root, * For base relations, set scan_relid as the relid of the relation. */ scan_relid = foreignrel->relid; - - /* - * In a base-relation scan, we must apply the given scan_clauses. - * - * Separate the scan_clauses into those that can be executed remotely - * and those that can't. baserestrictinfo clauses that were - * previously determined to be safe or unsafe by classifyConditions - * are found in planstate->remote_conds and planstate->local_conds. Anything - * else in the scan_clauses list will be a join clause, which we have - * to check for remote-safety. - * - * Note: the join clauses we see here should be the exact same ones - * previously examined by multicornGetForeignPaths. Possibly it'd be - * worth passing forward the classification work done then, rather - * than repeating it here. - * - * This code must match "extract_actual_clauses(scan_clauses, false)" - * except for the additional decision about remote versus local - * execution. 
- */ - foreach(lc, scan_clauses) - { - RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); - - /* Ignore any pseudoconstants, they're dealt with elsewhere */ - if (rinfo->pseudoconstant) - continue; - - if (list_member_ptr(planstate->remote_conds, rinfo)) - remote_exprs = lappend(remote_exprs, rinfo->clause); - else if (list_member_ptr(planstate->local_conds, rinfo)) - local_exprs = lappend(local_exprs, rinfo->clause); - else if (multicorn_is_foreign_expr(root, foreignrel, rinfo->clause)) - remote_exprs = lappend(remote_exprs, rinfo->clause); - else - local_exprs = lappend(local_exprs, rinfo->clause); - } - - /* - * For a base-relation scan, we have to support EPQ recheck, which - * should recheck all the remote quals. - */ - fdw_recheck_quals = remote_exprs; } else { @@ -603,31 +557,10 @@ multicornGetForeignPlan(PlannerInfo *root, */ Assert(!scan_clauses); - /* - * Instead we get the conditions to apply from the fdw_private - * structure. - */ - remote_exprs = extract_actual_clauses(planstate->remote_conds, false); - local_exprs = extract_actual_clauses(planstate->local_conds, false); - - /* - * We leave fdw_recheck_quals empty in this case, since we never need - * to apply EPQ recheck clauses. In the case of a joinrel, EPQ - * recheck is handled elsewhere --- see multicornGetForeignJoinPaths(). - * If we're planning an upperrel (ie, remote grouping or aggregation) - * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be - * allowed, and indeed we *can't* put the remote clauses into - * fdw_recheck_quals because the unaggregated Vars won't be available - * locally. - */ - /* Build the list of columns to be fetched from the foreign server. 
*/ fdw_scan_tlist = multicorn_build_tlist_to_deparse(foreignrel); } - /* Remember remote_exprs for possible use by multicornPlanDirectModify */ - planstate->final_remote_exprs = remote_exprs; - best_path->path.pathtarget->width = planstate->width; planstate->pathkeys = (List *) best_path->fdw_private; @@ -650,13 +583,13 @@ multicornGetForeignPlan(PlannerInfo *root, } return make_foreignscan(tlist, - local_exprs, + scan_clauses, scan_relid, scan_clauses, serializePlanState(planstate) #if PG_VERSION_NUM >= 90500 , fdw_scan_tlist - , fdw_recheck_quals + , NIL , outer_plan #endif ); diff --git a/src/multicorn.h b/src/multicorn.h index c5620da70..aaae3c7e5 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -159,9 +159,6 @@ typedef struct MulticornPlanState /* Join information */ RelOptInfo *outerrel; - RelOptInfo *innerrel; - JoinType jointype; - List *joinclauses; /* Upper relation information */ UpperRelationKind stage; @@ -195,25 +192,6 @@ typedef struct MulticornExecState Datum *values; bool *nulls; ConversionInfo **cinfos; - /* Common buffer to avoid repeated allocations */ - StringInfo buffer; - AttrNumber rowidAttno; - char *rowidAttrName; - List *pathkeys; /* list of MulticornDeparsedSortGroup) */ - /* State related to scanning through CStore chunks / temporarily - * materialized tables - */ - MemoryContext subscanCxt; - void *subscanState; - Relation subscanRel; - TupleTableSlot *subscanSlot; - AttrNumber *subscanAttrMap; - uint64 tuplesRead; - - Relation rel; /* relcache entry for the foreign table. NULL - * for a foreign join scan. */ - TupleDesc tupdesc; /* tuple descriptor of scan */ - /* * List containing targets to be returned from Python in case of aggregations. * List elements are aggregation keys or group_clauses elements. @@ -232,6 +210,24 @@ typedef struct MulticornExecState * List elements are column names for grouping. 
*/ List *group_clauses; + /* Common buffer to avoid repeated allocations */ + StringInfo buffer; + AttrNumber rowidAttno; + char *rowidAttrName; + List *pathkeys; /* list of MulticornDeparsedSortGroup) */ + /* State related to scanning through CStore chunks / temporarily + * materialized tables + */ + MemoryContext subscanCxt; + void *subscanState; + Relation subscanRel; + TupleTableSlot *subscanSlot; + AttrNumber *subscanAttrMap; + uint64 tuplesRead; + + Relation rel; /* relcache entry for the foreign table. NULL + * for a foreign join scan. */ + TupleDesc tupdesc; /* tuple descriptor of scan */ } MulticornExecState; typedef struct MulticornModifyState From b37a39efd6b2a943f275a038d88222bf72c3c10a Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 13 Dec 2021 14:17:30 +0000 Subject: [PATCH 05/17] Remove obsolete struct --- src/deparse.c | 4 ++-- src/multicorn.h | 16 ---------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/src/deparse.c b/src/deparse.c index eccc8c285..1aa4ceb98 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -523,8 +523,8 @@ multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel) * if there is no ressortgroupref set, we automatically assume the only other * option is a Aggref node type. * Moreover, for the Aggref node type we assume only a single element in args - * (i.e. only aggregations over single columns, e.g. sum(column2)). In particular, - * this is because in multicorn_foreign_expr_walker() we don't T_OpExpr case. + * (e.g. sum(column2)). In particular, this is because in + * multicorn_foreign_expr_walker() we don't have T_OpExpr case yet. 
*/ void multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanState *fpinfo) diff --git a/src/multicorn.h b/src/multicorn.h index aaae3c7e5..eb16524e6 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -59,22 +59,6 @@ typedef struct ConversionInfo bool need_quote; } ConversionInfo; - -/* - * This enum describes what's kept in the fdw_private list for a ForeignPath. - * We store: - * - * 1) Boolean flag showing if the remote query has the final sort - * 2) Boolean flag showing if the remote query has the LIMIT clause - */ -enum FdwPathPrivateIndex -{ - /* has-final-sort flag (as an integer Value node) */ - FdwPathPrivateHasFinalSort, - /* has-limit flag (as an integer Value node) */ - FdwPathPrivateHasLimit -}; - /* * Context for multicorn_deparse_expr */ From 8541184da17e2e9fdd0949a2c471ad6ce10723da Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 14 Dec 2021 10:24:27 +0000 Subject: [PATCH 06/17] Enable queries for aggregation/grouping --- python/multicorn/__init__.py | 2 +- src/multicorn.c | 7 ------- src/multicorn.h | 7 ------- 3 files changed, 1 insertion(+), 15 deletions(-) diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 9cd7f2915..31af65413 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -284,7 +284,7 @@ def get_path_keys(self): """ return [] - def explain(self, quals, columns, sortkeys=None, verbose=False): + def explain(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None, verbose=False): """Hook called on explain. 
The arguments are the same as the :meth:`execute`, with the addition of diff --git a/src/multicorn.c b/src/multicorn.c index 1ee7daa29..ccaa1b641 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -1818,13 +1818,6 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, fpinfo->rel_startup_cost = -1; fpinfo->rel_total_cost = -1; - - /* - * Set the string describing this grouped relation to be used in EXPLAIN - * output of corresponding ForeignScan. - */ - fpinfo->relation_name = NULL; - return true; } diff --git a/src/multicorn.h b/src/multicorn.h index eb16524e6..82b1a9338 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -154,13 +154,6 @@ typedef struct MulticornPlanState int fetch_size; /* fetch size for this remote table */ - /* - * Name of the relation while EXPLAINing ForeignScan. It is used for join - * relations but is set for all relations. For join relation, the name - * indicates which foreign tables are being joined and the join type used. - */ - char *relation_name; - /* Grouping information */ List *grouped_tlist; } MulticornPlanState; From a7bbfd525cf96fa00b16855e58c6e16b1436236c Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 14 Dec 2021 13:08:19 +0000 Subject: [PATCH 07/17] Fix aggregation queries with DISTINCT clauses --- src/deparse.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/deparse.c b/src/deparse.c index 1aa4ceb98..300a11dc4 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -252,6 +252,10 @@ multicorn_foreign_expr_walker(Node *node, if (agg->aggsplit != AGGSPLIT_SIMPLE) return false; + /* For now we don't push down aggregations with DISTINCT */ + if (agg->aggdistinct) + return false; + /* * Recurse to input args. 
aggdirectargs, aggorder and * aggdistinct are all present in args, so no need to check From ab9d3ab1fe30d2895a530c8403bf29138bac8270 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 14 Dec 2021 14:01:21 +0000 Subject: [PATCH 08/17] Fix COUNT(*) statements from crashing For the first iteration disable pushdown of `COUNT(*)`, like for `DISTINCT` clauses. These can be added later on, and tested on their equivalents in ES, `doc_count` and `cardinality`. --- src/deparse.c | 7 +++++-- src/multicorn.c | 7 ++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/deparse.c b/src/deparse.c index 300a11dc4..9e601c0a2 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -252,8 +252,11 @@ multicorn_foreign_expr_walker(Node *node, if (agg->aggsplit != AGGSPLIT_SIMPLE) return false; - /* For now we don't push down aggregations with DISTINCT */ - if (agg->aggdistinct) + /* + * For now we don't push down DISTINCT or COUNT(*) aggregations. + * TODO: Enable this + */ + if (agg->aggdistinct || agg->aggstar) return false; /* diff --git a/src/multicorn.c b/src/multicorn.c index ccaa1b641..724e22e9b 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -1782,9 +1782,10 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, /* * TODO: Enable HAVING clause pushdowns. * Note that certain simple HAVING clauses get transformed to WHERE clauses - * internally, so those will be supported. Example is a HAVING clause on a - * column that is also a part of the GROUP BY clause, in which case WHERE - * clause effectively achieves the same thing. + * internally for performance reasons, i.e. smaller scan size. Example is a + * HAVING clause on a column that is also a part of the GROUP BY clause, in + * which case WHERE clause effectively achieves the same thing. In those + * cases the havingQual is NULL, even though root->hasHavingQual is true.
*/ if (havingQual) { From 7b05afbd7669ea808cfe386d2cb86752393e42fa Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 15 Dec 2021 09:52:51 +0000 Subject: [PATCH 09/17] Fix a couple of PyObject DECREF's and clarify docstrings --- python/multicorn/__init__.py | 19 +++++++++++++++---- src/python.c | 7 ++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 31af65413..0ebcd28f0 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -218,12 +218,22 @@ def can_pushdown_upperrel(self): relation pushdown (i.e. aggregation, grouping, etc.), and if so return a data structure with appropriate details. - The FDW has to inspect every sort, and respond which one are handled. - The sorts are cumulatives. - Return: None if pushdown not supported, otherwise a dictionary containing more granular details for the planning phase, in the form: + + { + "groupby_supported": , # can be ommited if false + "agg_functions": { + : , + ... + }, + } + + Each entry in `agg_functions` dict corresponds to a maping between + the name of a aggregation function in PostgreSQL, and the equivalent + foreign function. If no mapping exists for an aggregate function any + queries containing it won't be pushed down. """ return None @@ -334,7 +344,8 @@ def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): column to be used in the aggregation operation. Result should be returned under the provided aggregation key. group_clauses (list): A list of columns used in GROUP BY statements. - The result should be returned for each column name provided. + For each column provided the returned response should have a + corresponding value in each row using that column name as the key. Returns: An iterable of python objects which can be converted back to PostgreSQL. 
diff --git a/src/python.c b/src/python.c index 66fc12444..e0b99b301 100644 --- a/src/python.c +++ b/src/python.c @@ -1007,6 +1007,7 @@ execute(ForeignScanState *node, ExplainState *es) { PyObject *column = PyUnicode_FromString(strVal(lfirst(lc_groupc))); PyList_Append(group_clauses, column); + Py_DECREF(column); } PyDict_SetItemString(kwargs, "group_clauses", group_clauses); @@ -1712,21 +1713,21 @@ canPushdownUpperrel(MulticornPlanState * state) if (p_object != NULL && p_object != Py_None) { state->groupby_supported = PyObject_IsTrue(p_object); - Py_DECREF(p_object); } + Py_XDECREF(p_object); /* Determine which aggregation functions are supported */ p_object = PyMapping_GetItemString(p_upperrel_pushdown, "agg_functions"); if (p_object != NULL && p_object != Py_None) { state->agg_functions = PyMapping_Keys(p_object); - Py_DECREF(p_object); } + Py_XDECREF(p_object); pushdown_upperrel = true; } - Py_DECREF(p_upperrel_pushdown); + Py_XDECREF(p_upperrel_pushdown); return pushdown_upperrel; } From 051165e5f7ac8165b635998c8fc0a4e021ece63d Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 15 Dec 2021 11:15:48 +0000 Subject: [PATCH 10/17] Store supported aggregation functions as a PG List object --- src/deparse.c | 3 +-- src/multicorn.h | 4 ++-- src/python.c | 31 +++++++++++++++++++++++++++++-- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/deparse.c b/src/deparse.c index 9e601c0a2..2bfd7e319 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -240,10 +240,9 @@ multicorn_foreign_expr_walker(Node *node, return false; /* Make sure the specific function at hand is shippable */ - if (!PySequence_Contains(fpinfo->agg_functions, PyUnicode_FromString(opername))) + if (!list_member(fpinfo->agg_functions, makeString(opername))) return false; - /* Not safe to pushdown when not in grouping context */ if (!IS_UPPER_REL(glob_cxt->foreignrel)) return false; diff --git a/src/multicorn.h b/src/multicorn.h index 82b1a9338..9f74eeb88 100644 --- a/src/multicorn.h +++ 
b/src/multicorn.h @@ -102,9 +102,9 @@ typedef struct MulticornPlanState /* Details about upperrel pushdown fetched from the Python FDW instance */ bool groupby_supported; - PyObject *agg_functions; + List *agg_functions; - /* + /* * Aggregation and grouping data to be passed to the execution phase. * See MulticornExecState for more details. */ diff --git a/src/python.c b/src/python.c index e0b99b301..35788c4d5 100644 --- a/src/python.c +++ b/src/python.c @@ -1700,7 +1700,15 @@ canPushdownUpperrel(MulticornPlanState * state) { PyObject *fdw_instance = state->fdw_instance, *p_upperrel_pushdown, - *p_object; + *p_object, + *p_agg_funcs, + *p_agg_func, + *p_item; + Py_ssize_t i, + size, + strlength; + char *tempbuffer; + StringInfo agg_func; bool pushdown_upperrel = false; p_upperrel_pushdown = PyObject_CallMethod(fdw_instance, "can_pushdown_upperrel", "()"); @@ -1720,7 +1728,26 @@ canPushdownUpperrel(MulticornPlanState * state) p_object = PyMapping_GetItemString(p_upperrel_pushdown, "agg_functions"); if (p_object != NULL && p_object != Py_None) { - state->agg_functions = PyMapping_Keys(p_object); + p_agg_funcs = PyMapping_Keys(p_object); + size = PySequence_Size(p_agg_funcs); + + for (i = 0; i < size; i++) + { + agg_func = makeStringInfo(); + strlength = 0; + + p_item = PySequence_GetItem(p_agg_funcs, i); + p_agg_func = PyUnicode_AsEncodedString(p_item, getPythonEncodingName(), NULL); + errorCheck(); + PyBytes_AsStringAndSize(p_agg_func, &tempbuffer, &strlength); + appendBinaryStringInfo(agg_func, tempbuffer, strlength); + + state->agg_functions = lappend(state->agg_functions, makeString(agg_func->data)); + + Py_DECREF(p_agg_func); + Py_DECREF(p_item); + } + Py_DECREF(p_agg_funcs); } Py_XDECREF(p_object); From 85e6cbcdd0046c83924798664c17eb95d5dbcdb8 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 15 Dec 2021 11:28:21 +0000 Subject: [PATCH 11/17] Add comments stressing deviations from commonly used FDW code --- src/deparse.c | 8 ++++++++ src/multicorn.c | 18 
++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/src/deparse.c b/src/deparse.c index 2bfd7e319..c11af440b 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -157,8 +157,10 @@ multicorn_foreign_expr_walker(Node *node, if (node == NULL) return true; + // MY CODE START /* Needed to asses per-instance FDW shipability properties */ fpinfo = (MulticornPlanState *) (glob_cxt->foreignrel->fdw_private); + // MY CODE END /* Set up inner_cxt for possible recursion to child nodes */ inner_cxt.collation = InvalidOid; @@ -239,9 +241,11 @@ multicorn_foreign_expr_walker(Node *node, if (schema != PG_CATALOG_NAMESPACE) return false; + // MY CODE START /* Make sure the specific function at hand is shippable */ if (!list_member(fpinfo->agg_functions, makeString(opername))) return false; + // MY CODE END /* Not safe to pushdown when not in grouping context */ if (!IS_UPPER_REL(glob_cxt->foreignrel)) @@ -251,12 +255,14 @@ multicorn_foreign_expr_walker(Node *node, if (agg->aggsplit != AGGSPLIT_SIMPLE) return false; + // MY CODE START /* * For now we don't push down DISTINCT or COUNT(*) aggregations. * TODO: Enable this */ if (agg->aggdistinct || agg->aggstar) return false; + // MY CODE END /* * Recurse to input args. aggdirectargs, aggorder and @@ -521,6 +527,7 @@ multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel) return tlist; } +// MY CODE START /* * Iterate through the targets and extract relveant information needed to execute * the aggregation and/or grouping on the remote data source through Python. @@ -598,3 +605,4 @@ multicorn_deparse_function_name(Oid funcid) ReleaseSysCache(proctup); return makeString(proname); } +// MY CODE END diff --git a/src/multicorn.c b/src/multicorn.c index 724e22e9b..89bf63b53 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -305,10 +305,12 @@ multicornGetForeignRelSize(PlannerInfo *root, baserel->fdw_private = planstate; + // MY CODE START /* Base foreign tables need to be push down always. 
*/ planstate->pushdown_safe = true; planstate->groupby_supported = false; planstate->agg_functions = NULL; + // MY CODE END planstate->fdw_instance = getInstance(foreigntableid); planstate->foreigntableid = foreigntableid; @@ -400,6 +402,7 @@ multicornGetForeignRelSize(PlannerInfo *root, } + // MY CODE START /* * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't. @@ -410,6 +413,7 @@ multicornGetForeignRelSize(PlannerInfo *root, */ multicorn_classify_conditions(root, baserel, baserel->baserestrictinfo, &planstate->remote_conds, &planstate->local_conds); + // MY CODE END /* Inject the "rows" and "width" attribute into the baserel */ #if PG_VERSION_NUM >= 90600 @@ -576,11 +580,13 @@ multicornGetForeignPlan(PlannerInfo *root, } } + // MY CODE START /* Extract data needed for aggregations on the Python side */ if (IS_UPPER_REL(foreignrel)) { multicorn_extract_upper_rel_info(root, fdw_scan_tlist, planstate); } + // MY CODE END return make_foreignscan(tlist, scan_clauses, @@ -641,7 +647,9 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) { execstate->rel = node->ss.ss_currentRelation; execstate->tupdesc = RelationGetDescr(execstate->rel); + // MY CODE START initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), NULL); + // MY CODE END } else { @@ -651,7 +659,9 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) #else execstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; #endif + // MY CODE START initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), execstate->upper_rel_targets); + // MY CODE END } execstate->values = palloc(sizeof(Datum) * execstate->tupdesc->natts); @@ -1592,6 +1602,7 @@ multicorn_merge_fdw_options(MulticornPlanState *fpinfo, fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; + // MY CODE START /* Multicorn specific options */ fpinfo->fdw_instance = 
fpinfo_o->fdw_instance; fpinfo->foreigntableid = fpinfo_o->foreigntableid; @@ -1601,6 +1612,7 @@ multicorn_merge_fdw_options(MulticornPlanState *fpinfo, fpinfo->target_list = fpinfo_o->target_list; fpinfo->qual_list = fpinfo_o->qual_list; fpinfo->pathkeys = fpinfo_o->pathkeys; + // MY CODE END /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -1779,6 +1791,7 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, i++; } + // MY CODE START /* * TODO: Enable HAVING clause pushdowns. * Note that certain simple HAVING clauses get transformed to WHERE clauses @@ -1791,6 +1804,7 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, { return false; } + // MY CODE END /* Store generated targetlist */ fpinfo->grouped_tlist = tlist; @@ -1848,11 +1862,13 @@ multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) return; + // MY CODE START /* Check with the Python FDW instance whether it supports pushdown at all */ if (!canPushdownUpperrel((MulticornPlanState *) input_rel->fdw_private)) { return; } + // MY CODE END fpinfo = (MulticornPlanState *) palloc0(sizeof(MulticornPlanState)); fpinfo->pushdown_safe = false; @@ -1910,8 +1926,10 @@ multicorn_add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; + // MY CODE START fpinfo->groupby_supported = ifpinfo->groupby_supported; fpinfo->agg_functions = ifpinfo->agg_functions; + // MY CODE END multicorn_merge_fdw_options(fpinfo, ifpinfo, NULL); /* From 8efb2e8b3cc684249be9b9a10a170ff0b0d55cf8 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 16 Dec 2021 14:02:35 +0000 Subject: [PATCH 12/17] Fix cases where aggregations of integers do not return an integer --- src/python.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/python.c b/src/python.c index 
35788c4d5..b27f75843 100644 --- a/src/python.c +++ b/src/python.c @@ -1061,6 +1061,14 @@ pynumberToCString(PyObject *pyobject, StringInfo buffer, char *tempbuffer; Py_ssize_t strlength = 0; + if ( + !PyLong_Check(pyobject) && + (cinfo->atttypoid == INT2OID || cinfo->atttypoid == INT4OID || cinfo->atttypoid == INT8OID) + ) + { + pyobject = PyNumber_Long(pyobject); + } + pTempStr = PyObject_Str(pyobject); PyString_AsStringAndSize(pTempStr, &tempbuffer, &strlength); appendBinaryStringInfo(buffer, tempbuffer, strlength); From df5e576bb4ffb490b57b02ba0645e4c3d3271b70 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 16 Dec 2021 14:13:20 +0000 Subject: [PATCH 13/17] Add comment with explanation for type conversion --- src/python.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/python.c b/src/python.c index b27f75843..6b470ebe3 100644 --- a/src/python.c +++ b/src/python.c @@ -1066,6 +1066,12 @@ pynumberToCString(PyObject *pyobject, StringInfo buffer, (cinfo->atttypoid == INT2OID || cinfo->atttypoid == INT4OID || cinfo->atttypoid == INT8OID) ) { + /* + * Certain data sources, such as ElasticSearch, can return floats for + * aggregations of integers that are expected to return integers + * (e.g. ElasticSearch for min, max, sum), so we basically need to do + * int() here. + */ pyobject = PyNumber_Long(pyobject); } From 67ffe224fc58c4a77ac29cf0d4e64507edb03cbd Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 17 Dec 2021 11:58:32 +0000 Subject: [PATCH 14/17] Fix typo in comment --- src/python.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/python.c b/src/python.c index 6b470ebe3..6f79ba05c 100644 --- a/src/python.c +++ b/src/python.c @@ -1069,8 +1069,7 @@ pynumberToCString(PyObject *pyobject, StringInfo buffer, /* * Certain data sources, such as ElasticSearch, can return floats for * aggregations of integers that are expected to return integers - * (e.g. 
ElasticSearch for min, max, sum), so we basically need to do - * int() here. + * (e.g. min, max, sum), so we basically need to do int() here. */ pyobject = PyNumber_Long(pyobject); } From 757e00c4f035159cbf80362f7646bcf059611df8 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 22 Dec 2021 09:49:25 +0000 Subject: [PATCH 15/17] Bring back comment on qual re-checking --- src/multicorn.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multicorn.c b/src/multicorn.c index 89bf63b53..ecba4af62 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -595,7 +595,7 @@ multicornGetForeignPlan(PlannerInfo *root, serializePlanState(planstate) #if PG_VERSION_NUM >= 90500 , fdw_scan_tlist - , NIL + , NULL /* All quals are meant to be rechecked */ , outer_plan #endif ); From a651e110e1dac74d96b18eca36684a60f6145f6f Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 24 Dec 2021 15:38:20 +0000 Subject: [PATCH 16/17] Fix memory leak when converting row value to PyLong --- src/python.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/python.c b/src/python.c index 6f79ba05c..47a2c3b1c 100644 --- a/src/python.c +++ b/src/python.c @@ -1057,7 +1057,7 @@ void pynumberToCString(PyObject *pyobject, StringInfo buffer, ConversionInfo * cinfo) { - PyObject *pTempStr; + PyObject *pTempStr, *pyTempLong; char *tempbuffer; Py_ssize_t strlength = 0; @@ -1071,10 +1071,15 @@ pynumberToCString(PyObject *pyobject, StringInfo buffer, * aggregations of integers that are expected to return integers * (e.g. min, max, sum), so we basically need to do int() here. 
*/ - pyobject = PyNumber_Long(pyobject); + pyTempLong = PyNumber_Long(pyobject); + pTempStr = PyObject_Str(pyTempLong); + Py_DECREF(pyTempLong); + } + else + { + pTempStr = PyObject_Str(pyobject); } - pTempStr = PyObject_Str(pyobject); PyString_AsStringAndSize(pTempStr, &tempbuffer, &strlength); appendBinaryStringInfo(buffer, tempbuffer, strlength); Py_DECREF(pTempStr); From 55feb66450c94cf428008596ce35792bd18d2f3c Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 27 Dec 2021 09:19:50 +0000 Subject: [PATCH 17/17] Refine the comments in deparse.c and multicorn.c where needed --- src/deparse.c | 13 ++++--------- src/multicorn.c | 35 ++++++++++++++--------------------- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/src/deparse.c b/src/deparse.c index c11af440b..e813201de 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -157,10 +157,8 @@ multicorn_foreign_expr_walker(Node *node, if (node == NULL) return true; - // MY CODE START /* Needed to asses per-instance FDW shipability properties */ fpinfo = (MulticornPlanState *) (glob_cxt->foreignrel->fdw_private); - // MY CODE END /* Set up inner_cxt for possible recursion to child nodes */ inner_cxt.collation = InvalidOid; @@ -241,11 +239,12 @@ multicorn_foreign_expr_walker(Node *node, if (schema != PG_CATALOG_NAMESPACE) return false; - // MY CODE START - /* Make sure the specific function at hand is shippable */ + /* Make sure the specific function at hand is shippable + * NB: here we deviate from standard FDW code, since the allowed + * function list is fetched from the Python FDW instance + */ if (!list_member(fpinfo->agg_functions, makeString(opername))) return false; - // MY CODE END /* Not safe to pushdown when not in grouping context */ if (!IS_UPPER_REL(glob_cxt->foreignrel)) @@ -255,14 +254,12 @@ multicorn_foreign_expr_walker(Node *node, if (agg->aggsplit != AGGSPLIT_SIMPLE) return false; - // MY CODE START /* * For now we don't push down DISTINCT or COUNT(*) aggregations. 
* TODO: Enable this */ if (agg->aggdistinct || agg->aggstar) return false; - // MY CODE END /* * Recurse to input args. aggdirectargs, aggorder and @@ -527,7 +524,6 @@ multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel) return tlist; } -// MY CODE START /* * Iterate through the targets and extract relveant information needed to execute * the aggregation and/or grouping on the remote data source through Python. @@ -605,4 +601,3 @@ multicorn_deparse_function_name(Oid funcid) ReleaseSysCache(proctup); return makeString(proname); } -// MY CODE END diff --git a/src/multicorn.c b/src/multicorn.c index ecba4af62..410e3f71b 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -305,12 +305,12 @@ multicornGetForeignRelSize(PlannerInfo *root, baserel->fdw_private = planstate; - // MY CODE START - /* Base foreign tables need to be push down always. */ + /* Base foreign tables always need to be pushed down */ planstate->pushdown_safe = true; + + /* Initialize upperrel pushdown info */ planstate->groupby_supported = false; planstate->agg_functions = NULL; - // MY CODE END planstate->fdw_instance = getInstance(foreigntableid); planstate->foreigntableid = foreigntableid; @@ -402,7 +402,6 @@ multicornGetForeignRelSize(PlannerInfo *root, } - // MY CODE START /* * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't.
@@ -413,7 +412,6 @@ multicornGetForeignRelSize(PlannerInfo *root, */ multicorn_classify_conditions(root, baserel, baserel->baserestrictinfo, &planstate->remote_conds, &planstate->local_conds); - // MY CODE END /* Inject the "rows" and "width" attribute into the baserel */ #if PG_VERSION_NUM >= 90600 @@ -580,13 +578,11 @@ multicornGetForeignPlan(PlannerInfo *root, } } - // MY CODE START /* Extract data needed for aggregations on the Python side */ if (IS_UPPER_REL(foreignrel)) { multicorn_extract_upper_rel_info(root, fdw_scan_tlist, planstate); } - // MY CODE END return make_foreignscan(tlist, scan_clauses, @@ -647,9 +643,7 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) { execstate->rel = node->ss.ss_currentRelation; execstate->tupdesc = RelationGetDescr(execstate->rel); - // MY CODE START initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), NULL); - // MY CODE END } else { @@ -659,9 +653,7 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) #else execstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; #endif - // MY CODE START initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), execstate->upper_rel_targets); - // MY CODE END } execstate->values = palloc(sizeof(Datum) * execstate->tupdesc->natts); @@ -1592,8 +1594,7 @@ multicorn_merge_fdw_options(MulticornPlanState *fpinfo, fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; - // MY CODE START - /* Multicorn specific options */ + /* Multicorn specific options, differing from other FDW implementations */ fpinfo->fdw_instance = fpinfo_o->fdw_instance; fpinfo->foreigntableid = fpinfo_o->foreigntableid; fpinfo->numattrs = fpinfo_o->numattrs; @@ -1612,7 +1603,6 @@ multicorn_merge_fdw_options(MulticornPlanState *fpinfo, fpinfo->target_list = fpinfo_o->target_list; fpinfo->qual_list = fpinfo_o->qual_list; fpinfo->pathkeys = fpinfo_o->pathkeys; - // MY CODE END /*
Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -1791,7 +1781,6 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, i++; } - // MY CODE START /* * TODO: Enable HAVING clause pushdowns. * Note that certain simple HAVING clauses get transformed to WHERE clauses @@ -1804,7 +1793,6 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, { return false; } - // MY CODE END /* Store generated targetlist */ fpinfo->grouped_tlist = tlist; @@ -1862,13 +1850,17 @@ multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) return; - // MY CODE START - /* Check with the Python FDW instance whether it supports pushdown at all */ + /* + * Check with the Python FDW instance whether it supports pushdown at all + * NB: Here we deviate from other FDWs, in that we don't know whether + * something can be pushed down without consulting the corresponding Python + * FDW instance. + */ + if (!canPushdownUpperrel((MulticornPlanState *) input_rel->fdw_private)) { return; } - // MY CODE END fpinfo = (MulticornPlanState *) palloc0(sizeof(MulticornPlanState)); fpinfo->pushdown_safe = false; @@ -1926,10 +1918,11 @@ multicorn_add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; - // MY CODE START + + /* copy the upperrel pushdown info as well */ fpinfo->groupby_supported = ifpinfo->groupby_supported; fpinfo->agg_functions = ifpinfo->agg_functions; - // MY CODE END + multicorn_merge_fdw_options(fpinfo, ifpinfo, NULL); /*