Skip to content

Commit

Permalink
Window function optimizations (#53)
Browse files Browse the repository at this point in the history
* Convert unbounded window frame data on the first row call only

On the very first row call within an unbounded frame, we create a list
window_frame_<PG_WINDOW_OBJECT()> in the global namespace
and reuse its elements on subsequent calls, cleaning up at the end.

This closes #18, fixes #52

* Tests for window function optimizations

* Update user guide on window function optimizations

* Update AppVeyor versions

* fixup! Update AppVeyor
  • Loading branch information
mlt authored and davecramer committed Apr 9, 2019
1 parent 1482fcd commit 6496bfa
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 199 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -29,9 +29,9 @@ DATA_built = plr.sql
DATA = plr--8.4.sql plr--8.3.0.18--8.4.sql plr--unpackaged--8.4.sql

ifeq ($(PG12),yes)
REGRESS = plr12 bad_fun
REGRESS = plr12 bad_fun opt_window
else
REGRESS = plr bad_fun
REGRESS = plr bad_fun opt_window
endif

ifdef USE_PGXS
Expand Down
20 changes: 10 additions & 10 deletions appveyor.yml
Expand Up @@ -7,30 +7,30 @@ clone_depth: 1
environment:
PGUSER: postgres
PGPASSWORD: Password12!
rversion: 3.5.1
rversion: 3.5.3
matrix:
- pg: master
PlatformToolset: v141
configuration: Debug
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2017
- pg: 9.3.24-1
- pg: 9.3.25-1
PlatformToolset: Windows7.1SDK
- pg: 9.4.19-1
- pg: 9.4.21-1
PlatformToolset: v120
- pg: 9.5.15-1
- pg: 9.5.16-1
PlatformToolset: v120
- pg: 9.6.11-1
- pg: 9.6.12-1
PlatformToolset: v120
- pg: 10.6-1
- pg: 10.7-1
PlatformToolset: v120
- pg: 11.1-1
- pg: 11.2-1
PlatformToolset: v140
matrix:
allow_failures:
- pg: master
exclude:
- platform: x86
pg: 11.1-1
pg: 11.2-1
PlatformToolset: v140
- platform: x86
pg: master
Expand All @@ -41,7 +41,7 @@ init: # Make %x64% available for caching
- setx /m exe %exe%

install:
- if not exist R-%rversion%-win.exe appveyor downloadfile https://cran.rstudio.com/bin/windows/base/R-%rversion%-win.exe
- if not exist R-%rversion%-win.exe appveyor downloadfile https://cran.r-project.org/bin/windows/base/old/%rversion%/R-%rversion%-win.exe
- R-%rversion%-win.exe /VERYSILENT
# We could have used the RTools that many R users have, but let's use the msys64 already present on AppVeyor instead
#- if not exist Rtools35.exe appveyor downloadfile https://cran.r-project.org/bin/windows/Rtools/Rtools35.exe
Expand Down Expand Up @@ -132,7 +132,7 @@ test_script:
}
$env:Outcome="Passed"
$elapsed=(Measure-Command {
pg_regress "$env:psqlopt=$env:pgroot\bin" --dbname=pl_regression plr bad_fun 2>&1 |
pg_regress "$env:psqlopt=$env:pgroot\bin" --dbname=pl_regression plr bad_fun opt_window 2>&1 |
%{ if ($_ -is [System.Management.Automation.ErrorRecord]) { $_.Exception.Message } else { $_ } } |
Out-Default
if ($LASTEXITCODE -ne 0) {
Expand Down
27 changes: 27 additions & 0 deletions expected/opt_window.out
@@ -0,0 +1,27 @@
create or replace function fast_win(a int4, b bigint) returns bool AS $$
is.null(farg2) || pg.throwerror('Constants shall not be passes with the frame')
identical(parent.frame(), .GlobalEnv) && pg.throwerror('Parent env is global')
exists('plr_window_frame', parent.frame(), inherits=FALSE) || pg.throwerror('No window frame data found')
a == farg1[prownum]
$$ window language plr;
select s, p, fast_win(NULLIF(s, 4), 123) over w
from (
select s, s % 2 as p
from generate_series(1,10) s
) foo
window w as (partition by p order by s rows between unbounded preceding and unbounded following)
order by s;
s | p | fast_win
----+---+----------
1 | 1 | t
2 | 0 | t
3 | 1 | t
4 | 0 |
5 | 1 | t
6 | 0 | t
7 | 1 | t
8 | 0 | t
9 | 1 | t
10 | 0 | t
(10 rows)

157 changes: 60 additions & 97 deletions pg_conversion.c
Expand Up @@ -36,7 +36,7 @@

static void pg_get_one_r(char *value, Oid arg_out_fn_oid, SEXP *obj,
int elnum);
static SEXP get_r_vector(Oid typtype, int numels);
static SEXP get_r_vector(Oid typtype, int64 numels);
static Datum get_trigger_tuple(SEXP rval, plr_function *function,
FunctionCallInfo fcinfo, bool *isnull);
static Datum get_tuplestore(SEXP rval, plr_function *function,
Expand Down Expand Up @@ -324,130 +324,93 @@ pg_array_get_r(Datum dvalue, FmgrInfo out_func, int typlen, bool typbyval, char
return result;
}

#ifdef HAVE_WINDOW_FUNCTIONS
/*
* Given an array pg datums, convert to a multi-row R vector.
* Evaluate a window function's argument expression on a specified
* window frame, returning R array for the argno column in the frame
*
* winobj: PostgreSQL window object handle
* argno: argument number to evaluate (counted from 0)
* function: contains necessary info on how to output Datum as string for general case conversion
*/
SEXP
pg_datum_array_get_r(Datum *elem_values, bool *elem_nulls, int numels, bool has_nulls,
Oid element_type, FmgrInfo out_func, bool typbyval)
pg_window_frame_get_r(WindowObject winobj, int argno, plr_function* function)
{
/*
* Loop through and convert each scalar value.
* Use the converted values to build an R vector.
*/
SEXP result;
int i;
bool fast_track_type;

switch (element_type)
{
case INT4OID:
case FLOAT8OID:
fast_track_type = true;
break;
default:
fast_track_type = false;
}
int numels = 0;
Oid element_type = function->arg_typid[argno];
FmgrInfo out_func = function->arg_out_func[argno];
int64 totalrows = WinGetPartitionRowCount(winobj);

/*
* Special case for pass-by-value data types, if the following conditions are met:
* designated fast_track_type
* no NULL elements
* 1 dimensional array only
* at least one element
* Get new vector of the appropriate type.
* We presume an unbounded frame, as that is the common use case in R.
* If the frame turns out to be smaller, we trim the vector later.
*/
if (fast_track_type &&
typbyval &&
!has_nulls &&
(numels > 0))
PROTECT(result = get_r_vector(element_type, totalrows));

/* Convert all values to their R form and build the vector */
for (;; numels++)
{
SEXP matrix_dims;
char *value;
bool isnull;
Datum dvalue;
bool isout = false;
bool set_mark = (0 == numels);

/* get new vector of the appropriate type and length */
PROTECT(result = get_r_vector(element_type, numels));
dvalue = WinGetFuncArgInFrame(winobj, argno, numels, WINDOW_SEEK_HEAD,
set_mark, &isnull, &isout);

if (isout)
break;

/* keep this in sync with switch above -- fast_track_type only */
switch (element_type)
{
case BOOLOID:
LOGICAL_DATA(result)[numels] = isnull ? NA_LOGICAL : DatumGetBool(dvalue);
break;
case INT8OID:
NUMERIC_DATA(result)[numels] = isnull ? NA_REAL : (double)DatumGetInt64(dvalue);
break;
case INT2OID:
case INT4OID:
Assert(sizeof(int) == 4);
memcpy(INTEGER_DATA(result), elem_values, numels * sizeof(int));
case OIDOID:
INTEGER_DATA(result)[numels] = isnull ? NA_INTEGER : DatumGetInt32(dvalue);
break;
case FLOAT4OID:
NUMERIC_DATA(result)[numels] = isnull ? NA_REAL : DatumGetFloat4(dvalue);
break;
case FLOAT8OID:
Assert(sizeof(double) == 8);
memcpy(NUMERIC_DATA(result), elem_values, numels * sizeof(double));
NUMERIC_DATA(result)[numels] = isnull ? NA_REAL : DatumGetFloat8(dvalue);
break;
default:
/* Everything else is error */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("direct array passthrough attempted for unsupported type")));
}

/* attach dimensions */
PROTECT(matrix_dims = allocVector(INTSXP, 1));
INTEGER_DATA(matrix_dims)[0] = numels;
setAttrib(result, R_DimSymbol, matrix_dims);
UNPROTECT(1);

UNPROTECT(1); /* result */
}
else
{
SEXP matrix_dims;

/* array is empty */
if (numels == 0)
{
PROTECT(result = get_r_vector(element_type, 0));
UNPROTECT(1);

return result;
}

/* get new vector of the appropriate type and length */
PROTECT(result = get_r_vector(element_type, numels));

/* Convert all values to their R form and build the vector */
for (i = 0; i < numels; i++)
{
char *value;
Datum itemvalue;
bool isnull;

isnull = elem_nulls[i];
itemvalue = elem_values[i];

if (!isnull)
{
value = DatumGetCString(FunctionCall3(&out_func,
itemvalue,
(Datum) 0,
Int32GetDatum(-1)));
}
else
value = NULL;
value = isnull ? NULL : DatumGetCString(FunctionCall3(&out_func,
dvalue,
(Datum) 0,
Int32GetDatum(-1)));

/*
* Note that pg_get_one_r() replaces NULL values with
* the NA value appropriate for the data type.
*/
pg_get_one_r(value, element_type, &result, i);
if (value != NULL)
pfree(value);
/*
* Note that pg_get_one_r() replaces NULL values with
* the NA value appropriate for the data type.
*/
pg_get_one_r(value, element_type, &result, numels);
if (value != NULL)
pfree(value);
}
}

/* attach dimensions */
PROTECT(matrix_dims = allocVector(INTSXP, 1));
INTEGER_DATA(matrix_dims)[0] = numels;
setAttrib(result, R_DimSymbol, matrix_dims);
UNPROTECT(1);
if (numels != totalrows)
SET_LENGTH(result, numels);

UNPROTECT(1); /* result */
}
UNPROTECT(1); /* result */

return result;
}
#endif

/*
* Given an array of pg tuples, convert to an R list
Expand Down Expand Up @@ -586,7 +549,7 @@ pg_tuple_get_r_frame(int ntuples, HeapTuple *tuples, TupleDesc tupdesc)
* create an R vector of a given type and size based on pg output function oid
*/
static SEXP
get_r_vector(Oid typtype, int numels)
get_r_vector(Oid typtype, int64 numels)
{
SEXP result;

Expand Down

0 comments on commit 6496bfa

Please sign in to comment.