Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

- New feature: allow PL/R functions to be declared and used as WINDOW

  functions. Make window frame datums available as a new set of arguments
  (farg1, farg2, ..., fargN) to the R script. Inspiration by Ian Gow.
  Still lacks regression and documentation and needs more testing.
- Minor fixes for compiler warnings by updated gcc
  • Loading branch information...
commit 6a9c69f553f10305e08c9a0c3435919032b69ab2 1 parent af0788b
@jconway authored
Showing with 319 additions and 42 deletions.
  1. +125 −0 pg_conversion.c
  2. +184 −42 plr.c
  3. +10 −0 plr.h
View
125 pg_conversion.c
@@ -323,6 +323,131 @@ pg_array_get_r(Datum dvalue, FmgrInfo out_func, int typlen, bool typbyval, char
}
/*
+ * Given an array pg datums, convert to a multi-row R vector.
+ */
+SEXP
+pg_datum_array_get_r(Datum *elem_values, bool *elem_nulls, int numels, bool has_nulls,
+ Oid element_type, FmgrInfo out_func, bool typbyval)
+{
+ /*
+ * Loop through and convert each scalar value.
+ * Use the converted values to build an R vector.
+ */
+ SEXP result;
+ int i;
+ bool fast_track_type;
+
+ switch (element_type)
+ {
+ case INT4OID:
+ case FLOAT8OID:
+ fast_track_type = true;
+ break;
+ default:
+ fast_track_type = false;
+ }
+
+ /*
+ * Special case for pass-by-value data types, if the following conditions are met:
+ * designated fast_track_type
+ * no NULL elements
+ * 1 dimensional array only
+ * at least one element
+ */
+ if (fast_track_type &&
+ typbyval &&
+ !has_nulls &&
+ (numels > 0))
+ {
+ SEXP matrix_dims;
+
+ /* get new vector of the appropriate type and length */
+ PROTECT(result = get_r_vector(element_type, numels));
+
+ /* keep this in sync with switch above -- fast_track_type only */
+ switch (element_type)
+ {
+ case INT4OID:
+ Assert(sizeof(int) == 4);
+ memcpy(INTEGER_DATA(result), elem_values, numels * sizeof(int));
+ break;
+ case FLOAT8OID:
+ Assert(sizeof(double) == 8);
+ memcpy(NUMERIC_DATA(result), elem_values, numels * sizeof(double));
+ break;
+ default:
+ /* Everything else is error */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("direct array passthrough attempted for unsupported type")));
+ }
+
+ /* attach dimensions */
+ PROTECT(matrix_dims = allocVector(INTSXP, 1));
+ INTEGER_DATA(matrix_dims)[0] = numels;
+ setAttrib(result, R_DimSymbol, matrix_dims);
+ UNPROTECT(1);
+
+ UNPROTECT(1); /* result */
+ }
+ else
+ {
+ SEXP matrix_dims;
+
+ /* array is empty */
+ if (numels == 0)
+ {
+ PROTECT(result = get_r_vector(element_type, 0));
+ UNPROTECT(1);
+
+ return result;
+ }
+
+ /* get new vector of the appropriate type and length */
+ PROTECT(result = get_r_vector(element_type, numels));
+
+ /* Convert all values to their R form and build the vector */
+ for (i = 0; i < numels; i++)
+ {
+ char *value;
+ Datum itemvalue;
+ bool isnull;
+
+ isnull = elem_nulls[i];
+ itemvalue = elem_values[i];
+
+ if (!isnull)
+ {
+ value = DatumGetCString(FunctionCall3(&out_func,
+ itemvalue,
+ (Datum) 0,
+ Int32GetDatum(-1)));
+ }
+ else
+ value = NULL;
+
+ /*
+ * Note that pg_get_one_r() replaces NULL values with
+ * the NA value appropriate for the data type.
+ */
+ pg_get_one_r(value, element_type, &result, i);
+ if (value != NULL)
+ pfree(value);
+ }
+
+ /* attach dimensions */
+ PROTECT(matrix_dims = allocVector(INTSXP, 1));
+ INTEGER_DATA(matrix_dims)[0] = numels;
+ setAttrib(result, R_DimSymbol, matrix_dims);
+ UNPROTECT(1);
+
+ UNPROTECT(1); /* result */
+ }
+
+ return result;
+}
+
+/*
* Given an array of pg tuples, convert to an R list
* the created object is not quite actually a data.frame
*/
View
226 plr.c
@@ -171,12 +171,13 @@ static plr_function *do_compile(FunctionCallInfo fcinfo,
HeapTuple procTup,
plr_func_hashkey *hashkey);
static SEXP plr_parse_func_body(const char *body);
-static SEXP plr_convertargs(plr_function *function, Datum *arg, bool *argnull);
+static SEXP plr_convertargs(plr_function *function, Datum *arg, bool *argnull, FunctionCallInfo fcinfo);
static void plr_error_callback(void *arg);
static Oid getNamespaceOidFromFunctionOid(Oid fnOid);
static bool haveModulesTable(Oid nspOid);
static char *getModulesSql(Oid nspOid);
static char **fetchArgNames(HeapTuple procTup, int nargs);
+static void WinGetFrameData(WindowObject winobj, int argno, Datum *dvalues, bool *isnull, int *numels, bool *has_nulls);
/*
* plr_call_handler - This is the only visible function
@@ -215,8 +216,7 @@ void
load_r_cmd(const char *cmd)
{
SEXP cmdSexp,
- cmdexpr,
- ans = R_NilValue;
+ cmdexpr;
int i,
status;
@@ -248,7 +248,7 @@ load_r_cmd(const char *cmd)
/* Loop is needed here as EXPSEXP may be of length > 1 */
for(i = 0; i < length(cmdexpr); i++)
{
- ans = R_tryEval(VECTOR_ELT(cmdexpr, i), R_GlobalEnv, &status);
+ R_tryEval(VECTOR_ELT(cmdexpr, i), R_GlobalEnv, &status);
if(status != 0)
{
if (last_R_error_msg)
@@ -279,7 +279,6 @@ PLR_CLEANUP
{
char *buf;
char *tmpdir = getenv("R_SESSION_TMPDIR");
- int ret;
R_dot_Last();
R_RunExitFinalizers();
@@ -295,8 +294,8 @@ PLR_CLEANUP
buf = (char *) palloc(9 + 1 + strlen(tmpdir));
sprintf(buf, "rm -rf \"%s\"", tmpdir);
- /* ignoring return value, but silence the compiler */
- ret = system(buf);
+ /* ignoring return value */
+ system(buf);
}
}
@@ -709,7 +708,7 @@ plr_trigger_handler(PG_FUNCTION_ARGS)
PROTECT(fun = function->fun);
/* Convert all call arguments */
- PROTECT(rargs = plr_convertargs(function, arg, argnull));
+ PROTECT(rargs = plr_convertargs(function, arg, argnull, fcinfo));
/* Call the R function */
PROTECT(rvalue = call_r_func(fun, rargs));
@@ -747,7 +746,7 @@ plr_func_handler(PG_FUNCTION_ARGS)
PROTECT(fun = function->fun);
/* Convert all call arguments */
- PROTECT(rargs = plr_convertargs(function, fcinfo->arg, fcinfo->argnull));
+ PROTECT(rargs = plr_convertargs(function, fcinfo->arg, fcinfo->argnull, fcinfo));
/* Call the R function */
PROTECT(rvalue = call_r_func(fun, rargs));
@@ -945,6 +944,9 @@ do_compile(FunctionCallInfo fcinfo,
function->fn_xmin = HeapTupleHeaderGetXmin(procTup->t_data);
function->fn_tid = procTup->t_self;
+ /* Flag for window functions */
+ function->iswindow = procStruct->proiswindow;
+
/* Lookup the pg_language tuple by Oid*/
langTup = SearchSysCache(LANGOID,
ObjectIdGetDatum(procStruct->prolang),
@@ -1214,6 +1216,15 @@ do_compile(FunctionCallInfo fcinfo,
}
}
FREE_ARG_NAMES;
+
+ if (function->iswindow)
+ {
+ for (i = 0; i < function->nargs; i++)
+ {
+ appendStringInfo(proc_internal_args, ",");
+ SET_FRAME_ARG_NAME;
+ }
+ }
}
else
{
@@ -1452,17 +1463,28 @@ call_r_func(SEXP fun, SEXP rargs)
}
static SEXP
-plr_convertargs(plr_function *function, Datum *arg, bool *argnull)
+plr_convertargs(plr_function *function, Datum *arg, bool *argnull, FunctionCallInfo fcinfo)
{
int i;
SEXP rargs,
el;
- /*
- * Create an array of R objects with the number of elements
- * equal to the number of arguments.
- */
- PROTECT(rargs = allocVector(VECSXP, function->nargs));
+ if (!function->iswindow)
+ {
+ /*
+ * Create an array of R objects with the number of elements
+ * equal to the number of arguments.
+ */
+ PROTECT(rargs = allocVector(VECSXP, function->nargs));
+ }
+ else
+ {
+ /*
+ * Create an array of R objects with the number of elements
+ * equal to twice the number of arguments.
+ */
+ PROTECT(rargs = allocVector(VECSXP, 2 * function->nargs));
+ }
/*
* iterate over the arguments, convert each of them and put them in
@@ -1470,41 +1492,113 @@ plr_convertargs(plr_function *function, Datum *arg, bool *argnull)
*/
for (i = 0; i < function->nargs; i++)
{
- if (argnull[i])
- {
- /* fast track for null arguments */
- PROTECT(el = R_NilValue);
- }
- else if (function->arg_is_rel[i])
+ if (!function->iswindow)
{
- /* for tuple args, convert to a one row data.frame */
- CONVERT_TUPLE_TO_DATAFRAME;
- }
- else if (function->arg_elem[i] == InvalidOid)
- {
- /* for scalar args, convert to a one row vector */
- Datum dvalue = arg[i];
- Oid arg_typid = function->arg_typid[i];
- FmgrInfo arg_out_func = function->arg_out_func[i];
+ if (argnull[i])
+ {
+ /* fast track for null arguments */
+ PROTECT(el = R_NilValue);
+ }
+ else if (function->arg_is_rel[i])
+ {
+ /* for tuple args, convert to a one row data.frame */
+ CONVERT_TUPLE_TO_DATAFRAME;
+ }
+ else if (function->arg_elem[i] == InvalidOid)
+ {
+ /* for scalar args, convert to a one row vector */
+ Datum dvalue = arg[i];
+ Oid arg_typid = function->arg_typid[i];
+ FmgrInfo arg_out_func = function->arg_out_func[i];
- PROTECT(el = pg_scalar_get_r(dvalue, arg_typid, arg_out_func));
+ PROTECT(el = pg_scalar_get_r(dvalue, arg_typid, arg_out_func));
+ }
+ else
+ {
+ /* better be a pg array arg, convert to a multi-row vector */
+ Datum dvalue = (Datum) PG_DETOAST_DATUM(arg[i]);
+ FmgrInfo out_func = function->arg_elem_out_func[i];
+ int typlen = function->arg_elem_typlen[i];
+ bool typbyval = function->arg_elem_typbyval[i];
+ char typalign = function->arg_elem_typalign[i];
+
+ PROTECT(el = pg_array_get_r(dvalue, out_func, typlen, typbyval, typalign));
+ }
+ SET_VECTOR_ELT(rargs, i, el);
+ UNPROTECT(1);
}
else
{
- /* better be a pg array arg, convert to a multi-row vector */
- Datum dvalue = (Datum) PG_DETOAST_DATUM(arg[i]);
- FmgrInfo out_func = function->arg_elem_out_func[i];
- int typlen = function->arg_elem_typlen[i];
- bool typbyval = function->arg_elem_typbyval[i];
- char typalign = function->arg_elem_typalign[i];
-
- PROTECT(el = pg_array_get_r(dvalue, out_func, typlen, typbyval, typalign));
- }
+ Datum dvalue;
+ bool isnull;
+ WindowObject winobj = PG_WINDOW_OBJECT();
- SET_VECTOR_ELT(rargs, i, el);
- UNPROTECT(1);
+ /* get datum for the current row of the window frame */
+ dvalue = WinGetFuncArgInFrame(winobj, i, 0, WINDOW_SEEK_CURRENT, false, &isnull, NULL);
+
+ if (isnull)
+ {
+ /* fast track for null arguments */
+ PROTECT(el = R_NilValue);
+ }
+ else if (function->arg_is_rel[i])
+ {
+ /* keep compiler quiet */
+ el = R_NilValue;
+
+ elog(ERROR, "Tuple arguments not supported in PL/R Window Functions");
+ }
+ else if (function->arg_elem[i] == InvalidOid)
+ {
+ /* for scalar args, convert to a one row vector */
+ Oid arg_typid = function->arg_typid[i];
+ FmgrInfo arg_out_func = function->arg_out_func[i];
+
+ PROTECT(el = pg_scalar_get_r(dvalue, arg_typid, arg_out_func));
+ }
+ else
+ {
+ /* better be a pg array arg, convert to a multi-row vector */
+ FmgrInfo out_func = function->arg_elem_out_func[i];
+ int typlen = function->arg_elem_typlen[i];
+ bool typbyval = function->arg_elem_typbyval[i];
+ char typalign = function->arg_elem_typalign[i];
+
+ dvalue = (Datum) PG_DETOAST_DATUM(dvalue);
+ PROTECT(el = pg_array_get_r(dvalue, out_func, typlen, typbyval, typalign));
+ }
+ SET_VECTOR_ELT(rargs, i, el);
+ UNPROTECT(1);
+ }
}
+ /* now get an array of datums for the entire window frame for each argument */
+ if (function->iswindow)
+ {
+ WindowObject winobj = PG_WINDOW_OBJECT();
+ int64 totalrows = WinGetPartitionRowCount(winobj);
+ Datum *dvalues = palloc0(totalrows * sizeof(Datum));
+ bool *isnulls = palloc0(totalrows * sizeof(bool));
+ int numels;
+ Oid datum_typid;
+ FmgrInfo datum_out_func;
+ bool datum_typbyval;
+ bool has_nulls;
+
+ for (i = 0; i < function->nargs; i++)
+ {
+ WinGetFrameData(winobj, i, dvalues, isnulls, &numels, &has_nulls);
+
+ datum_typid = function->arg_typid[i];
+ datum_out_func = function->arg_out_func[i];
+ datum_typbyval = function->arg_elem_typbyval[i];
+ PROTECT(el = pg_datum_array_get_r(dvalues, isnulls, numels, has_nulls,
+ datum_typid, datum_out_func, datum_typbyval));
+
+ SET_VECTOR_ELT(rargs, i + 1, el);
+ UNPROTECT(1);
+ }
+ }
UNPROTECT(1);
return(rargs);
@@ -1672,3 +1766,51 @@ pg_unprotect(int n, char *fn, int ln)
unprotect(n);
}
#endif /* DEBUGPROTECT */
+
+/*
+ * WinGetFrameData
+ * Evaluate a window function's argument expression on a specified
+ * window frame, returning an array of Datums for the frame
+ *
+ * argno: argument number to evaluate (counted from 0)
+ * isnull: output argument, receives isnull status of result
+ */
+static void
+WinGetFrameData(WindowObject winobj, int argno, Datum *dvalues, bool *isnulls, int *numels, bool *has_nulls)
+{
+ int64 i = 0;
+
+ *has_nulls = false;
+ for(;;)
+ {
+ Datum lcl_dvalue;
+ bool lcl_isnull;
+ bool isout;
+ bool set_mark;
+
+ if (i > 0)
+ set_mark = false;
+ else
+ set_mark = true;
+
+ lcl_dvalue = WinGetFuncArgInFrame(winobj, argno, i, WINDOW_SEEK_HEAD,
+ set_mark, &lcl_isnull, &isout);
+
+ if (!isout)
+ {
+ dvalues[i] = lcl_dvalue;
+ isnulls[i] = lcl_isnull;
+ if (lcl_isnull)
+ *has_nulls = true;
+ }
+ else
+ {
+ *numels = i;
+ break;
+ }
+
+ i++;
+ };
+}
+
+
View
10 plr.h
@@ -38,6 +38,7 @@
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "windowapi.h"
#include "access/heapam.h"
#include "catalog/catversion.h"
#include "catalog/pg_language.h"
@@ -59,6 +60,7 @@
#endif
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
@@ -371,6 +373,10 @@ extern void R_RunExitFinalizers(void);
else \
appendStringInfo(proc_internal_args, "arg%d", i + 1); \
} while (0)
+#define SET_FRAME_ARG_NAME \
+ do { \
+ appendStringInfo(proc_internal_args, "farg%d", i + 1); \
+ } while (0)
#define FREE_ARG_NAMES \
do { \
if (argnames) \
@@ -389,6 +395,7 @@ extern void R_RunExitFinalizers(void);
ErrorData *edata; \
SWITCHTO_PLR_SPI_CONTEXT(temp_context); \
edata = CopyErrorData(); \
+ MemoryContextSwitchTo(temp_context); \
error("error in SQL statement : %s", edata->message); \
}
#define PLR_PG_END_TRY() \
@@ -451,6 +458,7 @@ typedef struct plr_function
char arg_elem_typalign[FUNC_MAX_ARGS];
int arg_is_rel[FUNC_MAX_ARGS];
SEXP fun; /* compiled R function */
+ bool iswindow;
} plr_function;
/* compiled function hash table */
@@ -478,6 +486,8 @@ extern SEXP call_r_func(SEXP fun, SEXP rargs);
/* argument and return value conversion functions */
extern SEXP pg_scalar_get_r(Datum dvalue, Oid arg_typid, FmgrInfo arg_out_func);
extern SEXP pg_array_get_r(Datum dvalue, FmgrInfo out_func, int typlen, bool typbyval, char typalign);
+extern SEXP pg_datum_array_get_r(Datum *elem_values, bool *elem_nulls, int numels, bool has_nulls,
+ Oid element_type, FmgrInfo out_func, bool typbyval);
extern SEXP pg_tuple_get_r_frame(int ntuples, HeapTuple *tuples, TupleDesc tupdesc);
extern Datum r_get_pg(SEXP rval, plr_function *function, FunctionCallInfo fcinfo);
extern Datum get_datum(SEXP rval, Oid typid, Oid typelem, FmgrInfo in_func, bool *isnull);
Please sign in to comment.
Something went wrong with that request. Please try again.