Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions fact/src/bpf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl Bpf {
pub fn new(
paths_config: watch::Receiver<Vec<PathBuf>>,
bpf_config: &BpfConfig,
tx: mpsc::Sender<Event>,
) -> anyhow::Result<Self> {
) -> anyhow::Result<(Self, mpsc::Receiver<Event>)> {
Bpf::bump_memlock_rlimit()?;

let btf = Btf::from_sys_fs()?;
Expand All @@ -61,6 +60,7 @@ impl Bpf {
.set_max_entries("inode_map", bpf_config.inodes_max())
.load(fact_ebpf::EBPF_OBJ)?;

let (tx, rx) = mpsc::channel(100);
let paths = Vec::new();
let mut bpf = Bpf {
obj,
Expand All @@ -74,7 +74,7 @@ impl Bpf {
bpf.load_progs(&btf)?;
bpf.load_paths()?;

Ok(bpf)
Ok((bpf, rx))
}

fn bump_memlock_rlimit() -> anyhow::Result<()> {
Expand Down Expand Up @@ -304,9 +304,8 @@ mod bpf_tests {
let mut config = FactConfig::default();
config.set_paths(paths);
let reloader = Reloader::from(config);
let (tx, mut rx) = mpsc::channel(100);
let mut bpf = Bpf::new(reloader.paths(), &reloader.config().bpf, tx)
.expect("Failed to load BPF code");
let (mut bpf, mut rx) =
Bpf::new(reloader.paths(), &reloader.config().bpf).expect("Failed to load BPF code");
let (run_tx, run_rx) = watch::channel(true);
// Create a metrics exporter, but don't start it
let exporter = Exporter::new(bpf.take_metrics().unwrap());
Expand Down
17 changes: 6 additions & 11 deletions fact/src/host_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use fact_ebpf::{inode_key_t, inode_value_t, monitored_t};
use globset::{Glob, GlobSet, GlobSetBuilder};
use log::{debug, info, warn};
use tokio::{
sync::{Notify, broadcast, mpsc, watch},
sync::{Notify, mpsc, watch},
task::JoinHandle,
};

Expand All @@ -52,7 +52,7 @@ pub struct HostScanner {
running: watch::Receiver<bool>,

rx: mpsc::Receiver<Event>,
tx: broadcast::Sender<Arc<Event>>,
tx: mpsc::Sender<Event>,

metrics: HostScannerMetrics,

Expand All @@ -67,10 +67,10 @@ impl HostScanner {
scan_interval: watch::Receiver<Duration>,
running: watch::Receiver<bool>,
metrics: HostScannerMetrics,
) -> anyhow::Result<Self> {
) -> anyhow::Result<(Self, mpsc::Receiver<Event>)> {
let kernel_inode_map = RefCell::new(bpf.take_inode_map()?);
let inode_map = RefCell::new(std::collections::HashMap::new());
let (tx, _) = broadcast::channel(100);
let (tx, output) = mpsc::channel(100);
let paths_globset = HostScanner::build_globset(paths.borrow().as_slice())?;

let host_scanner = HostScanner {
Expand All @@ -88,7 +88,7 @@ impl HostScanner {
// Run an initial scan to fill in the inode map
host_scanner.scan()?;

Ok(host_scanner)
Ok((host_scanner, output))
}

fn build_globset(paths: &[PathBuf]) -> anyhow::Result<GlobSet> {
Expand Down Expand Up @@ -197,10 +197,6 @@ impl HostScanner {
Ok(())
}

pub fn subscribe(&self) -> broadcast::Receiver<Arc<Event>> {
self.tx.subscribe()
}

fn get_host_path(&self, inode: Option<&inode_key_t>) -> Option<PathBuf> {
// The path here needs to be cloned because we won't keep the
// inode_map borrow long enough.
Expand Down Expand Up @@ -456,8 +452,7 @@ impl HostScanner {
continue;
}

let event = Arc::new(event);
if let Err(e) = self.tx.send(event) {
if let Err(e) = self.tx.send(event).await {
self.metrics.events.dropped();
warn!("Failed to send event: {e}");
}
Expand Down
14 changes: 6 additions & 8 deletions fact/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use metrics::exporter::Exporter;
use rate_limiter::RateLimiter;
use tokio::{
signal::unix::{SignalKind, signal},
sync::{mpsc, watch},
sync::watch,
};

mod bpf;
Expand Down Expand Up @@ -80,12 +80,10 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
let reloader = config::reloader::Reloader::from(config);
let config_trigger = reloader.get_trigger();

let (tx, rx) = mpsc::channel(100);

let mut bpf = Bpf::new(reloader.paths(), &reloader.config().bpf, tx)?;
let (mut bpf, rx) = Bpf::new(reloader.paths(), &reloader.config().bpf)?;
let exporter = Exporter::new(bpf.take_metrics()?);

let host_scanner = HostScanner::new(
let (host_scanner, rx) = HostScanner::new(
&mut bpf,
rx,
reloader.paths(),
Expand All @@ -94,15 +92,15 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
exporter.metrics.host_scanner.clone(),
)?;

let rate_limiter = RateLimiter::new(
host_scanner.subscribe(),
let (rate_limiter, rx) = RateLimiter::new(
rx,
reloader.rate_limit(),
running.subscribe(),
exporter.metrics.rate_limiter.clone(),
)?;

output::start(
rate_limiter.subscribe(),
rx,
running.subscribe(),
exporter.metrics.output.clone(),
reloader.grpc(),
Expand Down
11 changes: 10 additions & 1 deletion fact/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ impl EventCounter {
.expect("Ignored label not found")
.inc();
}

pub fn errored(&self) {
self.counter
.get(&MetricEvents {
label: LabelValues::Error,
})
.expect("Error label not found")
.inc();
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -171,7 +180,7 @@ impl Metrics {
let rate_limiter = EventCounter::new(
"rate_limiter_events",
"Events processed by the rate limiter",
&[LabelValues::Added, LabelValues::Dropped],
&[LabelValues::Added, LabelValues::Dropped, LabelValues::Error],
);
rate_limiter.register(registry);

Expand Down
35 changes: 31 additions & 4 deletions fact/src/output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use tokio::sync::{broadcast, watch};
use log::{debug, warn};
use tokio::sync::{broadcast, mpsc, watch};

use crate::{config::GrpcConfig, event::Event, metrics::OutputMetrics};

Expand All @@ -12,14 +13,35 @@ mod stdout;
/// Each task is responsible for managing its lifetime, handling
/// incoming events and reloading configuration.
pub fn start(
rx: broadcast::Receiver<Arc<Event>>,
mut rx: mpsc::Receiver<Event>,
running: watch::Receiver<bool>,
metrics: OutputMetrics,
config: watch::Receiver<GrpcConfig>,
stdout_enabled: bool,
) -> anyhow::Result<()> {
let (broad_tx, broad_rx) = broadcast::channel(100);
let mut run = running.clone();
tokio::spawn(async move {
debug!("Starting output component...");
loop {
tokio::select! {
event = rx.recv() => {
let Some(event) = event else {
break;
};

if let Err(e) = broad_tx.send(Arc::new(event)) {
warn!("Failed to forward output event: {e}");
}
}
_ = run.changed() => if !*run.borrow() { break; }
}
}
debug!("Stopping output component...");
});

let grpc_client = grpc::Client::new(
rx.resubscribe(),
broad_rx.resubscribe(),
running.clone(),
metrics.grpc.clone(),
config.clone(),
Expand All @@ -28,7 +50,12 @@ pub fn start(
// JSON client will only start if explicitly enabled or no other
// output is active at startup
if !grpc_client.is_enabled() || stdout_enabled {
stdout::Client::new(rx.resubscribe(), running.clone(), metrics.stdout.clone()).start();
stdout::Client::new(
broad_rx.resubscribe(),
running.clone(),
metrics.stdout.clone(),
)
.start();
}

grpc_client.start();
Expand Down
41 changes: 16 additions & 25 deletions fact/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ use governor::{
clock::DefaultClock,
state::{InMemoryState, NotKeyed},
};
use log::warn;
use std::num::NonZeroU32;
use std::sync::Arc;
use tokio::sync::{
broadcast::{self, error::RecvError},
watch,
};
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;

use crate::event::Event;
Expand All @@ -20,31 +17,33 @@ pub struct RateLimiter {
// but in the future we could introduce a key to limit in more flexible ways
// (using a String; process name, container id, whatever)
limiter: Option<governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
rx: broadcast::Receiver<Arc<Event>>,
tx: broadcast::Sender<Arc<Event>>,
rx: mpsc::Receiver<Event>,
tx: mpsc::Sender<Event>,
rate_config: watch::Receiver<u64>,
running: watch::Receiver<bool>,
metrics: EventCounter,
}

impl RateLimiter {
pub fn new(
rx: broadcast::Receiver<Arc<Event>>,
rx: mpsc::Receiver<Event>,
rate_config: watch::Receiver<u64>,
running: watch::Receiver<bool>,
metrics: EventCounter,
) -> anyhow::Result<Self> {
) -> anyhow::Result<(Self, mpsc::Receiver<Event>)> {
let limiter = Self::build_limiter(*rate_config.borrow());
let (tx, _) = broadcast::channel(100);
let (tx, output) = mpsc::channel(100);

Ok(RateLimiter {
let limiter = RateLimiter {
limiter,
rx,
tx,
rate_config,
running,
metrics,
})
};

Ok((limiter, output))
}

fn build_limiter(
Expand All @@ -64,31 +63,23 @@ impl RateLimiter {
Ok(())
}

pub fn subscribe(&self) -> broadcast::Receiver<Arc<Event>> {
self.tx.subscribe()
}

pub fn start(mut self) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
loop {
tokio::select! {
event = self.rx.recv() => {
let event = match event {
Ok(e) => e,
Err(RecvError::Lagged(n)) => {
self.metrics.dropped_n(n);
continue;
}
Err(RecvError::Closed) => break,
};
let Some(event) = event else { break;};

if let Some(limiter) = &self.limiter && limiter.check().is_err() {
self.metrics.dropped();
continue;
}

self.metrics.added();
let _ = self.tx.send(event);
if let Err(e) = self.tx.send(event).await {
warn!("RateLimiter failed to forward event: {e:?}");
self.metrics.errored();
}
},
_ = self.rate_config.changed() => {
self.reload_limiter()?;
Expand Down
Loading