Skip to content

Commit

Permalink
Support parallel aggregation.
Browse files Browse the repository at this point in the history
Parallel workers can now partially aggregate the data and pass the
transition values back to the leader, which can combine the partial
results to produce the final answer.

David Rowley, based on earlier work by Haribabu Kommi.  Reviewed by
Álvaro Herrera, Tomas Vondra, Amit Kapila, James Sewell, and me.
  • Loading branch information
robertmhaas committed Mar 21, 2016
1 parent 7fa0064 commit e06a389
Show file tree
Hide file tree
Showing 23 changed files with 911 additions and 83 deletions.
8 changes: 8 additions & 0 deletions src/backend/executor/execQual.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4515,6 +4515,14 @@ ExecInitExpr(Expr *node, PlanState *parent)
if (parent && IsA(parent, AggState)) if (parent && IsA(parent, AggState))
{ {
AggState *aggstate = (AggState *) parent; AggState *aggstate = (AggState *) parent;
Aggref *aggref = (Aggref *) node;

if (aggstate->finalizeAggs &&
aggref->aggoutputtype != aggref->aggtype)
{
/* planner messed up */
elog(ERROR, "Aggref aggoutputtype must match aggtype");
}


aggstate->aggs = lcons(astate, aggstate->aggs); aggstate->aggs = lcons(astate, aggstate->aggs);
aggstate->numaggs++; aggstate->numaggs++;
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/copyfuncs.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1233,6 +1233,7 @@ _copyAggref(const Aggref *from)


COPY_SCALAR_FIELD(aggfnoid); COPY_SCALAR_FIELD(aggfnoid);
COPY_SCALAR_FIELD(aggtype); COPY_SCALAR_FIELD(aggtype);
COPY_SCALAR_FIELD(aggoutputtype);
COPY_SCALAR_FIELD(aggcollid); COPY_SCALAR_FIELD(aggcollid);
COPY_SCALAR_FIELD(inputcollid); COPY_SCALAR_FIELD(inputcollid);
COPY_NODE_FIELD(aggdirectargs); COPY_NODE_FIELD(aggdirectargs);
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/equalfuncs.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ _equalAggref(const Aggref *a, const Aggref *b)
{ {
COMPARE_SCALAR_FIELD(aggfnoid); COMPARE_SCALAR_FIELD(aggfnoid);
COMPARE_SCALAR_FIELD(aggtype); COMPARE_SCALAR_FIELD(aggtype);
COMPARE_SCALAR_FIELD(aggoutputtype);
COMPARE_SCALAR_FIELD(aggcollid); COMPARE_SCALAR_FIELD(aggcollid);
COMPARE_SCALAR_FIELD(inputcollid); COMPARE_SCALAR_FIELD(inputcollid);
COMPARE_NODE_FIELD(aggdirectargs); COMPARE_NODE_FIELD(aggdirectargs);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/nodes/nodeFuncs.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ exprType(const Node *expr)
type = ((const Param *) expr)->paramtype; type = ((const Param *) expr)->paramtype;
break; break;
case T_Aggref: case T_Aggref:
type = ((const Aggref *) expr)->aggtype; type = ((const Aggref *) expr)->aggoutputtype;
break; break;
case T_GroupingFunc: case T_GroupingFunc:
type = INT4OID; type = INT4OID;
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/outfuncs.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ _outAggref(StringInfo str, const Aggref *node)


WRITE_OID_FIELD(aggfnoid); WRITE_OID_FIELD(aggfnoid);
WRITE_OID_FIELD(aggtype); WRITE_OID_FIELD(aggtype);
WRITE_OID_FIELD(aggoutputtype);
WRITE_OID_FIELD(aggcollid); WRITE_OID_FIELD(aggcollid);
WRITE_OID_FIELD(inputcollid); WRITE_OID_FIELD(inputcollid);
WRITE_NODE_FIELD(aggdirectargs); WRITE_NODE_FIELD(aggdirectargs);
Expand Down
1 change: 1 addition & 0 deletions src/backend/nodes/readfuncs.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ _readAggref(void)


READ_OID_FIELD(aggfnoid); READ_OID_FIELD(aggfnoid);
READ_OID_FIELD(aggtype); READ_OID_FIELD(aggtype);
READ_OID_FIELD(aggoutputtype);
READ_OID_FIELD(aggcollid); READ_OID_FIELD(aggcollid);
READ_OID_FIELD(inputcollid); READ_OID_FIELD(inputcollid);
READ_NODE_FIELD(aggdirectargs); READ_NODE_FIELD(aggdirectargs);
Expand Down
3 changes: 2 additions & 1 deletion src/backend/optimizer/path/allpaths.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1968,7 +1968,8 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
*/ */
cheapest_partial_path = linitial(rel->partial_pathlist); cheapest_partial_path = linitial(rel->partial_pathlist);
simple_gather_path = (Path *) simple_gather_path = (Path *)
create_gather_path(root, rel, cheapest_partial_path, NULL); create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
NULL, NULL);
add_path(rel, simple_gather_path); add_path(rel, simple_gather_path);
} }


Expand Down
12 changes: 10 additions & 2 deletions src/backend/optimizer/path/costsize.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -350,16 +350,22 @@ cost_samplescan(Path *path, PlannerInfo *root,
* *
* 'rel' is the relation to be operated upon * 'rel' is the relation to be operated upon
* 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
* 'rows' may be used to point to a row estimate; if non-NULL, it overrides
* both 'rel' and 'param_info'. This is useful when the path doesn't exactly
* correspond to any particular RelOptInfo.
*/ */
void void
cost_gather(GatherPath *path, PlannerInfo *root, cost_gather(GatherPath *path, PlannerInfo *root,
RelOptInfo *rel, ParamPathInfo *param_info) RelOptInfo *rel, ParamPathInfo *param_info,
double *rows)
{ {
Cost startup_cost = 0; Cost startup_cost = 0;
Cost run_cost = 0; Cost run_cost = 0;


/* Mark the path with the correct row estimate */ /* Mark the path with the correct row estimate */
if (param_info) if (rows)
path->path.rows = *rows;
else if (param_info)
path->path.rows = param_info->ppi_rows; path->path.rows = param_info->ppi_rows;
else else
path->path.rows = rel->rows; path->path.rows = rel->rows;
Expand Down Expand Up @@ -1751,6 +1757,8 @@ cost_agg(Path *path, PlannerInfo *root,
{ {
/* must be AGG_HASHED */ /* must be AGG_HASHED */
startup_cost = input_total_cost; startup_cost = input_total_cost;
if (!enable_hashagg)
startup_cost += disable_cost;
startup_cost += aggcosts->transCost.startup; startup_cost += aggcosts->transCost.startup;
startup_cost += aggcosts->transCost.per_tuple * input_tuples; startup_cost += aggcosts->transCost.per_tuple * input_tuples;
startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples; startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples;
Expand Down
4 changes: 2 additions & 2 deletions src/backend/optimizer/plan/createplan.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1575,8 +1575,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path)


plan = make_agg(tlist, quals, plan = make_agg(tlist, quals,
best_path->aggstrategy, best_path->aggstrategy,
false, best_path->combineStates,
true, best_path->finalizeAggs,
list_length(best_path->groupClause), list_length(best_path->groupClause),
extract_grouping_cols(best_path->groupClause, extract_grouping_cols(best_path->groupClause,
subplan->targetlist), subplan->targetlist),
Expand Down
Loading

0 comments on commit e06a389

Please sign in to comment.