Skip to content

Commit

Permalink
Implement aggregation functions: max(), min(), sum(), and stddev()
Browse files Browse the repository at this point in the history
The min() and max() functions require initialization of their initial
value.  That is, data for min() is initialized with the highest
possible value and data for max() is initialized with the lowest
possible value.  This initialization happens in dt_aggregate_go().

The stddev() function records the number of values, the sum of values,
and the 128-bit sum of the squares of each value.  This information is
processed by the consumer to determine the standard deviation value.

Signed-off-by: David Mc Lean <david.mclean@oracle.com>
Signed-off-by: Kris Van Hees <kris.van.hees@oracle.com>
Reviewed-by: Kris Van Hees <kris.van.hees@oracle.com>
  • Loading branch information
dpmclean authored and kvanhees committed Dec 10, 2020
1 parent c7f41f0 commit 80c97f8
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 0 deletions.
56 changes: 56 additions & 0 deletions libdtrace/dt_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -954,12 +954,49 @@ dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
}
}

/*
* Callback for initializing the min() and max() aggregation functions.
* The minimum 64 bit value is set for max() and the maximum 64 bit value is
* set for min(), so that any other value fed to the functions will register
* properly.
*
* The first value in the buffer is used as a flag to indicate whether an
* initial value was stored in the buffer.
*/
static int
init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
{
dt_ident_t *fid = aid->di_iarg;
int64_t *ptr;
int64_t value;

assert(aid->di_kind == DT_IDENT_AGG);
assert(fid);

if (fid->di_id == DT_AGG_MIN)
value = INT64_MAX;
else if (fid->di_id == DT_AGG_MAX)
value = INT64_MIN;
else
return 0;

/* Indicate that we are setting initial values. */
*(int64_t *)buf = 1;

ptr = (int64_t *)(buf + sizeof(int64_t) + aid->di_offset);
ptr[0] = value;
ptr[1] = value;

return 0;
}

int
dt_aggregate_go(dtrace_hdl_t *dtp)
{
dt_aggregate_t *agp = &dtp->dt_aggregate;
dt_ahash_t *agh = &agp->dtat_hash;
int aggsz, i;
uint32_t key = 0;

/* If there are no aggregations there is nothing to do. */
aggsz = dt_idhash_datasize(dtp->dt_aggs);
Expand Down Expand Up @@ -1003,6 +1040,25 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
return dt_set_errno(dtp, EDT_NOMEM);
}

/* Initialize the starting values for min() and max() aggregations. */
dt_idhash_iter(dtp->dt_aggs, (dt_idhash_f *) init_minmax,
agp->dtat_buf);
if (*(int64_t *)agp->dtat_buf != 0) {
*(int64_t *)agp->dtat_buf = 0; /* clear the flag */

for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
int cpu = dtp->dt_conf.cpus[i].cpu_id;

/* Data for CPU 0 was populated, so skip it. */
if (cpu == 0)
continue;

memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
}

dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
}

return 0;
}

Expand Down
216 changes: 216 additions & 0 deletions libdtrace/dt_cg.c
Original file line number Diff line number Diff line change
Expand Up @@ -3824,13 +3824,69 @@ dt_cg_agg_lquantize(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
TRACE_REGSET(" AggLq : End ");
}

static void
dt_cg_agg_max_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg)
{
int treg; /* temporary value register */
uint_t Lmax = dt_irlist_label(dlp);

TRACE_REGSET(" Impl: Begin");

if ((treg = dt_regset_alloc(drp)) == -1)
longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);
/* %treg = *(%dreg) */
emit(dlp, BPF_LOAD(BPF_DW, treg, dreg, 0));
/* if (%treg >= %vreg) goto Lmax */
emit(dlp, BPF_BRANCH_REG(BPF_JSGE, treg, vreg, Lmax));
/* *(%dreg) = %vreg */
emit(dlp, BPF_STORE(BPF_DW, dreg, 0, vreg));
emitl(dlp, Lmax,
BPF_NOP());
dt_regset_free(drp, treg);

TRACE_REGSET(" Impl: End ");
}

