diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 0ebcd28f..81ff3648 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -228,12 +228,17 @@ def can_pushdown_upperrel(self): : , ... }, + "operators_supported": [">", "<", "=", ...] } Each entry in `agg_functions` dict corresponds to a maping between the name of a aggregation function in PostgreSQL, and the equivalent foreign function. If no mapping exists for an aggregate function any queries containing it won't be pushed down. + + The `operators_supported` entry lists all operators that can be used + in qual (WHERE) clauses so that the aggregation pushdown will still + be supported. """ return None diff --git a/src/deparse.c b/src/deparse.c index e813201d..60155a28 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -76,34 +76,6 @@ typedef struct foreign_loc_cxt static Value *multicorn_deparse_function_name(Oid funcid); -/* - * Examine each qual clause in input_conds, and classify them into two groups, - * which are returned as two lists: - * - remote_conds contains expressions that can be evaluated remotely - * - local_conds contains expressions that can't be evaluated remotely - */ -void -multicorn_classify_conditions(PlannerInfo *root, - RelOptInfo *baserel, - List *input_conds, - List **remote_conds, - List **local_conds) -{ - ListCell *lc; - - *remote_conds = NIL; - *local_conds = NIL; - - foreach(lc, input_conds) - { - RestrictInfo *ri = lfirst_node(RestrictInfo, lc); - - if (multicorn_is_foreign_expr(root, baserel, ri->clause)) - *remote_conds = lappend(*remote_conds, ri); - else - *local_conds = lappend(*local_conds, ri); - } -} /* * Return true if given object is one of PostgreSQL's built-in objects. 
@@ -223,6 +195,7 @@ multicorn_foreign_expr_walker(Node *node, Aggref *agg = (Aggref *) node; ListCell *lc; char *opername = NULL; + StringInfo opername_composite = makeStringInfo(); Oid schema; /* get function name and schema */ @@ -239,11 +212,20 @@ multicorn_foreign_expr_walker(Node *node, if (schema != PG_CATALOG_NAMESPACE) return false; - /* Make sure the specific function at hand is shippable + /* Make sure the specific function at hand is shippable * NB: here we deviate from standard FDW code, since the allowed * function list is fetched from the Python FDW instance */ - if (!list_member(fpinfo->agg_functions, makeString(opername))) + if (agg->aggstar) + { + initStringInfo(opername_composite); + appendStringInfoString(opername_composite, opername); + appendStringInfoString(opername_composite, ".*"); + + if (!list_member(fpinfo->agg_functions, makeString(opername_composite->data))) + return false; + } + else if (!list_member(fpinfo->agg_functions, makeString(opername))) return false; /* Not safe to pushdown when not in grouping context */ @@ -255,10 +237,10 @@ multicorn_foreign_expr_walker(Node *node, return false; /* - * For now we don't push down DISTINCT or COUNT(*) aggregations. + * For now we don't push down DISTINCT aggregations. * TODO: Enable this */ - if (agg->aggdistinct || agg->aggstar) + if (agg->aggdistinct) return false; /* @@ -505,22 +487,6 @@ multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel) if (IS_UPPER_REL(foreignrel)) return fpinfo->grouped_tlist; - /* - * We require columns specified in foreignrel->reltarget->exprs and those - * required for evaluating the local conditions. 
- */ - tlist = add_to_flat_tlist(tlist, - pull_var_clause((Node *) foreignrel->reltarget->exprs, - PVC_RECURSE_PLACEHOLDERS)); - foreach(lc, fpinfo->local_conds) - { - RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); - - tlist = add_to_flat_tlist(tlist, - pull_var_clause((Node *) rinfo->clause, - PVC_RECURSE_PLACEHOLDERS)); - } - return tlist; } @@ -564,10 +530,17 @@ multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanSt aggref = (Aggref *) tle->expr; function = multicorn_deparse_function_name(aggref->aggfnoid); - var = linitial(pull_var_clause((Node *) aggref, + if (aggref->aggstar) + { + colname = makeString("*"); + } + else + { + var = linitial(pull_var_clause((Node *) aggref, PVC_RECURSE_AGGREGATES | PVC_RECURSE_PLACEHOLDERS)); - colname = colnameFromVar(var, root); + colname = colnameFromVar(var, root); + } initStringInfo(agg_key); appendStringInfoString(agg_key, strVal(function)); diff --git a/src/multicorn.c b/src/multicorn.c index 410e3f71..53f978ef 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -402,16 +402,7 @@ multicornGetForeignRelSize(PlannerInfo *root, } - /* - * Identify which baserestrictinfo clauses can be sent to the remote - * server and which can't. - * TODO: for now all WHERE clauses will be classified as local conditions - * since multicorn_foreign_expr_walker lacks T_OpExpr and T_Const cases, - * thus preventing pushdown. When adding support for this make sure to align - * the code in multicorn_extract_upper_rel_info as well. 
- */ - multicorn_classify_conditions(root, baserel, baserel->baserestrictinfo, - &planstate->remote_conds, &planstate->local_conds); + planstate->baserestrictinfo = baserel->baserestrictinfo; /* Inject the "rows" and "width" attribute into the baserel */ #if PG_VERSION_NUM >= 90600 @@ -529,7 +520,7 @@ multicornGetForeignPlan(PlannerInfo *root, #endif ) { - MulticornPlanState *planstate = (MulticornPlanState *) foreignrel->fdw_private; + MulticornPlanState *ofpinfo, *planstate = (MulticornPlanState *) foreignrel->fdw_private; Index scan_relid; List *fdw_scan_tlist = NIL; ListCell *lc; @@ -581,7 +572,24 @@ multicornGetForeignPlan(PlannerInfo *root, /* Extract data needed for aggregations on the Python side */ if (IS_UPPER_REL(foreignrel)) { + /* + * TODO: fdw_scan_tlist is present in the execute phase as well, via + * node->ss.ps.plan.fdw_scan_tlist, and instead of root, one can employ + * rte = exec_rt_fetch(rtindex, estate) to fetch the column name through + * get_attname. + * + * Migrating the below function into multicornBeginForeignScan would thus + * reduce the duplication of plan and execute fields that are now being + * serialized. + */ multicorn_extract_upper_rel_info(root, fdw_scan_tlist, planstate); + + /* + * Since scan_clauses are empty in case of upper relations for some + * reason, we pass the clauses from the base relation obtained in multicornGetForeignRelSize. 
+ */ + ofpinfo = (MulticornPlanState *) planstate->outerrel->fdw_private; + planstate->baserestrictinfo = extract_actual_clauses(ofpinfo->baserestrictinfo, false); } return make_foreignscan(tlist, @@ -632,6 +640,8 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) ForeignScan *fscan = (ForeignScan *) node->ss.ps.plan; MulticornExecState *execstate; ListCell *lc; + int rtindex; + List *clauses; execstate = initializeExecState(fscan->fdw_private); @@ -641,12 +651,24 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) */ if (fscan->scan.scanrelid > 0) { + /* + * Simple/base relation + */ + execstate->rel = node->ss.ss_currentRelation; execstate->tupdesc = RelationGetDescr(execstate->rel); initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), NULL); + + /* Needed for parsing quals */ + rtindex = fscan->scan.scanrelid; + clauses = fscan->fdw_exprs; } else { + /* + * Upper (aggregation) relation + */ + execstate->rel = NULL; #if (PG_VERSION_NUM >= 140000) execstate->tupdesc = multicorn_get_tupdesc_for_join_scan_tuples(node); @@ -654,17 +676,25 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) execstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; #endif initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), execstate->upper_rel_targets); + + /* + * In case of a join or aggregate use the lowest-numbered member RTE out + * of all the base relations participating in the underlying scan. + * NB: This may not work well in case of joins, keep an eye out for it. 
+ */ + rtindex = bms_next_member(fscan->fs_relids, -1); + clauses = execstate->baserestrictinfo; } execstate->values = palloc(sizeof(Datum) * execstate->tupdesc->natts); execstate->nulls = palloc(sizeof(bool) * execstate->tupdesc->natts); - execstate->qual_list = NULL; - foreach(lc, fscan->fdw_exprs) - { - extractRestrictions(bms_make_singleton(fscan->scan.scanrelid), - ((Expr *) lfirst(lc)), - &execstate->qual_list); - } + execstate->qual_list = NULL; + foreach(lc, clauses) + { + extractRestrictions(bms_make_singleton(rtindex), + ((Expr *) lfirst(lc)), + &execstate->qual_list); + } execstate->subscanCxt = AllocSetContextCreate( node->ss.ps.state->es_query_cxt, @@ -1654,12 +1684,16 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, ofpinfo = (MulticornPlanState *) fpinfo->outerrel->fdw_private; /* - * If underlying scan relation has any local conditions, those conditions - * are required to be applied before performing aggregation. Hence the - * aggregate cannot be pushed down. + * If underlying scan relation has any quals with unsupported operators + * we cannot pushdown the aggregation. 
*/ - if (ofpinfo->local_conds) - return false; + foreach(lc, ofpinfo->qual_list) + { + MulticornBaseQual *qual = (MulticornBaseQual *) lfirst(lc); + + if (!list_member(ofpinfo->operators_supported, makeString(qual->opname))) + return false; + } /* * Examine grouping expressions, as well as other expressions we'd need to @@ -1980,6 +2014,8 @@ serializePlanState(MulticornPlanState * state) result = lappend(result, state->group_clauses); + result = lappend(result, state->baserestrictinfo); + return result; } @@ -1996,6 +2032,16 @@ initializeExecState(void *internalstate) Oid foreigntableid = ((Const *) lsecond(values))->constvalue; List *pathkeys; + ForeignTable *ftable = GetForeignTable(foreigntableid); + Relation rel = RelationIdGetRelation(ftable->relid); + AttInMetadata *attinmeta; + + attinmeta = TupleDescGetAttInMetadata(rel->rd_att); + execstate->qual_cinfos = palloc0(sizeof(ConversionInfo *) * + attinmeta->tupdesc->natts); + initConversioninfo(execstate->qual_cinfos, attinmeta, NULL); + RelationClose(rel); + /* Those list must be copied, because their memory context can become */ /* invalid during the execution (in particular with the cursor interface) */ execstate->target_list = copyObject(lthird(values)); @@ -2011,5 +2057,6 @@ initializeExecState(void *internalstate) execstate->upper_rel_targets = list_nth(values, 4); execstate->aggs = list_nth(values, 5); execstate->group_clauses = list_nth(values, 6); + execstate->baserestrictinfo = list_nth(values, 7); return execstate; } diff --git a/src/multicorn.h b/src/multicorn.h index 9f74eeb8..c507effe 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -103,6 +103,7 @@ typedef struct MulticornPlanState /* Details about upperrel pushdown fetched from the Python FDW instance */ bool groupby_supported; List *agg_functions; + List *operators_supported; /* * Aggregation and grouping data to be passed to the execution phase. 
@@ -118,9 +119,8 @@ typedef struct MulticornPlanState */ bool pushdown_safe; - /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */ - List *remote_conds; - List *local_conds; + /* qual clauses */ + List *baserestrictinfo; /* Actual remote restriction clauses for scan (sans RestrictInfos) */ List *final_remote_exprs; @@ -169,6 +169,12 @@ typedef struct MulticornExecState Datum *values; bool *nulls; ConversionInfo **cinfos; + /* + * In case of aggregations the upper rel target list does not correspond to + * the base table target list, so separate conversion information must be + * provided when converting the quals in the execute method. + */ + ConversionInfo **qual_cinfos; /* * List containing targets to be returned from Python in case of aggregations. * List elements are aggregation keys or group_clauses elements. @@ -187,6 +193,11 @@ typedef struct MulticornExecState * List elements are column names for grouping. */ List *group_clauses; + /* + * Qual conditions parsed in the MulticornGetForeignRelSize + */ + List *baserestrictinfo; + /* Common buffer to avoid repeated allocations */ StringInfo buffer; AttrNumber rowidAttno; @@ -259,11 +270,6 @@ typedef struct MulticornDeparsedSortGroup } MulticornDeparsedSortGroup; /* deparse.c */ -extern void multicorn_classify_conditions(PlannerInfo *root, - RelOptInfo *baserel, - List *input_conds, - List **remote_conds, - List **local_conds); extern bool multicorn_is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr); diff --git a/src/python.c b/src/python.c index 47a2c3b1..36963196 100644 --- a/src/python.c +++ b/src/python.c @@ -944,7 +944,7 @@ execute(ForeignScanState *node, ExplainState *es) } if (newqual != NULL) { - PyObject *python_qual = qualdefToPython((MulticornConstQual *) newqual, state->cinfos); + PyObject *python_qual = qualdefToPython((MulticornConstQual *) newqual, state->qual_cinfos); if (python_qual != NULL) { @@ -1408,6 +1408,40 @@ pyobjectToDatum(PyObject *object, 
StringInfo buffer, return value; } +void +pythonUnicodeSequenceToList(PyObject *pySequence, List **target) +{ + PyObject *p_item, + *p_string; + Py_ssize_t i, + size, + strlength; + char *tempbuffer; + StringInfo element; + + if (pySequence != NULL && pySequence != Py_None) + { + size = PySequence_Size(pySequence); + + for (i = 0; i < size; i++) + { + element = makeStringInfo(); + strlength = 0; + + p_item = PySequence_GetItem(pySequence, i); + p_string = PyUnicode_AsEncodedString(p_item, getPythonEncodingName(), NULL); + errorCheck(); + PyBytes_AsStringAndSize(p_string, &tempbuffer, &strlength); + appendBinaryStringInfo(element, tempbuffer, strlength); + + *target = lappend(*target, makeString(element->data)); + + Py_DECREF(p_item); + Py_DECREF(p_string); + } + } +} + PyObject * datumStringToPython(Datum datum, ConversionInfo * cinfo) { @@ -1720,13 +1754,7 @@ canPushdownUpperrel(MulticornPlanState * state) *p_upperrel_pushdown, *p_object, *p_agg_funcs, - *p_agg_func, - *p_item; - Py_ssize_t i, - size, - strlength; - char *tempbuffer; - StringInfo agg_func; + *p_ops; bool pushdown_upperrel = false; p_upperrel_pushdown = PyObject_CallMethod(fdw_instance, "can_pushdown_upperrel", "()"); @@ -1747,28 +1775,16 @@ canPushdownUpperrel(MulticornPlanState * state) if (p_object != NULL && p_object != Py_None) { p_agg_funcs = PyMapping_Keys(p_object); - size = PySequence_Size(p_agg_funcs); - - for (i = 0; i < size; i++) - { - agg_func = makeStringInfo(); - strlength = 0; - - p_item = PySequence_GetItem(p_agg_funcs, i); - p_agg_func = PyUnicode_AsEncodedString(p_item, getPythonEncodingName(), NULL); - errorCheck(); - PyBytes_AsStringAndSize(p_agg_func, &tempbuffer, &strlength); - appendBinaryStringInfo(agg_func, tempbuffer, strlength); - - state->agg_functions = lappend(state->agg_functions, makeString(agg_func->data)); - - Py_DECREF(p_agg_func); - Py_DECREF(p_item); - } + pythonUnicodeSequenceToList(p_agg_funcs, &state->agg_functions); Py_DECREF(p_agg_funcs); } 
Py_XDECREF(p_object); + /* Construct supported qual operators list */ + p_ops = PyMapping_GetItemString(p_upperrel_pushdown, "operators_supported"); + pythonUnicodeSequenceToList(p_ops, &state->operators_supported); + Py_XDECREF(p_ops); + pushdown_upperrel = true; } diff --git a/src/query.c b/src/query.c index 60423ff3..e54e99d0 100644 --- a/src/query.c +++ b/src/query.c @@ -122,10 +122,10 @@ initConversioninfo(ConversionInfo ** cinfos, AttInMetadata *attinmeta, List *upp if (upper_rel_targets) { /* - * For aggregations/groupings the targets lack attname, so we instead - * refer to the targets through references generated in - * multicorn_extract_upper_rel_info(). - */ + * For aggregations/groupings the targets lack attname, so we instead + * refer to the targets through references generated in + * multicorn_extract_upper_rel_info(). + */ attrname = strVal(list_nth(upper_rel_targets, i)); }