Skip to content

Commit

Permalink
FEATURE: add large value set feature
Browse files Browse the repository at this point in the history
  • Loading branch information
JJungwoo committed May 21, 2021
1 parent 534184c commit 078b363
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 0 deletions.
233 changes: 233 additions & 0 deletions engines/default/coll_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,239 @@ void list_elem_release(list_elem_item **elem_array, const int elem_count)
UNLOCK_CACHE();
}

#ifdef ENABLE_LARGE_ITEM
hash_item *list_large_item_alloc(const void* key, const int nkey,
item_attr *attrp, uint32_t nbytes,
const void *cookie)
{
uint32_t total = nbytes;
hash_item *it;
list_elem_item *elem;
list_elem_item *head = NULL;
list_elem_item *tail = NULL;

it = do_list_item_alloc(key, nkey, attrp, cookie);
if (it == NULL) {
return NULL;
}

list_meta_info *info = (list_meta_info *)item_get_meta(it);
it->nbytes = nbytes;

while (total > 0) {
nbytes = (total >= config->max_element_bytes ? config->max_element_bytes : total);
elem = do_list_elem_alloc(nbytes, cookie);
if (elem == NULL) {
do_item_free(it);
it = NULL;
return it;
}
if (tail == NULL) {
tail = elem;
head = tail;
head->prev = NULL;
} else {
tail->next = elem;
elem->prev = tail;
tail = elem;
}
total -= nbytes;
info->stotal += sizeof(list_elem_item) + nbytes;
info->ccnt++;
}
assert(total == 0);
tail->next = NULL;
info->head = head;
info->tail = tail;
it->iflag &= ~ITEM_IFLAG_LIST;

return it;
}

static list_elem_item *do_elem_delete_crlf(list_elem_item *elem, const void *cookie)
{
assert(elem != NULL);
list_elem_item *new_elem;
uint32_t nbytes = elem->nbytes - 2; /* CRLF */

if (nbytes > 0) {
new_elem = do_list_elem_alloc(nbytes, cookie);
if (new_elem == NULL) {
return NULL;
}

memcpy(new_elem->value, elem->value, nbytes);
new_elem->prev = elem->prev;
if (elem->prev != NULL) {
elem->prev->next = new_elem;
}
} else {
new_elem = elem->prev;
}

elem->next = (list_elem_item *)ADDR_MEANS_UNLINKED;
do_list_elem_free(elem);

return new_elem;
}

/* old_it of this function must be a large kv item */
ENGINE_ERROR_CODE list_large_item_attach(hash_item *old_it, hash_item *attach_it,
ENGINE_STORE_OPERATION operation, bool update,
const void *cookie)
{
assert(old_it != NULL && attach_it != NULL);
uint32_t add_stotal = 0;
uint32_t add_ccnt = 0;
uint32_t add_nbytes = 0;
/* new_it->nbytes can be smaller than MIN_LARGE_ITEM_SIZE */
list_meta_info *old_info = (list_meta_info *)((char *)item_get_key(old_it) + META_OFFSET_IN_ITEM(old_it->nkey, 2));
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;

if (IS_LARGE_ITEM(attach_it->nbytes)) {
/* large + large */
list_meta_info *attach_info = (list_meta_info *)item_get_meta(attach_it);

if (operation == OPERATION_APPEND) {
old_info->tail = do_elem_delete_crlf(old_info->tail, cookie);
if (old_info->tail == NULL) {
/* if old item has a single element, head re-points to the element */
if (old_info->ccnt == 1) {
old_info->head = old_info->tail;
/* if old item is empty occur to below code */
if (old_it->nbytes <= 2) {
old_info->head = attach_info->head;
old_info->ccnt -= 1;
}
} else {
return ENGINE_ENOMEM;
}
} else {
old_info->tail->next = attach_info->head;
attach_info->head->prev = old_info->tail;
}
old_info->tail = attach_info->tail;
} else {
/* OPERATION_PREPEND */
attach_info->tail = do_elem_delete_crlf(attach_info->tail, cookie);
if (attach_info->tail == NULL) {
return ENGINE_ENOMEM;
}
assert(attach_info->ccnt > 1);

old_info->head->prev = attach_info->tail;
attach_info->tail->next = old_info->head;
old_info->head = attach_info->head;
}
add_stotal += attach_info->stotal - 2; /* CRLF */
add_ccnt += attach_info->ccnt;
add_nbytes += attach_it->nbytes - 2; /* CRLF */

old_info->head->prev = old_info->tail->next = NULL;
attach_info->head = attach_info->tail = NULL;
attach_info->ccnt = 0;
} else {
/* large + kv */
char *data = item_get_data(attach_it);
uint32_t nbytes;
uint32_t total = (operation == OPERATION_PREPEND ? attach_it->nbytes - 2 /* CRLF */ : attach_it->nbytes);
list_elem_item *elem;
list_elem_item *head = NULL;
list_elem_item *tail = NULL;
add_nbytes = total;

if (total <= 0) {
/* normal terminate because you do not need to create a new element */
return ENGINE_SUCCESS;
}

while (total > 0) {
nbytes = (total >= config->max_element_bytes ? config->max_element_bytes : total);
elem = do_list_elem_alloc(nbytes, cookie);
if (elem == NULL) {
ret = ENGINE_ENOMEM;
break;
}
memcpy(elem->value, data, nbytes);
if (tail == NULL) {
tail = elem;
head = tail;
head->prev = NULL;
} else {
tail->next = elem;
elem->prev = tail;
tail = elem;
}
total -= nbytes;
data += nbytes;
add_stotal += sizeof(list_elem_item) + nbytes;
add_ccnt++;
}
assert(total == 0);

if (ret == ENGINE_SUCCESS) {
if (operation == OPERATION_APPEND) {
old_info->tail = do_elem_delete_crlf(old_info->tail, cookie);
if (old_info->tail == NULL) {
ret = ENGINE_ENOMEM;
} else {
old_info->stotal -= 2; /* CRLF */
old_it->nbytes -= 2; /* CRLF */
}
}
}

if (ret != ENGINE_SUCCESS) {
/* free allocated all elements */
while (head != NULL) {
elem = head->next;
elem->next = (list_elem_item *)ADDR_MEANS_UNLINKED;
do_list_elem_free(elem);
head = elem;
}
return ret;
}

if (operation == OPERATION_APPEND) {
old_info->tail->next = head;
head->prev = old_info->tail;
old_info->tail = tail;
old_info->tail->next = NULL;
} else {
/* OPERATION_PREPEND */
old_info->head->prev = tail;
tail->next = old_info->head;
old_info->head = head;
}
}
if (ret == ENGINE_SUCCESS) {
if (update) {
/* unlink for relink to the updated old item */
do_item_unlink(old_it, ITEM_UNLINK_REPLACE);
}
old_info->stotal += add_stotal;
old_info->ccnt += add_ccnt;
old_it->nbytes += add_nbytes;
}
return ret;
}

