Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/multicorn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,17 @@ def can_pushdown_upperrel(self):
<PG_agg_func_name>: <foreign_agg_func_name>,
...
},
"supported_operators": [">", "<", "=", ...]
}

Each entry in the `agg_functions` dict corresponds to a mapping between
the name of an aggregation function in PostgreSQL and the equivalent
foreign function. If no mapping exists for an aggregate function, any
queries containing it won't be pushed down.

The `supported_operators` entry lists all operators that can be used
in qual (WHERE) clauses so that the aggregation pushdown will still
be supported.
"""
return None

Expand Down
73 changes: 23 additions & 50 deletions src/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,34 +76,6 @@ typedef struct foreign_loc_cxt

static Value *multicorn_deparse_function_name(Oid funcid);

/*
 * Partition the qual clauses of input_conds into two output lists:
 *	- *remote_conds collects every RestrictInfo whose clause is accepted
 *	  by multicorn_is_foreign_expr()
 *	- *local_conds collects every RestrictInfo whose clause is rejected
 *
 * Both output lists are reset to NIL before any clause is examined, and the
 * RestrictInfo nodes themselves (not the bare clauses) are what get appended.
 */
void
multicorn_classify_conditions(PlannerInfo *root,
							  RelOptInfo *baserel,
							  List *input_conds,
							  List **remote_conds,
							  List **local_conds)
{
	ListCell   *cell;

	*remote_conds = NIL;
	*local_conds = NIL;

	foreach(cell, input_conds)
	{
		RestrictInfo *rinfo = lfirst_node(RestrictInfo, cell);
		List	  **dest;

		/* Choose the destination list first, then append exactly once. */
		dest = multicorn_is_foreign_expr(root, baserel, rinfo->clause)
			? remote_conds
			: local_conds;

		*dest = lappend(*dest, rinfo);
	}
}

/*
* Return true if given object is one of PostgreSQL's built-in objects.
Expand Down Expand Up @@ -223,6 +195,7 @@ multicorn_foreign_expr_walker(Node *node,
Aggref *agg = (Aggref *) node;
ListCell *lc;
char *opername = NULL;
StringInfo opername_composite = makeStringInfo();
Oid schema;

/* get function name and schema */
Expand All @@ -239,11 +212,20 @@ multicorn_foreign_expr_walker(Node *node,
if (schema != PG_CATALOG_NAMESPACE)
return false;

/* Make sure the specific function at hand is shippable
/* Make sure the specific function at hand is shippable
* NB: here we deviate from standard FDW code, since the allowed
* function list is fetched from the Python FDW instance
*/
if (!list_member(fpinfo->agg_functions, makeString(opername)))
if (agg->aggstar)
{
initStringInfo(opername_composite);
appendStringInfoString(opername_composite, opername);
appendStringInfoString(opername_composite, ".*");

if (!list_member(fpinfo->agg_functions, makeString(opername_composite->data)))
return false;
}
else if (!list_member(fpinfo->agg_functions, makeString(opername)))
return false;

/* Not safe to pushdown when not in grouping context */
Expand All @@ -255,10 +237,10 @@ multicorn_foreign_expr_walker(Node *node,
return false;

/*
* For now we don't push down DISTINCT or COUNT(*) aggregations.
* For now we don't push down DISTINCT aggregations.
* TODO: Enable this
*/
if (agg->aggdistinct || agg->aggstar)
if (agg->aggdistinct)
return false;

/*
Expand Down Expand Up @@ -505,22 +487,6 @@ multicorn_build_tlist_to_deparse(RelOptInfo *foreignrel)
if (IS_UPPER_REL(foreignrel))
return fpinfo->grouped_tlist;

/*
* We require columns specified in foreignrel->reltarget->exprs and those
* required for evaluating the local conditions.
*/
tlist = add_to_flat_tlist(tlist,
pull_var_clause((Node *) foreignrel->reltarget->exprs,
PVC_RECURSE_PLACEHOLDERS));
foreach(lc, fpinfo->local_conds)
{
RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);

tlist = add_to_flat_tlist(tlist,
pull_var_clause((Node *) rinfo->clause,
PVC_RECURSE_PLACEHOLDERS));
}

return tlist;
}

Expand Down Expand Up @@ -564,10 +530,17 @@ multicorn_extract_upper_rel_info(PlannerInfo *root, List *tlist, MulticornPlanSt
aggref = (Aggref *) tle->expr;
function = multicorn_deparse_function_name(aggref->aggfnoid);

var = linitial(pull_var_clause((Node *) aggref,
if (aggref->aggstar)
{
colname = makeString("*");
}
else
{
var = linitial(pull_var_clause((Node *) aggref,
PVC_RECURSE_AGGREGATES |
PVC_RECURSE_PLACEHOLDERS));
colname = colnameFromVar(var, root);
colname = colnameFromVar(var, root);
}

initStringInfo(agg_key);
appendStringInfoString(agg_key, strVal(function));
Expand Down
93 changes: 70 additions & 23 deletions src/multicorn.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,16 +402,7 @@ multicornGetForeignRelSize(PlannerInfo *root,

}

/*
* Identify which baserestrictinfo clauses can be sent to the remote
* server and which can't.
* TODO: for now all WHERE clauses will be classified as local conditions
* since multicorn_foreign_expr_walker lacks T_OpExpr and T_Const cases,
* thus preventing pushdown. When adding support for this make sure to align
* the code in multicorn_extract_upper_rel_info as well.
*/
multicorn_classify_conditions(root, baserel, baserel->baserestrictinfo,
&planstate->remote_conds, &planstate->local_conds);
planstate->baserestrictinfo = baserel->baserestrictinfo;

/* Inject the "rows" and "width" attribute into the baserel */
#if PG_VERSION_NUM >= 90600
Expand Down Expand Up @@ -529,7 +520,7 @@ multicornGetForeignPlan(PlannerInfo *root,
#endif
)
{
MulticornPlanState *planstate = (MulticornPlanState *) foreignrel->fdw_private;
MulticornPlanState *ofpinfo, *planstate = (MulticornPlanState *) foreignrel->fdw_private;
Index scan_relid;
List *fdw_scan_tlist = NIL;
ListCell *lc;
Expand Down Expand Up @@ -581,7 +572,24 @@ multicornGetForeignPlan(PlannerInfo *root,
/* Extract data needed for aggregations on the Python side */
if (IS_UPPER_REL(foreignrel))
{
/*
* TODO: fdw_scan_tlist is present in the execute phase as well, via
* node->ss.ps.plan.fdw_scan_tlist, and instead of root, one can employ
* rte = exec_rt_fetch(rtindex, estate) to fetch the column name through
* get_attname.
*
* Migrating the below function into multicornBeginForeignScan would thus
* reduce the duplication of plan and execute fields that are now being
* serialized.
*/
multicorn_extract_upper_rel_info(root, fdw_scan_tlist, planstate);

/*
* For some reason scan_clauses are empty in the case of upper relations, so
* we pass the clauses from the base relation, obtained in multicornGetForeignRelSize.
*/
ofpinfo = (MulticornPlanState *) planstate->outerrel->fdw_private;
planstate->baserestrictinfo = extract_actual_clauses(ofpinfo->baserestrictinfo, false);
}

return make_foreignscan(tlist,
Expand Down Expand Up @@ -632,6 +640,8 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags)
ForeignScan *fscan = (ForeignScan *) node->ss.ps.plan;
MulticornExecState *execstate;
ListCell *lc;
int rtindex;
List *clauses;

execstate = initializeExecState(fscan->fdw_private);

Expand All @@ -641,30 +651,50 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags)
*/
if (fscan->scan.scanrelid > 0)
{
/*
* Simple/base relation
*/

execstate->rel = node->ss.ss_currentRelation;
execstate->tupdesc = RelationGetDescr(execstate->rel);
initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), NULL);

// Needed for parsing quals
rtindex = fscan->scan.scanrelid;
clauses = fscan->fdw_exprs;
}
else
{
/*
* Upper (aggregation) relation
*/

execstate->rel = NULL;
#if (PG_VERSION_NUM >= 140000)
execstate->tupdesc = multicorn_get_tupdesc_for_join_scan_tuples(node);
#else
execstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
#endif
initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(execstate->tupdesc), execstate->upper_rel_targets);

/*
* In case of a join or aggregate use the lowest-numbered member RTE out
* of all the base relations participating in the underlying scan.
* NB: This may not work well in case of joins, keep an eye out for it.
*/
rtindex = bms_next_member(fscan->fs_relids, -1);
clauses = execstate->baserestrictinfo;
}

execstate->values = palloc(sizeof(Datum) * execstate->tupdesc->natts);
execstate->nulls = palloc(sizeof(bool) * execstate->tupdesc->natts);
execstate->qual_list = NULL;
foreach(lc, fscan->fdw_exprs)
{
extractRestrictions(bms_make_singleton(fscan->scan.scanrelid),
((Expr *) lfirst(lc)),
&execstate->qual_list);
}
execstate->qual_list = NULL;
foreach(lc, clauses)
{
extractRestrictions(bms_make_singleton(rtindex),
((Expr *) lfirst(lc)),
&execstate->qual_list);
}

execstate->subscanCxt = AllocSetContextCreate(
node->ss.ps.state->es_query_cxt,
Expand Down Expand Up @@ -1654,12 +1684,16 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
ofpinfo = (MulticornPlanState *) fpinfo->outerrel->fdw_private;

/*
* If underlying scan relation has any local conditions, those conditions
* are required to be applied before performing aggregation. Hence the
* aggregate cannot be pushed down.
* If underlying scan relation has any quals with unsupported operators
* we cannot pushdown the aggregation.
*/
if (ofpinfo->local_conds)
return false;
foreach(lc, ofpinfo->qual_list)
{
MulticornBaseQual *qual = (MulticornBaseQual *) lfirst(lc);

if (!list_member(ofpinfo->operators_supported, makeString(qual->opname)))
return false;
}

/*
* Examine grouping expressions, as well as other expressions we'd need to
Expand Down Expand Up @@ -1980,6 +2014,8 @@ serializePlanState(MulticornPlanState * state)

result = lappend(result, state->group_clauses);

result = lappend(result, state->baserestrictinfo);

return result;
}

Expand All @@ -1996,6 +2032,16 @@ initializeExecState(void *internalstate)
Oid foreigntableid = ((Const *) lsecond(values))->constvalue;
List *pathkeys;

ForeignTable *ftable = GetForeignTable(foreigntableid);
Relation rel = RelationIdGetRelation(ftable->relid);
AttInMetadata *attinmeta;

attinmeta = TupleDescGetAttInMetadata(rel->rd_att);
execstate->qual_cinfos = palloc0(sizeof(ConversionInfo *) *
attinmeta->tupdesc->natts);
initConversioninfo(execstate->qual_cinfos, attinmeta, NULL);
RelationClose(rel);

/* Those lists must be copied, because their memory context can become */
/* invalid during the execution (in particular with the cursor interface) */
execstate->target_list = copyObject(lthird(values));
Expand All @@ -2011,5 +2057,6 @@ initializeExecState(void *internalstate)
execstate->upper_rel_targets = list_nth(values, 4);
execstate->aggs = list_nth(values, 5);
execstate->group_clauses = list_nth(values, 6);
execstate->baserestrictinfo = list_nth(values, 7);
return execstate;
}
22 changes: 14 additions & 8 deletions src/multicorn.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ typedef struct MulticornPlanState
/* Details about upperrel pushdown fetched from the Python FDW instance */
bool groupby_supported;
List *agg_functions;
List *operators_supported;

/*
* Aggregation and grouping data to be passed to the execution phase.
Expand All @@ -118,9 +119,8 @@ typedef struct MulticornPlanState
*/
bool pushdown_safe;

/* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
List *remote_conds;
List *local_conds;
/* qual clauses */
List *baserestrictinfo;

/* Actual remote restriction clauses for scan (sans RestrictInfos) */
List *final_remote_exprs;
Expand Down Expand Up @@ -169,6 +169,12 @@ typedef struct MulticornExecState
Datum *values;
bool *nulls;
ConversionInfo **cinfos;
/*
* In case of aggregations the upper rel target list does not correspond to
* the base table target list, so separate conversion information must be
* provided when converting the quals in the execute method.
*/
ConversionInfo **qual_cinfos;
/*
* List containing targets to be returned from Python in case of aggregations.
* List elements are aggregation keys or group_clauses elements.
Expand All @@ -187,6 +193,11 @@ typedef struct MulticornExecState
* List elements are column names for grouping.
*/
List *group_clauses;
/*
* Qual conditions obtained in multicornGetForeignRelSize
*/
List *baserestrictinfo;

/* Common buffer to avoid repeated allocations */
StringInfo buffer;
AttrNumber rowidAttno;
Expand Down Expand Up @@ -259,11 +270,6 @@ typedef struct MulticornDeparsedSortGroup
} MulticornDeparsedSortGroup;

/* deparse.c */
extern void multicorn_classify_conditions(PlannerInfo *root,
RelOptInfo *baserel,
List *input_conds,
List **remote_conds,
List **local_conds);
extern bool multicorn_is_foreign_expr(PlannerInfo *root,
RelOptInfo *baserel,
Expr *expr);
Expand Down
Loading