static void
dt_cg_agg_max(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
dt_irlist_t *dlp, dt_regset_t *drp)
{
int sz = 1 * sizeof(uint64_t);

DT_CG_AGG_SET_STORAGE(aid, sz);

TRACE_REGSET(" AggMax: Begin");

dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_max_impl,
dnp->dn_aggfun->dn_args->dn_reg);
dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);

TRACE_REGSET(" AggMax: End ");

}

static void
dt_cg_agg_min_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg)
{
int treg; /* temporary value register */
uint_t Lmin = dt_irlist_label(dlp);

TRACE_REGSET(" Impl: Begin");

if ((treg = dt_regset_alloc(drp)) == -1)
longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);
/* %treg = *(%dreg) */
emit(dlp, BPF_LOAD(BPF_DW, treg, dreg, 0));
/* if (%treg <= %vreg) goto Lmin */
emit(dlp, BPF_BRANCH_REG(BPF_JSLE, treg, vreg, Lmin));
/* *(%dreg) = %vreg */
emit(dlp, BPF_STORE(BPF_DW, dreg, 0, vreg));
emitl(dlp, Lmin,
BPF_NOP());
dt_regset_free(drp, treg);

TRACE_REGSET(" Impl: End ");
}

static void
Expand All @@ -3840,6 +3896,15 @@ dt_cg_agg_min(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
int sz = 1 * sizeof(uint64_t);

DT_CG_AGG_SET_STORAGE(aid, sz);

TRACE_REGSET(" AggMin: Begin");

dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_min_impl,
dnp->dn_aggfun->dn_args->dn_reg);
dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);

TRACE_REGSET(" AggMin: End ");
}

static void
Expand Down Expand Up @@ -3899,13 +3964,155 @@ dt_cg_agg_quantize(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
TRACE_REGSET(" AggQ : End ");
}

static void
dt_cg_agg_stddev_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg,
uint64_t hreg, uint64_t lreg)
{
int onereg, lowreg;
uint_t Lncr = dt_irlist_label(dlp);

TRACE_REGSET(" Impl: Begin");

/* Get some registers to work with */
if ((onereg = dt_regset_alloc(drp)) == -1 ||
(lowreg = dt_regset_alloc(drp)) == -1)
longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);

/* Set onereg to a value of 1 for repeated use */
emit(dlp, BPF_MOV_IMM(onereg, 1));

/* dst[0]++; increment count of calls eq. value count */
emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 0, onereg));

/* dst[1] += val; add value to the total sum */
emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 8, vreg));

/* Add double-dst[2-3] += sq(val) starting here */

/* Load the lower half (64 bits) of the previous established value */
emit(dlp, BPF_LOAD(BPF_DW, lowreg, dreg, 16));

/* Add low part of the value; *lowreg += *lreg */
emit(dlp, BPF_ALU64_REG(BPF_ADD, lowreg, lreg));

/* Handle the overflow/carry case; overflow if lowreg < lreg */
emit(dlp, BPF_BRANCH_REG(BPF_JLE, lreg, lowreg, Lncr));
emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 24, onereg)) /* Carry */;
emitl(dlp, Lncr,
BPF_STORE(BPF_DW, dreg, 16, lowreg));

emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 24, hreg)); /* Add higher half */

dt_regset_free(drp, onereg);
dt_regset_free(drp, lowreg);

TRACE_REGSET(" Impl: End ");
}


static void
dt_cg_agg_stddev(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
dt_irlist_t *dlp, dt_regset_t *drp)
{
int sz = 4 * sizeof(uint64_t);
uint64_t hi_reg, lowreg, midreg, lmdreg;

/* labels */
uint_t Lpos = dt_irlist_label(dlp);
uint_t Lncy = dt_irlist_label(dlp);

DT_CG_AGG_SET_STORAGE(aid, sz);

TRACE_REGSET(" AggSDv: Begin");

dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);

/* Handle sq(val) starting here; 128-bit answer in 2 64-bit registers */
/* val = (uint64_t)dnp->dn_aggfun->dn_args->dn_value; */
/* sqr = (__int128)val * val; */
/* low = sqr & 0xFFFFFFFFFFFFFFFFULL; */
/* hi_ = sqr >> 64; */
/* get some registers to work with */
if ((hi_reg = dt_regset_alloc(drp)) == -1 ||
(lowreg = dt_regset_alloc(drp)) == -1 ||
(midreg = dt_regset_alloc(drp)) == -1 ||
(lmdreg = dt_regset_alloc(drp)) == -1)
longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);

