Permalink
Browse files

Generate parallel sequential scan plans in simple cases.

Add a new flag, consider_parallel, to each RelOptInfo, indicating
whether a plan for that relation could conceivably be run inside of
a parallel worker.  Right now, we're pretty conservative: for example,
it might be possible to defer applying a parallel-restricted qual
in a worker, and later do it in the leader, but right now we just
don't try to parallelize access to that relation.  That's probably
the right decision in most cases, anyway.

Using the new flag, generate parallel sequential scan plans for plain
baserels, meaning that we now have parallel sequential scan in
PostgreSQL.  The logic here is pretty unsophisticated right now: the
costing model probably isn't right in detail, and we can't push joins
beneath Gather nodes, so the number of plans that can actually benefit
from this is pretty limited right now.  Lots more work is needed.
Nevertheless, it seems time to enable this functionality so that all
this code can actually be tested easily by users and developers.

Note that, if you wish to test this functionality, it will be
necessary to set max_parallel_degree to a value greater than the
default of 0.  Once a few more loose ends have been tidied up here, we
might want to consider changing the default value of this GUC, but
I'm leaving it alone for now.

Along the way, fix a bug in cost_gather: the previous coding thought
that a Gather node's transfer overhead should be costed on the basis of
the relation size rather than the number of tuples that actually need
to be passed off to the leader.

Patch by me, reviewed in earlier versions by Amit Kapila.
  • Loading branch information...
Robert Haas
Robert Haas committed Nov 11, 2015
1 parent f0661c4 commit 80558c1f5aa109d08db0fbd76a6d370f900628a8
@@ -1882,6 +1882,7 @@ _outRelOptInfo(StringInfo str, const RelOptInfo *node)
WRITE_INT_FIELD(width);
WRITE_BOOL_FIELD(consider_startup);
WRITE_BOOL_FIELD(consider_param_startup);
WRITE_BOOL_FIELD(consider_parallel);
WRITE_NODE_FIELD(reltargetlist);
WRITE_NODE_FIELD(pathlist);
WRITE_NODE_FIELD(ppilist);
@@ -21,6 +21,7 @@
#include "access/tsmapi.h"
#include "catalog/pg_class.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "foreign/fdwapi.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@@ -71,6 +72,9 @@ static void set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
Index rti, RangeTblEntry *rte);
static void set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
static bool function_rte_parallel_ok(RangeTblEntry *rte);
static void set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
static void set_tablesample_rel_size(PlannerInfo *root, RelOptInfo *rel,
@@ -158,7 +162,8 @@ make_one_rel(PlannerInfo *root, List *joinlist)
set_base_rel_consider_startup(root);

/*
* Generate access paths for the base rels.
* Generate access paths for the base rels. set_base_rel_sizes also
* sets the consider_parallel flag for each baserel, if appropriate.
*/
set_base_rel_sizes(root);
set_base_rel_pathlists(root);
@@ -222,9 +227,12 @@ set_base_rel_consider_startup(PlannerInfo *root)
/*
* set_base_rel_sizes
* Set the size estimates (rows and widths) for each base-relation entry.
* Also determine whether to consider parallel paths for base relations.
*
* We do this in a separate pass over the base rels so that rowcount
* estimates are available for parameterized path generation.
* estimates are available for parameterized path generation, and also so
* that the consider_parallel flag is set correctly before we begin to
* generate paths.
*/
static void
set_base_rel_sizes(PlannerInfo *root)
@@ -234,6 +242,7 @@ set_base_rel_sizes(PlannerInfo *root)
for (rti = 1; rti < root->simple_rel_array_size; rti++)
{
RelOptInfo *rel = root->simple_rel_array[rti];
RangeTblEntry *rte;

/* there may be empty slots corresponding to non-baserel RTEs */
if (rel == NULL)
@@ -245,7 +254,19 @@ set_base_rel_sizes(PlannerInfo *root)
if (rel->reloptkind != RELOPT_BASEREL)
continue;

set_rel_size(root, rel, rti, root->simple_rte_array[rti]);
rte = root->simple_rte_array[rti];

/*
* If parallelism is allowable for this query in general, see whether
* it's allowable for this rel in particular. We have to do this
* before set_rel_size, because if this is an inheritance parent,
* set_append_rel_size will pass the consider_parallel flag down to
* inheritance children.
*/
if (root->glob->parallelModeOK)
set_rel_consider_parallel(root, rel, rte);

set_rel_size(root, rel, rti, rte);
}
}

