Skip to content

Commit

Permalink
Add scan plan logic for remote joins
Browse files Browse the repository at this point in the history
This patch adds the missing functionality to create scan plans for
remote joins. Most of the code is a backport from PG Upstream.
  • Loading branch information
jnidzwetzki committed Jan 23, 2023
1 parent f211294 commit 2a042c3
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 17 deletions.
2 changes: 1 addition & 1 deletion tsl/src/fdw/data_node_scan_plan.c
Expand Up @@ -836,7 +836,7 @@ data_node_scan_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *best_

memset(&scaninfo, 0, sizeof(ScanInfo));

fdw_scan_info_init(&scaninfo, root, rel, &best_path->path, clauses);
fdw_scan_info_init(&scaninfo, root, rel, &best_path->path, clauses, NULL);

cscan->methods = &data_node_scan_plan_methods;
cscan->custom_plans = custom_plans;
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/fdw/fdw.c
Expand Up @@ -139,7 +139,7 @@ get_foreign_plan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid,

memset(&info, 0, sizeof(ScanInfo));

fdw_scan_info_init(&info, root, foreignrel, &best_path->path, scan_clauses);
fdw_scan_info_init(&info, root, foreignrel, &best_path->path, scan_clauses, outer_plan);

/*
* Create the ForeignScan node for the given relation.
Expand Down
109 changes: 95 additions & 14 deletions tsl/src/fdw/scan_plan.c
Expand Up @@ -132,7 +132,7 @@ add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_pa

if (create_scan_path)
{
Assert(IS_SIMPLE_REL(rel));
Assert(IS_SIMPLE_REL(rel) || IS_JOIN_REL(rel));
scan_path = create_scan_path(root,
rel,
NULL,
Expand Down Expand Up @@ -437,11 +437,11 @@ eval_stable_functions(PlannerInfo *root, Node *node)

void
fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path *best_path,
List *scan_clauses)
List *scan_clauses, Plan *outer_plan)
{
TsFdwRelInfo *fpinfo = fdw_relinfo_get(rel);
List *remote_where = NIL;
List *remote_having = NIL;
List *remote_exprs = NIL;
List *local_exprs = NIL;
List *params_list = NIL;
List *fdw_scan_tlist = NIL;
Expand Down Expand Up @@ -487,11 +487,11 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
continue;

if (list_member_ptr(fpinfo->remote_conds, rinfo))
remote_where = lappend(remote_where, rinfo->clause);
remote_exprs = lappend(remote_exprs, rinfo->clause);
else if (list_member_ptr(fpinfo->local_conds, rinfo))
local_exprs = lappend(local_exprs, rinfo->clause);
else if (ts_is_foreign_expr(root, rel, rinfo->clause))
remote_where = lappend(remote_where, rinfo->clause);
remote_exprs = lappend(remote_exprs, rinfo->clause);
else
local_exprs = lappend(local_exprs, rinfo->clause);
}
Expand All @@ -500,13 +500,95 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
* For a base-relation scan, we have to support EPQ recheck, which
* should recheck all the remote quals.
*/
fdw_recheck_quals = remote_where;
fdw_recheck_quals = remote_exprs;
}
else if (IS_JOIN_REL(rel))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("foreign joins are not supported")));
/*
* Join relation or upper relation - set scan_relid to 0.
*/
scan_relid = 0;

/*
* For a join rel, baserestrictinfo is NIL and we are not considering
* parameterization right now, so there should be no scan_clauses for
* a joinrel or an upper rel either.
*/
Assert(!scan_clauses);

/*
* Instead we get the conditions to apply from the fdw_private
* structure.
*/
remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
local_exprs = extract_actual_clauses(fpinfo->local_conds, false);

/*
* We leave fdw_recheck_quals empty in this case, since we never need
* to apply EPQ recheck clauses. In the case of a joinrel, EPQ
* recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
* If we're planning an upperrel (ie, remote grouping or aggregation)
* then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
* allowed, and indeed we *can't* put the remote clauses into
* fdw_recheck_quals because the unaggregated Vars won't be available
* locally.
*/

