Skip to content

Commit

Permalink
Allow foreign and custom joins to handle EvalPlanQual rechecks.
Browse files Browse the repository at this point in the history
Commit e7cb7ee provided basic
infrastructure for allowing a foreign data wrapper or custom scan
provider to replace a join of one or more tables with a scan.
However, this infrastructure failed to take into account the need
for possible EvalPlanQual rechecks, and ExecScanFetch would fail
an assertion (or just overwrite memory) if such a check was attempted
for a plan containing a pushed-down join.  To fix, adjust the EPQ
machinery to skip some processing steps when scanrelid == 0, making
those the responsibility of the scan's recheck method, which also has
the responsibility in this case of correctly populating the relevant
slot.

To allow foreign scans to gain control in the right place to make
use of this new facility, add a new, optional RecheckForeignScan
method.  Also, allow a foreign scan to have a child plan, which can
be used to correctly populate the slot (or perhaps for something
else, but this is the only use currently envisioned).

KaiGai Kohei, reviewed by Robert Haas, Etsuro Fujita, and Kyotaro
Horiguchi.
  • Loading branch information
robertmhaas committed Dec 8, 2015
1 parent edca44b commit 385f337
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 18 deletions.
10 changes: 7 additions & 3 deletions contrib/file_fdw/file_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses);
List *scan_clauses,
Plan *outer_plan);
static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
Expand Down Expand Up @@ -525,6 +526,7 @@ fileGetForeignPaths(PlannerInfo *root,
total_cost,
NIL, /* no pathkeys */
NULL, /* no outer rel either */
NULL, /* no extra plan */
coptions));

/*
Expand All @@ -544,7 +546,8 @@ fileGetForeignPlan(PlannerInfo *root,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses)
List *scan_clauses,
Plan *outer_plan)
{
Index scan_relid = baserel->relid;

Expand All @@ -564,7 +567,8 @@ fileGetForeignPlan(PlannerInfo *root,
NIL, /* no expressions to evaluate */
best_path->fdw_private,
NIL, /* no custom tlist */
NIL /* no remote quals */ );
NIL, /* no remote quals */
outer_plan);
}

/*
Expand Down
12 changes: 9 additions & 3 deletions contrib/postgres_fdw/postgres_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses);
List *scan_clauses,
Plan *outer_plan);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
Expand Down Expand Up @@ -535,6 +536,7 @@ postgresGetForeignPaths(PlannerInfo *root,
fpinfo->total_cost,
NIL, /* no pathkeys */
NULL, /* no outer rel either */
NULL, /* no extra plan */
NIL); /* no fdw_private list */
add_path(baserel, (Path *) path);

Expand Down Expand Up @@ -589,6 +591,7 @@ postgresGetForeignPaths(PlannerInfo *root,
total_cost,
usable_pathkeys,
NULL,
NULL,
NIL));
}

Expand Down Expand Up @@ -756,6 +759,7 @@ postgresGetForeignPaths(PlannerInfo *root,
total_cost,
NIL, /* no pathkeys */
param_info->ppi_req_outer,
NULL,
NIL); /* no fdw_private list */
add_path(baserel, (Path *) path);
}
Expand All @@ -771,7 +775,8 @@ postgresGetForeignPlan(PlannerInfo *root,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses)
List *scan_clauses,
Plan *outer_plan)
{
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
Index scan_relid = baserel->relid;
Expand Down Expand Up @@ -915,7 +920,8 @@ postgresGetForeignPlan(PlannerInfo *root,
params_list,
fdw_private,
NIL, /* no custom tlist */
remote_exprs);
remote_exprs,
outer_plan);
}

/*
Expand Down
42 changes: 39 additions & 3 deletions doc/src/sgml/fdwhandler.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ GetForeignPlan (PlannerInfo *root,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses);
List *scan_clauses,
Plan *outer_plan);
</programlisting>

Create a <structname>ForeignScan</> plan node from the selected foreign
Expand Down Expand Up @@ -765,6 +766,35 @@ RefetchForeignRow (EState *estate,
See <xref linkend="fdw-row-locking"> for more information.
</para>

<para>
<programlisting>
bool
RecheckForeignScan (ForeignScanState *node, TupleTableSlot *slot);
</programlisting>
Recheck that a previously-returned tuple still matches the relevant
scan and join qualifiers, and possibly provide a modified version of
the tuple. For foreign data wrappers which do not perform join pushdown,
it will typically be more convenient to set this to <literal>NULL</> and
instead set <structfield>fdw_recheck_quals</structfield> appropriately.
When outer joins are pushed down, however, it isn't sufficient to
reapply the checks relevant to all the base tables to the result tuple,
even if all needed attributes are present, because failure to match some
qualifier might result in some attributes going to NULL, rather than in
no tuple being returned. <literal>RecheckForeignScan</> can recheck
qualifiers and return true if they are still satisfied and false
otherwise, but it can also store a replacement tuple into the supplied
slot.
</para>

<para>
To implement join pushdown, a foreign data wrapper will typically
construct an alternative local join plan which is used only for
rechecks; this will become the outer subplan of the
<literal>ForeignScan</>. When a recheck is required, this subplan
can be executed and the resulting tuple can be stored in the slot.
This plan need not be efficient since no base table will return more
than one row; for example, it may implement all joins as nested loops.
</para>
</sect2>

<sect2 id="fdw-callbacks-explain">
Expand Down Expand Up @@ -1137,11 +1167,17 @@ GetForeignServerByName(const char *name, bool missing_ok);

<para>
Any clauses removed from the plan node's qual list must instead be added
to <literal>fdw_recheck_quals</> in order to ensure correct behavior
to <literal>fdw_recheck_quals</> or rechecked by
<literal>RecheckForeignScan</> in order to ensure correct behavior
at the <literal>READ COMMITTED</> isolation level. When a concurrent
update occurs for some other table involved in the query, the executor
may need to verify that all of the original quals are still satisfied for
the tuple, possibly against a different set of parameter values.
the tuple, possibly against a different set of parameter values. Using
<literal>fdw_recheck_quals</> is typically easier than implementing checks
inside <literal>RecheckForeignScan</>, but <literal>fdw_recheck_quals</>
alone will be insufficient when outer joins have been pushed down, since
the join tuples in that case might have some fields go to NULL without
rejecting the tuple entirely.
</para>

<para>
Expand Down
44 changes: 40 additions & 4 deletions src/backend/executor/execScan.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,21 @@ ExecScanFetch(ScanState *node,
*/
Index scanrelid = ((Scan *) node->ps.plan)->scanrelid;

