Use array-of-maps as storage for aggregations
In preparation for indexed aggregations and the clear() and trunc()
operations, the storage for aggregations is moving from a per-CPU
array map to an array of maps, indexed by CPU id.

The existing storage solution kept all aggregation data in a singleton
map value: each CPU wrote to its own portion of a single block of
memory, and the consumer retrieved that block in its entirety with one
system call.

The new storage solution allocates a memory block for each CPU so
that data retrieval by the consumer can be done per CPU.  This sets
the stage for future development where the consumer may need to
update the aggregation buffers.
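
For reference, the consumer-side retrieval pattern now looks roughly like
the sketch below.  This is a minimal illustration in terms of plain libbpf
calls (bpf_map_lookup_elem, bpf_map_get_fd_by_id); the dt_bpf_map_lookup_inner
helper used in the diff is assumed to wrap an equivalent sequence, and the
function name here is hypothetical.

    #include <bpf/bpf.h>
    #include <stdint.h>
    #include <unistd.h>

    /*
     * Hypothetical sketch: fetch one CPU's aggregation buffer from the
     * 'aggs' array-of-maps.  A user-space lookup in an ARRAY_OF_MAPS
     * yields the inner map's ID, which must be converted to an FD
     * before the inner (singleton, key 0) element can be read.
     */
    static int
    read_cpu_aggs(int aggs_fd, uint32_t cpu, void *buf)
    {
            uint32_t inner_id, key = 0;
            int inner_fd, rc;

            if (bpf_map_lookup_elem(aggs_fd, &cpu, &inner_id) < 0)
                    return -1;              /* no inner map for this CPU */

            inner_fd = bpf_map_get_fd_by_id(inner_id);
            if (inner_fd < 0)
                    return -1;

            rc = bpf_map_lookup_elem(inner_fd, &key, buf);
            close(inner_fd);
            return rc;
    }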

Signed-off-by: Kris Van Hees <kris.van.hees@oracle.com>
Reviewed-by: Eugene Loh <eugene.loh@oracle.com>
kvanhees committed Sep 6, 2022
1 parent 0595e1d commit a04427e
Showing 5 changed files with 130 additions and 90 deletions.
95 changes: 34 additions & 61 deletions libdtrace/dt_aggregate.c
@@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
 typedef struct dt_snapstate {
 	dtrace_hdl_t *dtp;
 	processorid_t cpu;
-	char *buf;
-	dt_aggregate_t *agp;
 } dt_snapstate_t;
 
 static void
@@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
 static int
 dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 {
-	dt_ahash_t *agh = &st->agp->dtat_hash;
+	dtrace_hdl_t *dtp = st->dtp;
+	dt_aggregate_t *agp = &dtp->dt_aggregate;
+	dt_ahash_t *agh = &agp->dtat_hash;
 	dt_ahashent_t *h;
 	dtrace_aggdesc_t *agg;
 	dtrace_aggdata_t *agd;
@@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	uint_t i, datasz;
 	int64_t *src;
 
-	rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
+	rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
 	if (rval != 0)
 		return rval;
 
 	/* point to the data counter */
-	src = (int64_t *)(st->buf + aid->di_offset);
+	src = (int64_t *)(agp->dtat_buf + aid->di_offset);
 
 	/* skip it if data counter is 0 */
 	if (*src == 0)
@@ -487,46 +487,45 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	}
 
 	/* add it to the hash table */
-	h = dt_zalloc(st->dtp, sizeof(dt_ahashent_t));
+	h = dt_zalloc(dtp, sizeof(dt_ahashent_t));
 	if (h == NULL)
-		return dt_set_errno(st->dtp, EDT_NOMEM);
+		return dt_set_errno(dtp, EDT_NOMEM);
 
 	agd = &h->dtahe_data;
-	agd->dtada_data = dt_alloc(st->dtp, datasz);
+	agd->dtada_data = dt_alloc(dtp, datasz);
 	if (agd->dtada_data == NULL) {
-		dt_free(st->dtp, h);
-		return dt_set_errno(st->dtp, EDT_NOMEM);
+		dt_free(dtp, h);
+		return dt_set_errno(dtp, EDT_NOMEM);
 	}
 
 	memcpy(agd->dtada_data, src, datasz);
 	agd->dtada_size = datasz;
 	agd->dtada_desc = agg;
-	agd->dtada_hdl = st->dtp;
+	agd->dtada_hdl = dtp;
 
 	h->dtahe_hval = hval;
 	h->dtahe_size = datasz;
 
-	if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
-		char **percpu = dt_calloc(st->dtp,
-					  st->dtp->dt_conf.max_cpuid + 1,
+	if (agp->dtat_flags & DTRACE_A_PERCPU) {
+		char **percpu = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
 					  sizeof(char *));
 
 		if (percpu == NULL) {
-			dt_free(st->dtp, agd->dtada_data);
-			dt_free(st->dtp, h);
+			dt_free(dtp, agd->dtada_data);
+			dt_free(dtp, h);
 
-			dt_set_errno(st->dtp, EDT_NOMEM);
+			dt_set_errno(dtp, EDT_NOMEM);
 		}
 
-		for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
-			percpu[i] = dt_zalloc(st->dtp, datasz);
+		for (i = 0; i <= dtp->dt_conf.max_cpuid; i++) {
+			percpu[i] = dt_zalloc(dtp, datasz);
 			if (percpu[i] == NULL) {
 				while (--i >= 0)
-					dt_free(st->dtp, percpu[i]);
-				dt_free(st->dtp, agd->dtada_data);
-				dt_free(st->dtp, h);
+					dt_free(dtp, percpu[i]);
+				dt_free(dtp, agd->dtada_data);
+				dt_free(dtp, h);
 
-				dt_set_errno(st->dtp, EDT_NOMEM);
+				dt_set_errno(dtp, EDT_NOMEM);
 			}
 		}
 
@@ -553,14 +552,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 static int
 dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
 {
-	dt_aggregate_t *agp = &dtp->dt_aggregate;
-	char *buf = agp->dtat_cpu_buf[cpu];
 	dt_snapstate_t st;
+	uint32_t key = 0;
 
 	st.dtp = dtp;
 	st.cpu = cpu;
-	st.buf = buf;
-	st.agp = agp;
+
+	if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
+				    dtp->dt_aggregate.dtat_buf) == -1)
+		return 0;
 
 	return dt_idhash_iter(dtp->dt_aggs,
 			      (dt_idhash_f *)dt_aggregate_snap_one, &st);
@@ -573,22 +573,17 @@ int
 dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 {
 	dt_aggregate_t *agp = &dtp->dt_aggregate;
-	uint32_t key = 0;
 	int i, rval;
 
 	/*
 	 * If we do not have a buffer initialized, we will not be processing
 	 * aggregations, so there is nothing to be done here.
 	 */
-	if (agp->dtat_cpu_buf == NULL)
+	if (agp->dtat_buf == NULL)
 		return 0;
 
 	dtrace_aggregate_clear(dtp);
 
-	rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
-	if (rval != 0)
-		return dt_set_errno(dtp, -rval);
-
 	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
 		rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
 		if (rval != 0)
@@ -1001,41 +996,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 	dt_aggregate_t *agp = &dtp->dt_aggregate;
 	dt_ahash_t *agh = &agp->dtat_hash;
 	int aggsz, i;
-	uint32_t key = 0;
 
 	/* If there are no aggregations there is nothing to do. */
 	aggsz = dt_idhash_datasize(dtp->dt_aggs);
 	if (aggsz <= 0)
 		return 0;
 
-	/*
-	 * Allocate a buffer to hold the aggregation data for all possible
-	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
-	 * currently enabled.
-	 */
-	agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
+	/* Allocate a buffer to hold the aggregation data for a CPU. */
+	agp->dtat_buf = dt_zalloc(dtp, aggsz);
 	if (agp->dtat_buf == NULL)
 		return dt_set_errno(dtp, EDT_NOMEM);
 
-	agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
-				      sizeof(char *));
-	if (agp->dtat_cpu_buf == NULL) {
-		dt_free(dtp, agp->dtat_buf);
-		return dt_set_errno(dtp, EDT_NOMEM);
-	}
-
-	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-		int cpu = dtp->dt_conf.cpus[i].cpu_id;
-
-		agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
-	}
-
 	/* Create the aggregation hash. */
 	agh->dtah_size = DTRACE_AHASHSIZE;
 	agh->dtah_hash = dt_zalloc(dtp,
 				   agh->dtah_size * sizeof(dt_ahashent_t *));
 	if (agh->dtah_hash == NULL) {
-		dt_free(dtp, agp->dtat_cpu_buf);
 		dt_free(dtp, agp->dtat_buf);
 		return dt_set_errno(dtp, EDT_NOMEM);
 	}
@@ -1047,15 +1023,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 		return 0;
 	*(int64_t *)agp->dtat_buf = 0;	/* clear the flag */
 	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-		int cpu = dtp->dt_conf.cpus[i].cpu_id;
+		int cpu = dtp->dt_conf.cpus[i].cpu_id;
+		uint32_t key = 0;
 
-		/* Data for CPU 0 was populated, so skip it. */
-		if (cpu == 0)
+		if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
+					    dtp->dt_aggregate.dtat_buf) == -1)
 			continue;
-
-		memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
 	}
-	dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
 
 	return 0;
 }
@@ -1822,6 +1796,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
 		hash->dtah_size = 0;
 	}
 
-	dt_free(dtp, agp->dtat_cpu_buf);
 	dt_free(dtp, agp->dtat_buf);
 }
69 changes: 44 additions & 25 deletions libdtrace/dt_bpf.c
@@ -351,6 +351,22 @@ dt_bpf_init_helpers(dtrace_hdl_t *dtp)
 #undef BPF_HELPER_MAP
 }
 
+static int
+map_create_error(dtrace_hdl_t *dtp, const char *name, int err)
+{
+	char msg[64];
+
+	snprintf(msg, sizeof(msg),
+		 "failed to create BPF map '%s'", name);
+
+	if (err == E2BIG)
+		return dt_bpf_error(dtp, "%s: Too big\n", msg);
+	if (err == EPERM)
+		return dt_bpf_lockmem_error(dtp, msg);
+
+	return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
+}
+
 static int
 create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
 	    size_t ksz, size_t vsz, size_t size)
@@ -369,17 +385,8 @@ create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
 		err = errno;
 	}
 
-	if (fd < 0) {
-		char msg[64];
-
-		snprintf(msg, sizeof(msg),
-			 "failed to create BPF map '%s'", name);
-		if (err == E2BIG)
-			return dt_bpf_error(dtp, "%s: Too big\n", msg);
-		if (err == EPERM)
-			return dt_bpf_lockmem_error(dtp, msg);
-		return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
-	}
+	if (fd < 0)
+		return map_create_error(dtp, name, err);
 
 	dt_dprintf("BPF map '%s' is FD %d\n", name, fd);
 
@@ -421,17 +428,8 @@ create_gmap_of_maps(dtrace_hdl_t *dtp, const char *name,
 		err = errno;
 	}
 
-	if (fd < 0) {
-		char msg[64];
-
-		snprintf(msg, sizeof(msg),
-			 "failed to create BPF map '%s'", name);
-		if (err == E2BIG)
-			return dt_bpf_error(dtp, "%s: Too big\n", msg);
-		if (err == EPERM)
-			return dt_bpf_lockmem_error(dtp, msg);
-		return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
-	}
+	if (fd < 0)
+		return map_create_error(dtp, name, err);
 
 	dt_dprintf("BPF map '%s' is FD %d\n", name, fd);
 
@@ -470,19 +468,40 @@ gmap_create_state(dtrace_hdl_t *dtp)
  * Create the 'aggs' BPF map.
  *
  * Aggregation data buffer map, associated with each CPU.  The map is
- * implemented as a global per-CPU map with a singleton element (key 0).
+ * implemented as a global array-of-maps indexed by CPU id.  The associated
+ * value is a map with a singleton element (key 0).
  */
 static int
 gmap_create_aggs(dtrace_hdl_t *dtp)
 {
 	size_t sz = dt_idhash_datasize(dtp->dt_aggs);
+	size_t ncpus = dtp->dt_conf.max_cpuid + 1;
+	int i;
 
 	/* Only create the map if it is used. */
 	if (sz == 0)
 		return 0;
 
-	dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
-					sizeof(uint32_t), sz, 1);
+	dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
+						BPF_MAP_TYPE_ARRAY_OF_MAPS,
+						sizeof(uint32_t), ncpus,
+						BPF_MAP_TYPE_ARRAY,
+						sizeof(uint32_t), sz, 1);
+
+	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
+		int cpu = dtp->dt_conf.cpus[i].cpu_id;
+		char name[16];
+		int fd;
+
+		snprintf(name, 16, "aggs_%d", cpu);
+		fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, name,
+				       sizeof(uint32_t), sz, 1, 0);
+		if (fd < 0)
+			return map_create_error(dtp, name, errno);
+
+		dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
+	}
+
 
 	return dtp->dt_aggmap_fd;
 }
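One detail create_gmap_of_maps() has to handle, though it is not visible in
this diff: the kernel requires a template inner map FD at outer-map creation
time so it can type-check every inner map inserted later.  A minimal sketch
of that general mechanism, assuming libbpf >= 0.7's bpf_map_create() and
illustrative names (this is not the create_gmap_of_maps() implementation):

    #include <bpf/bpf.h>
    #include <stdint.h>
    #include <unistd.h>

    /*
     * Illustrative sketch: create an array-of-maps whose inner maps are
     * single-entry arrays holding 'vsz' bytes.  The template inner map
     * is only used for type checking and can be closed once the outer
     * map exists.
     */
    static int
    create_aggs_outer(uint32_t ncpus, uint32_t vsz)
    {
            LIBBPF_OPTS(bpf_map_create_opts, opts);
            int tmpl_fd, outer_fd;

            tmpl_fd = bpf_map_create(BPF_MAP_TYPE_ARRAY, "aggs_tmpl",
                                     sizeof(uint32_t), vsz, 1, NULL);
            if (tmpl_fd < 0)
                    return -1;

            opts.inner_map_fd = tmpl_fd;
            outer_fd = bpf_map_create(BPF_MAP_TYPE_ARRAY_OF_MAPS, "aggs",
                                      sizeof(uint32_t), sizeof(uint32_t),
                                      ncpus, &opts);

            close(tmpl_fd);         /* template no longer needed */
            return outer_fd;
    }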
53 changes: 51 additions & 2 deletions libdtrace/dt_cg.c
@@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 {
 	dtrace_hdl_t *dtp = pcb->pcb_hdl;
 	dt_irlist_t *dlp = &pcb->pcb_ir;
+	dt_ident_t *aggs = dt_dlib_get_map(dtp, "aggs");
 	dt_ident_t *mem = dt_dlib_get_map(dtp, "mem");
 	dt_ident_t *state = dt_dlib_get_map(dtp, "state");
 	dt_ident_t *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
 	uint_t lbl_exit = pcb->pcb_exitlbl;
 
+	assert(aggs != NULL);
 	assert(mem != NULL);
 	assert(state != NULL);
 	assert(prid != NULL);
@@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 	DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
 	if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
 		DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
-	if (dt_idhash_datasize(dtp->dt_aggs) > 0)
-		DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
 	if (dt_idhash_datasize(dtp->dt_globals) > 0)
 		DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
 	if (dtp->dt_maxlvaralloc > 0)
 		DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
 #undef DT_CG_STORE_MAP_PTR
 
+	/*
+	 * Aggregation data is stored in a CPU-specific BPF map.  Populate
+	 * dctx->agg with the map for the current CPU.
+	 *
+	 *	key = bpf_get_smp_processor_id()
+	 *				// call bpf_get_smp_processor_id
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = cpuid)
+	 *				// stdw [%r9 + DCTX_AGG], %r0
+	 *	rc = bpf_map_lookup_elem(&aggs, &key);
+	 *				// lddw %r1, &aggs
+	 *				// mov %r2, %r9
+	 *				// add %r2, DCTX_AGG
+	 *				// call bpf_map_lookup_elem
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = 'aggs' BPF map value)
+	 *	if (rc == 0)		// jeq %r0, 0, lbl_exit
+	 *		goto exit;
+	 *
+	 *	key = 0;		// stdw [%r9 + DCTX_AGG], 0
+	 *	rc = bpf_map_lookup_elem(rc, &key);
+	 *				// mov %r1, %r0
+	 *				// mov %r2, %r9
+	 *				// add %r2, DCTX_AGG
+	 *				// call bpf_map_lookup_elem
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = aggs[cpuid] BPF map value)
+	 *	if (rc == 0)		// jeq %r0, 0, lbl_exit
+	 *		goto exit;
+	 *
+	 *	dctx.aggs = rc;		// stdw [%r9 + DCTX_AGG], %r0
+	 */
+	if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
+		emit(dlp, BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
+		emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+		dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
+		emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+		emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+		emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+		emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+		emit(dlp, BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
+		emit(dlp, BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
+		emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+		emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+		emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+		emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+		emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+	}
 }
 
 void
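The double lookup the new trampoline performs is easier to see in plain C.
Below is a conceptual rendering of the emitted BPF as a libbpf-style
restricted-C program.  It is a sketch of the mechanism only: the map sizes,
section names, and value type are illustrative assumptions, and nothing
here is literal dtrace source (the trampoline emits the equivalent
instructions directly).

    #include <linux/bpf.h>
    #include <bpf/bpf_helpers.h>

    /* Inner map: single-entry array standing in for one CPU's buffer. */
    struct inner_map {
            __uint(type, BPF_MAP_TYPE_ARRAY);
            __uint(max_entries, 1);
            __type(key, __u32);
            __type(value, __u64);       /* stand-in for aggregation data */
    };

    /* Outer map: array-of-maps indexed by CPU id. */
    struct {
            __uint(type, BPF_MAP_TYPE_ARRAY_OF_MAPS);
            __uint(max_entries, 128);   /* max_cpuid + 1, illustrative */
            __type(key, __u32);
            __array(values, struct inner_map);
    } aggs SEC(".maps");

    SEC("kprobe/example")
    int tramp(void *ctx)
    {
            __u32 cpu = bpf_get_smp_processor_id();
            __u32 zero = 0;
            void *inner, *buf;

            inner = bpf_map_lookup_elem(&aggs, &cpu);   /* outer lookup */
            if (inner == NULL)
                    return 0;   /* trampoline branches to lbl_exit */

            buf = bpf_map_lookup_elem(inner, &zero);    /* inner lookup */
            if (buf == NULL)
                    return 0;

            /* The real code stores buf at dctx + DCTX_AGG for later use. */
            return 0;
    }

    char LICENSE[] SEC("license") = "GPL";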