diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index 34874bddc..d36fc206e 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -1,14 +1,15 @@ #include "PingProbe.h" #include "NetProbeException.h" +#include "ThreadName.h" #include #include #include namespace visor::input::netprobe { -sigslot::signal PingReceiver::recv_signal; -std::recursive_mutex PingReceiver::mutex; +std::vector> PingReceiver::recv_packets{}; +std::unique_ptr PingProbe::_receiver{nullptr}; thread_local std::atomic PingProbe::sock_count{0}; thread_local SOCKET PingProbe::_sock{INVALID_SOCKET}; @@ -18,7 +19,6 @@ PingReceiver::PingReceiver() } PingReceiver::~PingReceiver() { - recv_signal.disconnect_all(); _poll->close(); if (_async_h && _io_thread) { // we have to use AsyncHandle to stop the loop from the same thread the loop is running in @@ -93,18 +93,29 @@ void PingReceiver::_setup_receiver() timeval time; TIMESPEC_TO_TIMEVAL(&time, &stamp); pcpp::RawPacket raw(reinterpret_cast(_array.data()), rc, time, false, pcpp::LINKTYPE_DLT_RAW1); - pcpp::Packet packet(&raw, pcpp::ICMP); - std::lock_guard lock(mutex); - recv_signal(packet, stamp); + _recv_packets.emplace_back(pcpp::Packet(&raw, pcpp::ICMP), stamp); } } }); + _timer = _io_loop->resource(); + _timer->on([this](const auto &, auto &) { + if (!_recv_packets.empty()) { + recv_packets = _recv_packets; + _recv_packets.clear(); + for (const auto &callback : _callbacks) { + callback->send(); + } + } + }); + _timer->start(uvw::TimerHandle::Time{100}, uvw::TimerHandle::Time{100}); + _poll->init(); _poll->start(uvw::PollHandle::Event::READABLE); // spawn the loop _io_thread = std::make_unique([this] { + thread::change_self_name("receiver", "ping"); _io_loop->run(); }); } @@ -129,8 +140,10 @@ bool PingProbe::start(std::shared_ptr io_loop) _io_loop = io_loop; - // execute once - static auto receiver = std::make_unique(); + if (!_receiver) { + // only once + _receiver = std::make_unique(); + } _interval_timer = _io_loop->resource(); if (!_interval_timer) { @@ -173,10 +186,6 @@ bool PingProbe::start(std::shared_ptr io_loop) } }); - _recv_connection = PingReceiver::recv_signal.connect([this](pcpp::Packet &packet, timespec stamp) { - _recv(packet, TestType::Ping, _name, stamp); - }); - (_sequence == UCHAR_MAX) ? _sequence = 0 : _sequence++; _send_icmp_v4(_internal_sequence); _internal_sequence++; @@ -184,6 +193,18 @@ bool PingProbe::start(std::shared_ptr io_loop) _internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec}); }); + _recv_handler = _io_loop->resource(); + if (!_recv_handler) { + throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver"); + } + _recv_handler->on([this](const auto &, auto &) { + for (auto &[packet, stamp] : PingReceiver::recv_packets) { + _recv(packet, TestType::Ping, _name, stamp); + } + }); + _receiver->register_async_callback(_recv_handler); + _recv_handler->init(); + ++sock_count; _interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec}); _init = true; @@ -195,7 +216,10 @@ bool PingProbe::stop() if (_interval_timer) { _interval_timer->stop(); } - _recv_connection.disconnect(); + if (_recv_handler) { + _receiver->remove_async_callback(_recv_handler); + _recv_handler->close(); + } _close_socket(); return true; } @@ -300,8 +324,6 @@ void PingProbe::_send_icmp_v4(uint8_t sequence) if (rc != SOCKET_ERROR) { pcpp::Packet packet; packet.addLayer(&icmp); - //Ensure that _send and recv_signal is not called at same time - std::lock_guard lock(PingReceiver::mutex); _send(packet, TestType::Ping, _name, stamp); } } diff --git a/src/inputs/netprobe/PingProbe.h b/src/inputs/netprobe/PingProbe.h index fa0b5f6a9..5842309e7 100644 --- a/src/inputs/netprobe/PingProbe.h +++ b/src/inputs/netprobe/PingProbe.h @@ -35,8 +35,10 @@ typedef int SOCKET; #include #include #include +#include #include #include +#include #include #include @@ -56,15 +58,26 @@ class PingReceiver std::unique_ptr _io_thread; std::shared_ptr _io_loop; std::shared_ptr _async_h; - + std::vector> _callbacks; + std::shared_ptr _timer; + std::vector> _recv_packets; void _setup_receiver(); public: - static sigslot::signal recv_signal; - static std::recursive_mutex mutex; + static std::vector> recv_packets; PingReceiver(); ~PingReceiver(); + + void register_async_callback(std::shared_ptr callback) + { + _callbacks.push_back(callback); + } + + void remove_async_callback(std::shared_ptr callback) + { + _callbacks.erase(std::remove(_callbacks.begin(), _callbacks.end(), callback), _callbacks.end()); + } }; /** @@ -76,6 +89,7 @@ class PingReceiver */ class PingProbe final : public NetProbe { + static std::unique_ptr _receiver; static thread_local SOCKET _sock; bool _init{false}; @@ -86,11 +100,11 @@ class PingProbe final : public NetProbe std::shared_ptr _interval_timer; std::shared_ptr _internal_timer; std::shared_ptr _timeout_timer; + std::shared_ptr _recv_handler; SOCKETLEN _sin_length{0}; std::vector _payload_array; sockaddr_in _sa; sockaddr_in6 _sa6; - sigslot::connection _recv_connection; void _send_icmp_v4(uint8_t sequence); std::optional _get_addr();