Skip to content

Commit

Permalink
Introduce mechanism to turn off dual-copy aggregation code
Browse files Browse the repository at this point in the history
To provide the producer a lock-free write mechanism for aggregations,
we have two copies of aggregations.  The producer updates both, in
turn, and the consumer reads the one not being written, as indicated
by a latch sequence number.

Currently, however, we are using older kernels that do not allow us
to mmap BPF maps.  The consumer is forced to lookup the entire BPF
aggregation map at once, making the dual-copy mechanism useless.
Meanwhile, the dual copies are consuming valuable BPF map space.

Introduce a preprocessor macro DT_AGG_NUM_COPIES that can be set to
turn off the dual-copy mechanism for now while retaining the code for
when we want to restore the mechanism.

For example, the D script
    BEGIN {x = 100; @ = lquantize(x, 0, 2045, 1); exit(0) }
runs, but increasing to 2046 fails.  With DT_AGG_NUM_COPIES=1,
one can go up to 4093.

The latch sequence number is preserved since it is used to see
which aggregations have data and should be reported.

Signed-off-by: Eugene Loh <eugene.loh@oracle.com>
Reviewed-by: Kris Van Hees <kris.van.hees@oracle.com>
  • Loading branch information
euloh committed Jan 14, 2021
1 parent 7639f3c commit 8068b2c
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 35 deletions.
6 changes: 4 additions & 2 deletions libdtrace/dt_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
/* point to the latch sequence number */
src = (int64_t *)(st->buf + aid->di_offset);

/* real size excludes latch sequence number and second data copy */
realsz = (aid->di_size - sizeof(uint64_t)) / 2;
/* real size excludes latch sequence number and second data copy, if any */
realsz = (aid->di_size - sizeof(uint64_t)) / DT_AGG_NUM_COPIES;

/* See if we already have an entry for this aggregation. */
for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
Expand Down Expand Up @@ -999,7 +999,9 @@ init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
/* skip ptr[0], it is the latch sequence number */
ptr = (int64_t *)(buf + aid->di_offset);
ptr[1] = value;
#if DT_AGG_NUM_COPIES == 2
ptr[2] = value;
#endif

return 0;
}
Expand Down
92 changes: 61 additions & 31 deletions libdtrace/dt_cg.c
Original file line number Diff line number Diff line change
Expand Up @@ -3032,17 +3032,65 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
* Macro to set the storage data (offset and size) for the aggregation
* identifier (if not set yet).
*
* We consume twice the required data size because of the odd/even data pair
* mechanism to provide lockless, write-wait-free operation. Additionally,
* we make room for a latch sequence number.
* We make room for a latch sequence number of sizeof(uint64_t).
*
* If DT_AGG_NUM_COPIES==2, we consume twice the required data size for
* a dual-copy mechanism to provide lockless, write-wait-free operation.
*/
#define DT_CG_AGG_SET_STORAGE(aid, sz) \
do { \
if ((aid)->di_offset == -1) \
dt_ident_set_storage((aid), sizeof(uint64_t), \
sizeof(uint64_t) + 2 * (sz)); \
sizeof(uint64_t) + \
DT_AGG_NUM_COPIES * (sz)); \
} while (0)