@@ -458,6 +479,131 @@ set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
set_baserel_size_estimates(root, rel);
}

/*
 * set_rel_consider_parallel
 *	  Decide whether a base relation could be scanned from within a
 *	  parallel worker, and if so set rel->consider_parallel.
 *
 * The flag is only ever set to true here, and only after every check below
 * has passed; any failing check returns without touching it.  The flag has
 * previously been initialized to false, so an early return leaves the
 * relation marked as not usable in a worker.
 *
 * root: planner state; root->glob->parallelModeOK must already be true.
 * rel:  the relation to examine; must have reloptkind RELOPT_BASEREL.
 * rte:  the range table entry that corresponds to rel.
 */
static void
set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
						  RangeTblEntry *rte)
{
	/* Don't call this if parallelism is disallowed for the entire query. */
	Assert(root->glob->parallelModeOK);

	/* Don't call this for non-baserels. */
	Assert(rel->reloptkind == RELOPT_BASEREL);

	/* Assorted checks based on rtekind. */
	switch (rte->rtekind)
	{
		case RTE_RELATION:

			/*
			 * Currently, parallel workers can't access the leader's temporary
			 * tables.  We could possibly relax this if the leader wrote all
			 * of its local buffers at the start of the query and made no
			 * changes thereafter (maybe we could allow hint bit changes), and
			 * if we taught the workers to read them.  Writing a large number
			 * of temporary buffers could be expensive, though, and we don't
			 * have the rest of the necessary infrastructure right now anyway.
			 * So for now, bail out if we see a temporary table.
			 */
			if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP)
				return;

			/*
			 * Table sampling can be pushed down to workers if the sample
			 * function and its arguments are safe.
			 *
			 * When both checks pass we must fall out of this block rather
			 * than return: an unconditional return here would make the two
			 * checks pointless and would prevent any sampled table from ever
			 * being marked consider_parallel.
			 */
			if (rte->tablesample != NULL)
			{
				if (func_parallel(rte->tablesample->tsmhandler) !=
					PROPARALLEL_SAFE)
					return;
				if (has_parallel_hazard((Node *) rte->tablesample->args,
										false))
					return;
			}
			break;

		case RTE_SUBQUERY:

			/*
			 * Subplans currently aren't passed to workers.  Even if they
			 * were, the subplan might be using parallelism internally, and
			 * we can't support nested Gather nodes at present.  Finally, we
			 * don't have a good way of knowing whether the subplan involves
			 * any parallel-restricted operations.  It would be nice to relax
			 * this restriction some day, but it's going to take a fair amount
			 * of work.
			 */
			return;

		case RTE_JOIN:
			/* Shouldn't happen; we're only considering baserels here. */
			Assert(false);
			return;

		case RTE_FUNCTION:
			/* Check for parallel-restricted functions. */
			if (!function_rte_parallel_ok(rte))
				return;
			break;

		case RTE_VALUES:

			/*
			 * The data for a VALUES clause is stored in the plan tree itself,
			 * so scanning it in a worker is fine.
			 */
			break;

		case RTE_CTE:

			/*
			 * CTE tuplestores aren't shared among parallel workers, so we
			 * force all CTE scans to happen in the leader.  Also, populating
			 * the CTE would require executing a subplan that's not available
			 * in the worker, might be parallel-restricted, and must get
			 * executed only once.
			 */
			return;
	}

	/*
	 * If there's anything in baserestrictinfo that's parallel-restricted, we
	 * give up on parallelizing access to this relation.  We could consider
	 * instead postponing application of the restricted quals until we're
	 * above all the parallelism in the plan tree, but it's not clear that
	 * this would be a win in very many cases, and it might be tricky to make
	 * outer join clauses work correctly.
	 */
	if (has_parallel_hazard((Node *) rel->baserestrictinfo, false))
		return;

	/* We have a winner. */
	rel->consider_parallel = true;
}

/*
 * function_rte_parallel_ok
 *	  Check whether a function RTE is scanning something parallel-restricted.
 *
 * Walks rte->functions and stops at the first entry whose funcexpr is
 * reported by has_parallel_hazard().  Returns false in that case, and true
 * when no entry is reported (which includes an empty function list).
 */