Assert(scanrelid > 0);
if (estate->es_epqTupleSet[scanrelid - 1])
if (scanrelid == 0)
{
TupleTableSlot *slot = node->ss_ScanTupleSlot;

/*
* This is a ForeignScan or CustomScan which has pushed down a
* join to the remote side. The recheck method is responsible not
* only for rechecking the scan/join quals but also for storing
* the correct tuple in the slot.
*/
if (!(*recheckMtd) (node, slot))
ExecClearTuple(slot); /* would not be returned by scan */
return slot;
}
else if (estate->es_epqTupleSet[scanrelid - 1])
{
TupleTableSlot *slot = node->ss_ScanTupleSlot;

Expand Down Expand Up @@ -347,8 +360,31 @@ ExecScanReScan(ScanState *node)
{
Index scanrelid = ((Scan *) node->ps.plan)->scanrelid;

Assert(scanrelid > 0);
if (scanrelid > 0)
estate->es_epqScanDone[scanrelid - 1] = false;
else
{
Bitmapset *relids;
int rtindex = -1;

estate->es_epqScanDone[scanrelid - 1] = false;
/*
* If an FDW or custom scan provider has replaced the join with a
* scan, there are multiple RTIs; reset the epqScanDone flag for
* all of them.
*/
if (IsA(node->ps.plan, ForeignScan))
relids = ((ForeignScan *) node->ps.plan)->fs_relids;
else if (IsA(node->ps.plan, CustomScan))
relids = ((CustomScan *) node->ps.plan)->custom_relids;
else
elog(ERROR, "unexpected scan node: %d",
(int) nodeTag(node->ps.plan));

while ((rtindex = bms_next_member(relids, rtindex)) >= 0)
{
Assert(rtindex > 0);
estate->es_epqScanDone[rtindex - 1] = false;
}
}
}
}
32 changes: 32 additions & 0 deletions src/backend/executor/nodeForeignscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ ForeignNext(ForeignScanState *node)
static bool
ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
{
FdwRoutine *fdwroutine = node->fdwroutine;
ExprContext *econtext;

/*
Expand All @@ -85,6 +86,18 @@ ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)

ResetExprContext(econtext);

/*
* If an outer join is pushed down, RecheckForeignScan may need to store a
* different tuple in the slot, because a different set of columns may go
* to NULL upon recheck. Otherwise, it shouldn't need to change the slot
* contents, just return true or false to indicate whether the quals still
* pass. For simple cases, setting fdw_recheck_quals may be easier than
* providing this callback.
*/
if (fdwroutine->RecheckForeignScan &&
!fdwroutine->RecheckForeignScan(node, slot))
return false;

return ExecQual(node->fdw_recheck_quals, econtext, false);
}

Expand Down Expand Up @@ -205,6 +218,11 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
scanstate->fdwroutine = fdwroutine;
scanstate->fdw_state = NULL;

/* Initialize any outer plan. */
if (outerPlan(node))
outerPlanState(scanstate) =
ExecInitNode(outerPlan(node), estate, eflags);

/*
* Tell the FDW to initialize the scan.
*/
Expand All @@ -225,6 +243,10 @@ ExecEndForeignScan(ForeignScanState *node)
/* Let the FDW shut down */
node->fdwroutine->EndForeignScan(node);

/* Shut down any outer plan. */
if (outerPlanState(node))
ExecEndNode(outerPlanState(node));

/* Free the exprcontext */
ExecFreeExprContext(&node->ss.ps);

