Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add reusable fibers and fiber pool #268

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions util/fibers/detail/fiber_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,31 @@ void ExecuteOnAllFiberStacks(FiberInterface::PrintFn fn) {
FbInitializer().sched->ExecuteOnAllFiberStacks(std::move(fn));
}


base::mpmc_bounded_queue<FiberInterface*> FiberPool::available_queue_(1024);

void ReusableFiberImpl::MoveToPool() {
scheduler()->SuspendAndExecuteOnDispatcher([this] {
DetachScheduler();

while (true) {
// We signal that the fiber is being terminated by setting the kTerminatedBit flag.
// We also set the kBusyBit flag to try to acquire the lock.
uint16_t fprev = flags_.fetch_or(kTerminatedBit | kBusyBit, memory_order_acquire);
if ((fprev & kBusyBit) == 0) { // has been acquired
break;
}
CpuPause();
}
//trace_ = TRACE_TERMINATE; // need to create one
join_q_.NotifyAll(this);

flags_.fetch_and(~kBusyBit, memory_order_release);

FiberPool::AddAvailableFiber(this);
});
}

} // namespace detail

void SetCustomDispatcher(DispatchPolicy* policy) {
Expand Down
96 changes: 94 additions & 2 deletions util/fibers/detail/fiber_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "base/mpsc_intrusive_queue.h"
#include "base/pmr/memory_resource.h"
#include "util/fibers/detail/wait_queue.h"
#include "base/mpmc_bounded_queue.h"

namespace util {
namespace fb2 {
Expand Down Expand Up @@ -62,9 +63,11 @@ class Scheduler;
class FiberInterface {
friend class Scheduler;

static constexpr uint64_t kRemoteFree = 1;


protected:
static constexpr uint64_t kRemoteFree = 1;


// holds its own fiber_context when it's not active.
// the difference between fiber_context and continuation is that continuation is launched
// straight away via callcc and fiber is created without switching to it.
Expand Down Expand Up @@ -291,6 +294,62 @@ template <typename Fn, typename... Arg> class WorkerFiberImpl : public FiberInte
std::tuple<std::decay_t<Arg>...> arg_;
};

class ReusableFiberImpl : public FiberInterface {
using FbCntx = ::boost::context::fiber_context;

public:
template <typename StackAlloc>
ReusableFiberImpl(std::string_view name, const boost::context::preallocated& palloc,
StackAlloc&& salloc) : FiberInterface(WORKER, 1, name) {
stack_size_ = palloc.sctx.size;
entry_ = FbCntx(std::allocator_arg, palloc, std::forward<StackAlloc>(salloc),
[this](FbCntx&& caller) { return run_(std::move(caller)); });
#if defined(BOOST_USE_UCONTEXT)
entry_ = std::move(entry_).resume();
#endif
}

template <class Fn, class ...Arg>
void SetTask(Fn&& fn, Arg&&... arg) {
flags_.fetch_and(~kTerminatedBit, std::memory_order_release);
remote_next_.store((FiberInterface*)kRemoteFree, std::memory_order_relaxed);
auto fn_ptr = std::make_shared<std::decay_t<Fn>>(std::forward<Fn>(fn));
auto arg_ptr = std::make_shared<std::tuple<std::decay_t<Arg>...>>(std::forward<Arg>(arg)...);
fn_ = [fn_ptr, arg_ptr]() mutable {
std::apply(std::move(*fn_ptr), std::move(*arg_ptr));
};
}

private:
FbCntx run_(FbCntx&& c) {
// assert(!c) <- we never pass the caller,
// because with update c_ with it before switching.
while (!stop_flag) {
if (fn_) {
// fn and tpl must be destroyed before calling terminate()
auto fn = std::move(fn_);
fn_ = nullptr;

#if defined(BOOST_USE_UCONTEXT)
std::move(c).resume();
#endif

fn();
} else {
MoveToPool();
}
}

return Terminate();
}

void MoveToPool();

volatile bool stop_flag = false;
// Without decay - fn_ can be a reference, depending how a function is passed to the constructor.
std::function<void()> fn_;
};

template <typename FbImpl>
boost::context::preallocated MakePreallocated(const boost::context::stack_context& sctx) {
// reserve space for FbImpl control structure. fb_impl_ptr points to the address where FbImpl
Expand All @@ -310,6 +369,39 @@ boost::context::preallocated MakePreallocated(const boost::context::stack_contex
return boost::context::preallocated{sp_ptr, size, sctx};
}

// temporary static
class FiberPool {
public:
static void AddAvailableFiber(ReusableFiberImpl* cntx) {
auto res = available_queue_.try_enqueue(cntx);
assert(res);
}

template <typename Fn, typename... Arg>
static ReusableFiberImpl* PopOrCreate( Fn&& fn, Arg&&... arg) {
FiberInterface* popped_fiber = nullptr;
bool is_popped = available_queue_.try_dequeue(popped_fiber);
auto* res = is_popped ? static_cast<ReusableFiberImpl*>(popped_fiber) : MakeReusableFiberImpl();
res->SetTask(std::forward<Fn>(fn), std::forward<Arg>(arg)...);
return res;
}

private:
static ReusableFiberImpl* MakeReusableFiberImpl() {
boost::context::fixedsize_stack salloc(64 * 1024);
boost::context::stack_context sctx = salloc.allocate();
boost::context::preallocated palloc = MakePreallocated<ReusableFiberImpl>(sctx);

void* obj_ptr = palloc.sp; // copy because we move palloc.

// placement new of context on top of fiber's stack
return new (obj_ptr) ReusableFiberImpl("", std::move(palloc), std::move(salloc));
}

private:
static base::mpmc_bounded_queue<FiberInterface*> available_queue_;
};

template <typename StackAlloc, typename Fn, typename... Arg>
static WorkerFiberImpl<Fn, Arg...>* MakeWorkerFiberImpl(std::string_view name, StackAlloc&& salloc,
Fn&& fn, Arg&&... arg) {
Expand Down
16 changes: 5 additions & 11 deletions util/fibers/fibers.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,15 @@ class Fiber {

template <typename Fn, typename... Arg>
Fiber(Launch policy, std::string_view name, Fn&& fn, Arg&&... arg)
: impl_{detail::default_stack_resource
? detail::MakeWorkerFiberImpl(name,
FixedStackAllocator(detail::default_stack_resource,
detail::default_stack_size),
std::forward<Fn>(fn), std::forward<Arg>(arg)...)
: detail::MakeWorkerFiberImpl(name, boost::context::fixedsize_stack(),
std::forward<Fn>(fn), std::forward<Arg>(arg)...)} {
: impl_{detail::FiberPool::PopOrCreate(std::forward<Fn>(fn), std::forward<Arg>(arg)...)} {
impl_->SetName(name);
Start(policy);
}

template <typename Fn, typename StackAlloc, typename... Arg>
Fiber(Launch policy, StackAlloc&& stack_alloc, std::string_view name, Fn&& fn, Arg&&... arg)
: impl_{util::fb2::detail::MakeWorkerFiberImpl(name, std::forward<StackAlloc>(stack_alloc),
std::forward<Fn>(fn),
std::forward<Arg>(arg)...)} {
Fiber(Launch policy, StackAlloc&&, std::string_view name, Fn&& fn, Arg&&... arg)
: impl_{detail::FiberPool::PopOrCreate(std::forward<Fn>(fn), std::forward<Arg>(arg)...)} {
impl_->SetName(name);
Start(policy);
}

Expand Down
Loading