Skip to content

Commit

Permalink
box: introduce memtx_sort_threads config parameter
Browse files Browse the repository at this point in the history
Closes tarantool#3389
Closes tarantool#7689
Closes tarantool#4646

@TarantoolBot document
Title: new box.cfg parameter memtx_sort_threads

The parameter sets the number of threads used to sort keys of secondary
indexes on loading memtx database. The parameter cannot be changed
dynamically (as it does not make sense).

Maximum value is 256, minimum is 1. Default is to use all available cores.

Usage example:
```
box.cfg{memtx_sort_threads=4}
```
  • Loading branch information
nshy committed Jun 9, 2023
1 parent eef6944 commit e8c81ef
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 360 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## feature/box

* Added the `box.cfg.memtx_sort_threads` parameter that specifies the number of
threads used to sort indexes keys on loading a memtx database. OpenMP is
not used to sort keys anymore (gh-3389).
5 changes: 0 additions & 5 deletions cmake/BuildMisc.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ macro(libmisc_build)
)
endif()

if (HAVE_OPENMP)
list(APPEND misc_src
${PROJECT_SOURCE_DIR}/third_party/qsort_arg_mt.c)
endif()

add_library(misc STATIC ${misc_src})
set_target_properties(misc PROPERTIES COMPILE_FLAGS "${DEPENDENCY_CFLAGS}")

Expand Down
16 changes: 0 additions & 16 deletions cmake/compiler.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,6 @@ if((NOT HAVE_STD_C11 AND NOT HAVE_STD_GNU99) OR
"Please consider upgrade to gcc 4.5+ or clang 3.2+.")
endif()

#
# Check for an omp support
#
set(CMAKE_REQUIRED_FLAGS "-fopenmp -Werror")
check_cxx_source_compiles("int main(void) {
#pragma omp parallel
{
}
return 0;
}" HAVE_OPENMP)
set(CMAKE_REQUIRED_FLAGS "")

#
# GCC started to warn for unused result starting from 4.2, and
# this is when it introduced -Wno-unused-result
Expand Down Expand Up @@ -322,10 +310,6 @@ macro(enable_tnt_compile_flags)
endif()
endmacro(enable_tnt_compile_flags)

if (HAVE_OPENMP)
add_compile_flags("C;CXX" "-fopenmp")
endif()

if (CMAKE_COMPILER_IS_CLANG OR CMAKE_COMPILER_IS_GNUCC)
set(HAVE_BUILTIN_CTZ 1)
set(HAVE_BUILTIN_CTZLL 1)
Expand Down
1 change: 0 additions & 1 deletion debian/copyright
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ Copyright: 2013, Red Hat, Inc
License: GPL

Files: third_party/qsort_arg.c
third_party/qsort_arg_mt.c
third_party/compat/sys/bsd_time.h
third_party/queue.h
third_party/rb.h
Expand Down
7 changes: 0 additions & 7 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,6 @@ if(BUILD_STATIC)
endif()
endforeach(libstatic)
string(REPLACE ";" " " EXPORT_LIST "${EXPORT_LIST}")

if (HAVE_OPENMP)
# Link libgomp explicitly to make it static. Avoid linking
# against DSO version of libgomp, which implied by -fopenmp
set (common_libraries ${common_libraries} "libgomp.a")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fno-openmp")
endif()
endif()

set(exports_file_sources
Expand Down
21 changes: 21 additions & 0 deletions src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
#include "mp_util.h"
#include "small/static.h"
#include "node_name.h"
#include "tt_sort.h"

static char status[64] = "unconfigured";

Expand Down Expand Up @@ -1660,6 +1661,24 @@ box_init_say()
return 0;
}

