Permalink
Browse files

Support parallel aggregation.

Parallel workers can now partially aggregate the data and pass the
transition values back to the leader, which can combine the partial
results to produce the final answer.

David Rowley, based on earlier work by Haribabu Kommi.  Reviewed by
Álvaro Herrera, Tomas Vondra, Amit Kapila, James Sewell, and me.
  • Loading branch information...
Robert Haas
Robert Haas committed Mar 21, 2016
1 parent 7fa0064 commit e06a38965b3bcdaa881e7e06892d4d8ab6c2c980
@@ -4515,6 +4515,14 @@ ExecInitExpr(Expr *node, PlanState *parent)
if (parent && IsA(parent, AggState))
{
AggState *aggstate = (AggState *) parent;
+ Aggref *aggref = (Aggref *) node;
+
+ if (aggstate->finalizeAggs &&
+ aggref->aggoutputtype != aggref->aggtype)
+ {
+ /* planner messed up */
+ elog(ERROR, "Aggref aggoutputtype must match aggtype");
+ }
aggstate->aggs = lcons(astate, aggstate->aggs);
aggstate->numaggs++;
@@ -1233,6 +1233,7 @@ _copyAggref(const Aggref *from)
COPY_SCALAR_FIELD(aggfnoid);
COPY_SCALAR_FIELD(aggtype);
+ COPY_SCALAR_FIELD(aggoutputtype);
COPY_SCALAR_FIELD(aggcollid);
COPY_SCALAR_FIELD(inputcollid);
COPY_NODE_FIELD(aggdirectargs);
@@ -192,6 +192,7 @@ _equalAggref(const Aggref *a, const Aggref *b)
{
COMPARE_SCALAR_FIELD(aggfnoid);
COMPARE_SCALAR_FIELD(aggtype);
+ COMPARE_SCALAR_FIELD(aggoutputtype);
COMPARE_SCALAR_FIELD(aggcollid);
COMPARE_SCALAR_FIELD(inputcollid);
COMPARE_NODE_FIELD(aggdirectargs);
@@ -57,7 +57,7 @@ exprType(const Node *expr)
type = ((const Param *) expr)->paramtype;
break;
case T_Aggref:
- type = ((const Aggref *) expr)->aggtype;
+ type = ((const Aggref *) expr)->aggoutputtype;
break;
case T_GroupingFunc:
type = INT4OID;
@@ -1033,6 +1033,7 @@ _outAggref(StringInfo str, const Aggref *node)
WRITE_OID_FIELD(aggfnoid);
WRITE_OID_FIELD(aggtype);
+ WRITE_OID_FIELD(aggoutputtype);
WRITE_OID_FIELD(aggcollid);
WRITE_OID_FIELD(inputcollid);
WRITE_NODE_FIELD(aggdirectargs);
@@ -552,6 +552,7 @@ _readAggref(void)
READ_OID_FIELD(aggfnoid);
READ_OID_FIELD(aggtype);
+ READ_OID_FIELD(aggoutputtype);
READ_OID_FIELD(aggcollid);
READ_OID_FIELD(inputcollid);
READ_NODE_FIELD(aggdirectargs);
@@ -1968,7 +1968,8 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
*/
cheapest_partial_path = linitial(rel->partial_pathlist);
simple_gather_path = (Path *)
- create_gather_path(root, rel, cheapest_partial_path, NULL);
+ create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
+ NULL, NULL);
add_path(rel, simple_gather_path);
}
@@ -350,16 +350,22 @@ cost_samplescan(Path *path, PlannerInfo *root,
*
* 'rel' is the relation to be operated upon
* 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ * 'rows', if non-NULL, points to a row estimate that is used in place of the
+ * estimates from both 'rel' and 'param_info'. This is useful when the path
+ * doesn't exactly correspond to any particular RelOptInfo.
*/
void
cost_gather(GatherPath *path, PlannerInfo *root,
- RelOptInfo *rel, ParamPathInfo *param_info)
+ RelOptInfo *rel, ParamPathInfo *param_info,
+ double *rows)
{
Cost startup_cost = 0;
Cost run_cost = 0;
/* Mark the path with the correct row estimate */
- if (param_info)
+ if (rows)
+ path->path.rows = *rows;
+ else if (param_info)
path->path.rows = param_info->ppi_rows;
else
path->path.rows = rel->rows;
@@ -1751,6 +1757,8 @@ cost_agg(Path *path, PlannerInfo *root,
{
/* must be AGG_HASHED */
startup_cost = input_total_cost;
+ if (!enable_hashagg)
+ startup_cost += disable_cost;
startup_cost += aggcosts->transCost.startup;
startup_cost += aggcosts->transCost.per_tuple * input_tuples;
startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples;
@@ -1575,8 +1575,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path)
plan = make_agg(tlist, quals,
best_path->aggstrategy,
- false,
- true,
+ best_path->combineStates,
+ best_path->finalizeAggs,
list_length(best_path->groupClause),
extract_grouping_cols(best_path->groupClause,
subplan->targetlist),
Oops, something went wrong.

0 comments on commit e06a389

Please sign in to comment.