#if DT_AGG_NUM_COPIES == 1
/*
* Return a register that holds a pointer to the aggregation data to be
* updated.
*
* We update the latch seqcount (first value in the aggregation) to
* signal that the aggregation has data. The location of data for the
* given aggregation is stored in the register returned from this function.
*/
static int
dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
dt_regset_t *drp)
{
int rptr;

TRACE_REGSET(" Prep: Begin");

dt_regset_xalloc(drp, BPF_REG_0);
rptr = dt_regset_alloc(drp);
assert(rptr != -1);

/*
* ptr = dctx->agg; // lddw %rptr, [%fp + DT_STK_DCTX]
* // lddw %rptr, [%rptr + DCTX_AGG]
* ptr += aid->di_offset; // add %rptr, aid->di_offset
*/
emit(dlp, BPF_LOAD(BPF_DW, rptr, BPF_REG_FP, DT_STK_DCTX));
emit(dlp, BPF_LOAD(BPF_DW, rptr, rptr, DCTX_AGG));
emit(dlp, BPF_ALU64_IMM(BPF_ADD, rptr, aid->di_offset));

/*
* *((uint64_t *)ptr)++; // mov %r0, 1
* // xadd [%rptr + 0], %r0
* ptr += sizeof(uint64_t);// add %rptr, sizeof(uint64_t)
*/
emit(dlp, BPF_MOV_IMM(BPF_REG_0, 1));
emit(dlp, BPF_XADD_REG(BPF_DW, rptr, 0, BPF_REG_0));
emit(dlp, BPF_ALU64_IMM(BPF_ADD, rptr, sizeof(uint64_t)));

dt_regset_free(drp, BPF_REG_0);

TRACE_REGSET(" Prep: End ");

return rptr;
}
#else
/*
* Prepare the aggregation buffer for updating for a specific aggregation, and
* return a register that holds a pointer to the aggregation data to be
Expand Down Expand Up @@ -3099,41 +3147,23 @@ dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,

return ragd;
}
#endif

#define DT_CG_AGG_IMPL(aid, sz, dlp, drp, f, ...) \
do { \
int dreg; \
int i, dreg; \
\
TRACE_REGSET(" Upd: Begin "); \
\
/* \
* Redirect reading to secondary copy. The register \
* returned holds the base pointer to the primary copy. \
*/ \
dreg = dt_cg_agg_buf_prepare((aid), (sz), (dlp), (drp));\
\
/* \
* Call the update implementation. Aggregation data is \
* accessed through register 'dreg'. \
*/ \
(f)((dlp), (drp), dreg, ## __VA_ARGS__); \
dt_regset_free((drp), dreg); \
\
TRACE_REGSET(" Upd: Switch"); \
for (i = 0; i < DT_AGG_NUM_COPIES; i++) { \
if (i == 1) \
TRACE_REGSET(" Upd: Switch"); \
\
/* \
* Redirect reading to primary copy. The register \
* returned holds the base pointer to the secondary \
* copy. \
*/ \
dreg = dt_cg_agg_buf_prepare((aid), (sz), (dlp), (drp));\
dreg = dt_cg_agg_buf_prepare((aid), (sz), (dlp), (drp));\
\
/* \
* Call the update implementation. Aggregation data is \
* accessed through register 'dreg'. \
*/ \
(f)((dlp), (drp), dreg, ## __VA_ARGS__); \
dt_regset_free((drp), dreg); \
(f)((dlp), (drp), dreg, ## __VA_ARGS__); \
dt_regset_free((drp), dreg); \
} \
\
TRACE_REGSET(" Upd: End "); \
} while (0)
Expand Down
14 changes: 14 additions & 0 deletions libdtrace/dt_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ typedef struct dt_ahash {
size_t dtah_size; /* size of hash table */
} dt_ahash_t;

/*
* To provide a lock-free aggregation write mechanism for the producer,
* two copies of each aggregation can be used. A latch sequence number
* on each CPU can be incremented to indicate to the consumer which copy
* should be read and whether a copy has changed during reading.
*
* Until BPF maps can be mmapped, however, we cannot take advantage of
* this technique. Using only a single-copy, of course, saves precious
* BPF map space.
*
* Pick DT_AGG_NUM_COPIES to be either 1 or 2.
*/
#define DT_AGG_NUM_COPIES 1

typedef struct dt_aggregate {
char **dtat_cpu_buf; /* per-CPU agg snapshot buffers */
char *dtat_buf; /* aggregation snapshot buffer */
Expand Down
4 changes: 2 additions & 2 deletions libdtrace/dt_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,13 @@ dt_aggid_add(dtrace_hdl_t *dtp, const dt_ident_t *aid)
/*
* Note the relationship between the aggregation storage
* size (di_size) and the aggregation data size (dtagd_size):
* di_size = dtagd_size * (# agg copies) + (size of latch seq #)
* di_size = dtagd_size * (num copies) + (size of latch seq #)
*/
agg->dtagd_id = id;
agg->dtagd_name = aid->di_name;
agg->dtagd_sig = ((dt_idsig_t *)aid->di_data)->dis_auxinfo;
agg->dtagd_varid = aid->di_id;
agg->dtagd_size = (aid->di_size - sizeof(uint64_t))/ 2;
agg->dtagd_size = (aid->di_size - sizeof(uint64_t)) / DT_AGG_NUM_COPIES;
agg->dtagd_nrecs = agg->dtagd_size / sizeof(uint64_t);

recs = dt_calloc(dtp, agg->dtagd_nrecs, sizeof(dtrace_recdesc_t));
Expand Down

0 comments on commit 8068b2c

Please sign in to comment.