From 1c1b3feaf9564737a71a6d03653d381999dc628f Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Wed, 21 Dec 2022 17:58:23 -0400 Subject: [PATCH 1/6] Execute the _recv callback proccess on correct thread - _recv callback for every target was being executed by the static Receiver thread - Moved the cost of execution to the PingProbe thread --- src/inputs/netprobe/PingProbe.cpp | 34 +++++++++++++++++++++++-------- src/inputs/netprobe/PingProbe.h | 5 ++++- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index 34874bddc..cc6d724b3 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -1,6 +1,7 @@ #include "PingProbe.h" #include "NetProbeException.h" +#include "ThreadName.h" #include #include #include @@ -8,7 +9,6 @@ namespace visor::input::netprobe { sigslot::signal PingReceiver::recv_signal; -std::recursive_mutex PingReceiver::mutex; thread_local std::atomic PingProbe::sock_count{0}; thread_local SOCKET PingProbe::_sock{INVALID_SOCKET}; @@ -94,7 +94,6 @@ void PingReceiver::_setup_receiver() TIMESPEC_TO_TIMEVAL(&time, &stamp); pcpp::RawPacket raw(reinterpret_cast(_array.data()), rc, time, false, pcpp::LINKTYPE_DLT_RAW1); pcpp::Packet packet(&raw, pcpp::ICMP); - std::lock_guard lock(mutex); recv_signal(packet, stamp); } } @@ -105,6 +104,7 @@ void PingReceiver::_setup_receiver() // spawn the loop _io_thread = std::make_unique([this] { + thread::change_self_name("receiver", "ping"); _io_loop->run(); }); } @@ -173,10 +173,6 @@ bool PingProbe::start(std::shared_ptr io_loop) } }); - _recv_connection = PingReceiver::recv_signal.connect([this](pcpp::Packet &packet, timespec stamp) { - _recv(packet, TestType::Ping, _name, stamp); - }); - (_sequence == UCHAR_MAX) ? _sequence = 0 : _sequence++; _send_icmp_v4(_internal_sequence); _internal_sequence++; @@ -184,6 +180,27 @@ bool PingProbe::start(std::shared_ptr io_loop) _internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec}); }); + _recv_handler = _io_loop->resource(); + if (!_recv_handler) { + throw NetProbeException("PingProbe - unable to initialize receive CheckHandle"); + } + + _recv_handler->on([this](const auto &, auto &) { + std::lock_guard lock(_mutex); + if (!_recv_packets.empty()) { + for (auto &[packet, stamp] : _recv_packets) { + _recv(packet, TestType::Ping, _name, stamp); + } + _recv_packets.clear(); + } + }); + _recv_handler->start(); + + _recv_connection = PingReceiver::recv_signal.connect([this](pcpp::Packet &packet, timespec stamp) { + std::lock_guard lock(_mutex); + _recv_packets.push_back({packet, stamp}); + }); + ++sock_count; _interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec}); _init = true; @@ -195,6 +212,9 @@ bool PingProbe::stop() if (_interval_timer) { _interval_timer->stop(); } + if (_recv_handler) { + _recv_handler->stop(); + } _recv_connection.disconnect(); _close_socket(); return true; @@ -300,8 +320,6 @@ void PingProbe::_send_icmp_v4(uint8_t sequence) if (rc != SOCKET_ERROR) { pcpp::Packet packet; packet.addLayer(&icmp); - //Ensure that _send and recv_signal is not called at same time - std::lock_guard lock(PingReceiver::mutex); _send(packet, TestType::Ping, _name, stamp); } } diff --git a/src/inputs/netprobe/PingProbe.h b/src/inputs/netprobe/PingProbe.h index fa0b5f6a9..0efb879bf 100644 --- a/src/inputs/netprobe/PingProbe.h +++ b/src/inputs/netprobe/PingProbe.h @@ -39,6 +39,7 @@ typedef int SOCKET; #include #include #include +#include namespace visor::input::netprobe { @@ -61,7 +62,6 @@ class PingReceiver public: static sigslot::signal recv_signal; - static std::recursive_mutex mutex; PingReceiver(); ~PingReceiver(); @@ -86,11 +86,14 @@ class PingProbe final : public NetProbe std::shared_ptr _interval_timer; std::shared_ptr _internal_timer; std::shared_ptr _timeout_timer; + std::shared_ptr _recv_handler; SOCKETLEN _sin_length{0}; std::vector _payload_array; sockaddr_in _sa; sockaddr_in6 _sa6; sigslot::connection _recv_connection; + std::recursive_mutex _mutex; + std::vector> _recv_packets; void _send_icmp_v4(uint8_t sequence); std::optional _get_addr(); From dd4e4d10943795db02d9541045fad0b93534eca1 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Wed, 21 Dec 2022 18:06:38 -0400 Subject: [PATCH 2/6] Update PingProbe.cpp --- src/inputs/netprobe/PingProbe.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index cc6d724b3..9db0ec95c 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -182,7 +182,7 @@ bool PingProbe::start(std::shared_ptr io_loop) _recv_handler = _io_loop->resource(); if (!_recv_handler) { - throw NetProbeException("PingProbe - unable to initialize receive CheckHandle"); + throw NetProbeException("PingProbe - unable to initialize CheckHandle receiver"); } _recv_handler->on([this](const auto &, auto &) { From 7626ff2e409d3e25454fcffed47bd8b87c317b8c Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 22 Dec 2022 13:40:09 -0400 Subject: [PATCH 3/6] Add static vector to be read by PingProbe threads --- src/inputs/netprobe/PingProbe.cpp | 33 ++++++++++++++++++------------- src/inputs/netprobe/PingProbe.h | 14 +++++++------ 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index 9db0ec95c..a9599b384 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -8,7 +8,9 @@ namespace visor::input::netprobe { -sigslot::signal PingReceiver::recv_signal; +std::vector> PingReceiver::recv_packets; +std::shared_mutex PingReceiver::mutex; +uint8_t PingReceiver::bucket{0}; thread_local std::atomic PingProbe::sock_count{0}; thread_local SOCKET PingProbe::_sock{INVALID_SOCKET}; @@ -18,7 +20,6 @@ PingReceiver::PingReceiver() } PingReceiver::~PingReceiver() { - recv_signal.disconnect_all(); _poll->close(); if (_async_h && _io_thread) { // we have to use AsyncHandle to stop the loop from the same thread the loop is running in @@ -93,12 +94,22 @@ void PingReceiver::_setup_receiver() timeval time; TIMESPEC_TO_TIMEVAL(&time, &stamp); pcpp::RawPacket raw(reinterpret_cast(_array.data()), rc, time, false, pcpp::LINKTYPE_DLT_RAW1); - pcpp::Packet packet(&raw, pcpp::ICMP); - recv_signal(packet, stamp); + _recv_packets.push_back({pcpp::Packet(&raw, pcpp::ICMP), stamp}); } } }); + _timer = _io_loop->resource(); + _timer->on([this](const auto &, auto &) { + if (!_recv_packets.empty()) { + std::unique_lock lock(mutex); + (bucket == UCHAR_MAX) ? bucket = 0 : bucket++; + recv_packets = _recv_packets; + _recv_packets.clear(); + } + }); + _timer->start(uvw::TimerHandle::Time{100}, uvw::TimerHandle::Time{100}); + _poll->init(); _poll->start(uvw::PollHandle::Event::READABLE); @@ -186,21 +197,16 @@ bool PingProbe::start(std::shared_ptr io_loop) } _recv_handler->on([this](const auto &, auto &) { - std::lock_guard lock(_mutex); - if (!_recv_packets.empty()) { - for (auto &[packet, stamp] : _recv_packets) { + std::shared_lock lock(PingReceiver::mutex); + if (_bucket != PingReceiver::bucket) { + _bucket = PingReceiver::bucket; + for (auto &[packet, stamp] : PingReceiver::recv_packets) { _recv(packet, TestType::Ping, _name, stamp); } - _recv_packets.clear(); } }); _recv_handler->start(); - _recv_connection = PingReceiver::recv_signal.connect([this](pcpp::Packet &packet, timespec stamp) { - std::lock_guard lock(_mutex); - _recv_packets.push_back({packet, stamp}); - }); - ++sock_count; _interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec}); _init = true; @@ -215,7 +221,6 @@ bool PingProbe::stop() if (_recv_handler) { _recv_handler->stop(); } - _recv_connection.disconnect(); _close_socket(); return true; } diff --git a/src/inputs/netprobe/PingProbe.h b/src/inputs/netprobe/PingProbe.h index 0efb879bf..f7d180e75 100644 --- a/src/inputs/netprobe/PingProbe.h +++ b/src/inputs/netprobe/PingProbe.h @@ -35,11 +35,12 @@ typedef int SOCKET; #include #include #include +#include #include #include +#include #include #include -#include namespace visor::input::netprobe { @@ -57,11 +58,14 @@ class PingReceiver std::unique_ptr _io_thread; std::shared_ptr _io_loop; std::shared_ptr _async_h; - + std::shared_ptr _timer; + std::vector> _recv_packets; void _setup_receiver(); public: - static sigslot::signal recv_signal; + static std::vector> recv_packets; + static uint8_t bucket; + static std::shared_mutex mutex; PingReceiver(); ~PingReceiver(); @@ -91,9 +95,7 @@ class PingProbe final : public NetProbe std::vector _payload_array; sockaddr_in _sa; sockaddr_in6 _sa6; - sigslot::connection _recv_connection; - std::recursive_mutex _mutex; - std::vector> _recv_packets; + uint8_t _bucket{0}; void _send_icmp_v4(uint8_t sequence); std::optional _get_addr(); From 5416cf91b1ee5471f585047769afcd88096b24a4 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Wed, 28 Dec 2022 08:28:36 -0400 Subject: [PATCH 4/6] Lock free approach with async event --- src/inputs/netprobe/PingProbe.cpp | 37 +++++++++++++++---------------- src/inputs/netprobe/PingProbe.h | 16 ++++++++++--- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index a9599b384..eae7aef76 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -8,9 +8,8 @@ namespace visor::input::netprobe { -std::vector> PingReceiver::recv_packets; -std::shared_mutex PingReceiver::mutex; -uint8_t PingReceiver::bucket{0}; +std::vector> PingReceiver::recv_packets{}; +std::unique_ptr PingProbe::_receiver{nullptr}; thread_local std::atomic PingProbe::sock_count{0}; thread_local SOCKET PingProbe::_sock{INVALID_SOCKET}; @@ -102,9 +101,10 @@ void PingReceiver::_setup_receiver() _timer = _io_loop->resource(); _timer->on([this](const auto &, auto &) { if (!_recv_packets.empty()) { - std::unique_lock lock(mutex); - (bucket == UCHAR_MAX) ? bucket = 0 : bucket++; recv_packets = _recv_packets; + for (const auto &callback : _callbacks) { + callback->send(); + } _recv_packets.clear(); } }); @@ -140,8 +140,10 @@ bool PingProbe::start(std::shared_ptr io_loop) _io_loop = io_loop; - // execute once - static auto receiver = std::make_unique(); + if (!_receiver) { + // only once + _receiver = std::make_unique(); + } _interval_timer = _io_loop->resource(); if (!_interval_timer) { @@ -191,21 +193,17 @@ bool PingProbe::start(std::shared_ptr io_loop) _internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec}); }); - _recv_handler = _io_loop->resource(); + _recv_handler = _io_loop->resource(); if (!_recv_handler) { - throw NetProbeException("PingProbe - unable to initialize CheckHandle receiver"); + throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver"); } - - _recv_handler->on([this](const auto &, auto &) { - std::shared_lock lock(PingReceiver::mutex); - if (_bucket != PingReceiver::bucket) { - _bucket = PingReceiver::bucket; - for (auto &[packet, stamp] : PingReceiver::recv_packets) { - _recv(packet, TestType::Ping, _name, stamp); - } + _recv_handler->on([this](const auto &, auto &) { + for (auto &[packet, stamp] : PingReceiver::recv_packets) { + _recv(packet, TestType::Ping, _name, stamp); } }); - _recv_handler->start(); + _receiver->register_async_callback(_recv_handler); + _recv_handler->init(); ++sock_count; _interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec}); @@ -219,7 +217,8 @@ bool PingProbe::stop() _interval_timer->stop(); } if (_recv_handler) { - _recv_handler->stop(); + _receiver->remove_async_callback(_recv_handler); + _recv_handler->close(); } _close_socket(); return true; diff --git a/src/inputs/netprobe/PingProbe.h b/src/inputs/netprobe/PingProbe.h index f7d180e75..0546db4e5 100644 --- a/src/inputs/netprobe/PingProbe.h +++ b/src/inputs/netprobe/PingProbe.h @@ -58,17 +58,26 @@ class PingReceiver std::unique_ptr _io_thread; std::shared_ptr _io_loop; std::shared_ptr _async_h; + std::vector> _callbacks; std::shared_ptr _timer; std::vector> _recv_packets; void _setup_receiver(); public: static std::vector> recv_packets; - static uint8_t bucket; - static std::shared_mutex mutex; PingReceiver(); ~PingReceiver(); + + void register_async_callback(std::shared_ptr callback) + { + _callbacks.push_back(callback); + } + + void remove_async_callback(std::shared_ptr callback) + { + _callbacks.erase(std::remove(_callbacks.begin(), _callbacks.end(), callback), _callbacks.end()); + } }; /** @@ -80,6 +89,7 @@ class PingReceiver */ class PingProbe final : public NetProbe { + static std::unique_ptr _receiver; static thread_local SOCKET _sock; bool _init{false}; @@ -90,7 +100,7 @@ class PingProbe final : public NetProbe std::shared_ptr _interval_timer; std::shared_ptr _internal_timer; std::shared_ptr _timeout_timer; - std::shared_ptr _recv_handler; + std::shared_ptr _recv_handler; SOCKETLEN _sin_length{0}; std::vector _payload_array; sockaddr_in _sa; From 1806304a929d65cc8099140f83f36cd6305e405e Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 29 Dec 2022 10:08:05 -0400 Subject: [PATCH 5/6] Update PingProbe.h --- src/inputs/netprobe/PingProbe.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/inputs/netprobe/PingProbe.h b/src/inputs/netprobe/PingProbe.h index 0546db4e5..5842309e7 100644 --- a/src/inputs/netprobe/PingProbe.h +++ b/src/inputs/netprobe/PingProbe.h @@ -105,7 +105,6 @@ class PingProbe final : public NetProbe std::vector _payload_array; sockaddr_in _sa; sockaddr_in6 _sa6; - uint8_t _bucket{0}; void _send_icmp_v4(uint8_t sequence); std::optional _get_addr(); From 842a37f0c0b2b1cf870f93168c532ae30a287107 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 3 Jan 2023 13:34:32 -0400 Subject: [PATCH 6/6] Proper clear the receiver icmp data vector --- src/inputs/netprobe/PingProbe.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index eae7aef76..d36fc206e 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -93,7 +93,7 @@ void PingReceiver::_setup_receiver() timeval time; TIMESPEC_TO_TIMEVAL(&time, &stamp); pcpp::RawPacket raw(reinterpret_cast(_array.data()), rc, time, false, pcpp::LINKTYPE_DLT_RAW1); - _recv_packets.push_back({pcpp::Packet(&raw, pcpp::ICMP), stamp}); + _recv_packets.emplace_back(pcpp::Packet(&raw, pcpp::ICMP), stamp); } } }); @@ -102,10 +102,10 @@ void PingReceiver::_setup_receiver() _timer->on([this](const auto &, auto &) { if (!_recv_packets.empty()) { recv_packets = _recv_packets; + _recv_packets.clear(); for (const auto &callback : _callbacks) { callback->send(); } - _recv_packets.clear(); } }); _timer->start(uvw::TimerHandle::Time{100}, uvw::TimerHandle::Time{100});