Expand All @@ -246,7 +268,17 @@ ExecEndForeignScan(ForeignScanState *node)
void
ExecReScanForeignScan(ForeignScanState *node)
{
PlanState *outerPlan = outerPlanState(node);

node->fdwroutine->ReScanForeignScan(node);

/*
* If chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode. outerPlan may also be NULL, in which case there
* is nothing to rescan at all.
*/
if (outerPlan != NULL && outerPlan->chgParam == NULL)
ExecReScan(outerPlan);

ExecScanReScan(&node->ss);
}
1 change: 1 addition & 0 deletions src/backend/nodes/outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,7 @@ _outForeignPath(StringInfo str, const ForeignPath *node)

_outPathInfo(str, (const Path *) node);

WRITE_NODE_FIELD(fdw_outerpath);
WRITE_NODE_FIELD(fdw_private);
}

Expand Down
13 changes: 10 additions & 3 deletions src/backend/optimizer/plan/createplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -2095,11 +2095,16 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path,
Index scan_relid = rel->relid;
Oid rel_oid = InvalidOid;
Bitmapset *attrs_used = NULL;
Plan *outer_plan = NULL;
ListCell *lc;
int i;

Assert(rel->fdwroutine != NULL);

/* transform the child path if any */
if (best_path->fdw_outerpath)
outer_plan = create_plan_recurse(root, best_path->fdw_outerpath);

/*
* If we're scanning a base relation, fetch its OID. (Irrelevant if
* scanning a join relation.)
Expand Down Expand Up @@ -2129,7 +2134,8 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path,
*/
scan_plan = rel->fdwroutine->GetForeignPlan(root, rel, rel_oid,
best_path,
tlist, scan_clauses);
tlist, scan_clauses,
outer_plan);

/* Copy cost data from Path to Plan; no need to make FDW do this */
copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
Expand Down Expand Up @@ -3747,15 +3753,16 @@ make_foreignscan(List *qptlist,
List *fdw_exprs,
List *fdw_private,
List *fdw_scan_tlist,
List *fdw_recheck_quals)
List *fdw_recheck_quals,
Plan *outer_plan)
{
ForeignScan *node = makeNode(ForeignScan);
Plan *plan = &node->scan.plan;

/* cost will be filled in by create_foreignscan_plan */
plan->targetlist = qptlist;
plan->qual = qpqual;
plan->lefttree = NULL;
plan->lefttree = outer_plan;
plan->righttree = NULL;
node->scan.scanrelid = scanrelid;
/* fs_server will be filled in by create_foreignscan_plan */
Expand Down
2 changes: 2 additions & 0 deletions src/backend/optimizer/util/pathnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
double rows, Cost startup_cost, Cost total_cost,
List *pathkeys,
Relids required_outer,
Path *fdw_outerpath,
List *fdw_private)
{
ForeignPath *pathnode = makeNode(ForeignPath);
Expand All @@ -1521,6 +1522,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
pathnode->path.total_cost = total_cost;
pathnode->path.pathkeys = pathkeys;

pathnode->fdw_outerpath = fdw_outerpath;
pathnode->fdw_private = fdw_private;

return pathnode;
Expand Down
7 changes: 6 additions & 1 deletion src/include/foreign/fdwapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@ typedef ForeignScan *(*GetForeignPlan_function) (PlannerInfo *root,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses);
List *scan_clauses,
Plan *outer_plan);

typedef void (*BeginForeignScan_function) (ForeignScanState *node,
int eflags);

typedef TupleTableSlot *(*IterateForeignScan_function) (ForeignScanState *node);

typedef bool (*RecheckForeignScan_function) (ForeignScanState *node,
TupleTableSlot *slot);

typedef void (*ReScanForeignScan_function) (ForeignScanState *node);

typedef void (*EndForeignScan_function) (ForeignScanState *node);
Expand Down Expand Up @@ -162,6 +166,7 @@ typedef struct FdwRoutine
/* Functions for SELECT FOR UPDATE/SHARE row locking */
GetForeignRowMarkType_function GetForeignRowMarkType;
RefetchForeignRow_function RefetchForeignRow;
RecheckForeignScan_function RecheckForeignScan;

/* Support functions for EXPLAIN */
ExplainForeignScan_function ExplainForeignScan;
Expand Down
1 change: 1 addition & 0 deletions src/include/nodes/relation.h
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ typedef struct TidPath
typedef struct ForeignPath
{
/* generic Path fields (costs, pathkeys, ...) filled in by create_foreignscan_path */
Path path;
/*
 * Optional child path; NULL when there is none.  When set,
 * create_foreignscan_plan converts it into a Plan and hands that to the
 * FDW's GetForeignPlan callback as outer_plan.
 */
Path *fdw_outerpath;
/* list supplied by the FDW to create_foreignscan_path; may be NIL */
List *fdw_private;
} ForeignPath;

Expand Down
1 change: 1 addition & 0 deletions src/include/optimizer/pathnode.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ extern ForeignPath *create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
double rows, Cost startup_cost, Cost total_cost,
List *pathkeys,
Relids required_outer,
Path *fdw_outerpath,
List *fdw_private);

extern Relids calc_nestloop_required_outer(Path *outer_path, Path *inner_path);
Expand Down
Loading

0 comments on commit 385f337

Please sign in to comment.