/**
* Checks whether memtx_sort_threads configuration parameter is correct.
*/
static void
box_check_memtx_sort_threads(void)
{
int num = cfg_geti("memtx_sort_threads");
/*
* After high level checks this parameter is either nil or has
* type 'number'.
*/
if (cfg_isnumber("memtx_sort_threads") &&
(num <= 0 || num > TT_SORT_THREADS_MAX))
tnt_raise(ClientError, ER_CFG, "memtx_sort_threads",
tt_sprintf("must be greater than 0 and less than or"
" equal to %d", TT_SORT_THREADS_MAX));
}

void
box_check_config(void)
{
Expand Down Expand Up @@ -1728,6 +1747,7 @@ box_check_config(void)
diag_raise();
if (box_check_txn_isolation() == txn_isolation_level_MAX)
diag_raise();
box_check_memtx_sort_threads();
}

int
Expand Down Expand Up @@ -4501,6 +4521,7 @@ engine_init()
cfg_geti("slab_alloc_granularity"),
cfg_gets("memtx_allocator"),
cfg_getd("slab_alloc_factor"),
cfg_geti("memtx_sort_threads"),
box_on_indexes_built);
engine_register((struct engine *)memtx);
box_set_memtx_max_tuple_size();
Expand Down
2 changes: 2 additions & 0 deletions src/box/lua/load_cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ local default_cfg = {
sql_cache_size = 5 * 1024 * 1024,
txn_timeout = 365 * 100 * 86400,
txn_isolation = "best-effort",
memtx_sort_threads = nil,

metrics = {
include = 'all',
Expand Down Expand Up @@ -387,6 +388,7 @@ local template_cfg = {
net_msg_max = 'number',
sql_cache_size = 'number',
txn_timeout = 'number',
memtx_sort_threads = 'number',

metrics = 'table',
}
Expand Down
27 changes: 26 additions & 1 deletion src/box/memtx_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "memtx_tuple_compression.h"
#include "memtx_space.h"
#include "memtx_space_upgrade.h"
#include "tt_sort.h"

#include <type_traits>

Expand Down Expand Up @@ -1410,7 +1411,7 @@ struct memtx_engine *
memtx_engine_new(const char *snap_dirname, bool force_recovery,
uint64_t tuple_arena_max_size, uint32_t objsize_min,
bool dontdump, unsigned granularity,
const char *allocator, float alloc_factor,
const char *allocator, float alloc_factor, int sort_threads,
memtx_on_indexes_built_cb on_indexes_built)
{
int64_t snap_signature;
Expand Down Expand Up @@ -1504,6 +1505,30 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery,
memtx->state = MEMTX_INITIALIZED;
memtx->max_tuple_size = MAX_TUPLE_SIZE;
memtx->force_recovery = force_recovery;
if (sort_threads == 0) {
char *ompnum_str = getenv_safe("OMP_NUM_THREADS", NULL, 0);
if (ompnum_str != NULL) {
long ompnum = strtol(ompnum_str, NULL, 10);
if (ompnum > 0 && ompnum <= TT_SORT_THREADS_MAX) {
say_warn("OMP_NUM_THREADS is used to set number"
" of sorting threads. Use cfg option"
" 'memtx_sort_threads' instead.");
sort_threads = ompnum;
}
free(ompnum_str);
}
if (sort_threads == 0) {
sort_threads = sysconf(_SC_NPROCESSORS_ONLN);
if (sort_threads < 1) {
say_warn("Cannot get number of processors. "
"Fallback to single processor.");
sort_threads = 1;
} else if (sort_threads > TT_SORT_THREADS_MAX) {
sort_threads = TT_SORT_THREADS_MAX;
}
}
}
memtx->sort_threads = sort_threads;

memtx->replica_join_cord = NULL;

Expand Down
10 changes: 8 additions & 2 deletions src/box/memtx_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ struct memtx_engine {
* Format used for allocating functional index keys.
*/
struct tuple_format *func_key_format;
/**
* Number of threads used to sort keys of secondary indexes on engine
* start.
*/
int sort_threads;
};

struct memtx_gc_task;
Expand Down Expand Up @@ -221,7 +226,7 @@ struct memtx_engine *
memtx_engine_new(const char *snap_dirname, bool force_recovery,
uint64_t tuple_arena_max_size, uint32_t objsize_min,
bool dontdump, unsigned granularity,
const char *allocator, float alloc_factor,
const char *allocator, float alloc_factor, int threads_num,
memtx_on_indexes_built_cb on_indexes_built);

/**
Expand Down Expand Up @@ -362,13 +367,14 @@ memtx_engine_new_xc(const char *snap_dirname, bool force_recovery,
uint64_t tuple_arena_max_size, uint32_t objsize_min,
bool dontdump, unsigned granularity,
const char *allocator, float alloc_factor,
int sort_threads,
memtx_on_indexes_built_cb on_indexes_built)
{
struct memtx_engine *memtx;
memtx = memtx_engine_new(snap_dirname, force_recovery,
tuple_arena_max_size, objsize_min, dontdump,
granularity, allocator, alloc_factor,
on_indexes_built);
sort_threads, on_indexes_built);
if (memtx == NULL)
diag_raise();
return memtx;
Expand Down
9 changes: 5 additions & 4 deletions src/box/memtx_tree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#include "memtx_tx.h"
#include "trivia/config.h"
#include "trivia/util.h"
#include <qsort_arg.h>
#include "tt_sort.h"
#include <small/mempool.h>

/**
Expand Down Expand Up @@ -1840,9 +1840,10 @@ memtx_tree_index_end_build(struct index *base)
struct memtx_tree_index<USE_HINT> *index =
(struct memtx_tree_index<USE_HINT> *)base;
struct key_def *cmp_def = memtx_tree_cmp_def(&index->tree);
qsort_arg(index->build_array, index->build_array_size,
sizeof(index->build_array[0]),
memtx_tree_qcompare<USE_HINT>, cmp_def);
struct memtx_engine *memtx = (struct memtx_engine *)base->engine;
tt_sort(index->build_array, index->build_array_size,
sizeof(index->build_array[0]), memtx_tree_qcompare<USE_HINT>,
cmp_def, memtx->sort_threads);
if (cmp_def->is_multikey || cmp_def->for_func_index) {
/*
* Multikey index may have equal(in terms of
Expand Down
18 changes: 9 additions & 9 deletions src/lib/core/tt_sort.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static int
find_bucket(struct sort_data *sort, void *elem)
{
/*
* Bucket count is `thread_count`, thus bucket boundraries (splitters)
* Bucket count is `thread_count`, thus bucket boundaries (splitters)
* count is `thread_count - 1` omitting most left and most right
* boundaries. Let's place most left and most right boundaries at
* imaginary indexes `-1` and `size of splitters` respectively.
Expand Down Expand Up @@ -175,9 +175,9 @@ sort_bucket(va_list ap)
struct sort_data *sort = worker->sort;

/* Sort this worker bucket. */
qsort_arg_st(sort->buffer + worker->bucket_begin * sort->elem_size,
worker->bucket_size, sort->elem_size,
sort->cmp, sort->cmp_arg);
qsort_arg(sort->buffer + worker->bucket_begin * sort->elem_size,
worker->bucket_size, sort->elem_size,
sort->cmp, sort->cmp_arg);

/* Move sorted data back from temporary space. */
memcpy(sort->data + worker->bucket_begin * sort->elem_size,
Expand Down Expand Up @@ -254,8 +254,8 @@ find_splitters(struct sort_data *sort)
sort->data + i * sample_step * sort->elem_size,
sort->elem_size);

qsort_arg_st(samples, samples_num, sort->elem_size, sort->cmp,
sort->cmp_arg);
qsort_arg(samples, samples_num, sort->elem_size, sort->cmp,
sort->cmp_arg);

/* Take splitters from samples. */
for (int i = 0; i < sort->thread_count - 1; i++) {
Expand Down Expand Up @@ -293,8 +293,8 @@ sort_all(va_list ap)
{
struct sort_data *sort = va_arg(ap, typeof(sort));

qsort_arg_st(sort->data, sort->elem_count, sort->elem_size, sort->cmp,
sort->cmp_arg);
qsort_arg(sort->data, sort->elem_count, sort->elem_size, sort->cmp,
sort->cmp_arg);

return 0;
}
Expand Down Expand Up @@ -353,7 +353,7 @@ tt_sort(void *data, size_t elem_count, size_t elem_size,
if (elem_count < NOSPAWN_SIZE_THESHOLD) {
say_verbose("data size is less than threshold %d,"
" sort in caller thread", NOSPAWN_SIZE_THESHOLD);
qsort_arg_st(data, elem_count, elem_size, cmp, cmp_arg);
qsort_arg(data, elem_count, elem_size, cmp, cmp_arg);
return;
}

Expand Down
4 changes: 0 additions & 4 deletions src/trivia/config.h.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@
* Defined if this is a big-endian system.
*/
#cmakedefine HAVE_BYTE_ORDER_BIG_ENDIAN 1
/*
* Defined if this platform supports openmp and it is enabled
*/
#cmakedefine HAVE_OPENMP 1
/*
* Defined if compatible with GNU readline installed.
*/
Expand Down
58 changes: 58 additions & 0 deletions test/box-luatest/gh_3389_cfg_option_memtx_sort_threads_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
local server = require('luatest.server')
local t = require('luatest')

local g = t.group()

g.before_all(function(cg)
-- test setting memtx_sort_threads to non default value
cg.server = server:new{}
cg.server:start()
cg.server:exec(function()
-- this will trigger calling tt_sort on box.cfg{}
local s = box.schema.space.create('test')
s:create_index('pri')
s:replace{2}
s:replace{1}
box.snapshot()
end)
end)

g.after_all(function(cg)
if cg.server ~= nil then
cg.server:drop()
end
end)

g.after_each(function(cg)
if cg.server ~= nil then
cg.server:stop()
end
end)

g.test_setting_cfg_option = function(cg)
cg.server = server:new{box_cfg = {memtx_sort_threads = 3}}
cg.server:start()
cg.server:exec(function()
t.assert_error_msg_equals(
"Can't set option 'memtx_sort_threads' dynamically",
box.cfg, {memtx_sort_threads = 5})
end)
end

g.test_setting_openmp_env_var = function(cg)
cg.server = server:new{box_cfg = {log_level = 'warn'},
env = {OMP_NUM_THREADS = ' 3'}}
cg.server:start()
t.helpers.retrying({}, function()
local p = "OMP_NUM_THREADS is used to set number" ..
" of sorting threads. Use cfg option" ..
" 'memtx_sort_threads' instead."
t.assert_not_equals(cg.server:grep_log(p), nil)
end)
cg.server:stop()
end

g.test_setting_openmp_env_var_bad = function(cg)
cg.server = server:new{env= {OMP_NUM_THREADS = '300'}}
cg.server:start()
end
6 changes: 5 additions & 1 deletion test/box-tap/cfg.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ local socket = require('socket')
local fio = require('fio')
local uuid = require('uuid')
local msgpack = require('msgpack')
test:plan(107)
test:plan(111)

--------------------------------------------------------------------------------
-- Invalid values
Expand Down Expand Up @@ -49,6 +49,10 @@ invalid('vinyl_run_size_ratio', 1)
invalid('vinyl_bloom_fpr', 0)
invalid('vinyl_bloom_fpr', 1.1)
invalid('wal_queue_max_size', -1)
invalid('memtx_sort_threads', 'all')
invalid('memtx_sort_threads', -1)
invalid('memtx_sort_threads', 0)
invalid('memtx_sort_threads', 257)

local function invalid_combinations(name, val)
local status, result = pcall(box.cfg, val)
Expand Down

0 comments on commit e8c81ef

Please sign in to comment.