diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c index b813c17399eb..6973fd6788f3 100644 --- a/src/lib/core/fiber.c +++ b/src/lib/core/fiber.c @@ -37,6 +37,7 @@ #include #include #include +#include /* __rdtscp() */ #include "assoc.h" #include "memory.h" @@ -82,6 +83,28 @@ static int (*fiber_invoke)(fiber_func f, va_list ap); err; \ }) +/** + * An action performed each time a context switch happens. + * Used to count each fiber's processing time. + * This is a macro rather than a function, since it is used + * in scheduler too. + */ +#define clock_set_on_csw(caller) \ +({ \ + uint64_t clock; \ + uint32_t cpu_id; \ + clock = __rdtscp(&cpu_id); \ + \ + if (cpu_id == cord()->cpu_id_last) { \ + (caller)->clock_delta += clock - cord()->clock_last; \ + cord()->clock_delta += clock - cord()->clock_last; \ + } else { \ + cord()->cpu_id_last = cpu_id; \ + cord()->cpu_miss_count++; \ + } \ + cord()->clock_last = clock; \ +}) + /* * Defines a handler to be executed on exit from cord's thread func, * accessible via cord()->on_exit (normally NULL). It is used to @@ -107,6 +130,8 @@ pthread_t main_thread_id; static size_t page_size; static int stack_direction; +static bool fiber_top_enabled = false; + enum { /* The minimum allowable fiber stack size in bytes */ FIBER_STACK_SIZE_MINIMAL = 16384, @@ -246,6 +271,7 @@ fiber_call(struct fiber *callee) /** By convention, these triggers must not throw. */ if (! rlist_empty(&caller->on_yield)) trigger_run(&caller->on_yield, NULL); + clock_set_on_csw(caller); callee->caller = caller; callee->flags |= FIBER_IS_READY; caller->flags |= FIBER_IS_READY; @@ -474,6 +500,7 @@ fiber_yield(void) /** By convention, these triggers must not throw. */ if (! rlist_empty(&caller->on_yield)) trigger_run(&caller->on_yield, NULL); + clock_set_on_csw(caller); assert(callee->flags & FIBER_IS_READY || callee == &cord->sched); assert(! (callee->flags & FIBER_IS_DEAD)); @@ -584,6 +611,7 @@ fiber_schedule_list(struct rlist *list) } last->caller = fiber(); assert(fiber() == &cord()->sched); + clock_set_on_csw(fiber()); fiber_call_impl(first); } @@ -980,6 +1008,8 @@ fiber_new_ex(const char *name, const struct fiber_attr *fiber_attr, } fiber->f = f; + fiber->clock_acc = 0; + fiber->clock_delta = 0; /* Excluding reserved range */ if (++cord->max_fid < FIBER_ID_MAX_RESERVED) cord->max_fid = FIBER_ID_MAX_RESERVED + 1; @@ -1044,6 +1074,103 @@ fiber_destroy_all(struct cord *cord) struct fiber, link)); } +static void +loop_on_iteration_start(ev_loop *loop, ev_check *watcher, int revents) +{ + (void) loop; + (void) watcher; + (void) revents; + + cord()->clock_last = __rdtscp(&cord()->cpu_id_last); + cord()->cpu_miss_count = 0; +} + + +/** + * Calculate the exponential moving average for the clock deltas + * per loop iteration. The coeffitient is 1/16. + */ +static inline uint64_t +clock_diff_accumulate(uint64_t acc, uint64_t delta) +{ + if (acc > 0) { + return delta / 16 + 15 * acc / 16; + } else { + return delta; + } +} + +static void +loop_on_iteration_end(ev_loop *loop, ev_prepare *watcher, int revents) +{ + (void) loop; + (void) watcher; + (void) revents; + struct fiber *fiber; + assert(fiber() == &cord()->sched); + + /* + * Record the scheduler's latest clock change, even though + * it's not a context switch, but an event loop iteration + * end. + */ + clock_set_on_csw(&cord()->sched); + + cord()->cpu_miss_count_last = cord()->cpu_miss_count; + cord()->cpu_miss_count = 0; + + cord()->clock_acc = clock_diff_accumulate(cord()->clock_acc, cord()->clock_delta); + cord()->clock_delta_last = cord()->clock_delta; + cord()->clock_delta = 0; + + cord()->sched.clock_acc = clock_diff_accumulate(cord()->sched.clock_acc, cord()->sched.clock_delta); + cord()->sched.clock_delta_last = cord()->sched.clock_delta; + cord()->sched.clock_delta = 0; + + rlist_foreach_entry(fiber, &cord()->alive, link) { + fiber->clock_acc = clock_diff_accumulate(fiber->clock_acc, fiber->clock_delta); + fiber->clock_delta_last = fiber->clock_delta; + fiber->clock_delta = 0; + } +} + +static inline void +fiber_top_init() +{ + ev_prepare_init(&cord()->prepare_event, loop_on_iteration_end); + ev_check_init(&cord()->check_event, loop_on_iteration_start); +} + +bool +fiber_top_is_enabled() +{ + return fiber_top_enabled; +} + +inline void +fiber_top_enable() +{ + if (!fiber_top_enabled) { + ev_prepare_start(cord()->loop, &cord()->prepare_event); + ev_check_start(cord()->loop, &cord()->check_event); + fiber_top_enabled = true; + + cord()->clock_acc = 0; + cord()->cpu_miss_count_last = 0; + cord()->clock_delta_last = 0; + } +} + +inline void +fiber_top_disable() +{ + if (fiber_top_enabled) { + ev_prepare_stop(cord()->loop, &cord()->prepare_event); + ev_check_stop(cord()->loop, &cord()->check_event); + fiber_top_enabled = false; + } +} + void cord_create(struct cord *cord, const char *name) { @@ -1077,6 +1204,13 @@ cord_create(struct cord *cord, const char *name) ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup); ev_idle_init(&cord->idle_event, fiber_schedule_idle); + + /* fiber.top() currently works only for the main thread. */ + if (cord_is_main()) { + fiber_top_init(); + fiber_top_enable(); + } + cord_set_name(name); #if ENABLE_ASAN diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h index fb168e25e2ba..e56a2aa3523a 100644 --- a/src/lib/core/fiber.h +++ b/src/lib/core/fiber.h @@ -386,6 +386,16 @@ struct fiber { uint32_t fid; /** Fiber flags */ uint32_t flags; + /** + * Accumulated clock value calculated using exponential + * moving average. + */ + uint64_t clock_acc; + /** + * Clock delta calculated on previous event loop iteration. + */ + uint64_t clock_delta_last; + uint64_t clock_delta; /** Link in cord->alive or cord->dead list. */ struct rlist link; /** Link in cord->ready list. */ @@ -457,6 +467,13 @@ struct cord { * reserved. */ uint32_t max_fid; + uint64_t clock_acc; + uint64_t clock_delta; + uint64_t clock_delta_last; + uint64_t clock_last; + uint32_t cpu_id_last; + uint32_t cpu_miss_count; + uint32_t cpu_miss_count_last; pthread_t id; const struct cord_on_exit *on_exit; /** A helper hash to map id -> fiber. */ @@ -482,6 +499,14 @@ struct cord { * is no 1 ms delay in case of zero sleep timeout. */ ev_idle idle_event; + /** An event triggered on every event loop iteration start. */ + ev_check check_event; + /** + * An event triggered on every event loop iteration end. + * Just like the event above it is used in per-fiber cpu + * time calculations. + */ + ev_prepare prepare_event; /** A memory cache for (struct fiber) */ struct mempool fiber_mempool; /** A runtime slab cache for general use in this cord. */ @@ -625,6 +650,15 @@ typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx); int fiber_stat(fiber_stat_cb cb, void *cb_ctx); +bool +fiber_top_is_enabled(); + +void +fiber_top_enable(); + +void +fiber_top_disable(); + /** Useful for C unit tests */ static inline int fiber_c_invoke(fiber_func f, va_list ap) diff --git a/src/lua/fiber.c b/src/lua/fiber.c index 336be60a287c..d5d9d5573da1 100644 --- a/src/lua/fiber.c +++ b/src/lua/fiber.c @@ -319,6 +319,59 @@ lbox_fiber_statof_nobt(struct fiber *f, void *cb_ctx) return lbox_fiber_statof(f, cb_ctx, false); } +static int +lbox_fiber_top_entry(struct fiber *f, void *cb_ctx) +{ + struct lua_State *L = (struct lua_State *) cb_ctx; + + lua_pushinteger(L, f->fid); + lua_newtable(L); + + lua_pushliteral(L, "cpu average (%)"); + lua_pushnumber(L, f->clock_acc / (double)cord()->clock_acc * 100); + lua_settable(L, -3); + lua_pushliteral(L, "cpu instant (%)"); + lua_pushnumber(L, f->clock_delta_last / (double)cord()->clock_delta_last * 100); + lua_settable(L, -3); + lua_settable(L, -3); + + return 0; +} + +static int +lbox_fiber_top(struct lua_State *L) +{ + if (!fiber_top_is_enabled()) { + luaL_error(L, "fiber.top() is disabled. enable it with" + " fiber.top_enable() first"); + } + lua_newtable(L); + lua_pushliteral(L, "cpu misses"); + lua_pushnumber(L, cord()->cpu_miss_count_last); + lua_settable(L, -3); + + lbox_fiber_top_entry(&cord()->sched, L); + fiber_stat(lbox_fiber_top_entry, L); + + return 1; +} + +static int +lbox_fiber_top_enable(struct lua_State *L) +{ + (void) L; + fiber_top_enable(); + return 0; +} + +static int +lbox_fiber_top_disable(struct lua_State *L) +{ + (void) L; + fiber_top_disable(); + return 0; +} + /** * Return fiber statistics. */ @@ -741,6 +794,9 @@ static const struct luaL_Reg lbox_fiber_meta [] = { static const struct luaL_Reg fiberlib[] = { {"info", lbox_fiber_info}, + {"top", lbox_fiber_top}, + {"top_enable", lbox_fiber_top_enable}, + {"top_disable", lbox_fiber_top_disable}, {"sleep", lbox_fiber_sleep}, {"yield", lbox_fiber_yield}, {"self", lbox_fiber_self}, diff --git a/test/app/fiber.result b/test/app/fiber.result index 94e690f6cf6c..6974cebc8df7 100644 --- a/test/app/fiber.result +++ b/test/app/fiber.result @@ -1462,6 +1462,84 @@ fiber.join(fiber.self()) --- - error: the fiber is not joinable ... +sum = 0 +--- +... +-- gh-2694 fiber.top() +a = fiber.top() +--- +... +-- scheduler is present in fiber.top() +a[1] ~= nil +--- +- true +... +type(a["cpu misses"]) == 'number' +--- +- true +... +sum_inst = 0 +--- +... +sum_avg = 0 +--- +... +test_run:cmd('setopt delimiter ";"') +--- +- true +... +for k, v in pairs(a) do + if type(v) == 'table' then + sum_inst = sum_inst + v["cpu instant (%)"] + sum_avg = sum_avg + v["cpu average (%)"] + end +end; +--- +... +test_run:cmd('setopt delimiter ""'); +--- +- true +... +sum_inst +--- +- 100 +... +-- not exact due to accumulated integer division errors +sum_avg > 99 and sum_avg < 101 or sum_avg +--- +- true +... +tbl = nil +--- +... +f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end) +--- +... +while f:status() ~= 'dead' do fiber.sleep(0.01) end +--- +... +tbl["cpu average (%)"] > 0 +--- +- true +... +tbl["cpu instant (%)"] > 0 +--- +- true +... +fiber.top_disable() +--- +... +fiber.top() +--- +- error: fiber.top() is disabled. enable it with fiber.top_enable() first +... +fiber.top_enable() +--- +... +type(fiber.top()) +--- +- table +... -- cleanup test_run:cmd("clear filter") --- diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua index bb8c249905b5..279c40ce145d 100644 --- a/test/app/fiber.test.lua +++ b/test/app/fiber.test.lua @@ -629,6 +629,37 @@ while f:status() ~= 'dead' do fiber.sleep(0.01) end -- fiber.join(fiber.self()) +sum = 0 + +-- gh-2694 fiber.top() +a = fiber.top() +-- scheduler is present in fiber.top() +a[1] ~= nil +type(a["cpu misses"]) == 'number' +sum_inst = 0 +sum_avg = 0 +test_run:cmd('setopt delimiter ";"') +for k, v in pairs(a) do + if type(v) == 'table' then + sum_inst = sum_inst + v["cpu instant (%)"] + sum_avg = sum_avg + v["cpu average (%)"] + end +end; +test_run:cmd('setopt delimiter ""'); +sum_inst +-- not exact due to accumulated integer division errors +sum_avg > 99 and sum_avg < 101 or sum_avg +tbl = nil +f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end) +while f:status() ~= 'dead' do fiber.sleep(0.01) end +tbl["cpu average (%)"] > 0 +tbl["cpu instant (%)"] > 0 + +fiber.top_disable() +fiber.top() +fiber.top_enable() +type(fiber.top()) + -- cleanup test_run:cmd("clear filter")