/* Build the list of columns to be fetched from the foreign server. */
fdw_scan_tlist = build_tlist_to_deparse(rel);

/*
* Ensure that the outer plan produces a tuple whose descriptor
* matches our scan tuple slot. Also, remove the local conditions
* from outer plan's quals, lest they be evaluated twice, once by the
* local plan and once by the scan.
*/
if (outer_plan)
{
ListCell *lc;

/*
* Right now, we only consider grouping and aggregation beyond
* joins. Queries involving aggregates or grouping do not require
* EPQ mechanism, hence should not have an outer plan here.
*/
Assert(!IS_UPPER_REL(rel));

/*
* First, update the plan's qual list if possible. In some cases
* the quals might be enforced below the topmost plan level, in
* which case we'll fail to remove them; it's not worth working
* harder than this.
*/
foreach (lc, local_exprs)
{
Node *qual = lfirst(lc);

outer_plan->qual = list_delete(outer_plan->qual, qual);

/*
* For an inner join the local conditions of foreign scan plan
* can be part of the joinquals as well. (They might also be
* in the mergequals or hashquals, but we can't touch those
* without breaking the plan.)
*/
if (IsA(outer_plan, NestLoop) || IsA(outer_plan, MergeJoin) ||
IsA(outer_plan, HashJoin))
{
Join *join_plan = (Join *) outer_plan;

if (join_plan->jointype == JOIN_INNER)
join_plan->joinqual = list_delete(join_plan->joinqual, qual);
}
}

/*
* Now fix the subplan's tlist --- this might result in inserting
* a Result node atop the plan tree.
*/
outer_plan =
change_plan_targetlist(outer_plan, fdw_scan_tlist, best_path->parallel_safe);
}
}
else
{
Expand All @@ -530,7 +612,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
*/
TsFdwRelInfo *ofpinfo;
ofpinfo = fdw_relinfo_get(fpinfo->outerrel);
remote_where = extract_actual_clauses(ofpinfo->remote_conds, false);
remote_exprs = extract_actual_clauses(ofpinfo->remote_conds, false);
remote_having = extract_actual_clauses(fpinfo->remote_conds, false);
local_exprs = extract_actual_clauses(fpinfo->local_conds, false);

Expand Down Expand Up @@ -559,7 +641,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
* the whitelisted stable functions, see `function_is_whitelisted()`. So
* this code only has to deal with such functions.
*/
remote_where = (List *) eval_stable_functions(root, (Node *) remote_where);
remote_exprs = (List *) eval_stable_functions(root, (Node *) remote_exprs);
remote_having = (List *) eval_stable_functions(root, (Node *) remote_having);

/*
Expand All @@ -571,7 +653,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
root,
rel,
fdw_scan_tlist,
remote_where,
remote_exprs,
remote_having,
best_path->pathkeys,
false,
Expand All @@ -580,7 +662,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
fpinfo->sca);

/* Remember remote_exprs for possible use by PlanDirectModify */
fpinfo->final_remote_exprs = remote_where;
fpinfo->final_remote_exprs = remote_exprs;

/* Build the chunk oid list for use by EXPLAIN. */
List *chunk_oids = NIL;
Expand All @@ -602,7 +684,6 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
makeInteger(fpinfo->fetch_size),
makeInteger(fpinfo->server->serverid),
chunk_oids);
Assert(!IS_JOIN_REL(rel));

if (IS_UPPER_REL(rel))
fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name->data));
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/fdw/scan_plan.h
Expand Up @@ -40,7 +40,7 @@ typedef Path *(*CreateUpperPathFunc)(PlannerInfo *root, RelOptInfo *rel, PathTar
List *pathkeys, Path *fdw_outerpath, List *fdw_private);

extern void fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel,
Path *best_path, List *scan_clauses);
Path *best_path, List *scan_clauses, Plan *outer_plan);

extern void fdw_add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path,
CreatePathFunc create_scan_path);
Expand Down

0 comments on commit 2a042c3

Please sign in to comment.