void list_large_item_free(hash_item *it)
{
list_meta_info *info = (list_meta_info *)item_get_meta(it);
list_elem_item *elem = info->head;
list_elem_item *next;
while (elem != NULL) {
next = elem->next;
elem->next = (list_elem_item *)ADDR_MEANS_UNLINKED;
do_list_elem_free(elem);
info->ccnt--;
elem = next;
}
assert(info->ccnt == 0);
}
#endif

ENGINE_ERROR_CODE list_elem_insert(const char *key, const uint32_t nkey,
int index, list_elem_item *elem,
item_attr *attrp,
Expand Down
15 changes: 15 additions & 0 deletions engines/default/coll_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@

#include "item_base.h"

#ifdef ENABLE_LARGE_ITEM
/*
* Large item which made of list structure internally.
*/
hash_item *list_large_item_alloc(const void* key, const int nkey,
item_attr *attrp, uint32_t nbytes,
const void *cookie);

ENGINE_ERROR_CODE list_large_item_attach(hash_item *it, hash_item *new_it,
ENGINE_STORE_OPERATION operation, bool update,
const void *cookie);

void list_large_item_free(hash_item *it);
#endif

/*
* List Collection
*/
Expand Down
46 changes: 46 additions & 0 deletions engines/default/default_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,17 @@ default_item_allocate(ENGINE_HANDLE* handle, const void* cookie,
const int flags, const rel_time_t exptime,
const uint64_t cas)
{
#ifndef ENABLE_LARGE_ITEM
uint32_t ntotal = item_kv_size(nkey, nbytes);
unsigned int id = slabs_clsid(ntotal);
if (id == 0) {
return ENGINE_E2BIG;
}
#else
if (nbytes > MAX_ITEM_VALUE_LENGTH) {
return ENGINE_E2BIG;
}
#endif

hash_item *it;
ENGINE_ERROR_CODE ret = ENGINE_EINVAL;
Expand Down Expand Up @@ -1642,6 +1648,38 @@ default_scrub_stale(ENGINE_HANDLE* handle)

/* Item/Elem Info */

#ifdef ENABLE_LARGE_ITEM
static bool
get_large_item_info(hash_item *it, item_info *item_info)
{
list_meta_info *info = (list_meta_info *)item_get_meta(it);
list_elem_item *elem = info->head;
int count = 0;

item_info->naddnl = info->ccnt;
item_info->nvalue = 0;

if (item_info->addnl != NULL) {
return true;
}

item_info->addnl = (value_item **)malloc(sizeof(value_item *) * item_info->naddnl);
if (item_info->addnl == NULL) {
return false;
}

while (elem != NULL) {
item_info->addnl[count] = (value_item *)((char *)elem + offsetof(list_elem_item, nbytes));
item_info->addnl[count]->len = elem->nbytes;
count++;
elem = elem->next;
}
assert(count == item_info->naddnl);

return true;
}
#endif

static bool
get_item_info(ENGINE_HANDLE *handle, const void *cookie,
const item* item, item_info *item_info)
Expand All @@ -1658,7 +1696,15 @@ get_item_info(ENGINE_HANDLE *handle, const void *cookie,
item_info->naddnl = 0;
item_info->key = item_get_key(it);
item_info->value = item_get_data(it);
#ifdef ENABLE_LARGE_ITEM
if (IS_LARGE_ITEM(it->nbytes)) {
return get_large_item_info(it, item_info);
} else {
item_info->addnl = NULL;
}
#else
item_info->addnl = NULL;
#endif
return true;
}

Expand Down
4 changes: 4 additions & 0 deletions engines/default/default_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ struct default_engine;

#define MAX_FILEPATH_LENGTH 4096
#define MAX_FILENAME_LENGTH 256
#ifdef ENABLE_LARGE_ITEM
#define MAX_ITEM_VALUE_LENGTH 10 * 1024 * 1024
#define LARGE_ITEM_VALUE_LENGTH 128 * 1024
#endif

/**
* engine configuration
Expand Down

0 comments on commit 078b363

Please sign in to comment.