diff --git a/src/btree.c b/src/btree.c index ef14e49e..31c6366d 100644 --- a/src/btree.c +++ b/src/btree.c @@ -2526,6 +2526,44 @@ btree_iterator_find_end(btree_iterator *itor) btree_node_unget(itor->cc, itor->cfg, &end); } +static async_status +btree_iterator_find_end_async(btree_iterator_async_state *state, uint64 depth) +{ + async_begin(state, depth); + + btree_lookup_async_state_init(&state->lookup_state, + state->itor->cc, + state->itor->cfg, + state->itor->root_addr, + state->itor->page_type, + state->itor->max_key, + NULL, + state->callback, + state->callback_arg); + state->lookup_state.stop_at_height = state->itor->height; + state->lookup_state.stats = NULL; + async_await(state, + btree_lookup_node_async(&state->lookup_state, 0) + == ASYNC_STATUS_DONE); + state->end = state->lookup_state.node; + state->itor->end_addr = state->end.addr; + state->itor->end_generation = state->end.hdr->generation; + + if (key_is_positive_infinity(state->itor->max_key)) { + state->itor->end_idx = btree_num_entries(state->end.hdr); + } else { + state->itor->end_idx = + find_key_in_node(state->itor, + state->end.hdr, + state->itor->max_key, + comparison_invert(state->itor->max_key_comparison), + NULL); + } + + btree_node_unget(state->itor->cc, state->itor->cfg, &state->end); + async_return(state); +} + /* * ---------------------------------------------------------------------------- * Move to the next leaf when we've reached the end of one leaf but @@ -2591,6 +2629,99 @@ btree_iterator_next_leaf(btree_iterator *itor) } } +static async_status +btree_iterator_next_leaf_async(btree_iterator_async_state *state, uint64 depth) +{ + async_begin(state, depth); + + state->last_addr = state->itor->curr.addr; + state->next_addr = state->itor->curr.hdr->next_addr; + btree_node_unget(state->itor->cc, state->itor->cfg, &state->itor->curr); + state->itor->curr.addr = state->next_addr; + + cache_get_async_state_init(state->cache_get_state, + state->itor->cc, + state->itor->curr.addr, + state->itor->page_type, + state->callback, + state->callback_arg); + while (cache_get_async(state->itor->cc, state->cache_get_state) + != ASYNC_STATUS_DONE) + { + async_yield(state); + } + state->itor->curr.page = + cache_get_async_state_result(state->itor->cc, state->cache_get_state); + state->itor->curr.hdr = (btree_hdr *)state->itor->curr.page->data; + + state->itor->idx = 0; + state->itor->curr_min_idx = -1; + + while (state->itor->curr.addr == state->itor->end_addr + && state->itor->curr.hdr->generation != state->itor->end_generation) + { + /* + * We need to recompute the end node and end_idx. (see + * comment at beginning of iterator implementation for + * high-level description) + * + * There's a potential for deadlock with concurrent inserters + * if we hold a read-lock on curr while looking up end, so we + * temporarily release curr. + * + * It is safe to relase curr because we are at index 0 of + * curr. To see why, observe that, at this point, curr + * cannot be the first leaf in the tree (since we just + * followed a next pointer a few lines above). And, for + * every leaf except the left-most leaf of the tree, no key + * can ever be inserted into the leaf that is smaller than + * the leaf's 0th entry, because its 0th entry is also its + * pivot in its parent. Thus we are guaranteed that the + * first key curr will not change between the unget and the + * get. Hence we will not "go backwards" i.e. return a key + * smaller than the previous key) or skip any keys. + * Furthermore, even if another thread comes along and splits + * curr while we've released it, we will still want to + * continue at curr (since we're at the 0th entry). + */ + btree_node_unget(state->itor->cc, state->itor->cfg, &state->itor->curr); + async_await_subroutine(state, btree_iterator_find_end_async); + + cache_get_async_state_init(state->cache_get_state, + state->itor->cc, + state->itor->curr.addr, + state->itor->page_type, + state->callback, + state->callback_arg); + while (cache_get_async(state->itor->cc, state->cache_get_state) + != ASYNC_STATUS_DONE) + { + async_yield(state); + } + state->itor->curr.page = + cache_get_async_state_result(state->itor->cc, state->cache_get_state); + state->itor->curr.hdr = (btree_hdr *)state->itor->curr.page->data; + } + + // To prefetch: + // 1. we just moved from one extent to the next + // 2. this can't be the last extent + if (state->itor->do_prefetch + && !btree_addrs_share_extent( + state->itor->cc, state->last_addr, state->itor->curr.addr) + && state->itor->curr.hdr->next_extent_addr != 0 + && !btree_addrs_share_extent( + state->itor->cc, state->itor->curr.addr, state->itor->end_addr)) + { + // IO prefetch the next extent + cache_prefetch(state->itor->cc, + state->itor->curr.hdr->next_extent_addr, + state->itor->page_type); + } + + async_return(state); +} + /* * ---------------------------------------------------------------------------- * Move to the previous leaf when we've reached the beginning of one leaf. @@ -2651,6 +2782,100 @@ btree_iterator_prev_leaf(btree_iterator *itor) /* } */ } +static async_status +btree_iterator_prev_leaf_async(btree_iterator_async_state *state, uint64 depth) +{ + async_begin(state, depth); + + state->curr_addr = state->itor->curr.addr; + state->prev_addr = state->itor->curr.hdr->prev_addr; + btree_node_unget(state->itor->cc, state->itor->cfg, &state->itor->curr); + state->itor->curr.addr = state->prev_addr; + + cache_get_async_state_init(state->cache_get_state, + state->itor->cc, + state->itor->curr.addr, + state->itor->page_type, + state->callback, + state->callback_arg); + while (cache_get_async(state->itor->cc, state->cache_get_state) + != ASYNC_STATUS_DONE) + { + async_yield(state); + } + state->itor->curr.page = + cache_get_async_state_result(state->itor->cc, state->cache_get_state); + state->itor->curr.hdr = (btree_hdr *)state->itor->curr.page->data; + + /* + * The previous leaf may have split in between our release of the + * old curr node and the new one. In this case, we can just walk + * forward until we find the leaf whose successor is our old leaf. + */ + while (state->itor->curr.hdr->next_addr != state->curr_addr) { + state->next_addr = state->itor->curr.hdr->next_addr; + btree_node_unget(state->itor->cc, state->itor->cfg, &state->itor->curr); + state->itor->curr.addr = state->next_addr; + + cache_get_async_state_init(state->cache_get_state, + state->itor->cc, + state->itor->curr.addr, + state->itor->page_type, + state->callback, + state->callback_arg); + while (cache_get_async(state->itor->cc, state->cache_get_state) + != ASYNC_STATUS_DONE) + { + async_yield(state); + } + state->itor->curr.page = + cache_get_async_state_result(state->itor->cc, state->cache_get_state); + state->itor->curr.hdr = (btree_hdr *)state->itor->curr.page->data; + } + + state->itor->idx = btree_num_entries(state->itor->curr.hdr) - 1; + + /* Do a quick check whether this entire leaf is within the range. */ + state->first_key = + state->itor->height + ? btree_get_pivot(state->itor->cfg, state->itor->curr.hdr, 0) + : btree_get_tuple_key(state->itor->cfg, state->itor->curr.hdr, 0); + if (btree_key_compare( + state->itor->cfg, state->itor->min_key, state->first_key) + < 0) + { + state->itor->curr_min_idx = -1; + } else { + state->itor->curr_min_idx = + find_key_in_node(state->itor, + state->itor->curr.hdr, + state->itor->min_key, + state->itor->min_key_comparison, + NULL); + } + if (state->itor->curr.hdr->prev_addr == 0 && state->itor->curr_min_idx == -1) + { + state->itor->curr_min_idx = 0; + } + + // FIXME: To prefetch: + // 1. we just moved from one extent to the next + // 2. this can't be the last extent + /* if (itor->do_prefetch */ + /* && !btree_addrs_share_extent(cc, last_addr, itor->curr.addr) */ + /* && itor->curr.hdr->next_extent_addr != 0 */ + /* && !btree_addrs_share_extent(cc, itor->curr.addr, itor->end_addr)) + */ + /* { */ + /* // IO prefetch the next extent */ + /* cache_prefetch(cc, itor->curr.hdr->next_extent_addr, + * itor->page_type); + */ + /* } */ + + async_return(state); +} + platform_status btree_iterator_next(iterator *base_itor) { @@ -2726,6 +2951,25 @@ btree_iterator_move_leaf_if_needed(btree_iterator *itor) } } +static inline async_status +btree_iterator_move_leaf_if_needed_async(btree_iterator_async_state *state, + uint64 depth) +{ + async_begin(state, depth); + + if (state->itor->curr.addr != state->itor->end_addr + && state->itor->idx == btree_num_entries(state->itor->curr.hdr)) + { + async_await_subroutine(state, btree_iterator_next_leaf_async); + state->itor->curr_min_idx = 0; // we came from an irrelevant leaf + } + if (state->itor->curr_min_idx == -1 && state->itor->idx == -1) { + async_await_subroutine(state, btree_iterator_prev_leaf_async); + } + + async_return(state); +} + // This function violates our locking rules. See comment at top of file. static inline void find_btree_node_and_get_idx_bounds(btree_iterator *itor, @@ -2801,6 +3045,107 @@ find_btree_node_and_get_idx_bounds(btree_iterator *itor, btree_iterator_move_leaf_if_needed(itor); } +// This function violates our locking rules. See comment at top of file. +static inline async_status +find_btree_node_and_get_idx_bounds_async(btree_iterator_async_state *state, + uint64 depth) +{ + async_begin(state, depth); + + // lookup the node that contains target + btree_lookup_async_state_init(&state->lookup_state, + state->itor->cc, + state->itor->cfg, + state->itor->root_addr, + state->itor->page_type, + state->target, + NULL, + state->callback, + state->callback_arg); + state->lookup_state.stop_at_height = state->itor->height; + state->lookup_state.stats = NULL; + async_await(state, + btree_lookup_node_async(&state->lookup_state, 0) + == ASYNC_STATUS_DONE); + state->itor->curr = state->lookup_state.node; + + /* + * We have to claim curr in order to prevent possible deadlocks + * with insertion threads while finding the end node. + * + * Note that we can't lookup end first because, if there's a split + * between looking up end and looking up curr, we could end up in a + * situation where end comes before curr in the tree! (We could + * prevent this by holding a claim on end while looking up curr, + * but that would essentially be the same as the code below.) + * + * Note that the approach in advance (i.e. releasing and reaquiring + * a lock on curr) is not viable here because we are not + * necessarily searching for the 0th entry in curr. Thus a split + * of curr while we have released it could mean that we really want + * to start at curr's right sibling (after the split). So we'd + * have to redo the search from scratch after releasing curr. + * + * So we take a claim on curr instead. + */ + while ( + !btree_node_claim(state->itor->cc, state->itor->cfg, &state->itor->curr)) + { + btree_node_unget(state->itor->cc, state->itor->cfg, &state->itor->curr); + btree_lookup_async_state_init(&state->lookup_state, + state->itor->cc, + state->itor->cfg, + state->itor->root_addr, + state->itor->page_type, + state->target, + NULL, + state->callback, + state->callback_arg); + state->lookup_state.stop_at_height = state->itor->height; + state->lookup_state.stats = NULL; + async_await(state, + btree_lookup_node_async(&state->lookup_state, 0) + == ASYNC_STATUS_DONE); + state->itor->curr = state->lookup_state.node; + } + + async_await_subroutine(state, btree_iterator_find_end_async); + + /* Once we've found end, we can unclaim curr. */ + btree_node_unclaim(state->itor->cc, state->itor->cfg, &state->itor->curr); + + // find the index of the minimum key + state->tmp = find_key_in_node(state->itor, + state->itor->curr.hdr, + state->itor->min_key, + state->itor->min_key_comparison, + &state->found); + // If min key doesn't exist in current node, but is: + // 1) in range: Min idx = first key satisfying min_key_comparison + // 2) out of range: Min idx = -1 + state->itor->curr_min_idx = + !state->found && state->tmp == 0 ? state->tmp - 1 : state->tmp; + // if min_key is not within the current node but there is no previous node + // then set curr_min_idx to 0 + if (state->itor->curr_min_idx == -1 && state->itor->curr.hdr->prev_addr == 0) + { + state->itor->curr_min_idx = 0; + } + + // find the index of the actual target + state->itor->idx = find_key_in_node(state->itor, + state->itor->curr.hdr, + state->target, + state->position_rule, + &state->found); + btree_iterator_bound_idx(state->itor, state->position_rule); + + // check if we already need to move to the prev/next leaf + async_await_subroutine(state, btree_iterator_move_leaf_if_needed_async); + + async_return(state); +} + /* * Seek to a given key within the btree * seek_type defines where the iterator is positioned relative to the target @@ -2949,6 +3294,67 @@ btree_iterator_init(cache *cc, || itor->idx < btree_num_entries(itor->curr.hdr)); } +async_status +btree_iterator_init_async(btree_iterator_async_state *state) +{ + async_begin(state, 0); + + platform_assert(state->root_addr != 0); + debug_assert(state->type == PAGE_TYPE_MEMTABLE + || state->type == PAGE_TYPE_BRANCH); + + debug_assert(!key_is_null(state->min_key) && !key_is_null(state->max_key) + && !key_is_null(state->start_key)); + debug_assert(state->min_key_comparison == greater_than + || state->min_key_comparison == greater_than_or_equal); + debug_assert(state->max_key_comparison == less_than + || state->max_key_comparison == less_than_or_equal); + + if (btree_key_compare(state->cfg, state->min_key, state->max_key) > 0) { + state->max_key = state->min_key; + state->min_key_comparison = greater_than_or_equal; + state->max_key_comparison = less_than; + } + if (btree_key_compare(state->cfg, state->start_key, state->min_key) < 0) { + state->start_key = state->min_key; + } + if (btree_key_compare(state->cfg, state->start_key, state->max_key) > 0) { + state->start_key = state->max_key; + } + + ZERO_CONTENTS(state->itor); + state->itor->cc = state->cc; + state->itor->cfg = state->cfg; + state->itor->root_addr = state->root_addr; + state->itor->do_prefetch = state->do_prefetch; + state->itor->height = state->height; + state->itor->min_key_comparison = state->min_key_comparison; + state->itor->min_key = state->min_key; + state->itor->max_key_comparison = state->max_key_comparison; + state->itor->max_key = state->max_key; + state->itor->page_type = state->type; + state->itor->super.ops = &btree_iterator_ops; + + state->target = state->start_key; + state->position_rule = state->start_type; + async_await_subroutine(state, find_btree_node_and_get_idx_bounds_async); + + if (state->itor->do_prefetch && state->itor->curr.hdr->next_extent_addr != 0 + && !btree_addrs_share_extent( + state->cc, state->itor->curr.addr, state->itor->end_addr)) + { + // IO prefetch the next extent + cache_prefetch(state->cc, + state->itor->curr.hdr->next_extent_addr, + state->itor->page_type); + } + + debug_assert(!iterator_can_curr((iterator *)state->itor) + || state->itor->idx < btree_num_entries(state->itor->curr.hdr)); + + async_return(state, STATUS_OK); +} + void btree_iterator_deinit(btree_iterator *itor) { diff --git a/src/btree.h b/src/btree.h index cb03c927..dbfc56fc 100644 --- a/src/btree.h +++ b/src/btree.h @@ -286,6 +286,44 @@ btree_iterator_init(cache *cc, bool32 do_prefetch, uint32 height); +// clang-format off +DEFINE_ASYNC_STATE(btree_iterator_async_state, 5, + param, cache *, cc, + param, const btree_config *, cfg, + param, btree_iterator *, itor, + param, uint64, root_addr, + param, page_type, type, + param, comparison, min_key_comparison, + param, key, min_key, + param, comparison, max_key_comparison, + param, key, max_key, + param, comparison, start_type, + param, key, start_key, + param, bool32, do_prefetch, + param, uint32, height, + param, async_callback_fn, callback, + param, void *, callback_arg, + local, platform_status, __async_result, + local, btree_lookup_async_state, lookup_state, + local, page_get_async_state_buffer, cache_get_state, + local, btree_node, end, + local, key, target, + local, comparison, position_rule, + local, bool32, found, + local, bool32, forward, + local, int64, tmp, + local, uint64, curr_addr, + local, uint64, last_addr, + local, uint64, next_addr, + local, uint64, prev_addr, + local, uint64, num_entries, + local, key, first_key, + local, key, last_key) +// clang-format on + +async_status +btree_iterator_init_async(btree_iterator_async_state *state); + void btree_iterator_deinit(btree_iterator *itor); diff --git a/src/core.c b/src/core.c index eee2f1f1..a341ef4b 100644 --- a/src/core.c +++ b/src/core.c @@ -793,57 +793,104 @@ core_lookup_from_memtable_generation_locked(core_handle *spl, return STATUS_OK; } -/* - * Branch iterator wrapper functions - */ +typedef struct core_btree_iterator_init_async_context { + btree_iterator_async_state state; + bool32 ready; + bool32 done; +} core_btree_iterator_init_async_context; static void -core_branch_iterator_init(core_handle *spl, - btree_iterator *itor, - uint64 branch_addr, - comparison min_key_comparison, - key min_key, - comparison max_key_comparison, - key max_key, - comparison start_key_comparison, - key start_key, - bool32 do_prefetch, - bool32 should_inc_ref) +core_btree_iterator_init_async_callback(void *arg) +{ + core_btree_iterator_init_async_context *ctxt = arg; + ctxt->ready = TRUE; +} + +static platform_status +core_start_btree_iterator_init_async( + core_handle *spl, + core_btree_iterator_init_async_context *ctxt, + btree_iterator *itor, + uint64 root_addr, + page_type page_type, + comparison min_key_comparison, + key min_key, + comparison max_key_comparison, + key max_key, + comparison start_key_comparison, + key start_key, + bool32 do_prefetch) { - cache *cc = spl->cc; - btree_config *btree_cfg = spl->cfg.btree_cfg; - if (branch_addr != 0 && should_inc_ref) { - btree_inc_ref(cc, btree_cfg, branch_addr); + btree_iterator_async_state_init(&ctxt->state, + spl->cc, + spl->cfg.btree_cfg, + itor, + root_addr, + page_type, + min_key_comparison, + min_key, + max_key_comparison, + max_key, + start_key_comparison, + start_key, + do_prefetch, + 0, + core_btree_iterator_init_async_callback, + ctxt); + ctxt->ready = FALSE; + ctxt->done = FALSE; + + if (btree_iterator_init_async(&ctxt->state) == ASYNC_STATUS_DONE) { + ctxt->done = TRUE; + return async_result(&ctxt->state); } - btree_iterator_init(cc, - btree_cfg, - itor, - branch_addr, - PAGE_TYPE_BRANCH, - min_key_comparison, - min_key, - max_key_comparison, - max_key, - start_key_comparison, - start_key, - do_prefetch, - 0); + + return STATUS_OK; } -static void -core_branch_iterator_deinit(core_handle *spl, - btree_iterator *itor, - bool32 should_dec_ref) +static platform_status +core_drain_btree_iterator_init_async( + cache *cc, + core_btree_iterator_init_async_context *ctxt, + uint64 num_inits) { - if (itor->root_addr == 0) { - return; + platform_status result = STATUS_OK; + uint64 done_count = 0; + for (uint64 i = 0; i < num_inits; i++) { + if (ctxt[i].done) { + done_count++; + platform_status rc = async_result(&ctxt[i].state); + if (!SUCCESS(rc) && SUCCESS(result)) { + result = rc; + } + } } - cache *cc = spl->cc; - btree_config *btree_cfg = spl->cfg.btree_cfg; - btree_iterator_deinit(itor); - if (should_dec_ref) { - btree_dec_ref(cc, btree_cfg, itor->root_addr, PAGE_TYPE_BRANCH); + + while (done_count < num_inits) { + bool32 made_progress = FALSE; + for (uint64 i = 0; i < num_inits; i++) { + if (ctxt[i].done || !ctxt[i].ready) { + continue; + } + + ctxt[i].ready = FALSE; + made_progress = TRUE; + if (btree_iterator_init_async(&ctxt[i].state) == ASYNC_STATUS_DONE) { + ctxt[i].done = TRUE; + done_count++; + platform_status rc = async_result(&ctxt[i].state); + if (!SUCCESS(rc) && SUCCESS(result)) { + result = rc; + } + } + } + + if (!made_progress) { + cache_cleanup(cc); + } } + + return result; } /* @@ -918,6 +965,8 @@ core_range_iterator_init(core_handle *spl, range_itor->can_next = TRUE; range_itor->min_key_comparison = min_key_comparison; range_itor->max_key_comparison = max_key_comparison; + ZERO_ARRAY(range_itor->compacted); + ZERO_ARRAY(range_itor->btree_itor_initialized); key_buffer_init(&range_itor->min_key, PROCESS_PRIVATE_HEAP_ID); key_buffer_init(&range_itor->max_key, PROCESS_PRIVATE_HEAP_ID); @@ -964,9 +1013,6 @@ core_range_iterator_init(core_handle *spl, return rc; } - - ZERO_ARRAY(range_itor->compacted); - // grab the lookup lock memtable_begin_lookup(&spl->mt_ctxt); @@ -1061,43 +1107,68 @@ core_range_iterator_init(core_handle *spl, } } + core_btree_iterator_init_async_context *init_ctxt = NULL; + if (range_itor->num_branches != 0) { + init_ctxt = TYPED_ARRAY_ZALLOC( + PROCESS_PRIVATE_HEAP_ID, init_ctxt, range_itor->num_branches); + } + if (range_itor->num_branches != 0 && init_ctxt == NULL) { + core_range_iterator_deinit(range_itor); + return STATUS_NO_MEMORY; + } + + uint64 started_inits = 0; for (uint64 i = 0; i < range_itor->num_branches; i++) { uint64 branch_no = range_itor->num_branches - i - 1; btree_iterator *btree_itor = &range_itor->btree_itor[branch_no]; uint64 branch_addr = range_itor->branch[branch_no].addr; + page_type page_type = range_itor->branch[branch_no].type; + bool32 do_prefetch = FALSE; if (range_itor->compacted[branch_no]) { - bool32 do_prefetch = + do_prefetch = range_itor->compacted[branch_no] && num_tuples > CORE_PREFETCH_MIN ? TRUE : FALSE; - core_branch_iterator_init(spl, - btree_itor, - branch_addr, - range_itor->local_min_key_comparison, - key_buffer_key(&range_itor->local_min_key), - range_itor->local_max_key_comparison, - key_buffer_key(&range_itor->local_max_key), - start_key_comparison, - start_key, - do_prefetch, - FALSE); - } else { - bool32 is_live = branch_no == 0; - core_memtable_iterator_init(spl, - btree_itor, - branch_addr, - range_itor->local_min_key_comparison, - key_buffer_key(&range_itor->local_min_key), - range_itor->local_max_key_comparison, - key_buffer_key(&range_itor->local_max_key), - start_key_comparison, - start_key, - is_live, - FALSE); + } + rc = core_start_btree_iterator_init_async( + spl, + &init_ctxt[i], + btree_itor, + branch_addr, + page_type, + range_itor->local_min_key_comparison, + key_buffer_key(&range_itor->local_min_key), + range_itor->local_max_key_comparison, + key_buffer_key(&range_itor->local_max_key), + start_key_comparison, + start_key, + do_prefetch); + started_inits++; + if (!SUCCESS(rc)) { + break; } range_itor->itor[i] = &btree_itor->super; } + platform_status drain_rc = + core_drain_btree_iterator_init_async(spl->cc, init_ctxt, started_inits); + if (SUCCESS(rc)) { + rc = drain_rc; + } + for (uint64 i = 0; i < started_inits; i++) { + if (init_ctxt[i].done) { + uint64 branch_no = range_itor->num_branches - i - 1; + range_itor->btree_itor_initialized[branch_no] = TRUE; + } + } + if (init_ctxt != NULL) { + platform_free(PROCESS_PRIVATE_HEAP_ID, init_ctxt); + } + if (!SUCCESS(rc)) { + core_range_iterator_deinit(range_itor); + return rc; + } + rc = merge_iterator_create(PROCESS_PRIVATE_HEAP_ID, spl->cfg.data_cfg, range_itor->num_branches, @@ -1343,18 +1414,21 @@ core_range_iterator_deinit(core_range_iterator *range_itor) core_handle *spl = range_itor->spl; if (range_itor->merge_itor != NULL) { merge_iterator_destroy(PROCESS_PRIVATE_HEAP_ID, &range_itor->merge_itor); - for (uint64 i = 0; i < range_itor->num_branches; i++) { - btree_iterator *btree_itor = &range_itor->btree_itor[i]; - if (range_itor->compacted[i]) { - uint64 root_addr = btree_itor->root_addr; - core_branch_iterator_deinit(spl, btree_itor, FALSE); - btree_dec_ref( - spl->cc, spl->cfg.btree_cfg, root_addr, PAGE_TYPE_BRANCH); - } else { - uint64 mt_gen = range_itor->memtable_start_gen - i; - core_memtable_iterator_deinit(spl, btree_itor, mt_gen, FALSE); - core_memtable_dec_ref(spl, mt_gen); - } + } + for (uint64 i = 0; i < range_itor->num_branches; i++) { + btree_iterator *btree_itor = &range_itor->btree_itor[i]; + if (range_itor->btree_itor_initialized[i]) { + btree_iterator_deinit(btree_itor); + range_itor->btree_itor_initialized[i] = FALSE; + } + if (range_itor->compacted[i]) { + btree_dec_ref(spl->cc, + spl->cfg.btree_cfg, + range_itor->branch[i].addr, + PAGE_TYPE_BRANCH); + } else { + uint64 mt_gen = range_itor->memtable_start_gen - i; + core_memtable_dec_ref(spl, mt_gen); } } key_buffer_deinit(&range_itor->min_key); diff --git a/src/core.h b/src/core.h index ee46fcfd..7ae21fe7 100644 --- a/src/core.h +++ b/src/core.h @@ -137,6 +137,7 @@ typedef struct core_range_iterator { comparison local_max_key_comparison; bool32 local_max_key_truncated; btree_iterator btree_itor[CORE_RANGE_ITOR_MAX_BRANCHES]; + bool32 btree_itor_initialized[CORE_RANGE_ITOR_MAX_BRANCHES]; trunk_branch_info branch[CORE_RANGE_ITOR_MAX_BRANCHES]; // used for merge iterator construction diff --git a/src/merge.c b/src/merge.c index 1ac02b2d..b58e4313 100644 --- a/src/merge.c +++ b/src/merge.c @@ -100,11 +100,48 @@ bsearch_insert(register const ordered_iterator *key, register ordered_iterator **p; bool32 prev_equal = FALSE; bool32 next_equal = FALSE; + bool32 keys_equal = FALSE; + if (nmemb == 0) { + *prev_equal_out = FALSE; + *next_equal_out = FALSE; + return -1; + } + + size_t nrel = 1; + + while (nrel <= nmemb && nrel < 4) { + cmp = bsearch_comp(key, base[nrel - 1], forwards, cfg, &keys_equal); + if (cmp <= 0) { + *prev_equal_out = prev_equal; + *next_equal_out = keys_equal; + return nrel - 2; + } + prev_equal |= keys_equal; + nrel++; + } + + if (nrel > nmemb) { + *prev_equal_out = prev_equal; + *next_equal_out = FALSE; + return nmemb - 1; + } + + if (4 <= nrel) { + while (nrel <= nmemb + && bsearch_comp(key, base[nrel - 1], forwards, cfg, &keys_equal) + > 0) + { + nrel *= 2; + } + } + + if (nmemb < nrel) { + nrel = nmemb; + } - for (lim = nmemb; lim != 0; lim >>= 1) { - p = base + (lim >> 1); - bool32 keys_equal; + for (lim = nrel; lim != 0; lim >>= 1) { + p = base + (lim >> 1); cmp = bsearch_comp(key, *p, forwards, cfg, &keys_equal); debug_assert(cmp != 0); diff --git a/tests/functional/scan_benchmark.c b/tests/functional/scan_benchmark.c new file mode 100644 index 00000000..452653ac --- /dev/null +++ b/tests/functional/scan_benchmark.c @@ -0,0 +1,852 @@ +// Copyright 2018-2026 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +#include "test.h" +#include "platform_io.h" +#include "splinterdb/default_data_config.h" +#include "splinterdb_tests_private.h" + +#include + +#include "random.h" +#include "poison.h" + +#define SCAN_BENCHMARK_KEY_SIZE 8 +#define SCAN_BENCHMARK_MAX_MILESTONES 32 + +typedef enum scan_benchmark_mode { + SCAN_BENCHMARK_LOAD_AND_SCAN, + SCAN_BENCHMARK_INIT_ONLY, + SCAN_BENCHMARK_SCAN_ONLY, +} scan_benchmark_mode; + +typedef struct scan_benchmark_options { + scan_benchmark_mode mode; + bool32 random_load_order; + bool32 splinter_random_keys; + bool32 random_scan_starts; + bool32 backwards_scan; + uint64 scan_length; + uint64 scan_count; +} scan_benchmark_options; + +typedef struct scan_benchmark_milestone_stats { + uint64 tuples; + uint64 samples; + uint64 elapsed_ns_sum; + uint64 logical_bytes_sum; +} scan_benchmark_milestone_stats; + +static inline int +scan_benchmark_status_to_int(platform_status status) +{ + return status.r; +} + +static void +scan_benchmark_usage(const char *prog) +{ + platform_error_log("Usage:\n"); + platform_error_log("\t%s [--init-only | --scan-only] [--random-load-order] " + "[--splinter-random-keys]\n", + prog); + platform_error_log("\t [--scan-length ] [--scan-count ] " + "[--random-scan-starts]\n"); + platform_error_log("\t [--backwards-scan]\n"); + platform_error_log("\t --num-inserts [generic config options]\n"); + platform_error_log("\n"); + platform_error_log("Modes:\n"); + platform_error_log("\t(default) create/load database, close it, reopen it, " + "then scan once\n"); + platform_error_log("\t--init-only create/load database and exit\n"); + platform_error_log("\t--scan-only open existing database and scan once\n"); + platform_error_log("\n"); + platform_error_log("Benchmark options:\n"); + platform_error_log("\t--random-load-order insert keys using a " + "deterministic permutation\n"); + platform_error_log("\t--splinter-random-keys use the same TEST_RANDOM key " + "mapping as splinter_test\n"); + platform_error_log("\t--scan-length limit each scan to this many " + "returned tuples (0 = full scan)\n"); + platform_error_log( + "\t--scan-count number of scans to run (default 1)\n"); + platform_error_log("\t--random-scan-starts choose a fresh random start key " + "for each scan\n"); + platform_error_log("\t--backwards-scan scan toward smaller keys\n"); + config_usage(); +} + +static platform_status +scan_benchmark_parse_args(int argc, + char *argv[], + scan_benchmark_options *options, + int *config_argc, + char ***config_argv) +{ + *options = (scan_benchmark_options){ + .mode = SCAN_BENCHMARK_LOAD_AND_SCAN, + .random_load_order = FALSE, + .splinter_random_keys = FALSE, + .random_scan_starts = FALSE, + .backwards_scan = FALSE, + .scan_length = 0, + .scan_count = 1, + }; + + char **filtered = + TYPED_ARRAY_MALLOC(platform_get_heap_id(), filtered, MAX(argc - 1, 1)); + if (filtered == NULL) { + return STATUS_NO_MEMORY; + } + + int filtered_count = 0; + for (int i = 1; i < argc; i++) { + if (STRING_EQUALS_LITERAL(argv[i], "--init-only")) { + if (options->mode == SCAN_BENCHMARK_SCAN_ONLY) { + platform_error_log("scan_benchmark: choose only one of " + "--init-only or --scan-only\n"); + platform_free(platform_get_heap_id(), filtered); + return STATUS_BAD_PARAM; + } + options->mode = SCAN_BENCHMARK_INIT_ONLY; + } else if (STRING_EQUALS_LITERAL(argv[i], "--scan-only")) { + if (options->mode == SCAN_BENCHMARK_INIT_ONLY) { + platform_error_log("scan_benchmark: choose only one of " + "--init-only or --scan-only\n"); + platform_free(platform_get_heap_id(), filtered); + return STATUS_BAD_PARAM; + } + options->mode = SCAN_BENCHMARK_SCAN_ONLY; + } else if (STRING_EQUALS_LITERAL(argv[i], "--random-load-order")) { + options->random_load_order = TRUE; + } else if (STRING_EQUALS_LITERAL(argv[i], "--splinter-random-keys")) { + options->splinter_random_keys = TRUE; + } else if (STRING_EQUALS_LITERAL(argv[i], "--random-scan-starts")) { + options->random_scan_starts = TRUE; + } else if (STRING_EQUALS_LITERAL(argv[i], "--backwards-scan")) { + options->backwards_scan = TRUE; + } else if (STRING_EQUALS_LITERAL(argv[i], "--scan-length")) { + if (i + 1 == argc + || !try_string_to_uint64(argv[++i], &options->scan_length)) + { + platform_error_log( + "scan_benchmark: failed to parse --scan-length\n"); + platform_free(platform_get_heap_id(), filtered); + return STATUS_BAD_PARAM; + } + } else if (STRING_EQUALS_LITERAL(argv[i], "--scan-count")) { + if (i + 1 == argc + || !try_string_to_uint64(argv[++i], &options->scan_count) + || options->scan_count == 0) + { + platform_error_log( + "scan_benchmark: failed to parse --scan-count\n"); + platform_free(platform_get_heap_id(), filtered); + return STATUS_BAD_PARAM; + } + } else { + filtered[filtered_count++] = argv[i]; + } + } + + *config_argc = filtered_count; + *config_argv = filtered; + return STATUS_OK; +} + +static inline comparison +scan_benchmark_start_comparison(bool32 backwards_scan) +{ + return backwards_scan ? less_than_or_equal : greater_than_or_equal; +} + +static inline bool32 +scan_benchmark_iterator_can_advance(splinterdb_iterator *iter, + bool32 backwards_scan) +{ + return backwards_scan ? splinterdb_iterator_can_prev(iter) + : splinterdb_iterator_can_next(iter); +} + +static inline void +scan_benchmark_iterator_advance(splinterdb_iterator *iter, + bool32 backwards_scan) +{ + if (backwards_scan) { + splinterdb_iterator_prev(iter); + } else { + splinterdb_iterator_next(iter); + } +} + +static inline void +scan_benchmark_encode_key(uint64 record_no, + uint8 keybuf[SCAN_BENCHMARK_KEY_SIZE]) +{ + for (uint64 byte_no = 0; byte_no < SCAN_BENCHMARK_KEY_SIZE; byte_no++) { + uint64 shift = 8 * (SCAN_BENCHMARK_KEY_SIZE - 1 - byte_no); + keybuf[byte_no] = (record_no >> shift) & 0xFF; + } +} + +static inline void +scan_benchmark_encode_splinter_random_key(uint64 record_no, + uint8 keybuf[SCAN_BENCHMARK_KEY_SIZE]) +{ + uint64 encoded = platform_checksum64(&record_no, sizeof(record_no), 42); + memcpy(keybuf, &encoded, sizeof(encoded)); +} + +static inline void +scan_benchmark_encode_record_key(uint64 record_no, + bool32 splinter_random_keys, + uint8 keybuf[SCAN_BENCHMARK_KEY_SIZE]) +{ + if (splinter_random_keys) { + scan_benchmark_encode_splinter_random_key(record_no, keybuf); + } else { + scan_benchmark_encode_key(record_no, keybuf); + } +} + +static inline uint64 +scan_benchmark_gcd(uint64 a, uint64 b) +{ + while (b != 0) { + uint64 tmp = a % b; + a = b; + b = tmp; + } + return a; +} + +static void +scan_benchmark_choose_permutation(uint64 num_records, + uint64 seed, + uint64 *multiplier, + uint64 *offset) +{ + if (num_records <= 1) { + *multiplier = 1; + *offset = 0; + return; + } + + random_state rs; + random_init(&rs, seed, 0); + + uint64 candidate = 1 + (random_next_uint64(&rs) % (num_records - 1)); + while (scan_benchmark_gcd(candidate, num_records) != 1) { + candidate++; + if (candidate >= num_records) { + candidate = 1; + } + } + + *multiplier = candidate; + *offset = random_next_uint64(&rs) % num_records; +} + +static inline uint64 +scan_benchmark_permute_record(uint64 position, + uint64 num_records, + uint64 multiplier, + uint64 offset) +{ + return (uint64)(((__uint128_t)multiplier * position + offset) % num_records); +} + +static inline void +scan_benchmark_print_progress(const char *label, + uint64 completed, + uint64 total, + timestamp start_time) +{ + uint64 elapsed_ns = platform_timestamp_elapsed(start_time); + double elapsed_s = (double)elapsed_ns / BILLION; + double pct = total == 0 ? 100.0 : (100.0 * completed) / total; + double rate = + elapsed_ns == 0 ? 0.0 : ((double)completed * BILLION) / elapsed_ns; + + platform_default_log( + "%s progress: %lu / %lu (%.1f%%), %.2fs elapsed, %.2f ops/s\n", + label, + completed, + total, + pct, + elapsed_s, + rate); +} + +static inline double +scan_benchmark_logical_mib_per_sec(uint64 logical_bytes_scanned, + uint64 elapsed_ns) +{ + return elapsed_ns == 0 ? 0.0 + : ((double)logical_bytes_scanned * BILLION) + / elapsed_ns / MiB_TO_B(1); +} + +static void +scan_benchmark_print_milestone(uint64 tuples_scanned, + uint64 logical_bytes_scanned, + uint64 elapsed_ns) +{ + double elapsed_s = (double)elapsed_ns / BILLION; + double tuples_per_sec = + elapsed_ns == 0 ? 0.0 : ((double)tuples_scanned * BILLION) / elapsed_ns; + double mib_per_sec = + scan_benchmark_logical_mib_per_sec(logical_bytes_scanned, elapsed_ns); + double ns_per_tuple = + tuples_scanned == 0 ? 0.0 : (double)elapsed_ns / tuples_scanned; + + platform_default_log( + "scan milestone: %10lu tuples, %8.3fs elapsed, " + "%8.2f ns/tuple, %10.2f tuples/s, %8.2f MiB/s logical\n", + tuples_scanned, + elapsed_s, + ns_per_tuple, + tuples_per_sec, + mib_per_sec); +} + +static void +scan_benchmark_build_milestones( + uint64 max_tuples, + uint64 milestones[SCAN_BENCHMARK_MAX_MILESTONES], + uint64 *milestone_count) +{ + *milestone_count = 0; + if (max_tuples == 0) { + return; + } + + uint64 milestone = 1; + while (*milestone_count < SCAN_BENCHMARK_MAX_MILESTONES + && milestone < max_tuples) + { + milestones[(*milestone_count)++] = milestone; + if (milestone > UINT64_MAX / 10) { + break; + } + milestone *= 10; + } + + if (*milestone_count == 0 || milestones[*milestone_count - 1] != max_tuples) + { + platform_assert(*milestone_count < SCAN_BENCHMARK_MAX_MILESTONES); + milestones[(*milestone_count)++] = max_tuples; + } +} + +static void +scan_benchmark_print_average_milestones( + uint64 scans_completed, + const scan_benchmark_milestone_stats *milestone_stats, + uint64 milestone_count) +{ + platform_default_log("average milestones after %lu scan%s:\n", + scans_completed, + scans_completed == 1 ? "" : "s"); + for (uint64 i = 0; i < milestone_count; i++) { + const scan_benchmark_milestone_stats *stats = &milestone_stats[i]; + if (stats->samples == 0) { + continue; + } + + double tuples_per_sec = + stats->elapsed_ns_sum == 0 + ? 0.0 + : ((double)stats->tuples * stats->samples * BILLION) + / stats->elapsed_ns_sum; + double mib_per_sec = scan_benchmark_logical_mib_per_sec( + stats->logical_bytes_sum, stats->elapsed_ns_sum); + double ns_per_tuple = + (stats->tuples == 0 || stats->samples == 0) + ? 0.0 + : (double)stats->elapsed_ns_sum / (stats->tuples * stats->samples); + + platform_default_log(" %10lu tuples over %6lu scan%s: %8.2f ns/tuple, " + "%10.2f tuples/s, %8.2f MiB/s logical\n", + stats->tuples, + stats->samples, + stats->samples == 1 ? "" : "s", + ns_per_tuple, + tuples_per_sec, + mib_per_sec); + } +} + +static void +scan_benchmark_make_config(const master_config *master_cfg, + data_config *data_cfg, + splinterdb_config *cfg, + bool open_existing) +{ + *cfg = (splinterdb_config){ + .filename = master_cfg->io_filename, + .cache_size = master_cfg->cache_capacity, + .disk_size = master_cfg->allocator_capacity, + .data_cfg = data_cfg, + .use_shmem = master_cfg->use_shmem, + .shmem_size = master_cfg->shmem_size, + .page_size = master_cfg->page_size, + .extent_size = master_cfg->extent_size, + .io_flags = master_cfg->io_flags, + .io_perms = master_cfg->io_perms, + .io_async_queue_depth = master_cfg->io_async_queue_depth, + .cache_use_stats = master_cfg->use_stats, + .cache_logfile = master_cfg->cache_logfile, + .num_memtable_bg_threads = master_cfg->num_memtable_bg_threads, + .num_normal_bg_threads = master_cfg->num_normal_bg_threads, + .btree_rough_count_height = master_cfg->btree_rough_count_height, + .filter_hash_size = master_cfg->filter_hash_size, + .filter_log_index_size = master_cfg->filter_log_index_size, + .use_log = master_cfg->use_log, + .memtable_capacity = master_cfg->memtable_capacity, + .fanout = master_cfg->fanout, + .use_stats = master_cfg->use_stats, + .reclaim_threshold = master_cfg->reclaim_threshold, + .queue_scale_percent = master_cfg->queue_scale_percent, + }; + + if (open_existing) { + cfg->io_flags &= ~O_CREAT; + } +} + +static int +scan_benchmark_load_database(const splinterdb_config *cfg, + uint64 num_records, + uint64 value_size, + bool32 random_load_order, + bool32 splinter_random_keys, + uint64 seed) +{ + splinterdb *kvs = NULL; + int rc = splinterdb_create(cfg, &kvs); + if (rc != 0) { + return rc; + } + + uint8 *value_buf = + TYPED_ARRAY_ZALLOC(platform_get_heap_id(), value_buf, value_size); + if (value_buf == NULL) { + splinterdb_close(&kvs); + return scan_benchmark_status_to_int(STATUS_NO_MEMORY); + } + + uint8 keybuf[SCAN_BENCHMARK_KEY_SIZE]; + slice value = slice_create(value_size, value_buf); + timestamp start_time = platform_get_timestamp(); + uint64 progress_interval = MAX(num_records / 10, 1); + uint64 permutation_multiplier = 1; + uint64 permutation_offset = 0; + + if (random_load_order) { + scan_benchmark_choose_permutation(num_records, + seed ^ 0x9e3779b97f4a7c15ULL, + &permutation_multiplier, + &permutation_offset); + platform_default_log("scan_benchmark: random load order enabled " + "(multiplier=%lu offset=%lu)\n", + permutation_multiplier, + permutation_offset); + } + + platform_default_log("scan_benchmark: loading %lu records\n", num_records); + + for (uint64 record_no = 0; record_no < num_records; record_no++) { + uint64 key_record_no = + random_load_order + ? scan_benchmark_permute_record(record_no, + num_records, + permutation_multiplier, + permutation_offset) + : record_no; + scan_benchmark_encode_record_key( + key_record_no, splinter_random_keys, keybuf); + slice key = slice_create(sizeof(keybuf), keybuf); + rc = splinterdb_insert(kvs, key, value, NULL); + if (rc != 0) { + platform_error_log( + "scan_benchmark: insert failed at record %lu: %d\n", record_no, rc); + break; + } + + if ((record_no + 1) % progress_interval == 0 + || record_no + 1 == num_records) + { + scan_benchmark_print_progress( + "load", record_no + 1, num_records, start_time); + } + } + + platform_free(platform_get_heap_id(), value_buf); + splinterdb_close(&kvs); + return rc; +} + +static int +scan_benchmark_run_scan(const splinterdb_config *cfg, + bool print_lookup_stats, + uint64 expected_records, + bool32 backwards_scan) +{ + splinterdb *kvs = NULL; + int rc = splinterdb_open(cfg, &kvs); + if (rc != 0) { + return rc; + } + + splinterdb_stats_reset(kvs); + + splinterdb_iterator *iter = NULL; + timestamp start_time = platform_get_timestamp(); + rc = splinterdb_iterator_init( + kvs, &iter, scan_benchmark_start_comparison(backwards_scan), NULL_SLICE); + if (rc != 0) { + splinterdb_close(&kvs); + return rc; + } + + uint64 next_milestone = 1; + uint64 tuples_scanned = 0; + uint64 logical_bytes_scanned = 0; + + while (splinterdb_iterator_valid(iter)) { + slice key; + slice value; + splinterdb_iterator_get_current(iter, &key, &value); + tuples_scanned++; + logical_bytes_scanned += slice_length(key) + slice_length(value); + + if (tuples_scanned == next_milestone) { + scan_benchmark_print_milestone(tuples_scanned, + logical_bytes_scanned, + platform_timestamp_elapsed(start_time)); + if (next_milestone <= UINT64_MAX / 10) { + next_milestone *= 10; + } + } + + if (!scan_benchmark_iterator_can_advance(iter, backwards_scan)) { + break; + } + scan_benchmark_iterator_advance(iter, backwards_scan); + } + + rc = splinterdb_iterator_status(iter); + if (rc == 0 + && (tuples_scanned == 0 || tuples_scanned != next_milestone / 10)) + { + scan_benchmark_print_milestone(tuples_scanned, + logical_bytes_scanned, + platform_timestamp_elapsed(start_time)); + } + + uint64 total_elapsed_ns = platform_timestamp_elapsed(start_time); + platform_default_log("scan complete: %lu tuples, %.2f MiB/s logical\n", + tuples_scanned, + scan_benchmark_logical_mib_per_sec( + logical_bytes_scanned, total_elapsed_ns)); + + if (expected_records != 0 && expected_records != tuples_scanned) { + platform_error_log( + "scan_benchmark: expected %lu tuples but scanned %lu\n", + expected_records, + tuples_scanned); + rc = EINVAL; + } + + if (print_lookup_stats) { + splinterdb_stats_print_lookup(kvs); + } + + splinterdb_iterator_deinit(iter); + splinterdb_close(&kvs); + return rc; +} + +static int +scan_benchmark_run_repeated_scans(const splinterdb_config *cfg, + bool print_lookup_stats, + uint64 expected_records, + uint64 scan_length, + uint64 scan_count, + bool32 splinter_random_keys, + bool32 random_scan_starts, + bool32 backwards_scan, + uint64 seed) +{ + splinterdb *kvs = NULL; + int rc = splinterdb_open(cfg, &kvs); + if (rc != 0) { + return rc; + } + + splinterdb_stats_reset(kvs); + + uint64 effective_scan_length = + scan_length == 0 ? expected_records : scan_length; + if (effective_scan_length == 0) { + platform_error_log("scan_benchmark: repeated scans require a non-zero " + "scan length or --num-inserts\n"); + splinterdb_close(&kvs); + return EINVAL; + } + + uint64 milestones[SCAN_BENCHMARK_MAX_MILESTONES]; + uint64 milestone_count = 0; + scan_benchmark_build_milestones( + effective_scan_length, milestones, &milestone_count); + + scan_benchmark_milestone_stats + milestone_stats[SCAN_BENCHMARK_MAX_MILESTONES]; + ZERO_ARRAY(milestone_stats); + for (uint64 i = 0; i < milestone_count; i++) { + milestone_stats[i].tuples = milestones[i]; + } + + random_state rs; + random_init(&rs, seed ^ 0xd1b54a32d192ed03ULL, 0); + + uint64 report_interval = MAX(scan_count / 10, 1); + uint64 total_elapsed_ns = 0; + uint64 total_tuples_scanned = 0; + uint64 total_logical_bytes_scanned = 0; + uint8 keybuf[SCAN_BENCHMARK_KEY_SIZE]; + + platform_default_log( + "scan_benchmark: running %lu %s scan%s of up to %lu tuple%s%s\n", + scan_count, + backwards_scan ? "backwards" : "forward", + scan_count == 1 ? "" : "s", + effective_scan_length, + effective_scan_length == 1 ? "" : "s", + random_scan_starts ? " from random starting points" : ""); + + for (uint64 scan_no = 0; scan_no < scan_count; scan_no++) { + slice start_key = NULL_SLICE; + uint64 start_record_no = 0; + if (random_scan_starts) { + platform_assert(expected_records > 0); + start_record_no = random_next_uint64(&rs) % expected_records; + scan_benchmark_encode_record_key( + start_record_no, splinter_random_keys, keybuf); + start_key = slice_create(sizeof(keybuf), keybuf); + } + + splinterdb_iterator *iter = NULL; + timestamp start_time = platform_get_timestamp(); + rc = splinterdb_iterator_init( + kvs, + &iter, + scan_benchmark_start_comparison(backwards_scan), + start_key); + if (rc != 0) { + splinterdb_close(&kvs); + return rc; + } + + uint64 tuples_scanned = 0; + uint64 logical_bytes_scanned = 0; + uint64 milestone_idx = 0; + uint64 target_tuples = effective_scan_length; + + while (splinterdb_iterator_valid(iter) && tuples_scanned < target_tuples) + { + slice key; + slice value; + splinterdb_iterator_get_current(iter, &key, &value); + tuples_scanned++; + logical_bytes_scanned += slice_length(key) + slice_length(value); + + while (milestone_idx < milestone_count + && tuples_scanned == milestones[milestone_idx]) + { + uint64 elapsed_ns = platform_timestamp_elapsed(start_time); + milestone_stats[milestone_idx].samples++; + milestone_stats[milestone_idx].elapsed_ns_sum += elapsed_ns; + milestone_stats[milestone_idx].logical_bytes_sum += + logical_bytes_scanned; + milestone_idx++; + } + + if (tuples_scanned >= target_tuples + || !scan_benchmark_iterator_can_advance(iter, backwards_scan)) + { + break; + } + scan_benchmark_iterator_advance(iter, backwards_scan); + } + + rc = splinterdb_iterator_status(iter); + if (rc != 0) { + splinterdb_iterator_deinit(iter); + splinterdb_close(&kvs); + return rc; + } + + uint64 elapsed_ns = platform_timestamp_elapsed(start_time); + total_elapsed_ns += elapsed_ns; + total_tuples_scanned += tuples_scanned; + total_logical_bytes_scanned += logical_bytes_scanned; + + splinterdb_iterator_deinit(iter); + + bool32 should_report = (scan_no + 1 <= 3) || (scan_no + 1 == scan_count) + || ((scan_no + 1) % report_interval == 0); + if (should_report) { + double avg_ns_per_tuple = + total_tuples_scanned == 0 + ? 0.0 + : (double)total_elapsed_ns / total_tuples_scanned; + double logical_mib_per_sec = scan_benchmark_logical_mib_per_sec( + total_logical_bytes_scanned, total_elapsed_ns); + platform_default_log("scan progress: %lu / %lu scans complete, " + "last_start=%lu, last_tuples=%lu, " + "cumulative %.2f ns/tuple, " + "%.2f MiB/s logical\n", + scan_no + 1, + scan_count, + start_record_no, + tuples_scanned, + avg_ns_per_tuple, + logical_mib_per_sec); + scan_benchmark_print_average_milestones( + scan_no + 1, milestone_stats, milestone_count); + } + } + + platform_default_log("scan complete: %lu scans, %lu tuples, " + "%.2f MiB/s logical\n", + scan_count, + total_tuples_scanned, + scan_benchmark_logical_mib_per_sec( + total_logical_bytes_scanned, total_elapsed_ns)); + + if (print_lookup_stats) { + splinterdb_stats_print_lookup(kvs); + } + + splinterdb_close(&kvs); + return rc; +} + +int +scan_benchmark(int argc, char *argv[]) +{ + platform_status status; + scan_benchmark_options options; + int config_argc = 0; + char **config_argv = NULL; + master_config master_cfg; + data_config default_data_cfg; + splinterdb_config cfg; + int rc = 0; + + if (argc > 1 && STRING_EQUALS_LITERAL(argv[1], "--help")) { + scan_benchmark_usage(argv[0]); + return 0; + } + + platform_register_thread(); + config_set_defaults(&master_cfg); + + status = scan_benchmark_parse_args( + argc, argv, &options, &config_argc, &config_argv); + if (!SUCCESS(status)) { + rc = scan_benchmark_status_to_int(status); + goto out; + } + + status = config_parse(&master_cfg, 1, config_argc, config_argv); + if (!SUCCESS(status)) { + rc = scan_benchmark_status_to_int(status); + goto out; + } + + if (master_cfg.max_key_size < SCAN_BENCHMARK_KEY_SIZE) { + platform_error_log("scan_benchmark: key-size must be at least %u bytes\n", + SCAN_BENCHMARK_KEY_SIZE); + rc = EINVAL; + goto out; + } + + if (options.mode != SCAN_BENCHMARK_SCAN_ONLY && master_cfg.num_inserts == 0) + { + platform_error_log( + "scan_benchmark: --num-inserts must be set for load modes\n"); + rc = EINVAL; + goto out; + } + + if (options.random_scan_starts && master_cfg.num_inserts == 0) { + platform_error_log("scan_benchmark: --random-scan-starts requires " + "--num-inserts to describe the keyspace\n"); + rc = EINVAL; + goto out; + } + + default_data_config_init(master_cfg.max_key_size, &default_data_cfg); + + platform_default_log( + "scan_benchmark: db=%s mode=%d num_inserts=%lu " + "cache=%lu extent=%lu value=%lu " + "random_load=%d splinter_random_keys=%d scan_length=%lu scan_count=%lu " + "random_starts=%d backwards_scan=%d seed=%lu\n", + master_cfg.io_filename, + options.mode, + master_cfg.num_inserts, + master_cfg.cache_capacity, + master_cfg.extent_size, + master_cfg.message_size, + options.random_load_order, + options.splinter_random_keys, + options.scan_length, + options.scan_count, + options.random_scan_starts, + options.backwards_scan, + master_cfg.seed); + + if (options.mode != SCAN_BENCHMARK_SCAN_ONLY) { + scan_benchmark_make_config(&master_cfg, &default_data_cfg, &cfg, FALSE); + rc = scan_benchmark_load_database(&cfg, + master_cfg.num_inserts, + master_cfg.message_size, + options.random_load_order, + options.splinter_random_keys, + master_cfg.seed); + if (rc != 0 || options.mode == SCAN_BENCHMARK_INIT_ONLY) { + goto out; + } + } + + scan_benchmark_make_config(&master_cfg, &default_data_cfg, &cfg, TRUE); + if (options.scan_count == 1 && options.scan_length == 0 + && !options.random_scan_starts) + { + rc = scan_benchmark_run_scan(&cfg, + master_cfg.use_stats, + master_cfg.num_inserts, + options.backwards_scan); + } else { + rc = scan_benchmark_run_repeated_scans(&cfg, + master_cfg.use_stats, + master_cfg.num_inserts, + options.scan_length, + options.scan_count, + options.splinter_random_keys, + options.random_scan_starts, + options.backwards_scan, + master_cfg.seed); + } + +out: + if (config_argv != NULL) { + platform_free(platform_get_heap_id(), config_argv); + } + platform_deregister_thread(); + return rc; +} diff --git a/tests/functional/splinter_test.c b/tests/functional/splinter_test.c index aac0568d..72d76380 100644 --- a/tests/functional/splinter_test.c +++ b/tests/functional/splinter_test.c @@ -69,6 +69,7 @@ typedef struct test_splinter_thread_params { uint8 lookup_positive_pct; // parallel lookup positive % uint64 seed; uint64 range_lookups_done; + uint64 range_tuples_returned; uint64 progress; } test_splinter_thread_params; @@ -342,8 +343,13 @@ test_trunk_lookup_thread(void *arg) } static void -nop_tuple_func(key tuple_key, message value, void *arg) +count_range_tuple_func(key tuple_key, message value, void *arg) { + uint64 *range_tuples_returned = arg; + + (void)tuple_key; + (void)value; + *range_tuples_returned += 1; } /* @@ -439,11 +445,12 @@ test_trunk_range_thread(void *arg) test_cfg[spl_idx].period); uint64 range_tuples = test_range(range_num, min_range_length, max_range_length); - platform_status rc = core_apply_to_range(spl, - key_buffer_key(&start_key), - range_tuples, - nop_tuple_func, - NULL); + platform_status rc = + core_apply_to_range(spl, + key_buffer_key(&start_key), + range_tuples, + count_range_tuple_func, + ¶ms->range_tuples_returned); platform_assert_status_ok(rc); params->range_lookups_done++; @@ -1232,10 +1239,12 @@ splinter_perf_range_lookups(platform_heap_id hid, } for (uint64 i = 0; i < num_range_threads; i++) { - params[i].total_ops = per_table_ranges; - params[i].op_granularity = TEST_RANGE_GRANULARITY; - params[i].min_range_length = min_range_length; - params[i].max_range_length = max_range_length; + params[i].total_ops = per_table_ranges; + params[i].op_granularity = TEST_RANGE_GRANULARITY; + params[i].min_range_length = min_range_length; + params[i].max_range_length = max_range_length; + params[i].range_lookups_done = 0; + params[i].range_tuples_returned = 0; if (verbose_progress) { platform_default_log(" Range thread[%lu] " @@ -1284,26 +1293,37 @@ splinter_perf_range_lookups(platform_heap_id hid, rc = STATUS_OK; - platform_default_log("\nTotal time=%lus for %s lookup, per-splinter " - "per-thread range time per tuple %lu ns\n", - NSEC_TO_SEC(total_time), - range_descr, - total_time * num_range_threads / total_ranges); - - uint64 num_range_lookups = 0; + uint64 num_range_lookups = 0; + uint64 total_returned_tuples = 0; for (uint64 i = 0; i < num_range_threads; i++) { if (verbose_progress) { - platform_default_log(" Range thread %lu, range lookups = %lu\n", + platform_default_log(" Range thread %lu, range lookups = %lu" + ", returned tuples = %lu\n", i, - params[i].range_lookups_done); + params[i].range_lookups_done, + params[i].range_tuples_returned); } num_range_lookups += params[i].range_lookups_done; + total_returned_tuples += params[i].range_tuples_returned; } + platform_default_log( + "\nTotal time=%lus for %s lookup, per-splinter per-thread " + "range time per range op %lu ns, per returned tuple %lu ns\n", + NSEC_TO_SEC(total_time), + range_descr, + (num_range_lookups ? total_time * num_range_threads / num_range_lookups + : 0), + (total_returned_tuples + ? total_time * num_range_threads / total_returned_tuples + : 0)); platform_default_log( "splinter total range lookups: %lu" - ", range rate: %lu ops/second\n", + ", total returned tuples: %lu" + ", range rate: %lu ops/second, tuple rate: %lu tuples/second\n", num_range_lookups, - (total_time ? SEC_TO_NSEC(total_ranges) / total_time : 0)); + total_returned_tuples, + (total_time ? SEC_TO_NSEC(num_range_lookups) / total_time : 0), + (total_time ? SEC_TO_NSEC(total_returned_tuples) / total_time : 0)); for (uint8 spl_idx = 0; spl_idx < num_tables; spl_idx++) { core_handle *spl = &spl_tables[spl_idx]; diff --git a/tests/functional/test.h b/tests/functional/test.h index 596dfe38..bbbd76a2 100644 --- a/tests/functional/test.h +++ b/tests/functional/test.h @@ -37,6 +37,9 @@ filter_test(int argc, char *argv[]); int splinter_test(int argc, char *argv[]); +int +scan_benchmark(int argc, char *argv[]); + int log_test(int argc, char *argv[]); diff --git a/tests/functional/test_dispatcher.c b/tests/functional/test_dispatcher.c index 1f051033..828a04e1 100644 --- a/tests/functional/test_dispatcher.c +++ b/tests/functional/test_dispatcher.c @@ -15,6 +15,7 @@ usage(void) platform_error_log("\tbtree_test\n"); platform_error_log("\tfilter_test\n"); platform_error_log("\tsplinter_test\n"); + platform_error_log("\tscan_benchmark\n"); platform_error_log("\tlog_test\n"); platform_error_log("\tcache_test\n"); platform_error_log("\tio_apis_test\n"); @@ -42,6 +43,8 @@ test_dispatcher(int argc, char *argv[]) return filter_test(argc - 1, &argv[1]); } else if (STRING_EQUALS_LITERAL(test_name, "splinter_test")) { return splinter_test(argc - 1, &argv[1]); + } else if (STRING_EQUALS_LITERAL(test_name, "scan_benchmark")) { + return scan_benchmark(argc - 1, &argv[1]); } else if (STRING_EQUALS_LITERAL(test_name, "log_test")) { return log_test(argc - 1, &argv[1]); } else if (STRING_EQUALS_LITERAL(test_name, "cache_test")) {