Skip to content

Commit

Permalink
allow optional bounded size array (controlled by watermarks) (#243)
Browse files Browse the repository at this point in the history
* allow optional bounded size array (controlled by watermarks)

* fix compiler warnings
  • Loading branch information
Yao Yue committed Aug 22, 2019
1 parent 39462db commit 5b15222
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/data_structure/sarray/sarray.c
Expand Up @@ -108,7 +108,7 @@ _linear_search(uint32_t *idx, uint8_t *body, uint32_t nentry, uint32_t esize, ui
static inline bool
_binary_search(uint32_t *idx, uint8_t *body, uint32_t nentry, uint32_t esize, uint64_t val)
{
uint32_t id, imin, imax;
uint32_t id = 0, imin, imax;
uint32_t curr;

*idx = 0;
Expand Down
8 changes: 5 additions & 3 deletions src/protocol/data/resp/cmd_sarray.h
Expand Up @@ -2,7 +2,7 @@

/**
* create: create an empty array or integer width ESIZE
* SArray.create KEY ESIZE
* SArray.create KEY ESIZE [WATERMARK_L] [WATERMARK_H]
*
* delete: delete an array
* SArray.delete KEY
Expand All @@ -20,7 +20,7 @@
* SArray.insert KEY VALUE [VALUE ...]
*
* remove: remove a particular value from array
* SArray.remove KEY VALUE
* SArray.remove KEY VALUE [VALUE ...]
*
* truncate: truncate a array
* SArray.truncate KEY COUNT
Expand All @@ -30,7 +30,7 @@

/* type string #arg #opt */
#define REQ_SARRAY(ACTION) \
ACTION( REQ_SARRAY_CREATE, "SArray.create", 3, 0 )\
ACTION( REQ_SARRAY_CREATE, "SArray.create", 3, 2 )\
ACTION( REQ_SARRAY_DELETE, "SArray.delete", 2, 0 )\
ACTION( REQ_SARRAY_LEN, "SArray.len", 2, 0 )\
ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\
Expand All @@ -47,4 +47,6 @@ typedef enum sarray_elem {
SARRAY_IDX = 2,
SARRAY_CNT = 2,
SARRAY_ICNT = 3, /* when an index is also present */
SARRAY_WML = 3, /* watermark (low) */
SARRAY_WMH = 4, /* watermark (high) */
} sarray_elem_e;
168 changes: 118 additions & 50 deletions src/server/rds/data/cmd_sarray.c
Expand Up @@ -10,40 +10,12 @@
#include <cc_debug.h>
#include <cc_mm.h>

#define WATERMARK_SIZE (sizeof(uint32_t) * 2) /* <low, high> entries in u32 */
/* TODO(yao): make MAX_NVAL configurable */
#define MAX_NVAL 255 /* max no. of values to insert/remove in one request */
static uint64_t vals[MAX_NVAL];


static inline struct item *
_add_key(struct response *rsp, struct bstring *key)
{
struct element *reply = (struct element *)array_get(rsp->token, 0);
struct item *it;
item_rstatus_e istatus;

it = item_get(key);
if (it != NULL) {
rsp->type = reply->type = ELEM_ERR;
reply->bstr = str2bstr(RSP_EXIST);
INCR(process_metrics, sarray_create_exist);

return NULL;
} else {
/* TODO: figure out a TTL story here */
istatus = item_reserve(&it, key, NULL, SARRAY_HEADER_SIZE, 0, INT32_MAX);
if (istatus != ITEM_OK) {
rsp->type = reply->type = ELEM_ERR;
reply->bstr = str2bstr(RSP_ERR_STORAGE);
INCR(process_metrics, sarray_create_ex);
INCR(process_metrics, process_ex);
} else {
INCR(process_metrics, sarray_create_ok);
}

return it;
}
}
static uint64_t vals[MAX_NVAL];
static struct bstring null_key = null_bstring;

/**
* Attempt to extend an item by delta bytes. This is accomplished by first
Expand All @@ -69,8 +41,13 @@ _realloc_key(struct item **it, const struct bstring *key, uint32_t delta)
istatus = item_reserve(&nit, key, NULL, item_nval(*it) + delta,
(*it)->olen, (*it)->expire_at);
if (istatus != ITEM_OK) {
log_debug("reallocate item for key '%.*s' failed: %d", key->len,
key->data, istatus);
return istatus;
}

log_verb("successfully reallocated item for key '%.*s'", key->len,
key->data);
/*copy item payload */
cc_memcpy(nit->end, (*it)->end, item_npayload(*it));

Expand All @@ -82,16 +59,40 @@ _realloc_key(struct item **it, const struct bstring *key, uint32_t delta)
return ITEM_OK;
}

static inline uint32_t
_watermark_low(uint32_t *opt)
{
return *opt;
}

static inline uint32_t
_watermark_high(uint32_t *opt)
{
return *(opt + 1);
}

static inline void
_set_watermark(uint32_t *opt, uint32_t low, uint32_t high)
{
*opt = low;
++opt;
*opt = high;
}

void
cmd_sarray_create(struct response *rsp, const struct request *req,
const struct command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
int64_t esize;
item_rstatus_e istatus;
uint32_t ntoken;
bool bounded;
int64_t esize, low, high;

ASSERT(array_nelem(req->token) == cmd->narg);
ntoken = array_nelem(req->token);
ASSERT(ntoken >= cmd->narg);

INCR(process_metrics, sarray_create);

Expand All @@ -101,16 +102,50 @@ cmd_sarray_create(struct response *rsp, const struct request *req,

return;
}
log_verb("before esize");
if (!req_get_int(&esize, req, SARRAY_ESIZE)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_find_ex);
INCR(process_metrics, sarray_create_ex);

return;
}
log_verb("post parse");

it = _add_key(rsp, key);
bounded = (cmd->nopt > 0);
if (bounded && cmd->nopt != 2) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_create_ex);

return;
}

