Permalink
Switch branches/tags
Find file
9187 lines (7729 sloc) 221 KB
/*-------------------------------------------------------------------------
*
* numeric.c
* An exact numeric data type for the Postgres database system
*
* Original coding 1998, Jan Wieck. Heavily revised 2003, Tom Lane.
*
* Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
* multiple-precision math library, most recently published as Algorithm
* 786: Multiple-Precision Complex Arithmetic and Functions, ACM
* Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
* pages 359-367.
*
* Copyright (c) 1998-2017, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/utils/adt/numeric.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <ctype.h>
#include <float.h>
#include <limits.h>
#include <math.h>
#include "access/hash.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "lib/hyperloglog.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/int8.h"
#include "utils/numeric.h"
#include "utils/sortsupport.h"
/* ----------
* Uncomment the following to enable compilation of dump_numeric()
* and dump_var() and to get a dump of any result produced by make_result().
* ----------
#define NUMERIC_DEBUG
*/
/* ----------
* Local data types
*
* Numeric values are represented in a base-NBASE floating point format.
* Each "digit" ranges from 0 to NBASE-1. The type NumericDigit is signed
* and wide enough to store a digit. We assume that NBASE*NBASE can fit in
* an int. Although the purely calculational routines could handle any even
* NBASE that's less than sqrt(INT_MAX), in practice we are only interested
* in NBASE a power of ten, so that I/O conversions and decimal rounding
* are easy. Also, it's actually more efficient if NBASE is rather less than
* sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to
* postpone processing carries.
*
* Values of NBASE other than 10000 are considered of historical interest only
* and are no longer supported in any sense; no mechanism exists for the client
* to discover the base, so every client supporting binary mode expects the
* base-10000 format. If you plan to change this, also note the numeric
* abbreviation code, which assumes NBASE=10000.
* ----------
*/
#if 0
#define NBASE 10
#define HALF_NBASE 5
#define DEC_DIGITS 1 /* decimal digits per NBASE digit */
#define MUL_GUARD_DIGITS 4 /* these are measured in NBASE digits */
#define DIV_GUARD_DIGITS 8
typedef signed char NumericDigit;
#endif
#if 0
#define NBASE 100
#define HALF_NBASE 50
#define DEC_DIGITS 2 /* decimal digits per NBASE digit */
#define MUL_GUARD_DIGITS 3 /* these are measured in NBASE digits */
#define DIV_GUARD_DIGITS 6
typedef signed char NumericDigit;
#endif
#if 1
#define NBASE 10000
#define HALF_NBASE 5000
#define DEC_DIGITS 4 /* decimal digits per NBASE digit */
#define MUL_GUARD_DIGITS 2 /* these are measured in NBASE digits */
#define DIV_GUARD_DIGITS 4
typedef int16 NumericDigit;
#endif
/*
* The Numeric type as stored on disk.
*
* If the high bits of the first word of a NumericChoice (n_header, or
* n_short.n_header, or n_long.n_sign_dscale) are NUMERIC_SHORT, then the
* numeric follows the NumericShort format; if they are NUMERIC_POS or
* NUMERIC_NEG, it follows the NumericLong format. If they are NUMERIC_NAN,
* it is a NaN. We currently always store a NaN using just two bytes (i.e.
* only n_header), but previous releases used only the NumericLong format,
* so we might find 4-byte NaNs on disk if a database has been migrated using
* pg_upgrade. In either case, when the high bits indicate a NaN, the
* remaining bits are never examined. Currently, we always initialize these
* to zero, but it might be possible to use them for some other purpose in
* the future.
*
* In the NumericShort format, the remaining 14 bits of the header word
* (n_short.n_header) are allocated as follows: 1 for sign (positive or
* negative), 6 for dynamic scale, and 7 for weight. In practice, most
* commonly-encountered values can be represented this way.
*
* In the NumericLong format, the remaining 14 bits of the header word
* (n_long.n_sign_dscale) represent the display scale; and the weight is
* stored separately in n_weight.
*
* NOTE: by convention, values in the packed form have been stripped of
* all leading and trailing zero digits (where a "digit" is of base NBASE).
* In particular, if the value is zero, there will be no digits at all!
* The weight is arbitrary in that case, but we normally set it to zero.
*/
struct NumericShort
{
uint16 n_header; /* Sign + display scale + weight */
NumericDigit n_data[FLEXIBLE_ARRAY_MEMBER]; /* Digits */
};
struct NumericLong
{
uint16 n_sign_dscale; /* Sign + display scale */
int16 n_weight; /* Weight of 1st digit */
NumericDigit n_data[FLEXIBLE_ARRAY_MEMBER]; /* Digits */
};
union NumericChoice
{
uint16 n_header; /* Header word */
struct NumericLong n_long; /* Long form (4-byte header) */
struct NumericShort n_short; /* Short form (2-byte header) */
};
struct NumericData
{
int32 vl_len_; /* varlena header (do not touch directly!) */
union NumericChoice choice; /* choice of format */
};
/*
* Interpretation of high bits.
*/
#define NUMERIC_SIGN_MASK 0xC000
#define NUMERIC_POS 0x0000
#define NUMERIC_NEG 0x4000
#define NUMERIC_SHORT 0x8000
#define NUMERIC_NAN 0xC000
#define NUMERIC_FLAGBITS(n) ((n)->choice.n_header & NUMERIC_SIGN_MASK)
#define NUMERIC_IS_NAN(n) (NUMERIC_FLAGBITS(n) == NUMERIC_NAN)
#define NUMERIC_IS_SHORT(n) (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
#define NUMERIC_HDRSZ (VARHDRSZ + sizeof(uint16) + sizeof(int16))
#define NUMERIC_HDRSZ_SHORT (VARHDRSZ + sizeof(uint16))
/*
* If the flag bits are NUMERIC_SHORT or NUMERIC_NAN, we want the short header;
* otherwise, we want the long one. Instead of testing against each value, we
* can just look at the high bit, for a slight efficiency gain.
*/
#define NUMERIC_HEADER_IS_SHORT(n) (((n)->choice.n_header & 0x8000) != 0)
#define NUMERIC_HEADER_SIZE(n) \
(VARHDRSZ + sizeof(uint16) + \
(NUMERIC_HEADER_IS_SHORT(n) ? 0 : sizeof(int16)))
/*
* Short format definitions.
*/
#define NUMERIC_SHORT_SIGN_MASK 0x2000
#define NUMERIC_SHORT_DSCALE_MASK 0x1F80
#define NUMERIC_SHORT_DSCALE_SHIFT 7
#define NUMERIC_SHORT_DSCALE_MAX \
(NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
#define NUMERIC_SHORT_WEIGHT_SIGN_MASK 0x0040
#define NUMERIC_SHORT_WEIGHT_MASK 0x003F
#define NUMERIC_SHORT_WEIGHT_MAX NUMERIC_SHORT_WEIGHT_MASK
#define NUMERIC_SHORT_WEIGHT_MIN (-(NUMERIC_SHORT_WEIGHT_MASK+1))
/*
* Extract sign, display scale, weight.
*/
#define NUMERIC_DSCALE_MASK 0x3FFF
#define NUMERIC_SIGN(n) \
(NUMERIC_IS_SHORT(n) ? \
(((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \
NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n))
#define NUMERIC_DSCALE(n) (NUMERIC_HEADER_IS_SHORT((n)) ? \
((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \
>> NUMERIC_SHORT_DSCALE_SHIFT \
: ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK))
#define NUMERIC_WEIGHT(n) (NUMERIC_HEADER_IS_SHORT((n)) ? \
(((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
~NUMERIC_SHORT_WEIGHT_MASK : 0) \
| ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \
: ((n)->choice.n_long.n_weight))
/* ----------
* NumericVar is the format we use for arithmetic. The digit-array part
* is the same as the NumericData storage format, but the header is more
* complex.
*
* The value represented by a NumericVar is determined by the sign, weight,
* ndigits, and digits[] array.
*
* Note: the first digit of a NumericVar's value is assumed to be multiplied
* by NBASE ** weight. Another way to say it is that there are weight+1
* digits before the decimal point. It is possible to have weight < 0.
*
* buf points at the physical start of the palloc'd digit buffer for the
* NumericVar. digits points at the first digit in actual use (the one
* with the specified weight). We normally leave an unused digit or two
* (preset to zeroes) between buf and digits, so that there is room to store
* a carry out of the top digit without reallocating space. We just need to
* decrement digits (and increment weight) to make room for the carry digit.
* (There is no such extra space in a numeric value stored in the database,
* only in a NumericVar in memory.)
*
* If buf is NULL then the digit buffer isn't actually palloc'd and should
* not be freed --- see the constants below for an example.
*
* dscale, or display scale, is the nominal precision expressed as number
* of digits after the decimal point (it must always be >= 0 at present).
* dscale may be more than the number of physically stored fractional digits,
* implying that we have suppressed storage of significant trailing zeroes.
* It should never be less than the number of stored digits, since that would
* imply hiding digits that are present. NOTE that dscale is always expressed
* in *decimal* digits, and so it may correspond to a fractional number of
* base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
*
* rscale, or result scale, is the target precision for a computation.
* Like dscale it is expressed as number of *decimal* digits after the decimal
* point, and is always >= 0 at present.
* Note that rscale is not stored in variables --- it's figured on-the-fly
* from the dscales of the inputs.
*
* While we consistently use "weight" to refer to the base-NBASE weight of
* a numeric value, it is convenient in some scale-related calculations to
* make use of the base-10 weight (ie, the approximate log10 of the value).
* To avoid confusion, such a decimal-units weight is called a "dweight".
*
* NB: All the variable-level functions are written in a style that makes it
* possible to give one and the same variable as argument and destination.
* This is feasible because the digit buffer is separate from the variable.
* ----------
*/
typedef struct NumericVar
{
int ndigits; /* # of digits in digits[] - can be 0! */
int weight; /* weight of first digit */
int sign; /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
int dscale; /* display scale */
NumericDigit *buf; /* start of palloc'd space for digits[] */
NumericDigit *digits; /* base-NBASE digits */
} NumericVar;
/* ----------
* Data for generate_series
* ----------
*/
typedef struct
{
NumericVar current;
NumericVar stop;
NumericVar step;
} generate_series_numeric_fctx;
/* ----------
* Sort support.
* ----------
*/
typedef struct
{
void *buf; /* buffer for short varlenas */
int64 input_count; /* number of non-null values seen */
bool estimating; /* true if estimating cardinality */
hyperLogLogState abbr_card; /* cardinality estimator */
} NumericSortSupport;
/* ----------
* Fast sum accumulator.
*
* NumericSumAccum is used to implement SUM(), and other standard aggregates
* that track the sum of input values. It uses 32-bit integers to store the
* digits, instead of the normal 16-bit integers (with NBASE=10000). This
* way, we can safely accumulate up to NBASE - 1 values without propagating
* carry, before risking overflow of any of the digits. 'num_uncarried'
* tracks how many values have been accumulated without propagating carry.
*
* Positive and negative values are accumulated separately, in 'pos_digits'
* and 'neg_digits'. This is simpler and faster than deciding whether to add
* or subtract from the current value, for each new value (see sub_var() for
* the logic we avoid by doing this). Both buffers are of same size, and
* have the same weight and scale. In accum_sum_final(), the positive and
* negative sums are added together to produce the final result.
*
* When a new value has a larger ndigits or weight than the accumulator
* currently does, the accumulator is enlarged to accommodate the new value.
* We normally have one zero digit reserved for carry propagation, and that
* is indicated by the 'have_carry_space' flag. When accum_sum_carry() uses
* up the reserved digit, it clears the 'have_carry_space' flag. The next
* call to accum_sum_add() will enlarge the buffer, to make room for the
* extra digit, and set the flag again.
*
* To initialize a new accumulator, simply reset all fields to zeros.
*
* The accumulator does not handle NaNs.
* ----------
*/
typedef struct NumericSumAccum
{
int ndigits;
int weight;
int dscale;
int num_uncarried;
bool have_carry_space;
int32 *pos_digits;
int32 *neg_digits;
} NumericSumAccum;
/*
* We define our own macros for packing and unpacking abbreviated-key
* representations for numeric values in order to avoid depending on
* USE_FLOAT8_BYVAL. The type of abbreviation we use is based only on
* the size of a datum, not the argument-passing convention for float8.
*/
#define NUMERIC_ABBREV_BITS (SIZEOF_DATUM * BITS_PER_BYTE)
#if SIZEOF_DATUM == 8
#define NumericAbbrevGetDatum(X) ((Datum) SET_8_BYTES(X))
#define DatumGetNumericAbbrev(X) ((int64) GET_8_BYTES(X))
#define NUMERIC_ABBREV_NAN NumericAbbrevGetDatum(PG_INT64_MIN)
#else
#define NumericAbbrevGetDatum(X) ((Datum) SET_4_BYTES(X))
#define DatumGetNumericAbbrev(X) ((int32) GET_4_BYTES(X))
#define NUMERIC_ABBREV_NAN NumericAbbrevGetDatum(PG_INT32_MIN)
#endif
/* ----------
* Some preinitialized constants
* ----------
*/
static const NumericDigit const_zero_data[1] = {0};
static const NumericVar const_zero =
{0, 0, NUMERIC_POS, 0, NULL, (NumericDigit *) const_zero_data};
static const NumericDigit const_one_data[1] = {1};
static const NumericVar const_one =
{1, 0, NUMERIC_POS, 0, NULL, (NumericDigit *) const_one_data};
static const NumericDigit const_two_data[1] = {2};
static const NumericVar const_two =
{1, 0, NUMERIC_POS, 0, NULL, (NumericDigit *) const_two_data};
#if DEC_DIGITS == 4 || DEC_DIGITS == 2
static const NumericDigit const_ten_data[1] = {10};
static const NumericVar const_ten =
{1, 0, NUMERIC_POS, 0, NULL, (NumericDigit *) const_ten_data};
#elif DEC_DIGITS == 1
static const NumericDigit const_ten_data[1] = {1};
static const NumericVar const_ten =
{1, 1, NUMERIC_POS, 0, NULL, (NumericDigit *) const_ten_data};
#endif
#if DEC_DIGITS == 4
static const NumericDigit const_zero_point_five_data[1] = {5000};
#elif DEC_DIGITS == 2
static const NumericDigit const_zero_point_five_data[1] = {50};
#elif DEC_DIGITS == 1
static const NumericDigit const_zero_point_five_data[1] = {5};
#endif
static const NumericVar const_zero_point_five =
{1, -1, NUMERIC_POS, 1, NULL, (NumericDigit *) const_zero_point_five_data};
#if DEC_DIGITS == 4
static const NumericDigit const_zero_point_nine_data[1] = {9000};
#elif DEC_DIGITS == 2
static const NumericDigit const_zero_point_nine_data[1] = {90};
#elif DEC_DIGITS == 1
static const NumericDigit const_zero_point_nine_data[1] = {9};
#endif
static const NumericVar const_zero_point_nine =
{1, -1, NUMERIC_POS, 1, NULL, (NumericDigit *) const_zero_point_nine_data};
#if DEC_DIGITS == 4
static const NumericDigit const_one_point_one_data[2] = {1, 1000};
#elif DEC_DIGITS == 2
static const NumericDigit const_one_point_one_data[2] = {1, 10};
#elif DEC_DIGITS == 1
static const NumericDigit const_one_point_one_data[2] = {1, 1};
#endif
static const NumericVar const_one_point_one =
{2, 0, NUMERIC_POS, 1, NULL, (NumericDigit *) const_one_point_one_data};
static const NumericVar const_nan =
{0, 0, NUMERIC_NAN, 0, NULL, NULL};
#if DEC_DIGITS == 4
static const int round_powers[4] = {0, 1000, 100, 10};
#endif
/* ----------
* Local functions
* ----------
*/
#ifdef NUMERIC_DEBUG
static void dump_numeric(const char *str, Numeric num);
static void dump_var(const char *str, NumericVar *var);
#else
#define dump_numeric(s,n)
#define dump_var(s,v)
#endif
#define digitbuf_alloc(ndigits) \
((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
#define digitbuf_free(buf) \
do { \
if ((buf) != NULL) \
pfree(buf); \
} while (0)
#define init_var(v) MemSetAligned(v, 0, sizeof(NumericVar))
#define NUMERIC_DIGITS(num) (NUMERIC_HEADER_IS_SHORT(num) ? \
(num)->choice.n_short.n_data : (num)->choice.n_long.n_data)
#define NUMERIC_NDIGITS(num) \
((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit))
#define NUMERIC_CAN_BE_SHORT(scale,weight) \
((scale) <= NUMERIC_SHORT_DSCALE_MAX && \
(weight) <= NUMERIC_SHORT_WEIGHT_MAX && \
(weight) >= NUMERIC_SHORT_WEIGHT_MIN)
static void alloc_var(NumericVar *var, int ndigits);
static void free_var(NumericVar *var);
static void zero_var(NumericVar *var);
static const char *set_var_from_str(const char *str, const char *cp,
NumericVar *dest);
static void set_var_from_num(Numeric value, NumericVar *dest);
static void init_var_from_num(Numeric num, NumericVar *dest);
static void set_var_from_var(const NumericVar *value, NumericVar *dest);
static char *get_str_from_var(const NumericVar *var);
static char *get_str_from_var_sci(const NumericVar *var, int rscale);
static Numeric make_result(const NumericVar *var);
static void apply_typmod(NumericVar *var, int32 typmod);
static int32 numericvar_to_int32(const NumericVar *var);
static bool numericvar_to_int64(const NumericVar *var, int64 *result);
static void int64_to_numericvar(int64 val, NumericVar *var);
#ifdef HAVE_INT128
static bool numericvar_to_int128(const NumericVar *var, int128 *result);
static void int128_to_numericvar(int128 val, NumericVar *var);
#endif
static double numeric_to_double_no_overflow(Numeric num);
static double numericvar_to_double_no_overflow(const NumericVar *var);
static Datum numeric_abbrev_convert(Datum original_datum, SortSupport ssup);
static bool numeric_abbrev_abort(int memtupcount, SortSupport ssup);
static int numeric_fast_cmp(Datum x, Datum y, SortSupport ssup);
static int numeric_cmp_abbrev(Datum x, Datum y, SortSupport ssup);
static Datum numeric_abbrev_convert_var(const NumericVar *var,
NumericSortSupport *nss);
static int cmp_numerics(Numeric num1, Numeric num2);
static int cmp_var(const NumericVar *var1, const NumericVar *var2);
static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
int var1weight, int var1sign,
const NumericDigit *var2digits, int var2ndigits,
int var2weight, int var2sign);
static void add_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result);
static void sub_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result);
static void mul_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result,
int rscale);
static void div_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result,
int rscale, bool round);
static void div_var_fast(const NumericVar *var1, const NumericVar *var2,
NumericVar *result, int rscale, bool round);
static int select_div_scale(const NumericVar *var1, const NumericVar *var2);
static void mod_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result);
static void ceil_var(const NumericVar *var, NumericVar *result);
static void floor_var(const NumericVar *var, NumericVar *result);
static void sqrt_var(const NumericVar *arg, NumericVar *result, int rscale);
static void exp_var(const NumericVar *arg, NumericVar *result, int rscale);
static int estimate_ln_dweight(const NumericVar *var);
static void ln_var(const NumericVar *arg, NumericVar *result, int rscale);
static void log_var(const NumericVar *base, const NumericVar *num,
NumericVar *result);
static void power_var(const NumericVar *base, const NumericVar *exp,
NumericVar *result);
static void power_var_int(const NumericVar *base, int exp, NumericVar *result,
int rscale);
static int cmp_abs(const NumericVar *var1, const NumericVar *var2);
static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
int var1weight,
const NumericDigit *var2digits, int var2ndigits,
int var2weight);
static void add_abs(const NumericVar *var1, const NumericVar *var2,
NumericVar *result);
static void sub_abs(const NumericVar *var1, const NumericVar *var2,
NumericVar *result);
static void round_var(NumericVar *var, int rscale);
static void trunc_var(NumericVar *var, int rscale);
static void strip_var(NumericVar *var);
static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
const NumericVar *count_var, NumericVar *result_var);
static void accum_sum_add(NumericSumAccum *accum, const NumericVar *var1);
static void accum_sum_rescale(NumericSumAccum *accum, const NumericVar *val);
static void accum_sum_carry(NumericSumAccum *accum);
static void accum_sum_reset(NumericSumAccum *accum);
static void accum_sum_final(NumericSumAccum *accum, NumericVar *result);
static void accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src);
static void accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2);
/* ----------------------------------------------------------------------
*
* Input-, output- and rounding-functions
*
* ----------------------------------------------------------------------
*/
/*
* numeric_in() -
*
* Input function for numeric data type
*/
Datum
numeric_in(PG_FUNCTION_ARGS)
{
char *str = PG_GETARG_CSTRING(0);
#ifdef NOT_USED
Oid typelem = PG_GETARG_OID(1);
#endif
int32 typmod = PG_GETARG_INT32(2);
Numeric res;
const char *cp;
/* Skip leading spaces */
cp = str;
while (*cp)
{
if (!isspace((unsigned char) *cp))
break;
cp++;
}
/*
* Check for NaN
*/
if (pg_strncasecmp(cp, "NaN", 3) == 0)
{
res = make_result(&const_nan);
/* Should be nothing left but spaces */
cp += 3;
while (*cp)
{
if (!isspace((unsigned char) *cp))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"numeric", str)));
cp++;
}
}
else
{
/*
* Use set_var_from_str() to parse a normal numeric value
*/
NumericVar value;
init_var(&value);
cp = set_var_from_str(str, cp, &value);
/*
* We duplicate a few lines of code here because we would like to
* throw any trailing-junk syntax error before any semantic error
* resulting from apply_typmod. We can't easily fold the two cases
* together because we mustn't apply apply_typmod to a NaN.
*/
while (*cp)
{
if (!isspace((unsigned char) *cp))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"numeric", str)));
cp++;
}
apply_typmod(&value, typmod);
res = make_result(&value);
free_var(&value);
}
PG_RETURN_NUMERIC(res);
}
/*
* numeric_out() -
*
* Output function for numeric data type
*/
Datum
numeric_out(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
char *str;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_CSTRING(pstrdup("NaN"));
/*
* Get the number in the variable format.
*/
init_var_from_num(num, &x);
str = get_str_from_var(&x);
PG_RETURN_CSTRING(str);
}
/*
* numeric_is_nan() -
*
* Is Numeric value a NaN?
*/
bool
numeric_is_nan(Numeric num)
{
return NUMERIC_IS_NAN(num);
}
/*
* numeric_maximum_size() -
*
* Maximum size of a numeric with given typmod, or -1 if unlimited/unknown.
*/
int32
numeric_maximum_size(int32 typmod)
{
int precision;
int numeric_digits;
if (typmod < (int32) (VARHDRSZ))
return -1;
/* precision (ie, max # of digits) is in upper bits of typmod */
precision = ((typmod - VARHDRSZ) >> 16) & 0xffff;
/*
* This formula computes the maximum number of NumericDigits we could need
* in order to store the specified number of decimal digits. Because the
* weight is stored as a number of NumericDigits rather than a number of
* decimal digits, it's possible that the first NumericDigit will contain
* only a single decimal digit. Thus, the first two decimal digits can
* require two NumericDigits to store, but it isn't until we reach
* DEC_DIGITS + 2 decimal digits that we potentially need a third
* NumericDigit.
*/
numeric_digits = (precision + 2 * (DEC_DIGITS - 1)) / DEC_DIGITS;
/*
* In most cases, the size of a numeric will be smaller than the value
* computed below, because the varlena header will typically get toasted
* down to a single byte before being stored on disk, and it may also be
* possible to use a short numeric header. But our job here is to compute
* the worst case.
*/
return NUMERIC_HDRSZ + (numeric_digits * sizeof(NumericDigit));
}
/*
* numeric_out_sci() -
*
* Output function for numeric data type in scientific notation.
*/
char *
numeric_out_sci(Numeric num, int scale)
{
NumericVar x;
char *str;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
return pstrdup("NaN");
init_var_from_num(num, &x);
str = get_str_from_var_sci(&x, scale);
return str;
}
/*
* numeric_normalize() -
*
* Output function for numeric data type, suppressing insignificant trailing
* zeroes and then any trailing decimal point. The intent of this is to
* produce strings that are equal if and only if the input numeric values
* compare equal.
*/
char *
numeric_normalize(Numeric num)
{
NumericVar x;
char *str;
int last;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
return pstrdup("NaN");
init_var_from_num(num, &x);
str = get_str_from_var(&x);
/* If there's no decimal point, there's certainly nothing to remove. */
if (strchr(str, '.') != NULL)
{
/*
* Back up over trailing fractional zeroes. Since there is a decimal
* point, this loop will terminate safely.
*/
last = strlen(str) - 1;
while (str[last] == '0')
last--;
/* We want to get rid of the decimal point too, if it's now last. */
if (str[last] == '.')
last--;
/* Delete whatever we backed up over. */
str[last + 1] = '\0';
}
return str;
}
/*
* numeric_recv - converts external binary format to numeric
*
* External format is a sequence of int16's:
* ndigits, weight, sign, dscale, NumericDigits.
*/
Datum
numeric_recv(PG_FUNCTION_ARGS)
{
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
#ifdef NOT_USED
Oid typelem = PG_GETARG_OID(1);
#endif
int32 typmod = PG_GETARG_INT32(2);
NumericVar value;
Numeric res;
int len,
i;
init_var(&value);
len = (uint16) pq_getmsgint(buf, sizeof(uint16));
alloc_var(&value, len);
value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
/* we allow any int16 for weight --- OK? */
value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
if (!(value.sign == NUMERIC_POS ||
value.sign == NUMERIC_NEG ||
value.sign == NUMERIC_NAN))
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("invalid sign in external \"numeric\" value")));
value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
if ((value.dscale & NUMERIC_DSCALE_MASK) != value.dscale)
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("invalid scale in external \"numeric\" value")));
for (i = 0; i < len; i++)
{
NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
if (d < 0 || d >= NBASE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("invalid digit in external \"numeric\" value")));
value.digits[i] = d;
}
/*
* If the given dscale would hide any digits, truncate those digits away.
* We could alternatively throw an error, but that would take a bunch of
* extra code (about as much as trunc_var involves), and it might cause
* client compatibility issues.
*/
trunc_var(&value, value.dscale);
apply_typmod(&value, typmod);
res = make_result(&value);
free_var(&value);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_send - converts numeric to binary format
*/
Datum
numeric_send(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
StringInfoData buf;
int i;
init_var_from_num(num, &x);
pq_begintypsend(&buf);
pq_sendint16(&buf, x.ndigits);
pq_sendint16(&buf, x.weight);
pq_sendint16(&buf, x.sign);
pq_sendint16(&buf, x.dscale);
for (i = 0; i < x.ndigits; i++)
pq_sendint16(&buf, x.digits[i]);
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
/*
* numeric_transform() -
*
* Flatten calls to numeric's length coercion function that solely represent
* increases in allowable precision. Scale changes mutate every datum, so
* they are unoptimizable. Some values, e.g. 1E-1001, can only fit into an
* unconstrained numeric, so a change from an unconstrained numeric to any
* constrained numeric is also unoptimizable.
*/
Datum
numeric_transform(PG_FUNCTION_ARGS)
{
FuncExpr *expr = castNode(FuncExpr, PG_GETARG_POINTER(0));
Node *ret = NULL;
Node *typmod;
Assert(list_length(expr->args) >= 2);
typmod = (Node *) lsecond(expr->args);
if (IsA(typmod, Const) &&!((Const *) typmod)->constisnull)
{
Node *source = (Node *) linitial(expr->args);
int32 old_typmod = exprTypmod(source);
int32 new_typmod = DatumGetInt32(((Const *) typmod)->constvalue);
int32 old_scale = (old_typmod - VARHDRSZ) & 0xffff;
int32 new_scale = (new_typmod - VARHDRSZ) & 0xffff;
int32 old_precision = (old_typmod - VARHDRSZ) >> 16 & 0xffff;
int32 new_precision = (new_typmod - VARHDRSZ) >> 16 & 0xffff;
/*
* If new_typmod < VARHDRSZ, the destination is unconstrained; that's
* always OK. If old_typmod >= VARHDRSZ, the source is constrained,
* and we're OK if the scale is unchanged and the precision is not
* decreasing. See further notes in function header comment.
*/
if (new_typmod < (int32) VARHDRSZ ||
(old_typmod >= (int32) VARHDRSZ &&
new_scale == old_scale && new_precision >= old_precision))
ret = relabel_to_typmod(source, new_typmod);
}
PG_RETURN_POINTER(ret);
}
/*
* numeric() -
*
* This is a special function called by the Postgres database system
* before a value is stored in a tuple's attribute. The precision and
* scale of the attribute have to be applied on the value.
*/
Datum
numeric (PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
int32 typmod = PG_GETARG_INT32(1);
Numeric new;
int32 tmp_typmod;
int precision;
int scale;
int ddigits;
int maxdigits;
NumericVar var;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* If the value isn't a valid type modifier, simply return a copy of the
* input value
*/
if (typmod < (int32) (VARHDRSZ))
{
new = (Numeric) palloc(VARSIZE(num));
memcpy(new, num, VARSIZE(num));
PG_RETURN_NUMERIC(new);
}
/*
* Get the precision and scale out of the typmod value
*/
tmp_typmod = typmod - VARHDRSZ;
precision = (tmp_typmod >> 16) & 0xffff;
scale = tmp_typmod & 0xffff;
maxdigits = precision - scale;
/*
* If the number is certainly in bounds and due to the target scale no
* rounding could be necessary, just make a copy of the input and modify
* its scale fields, unless the larger scale forces us to abandon the
* short representation. (Note we assume the existing dscale is
* honest...)
*/
ddigits = (NUMERIC_WEIGHT(num) + 1) * DEC_DIGITS;
if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)
&& (NUMERIC_CAN_BE_SHORT(scale, NUMERIC_WEIGHT(num))
|| !NUMERIC_IS_SHORT(num)))
{
new = (Numeric) palloc(VARSIZE(num));
memcpy(new, num, VARSIZE(num));
if (NUMERIC_IS_SHORT(num))
new->choice.n_short.n_header =
(num->choice.n_short.n_header & ~NUMERIC_SHORT_DSCALE_MASK)
| (scale << NUMERIC_SHORT_DSCALE_SHIFT);
else
new->choice.n_long.n_sign_dscale = NUMERIC_SIGN(new) |
((uint16) scale & NUMERIC_DSCALE_MASK);
PG_RETURN_NUMERIC(new);
}
/*
* We really need to fiddle with things - unpack the number into a
* variable and let apply_typmod() do it.
*/
init_var(&var);
set_var_from_num(num, &var);
apply_typmod(&var, typmod);
new = make_result(&var);
free_var(&var);
PG_RETURN_NUMERIC(new);
}
Datum
numerictypmodin(PG_FUNCTION_ARGS)
{
ArrayType *ta = PG_GETARG_ARRAYTYPE_P(0);
int32 *tl;
int n;
int32 typmod;
tl = ArrayGetIntegerTypmods(ta, &n);
if (n == 2)
{
if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("NUMERIC precision %d must be between 1 and %d",
tl[0], NUMERIC_MAX_PRECISION)));
if (tl[1] < 0 || tl[1] > tl[0])
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("NUMERIC scale %d must be between 0 and precision %d",
tl[1], tl[0])));
typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
}
else if (n == 1)
{
if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("NUMERIC precision %d must be between 1 and %d",
tl[0], NUMERIC_MAX_PRECISION)));
/* scale defaults to zero */
typmod = (tl[0] << 16) + VARHDRSZ;
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid NUMERIC type modifier")));
typmod = 0; /* keep compiler quiet */
}
PG_RETURN_INT32(typmod);
}
Datum
numerictypmodout(PG_FUNCTION_ARGS)
{
int32 typmod = PG_GETARG_INT32(0);
char *res = (char *) palloc(64);
if (typmod >= 0)
snprintf(res, 64, "(%d,%d)",
((typmod - VARHDRSZ) >> 16) & 0xffff,
(typmod - VARHDRSZ) & 0xffff);
else
*res = '\0';
PG_RETURN_CSTRING(res);
}
/* ----------------------------------------------------------------------
*
* Sign manipulation, rounding and the like
*
* ----------------------------------------------------------------------
*/
Datum
numeric_abs(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Do it the easy way directly on the packed format
*/
res = (Numeric) palloc(VARSIZE(num));
memcpy(res, num, VARSIZE(num));
if (NUMERIC_IS_SHORT(num))
res->choice.n_short.n_header =
num->choice.n_short.n_header & ~NUMERIC_SHORT_SIGN_MASK;
else
res->choice.n_long.n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
PG_RETURN_NUMERIC(res);
}
Datum
numeric_uminus(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Do it the easy way directly on the packed format
*/
res = (Numeric) palloc(VARSIZE(num));
memcpy(res, num, VARSIZE(num));
/*
* The packed format is known to be totally zero digit trimmed always. So
* we can identify a ZERO by the fact that there are no digits at all. Do
* nothing to a zero.
*/
if (NUMERIC_NDIGITS(num) != 0)
{
/* Else, flip the sign */
if (NUMERIC_IS_SHORT(num))
res->choice.n_short.n_header =
num->choice.n_short.n_header ^ NUMERIC_SHORT_SIGN_MASK;
else if (NUMERIC_SIGN(num) == NUMERIC_POS)
res->choice.n_long.n_sign_dscale =
NUMERIC_NEG | NUMERIC_DSCALE(num);
else
res->choice.n_long.n_sign_dscale =
NUMERIC_POS | NUMERIC_DSCALE(num);
}
PG_RETURN_NUMERIC(res);
}
Datum
numeric_uplus(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
res = (Numeric) palloc(VARSIZE(num));
memcpy(res, num, VARSIZE(num));
PG_RETURN_NUMERIC(res);
}
/*
* numeric_sign() -
*
* returns -1 if the argument is less than 0, 0 if the argument is equal
* to 0, and 1 if the argument is greater than zero.
*/
Datum
numeric_sign(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
NumericVar result;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
init_var(&result);
/*
* The packed format is known to be totally zero digit trimmed always. So
* we can identify a ZERO by the fact that there are no digits at all.
*/
if (NUMERIC_NDIGITS(num) == 0)
set_var_from_var(&const_zero, &result);
else
{
/*
* And if there are some, we return a copy of ONE with the sign of our
* argument
*/
set_var_from_var(&const_one, &result);
result.sign = NUMERIC_SIGN(num);
}
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_round() -
*
* Round a value to have 'scale' digits after the decimal point.
* We allow negative 'scale', implying rounding before the decimal
* point --- Oracle interprets rounding that way.
*/
Datum
numeric_round(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
int32 scale = PG_GETARG_INT32(1);
Numeric res;
NumericVar arg;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Limit the scale value to avoid possible overflow in calculations
*/
scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
/*
* Unpack the argument and round it at the proper digit position
*/
init_var(&arg);
set_var_from_num(num, &arg);
round_var(&arg, scale);
/* We don't allow negative output dscale */
if (scale < 0)
arg.dscale = 0;
/*
* Return the rounded result
*/
res = make_result(&arg);
free_var(&arg);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_trunc() -
*
* Truncate a value to have 'scale' digits after the decimal point.
* We allow negative 'scale', implying a truncation before the decimal
* point --- Oracle interprets truncation that way.
*/
Datum
numeric_trunc(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
int32 scale = PG_GETARG_INT32(1);
Numeric res;
NumericVar arg;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Limit the scale value to avoid possible overflow in calculations
*/
scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
/*
* Unpack the argument and truncate it at the proper digit position
*/
init_var(&arg);
set_var_from_num(num, &arg);
trunc_var(&arg, scale);
/* We don't allow negative output dscale */
if (scale < 0)
arg.dscale = 0;
/*
* Return the truncated result
*/
res = make_result(&arg);
free_var(&arg);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_ceil() -
*
* Return the smallest integer greater than or equal to the argument
*/
Datum
numeric_ceil(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
NumericVar result;
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
init_var_from_num(num, &result);
ceil_var(&result, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_floor() -
*
* Return the largest integer equal to or less than the argument
*/
Datum
numeric_floor(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
NumericVar result;
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
init_var_from_num(num, &result);
floor_var(&result, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* generate_series_numeric() -
*
* Generate series of numeric.
*/
Datum
generate_series_numeric(PG_FUNCTION_ARGS)
{
return generate_series_step_numeric(fcinfo);
}
Datum
generate_series_step_numeric(PG_FUNCTION_ARGS)
{
generate_series_numeric_fctx *fctx;
FuncCallContext *funcctx;
MemoryContext oldcontext;
if (SRF_IS_FIRSTCALL())
{
Numeric start_num = PG_GETARG_NUMERIC(0);
Numeric stop_num = PG_GETARG_NUMERIC(1);
NumericVar steploc = const_one;
/* handle NaN in start and stop values */
if (NUMERIC_IS_NAN(start_num))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("start value cannot be NaN")));
if (NUMERIC_IS_NAN(stop_num))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("stop value cannot be NaN")));
/* see if we were given an explicit step size */
if (PG_NARGS() == 3)
{
Numeric step_num = PG_GETARG_NUMERIC(2);
if (NUMERIC_IS_NAN(step_num))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("step size cannot be NaN")));
init_var_from_num(step_num, &steploc);
if (cmp_var(&steploc, &const_zero) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("step size cannot equal zero")));
}
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/*
* Switch to memory context appropriate for multiple function calls.
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* allocate memory for user context */
fctx = (generate_series_numeric_fctx *)
palloc(sizeof(generate_series_numeric_fctx));
/*
* Use fctx to keep state from call to call. Seed current with the
* original start value. We must copy the start_num and stop_num
* values rather than pointing to them, since we may have detoasted
* them in the per-call context.
*/
init_var(&fctx->current);
init_var(&fctx->stop);
init_var(&fctx->step);
set_var_from_num(start_num, &fctx->current);
set_var_from_num(stop_num, &fctx->stop);
set_var_from_var(&steploc, &fctx->step);
funcctx->user_fctx = fctx;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
/*
* Get the saved state and use current state as the result of this
* iteration.
*/
fctx = funcctx->user_fctx;
if ((fctx->step.sign == NUMERIC_POS &&
cmp_var(&fctx->current, &fctx->stop) <= 0) ||
(fctx->step.sign == NUMERIC_NEG &&
cmp_var(&fctx->current, &fctx->stop) >= 0))
{
Numeric result = make_result(&fctx->current);
/* switch to memory context appropriate for iteration calculation */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* increment current in preparation for next iteration */
add_var(&fctx->current, &fctx->step, &fctx->current);
MemoryContextSwitchTo(oldcontext);
/* do when there is more left to send */
SRF_RETURN_NEXT(funcctx, NumericGetDatum(result));
}
else
/* do when there is no more left */
SRF_RETURN_DONE(funcctx);
}
/*
* Implements the numeric version of the width_bucket() function
* defined by SQL2003. See also width_bucket_float8().
*
* 'bound1' and 'bound2' are the lower and upper bounds of the
* histogram's range, respectively. 'count' is the number of buckets
* in the histogram. width_bucket() returns an integer indicating the
* bucket number that 'operand' belongs to in an equiwidth histogram
* with the specified characteristics. An operand smaller than the
* lower bound is assigned to bucket 0. An operand greater than the
* upper bound is assigned to an additional bucket (with number
* count+1). We don't allow "NaN" for any of the numeric arguments.
*/
Datum
width_bucket_numeric(PG_FUNCTION_ARGS)
{
Numeric operand = PG_GETARG_NUMERIC(0);
Numeric bound1 = PG_GETARG_NUMERIC(1);
Numeric bound2 = PG_GETARG_NUMERIC(2);
int32 count = PG_GETARG_INT32(3);
NumericVar count_var;
NumericVar result_var;
int32 result;
if (count <= 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
errmsg("count must be greater than zero")));
if (NUMERIC_IS_NAN(operand) ||
NUMERIC_IS_NAN(bound1) ||
NUMERIC_IS_NAN(bound2))
ereport(ERROR,
(errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
errmsg("operand, lower bound, and upper bound cannot be NaN")));
init_var(&result_var);
init_var(&count_var);
/* Convert 'count' to a numeric, for ease of use later */
int64_to_numericvar((int64) count, &count_var);
switch (cmp_numerics(bound1, bound2))
{
case 0:
ereport(ERROR,
(errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
errmsg("lower bound cannot equal upper bound")));
/* bound1 < bound2 */
case -1:
if (cmp_numerics(operand, bound1) < 0)
set_var_from_var(&const_zero, &result_var);
else if (cmp_numerics(operand, bound2) >= 0)
add_var(&count_var, &const_one, &result_var);
else
compute_bucket(operand, bound1, bound2,
&count_var, &result_var);
break;
/* bound1 > bound2 */
case 1:
if (cmp_numerics(operand, bound1) > 0)
set_var_from_var(&const_zero, &result_var);
else if (cmp_numerics(operand, bound2) <= 0)
add_var(&count_var, &const_one, &result_var);
else
compute_bucket(operand, bound1, bound2,
&count_var, &result_var);
break;
}
/* if result exceeds the range of a legal int4, we ereport here */
result = numericvar_to_int32(&result_var);
free_var(&count_var);
free_var(&result_var);
PG_RETURN_INT32(result);
}
/*
* If 'operand' is not outside the bucket range, determine the correct
* bucket for it to go. The calculations performed by this function
* are derived directly from the SQL2003 spec.
*/
static void
compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
const NumericVar *count_var, NumericVar *result_var)
{
NumericVar bound1_var;
NumericVar bound2_var;
NumericVar operand_var;
init_var_from_num(bound1, &bound1_var);
init_var_from_num(bound2, &bound2_var);
init_var_from_num(operand, &operand_var);
if (cmp_var(&bound1_var, &bound2_var) < 0)
{
sub_var(&operand_var, &bound1_var, &operand_var);
sub_var(&bound2_var, &bound1_var, &bound2_var);
div_var(&operand_var, &bound2_var, result_var,
select_div_scale(&operand_var, &bound2_var), true);
}
else
{
sub_var(&bound1_var, &operand_var, &operand_var);
sub_var(&bound1_var, &bound2_var, &bound1_var);
div_var(&operand_var, &bound1_var, result_var,
select_div_scale(&operand_var, &bound1_var), true);
}
mul_var(result_var, count_var, result_var,
result_var->dscale + count_var->dscale);
add_var(result_var, &const_one, result_var);
floor_var(result_var, result_var);
free_var(&bound1_var);
free_var(&bound2_var);
free_var(&operand_var);
}
/* ----------------------------------------------------------------------
*
* Comparison functions
*
* Note: btree indexes need these routines not to leak memory; therefore,
* be careful to free working copies of toasted datums. Most places don't
* need to be so careful.
*
* Sort support:
*
* We implement the sortsupport strategy routine in order to get the benefit of
* abbreviation. The ordinary numeric comparison can be quite slow as a result
* of palloc/pfree cycles (due to detoasting packed values for alignment);
* while this could be worked on itself, the abbreviation strategy gives more
* speedup in many common cases.
*
* Two different representations are used for the abbreviated form, one in
* int32 and one in int64, whichever fits into a by-value Datum. In both cases
* the representation is negated relative to the original value, because we use
* the largest negative value for NaN, which sorts higher than other values. We
* convert the absolute value of the numeric to a 31-bit or 63-bit positive
* value, and then negate it if the original number was positive.
*
* We abort the abbreviation process if the abbreviation cardinality is below
* 0.01% of the row count (1 per 10k non-null rows). The actual break-even
* point is somewhat below that, perhaps 1 per 30k (at 1 per 100k there's a
* very small penalty), but we don't want to build up too many abbreviated
* values before first testing for abort, so we take the slightly pessimistic
* number. We make no attempt to estimate the cardinality of the real values,
* since it plays no part in the cost model here (if the abbreviation is equal,
* the cost of comparing equal and unequal underlying values is comparable).
* We discontinue even checking for abort (saving us the hashing overhead) if
* the estimated cardinality gets to 100k; that would be enough to support many
* billions of rows while doing no worse than breaking even.
*
* ----------------------------------------------------------------------
*/
/*
* Sort support strategy routine.
*/
Datum
numeric_sortsupport(PG_FUNCTION_ARGS)
{
SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);
ssup->comparator = numeric_fast_cmp;
if (ssup->abbreviate)
{
NumericSortSupport *nss;
MemoryContext oldcontext = MemoryContextSwitchTo(ssup->ssup_cxt);
nss = palloc(sizeof(NumericSortSupport));
/*
* palloc a buffer for handling unaligned packed values in addition to
* the support struct
*/
nss->buf = palloc(VARATT_SHORT_MAX + VARHDRSZ + 1);
nss->input_count = 0;
nss->estimating = true;
initHyperLogLog(&nss->abbr_card, 10);
ssup->ssup_extra = nss;
ssup->abbrev_full_comparator = ssup->comparator;
ssup->comparator = numeric_cmp_abbrev;
ssup->abbrev_converter = numeric_abbrev_convert;
ssup->abbrev_abort = numeric_abbrev_abort;
MemoryContextSwitchTo(oldcontext);
}
PG_RETURN_VOID();
}
/*
* Abbreviate a numeric datum, handling NaNs and detoasting
* (must not leak memory!)
*/
static Datum
numeric_abbrev_convert(Datum original_datum, SortSupport ssup)
{
NumericSortSupport *nss = ssup->ssup_extra;
void *original_varatt = PG_DETOAST_DATUM_PACKED(original_datum);
Numeric value;
Datum result;
nss->input_count += 1;
/*
* This is to handle packed datums without needing a palloc/pfree cycle;
* we keep and reuse a buffer large enough to handle any short datum.
*/
if (VARATT_IS_SHORT(original_varatt))
{
void *buf = nss->buf;
Size sz = VARSIZE_SHORT(original_varatt) - VARHDRSZ_SHORT;
Assert(sz <= VARATT_SHORT_MAX - VARHDRSZ_SHORT);
SET_VARSIZE(buf, VARHDRSZ + sz);
memcpy(VARDATA(buf), VARDATA_SHORT(original_varatt), sz);
value = (Numeric) buf;
}
else
value = (Numeric) original_varatt;
if (NUMERIC_IS_NAN(value))
{
result = NUMERIC_ABBREV_NAN;
}
else
{
NumericVar var;
init_var_from_num(value, &var);
result = numeric_abbrev_convert_var(&var, nss);
}
/* should happen only for external/compressed toasts */
if ((Pointer) original_varatt != DatumGetPointer(original_datum))
pfree(original_varatt);
return result;
}
/*
* Consider whether to abort abbreviation.
*
* We pay no attention to the cardinality of the non-abbreviated data. There is
* no reason to do so: unlike text, we have no fast check for equal values, so
* we pay the full overhead whenever the abbreviations are equal regardless of
* whether the underlying values are also equal.
*/
static bool
numeric_abbrev_abort(int memtupcount, SortSupport ssup)
{
NumericSortSupport *nss = ssup->ssup_extra;
double abbr_card;
if (memtupcount < 10000 || nss->input_count < 10000 || !nss->estimating)
return false;
abbr_card = estimateHyperLogLog(&nss->abbr_card);
/*
* If we have >100k distinct values, then even if we were sorting many
* billion rows we'd likely still break even, and the penalty of undoing
* that many rows of abbrevs would probably not be worth it. Stop even
* counting at that point.
*/
if (abbr_card > 100000.0)
{
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG,
"numeric_abbrev: estimation ends at cardinality %f"
" after " INT64_FORMAT " values (%d rows)",
abbr_card, nss->input_count, memtupcount);
#endif
nss->estimating = false;
return false;
}
/*
* Target minimum cardinality is 1 per ~10k of non-null inputs. (The
* break even point is somewhere between one per 100k rows, where
* abbreviation has a very slight penalty, and 1 per 10k where it wins by
* a measurable percentage.) We use the relatively pessimistic 10k
* threshold, and add a 0.5 row fudge factor, because it allows us to
* abort earlier on genuinely pathological data where we've had exactly
* one abbreviated value in the first 10k (non-null) rows.
*/
if (abbr_card < nss->input_count / 10000.0 + 0.5)
{
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG,
"numeric_abbrev: aborting abbreviation at cardinality %f"
" below threshold %f after " INT64_FORMAT " values (%d rows)",
abbr_card, nss->input_count / 10000.0 + 0.5,
nss->input_count, memtupcount);
#endif
return true;
}
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG,
"numeric_abbrev: cardinality %f"
" after " INT64_FORMAT " values (%d rows)",
abbr_card, nss->input_count, memtupcount);
#endif
return false;
}
/*
* Non-fmgr interface to the comparison routine to allow sortsupport to elide
* the fmgr call. The saving here is small given how slow numeric comparisons
* are, but it is a required part of the sort support API when abbreviations
* are performed.
*
* Two palloc/pfree cycles could be saved here by using persistent buffers for
* aligning short-varlena inputs, but this has not so far been considered to
* be worth the effort.
*/
static int
numeric_fast_cmp(Datum x, Datum y, SortSupport ssup)
{
Numeric nx = DatumGetNumeric(x);
Numeric ny = DatumGetNumeric(y);
int result;
result = cmp_numerics(nx, ny);
if ((Pointer) nx != DatumGetPointer(x))
pfree(nx);
if ((Pointer) ny != DatumGetPointer(y))
pfree(ny);
return result;
}
/*
* Compare abbreviations of values. (Abbreviations may be equal where the true
* values differ, but if the abbreviations differ, they must reflect the
* ordering of the true values.)
*/
static int
numeric_cmp_abbrev(Datum x, Datum y, SortSupport ssup)
{
/*
* NOTE WELL: this is intentionally backwards, because the abbreviation is
* negated relative to the original value, to handle NaN.
*/
if (DatumGetNumericAbbrev(x) < DatumGetNumericAbbrev(y))
return 1;
if (DatumGetNumericAbbrev(x) > DatumGetNumericAbbrev(y))
return -1;
return 0;
}
/*
* Abbreviate a NumericVar according to the available bit size.
*
* The 31-bit value is constructed as:
*
* 0 + 7bits digit weight + 24 bits digit value
*
* where the digit weight is in single decimal digits, not digit words, and
* stored in excess-44 representation[1]. The 24-bit digit value is the 7 most
* significant decimal digits of the value converted to binary. Values whose
* weights would fall outside the representable range are rounded off to zero
* (which is also used to represent actual zeros) or to 0x7FFFFFFF (which
* otherwise cannot occur). Abbreviation therefore fails to gain any advantage
* where values are outside the range 10^-44 to 10^83, which is not considered
* to be a serious limitation, or when values are of the same magnitude and
* equal in the first 7 decimal digits, which is considered to be an
* unavoidable limitation given the available bits. (Stealing three more bits
* to compare another digit would narrow the range of representable weights by
* a factor of 8, which starts to look like a real limiting factor.)
*
* (The value 44 for the excess is essentially arbitrary)
*
* The 63-bit value is constructed as:
*
* 0 + 7bits weight + 4 x 14-bit packed digit words
*
* The weight in this case is again stored in excess-44, but this time it is
* the original weight in digit words (i.e. powers of 10000). The first four
* digit words of the value (if present; trailing zeros are assumed as needed)
* are packed into 14 bits each to form the rest of the value. Again,
* out-of-range values are rounded off to 0 or 0x7FFFFFFFFFFFFFFF. The
* representable range in this case is 10^-176 to 10^332, which is considered
* to be good enough for all practical purposes, and comparison of 4 words
* means that at least 13 decimal digits are compared, which is considered to
* be a reasonable compromise between effectiveness and efficiency in computing
* the abbreviation.
*
* (The value 44 for the excess is even more arbitrary here, it was chosen just
* to match the value used in the 31-bit case)
*
* [1] - Excess-k representation means that the value is offset by adding 'k'
* and then treated as unsigned, so the smallest representable value is stored
* with all bits zero. This allows simple comparisons to work on the composite
* value.
*/
#if NUMERIC_ABBREV_BITS == 64
static Datum
numeric_abbrev_convert_var(const NumericVar *var, NumericSortSupport *nss)
{
int ndigits = var->ndigits;
int weight = var->weight;
int64 result;
if (ndigits == 0 || weight < -44)
{
result = 0;
}
else if (weight > 83)
{
result = PG_INT64_MAX;
}
else
{
result = ((int64) (weight + 44) << 56);
switch (ndigits)
{
default:
result |= ((int64) var->digits[3]);
/* FALLTHROUGH */
case 3:
result |= ((int64) var->digits[2]) << 14;
/* FALLTHROUGH */
case 2:
result |= ((int64) var->digits[1]) << 28;
/* FALLTHROUGH */
case 1:
result |= ((int64) var->digits[0]) << 42;
break;
}
}
/* the abbrev is negated relative to the original */
if (var->sign == NUMERIC_POS)
result = -result;
if (nss->estimating)
{
uint32 tmp = ((uint32) result
^ (uint32) ((uint64) result >> 32));
addHyperLogLog(&nss->abbr_card, DatumGetUInt32(hash_uint32(tmp)));
}
return NumericAbbrevGetDatum(result);
}
#endif /* NUMERIC_ABBREV_BITS == 64 */
#if NUMERIC_ABBREV_BITS == 32
static Datum
numeric_abbrev_convert_var(const NumericVar *var, NumericSortSupport *nss)
{
int ndigits = var->ndigits;
int weight = var->weight;
int32 result;
if (ndigits == 0 || weight < -11)
{
result = 0;
}
else if (weight > 20)
{
result = PG_INT32_MAX;
}
else
{
NumericDigit nxt1 = (ndigits > 1) ? var->digits[1] : 0;
weight = (weight + 11) * 4;
result = var->digits[0];
/*
* "result" now has 1 to 4 nonzero decimal digits. We pack in more
* digits to make 7 in total (largest we can fit in 24 bits)
*/
if (result > 999)
{
/* already have 4 digits, add 3 more */
result = (result * 1000) + (nxt1 / 10);
weight += 3;
}
else if (result > 99)
{
/* already have 3 digits, add 4 more */
result = (result * 10000) + nxt1;
weight += 2;
}
else if (result > 9)
{
NumericDigit nxt2 = (ndigits > 2) ? var->digits[2] : 0;
/* already have 2 digits, add 5 more */
result = (result * 100000) + (nxt1 * 10) + (nxt2 / 1000);
weight += 1;
}
else
{
NumericDigit nxt2 = (ndigits > 2) ? var->digits[2] : 0;
/* already have 1 digit, add 6 more */
result = (result * 1000000) + (nxt1 * 100) + (nxt2 / 100);
}
result = result | (weight << 24);
}
/* the abbrev is negated relative to the original */
if (var->sign == NUMERIC_POS)
result = -result;
if (nss->estimating)
{
uint32 tmp = (uint32) result;
addHyperLogLog(&nss->abbr_card, DatumGetUInt32(hash_uint32(tmp)));
}
return NumericAbbrevGetDatum(result);
}
#endif /* NUMERIC_ABBREV_BITS == 32 */
/*
* Ordinary (non-sortsupport) comparisons follow.
*/
Datum
numeric_cmp(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
int result;
result = cmp_numerics(num1, num2);
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_INT32(result);
}
Datum
numeric_eq(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
bool result;
result = cmp_numerics(num1, num2) == 0;
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_BOOL(result);
}
Datum
numeric_ne(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
bool result;
result = cmp_numerics(num1, num2) != 0;
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_BOOL(result);
}
Datum
numeric_gt(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
bool result;
result = cmp_numerics(num1, num2) > 0;
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_BOOL(result);
}
Datum
numeric_ge(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
bool result;
result = cmp_numerics(num1, num2) >= 0;
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_BOOL(result);
}
Datum
numeric_lt(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
bool result;
result = cmp_numerics(num1, num2) < 0;
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_BOOL(result);
}
Datum
numeric_le(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
bool result;
result = cmp_numerics(num1, num2) <= 0;
PG_FREE_IF_COPY(num1, 0);
PG_FREE_IF_COPY(num2, 1);
PG_RETURN_BOOL(result);
}
static int
cmp_numerics(Numeric num1, Numeric num2)
{
int result;
/*
* We consider all NANs to be equal and larger than any non-NAN. This is
* somewhat arbitrary; the important thing is to have a consistent sort
* order.
*/
if (NUMERIC_IS_NAN(num1))
{
if (NUMERIC_IS_NAN(num2))
result = 0; /* NAN = NAN */
else
result = 1; /* NAN > non-NAN */
}
else if (NUMERIC_IS_NAN(num2))
{
result = -1; /* non-NAN < NAN */
}
else
{
result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
NUMERIC_WEIGHT(num1), NUMERIC_SIGN(num1),
NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
NUMERIC_WEIGHT(num2), NUMERIC_SIGN(num2));
}
return result;
}
Datum
hash_numeric(PG_FUNCTION_ARGS)
{
Numeric key = PG_GETARG_NUMERIC(0);
Datum digit_hash;
Datum result;
int weight;
int start_offset;
int end_offset;
int i;
int hash_len;
NumericDigit *digits;
/* If it's NaN, don't try to hash the rest of the fields */
if (NUMERIC_IS_NAN(key))
PG_RETURN_UINT32(0);
weight = NUMERIC_WEIGHT(key);
start_offset = 0;
end_offset = 0;
/*
* Omit any leading or trailing zeros from the input to the hash. The
* numeric implementation *should* guarantee that leading and trailing
* zeros are suppressed, but we're paranoid. Note that we measure the
* starting and ending offsets in units of NumericDigits, not bytes.
*/
digits = NUMERIC_DIGITS(key);
for (i = 0; i < NUMERIC_NDIGITS(key); i++)
{
if (digits[i] != (NumericDigit) 0)
break;
start_offset++;
/*
* The weight is effectively the # of digits before the decimal point,
* so decrement it for each leading zero we skip.
*/
weight--;
}
/*
* If there are no non-zero digits, then the value of the number is zero,
* regardless of any other fields.
*/
if (NUMERIC_NDIGITS(key) == start_offset)
PG_RETURN_UINT32(-1);
for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
{
if (digits[i] != (NumericDigit) 0)
break;
end_offset++;
}
/* If we get here, there should be at least one non-zero digit */
Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
/*
* Note that we don't hash on the Numeric's scale, since two numerics can
* compare equal but have different scales. We also don't hash on the
* sign, although we could: since a sign difference implies inequality,
* this shouldn't affect correctness.
*/
hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
hash_len * sizeof(NumericDigit));
/* Mix in the weight, via XOR */
result = digit_hash ^ weight;
PG_RETURN_DATUM(result);
}
/*
* Returns 64-bit value by hashing a value to a 64-bit value, with a seed.
* Otherwise, similar to hash_numeric.
*/
Datum
hash_numeric_extended(PG_FUNCTION_ARGS)
{
Numeric key = PG_GETARG_NUMERIC(0);
uint64 seed = PG_GETARG_INT64(1);
Datum digit_hash;
Datum result;
int weight;
int start_offset;
int end_offset;
int i;
int hash_len;
NumericDigit *digits;
if (NUMERIC_IS_NAN(key))
PG_RETURN_UINT64(seed);
weight = NUMERIC_WEIGHT(key);
start_offset = 0;
end_offset = 0;
digits = NUMERIC_DIGITS(key);
for (i = 0; i < NUMERIC_NDIGITS(key); i++)
{
if (digits[i] != (NumericDigit) 0)
break;
start_offset++;
weight--;
}
if (NUMERIC_NDIGITS(key) == start_offset)
PG_RETURN_UINT64(seed - 1);
for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
{
if (digits[i] != (NumericDigit) 0)
break;
end_offset++;
}
Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
digit_hash = hash_any_extended((unsigned char *) (NUMERIC_DIGITS(key)
+ start_offset),
hash_len * sizeof(NumericDigit),
seed);
result = UInt64GetDatum(DatumGetUInt64(digit_hash) ^ weight);
PG_RETURN_DATUM(result);
}
/* ----------------------------------------------------------------------
*
* Basic arithmetic functions
*
* ----------------------------------------------------------------------
*/
/*
* numeric_add() -
*
* Add two numerics
*/
Datum
numeric_add(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
NumericVar arg1;
NumericVar arg2;
NumericVar result;
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the values, let add_var() compute the result and return it.
*/
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
add_var(&arg1, &arg2, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_sub() -
*
* Subtract one numeric from another
*/
Datum
numeric_sub(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
NumericVar arg1;
NumericVar arg2;
NumericVar result;
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the values, let sub_var() compute the result and return it.
*/
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
sub_var(&arg1, &arg2, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_mul() -
*
* Calculate the product of two numerics
*/
Datum
numeric_mul(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
NumericVar arg1;
NumericVar arg2;
NumericVar result;
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the values, let mul_var() compute the result and return it.
* Unlike add_var() and sub_var(), mul_var() will round its result. In the
* case of numeric_mul(), which is invoked for the * operator on numerics,
* we request exact representation for the product (rscale = sum(dscale of
* arg1, dscale of arg2)).
*/
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_div() -
*
* Divide one numeric into another
*/
Datum
numeric_div(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
NumericVar arg1;
NumericVar arg2;
NumericVar result;
Numeric res;
int rscale;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the arguments
*/
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
/*
* Select scale for division result
*/
rscale = select_div_scale(&arg1, &arg2);
/*
* Do the divide and return the result
*/
div_var(&arg1, &arg2, &result, rscale, true);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_div_trunc() -
*
* Divide one numeric into another, truncating the result to an integer
*/
Datum
numeric_div_trunc(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
NumericVar arg1;
NumericVar arg2;
NumericVar result;
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the arguments
*/
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
/*
* Do the divide and return the result
*/
div_var(&arg1, &arg2, &result, 0, false);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_mod() -
*
* Calculate the modulo of two numerics
*/
Datum
numeric_mod(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
Numeric res;
NumericVar arg1;
NumericVar arg2;
NumericVar result;
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
mod_var(&arg1, &arg2, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_inc() -
*
* Increment a number by one
*/
Datum
numeric_inc(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar arg;
Numeric res;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Compute the result and return it
*/
init_var_from_num(num, &arg);
add_var(&arg, &const_one, &arg);
res = make_result(&arg);
free_var(&arg);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_smaller() -
*
* Return the smaller of two numbers
*/
Datum
numeric_smaller(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
/*
* Use cmp_numerics so that this will agree with the comparison operators,
* particularly as regards comparisons involving NaN.
*/
if (cmp_numerics(num1, num2) < 0)
PG_RETURN_NUMERIC(num1);
else
PG_RETURN_NUMERIC(num2);
}
/*
* numeric_larger() -
*
* Return the larger of two numbers
*/
Datum
numeric_larger(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
/*
* Use cmp_numerics so that this will agree with the comparison operators,
* particularly as regards comparisons involving NaN.
*/
if (cmp_numerics(num1, num2) > 0)
PG_RETURN_NUMERIC(num1);
else
PG_RETURN_NUMERIC(num2);
}
/* ----------------------------------------------------------------------
*
* Advanced math functions
*
* ----------------------------------------------------------------------
*/
/*
* numeric_fac()
*
* Compute factorial
*/
Datum
numeric_fac(PG_FUNCTION_ARGS)
{
int64 num = PG_GETARG_INT64(0);
Numeric res;
NumericVar fact;
NumericVar result;
if (num <= 1)
{
res = make_result(&const_one);
PG_RETURN_NUMERIC(res);
}
/* Fail immediately if the result would overflow */
if (num > 32177)
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("value overflows numeric format")));
init_var(&fact);
init_var(&result);
int64_to_numericvar(num, &result);
for (num = num - 1; num > 1; num--)
{
/* this loop can take awhile, so allow it to be interrupted */
CHECK_FOR_INTERRUPTS();
int64_to_numericvar(num, &fact);
mul_var(&result, &fact, &result, 0);
}
res = make_result(&result);
free_var(&fact);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_sqrt() -
*
* Compute the square root of a numeric.
*/
Datum
numeric_sqrt(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
NumericVar arg;
NumericVar result;
int sweight;
int rscale;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the argument and determine the result scale. We choose a scale
* to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
* case not less than the input's dscale.
*/
init_var_from_num(num, &arg);
init_var(&result);
/* Assume the input was normalized, so arg.weight is accurate */
sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
rscale = Max(rscale, arg.dscale);
rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
/*
* Let sqrt_var() do the calculation and return the result.
*/
sqrt_var(&arg, &result, rscale);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_exp() -
*
* Raise e to the power of x
*/
Datum
numeric_exp(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
NumericVar arg;
NumericVar result;
int rscale;
double val;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Unpack the argument and determine the result scale. We choose a scale
* to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
* case not less than the input's dscale.
*/
init_var_from_num(num, &arg);
init_var(&result);
/* convert input to float8, ignoring overflow */
val = numericvar_to_double_no_overflow(&arg);
/*
* log10(result) = num * log10(e), so this is approximately the decimal
* weight of the result:
*/
val *= 0.434294481903252;
/* limit to something that won't cause integer overflow */
val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
val = Min(val, NUMERIC_MAX_RESULT_SCALE);
rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
rscale = Max(rscale, arg.dscale);
rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
/*
* Let exp_var() do the calculation and return the result.
*/
exp_var(&arg, &result, rscale);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_ln() -
*
* Compute the natural logarithm of x
*/
Datum
numeric_ln(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
Numeric res;
NumericVar arg;
NumericVar result;
int ln_dweight;
int rscale;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num))
PG_RETURN_NUMERIC(make_result(&const_nan));
init_var_from_num(num, &arg);
init_var(&result);
/* Estimated dweight of logarithm */
ln_dweight = estimate_ln_dweight(&arg);
rscale = NUMERIC_MIN_SIG_DIGITS - ln_dweight;
rscale = Max(rscale, arg.dscale);
rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
ln_var(&arg, &result, rscale);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_log() -
*
* Compute the logarithm of x in a given base
*/
Datum
numeric_log(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
Numeric res;
NumericVar arg1;
NumericVar arg2;
NumericVar result;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Initialize things
*/
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
init_var(&result);
/*
* Call log_var() to compute and return the result; note it handles scale
* selection itself.
*/
log_var(&arg1, &arg2, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_power() -
*
* Raise b to the power of x
*/
Datum
numeric_power(PG_FUNCTION_ARGS)
{
Numeric num1 = PG_GETARG_NUMERIC(0);
Numeric num2 = PG_GETARG_NUMERIC(1);
Numeric res;
NumericVar arg1;
NumericVar arg2;
NumericVar arg2_trunc;
NumericVar result;
/*
* Handle NaN
*/
if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
PG_RETURN_NUMERIC(make_result(&const_nan));
/*
* Initialize things
*/
init_var(&arg2_trunc);
init_var(&result);
init_var_from_num(num1, &arg1);
init_var_from_num(num2, &arg2);
set_var_from_var(&arg2, &arg2_trunc);
trunc_var(&arg2_trunc, 0);
/*
* The SQL spec requires that we emit a particular SQLSTATE error code for
* certain error conditions. Specifically, we don't return a
* divide-by-zero error code for 0 ^ -1.
*/
if (cmp_var(&arg1, &const_zero) == 0 &&
cmp_var(&arg2, &const_zero) < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
errmsg("zero raised to a negative power is undefined")));
if (cmp_var(&arg1, &const_zero) < 0 &&
cmp_var(&arg2, &arg2_trunc) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
errmsg("a negative number raised to a non-integer power yields a complex result")));
/*
* Call power_var() to compute and return the result; note it handles
* scale selection itself.
*/
power_var(&arg1, &arg2, &result);
res = make_result(&result);
free_var(&result);
free_var(&arg2_trunc);
PG_RETURN_NUMERIC(res);
}
/*
* numeric_scale() -
*
* Returns the scale, i.e. the count of decimal digits in the fractional part
*/
Datum
numeric_scale(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
if (NUMERIC_IS_NAN(num))
PG_RETURN_NULL();
PG_RETURN_INT32(NUMERIC_DSCALE(num));
}
/* ----------------------------------------------------------------------
*
* Type conversion functions
*
* ----------------------------------------------------------------------
*/
Datum
int4_numeric(PG_FUNCTION_ARGS)
{
int32 val = PG_GETARG_INT32(0);
Numeric res;
NumericVar result;
init_var(&result);
int64_to_numericvar((int64) val, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
Datum
numeric_int4(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
int32 result;
/* XXX would it be better to return NULL? */
if (NUMERIC_IS_NAN(num))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert NaN to integer")));
/* Convert to variable format, then convert to int4 */
init_var_from_num(num, &x);
result = numericvar_to_int32(&x);
PG_RETURN_INT32(result);
}
/*
* Given a NumericVar, convert it to an int32. If the NumericVar
* exceeds the range of an int32, raise the appropriate error via
* ereport(). The input NumericVar is *not* free'd.
*/
static int32
numericvar_to_int32(const NumericVar *var)
{
int32 result;
int64 val;
if (!numericvar_to_int64(var, &val))
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("integer out of range")));
/* Down-convert to int4 */
result = (int32) val;
/* Test for overflow by reverse-conversion. */
if ((int64) result != val)
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("integer out of range")));
return result;
}
Datum
int8_numeric(PG_FUNCTION_ARGS)
{
int64 val = PG_GETARG_INT64(0);
Numeric res;
NumericVar result;
init_var(&result);
int64_to_numericvar(val, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
Datum
numeric_int8(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
int64 result;
/* XXX would it be better to return NULL? */
if (NUMERIC_IS_NAN(num))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert NaN to bigint")));
/* Convert to variable format and thence to int8 */
init_var_from_num(num, &x);
if (!numericvar_to_int64(&x, &result))
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("bigint out of range")));
PG_RETURN_INT64(result);
}
Datum
int2_numeric(PG_FUNCTION_ARGS)
{
int16 val = PG_GETARG_INT16(0);
Numeric res;
NumericVar result;
init_var(&result);
int64_to_numericvar((int64) val, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
Datum
numeric_int2(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
NumericVar x;
int64 val;
int16 result;
/* XXX would it be better to return NULL? */
if (NUMERIC_IS_NAN(num))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert NaN to smallint")));
/* Convert to variable format and thence to int8 */
init_var_from_num(num, &x);
if (!numericvar_to_int64(&x, &val))
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("smallint out of range")));
/* Down-convert to int2 */
result = (int16) val;
/* Test for overflow by reverse-conversion. */
if ((int64) result != val)
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("smallint out of range")));
PG_RETURN_INT16(result);
}
Datum
float8_numeric(PG_FUNCTION_ARGS)
{
float8 val = PG_GETARG_FLOAT8(0);
Numeric res;
NumericVar result;
char buf[DBL_DIG + 100];
if (isnan(val))
PG_RETURN_NUMERIC(make_result(&const_nan));
if (isinf(val))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert infinity to numeric")));
snprintf(buf, sizeof(buf), "%.*g", DBL_DIG, val);
init_var(&result);
/* Assume we need not worry about leading/trailing spaces */
(void) set_var_from_str(buf, buf, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
Datum
numeric_float8(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
char *tmp;
Datum result;
if (NUMERIC_IS_NAN(num))
PG_RETURN_FLOAT8(get_float8_nan());
tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
NumericGetDatum(num)));
result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
pfree(tmp);
PG_RETURN_DATUM(result);
}
/*
* Convert numeric to float8; if out of range, return +/- HUGE_VAL
*
* (internal helper function, not directly callable from SQL)
*/
Datum
numeric_float8_no_overflow(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
double val;
if (NUMERIC_IS_NAN(num))
PG_RETURN_FLOAT8(get_float8_nan());
val = numeric_to_double_no_overflow(num);
PG_RETURN_FLOAT8(val);
}
Datum
float4_numeric(PG_FUNCTION_ARGS)
{
float4 val = PG_GETARG_FLOAT4(0);
Numeric res;
NumericVar result;
char buf[FLT_DIG + 100];
if (isnan(val))
PG_RETURN_NUMERIC(make_result(&const_nan));
if (isinf(val))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert infinity to numeric")));
snprintf(buf, sizeof(buf), "%.*g", FLT_DIG, val);
init_var(&result);
/* Assume we need not worry about leading/trailing spaces */
(void) set_var_from_str(buf, buf, &result);
res = make_result(&result);
free_var(&result);
PG_RETURN_NUMERIC(res);
}
Datum
numeric_float4(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
char *tmp;
Datum result;
if (NUMERIC_IS_NAN(num))
PG_RETURN_FLOAT4(get_float4_nan());
tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
NumericGetDatum(num)));
result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
pfree(tmp);
PG_RETURN_DATUM(result);
}
/* ----------------------------------------------------------------------
*
* Aggregate functions
*
* The transition datatype for all these aggregates is declared as INTERNAL.
* Actually, it's a pointer to a NumericAggState allocated in the aggregate
* context. The digit buffers for the NumericVars will be there too.
*
* On platforms which support 128-bit integers some aggregates instead use a
* 128-bit integer based transition datatype to speed up calculations.
*
* ----------------------------------------------------------------------
*/
typedef struct NumericAggState
{
bool calcSumX2; /* if true, calculate sumX2 */
MemoryContext agg_context; /* context we're calculating in */
int64 N; /* count of processed numbers */
NumericSumAccum sumX; /* sum of processed numbers */
NumericSumAccum sumX2; /* sum of squares of processed numbers */
int maxScale; /* maximum scale seen so far */
int64 maxScaleCount; /* number of values seen with maximum scale */
int64 NaNcount; /* count of NaN values (not included in N!) */
} NumericAggState;
/*
* Prepare state data for a numeric aggregate function that needs to compute
* sum, count and optionally sum of squares of the input.
*/
static NumericAggState *
makeNumericAggState(FunctionCallInfo fcinfo, bool calcSumX2)
{
NumericAggState *state;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
old_context = MemoryContextSwitchTo(agg_context);
state = (NumericAggState *) palloc0(sizeof(NumericAggState));
state->calcSumX2 = calcSumX2;
state->agg_context = agg_context;
MemoryContextSwitchTo(old_context);
return state;
}
/*
* Like makeNumericAggState(), but allocate the state in the current memory
* context.
*/
static NumericAggState *
makeNumericAggStateCurrentContext(bool calcSumX2)
{
NumericAggState *state;
state = (NumericAggState *) palloc0(sizeof(NumericAggState));
state->calcSumX2 = calcSumX2;
state->agg_context = CurrentMemoryContext;
return state;
}
/*
* Accumulate a new input value for numeric aggregate functions.
*/
static void
do_numeric_accum(NumericAggState *state, Numeric newval)
{
NumericVar X;
NumericVar X2;
MemoryContext old_context;
/* Count NaN inputs separately from all else */
if (NUMERIC_IS_NAN(newval))
{
state->NaNcount++;
return;
}
/* load processed number in short-lived context */
init_var_from_num(newval, &X);
/*
* Track the highest input dscale that we've seen, to support inverse
* transitions (see do_numeric_discard).
*/
if (X.dscale > state->maxScale)
{
state->maxScale = X.dscale;
state->maxScaleCount = 1;
}
else if (X.dscale == state->maxScale)
state->maxScaleCount++;
/* if we need X^2, calculate that in short-lived context */
if (state->calcSumX2)
{
init_var(&X2);
mul_var(&X, &X, &X2, X.dscale * 2);
}
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(state->agg_context);
state->N++;
/* Accumulate sums */
accum_sum_add(&(state->sumX), &X);
if (state->calcSumX2)
accum_sum_add(&(state->sumX2), &X2);
MemoryContextSwitchTo(old_context);
}
/*
* Attempt to remove an input value from the aggregated state.
*
* If the value cannot be removed then the function will return false; the
* possible reasons for failing are described below.
*
* If we aggregate the values 1.01 and 2 then the result will be 3.01.
* If we are then asked to un-aggregate the 1.01 then we must fail as we
* won't be able to tell what the new aggregated value's dscale should be.
* We don't want to return 2.00 (dscale = 2), since the sum's dscale would
* have been zero if we'd really aggregated only 2.
*
* Note: alternatively, we could count the number of inputs with each possible
* dscale (up to some sane limit). Not yet clear if it's worth the trouble.
*/
static bool
do_numeric_discard(NumericAggState *state, Numeric newval)
{
NumericVar X;
NumericVar X2;
MemoryContext old_context;
/* Count NaN inputs separately from all else */
if (NUMERIC_IS_NAN(newval))
{
state->NaNcount--;
return true;
}
/* load processed number in short-lived context */
init_var_from_num(newval, &X);
/*
* state->sumX's dscale is the maximum dscale of any of the inputs.
* Removing the last input with that dscale would require us to recompute
* the maximum dscale of the *remaining* inputs, which we cannot do unless
* no more non-NaN inputs remain at all. So we report a failure instead,
* and force the aggregation to be redone from scratch.
*/
if (X.dscale == state->maxScale)
{
if (state->maxScaleCount > 1 || state->maxScale == 0)
{
/*
* Some remaining inputs have same dscale, or dscale hasn't gotten
* above zero anyway
*/
state->maxScaleCount--;
}
else if (state->N == 1)
{
/* No remaining non-NaN inputs at all, so reset maxScale */
state->maxScale = 0;
state->maxScaleCount = 0;
}
else
{
/* Correct new maxScale is uncertain, must fail */
return false;
}
}
/* if we need X^2, calculate that in short-lived context */
if (state->calcSumX2)
{
init_var(&X2);
mul_var(&X, &X, &X2, X.dscale * 2);
}
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(state->agg_context);
if (state->N-- > 1)
{
/* Negate X, to subtract it from the sum */
X.sign = (X.sign == NUMERIC_POS ? NUMERIC_NEG : NUMERIC_POS);
accum_sum_add(&(state->sumX), &X);
if (state->calcSumX2)
{
/* Negate X^2. X^2 is always positive */
X2.sign = NUMERIC_NEG;
accum_sum_add(&(state->sumX2), &X2);
}
}
else
{
/* Zero the sums */
Assert(state->N == 0);
accum_sum_reset(&state->sumX);
if (state->calcSumX2)
accum_sum_reset(&state->sumX2);
}
MemoryContextSwitchTo(old_context);
return true;
}
/*
* Generic transition function for numeric aggregates that require sumX2.
*/
Datum
numeric_accum(PG_FUNCTION_ARGS)
{
NumericAggState *state;
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
/* Create the state data on the first call */
if (state == NULL)
state = makeNumericAggState(fcinfo, true);
if (!PG_ARGISNULL(1))
do_numeric_accum(state, PG_GETARG_NUMERIC(1));
PG_RETURN_POINTER(state);
}
/*
* Generic combine function for numeric aggregates which require sumX2
*/
Datum
numeric_combine(PG_FUNCTION_ARGS)
{
NumericAggState *state1;
NumericAggState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
PG_RETURN_POINTER(state1);
/* manually copy all fields from state2 to state1 */
if (state1 == NULL)
{
old_context = MemoryContextSwitchTo(agg_context);
state1 = makeNumericAggStateCurrentContext(true);
state1->N = state2->N;
state1->NaNcount = state2->NaNcount;
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
accum_sum_copy(&state1->sumX, &state2->sumX);
accum_sum_copy(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
if (state2->N > 0)
{
state1->N += state2->N;
state1->NaNcount += state2->NaNcount;
/*
* These are currently only needed for moving aggregates, but let's do
* the right thing anyway...
*/
if (state2->maxScale > state1->maxScale)
{
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
}
else if (state2->maxScale == state1->maxScale)
state1->maxScaleCount += state2->maxScaleCount;
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
accum_sum_combine(&state1->sumX, &state2->sumX);
accum_sum_combine(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
}
PG_RETURN_POINTER(state1);
}
/*
* Generic transition function for numeric aggregates that don't require sumX2.
*/
Datum
numeric_avg_accum(PG_FUNCTION_ARGS)
{
NumericAggState *state;
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
/* Create the state data on the first call */
if (state == NULL)
state = makeNumericAggState(fcinfo, false);
if (!PG_ARGISNULL(1))
do_numeric_accum(state, PG_GETARG_NUMERIC(1));
PG_RETURN_POINTER(state);
}
/*
* Combine function for numeric aggregates which don't require sumX2
*/
Datum
numeric_avg_combine(PG_FUNCTION_ARGS)
{
NumericAggState *state1;
NumericAggState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
PG_RETURN_POINTER(state1);
/* manually copy all fields from state2 to state1 */
if (state1 == NULL)
{
old_context = MemoryContextSwitchTo(agg_context);
state1 = makeNumericAggStateCurrentContext(false);
state1->N = state2->N;
state1->NaNcount = state2->NaNcount;
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
accum_sum_copy(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
if (state2->N > 0)
{
state1->N += state2->N;
state1->NaNcount += state2->NaNcount;
/*
* These are currently only needed for moving aggregates, but let's do
* the right thing anyway...
*/
if (state2->maxScale > state1->maxScale)
{
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
}
else if (state2->maxScale == state1->maxScale)
state1->maxScaleCount += state2->maxScaleCount;
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
accum_sum_combine(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
}
PG_RETURN_POINTER(state1);
}
/*
* numeric_avg_serialize
* Serialize NumericAggState for numeric aggregates that don't require
* sumX2.
*/
Datum
numeric_avg_serialize(PG_FUNCTION_ARGS)
{
NumericAggState *state;
StringInfoData buf;
Datum temp;
bytea *sumX;
bytea *result;
NumericVar tmp_var;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (NumericAggState *) PG_GETARG_POINTER(0);
/*
* This is a little wasteful since make_result converts the NumericVar
* into a Numeric and numeric_send converts it back again. Is it worth
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
init_var(&tmp_var);
accum_sum_final(&state->sumX, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&tmp_var)));
sumX = DatumGetByteaPP(temp);
free_var(&tmp_var);
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
/* maxScale */
pq_sendint32(&buf, state->maxScale);
/* maxScaleCount */
pq_sendint64(&buf, state->maxScaleCount);
/* NaNcount */
pq_sendint64(&buf, state->NaNcount);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* numeric_avg_deserialize
* Deserialize bytea into NumericAggState for numeric aggregates that
* don't require sumX2.
*/
Datum
numeric_avg_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate;
NumericAggState *result;
Datum temp;
NumericVar tmp_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
sstate = PG_GETARG_BYTEA_PP(0);
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard recv-function infrastructure.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf,
VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
result = makeNumericAggStateCurrentContext(false);
/* N */
result->N = pq_getmsgint64(&buf);
/* sumX */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
init_var_from_num(DatumGetNumeric(temp), &tmp_var);
accum_sum_add(&(result->sumX), &tmp_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
/* maxScaleCount */
result->maxScaleCount = pq_getmsgint64(&buf);
/* NaNcount */
result->NaNcount = pq_getmsgint64(&buf);
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
/*
* numeric_serialize
* Serialization function for NumericAggState for numeric aggregates that
* require sumX2.
*/
Datum
numeric_serialize(PG_FUNCTION_ARGS)
{
NumericAggState *state;
StringInfoData buf;
Datum temp;
bytea *sumX;
NumericVar tmp_var;
bytea *sumX2;
bytea *result;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (NumericAggState *) PG_GETARG_POINTER(0);
/*
* This is a little wasteful since make_result converts the NumericVar
* into a Numeric and numeric_send converts it back again. Is it worth
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
init_var(&tmp_var);
accum_sum_final(&state->sumX, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&tmp_var)));
sumX = DatumGetByteaPP(temp);
accum_sum_final(&state->sumX2, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&tmp_var)));
sumX2 = DatumGetByteaPP(temp);
free_var(&tmp_var);
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
/* sumX2 */
pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2));
/* maxScale */
pq_sendint32(&buf, state->maxScale);
/* maxScaleCount */
pq_sendint64(&buf, state->maxScaleCount);
/* NaNcount */
pq_sendint64(&buf, state->NaNcount);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* numeric_deserialize
* Deserialization function for NumericAggState for numeric aggregates that
* require sumX2.
*/
Datum
numeric_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate;
NumericAggState *result;
Datum temp;
NumericVar sumX_var;
NumericVar sumX2_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
sstate = PG_GETARG_BYTEA_PP(0);
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard recv-function infrastructure.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf,
VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
result = makeNumericAggStateCurrentContext(false);
/* N */
result->N = pq_getmsgint64(&buf);
/* sumX */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
init_var_from_num(DatumGetNumeric(temp), &sumX_var);
accum_sum_add(&(result->sumX), &sumX_var);
/* sumX2 */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
accum_sum_add(&(result->sumX2), &sumX2_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
/* maxScaleCount */
result->maxScaleCount = pq_getmsgint64(&buf);
/* NaNcount */
result->NaNcount = pq_getmsgint64(&buf);
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}