Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 37 additions & 15 deletions src/inputs/netprobe/PingProbe.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#include "PingProbe.h"

#include "NetProbeException.h"
#include "ThreadName.h"
#include <Packet.h>
#include <TimespecTimeval.h>
#include <uvw/idle.h>

namespace visor::input::netprobe {

sigslot::signal<pcpp::Packet &, timespec> PingReceiver::recv_signal;
std::recursive_mutex PingReceiver::mutex;
std::vector<std::pair<pcpp::Packet, timespec>> PingReceiver::recv_packets{};
std::unique_ptr<PingReceiver> PingProbe::_receiver{nullptr};
thread_local std::atomic<uint32_t> PingProbe::sock_count{0};
thread_local SOCKET PingProbe::_sock{INVALID_SOCKET};

Expand All @@ -18,7 +19,6 @@ PingReceiver::PingReceiver()
}
PingReceiver::~PingReceiver()
{
recv_signal.disconnect_all();
_poll->close();
if (_async_h && _io_thread) {
// we have to use AsyncHandle to stop the loop from the same thread the loop is running in
Expand Down Expand Up @@ -93,18 +93,29 @@ void PingReceiver::_setup_receiver()
timeval time;
TIMESPEC_TO_TIMEVAL(&time, &stamp);
pcpp::RawPacket raw(reinterpret_cast<uint8_t *>(_array.data()), rc, time, false, pcpp::LINKTYPE_DLT_RAW1);
pcpp::Packet packet(&raw, pcpp::ICMP);
std::lock_guard<std::recursive_mutex> lock(mutex);
recv_signal(packet, stamp);
_recv_packets.emplace_back(pcpp::Packet(&raw, pcpp::ICMP), stamp);
}
}
});

_timer = _io_loop->resource<uvw::TimerHandle>();
_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
if (!_recv_packets.empty()) {
recv_packets = _recv_packets;
_recv_packets.clear();
for (const auto &callback : _callbacks) {
callback->send();
}
}
});
_timer->start(uvw::TimerHandle::Time{100}, uvw::TimerHandle::Time{100});

_poll->init();
_poll->start(uvw::PollHandle::Event::READABLE);

// spawn the loop
_io_thread = std::make_unique<std::thread>([this] {
thread::change_self_name("receiver", "ping");
_io_loop->run();
});
}
Expand All @@ -129,8 +140,10 @@ bool PingProbe::start(std::shared_ptr<uvw::Loop> io_loop)

_io_loop = io_loop;

// execute once
static auto receiver = std::make_unique<PingReceiver>();
if (!_receiver) {
// only once
_receiver = std::make_unique<PingReceiver>();
}

_interval_timer = _io_loop->resource<uvw::TimerHandle>();
if (!_interval_timer) {
Expand Down Expand Up @@ -173,17 +186,25 @@ bool PingProbe::start(std::shared_ptr<uvw::Loop> io_loop)
}
});

_recv_connection = PingReceiver::recv_signal.connect([this](pcpp::Packet &packet, timespec stamp) {
_recv(packet, TestType::Ping, _name, stamp);
});

(_sequence == UCHAR_MAX) ? _sequence = 0 : _sequence++;
_send_icmp_v4(_internal_sequence);
_internal_sequence++;
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec});
});

_recv_handler = _io_loop->resource<uvw::AsyncHandle>();
if (!_recv_handler) {
throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver");
}
_recv_handler->on<uvw::AsyncEvent>([this](const auto &, auto &) {
for (auto &[packet, stamp] : PingReceiver::recv_packets) {
_recv(packet, TestType::Ping, _name, stamp);
}
});
_receiver->register_async_callback(_recv_handler);
_recv_handler->init();

++sock_count;
_interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec});
_init = true;
Expand All @@ -195,7 +216,10 @@ bool PingProbe::stop()
if (_interval_timer) {
_interval_timer->stop();
}
_recv_connection.disconnect();
if (_recv_handler) {
_receiver->remove_async_callback(_recv_handler);
_recv_handler->close();
}
_close_socket();
return true;
}
Expand Down Expand Up @@ -300,8 +324,6 @@ void PingProbe::_send_icmp_v4(uint8_t sequence)
if (rc != SOCKET_ERROR) {
pcpp::Packet packet;
packet.addLayer(&icmp);
//Ensure that _send and recv_signal is not called at same time
std::lock_guard<std::recursive_mutex> lock(PingReceiver::mutex);
_send(packet, TestType::Ping, _name, stamp);
}
}
Expand Down
22 changes: 18 additions & 4 deletions src/inputs/netprobe/PingProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ typedef int SOCKET;
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <sigslot/signal.hpp>
#include <uvw/async.h>
#include <uvw/check.h>
#include <uvw/poll.h>
#include <uvw/timer.h>

Expand All @@ -56,15 +58,26 @@ class PingReceiver
std::unique_ptr<std::thread> _io_thread;
std::shared_ptr<uvw::Loop> _io_loop;
std::shared_ptr<uvw::AsyncHandle> _async_h;

std::vector<std::shared_ptr<uvw::AsyncHandle>> _callbacks;
std::shared_ptr<uvw::TimerHandle> _timer;
std::vector<std::pair<pcpp::Packet, timespec>> _recv_packets;
void _setup_receiver();

public:
static sigslot::signal<pcpp::Packet &, timespec> recv_signal;
static std::recursive_mutex mutex;
static std::vector<std::pair<pcpp::Packet, timespec>> recv_packets;

PingReceiver();
~PingReceiver();

void register_async_callback(std::shared_ptr<uvw::AsyncHandle> callback)
{
_callbacks.push_back(callback);
}

void remove_async_callback(std::shared_ptr<uvw::AsyncHandle> callback)
{
_callbacks.erase(std::remove(_callbacks.begin(), _callbacks.end(), callback), _callbacks.end());
}
};

/**
Expand All @@ -76,6 +89,7 @@ class PingReceiver
*/
class PingProbe final : public NetProbe
{
static std::unique_ptr<PingReceiver> _receiver;
static thread_local SOCKET _sock;

bool _init{false};
Expand All @@ -86,11 +100,11 @@ class PingProbe final : public NetProbe
std::shared_ptr<uvw::TimerHandle> _interval_timer;
std::shared_ptr<uvw::TimerHandle> _internal_timer;
std::shared_ptr<uvw::TimerHandle> _timeout_timer;
std::shared_ptr<uvw::AsyncHandle> _recv_handler;
SOCKETLEN _sin_length{0};
std::vector<uint8_t> _payload_array;
sockaddr_in _sa;
sockaddr_in6 _sa6;
sigslot::connection _recv_connection;

void _send_icmp_v4(uint8_t sequence);
std::optional<ErrorType> _get_addr();
Expand Down