diff --git a/src/msg/async/dpdk/DPDKStack.cc b/src/msg/async/dpdk/DPDKStack.cc index a3451eec76966..1543a530df9da 100644 --- a/src/msg/async/dpdk/DPDKStack.cc +++ b/src/msg/async/dpdk/DPDKStack.cc @@ -249,12 +249,12 @@ void DPDKStack::spawn_worker(std::function &&func) // funcs.push_back(std::move(func)); int r = 0; - r = dpdk::eal::init(cct); + r = eal.start(); if (r < 0) { - lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl; + lderr(cct) << __func__ << " start dpdk rte failed, r=" << r << dendl; ceph_abort(); } - // if dpdk::eal::init already called by NVMEDevice, we will select 1..n + // if eal.start already called by NVMEDevice, we will select 1..n // cores unsigned nr_worker = funcs.size(); ceph_assert(rte_lcore_count() >= nr_worker); @@ -265,7 +265,7 @@ void DPDKStack::spawn_worker(std::function &&func) } } void *adapted_func = static_cast(&funcs.back()); - dpdk::eal::execute_on_master([adapted_func, core_id, this]() { + eal.execute_on_master([adapted_func, core_id, this]() { int r = rte_eal_remote_launch(dpdk_thread_adaptor, adapted_func, core_id); if (r < 0) { lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl; @@ -276,7 +276,9 @@ void DPDKStack::spawn_worker(std::function &&func) void DPDKStack::join_worker(unsigned i) { - dpdk::eal::execute_on_master([&]() { + eal.execute_on_master([&]() { rte_eal_wait_lcore(i+1); }); + if (i+1 == get_num_worker()) + eal.stop(); } diff --git a/src/msg/async/dpdk/DPDKStack.h b/src/msg/async/dpdk/DPDKStack.h index 4eb8604c7fbed..e46bb6f51039e 100644 --- a/src/msg/async/dpdk/DPDKStack.h +++ b/src/msg/async/dpdk/DPDKStack.h @@ -25,6 +25,7 @@ #include "const.h" #include "IP.h" #include "Packet.h" +#include "dpdk_rte.h" class interface; @@ -246,6 +247,7 @@ class DPDKWorker : public Worker { friend class DPDKServerSocketImpl; }; +using namespace dpdk; class DPDKStack : public NetworkStack { std::vector > funcs; @@ -254,13 +256,15 @@ class DPDKStack : public NetworkStack { } public: - explicit DPDKStack(CephContext *cct): NetworkStack(cct) { + explicit DPDKStack(CephContext *cct): NetworkStack(cct), eal(cct) { funcs.reserve(cct->_conf->ms_async_op_threads); } virtual bool support_local_listen_table() const override { return true; } virtual void spawn_worker(std::function &&func) override; virtual void join_worker(unsigned i) override; + private: + dpdk::eal eal; }; #endif diff --git a/src/msg/async/dpdk/dpdk_rte.cc b/src/msg/async/dpdk/dpdk_rte.cc index 872d39d519de4..3ebb64360b04a 100644 --- a/src/msg/async/dpdk/dpdk_rte.cc +++ b/src/msg/async/dpdk/dpdk_rte.cc @@ -34,12 +34,6 @@ namespace dpdk { return v; } - bool eal::initialized = false; - std::thread eal::t; - std::mutex eal::lock; - std::condition_variable eal::cond; - std::list> eal::funcs; - static int bitcount(unsigned long long n) { return std::bitset{n}.count(); @@ -75,31 +69,33 @@ namespace dpdk { return count; } - int eal::init(CephContext *c) + bool eal::rte_initialized = false; + + int eal::start() { if (initialized) { return 1; } bool done = false; - auto coremask = c->_conf.get_val("ms_dpdk_coremask"); + auto coremask = cct->_conf.get_val("ms_dpdk_coremask"); int coremaskbit = coremask_bitcount(coremask.c_str()); if (coremaskbit <= 0 - || static_cast(coremaskbit) < c->_conf->ms_async_op_threads) + || static_cast(coremaskbit) < cct->_conf->ms_async_op_threads) return -EINVAL; t = std::thread([&]() { // TODO: Inherit these from the app parameters - "opts" std::vector> args { string2vector("ceph"), - string2vector("-c"), string2vector(c->_conf.get_val("ms_dpdk_coremask")), - string2vector("-n"), string2vector(c->_conf->ms_dpdk_memory_channel), + string2vector("-c"), string2vector(cct->_conf.get_val("ms_dpdk_coremask")), + string2vector("-n"), string2vector(cct->_conf->ms_dpdk_memory_channel), }; std::optional hugepages_path; - if (!c->_conf->ms_dpdk_hugepages.empty()) { - hugepages_path.emplace(c->_conf->ms_dpdk_hugepages); + if (!cct->_conf->ms_dpdk_hugepages.empty()) { + hugepages_path.emplace(cct->_conf->ms_dpdk_hugepages); } // If "hugepages" is not provided and DPDK PMD drivers mode is requested - @@ -123,13 +119,13 @@ namespace dpdk { args.push_back(string2vector("-m")); args.push_back(string2vector(size_MB_str.str())); - } else if (!c->_conf->ms_dpdk_pmd.empty()) { + } else if (!cct->_conf->ms_dpdk_pmd.empty()) { args.push_back(string2vector("--no-huge")); } std::string rte_file_prefix; rte_file_prefix = "rte_"; - rte_file_prefix += c->_conf->name.to_str(); + rte_file_prefix += cct->_conf->name.to_str(); args.push_back(string2vector("--file-prefix")); args.push_back(string2vector(rte_file_prefix)); @@ -138,27 +134,28 @@ namespace dpdk { for (auto&& a: args) { cargs.push_back(a.data()); } - /* initialise the EAL for all */ - int ret = rte_eal_init(cargs.size(), cargs.data()); - if (ret < 0) - return ret; + if (!rte_initialized) { + /* initialise the EAL for all */ + int ret = rte_eal_init(cargs.size(), cargs.data()); + if (ret < 0) + return; + rte_initialized = true; + } std::unique_lock l(lock); initialized = true; done = true; cond.notify_all(); - while (true) { + while (!stopped) { + cond.wait(l, [this] { return !funcs.empty() || stopped; }); if (!funcs.empty()) { auto f = std::move(funcs.front()); funcs.pop_front(); f(); cond.notify_all(); - } else { - cond.wait(l); } } }); - t.detach(); std::unique_lock l(lock); while (!done) cond.wait(l); @@ -182,4 +179,13 @@ namespace dpdk { return memsize; } + void eal::stop() + { + assert(initialized); + assert(!stopped); + stopped = true; + cond.notify_all(); + t.join(); + } + } // namespace dpdk diff --git a/src/msg/async/dpdk/dpdk_rte.h b/src/msg/async/dpdk/dpdk_rte.h index 4aa8389949241..6784af6d4fa5b 100644 --- a/src/msg/async/dpdk/dpdk_rte.h +++ b/src/msg/async/dpdk/dpdk_rte.h @@ -46,12 +46,10 @@ namespace dpdk { class eal { public: using cpuset = std::bitset; - - static std::mutex lock; - static std::condition_variable cond; - static std::list> funcs; - static int init(CephContext *c); - static void execute_on_master(std::function &&f) { + explicit eal(CephContext *cct) : cct(cct) {} + int start(); + void stop(); + void execute_on_master(std::function &&f) { bool done = false; std::unique_lock l(lock); funcs.emplace_back([&]() { f(); done = true; }); @@ -65,9 +63,16 @@ class eal { * * @return */ - static size_t mem_size(int num_cpus); - static bool initialized; - static std::thread t; + size_t mem_size(int num_cpus); + static bool rte_initialized; + private: + CephContext *cct; + bool initialized = false; + bool stopped = false; + std::thread t; + std::mutex lock; + std::condition_variable cond; + std::list> funcs; }; } // namespace dpdk