static bool
function_rte_parallel_ok(RangeTblEntry *rte)
{
	bool		parallel_ok = true;
	ListCell   *cell;

	foreach(cell, rte->functions)
	{
		RangeTblFunction *func = (RangeTblFunction *) lfirst(cell);

		Assert(IsA(func, RangeTblFunction));

		/* One hazardous function expression is enough to rule out the RTE. */
		if (has_parallel_hazard(func->funcexpr, false))
		{
			parallel_ok = false;
			break;
		}
	}

	return parallel_ok;
}

/*
* set_plain_rel_pathlist
* Build access paths for a plain relation (no subquery, no inheritance)
@@ -466,6 +612,7 @@ static void
set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
{
Relids required_outer;
int parallel_threshold = 1000;

/*
* We don't support pushing join clauses into the quals of a seqscan, but
@@ -477,6 +624,40 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
/* Consider sequential scan */
add_path(rel, create_seqscan_path(root, rel, required_outer, 0));

/* Consider parallel sequential scan */
if (rel->consider_parallel && rel->pages > parallel_threshold &&
required_outer == NULL)
{
Path *path;
int parallel_degree = 1;

/*
* Limit the degree of parallelism logarithmically based on the size
* of the relation. This probably needs to be a good deal more
* sophisticated, but we need something here for now.
*/
while (rel->pages > parallel_threshold * 3 &&
parallel_degree < max_parallel_degree)
{
parallel_degree++;
parallel_threshold *= 3;
if (parallel_threshold >= PG_INT32_MAX / 3)
break;
}

/*
* Ideally we should consider postponing the gather operation until
* much later, after we've pushed joins and so on atop the parallel
* sequential scan path. But we don't have the infrastructure for
* that yet, so just do this for now.
*/
path = create_seqscan_path(root, rel, required_outer, parallel_degree);
path = (Path *)
create_gather_path(root, rel, path, required_outer,
parallel_degree);
add_path(rel, path);
}

/* Consider index scans */
create_index_paths(root, rel);

@@ -714,6 +895,9 @@ set_append_rel_size(PlannerInfo *root, RelOptInfo *rel,
continue;
}

/* Copy consider_parallel flag from parent. */
childrel->consider_parallel = rel->consider_parallel;

/*
* CE failed, so finish copying/modifying targetlist and join quals.
*
@@ -334,7 +334,7 @@ cost_gather(GatherPath *path, PlannerInfo *root,

/* Parallel setup and communication cost. */
startup_cost += parallel_setup_cost;
run_cost += parallel_tuple_cost * rel->tuples;
run_cost += parallel_tuple_cost * path->path.rows;

path->path.startup_cost = startup_cost;
path->path.total_cost = (startup_cost + run_cost);
@@ -20,6 +20,7 @@
*/
#include "postgres.h"

#include "optimizer/clauses.h"
#include "optimizer/orclauses.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
@@ -70,6 +71,17 @@ query_planner(PlannerInfo *root, List *tlist,
/* We need a dummy joinrel to describe the empty set of baserels */
final_rel = build_empty_join_rel(root);

/*
* If query allows parallelism in general, check whether the quals
* are parallel-restricted. There's currently no real benefit to
* setting this flag correctly because we can't yet reference subplans
* from parallel workers. But that might change someday, so set this
* correctly anyway.
*/
if (root->glob->parallelModeOK)
final_rel->consider_parallel =
!has_parallel_hazard(parse->jointree->quals, false);

/* The only path for it is a trivial Result path */
add_path(final_rel, (Path *)
create_result_path((List *) parse->jointree->quals));
@@ -204,7 +204,8 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/*
* Assess whether it's feasible to use parallel mode for this query.
* We can't do this in a standalone backend, or if the command will
* try to modify any data, or if this is a cursor operation, or if any
* try to modify any data, or if this is a cursor operation, or if
* GUCs are set to values that don't permit parallelism, or if
* parallel-unsafe functions are present in the query tree.
*
* For now, we don't try to use parallel mode if we're running inside
@@ -223,9 +224,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
glob->parallelModeOK = (cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE &&
parse->commandType == CMD_SELECT && !parse->hasModifyingCTE &&
parse->utilityStmt == NULL && !IsParallelWorker() &&
!IsolationIsSerializable() &&
!contain_parallel_unsafe((Node *) parse);
parse->utilityStmt == NULL && max_parallel_degree > 0 &&
!IsParallelWorker() && !IsolationIsSerializable() &&
!has_parallel_hazard((Node *) parse, true);

/*
* glob->parallelModeOK should tell us whether it's necessary to impose
Oops, something went wrong.

0 comments on commit 80558c1

Please sign in to comment.