Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ docs/html-documentation-generated*
integration_tests/external
golang/pkg/client/version.go
docs/internals/html
appimage/*.AppImage
appimage/*.AppImage
/test-config*.yaml
6 changes: 3 additions & 3 deletions RFCs/2021-04-16-75-taps.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ visor:
taps:
# a pcap tap which uses eth0 and is referenced by the identifier "anycast"
anycast:
type: pcap
input_type: pcap
config:
iface: eth0
# an sflow tap which listens on the given IP and port, referenced by the identifier "pop_switch"
pop_switch:
type: sflow
input_type: sflow
config:
port: 6343
bind: 192.168.1.1
# a dnstap tap which gets its stream from the given socket, named "trex_tap"
trex_tap:
type: dnstap
input_type: dnstap
config:
socket: /var/dns.sock
```
Expand Down
65 changes: 11 additions & 54 deletions cmd/pktvisor-pcap/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@

#include <docopt/docopt.h>

#include "HandlerManager.h"
#include "HandlerModulePlugin.h"
#include "InputModulePlugin.h"
#include "InputStreamManager.h"
#include <Corrade/PluginManager/Manager.h>
#include <Corrade/PluginManager/PluginMetadata.h>
#include "CoreManagers.h"
#include <spdlog/sinks/stdout_color_sinks.h>

#include "handlers/static_plugins.h"
Expand Down Expand Up @@ -60,10 +55,6 @@ void signal_handler(int signal)

using namespace visor;

typedef Corrade::PluginManager::Manager<InputModulePlugin> InputPluginRegistry;
typedef Corrade::PluginManager::Manager<HandlerModulePlugin> HandlerPluginRegistry;
typedef Corrade::Containers::Pointer<InputModulePlugin> InputPluginPtr;
typedef Corrade::Containers::Pointer<HandlerModulePlugin> HandlerPluginPtr;

void initialize_geo(const docopt::value &city, const docopt::value &asn)
{
Expand All @@ -84,48 +75,12 @@ int main(int argc, char *argv[])
true, // show help if requested
VISOR_VERSION); // version string

auto logger = spdlog::stderr_color_mt("pktvisor");
auto logger = spdlog::stderr_color_mt("visor");
if (args["-v"].asBool()) {
logger->set_level(spdlog::level::debug);
}

// inputs
InputPluginRegistry input_registry;
auto input_manager = std::make_unique<InputStreamManager>();
std::vector<InputPluginPtr> input_plugins;

// initialize input plugins
for (auto &s : input_registry.pluginList()) {
InputPluginPtr mod = input_registry.instantiate(s);
logger->info("Load input plugin: {} {}", mod->name(), mod->pluginInterface());
mod->init_module(input_manager.get());
input_plugins.emplace_back(std::move(mod));
}

// handlers
HandlerPluginRegistry handler_registry;
auto handler_manager = std::make_unique<HandlerManager>();
std::vector<HandlerPluginPtr> handler_plugins;

// initialize handler plugins
for (auto &s : handler_registry.pluginList()) {
HandlerPluginPtr mod = handler_registry.instantiate(s);
logger->info("Load handler plugin: {} {}", mod->name(), mod->pluginInterface());
mod->init_module(input_manager.get(), handler_manager.get());
handler_plugins.emplace_back(std::move(mod));
}

shutdown_handler = [&]([[maybe_unused]] int signal) {
// gracefully close all inputs and handlers
auto [input_modules, im_lock] = input_manager->module_get_all_locked();
for (auto &[name, mod] : input_modules) {
mod->stop();
}
auto [handler_modules, hm_lock] = handler_manager->module_get_all_locked();
for (auto &[name, mod] : handler_modules) {
mod->stop();
}
};
CoreManagers mgrs(nullptr);

std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);
Expand Down Expand Up @@ -164,26 +119,28 @@ int main(int argc, char *argv[])
input_stream->info_json(j["info"]);
logger->info("{}", j.dump(4));

input_manager->module_add(std::move(input_stream), false);
auto [input_stream_, stream_mgr_lock] = input_manager->module_get_locked("pcap");
mgrs.input_manager()->module_add(std::move(input_stream));
auto [input_stream_, stream_mgr_lock] = mgrs.input_manager()->module_get_locked("pcap");
stream_mgr_lock.unlock();
auto pcap_stream = dynamic_cast<input::pcap::PcapInputStream *>(input_stream_);

handler::net::NetStreamHandler *net_handler{nullptr};
{
auto handler_module = std::make_unique<handler::net::NetStreamHandler>("net", pcap_stream, periods, sample_rate);
handler_module->config_set("recorded_stream", true);
handler_manager->module_add(std::move(handler_module));
auto [handler, handler_mgr_lock] = handler_manager->module_get_locked("net");
handler_module->start();
mgrs.handler_manager()->module_add(std::move(handler_module));
auto [handler, handler_mgr_lock] = mgrs.handler_manager()->module_get_locked("net");
handler_mgr_lock.unlock();
net_handler = dynamic_cast<handler::net::NetStreamHandler *>(handler);
}
handler::dns::DnsStreamHandler *dns_handler{nullptr};
{
auto handler_module = std::make_unique<handler::dns::DnsStreamHandler>("dns", pcap_stream, periods, sample_rate);
handler_module->config_set("recorded_stream", true);
handler_manager->module_add(std::move(handler_module));
auto [handler, handler_mgr_lock] = handler_manager->module_get_locked("dns");
handler_module->start();
mgrs.handler_manager()->module_add(std::move(handler_module));
auto [handler, handler_mgr_lock] = mgrs.handler_manager()->module_get_locked("dns");
handler_mgr_lock.unlock();
dns_handler = dynamic_cast<handler::dns::DnsStreamHandler *>(handler);
}
Expand Down
51 changes: 45 additions & 6 deletions cmd/pktvisord/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/syslog_sink.h>
#include <spdlog/spdlog.h>
#include <yaml-cpp/yaml.h>

#include "GeoDB.h"
#include "handlers/dns/DnsStreamHandler.h"
Expand Down Expand Up @@ -47,6 +48,8 @@ static const char USAGE[] =
--version Show version
--geo-city FILE GeoLite2 City database to use for IP to Geo mapping
--geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping
Configuration:
--config FILE Use specified YAML configuration to configure options, Taps, and Collection Policies
Logging Options:
--log-file FILE Log to the given output file name
--syslog Log to syslog
Expand Down Expand Up @@ -165,15 +168,15 @@ int main(int argc, char *argv[])
std::shared_ptr<spdlog::logger> logger;
if (args["--log-file"]) {
try {
logger = spdlog::basic_logger_mt("pktvisor", args["--log-file"].asString());
logger = spdlog::basic_logger_mt("visor", args["--log-file"].asString());
} catch (const spdlog::spdlog_ex &ex) {
std::cerr << "Log init failed: " << ex.what() << std::endl;
exit(EXIT_FAILURE);
}
} else if (args["--syslog"].asBool()) {
logger = spdlog::syslog_logger_mt("pktvisor", "pktvisord", LOG_PID);
logger = spdlog::syslog_logger_mt("visor", "pktvisord", LOG_PID);
} else {
logger = spdlog::stdout_color_mt("pktvisor");
logger = spdlog::stdout_color_mt("visor");
}
if (args["-v"].asBool()) {
logger->set_level(spdlog::level::debug);
Expand All @@ -186,14 +189,47 @@ int main(int argc, char *argv[])
prom_config.instance = args["--prom-instance"].asString();
}
}
CoreServer svr(!args["--admin-api"].asBool(), logger, prom_config);
CoreServer svr(!args["--admin-api"].asBool(), prom_config);
svr.set_http_logger([&logger](const auto &req, const auto &res) {
logger->info("REQUEST: {} {} {}", req.method, req.path, res.status);
if (res.status == 500) {
logger->error(res.body);
}
});

// local config file
if (args["--config"]) {
logger->info("loading config file: {}", args["--config"].asString());
YAML::Node config_file;
// look for local options
try {
config_file = YAML::LoadFile(args["--config"].asString());

if (!config_file.IsMap() || !config_file["visor"]) {
throw std::runtime_error("invalid schema");
}
if (!config_file["version"] || !config_file["version"].IsScalar() || config_file["version"].as<std::string>() != "1.0") {
throw std::runtime_error("missing or unsupported version");
}

if (config_file["visor"]["config"] && config_file["visor"]["config"].IsMap()) {
// todo more config items
auto config = config_file["visor"]["config"];
if (config["verbose"] && config["verbose"].as<bool>()) {
logger->set_level(spdlog::level::debug);
}
}

// then pass to CoreManagers
svr.mgrs()->configure_from_file(args["--config"].asString());

} catch (std::runtime_error &e) {
logger->error("configuration error: {}", e.what());
exit(EXIT_FAILURE);
}

}

shutdown_handler = [&]([[maybe_unused]] int signal) {
logger->info("Shutting down");
svr.stop();
Expand Down Expand Up @@ -261,20 +297,23 @@ int main(int argc, char *argv[])
input_stream->config_set("bpf", bpf);
input_stream->config_set("host_spec", host_spec);

auto input_manager = svr.input_manager();
auto handler_manager = svr.handler_manager();
auto input_manager = svr.mgrs()->input_manager();
auto handler_manager = svr.mgrs()->handler_manager();

input_stream->start();
input_manager->module_add(std::move(input_stream));
auto [input_stream_, stream_mgr_lock] = input_manager->module_get_locked("pcap");
stream_mgr_lock.unlock();
auto pcap_stream = dynamic_cast<input::pcap::PcapInputStream *>(input_stream_);

{
auto handler_module = std::make_unique<handler::net::NetStreamHandler>("net", pcap_stream, periods, sample_rate);
handler_module->start();
handler_manager->module_add(std::move(handler_module));
}
{
auto handler_module = std::make_unique<handler::dns::DnsStreamHandler>("dns", pcap_stream, periods, sample_rate);
handler_module->start();
handler_manager->module_add(std::move(handler_module));
}

Expand Down
2 changes: 2 additions & 0 deletions conanfile.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ cpp-httplib/0.8.0
corrade/2020.06
pcapplusplus/ns1-dev
json-schema-validator/2.1.0
yaml-cpp/0.6.3
fmt/7.1.3

[build_requires]
benchmark/1.5.2
Expand Down
13 changes: 6 additions & 7 deletions src/AbstractManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class AbstractManager
{
}

virtual ~AbstractManager()
{
}

auto module_get_all_locked()
{
struct retVals {
Expand All @@ -42,16 +46,12 @@ class AbstractManager
return retVals{_map, std::move(lock)};
}

// atomically ensure module starts before arriving in registry, if requested
virtual void module_add(std::unique_ptr<ModuleType> &&m, bool start = true)
virtual void module_add(std::unique_ptr<ModuleType> &&m)
{
std::unique_lock lock(_map_mutex);
if (_map.count(m->name())) {
throw std::runtime_error("module name already exists");
}
if (start) {
m->start();
}
_map.emplace(m->name(), std::move(m));
}

Expand All @@ -64,7 +64,7 @@ class AbstractManager
throw std::runtime_error("module name does not exist");
}
struct retVals {
ModuleType *map;
ModuleType *module;
std::unique_lock<std::shared_mutex> lock;
};
return retVals{_map[name].get(), std::move(lock)};
Expand All @@ -76,7 +76,6 @@ class AbstractManager
if (_map.count(name) == 0) {
throw std::runtime_error("module name does not exist");
}
_map[name]->stop();
_map.erase(name);
}

Expand Down
Loading