/* If value is negative make it positive, sq() is positive anyhow */
emit(dlp, BPF_MOV_REG(lowreg, dnp->dn_aggfun->dn_args->dn_reg));
emit(dlp, BPF_BRANCH_IMM(BPF_JSGE, lowreg, 0, Lpos));
emit(dlp, BPF_NEG_REG(lowreg)) /* Change to a positive value */;

/* Now we are sure lowreg holds our absolute value */
/* Split our value into two pieces: high and low */
emitl(dlp, Lpos,
BPF_MOV_REG(hi_reg, lowreg));

/* Put least significant half in lowreg */
/* Implicit AND of lowreg with 0xFFFFFFFF mask */
emit(dlp, BPF_ALU64_IMM(BPF_LSH, lowreg, 32));
emit(dlp, BPF_ALU64_IMM(BPF_RSH, lowreg, 32));
/* Put most significant half in (the lower part of) hi_reg */
emit(dlp, BPF_ALU64_IMM(BPF_RSH, hi_reg, 32));

/* Multiply using similar approach to algebraic FOIL of binomial */
/* That is: high*high + 2*high*low + low*low */
/* Contain interim values to 64 bits or account for carry */

/* First multiply high times low value */
/* Value must be doubled for answer, eventually */
/* This result represents a value shifted 32 bits to the right */
emit(dlp, BPF_MOV_REG(midreg, lowreg));
emit(dlp, BPF_ALU64_REG(BPF_MUL, midreg, hi_reg));
/* Product of low values */
emit(dlp, BPF_ALU64_REG(BPF_MUL, lowreg, lowreg));
/* Product of high values; result represented shifted 64 to the right */
emit(dlp, BPF_ALU64_REG(BPF_MUL, hi_reg, hi_reg));

/* Now add the pieces in their proper place */

/* Get the bottom half of the mid value to add to the low value */
emit(dlp, BPF_MOV_REG(lmdreg, midreg));
/* The mid values are doubled, as O and I in FOIL are for a square */
/* Adjust for shifted representation with respect to low half value */
/* Implicit AND of lmdreg (lower mid) with 0xFFFFFFFF mask, times 2 */
emit(dlp, BPF_ALU64_IMM(BPF_LSH, lmdreg, 33));
/* Upper half times 2 + carry bit from lower half x2 */
emit(dlp, BPF_ALU64_IMM(BPF_RSH, midreg, 31));

/* Add low value part from mid to lowreg */
emit(dlp, BPF_ALU64_REG(BPF_ADD, lowreg, lmdreg));
/* Handle the overflow/carry case */
emit(dlp, BPF_BRANCH_REG(BPF_JLT, lmdreg, lowreg, Lncy));
emit(dlp, BPF_ALU64_IMM(BPF_ADD, hi_reg, 1)) /* account for carry */;

/* Sum high value; no overflow expected nor accounted for */
emitl(dlp, Lncy,
BPF_ALU64_REG(BPF_ADD, hi_reg, midreg));

dt_regset_free(drp, lmdreg);
dt_regset_free(drp, midreg);

DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_stddev_impl,
dnp->dn_aggfun->dn_args->dn_reg, hi_reg, lowreg);

dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);
dt_regset_free(drp, hi_reg);
dt_regset_free(drp, lowreg);

TRACE_REGSET(" AggSDv: End ");
}

static void
dt_cg_agg_sum_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg)
{
TRACE_REGSET(" Impl: Begin");

/* *(%dreg) += %vreg */
emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 0, vreg));

TRACE_REGSET(" Impl: End ");
}

static void
Expand All @@ -3915,6 +4122,15 @@ dt_cg_agg_sum(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
int sz = 1 * sizeof(uint64_t);

DT_CG_AGG_SET_STORAGE(aid, sz);

TRACE_REGSET(" AggSum: Begin");

dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_sum_impl,
dnp->dn_aggfun->dn_args->dn_reg);
dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);

TRACE_REGSET(" AggSum: End ");
}

typedef void dt_cg_aggfunc_f(dt_pcb_t *, dt_ident_t *, dt_node_t *,
Expand Down

0 comments on commit 80c97f8

Please sign in to comment.