/* get low & high watermarks */
if (cmd->nopt > 0 && (!req_get_int(&low, req, SARRAY_WML) ||
!req_get_int(&high, req, SARRAY_WMH))) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_create_ex);

return;
}

/* add key */
it = item_get(key);
if (it != NULL) {
rsp->type = reply->type = ELEM_ERR;
reply->bstr = str2bstr(RSP_EXIST);
INCR(process_metrics, sarray_create_exist);
} else {
/* TODO: figure out a TTL story here */
istatus = item_reserve(&it, key, NULL, SARRAY_HEADER_SIZE,
WATERMARK_SIZE * bounded, INT32_MAX);
if (istatus != ITEM_OK) {
rsp->type = reply->type = ELEM_ERR;
reply->bstr = str2bstr(RSP_ERR_STORAGE);
} else {
if (bounded) {
_set_watermark((uint32_t *)item_optional(it), low, high);
}
INCR(process_metrics, sarray_create_ok);
}
}
if (it == NULL) {
compose_rsp_storage_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_create_ex);
Expand All @@ -133,7 +168,7 @@ cmd_sarray_delete(struct response *rsp, const struct request *req,
const struct command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;

ASSERT(array_nelem(req->token) == cmd->narg);

Expand All @@ -160,7 +195,7 @@ cmd_sarray_len(struct response *rsp, const struct request *req,
const struct command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;
struct item *it;
uint32_t nentry;

Expand Down Expand Up @@ -192,7 +227,7 @@ cmd_sarray_find(struct response *rsp, const struct request *req, const struct
command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;
struct item *it;
uint32_t idx;
int64_t val;
Expand Down Expand Up @@ -255,7 +290,7 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct
command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;
struct item *it;
int64_t idx = 0, cnt = 1;
uint64_t val;
Expand Down Expand Up @@ -339,10 +374,10 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;
struct item *it;
uint32_t nval = 0, ninserted = 0, delta;
int64_t val;
uint32_t nval = 0, esize;
int64_t delta, val, wml, wmh, nentry, ninserted = 0;
sarray_p sa;
sarray_rstatus_e status;

Expand Down Expand Up @@ -377,15 +412,38 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
}
}

delta = sarray_esize((sarray_p)item_data(it)) * nval;
/* we always insert everything before trying to truncate down an array
* that is too long. The reason for that is because inserting is the
* only way of ensuring the new values are properly sorted in the array,
* and the truncation (designed to be from the left for now) is indeed
* removing the lowest values. The downside is we may trigger an extra
* realloc of the key and assign it more memory than the final size may
* require.
*
* Example: if item can host at most an array of 10 elements, and we
* create an array with watermarks [6, 8], inserting 6 elements into
* an array of 4 elements will result in the array having 10 elements
* before being trimmed back. So the following logic will try to allocate
* (and keep) memory for 10 elements.
*
* However, this seems acceptable, mostly because we assume insert batch
* size is relatively small compared to watermark settings in most cases,
* and therefore users can configure their watermarks and control their
* batch sizes to ensure insertion at maximum array size stays within a
* single slabclass.
*/
sa = (sarray_p)item_data(it);
esize = sarray_esize(sa);
delta = esize * nval;

if (_realloc_key(&it, key, delta) != ITEM_OK) {
compose_rsp_storage_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
}

sa = (sarray_p)item_data(it);
sa = (sarray_p)item_data(it); /* item might have changed */
for (uint32_t i = 0; i < nval; ++i) {
status = sarray_insert(sa, vals[i]);
if (status == SARRAY_EINVALID) {
Expand All @@ -395,23 +453,33 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
}

if (status == SARRAY_EDUP) {
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_noop);
} else {
INCR(process_metrics, sarray_insert_ok);
ninserted++;
}
}

compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)ninserted);
if (it->olen > 0) {
wml = _watermark_low((uint32_t *)item_optional(it));
wmh = _watermark_high((uint32_t *)item_optional(it));
nentry = sarray_nentry(sa);
if (nentry > wmh) {
log_verb("truncating '%.*s' from %"PRIu32" down to %"PRIu32" elements",
key->len, key->data, nentry, wml);
sarray_truncate(sa, nentry - wml);
}
}

compose_rsp_numeric(rsp, reply, cmd, key, ninserted);
}

void
cmd_sarray_remove(struct response *rsp, const struct request *req, const struct
command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;
struct item *it;
uint32_t nval = 0, nremoved = 0;
int64_t val;
Expand Down Expand Up @@ -487,7 +555,7 @@ cmd_sarray_truncate(struct response *rsp, const struct request *req, const
struct command *cmd)
{
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct bstring *key = &null_key;
struct item *it;
int64_t cnt;
sarray_rstatus_e status;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/slab/item.h
Expand Up @@ -166,7 +166,7 @@ item_optional(struct item *it)
}

/*
* Get start location of item payload
* Get start location of item value
*/
static inline char *
item_data(struct item *it)
Expand Down

0 comments on commit 5b15222

Please sign in to comment.