diff --git a/Makefile b/Makefile index 45243bf0..83ada106 100755 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ srcdir = . MODULE_big = multicorn -OBJS = src/errors.o src/python.o src/query.o src/multicorn.o +OBJS = src/deparse.o src/errors.o src/python.o src/query.o src/multicorn.o DATA = $(filter-out $(wildcard sql/*--*.sql),$(wildcard sql/*.sql)) @@ -19,7 +19,7 @@ directories.stamp: $(OBJS): directories.stamp -install: python_code +install: python_code sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql directories.stamp cp $< $@ diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 72d8ebb7..0ebcd28f 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -212,6 +212,31 @@ def can_sort(self, sortkeys): """ return [] + def can_pushdown_upperrel(self): + """ + Method called from the planner to ask the FDW whether it supports upper + relation pushdown (i.e. aggregation, grouping, etc.), and if so return + a data structure with appropriate details. + + Return: + None if pushdown not supported, otherwise a dictionary containing + more granular details for the planning phase, in the form: + + { + "groupby_supported": , # can be ommited if false + "agg_functions": { + : , + ... + }, + } + + Each entry in `agg_functions` dict corresponds to a maping between + the name of a aggregation function in PostgreSQL, and the equivalent + foreign function. If no mapping exists for an aggregate function any + queries containing it won't be pushed down. + """ + return None + def get_path_keys(self): u""" Method called from the planner to add additional Path to the planner. @@ -269,7 +294,7 @@ def get_path_keys(self): """ return [] - def explain(self, quals, columns, sortkeys=None, verbose=False): + def explain(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None, verbose=False): """Hook called on explain. The arguments are the same as the :meth:`execute`, with the addition of @@ -280,7 +305,7 @@ def explain(self, quals, columns, sortkeys=None, verbose=False): """ return [] - def execute(self, quals, columns, sortkeys=None): + def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): """Execute a query in the foreign data wrapper. This method is called at the first iteration. @@ -311,8 +336,16 @@ def execute(self, quals, columns, sortkeys=None): You should return AT LEAST those columns when returning a dict. If returning a sequence, every column from the table should be in the sequence. + + Kwargs: sortkeys (list): A list of :class:`SortKey` that the FDW said it can enforce. + aggs (dict): A dictionary mapping aggregation key with function and + column to be used in the aggregation operation. Result should be + returned under the provided aggregation key. + group_clauses (list): A list of columns used in GROUP BY statements. + For each column provided the returned response should have a + corresponding value in each row using that column name as the key. Returns: An iterable of python objects which can be converted back to PostgreSQL. diff --git a/src/deparse.c b/src/deparse.c new file mode 100644 index 00000000..e813201d --- /dev/null +++ b/src/deparse.c @@ -0,0 +1,603 @@ +/*------------------------------------------------------------------------- + * + * Multicorn Foreign Data Wrapper for PostgreSQL + * + * IDENTIFICATION + * deparse.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "pgtime.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "catalog/pg_aggregate.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_operator.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "nodes/nodeFuncs.h" +#include "nodes/plannodes.h" +#include "optimizer/clauses.h" +#include "optimizer/optimizer.h" +#include "optimizer/tlist.h" +#include "parser/parsetree.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "utils/timestamp.h" +#include "utils/typcache.h" +#include "commands/tablecmds.h" + +#include "multicorn.h" + + +/* + * Local (per-tree-level) context for multicorn_foreign_expr_walker's search. + * This is concerned with identifying collations used in the expression. + */ +typedef enum +{ + FDW_COLLATE_NONE, /* expression is of a noncollatable type */ + FDW_COLLATE_SAFE, /* collation derives from a foreign Var */ + FDW_COLLATE_UNSAFE /* collation derives from something else */ +} FDWCollateState; + + +/* + * Global context for multicorn_foreign_expr_walker's search of an expression tree. + */ +typedef struct foreign_glob_cxt +{ + PlannerInfo *root; /* global planner state */ + RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + + /* + * For join pushdown, only a limited set of operators are allowed to be + * pushed. This flag helps us identify if we are walking through the list + * of join conditions. Also true for aggregate relations to restrict + * aggregates for specified list. + */ + bool is_remote_cond; /* true for join or aggregate relations */ + Relids relids; /* relids of base relations in the underlying + * scan */ +} foreign_glob_cxt; + +typedef struct foreign_loc_cxt +{ + Oid collation; /* OID of current collation, if any */ + FDWCollateState state; /* state of current collation choice */ +} foreign_loc_cxt; + +static Value *multicorn_deparse_function_name(Oid funcid); + +/* + * Examine each qual clause in input_conds, and classify them into two groups, + * which are returned as two lists: + * - remote_conds contains expressions that can be evaluated remotely + * - local_conds contains expressions that can't be evaluated remotely + */ +void +multicorn_classify_conditions(PlannerInfo *root, + RelOptInfo *baserel, + List *input_conds, + List **remote_conds, + List **local_conds) +{ + ListCell *lc; + + *remote_conds = NIL; + *local_conds = NIL; + + foreach(lc, input_conds) + { + RestrictInfo *ri = lfirst_node(RestrictInfo, lc); + + if (multicorn_is_foreign_expr(root, baserel, ri->clause)) + *remote_conds = lappend(*remote_conds, ri); + else + *local_conds = lappend(*local_conds, ri); + } +} + +/* + * Return true if given object is one of PostgreSQL's built-in objects. + * + * We use FirstBootstrapObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. + * + * Our constraints for dealing with types are tighter than they are for + * functions or operators: we want to accept only types that are in pg_catalog, + * else format_type might incorrectly fail to schema-qualify their names. + * (This could be fixed with some changes to format_type, but for now there's + * no need.) Thus we must exclude information_schema types. + * + * XXX there is a problem with this, which is that the set of built-in + * objects expands over time. Something that is built-in to us might not + * be known to the remote server, if it's of an older version. But keeping + * track of that would be a huge exercise. + */ +static bool +multicorn_is_builtin(Oid oid) +{ + return (oid < FirstBootstrapObjectId); +} + +/* + * Check if expression is safe to execute remotely, and return true if so. + * + * In addition, *outer_cxt is updated with collation information. + * + * We must check that the expression contains only node types we can deparse, + * that all types/functions/operators are safe to send (which we approximate + * as being built-in), and that all collations used in the expression derive + * from Vars of the foreign table. Because of the latter, the logic is + * pretty close to assign_collations_walker() in parse_collate.c, though we + * can assume here that the given expression is valid. + */ +static bool +multicorn_foreign_expr_walker(Node *node, + foreign_glob_cxt *glob_cxt, + foreign_loc_cxt *outer_cxt) +{ + bool check_type = true; + MulticornPlanState *fpinfo; + foreign_loc_cxt inner_cxt; + Oid collation = InvalidOid; + FDWCollateState state = FDW_COLLATE_NONE; + HeapTuple tuple; + + /* Need do nothing for empty subexpressions */ + if (node == NULL) + return true; + + /* Needed to asses per-instance FDW shipability properties */ + fpinfo = (MulticornPlanState *) (glob_cxt->foreignrel->fdw_private); + + /* Set up inner_cxt for possible recursion to child nodes */ + inner_cxt.collation = InvalidOid; + inner_cxt.state = FDW_COLLATE_NONE; + switch (nodeTag(node)) + { + case T_Var: + { + Var *var = (Var *) node; + + /* + * If the Var is from the foreign table, we consider its + * collation (if any) safe to use. If it is from another + * table, we treat its collation the same way as we would a + * Param's collation, ie it's not safe for it to have a + * non-default collation. + */ + if (bms_is_member(var->varno, glob_cxt->relids) && + var->varlevelsup == 0) + { + /* Var belongs to foreign table */ + + /* + * System columns (e.g. oid, ctid) should not be sent to + * the remote, since we don't make any effort to ensure + * that local and remote values match (tableoid, in + * particular, almost certainly doesn't match). + */ + if (var->varattno < 0) + return false; + + /* Else check the collation */ + collation = var->varcollid; + state = OidIsValid(collation) ? FDW_COLLATE_SAFE : FDW_COLLATE_NONE; + } + else + { + /* Var belongs to some other table */ + collation = var->varcollid; + if (collation == InvalidOid || + collation == DEFAULT_COLLATION_OID) + { + /* + * It's noncollatable, or it's safe to combine with a + * collatable foreign Var, so set state to NONE. + */ + state = FDW_COLLATE_NONE; + } + else + { + /* + * Do not fail right away, since the Var might appear + * in a collation-insensitive context. + */ + state = FDW_COLLATE_UNSAFE; + } + } + } + break; + case T_Aggref: + { + Aggref *agg = (Aggref *) node; + ListCell *lc; + char *opername = NULL; + Oid schema; + + /* get function name and schema */ + tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(agg->aggfnoid)); + if (!HeapTupleIsValid(tuple)) + { + elog(ERROR, "cache lookup failed for function %u", agg->aggfnoid); + } + opername = pstrdup(((Form_pg_proc) GETSTRUCT(tuple))->proname.data); + schema = ((Form_pg_proc) GETSTRUCT(tuple))->pronamespace; + ReleaseSysCache(tuple); + + /* ignore functions in other than the pg_catalog schema */ + if (schema != PG_CATALOG_NAMESPACE) + return false; + + /* Make sure the specific function at hand is shippable + * NB: here we deviate from standard FDW code, since the allowed + * function list is fetched from the Python FDW instance + */ + if (!list_member(fpinfo->agg_functions, makeString(opername))) + return false; + + /* Not safe to pushdown when not in grouping context */ + if (!IS_UPPER_REL(glob_cxt->foreignrel)) + return false; + + /* Only non-split aggregates are pushable. */ + if (agg->aggsplit != AGGSPLIT_SIMPLE) + return false; + + /* + * For now we don't push down DISTINCT or COUNT(*) aggregations. + * TODO: Enable this + */ + if (agg->aggdistinct || agg->aggstar) + return false; + + /* + * Recurse to input args. aggdirectargs, aggorder and + * aggdistinct are all present in args, so no need to check + * their shippability explicitly. + */ + foreach(lc, agg->args) + { + Node *n = (Node *) lfirst(lc); + + /* If TargetEntry, extract the expression from it */ + if (IsA(n, TargetEntry)) + { + TargetEntry *tle = (TargetEntry *) n; + + n = (Node *) tle->expr; + } + + if (!multicorn_foreign_expr_walker(n, glob_cxt, &inner_cxt)) + return false; + } + + if (agg->aggorder || agg->aggfilter) + { + return false; + } + + /* + * If aggregate's input collation is not derived from a + * foreign Var, it can't be sent to remote. + */ + if (agg->inputcollid == InvalidOid) + /* OK, inputs are all noncollatable */ ; + else if (inner_cxt.state != FDW_COLLATE_SAFE || + agg->inputcollid != inner_cxt.collation) + return false; + + /* + * Detect whether node is introducing a collation not derived + * from a foreign Var. (If so, we just mark it unsafe for now + * rather than immediately returning false, since the parent + * node might not care.) + */ + collation = agg->aggcollid; + if (collation == InvalidOid) + state = FDW_COLLATE_NONE; + else if (inner_cxt.state == FDW_COLLATE_SAFE && + collation == inner_cxt.collation) + state = FDW_COLLATE_SAFE; + else if (collation == DEFAULT_COLLATION_OID) + state = FDW_COLLATE_NONE; + else + state = FDW_COLLATE_UNSAFE; + } + break; + default: + + /* + * If it's anything else, assume it's unsafe. This list can be + * expanded later, but don't forget to add deparse support below. + */ + return false; + } + + /* + * If result type of given expression is not built-in, it can't be sent to + * remote because it might have incompatible semantics on remote side. + */ + if (check_type && !multicorn_is_builtin(exprType(node))) + return false; + + /* + * Now, merge my collation information into my parent's state. + */ + if (state > outer_cxt->state) + { + /* Override previous parent state */ + outer_cxt->collation = collation; + outer_cxt->state = state; + } + else if (state == outer_cxt->state) + { + /* Merge, or detect error if there's a collation conflict */ + switch (state) + { + case FDW_COLLATE_NONE: + /* Nothing + nothing is still nothing */ + break; + case FDW_COLLATE_SAFE: + if (collation != outer_cxt->collation) + { + /* + * Non-default collation always beats default. + */ + if (outer_cxt->collation == DEFAULT_COLLATION_OID) + { + /* Override previous parent state */ + outer_cxt->collation = collation; + } + else if (collation != DEFAULT_COLLATION_OID) + { + /* + * Conflict; show state as indeterminate. We don't + * want to "return false" right away, since parent + * node might not care about collation. + */ + outer_cxt->state = FDW_COLLATE_UNSAFE; + } + } + break; + case FDW_COLLATE_UNSAFE: + /* We're still conflicted ... */ + break; + } + } + /* It looks OK */ + return true; +} + +/* + * Returns true if given expr is safe to evaluate on the foreign server. + */ +bool +multicorn_is_foreign_expr(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr) +{ + foreign_glob_cxt glob_cxt; + foreign_loc_cxt loc_cxt; + MulticornPlanState *fpinfo = (MulticornPlanState *) (baserel->fdw_private); + + /* + * Check that the expression consists of nodes that are safe to execute + * remotely. + */ + glob_cxt.root = root; + glob_cxt.foreignrel = baserel; + + /* + * For an upper relation, use relids from its underneath scan relation, + * because the upperrel's own relids currently aren't set to anything + * meaningful by the core code. For other relation, use their own relids. + */ + if (IS_UPPER_REL(baserel)) + glob_cxt.relids = fpinfo->outerrel->relids; + else + glob_cxt.relids = baserel->relids; + loc_cxt.collation = InvalidOid; + loc_cxt.state = FDW_COLLATE_NONE; + if (!multicorn_foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt)) + return false; + + /* + * If the expression has a valid collation that does not arise from a + * foreign var, the expression can not be sent over. + */ + if (loc_cxt.state == FDW_COLLATE_UNSAFE) + return false; + + /* + * An expression which includes any mutable functions can't be sent over + * because its result is not stable. For example, sending now() remote + * side could cause confusion from clock offsets. Future versions might + * be able to make this choice with more granularity. (We check this last + * because it requires a lot of expensive catalog lookups.) + */ + if (contain_mutable_functions((Node *) expr)) + return false; + + /* OK to evaluate on the remote server */ + return true; +} + + +/* + * Returns true if given expr is something we'd have to send the value of + * to the foreign server. + * + * This should return true when the expression is a shippable node that + * deparseExpr would add to context->params_list. Note that we don't care + * if the expression *contains* such a node, only whether one appears at top + * level. We need this to detect cases where setrefs.c would recognize a + * false match between an fdw_exprs item (which came from the params_list) + * and an entry in fdw_scan_tlist (which we're considering putting the given + * expression into). + */ +bool +multicorn_is_foreign_param(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr) +{ + if (expr == NULL) + return false; + + switch (nodeTag(expr)) + { + case T_Var: + { + /* It would have to be sent unless it's a foreign Var */ + Var *var = (Var *) expr; + MulticornPlanState *fpinfo = (MulticornPlanState *) (baserel->fdw_private); + Relids relids; + + if (IS_UPPER_REL(baserel)) + relids = fpinfo->outerrel->relids; + else + relids = baserel->relids; + + if (bms_is_member(var->varno, relids) && var->varlevelsup == 0) + return false; /* foreign Var, so not a param */ + else + return true; /* it'd have to be a param */ + break; + } + case T_Param: + /* Params always have to be sent to the foreign server */ + return true; + default: + break; + } + return false; +} + +/* + * Build the targetlist for given relation to be deparsed as SELECT clause. + * + * The output targetlist contains the columns that need to be fetched from the + * foreign server for the given relation. If foreignrel is an upper relation, + * then the output targetlist can also contains expressions to be evaluated on + * foreign server. + */ +List * +multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel) +{ + List *tlist = NIL; + MulticornPlanState *fpinfo = (MulticornPlanState *) foreignrel->fdw_private; + ListCell *lc; + + /* + * For an upper relation, we have already built the target list while + * checking shippability, so just return that. + */ + if (IS_UPPER_REL(foreignrel)) + return fpinfo->grouped_tlist; + + /* + * We require columns specified in foreignrel->reltarget->exprs and those + * required for evaluating the local conditions. + */ + tlist = add_to_flat_tlist(tlist, + pull_var_clause((Node *) foreignrel->reltarget->exprs, + PVC_RECURSE_PLACEHOLDERS)); + foreach(lc, fpinfo->local_conds) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + tlist = add_to_flat_tlist(tlist, + pull_var_clause((Node *) rinfo->clause, + PVC_RECURSE_PLACEHOLDERS)); + } + + return tlist; +} + +/* + * Iterate through the targets and extract relveant information needed to execute + * the aggregation and/or grouping on the remote data source through Python. + * + * NB: Logic here is strongly coupled to multicorn_foreign_grouping_ok(), i.e. + * if there is no ressortgroupref set, we automatically assume the only other + * option is a Aggref node type. + * Moreover, for the Aggref node type we assume only a single element in args + * (e.g. sum(column2)). In particular, this is because in + * multicorn_foreign_expr_walker() we don't have T_OpExpr case yet. + */ +void +multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanState *fpinfo) +{ + ListCell *lc; + TargetEntry *tle; + Var *var; + Value *colname, *function; + Aggref *aggref; + StringInfo agg_key = makeStringInfo(); + + foreach(lc, tlist) + { + tle = lfirst_node(TargetEntry, lc); + + if (tle->ressortgroupref) + { + /* GROUP BY target */ + var = (Var *) tle->expr; + colname = colnameFromVar(var, root); + + fpinfo->group_clauses = lappend(fpinfo->group_clauses, colname); + fpinfo->upper_rel_targets = lappend(fpinfo->upper_rel_targets, colname); + } + else + { + /* Aggregation target */ + aggref = (Aggref *) tle->expr; + function = multicorn_deparse_function_name(aggref->aggfnoid); + + var = linitial(pull_var_clause((Node *) aggref, + PVC_RECURSE_AGGREGATES | + PVC_RECURSE_PLACEHOLDERS)); + colname = colnameFromVar(var, root); + + initStringInfo(agg_key); + appendStringInfoString(agg_key, strVal(function)); + appendStringInfoString(agg_key, "."); + appendStringInfoString(agg_key, strVal(colname)); + + fpinfo->aggs = lappend(fpinfo->aggs, list_make3(makeString(agg_key->data), function, colname)); + fpinfo->upper_rel_targets = lappend(fpinfo->upper_rel_targets, makeString(agg_key->data)); + } + } +} + +/* + * multicorn_deparse_function_name + * Deparses function name from given function oid. + */ +static Value * +multicorn_deparse_function_name(Oid funcid) +{ + HeapTuple proctup; + Form_pg_proc procform; + char *proname; + + proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); + if (!HeapTupleIsValid(proctup)) + elog(ERROR, "cache lookup failed for function %u", funcid); + procform = (Form_pg_proc) GETSTRUCT(proctup); + + proname = NameStr(procform->proname); + + ReleaseSysCache(proctup); + return makeString(proname); +} diff --git a/src/multicorn.c b/src/multicorn.c index 87c193ea..410e3f71 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -12,6 +12,8 @@ #include "optimizer/planmain.h" #include "optimizer/restrictinfo.h" #include "optimizer/clauses.h" +#include "optimizer/optimizer.h" +#include "optimizer/tlist.h" #if PG_VERSION_NUM < 120000 #include "optimizer/var.h" #include "dynloader.h" @@ -108,6 +110,10 @@ static void multicorn_subxact_callback(SubXactEvent event, SubTransactionId mySu SubTransactionId parentSubid, void *arg); #endif +static void multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra + ); + #if PG_VERSION_NUM >= 90500 static List *multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, Oid serverOid); @@ -117,6 +123,15 @@ static bool multicornIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *re static void multicorn_xact_callback(XactEvent event, void *arg); +/* Functions relating to aggregation/grouping pushdown */ +static bool multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, + Node *havingQual); +static void multicorn_add_foreign_grouping_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + GroupPathExtraData *extra +); + /* Functions relating to scanning through subrelations */ static bool subscanReadRow(TupleTableSlot *slot, Relation subscanRel, void *subscanState); static void subscanEnd(ForeignScanState *node); @@ -223,6 +238,9 @@ multicorn_handler(PG_FUNCTION_ARGS) fdw_routine->IsForeignScanParallelSafe = multicornIsForeignScanParallelSafe; #endif + /* Support functions for upper relation push-down */ + fdw_routine->GetForeignUpperPaths = multicornGetForeignUpperPaths; + PG_RETURN_POINTER(fdw_routine); } @@ -286,6 +304,14 @@ multicornGetForeignRelSize(PlannerInfo *root, TupleDesc desc; baserel->fdw_private = planstate; + + /* Base foreign tables need to be push down always */ + planstate->pushdown_safe = true; + + /* Initialize upperrel pushdown info */ + planstate->groupby_supported = false; + planstate->agg_functions = NULL; + planstate->fdw_instance = getInstance(foreigntableid); planstate->foreigntableid = foreigntableid; /* Initialize the conversion info array */ @@ -299,7 +325,7 @@ multicornGetForeignRelSize(PlannerInfo *root, planstate->cinfos = palloc0(sizeof(ConversionInfo *) * planstate->numattrs); - initConversioninfo(planstate->cinfos, attinmeta); + initConversioninfo(planstate->cinfos, attinmeta, NULL); /* * needWholeRow = rel->trigdesc && rel->trigdesc->trig_insert_after_row; * @@ -332,7 +358,21 @@ multicornGetForeignRelSize(PlannerInfo *root, } else { - /* Pull "var" clauses to build an appropriate target list */ + /* + * Pull "var" clauses to build an appropriate target list + * + * baserel->reltarget->exprs can be used to determine which columns need + * to be fetched; but note that it only lists columns that have to be + * emitted by the ForeignScan plan node, not columns that are used in + * qual evaluation but not output by the query. + * + * baserel->baserestrictinfo is particularly interesting, as it contains + * restriction quals (WHERE clauses) that should be used to filter the + * rows to be fetched. (The FDW itself is not required to enforce these + * quals, as the core executor can check them instead.) + * + * For more details see: https://www.postgresql.org/docs/12/fdw-planning.html + */ #if PG_VERSION_NUM >= 90600 foreach(lc, extractColumns(baserel->reltarget->exprs, baserel->baserestrictinfo)) #else @@ -346,14 +386,14 @@ multicornGetForeignRelSize(PlannerInfo *root, * Store only a Value node containing the string name of the * column. */ - colname = colnameFromVar(var, root, planstate); + colname = colnameFromVar(var, root); if (colname != NULL && strVal(colname) != NULL) { planstate->target_list = lappend(planstate->target_list, colname); } } } - + /* Extract the restrictions from the plan. */ foreach(lc, baserel->baserestrictinfo) { @@ -361,6 +401,18 @@ multicornGetForeignRelSize(PlannerInfo *root, &planstate->qual_list); } + + /* + * Identify which baserestrictinfo clauses can be sent to the remote + * server and which can't. + * TODO: for now all WHERE clauses will be classified as local conditions + * since multicorn_foreign_expr_walker lacks T_OpExpr and T_Const cases, + * thus preventing pushdown. When adding support for this make sure to align + * the code in multicorn_extract_upper_rel_info as well. + */ + multicorn_classify_conditions(root, baserel, baserel->baserestrictinfo, + &planstate->remote_conds, &planstate->local_conds); + /* Inject the "rows" and "width" attribute into the baserel */ #if PG_VERSION_NUM >= 90600 getRelSize(planstate, root, &baserel->rows, &baserel->reltarget->width); @@ -467,7 +519,7 @@ multicornGetForeignPaths(PlannerInfo *root, */ static ForeignScan * multicornGetForeignPlan(PlannerInfo *root, - RelOptInfo *baserel, + RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, @@ -477,33 +529,70 @@ multicornGetForeignPlan(PlannerInfo *root, #endif ) { - Index scan_relid = baserel->relid; - MulticornPlanState *planstate = (MulticornPlanState *) baserel->fdw_private; - ListCell *lc; + MulticornPlanState *planstate = (MulticornPlanState *) foreignrel->fdw_private; + Index scan_relid; + List *fdw_scan_tlist = NIL; + ListCell *lc; + #if PG_VERSION_NUM >= 90600 best_path->path.pathtarget->width = planstate->width; #endif - scan_clauses = extract_actual_clauses(scan_clauses, false); + + if (IS_SIMPLE_REL(foreignrel)) + { + /* + * For base relations, set scan_relid as the relid of the relation. + */ + scan_relid = foreignrel->relid; + } + else + { + /* + * Join relation or upper relation - set scan_relid to 0. + */ + scan_relid = 0; + + /* + * For a join rel, baserestrictinfo is NIL and we are not considering + * parameterization right now, so there should be no scan_clauses for + * a joinrel or an upper rel either. + */ + Assert(!scan_clauses); + + /* Build the list of columns to be fetched from the foreign server. */ + fdw_scan_tlist = multicorn_build_tlist_to_deparse(foreignrel); + } + + best_path->path.pathtarget->width = planstate->width; + planstate->pathkeys = (List *) best_path->fdw_private; + + scan_clauses = extract_actual_clauses(scan_clauses, false); /* Extract the quals coming from a parameterized path, if any */ if (best_path->path.param_info) { foreach(lc, scan_clauses) { - extractRestrictions(baserel->relids, (Expr *) lfirst(lc), + extractRestrictions(foreignrel->relids, (Expr *) lfirst(lc), &planstate->qual_list); } } - planstate->pathkeys = (List *) best_path->fdw_private; + + /* Extract data needed for aggregations on the Python side */ + if (IS_UPPER_REL(foreignrel)) + { + multicorn_extract_upper_rel_info(root, fdw_scan_tlist, planstate); + } + return make_foreignscan(tlist, scan_clauses, scan_relid, - scan_clauses, /* no expressions to evaluate */ + scan_clauses, serializePlanState(planstate) #if PG_VERSION_NUM >= 90500 - , NULL + , fdw_scan_tlist , NULL /* All quals are meant to be rechecked */ - , NULL + , outer_plan #endif ); } @@ -542,12 +631,33 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fscan = (ForeignScan *) node->ss.ps.plan; MulticornExecState *execstate; - TupleDesc tupdesc = RelationGetDescr(node->ss.ss_currentRelation); ListCell *lc; - execstate = initializeExecState(fscan->fdw_private); - execstate->values = palloc(sizeof(Datum) * tupdesc->natts); - execstate->nulls = palloc(sizeof(bool) * tupdesc->natts); + execstate = initializeExecState(fscan->fdw_private); + + /* + * Get info we'll need for converting data fetched from the foreign server + * into local representation and error reporting during that process. + */ + if (fscan->scan.scanrelid > 0) + { + execstate->rel = node->ss.ss_currentRelation; + execstate->tupdesc = RelationGetDescr(execstate->rel); + initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), NULL); + } + else + { + execstate->rel = NULL; +#if (PG_VERSION_NUM >= 140000) + execstate->tupdesc = multicorn_get_tupdesc_for_join_scan_tuples(node); +#else + execstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; +#endif + initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), execstate->upper_rel_targets); + } + + execstate->values = palloc(sizeof(Datum) * execstate->tupdesc->natts); + execstate->nulls = palloc(sizeof(bool) * execstate->tupdesc->natts); execstate->qual_list = NULL; foreach(lc, fscan->fdw_exprs) { @@ -555,7 +665,6 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) ((Expr *) lfirst(lc)), &execstate->qual_list); } - initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(tupdesc)); execstate->subscanCxt = AllocSetContextCreate( node->ss.ps.state->es_query_cxt, @@ -707,7 +816,7 @@ static List *buildCStoreQualList(AttrNumber *attrMap, int map_length, List *qual return result; } -/* +/* * Debug functions to print TupleDesc structs (stolen from private PG), * not currently used anywhere. */ @@ -728,7 +837,7 @@ static void print_desc(TupleDesc desc) { int i; for (i = 0; i < desc->natts; ++i) - { + { printatt((unsigned) i + 1, TupleDescAttr(desc, i)); } } @@ -766,7 +875,7 @@ static void subscanEnd(ForeignScanState *node) } /* - * Read a single row from the subscan relatio + * Read a single row from the subscan relation */ static bool subscanReadRow(TupleTableSlot *slot, Relation subscanRel, void *subscanState) { @@ -812,8 +921,8 @@ multicornIterateForeignScan(ForeignScanState *node) MulticornExecState *execstate = node->fdw_state; PyObject *p_value; MemoryContext oldcontext; - - /* The data flow here is kind of complex: we treat strings returned by + + /* The data flow here is kind of complex: we treat strings returned by * Python as relation names to read data from directly instead of getting Python * to return it to us. * @@ -825,9 +934,9 @@ multicornIterateForeignScan(ForeignScanState *node) * expects the relations to actually exist at query plan time -- for example, * CStore's read planner needs the table's RangeTblEntry to be present in the * planner's simple_rte_array, which we don't want to modify at query execution time. - * + * */ - + ExecClearTuple(slot); while (1) { @@ -902,7 +1011,7 @@ multicornIterateForeignScan(ForeignScanState *node) char *relation; Py_ssize_t strlength = 0; - + if (PyString_AsStringAndSize(p_value, &relation, &strlength) < 0) { elog(ERROR, "Could not convert subrelation to string!"); @@ -1121,7 +1230,7 @@ multicornBeginForeignModify(ModifyTableState *mtstate, modstate->buffer = makeStringInfo(); modstate->fdw_instance = getInstance(rel->rd_id); modstate->rowidAttrName = getRowIdColumn(modstate->fdw_instance); - initConversioninfo(modstate->cinfos, TupleDescGetAttInMetadata(desc)); + initConversioninfo(modstate->cinfos, TupleDescGetAttInMetadata(desc), NULL); oldcontext = MemoryContextSwitchTo(TopMemoryContext); MemoryContextSwitchTo(oldcontext); if (ps->ps_ResultTupleSlot) @@ -1130,7 +1239,7 @@ multicornBeginForeignModify(ModifyTableState *mtstate, modstate->resultCinfos = palloc0(sizeof(ConversionInfo *) * resultTupleDesc->natts); - initConversioninfo(modstate->resultCinfos, TupleDescGetAttInMetadata(resultTupleDesc)); + initConversioninfo(modstate->resultCinfos, TupleDescGetAttInMetadata(resultTupleDesc), NULL); } for (i = 0; i < desc->natts; i++) { @@ -1453,6 +1562,400 @@ multicornIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, } #endif +/* + * Merge FDW options from input relations into a new set of options for a join + * or an upper rel. + * + * For a join relation, FDW-specific information about the inner and outer + * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, + * fpinfo_o provides the information for the input relation; fpinfo_i is + * expected to NULL. + */ +static void +multicorn_merge_fdw_options(MulticornPlanState *fpinfo, + const MulticornPlanState *fpinfo_o, + const MulticornPlanState *fpinfo_i) +{ + /* We must always have fpinfo_o. */ + Assert(fpinfo_o); + + /* fpinfo_i may be NULL, but if present the servers must both match. */ + Assert(!fpinfo_i || + fpinfo_i->server->serverid == fpinfo_o->server->serverid); + + /* + * Copy the server specific FDW options. (For a join, both relations come + * from the same server, so the server options should have the same value + * for both relations.) + */ + fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; + fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; + fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; + fpinfo->fetch_size = fpinfo_o->fetch_size; + + /* Multicorn specific options, differing from othe FDW implementations */ + fpinfo->fdw_instance = fpinfo_o->fdw_instance; + fpinfo->foreigntableid = fpinfo_o->foreigntableid; + fpinfo->numattrs = fpinfo_o->numattrs; + fpinfo->cinfos = fpinfo_o->cinfos; + fpinfo->pathkeys = fpinfo_o->pathkeys; + fpinfo->target_list = fpinfo_o->target_list; + fpinfo->qual_list = fpinfo_o->qual_list; + fpinfo->pathkeys = fpinfo_o->pathkeys; + + /* Merge the table level options from either side of the join. */ + if (fpinfo_i) + { + /* + * We'll prefer to use remote estimates for this join if any table + * from either side of the join is using remote estimates. This is + * most likely going to be preferred since they're already willing to + * pay the price of a round trip to get the remote EXPLAIN. In any + * case it's not entirely clear how we might otherwise handle this + * best. + */ + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || + fpinfo_i->use_remote_estimate; + + /* + * Set fetch size to maximum of the joining sides, since we are + * expecting the rows returned by the join to be proportional to the + * relation sizes. + */ + fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + } +} + +/* + * Assess whether the aggregation, grouping and having operations can be pushed + * down to the foreign server. As a side effect, save information we obtain in + * this function to MulticornPlanState of the input relation. + */ +static bool +multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, + Node *havingQual) +{ + Query *query = root->parse; + MulticornPlanState *fpinfo = (MulticornPlanState *) grouped_rel->fdw_private; + PathTarget *grouping_target = grouped_rel->reltarget; + // Q: Or perhaps like in SQLite FDW `PathTarget *grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];`? + // A: It appears they are the same thing; see also GridDBs sortgrouprefs explanation + MulticornPlanState *ofpinfo; + ListCell *lc; + int i; + List *tlist = NIL; + + /* We currently don't support pushing Grouping Sets. */ + if (query->groupingSets) + return false; + + /* Get the fpinfo of the underlying scan relation. */ + ofpinfo = (MulticornPlanState *) fpinfo->outerrel->fdw_private; + + /* + * If underlying scan relation has any local conditions, those conditions + * are required to be applied before performing aggregation. Hence the + * aggregate cannot be pushed down. + */ + if (ofpinfo->local_conds) + return false; + + /* + * Examine grouping expressions, as well as other expressions we'd need to + * compute, and check whether they are safe to push down to the foreign + * server. All GROUP BY expressions will be part of the grouping target + * and thus there is no need to search for them separately. Add grouping + * expressions into target list which will be passed to foreign server. + * + * A tricky fine point is that we must not put any expression into the + * target list that is just a foreign param (that is, something that + * deparse.c would conclude has to be sent to the foreign server). If we + * do, the expression will also appear in the fdw_exprs list of the plan + * node, and setrefs.c will get confused and decide that the fdw_exprs + * entry is actually a reference to the fdw_scan_tlist entry, resulting in + * a broken plan. Somewhat oddly, it's OK if the expression contains such + * a node, as long as it's not at top level; then no match is possible. + */ + i = 0; + foreach(lc, grouping_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sgref = get_pathtarget_sortgroupref(grouping_target, i); + ListCell *l; + + /* Check whether this expression is part of GROUP BY clause */ + if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) + { + TargetEntry *tle; + + /* + * Ensure GROUP BY clauses are shippable at all by the corresponding + * Python FDW instance. + */ + if (!fpinfo->groupby_supported) + return false; + + /* + * If any GROUP BY expression is not shippable, then we cannot + * push down aggregation to the foreign server. + */ + if (!multicorn_is_foreign_expr(root, grouped_rel, expr)) + return false; + + /* + * If it would be a foreign param, we can't put it into the tlist, + * so we have to fail. + */ + if (multicorn_is_foreign_param(root, grouped_rel, expr)) + return false; + + /* + * Pushable, so add to tlist. We need to create a TLE for this + * expression and apply the sortgroupref to it. We cannot use + * add_to_flat_tlist() here because that avoids making duplicate + * entries in the tlist. If there are duplicate entries with + * distinct sortgrouprefs, we have to duplicate that situation in + * the output tlist. + */ + tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); + tle->ressortgroupref = sgref; + tlist = lappend(tlist, tle); + } + else + { + /* + * Ensure aggregation functions are shippable at all by the corresponding + * Python FDW instance. + */ + if (!fpinfo->agg_functions) + return false; + + /* + * Non-grouping expression we need to compute. Can we ship it + * as-is to the foreign server? + */ + if (multicorn_is_foreign_expr(root, grouped_rel, expr) && + !multicorn_is_foreign_param(root, grouped_rel, expr)) + { + /* Yes, so add to tlist as-is; OK to suppress duplicates */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* Not pushable as a whole; extract its Vars and aggregates */ + List *aggvars; + + aggvars = pull_var_clause((Node *) expr, + PVC_INCLUDE_AGGREGATES); + + /* + * If any aggregate expression is not shippable, then we + * cannot push down aggregation to the foreign server. (We + * don't have to check is_foreign_param, since that certainly + * won't return true for any such expression.) + */ + if (!multicorn_is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + return false; + + /* + * Add aggregates, if any, into the targetlist. Plain Vars + * outside an aggregate can be ignored, because they should be + * either same as some GROUP BY column or part of some GROUP + * BY expression. In either case, they are already part of + * the targetlist and thus no need to add them again. In fact + * including plain Vars in the tlist when they do not match a + * GROUP BY column would cause the foreign server to complain + * that the shipped query is invalid. + */ + foreach(l, aggvars) + { + Expr *expr = (Expr *) lfirst(l); + + if (IsA(expr, Aggref)) + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + i++; + } + + /* + * TODO: Enable HAVING clause pushdowns. + * Note that certain simple HAVING clauses get transformed to WHERE clauses + * internally for performance reasons, i.e. smaller scan size. Example is a + * HAVING clause on a column that is also a part of the GROUP BY clause, in + * which case WHERE clause effectively achieves the same thing. In those + * cases the havingQual is NULL, even though root->hasHavingQual is true. + */ + if (havingQual) + { + return false; + } + + /* Store generated targetlist */ + fpinfo->grouped_tlist = tlist; + + /* Safe to pushdown */ + fpinfo->pushdown_safe = true; + + /* + * If user is willing to estimate cost for a scan using EXPLAIN, he + * intends to estimate scans on that relation more accurately. Then, it + * makes sense to estimate the cost of the grouping on that relation more + * accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate; + + /* Copy startup and tuple cost as is from underneath input rel's fpinfo */ + fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost; + fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost; + + /* + * Set # of retrieved rows and cached relation costs to some negative + * value, so that we can detect when they are set to some sensible values, + * during one (usually the first) of the calls to multicorn_estimate_path_cost_size. + */ + fpinfo->retrieved_rows = -1; + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + return true; +} + +/* + * multicornGetForeignUpperPaths + * Add paths for post-join operations like aggregation, grouping etc. if + * corresponding operations are safe to push down. + * + * Right now, we only support aggregate, grouping and having clause pushdown. + */ +static void +multicornGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra +) +{ + MulticornPlanState *fpinfo; + + /* + * If input rel is not safe to pushdown, then simply return as we cannot + * perform any post-join operations on the foreign server. + */ + if (!input_rel->fdw_private || + !((MulticornPlanState *) input_rel->fdw_private)->pushdown_safe) + return; + + /* Ignore stages we don't support; and skip any duplicate calls. */ + if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) + return; + + /* + * Check with the Python FDW instance whether it supports pushdown at all + * NB: Here we deviate from other FDWs, in that we don't know whether the + * something can be pushed down without consulting the corresponding Python + * FDW instance. + */ + + if (!canPushdownUpperrel((MulticornPlanState *) input_rel->fdw_private)) + { + return; + } + + fpinfo = (MulticornPlanState *) palloc0(sizeof(MulticornPlanState)); + fpinfo->pushdown_safe = false; + fpinfo->stage = stage; + output_rel->fdw_private = fpinfo; + + switch (stage) + { + case UPPERREL_GROUP_AGG: + multicorn_add_foreign_grouping_paths(root, input_rel, output_rel, (GroupPathExtraData *) extra); + break; + default: + elog(ERROR, "unexpected upper relation: %d", (int) stage); + break; + } +} + +/* + * multicorn_add_foreign_grouping_paths + * Add foreign path for grouping and/or aggregation. + * + * Given input_rel represents the underlying scan. The paths are added to the + * given grouped_rel. + */ +static void +multicorn_add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + GroupPathExtraData *extra +) +{ + Query *parse = root->parse; + MulticornPlanState *ifpinfo = input_rel->fdw_private; + MulticornPlanState *fpinfo = grouped_rel->fdw_private; + ForeignPath *grouppath; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Nothing to be done, if there is no grouping or aggregation required. */ + if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && + !root->hasHavingQual) + return; + + Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE || + extra->patype == PARTITIONWISE_AGGREGATE_FULL); + + /* save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, shippable extensions + * etc. details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + + /* copy the upperrel pushdown info as well */ + fpinfo->groupby_supported = ifpinfo->groupby_supported; + fpinfo->agg_functions = ifpinfo->agg_functions; + + multicorn_merge_fdw_options(fpinfo, ifpinfo, NULL); + + /* + * Assess if it is safe to push down aggregation and grouping. + * + * Use HAVING qual from extra. In case of child partition, it will have + * translated Vars. + */ + if (!multicorn_foreign_grouping_ok(root, grouped_rel, extra->havingQual)) + return; + + /* Use small cost to push down aggregate always */ + rows = width = startup_cost = total_cost = 1; + /* Now update this information in the fpinfo */ + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* Create and add foreign path to the grouping relation. */ + grouppath = create_foreign_upper_path(root, + grouped_rel, + grouped_rel->reltarget, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, + NIL); /* no fdw_private */ + + /* Add generated path into grouped_rel by add_path(). */ + add_path(grouped_rel, (Path *) grouppath); +} /* * "Serialize" a MulticornPlanState, so that it is safe to be carried @@ -1471,6 +1974,12 @@ serializePlanState(MulticornPlanState * state) result = lappend(result, serializeDeparsedSortGroup(state->pathkeys)); + result = lappend(result, state->upper_rel_targets); + + result = lappend(result, state->aggs); + + result = lappend(result, state->group_clauses); + return result; } @@ -1499,5 +2008,8 @@ initializeExecState(void *internalstate) execstate->nulls = palloc(attnum * sizeof(bool)); execstate->subscanRel = NULL; execstate->subscanState = NULL; + execstate->upper_rel_targets = list_nth(values, 4); + execstate->aggs = list_nth(values, 5); + execstate->group_clauses = list_nth(values, 6); return execstate; } diff --git a/src/multicorn.h b/src/multicorn.h index 8b00f98d..9f74eeb8 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -59,7 +59,28 @@ typedef struct ConversionInfo bool need_quote; } ConversionInfo; - +/* + * Context for multicorn_deparse_expr + */ +typedef struct deparse_expr_cxt +{ + PlannerInfo *root; /* global planner state */ + RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + RelOptInfo *scanrel; /* the underlying scan relation. Same as + * foreignrel, when that represents a join or + * a base relation. */ + + StringInfo buf; /* output buffer to append to */ + List **params_list; /* exprs that will become remote Params */ + bool can_skip_cast; /* outer function can skip int2/int4/int8/float4/float8 cast */ +} deparse_expr_cxt; + +/* + * FDW-specific planner information kept in RelOptInfo.fdw_private for a + * multicorn foreign table. + * multicornGetForeignJoinPaths creates it for a joinrel (not implemented yet), + * and mutlicornGetForeignUpperPaths creates it for an upperrel. + */ typedef struct MulticornPlanState { Oid foreigntableid; @@ -78,6 +99,63 @@ typedef struct MulticornPlanState * getRelSize to GetForeignPlan. */ int width; + + /* Details about upperrel pushdown fetched from the Python FDW instance */ + bool groupby_supported; + List *agg_functions; + + /* + * Aggregation and grouping data to be passed to the execution phase. + * See MulticornExecState for more details. + */ + List *upper_rel_targets; + List *aggs; + List *group_clauses; + + /* + * True means that the relation can be pushed down. Always true for simple + * foreign scan. + */ + bool pushdown_safe; + + /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */ + List *remote_conds; + List *local_conds; + + /* Actual remote restriction clauses for scan (sans RestrictInfos) */ + List *final_remote_exprs; + + /* Estimated size and cost for a scan or join. */ + double rows; + Cost startup_cost; + Cost total_cost; + + /* Costs excluding costs for transferring data from the foreign server */ + double retrieved_rows; + Cost rel_startup_cost; + Cost rel_total_cost; + + /* Options extracted from catalogs. */ + bool use_remote_estimate; + Cost fdw_startup_cost; + Cost fdw_tuple_cost; + List *shippable_extensions; /* OIDs of whitelisted extensions */ + + /* Join information */ + RelOptInfo *outerrel; + + /* Upper relation information */ + UpperRelationKind stage; + + /* Cached catalog information. */ + ForeignTable *table; + ForeignServer *server; + UserMapping *user; /* only set in use_remote_estimate mode */ + + int fetch_size; /* fetch size for this remote table */ + + /* Grouping information */ + List *grouped_tlist; } MulticornPlanState; typedef struct MulticornExecState @@ -91,6 +169,24 @@ typedef struct MulticornExecState Datum *values; bool *nulls; ConversionInfo **cinfos; + /* + * List containing targets to be returned from Python in case of aggregations. + * List elements are aggregation keys or group_clauses elements. + */ + List *upper_rel_targets; + /* + * In case the query contains aggregations, the lists below details which + * functions correspond to which columns. + * List elements are themselves Lists of String nodes, denoting agg key, + * operation and column names, respectively. The agg key corresponds to the + * upper_rel_targets list entries. + */ + List *aggs; + /* + * List containing GROUP BY information. + * List elements are column names for grouping. + */ + List *group_clauses; /* Common buffer to avoid repeated allocations */ StringInfo buffer; AttrNumber rowidAttno; @@ -105,6 +201,10 @@ typedef struct MulticornExecState TupleTableSlot *subscanSlot; AttrNumber *subscanAttrMap; uint64 tuplesRead; + + Relation rel; /* relcache entry for the foreign table. NULL + * for a foreign join scan. */ + TupleDesc tupdesc; /* tuple descriptor of scan */ } MulticornExecState; typedef struct MulticornModifyState @@ -158,6 +258,21 @@ typedef struct MulticornDeparsedSortGroup PathKey *key; } MulticornDeparsedSortGroup; +/* deparse.c */ +extern void multicorn_classify_conditions(PlannerInfo *root, + RelOptInfo *baserel, + List *input_conds, + List **remote_conds, + List **local_conds); +extern bool multicorn_is_foreign_expr(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr); +extern bool multicorn_is_foreign_param(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr); +extern List *multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel); +extern void multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanState *fpinfo); + /* errors.c */ void errorCheck(void); @@ -177,6 +292,7 @@ PyObject *tupleTableSlotToPyObject(TupleTableSlot *slot, ConversionInfo ** cin char *getRowIdColumn(PyObject *fdw_instance); PyObject *optionsListToPyDict(List *options); const char *getPythonEncodingName(void); +bool canPushdownUpperrel(MulticornPlanState * state); void getRelSize(MulticornPlanState * state, PlannerInfo *root, @@ -201,10 +317,9 @@ void extractRestrictions(Relids base_relids, List **quals); List *extractColumns(List *reltargetlist, List *restrictinfolist); void initConversioninfo(ConversionInfo ** cinfo, - AttInMetadata *attinmeta); + AttInMetadata *attinmeta, List *upper_rel_targets); -Value *colnameFromVar(Var *var, PlannerInfo *root, - MulticornPlanState * state); +Value *colnameFromVar(Var *var, PlannerInfo *root); void computeDeparsedSortGroup(List *deparsed, MulticornPlanState *planstate, List **apply_pathkeys, diff --git a/src/python.c b/src/python.c index 3116ff8f..47a2c3b1 100644 --- a/src/python.c +++ b/src/python.c @@ -969,6 +969,50 @@ execute(ForeignScanState *node, ExplainState *es) if(PyList_Size(p_pathkeys) > 0){ PyDict_SetItemString(kwargs, "sortkeys", p_pathkeys); } + if (state->aggs) + { + PyObject *aggs = PyDict_New(); + ListCell *lc_agg; + List *agg_list; + + foreach(lc_agg, state->aggs) + { + PyObject *agg, + *function, + *column; + + agg = PyDict_New(); + + agg_list = (List *)lfirst(lc_agg); + function = PyUnicode_FromString(strVal(lsecond(agg_list))); + column = PyUnicode_FromString(strVal(lthird(agg_list))); + + PyDict_SetItemString(agg, "function", function); + PyDict_SetItemString(agg, "column", column); + PyDict_SetItemString(aggs, strVal(linitial(agg_list)), agg); + Py_DECREF(agg); + Py_DECREF(function); + Py_DECREF(column); + } + + PyDict_SetItemString(kwargs, "aggs", aggs); + Py_DECREF(aggs); + } + if (state->group_clauses) + { + PyObject *group_clauses = PyList_New(0); + ListCell *lc_groupc; + + foreach(lc_groupc, state->group_clauses) + { + PyObject *column = PyUnicode_FromString(strVal(lfirst(lc_groupc))); + PyList_Append(group_clauses, column); + Py_DECREF(column); + } + + PyDict_SetItemString(kwargs, "group_clauses", group_clauses); + Py_DECREF(group_clauses); + } if(es != NULL){ PyObject * verbose; if(es->verbose){ @@ -1013,11 +1057,29 @@ void pynumberToCString(PyObject *pyobject, StringInfo buffer, ConversionInfo * cinfo) { - PyObject *pTempStr; + PyObject *pTempStr, *pyTempLong; char *tempbuffer; Py_ssize_t strlength = 0; - pTempStr = PyObject_Str(pyobject); + if ( + !PyLong_Check(pyobject) && + (cinfo->atttypoid == INT2OID || cinfo->atttypoid == INT4OID || cinfo->atttypoid == INT8OID) + ) + { + /* + * Certain data sources, such as ElasticSearch, can return floats for + * aggregations of integers that are expected to return integers + * (e.g. min, max, sum), so we basically need to do int() here. + */ + pyTempLong = PyNumber_Long(pyobject); + pTempStr = PyObject_Str(pyTempLong); + Py_DECREF(pyTempLong); + } + else + { + pTempStr = PyObject_Str(pyobject); + } + PyString_AsStringAndSize(pTempStr, &tempbuffer, &strlength); appendBinaryStringInfo(buffer, tempbuffer, strlength); Py_DECREF(pTempStr); @@ -1642,6 +1704,78 @@ canSort(MulticornPlanState * state, List *deparsed) return result; } +/* + * Call the can_pushdown_upperrel method from the python implementation, to + * determine whether upper relations can be pushed down to the corresponding + * data source to begin with. + * + * If yes, then also initialize some fields in MulticornPlanState needed for + * more granular conditional logic for assesing whether the particular query + * is suitable for pushdown. + */ +bool +canPushdownUpperrel(MulticornPlanState * state) +{ + PyObject *fdw_instance = state->fdw_instance, + *p_upperrel_pushdown, + *p_object, + *p_agg_funcs, + *p_agg_func, + *p_item; + Py_ssize_t i, + size, + strlength; + char *tempbuffer; + StringInfo agg_func; + bool pushdown_upperrel = false; + + p_upperrel_pushdown = PyObject_CallMethod(fdw_instance, "can_pushdown_upperrel", "()"); + errorCheck(); + + if (p_upperrel_pushdown != NULL && p_upperrel_pushdown != Py_None) + { + /* Determine whether the FDW instance supports GROUP BYs */ + p_object = PyMapping_GetItemString(p_upperrel_pushdown, "groupby_supported"); + if (p_object != NULL && p_object != Py_None) + { + state->groupby_supported = PyObject_IsTrue(p_object); + } + Py_XDECREF(p_object); + + /* Determine which aggregation functions are supported */ + p_object = PyMapping_GetItemString(p_upperrel_pushdown, "agg_functions"); + if (p_object != NULL && p_object != Py_None) + { + p_agg_funcs = PyMapping_Keys(p_object); + size = PySequence_Size(p_agg_funcs); + + for (i = 0; i < size; i++) + { + agg_func = makeStringInfo(); + strlength = 0; + + p_item = PySequence_GetItem(p_agg_funcs, i); + p_agg_func = PyUnicode_AsEncodedString(p_item, getPythonEncodingName(), NULL); + errorCheck(); + PyBytes_AsStringAndSize(p_agg_func, &tempbuffer, &strlength); + appendBinaryStringInfo(agg_func, tempbuffer, strlength); + + state->agg_functions = lappend(state->agg_functions, makeString(agg_func->data)); + + Py_DECREF(p_agg_func); + Py_DECREF(p_item); + } + Py_DECREF(p_agg_funcs); + } + Py_XDECREF(p_object); + + pushdown_upperrel = true; + } + + Py_XDECREF(p_upperrel_pushdown); + return pushdown_upperrel; +} + PyObject * tupleTableSlotToPyObject(TupleTableSlot *slot, ConversionInfo ** cinfos) { diff --git a/src/query.c b/src/query.c index dd8997e4..60423ff3 100644 --- a/src/query.c +++ b/src/query.c @@ -107,7 +107,7 @@ extractColumns(List *reltargetlist, List *restrictinfolist) * objects back to suitable postgresql data structures. */ void -initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta) +initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta, List *upper_rel_targets) { int i; @@ -117,7 +117,17 @@ initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta) Oid outfuncoid; bool typIsVarlena; + char *attrname = NameStr(attr->attname); + if (upper_rel_targets) + { + /* + * For aggregations/groupings the targets lack attname, so we instead + * refer to the targets through references generated in + * multicorn_extract_upper_rel_info(). + */ + attrname = strVal(list_nth(upper_rel_targets, i)); + } if (!attr->attisdropped) { @@ -130,7 +140,7 @@ initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta) cinfo->atttypmod = attinmeta->atttypmods[i]; cinfo->attioparam = attinmeta->attioparams[i]; cinfo->attinfunc = &attinmeta->attinfuncs[i]; - cinfo->attrname = NameStr(attr->attname); + cinfo->attrname = attrname; cinfo->attnum = i + 1; cinfo->attndims = attr->attndims; cinfo->need_quote = false; @@ -438,7 +448,7 @@ extractClauseFromNullTest(Relids base_relids, * Returns a "Value" node containing the string name of the column from a var. */ Value * -colnameFromVar(Var *var, PlannerInfo *root, MulticornPlanState * planstate) +colnameFromVar(Var *var, PlannerInfo *root) { RangeTblEntry *rte = rte = planner_rt_fetch(var->varno, root); char *attname = get_attname(rte->relid, var->varattno);