Improve combine invocation analysis for user-defined aggregates #1990

Open
jchbh-duplicate opened this issue Nov 28, 2018 · 2 comments

jchbh-duplicate commented Nov 28, 2018

I tried to use the following user-defined aggregate on PipelineDB 1.0 with PostgreSQL 11:

/* define aggregate for top5 value */
CREATE TYPE TYPE_TOP_ITEM AS (
  score NUMERIC,
  uid   TEXT
);

CREATE TYPE TOP_N_RESULT AS (
  data  TYPE_TOP_ITEM [],
  max_n INT
);

CREATE OR REPLACE FUNCTION calc_top_n(state TOP_N_RESULT, income TOP_N_RESULT)
  RETURNS TOP_N_RESULT AS $$
DECLARE
  oResult top_n_result%ROWTYPE;
BEGIN
  oResult.max_n = greatest(COALESCE(state.max_n, 0), COALESCE(income.max_n, 0));
  WITH xrow AS (SELECT * FROM unnest(state.data || income.data)),
       allrow AS (SELECT max(xrow.score) as score, uid FROM xrow GROUP BY xrow.uid),
       order_rows as (select * from allrow ORDER BY score desc limit oResult.max_n)
  SELECT array_agg(order_rows ORDER BY score) INTO oResult.data
  FROM order_rows;
  RETURN oResult;
END;
$$
LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION s_calc_top_n(s TOP_N_RESULT, income TYPE_TOP_ITEM, max_n INT)
  RETURNS TOP_N_RESULT AS $$
DECLARE
  oResult top_n_result%ROWTYPE;
BEGIN
  oResult.max_n = greatest(COALESCE(s.max_n, 0), max_n);
  WITH xrow AS (SELECT * FROM unnest(s.data || income)),
       allrow AS (SELECT max(xrow.score) as score, uid FROM xrow GROUP BY xrow.uid),
       order_rows as (select * from allrow ORDER BY score desc limit max_n)
  SELECT array_agg(order_rows ORDER BY score) INTO oResult.data
  FROM order_rows;
  RETURN oResult;
END;
$$
LANGUAGE plpgsql;

CREATE AGGREGATE top_n_by_score(TYPE_TOP_ITEM, INT) (
  COMBINEFUNC = calc_top_n,
  SFUNC = s_calc_top_n,
  STYPE = TOP_N_RESULT
);

with a very simple stream:

create FOREIGN TABLE test1 (
  id TEXT,
  score numeric,
  at timestamptz
) SERVER pipelinedb;

create view hourly as
  select date(at) as date, hour(at) as hour,
         top_n_by_score((ROW("score", "id"))::TYPE_TOP_ITEM, 5) as top5
  from test1
  group by date, hour;

create view daily as
  select (new).date as date, combine((delta).top5) as top5
  from output_of('hourly')
  group by date;

insert into test1 VALUES ('a', 10, now()), ('b', 4, now()), ('a', 5, now()), ('c', 4, now());

The daily view stays empty, and the log shows this error:

2018-11-28 07:10:56.159 UTC [13604] ERROR:  invalid input syntax for type numeric: "{"(4,c)","(4,b)","(10,a)"}"
2018-11-28 07:10:56.159 UTC [13604] CONTEXT:  PL/pgSQL function s_calc_top_n(top_n_result,type_top_item,integer) while storing call arguments into local variables

The hourly view works as expected.

Based on the log, it seems that combine((delta).top5) does not call COMBINEFUNC but calls SFUNC instead. The TOP_N_RESULT state object is then parsed as a TYPE_TOP_ITEM, so the numeric "score" field receives a string that is the text form of the array.
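
The type mismatch described above can be sketched outside PipelineDB. This is only an illustration, assuming the TYPE_TOP_ITEM and TOP_N_RESULT types defined earlier already exist; the detour through text is an assumption made to mimic the coercion and is not necessarily the path PipelineDB takes internally. The first field of a TOP_N_RESULT is an array, so reading it as a TYPE_TOP_ITEM tries to parse that array's text form as the numeric score, which should fail with a similar "invalid input syntax for type numeric" error.

```sql
-- Sketch only: build a transition state (TOP_N_RESULT), render it as text,
-- then read that text back as an input row (TYPE_TOP_ITEM).
-- The array in the first field cannot be parsed as the numeric "score".
SELECT (ROW(ARRAY[ROW(4, 'c')::TYPE_TOP_ITEM], 5)::TOP_N_RESULT)::text::TYPE_TOP_ITEM;
```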

The issue is very simple to reproduce. For now I have fallen back to computing the result directly from the original stream.


Added on Nov. 28

I also found that another custom aggregate fails with a worker crash. It works fine in 0.9.9.

2018-11-28 14:00:10.694 UTC [624] LOG: pipelinedb process "worker0 [prism-omni]" running with pid 624
Segmentation fault (PID 624)
PostgreSQL version: 11.0 (Debian 11.0-1.pgdg90+2)
PipelineDB version: 1.0.0 at revision b4bde99
query: daily_report_by_tenant
backtrace:
/usr/lib/postgresql/11/lib/pipelinedb.so(debug_segfault+0x33)[0x7feabf03f393]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x110c0)[0x7feac726e0c0]
postgres: worker0 [prism-omni] (MakeExpandedObjectReadOnlyInternal+0x0)[0x5647c0575280]
/usr/lib/postgresql/11/lib/plpgsql.so(+0x12a52)[0x7feab4852a52]
postgres: worker0 [prism-omni] (+0x25583d)[0x5647c03df83d]
postgres: worker0 [prism-omni] (+0x27f791)[0x5647c0409791]
postgres: worker0 [prism-omni] (standard_ExecutorRun+0x163)[0x5647c03e2823]
postgres: worker0 [prism-omni] (+0x28d2eb)[0x5647c04172eb]
postgres: worker0 [prism-omni] (SPI_execute_plan_with_paramlist+0x83)[0x5647c04176f3]
/usr/lib/postgresql/11/lib/plpgsql.so(+0x1733e)[0x7feab485733e]
/usr/lib/postgresql/11/lib/plpgsql.so(+0x1956b)[0x7feab485956b]
/usr/lib/postgresql/11/lib/plpgsql.so(+0x1b92f)[0x7feab485b92f]
/usr/lib/postgresql/11/lib/plpgsql.so(plpgsql_exec_function+0x1ad)[0x7feab485baed]
/usr/lib/postgresql/11/lib/plpgsql.so(plpgsql_call_handler+0x146)[0x7feab484eca6]
postgres: worker0 [prism-omni] (+0x25536d)[0x5647c03df36d]
postgres: worker0 [prism-omni] (+0x269ba0)[0x5647c03f3ba0]
/usr/lib/postgresql/11/lib/pipelinedb.so(ExecuteContPlan+0x9b)[0x7feabf04568b]
/usr/lib/postgresql/11/lib/pipelinedb.so(ContinuousQueryWorkerMain+0x275)[0x7feabf008d95]
/usr/lib/postgresql/11/lib/pipelinedb.so(cont_bgworker_main+0x220)[0x7feabf040090]
postgres: worker0 [prism-omni] (StartBackgroundWorker+0x2ad)[0x5647c04b323d]
postgres: worker0 [prism-omni] (+0x335f55)[0x5647c04bff55]
postgres: worker0 [prism-omni] (+0x336b15)[0x5647c04c0b15]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x110c0)[0x7feac726e0c0]
/lib/x86_64-linux-gnu/libc.so.6(__select+0x13)[0x7feac4f093a3]
postgres: worker0 [prism-omni] (+0xb93b3)[0x5647c02433b3]
postgres: worker0 [prism-omni] (PostmasterMain+0xe33)[0x5647c04c1ca3]
postgres: worker0 [prism-omni] (main+0x854)[0x5647c02452f4]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf1)[0x7feac4e482e1]
postgres: worker0 [prism-omni] (_start+0x2a)[0x5647c02453aa]

Is this likely the same issue?
I found that both problems happen when I "reduce" away one dimension (the example above reduces the hour dimension).


Fixed:
Thanks to @derekjn: defining an additional aggregate with the prefix combine_ alongside the custom aggregate fixes this issue.

CREATE AGGREGATE combine_top_n_by_score(TOP_N_RESULT) (
  COMBINEFUNC = calc_top_n,
  SFUNC = calc_top_n,
  STYPE = TOP_N_RESULT
);
@jchbh-duplicate jchbh-duplicate changed the title Combine seems not work on user-define aggregate complex type. combine seems not work on user-define aggregate complex type. Nov 28, 2018

derekjn commented Dec 18, 2018

@jchbh-duplicate thanks for your writeup! So internally there are actually a few things that go on implicitly in order for combine to work on an aggregate. You basically need another signature for your aggregate that tells the analyzer how to properly combine it.

In addition to your main aggregate, you need another one that has the combine function as both the combinefunc and the sfunc (transition function). As an example, take a look at the numeric_avg aggregate that PipelineDB allows combine on...

The main avg(numeric) aggregate (defined by PostgreSQL) signature looks something like:

CREATE AGGREGATE numeric_avg(numeric) (
  sfunc = numeric_avg_accum,
  stype = internal,
  finalfunc = numeric_avg,
  combinefunc = numeric_avg_combine,
  serialfunc = numeric_avg_serialize,
  deserialfunc = numeric_avg_deserialize,
  parallel = safe
);

And you'll see that we define an additional aggregate, combine_numeric_avg that tells PipelineDB how to perform combine on numeric_avg:

CREATE AGGREGATE combine_numeric_avg(internal) (
  sfunc = numeric_avg_combine,
  stype = internal,
  finalfunc = numeric_avg,
  combinefunc = numeric_avg_combine,
  serialfunc = numeric_avg_serialize,
  deserialfunc = numeric_avg_deserialize,
  parallel = safe
);

The key things to note here are:

  • The combine aggregate must have the combinefunc as both combinefunc and sfunc.
  • The argument type must be internal if the aggregate uses serialfunc/deserialfunc. Otherwise the argument type is the transition type (stype).

If such a signature exists, the PipelineDB analyzer will resolve it when analyzing a combine invocation. If one doesn't exist, it will attempt to use the original aggregate as is (top_n_by_score in your case). In this case I'm guessing that your aggregate doesn't work for combining multiple transition states.

We admittedly need to document this more thoroughly as it's clearly not particularly obvious. I think it would also be helpful if we added:

  • Better error handling at analyze time, so if PipelineDB can't find a combine aggregate users get a helpful error
  • A validation function that checks whether an aggregate is combinable and, if it isn't, specifies exactly what's missing

What do you think?

@derekjn derekjn changed the title combine seems not work on user-define aggregate complex type. Improve combine invocation analysis for user-defined aggregates Dec 18, 2018
@derekjn derekjn added this to the 1.1.0 milestone Dec 21, 2018
jchbh-duplicate commented

Great! It works after I added:

CREATE AGGREGATE combine_top_n_by_score(TOP_N_RESULT) (
  COMBINEFUNC = calc_top_n,
  SFUNC = calc_top_n,
  STYPE = TOP_N_RESULT
);

It would be much more convenient if a default combine aggregate like this were created automatically.

Please add this to the documentation so that others don't run into the same issue.
