Skip to content

Commit 6279c84

Browse files
committed
Collect system usage stats
1 parent 64e46c0 commit 6279c84

File tree

2 files changed

+71
-11
lines changed

2 files changed

+71
-11
lines changed

danny/src/main.rs

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use danny_base::types::*;
1818
use std::fmt::Debug;
1919
use timely::communication::Allocator;
2020
use timely::worker::Worker;
21+
use std::time::Duration;
22+
use danny::sysmonitor::*;
2123

2224
fn run_cartesian<SV>(
2325
config: &Config,
@@ -255,6 +257,9 @@ fn main() {
255257
// prevent the others from returning its information multiple times.
256258
let network = Arc::new(Mutex::new(Some(NetworkGauge::start())));
257259

260+
// similarly, spawn the system monitor outside of the worker definition
261+
let system_monitor = Arc::new(Mutex::new(MonitorThread::spawn(Duration::from_secs(1))));
262+
258263
// The same goes on for the profiler
259264
let profiler = Arc::new(Mutex::new(
260265
config
@@ -378,6 +383,13 @@ fn main() {
378383
};
379384
let network_summaries = NetworkSummary::collect_from_workers(worker, network_summary);
380385

386+
let system_usage =
387+
system_monitor.lock().unwrap().take().map(|monitor| {
388+
monitor.join()
389+
}).unwrap_or_else(Vec::new);
390+
let system_usage = SystemUsage::collect_from_workers(worker, system_usage);
391+
println!("{:?}", system_usage);
392+
381393
info!("collecting counters");
382394
// close the events input and perform any outstanding work
383395
events_handle

danny/src/sysmonitor.rs

+59-11
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1+
use crate::config::*;
12
use std::cell::RefCell;
23
use std::fs::File;
34
use std::io::prelude::*;
45
use std::io::BufReader;
5-
use std::time::{Instant,Duration};
6-
use std::thread;
7-
use std::sync::Arc;
6+
use std::rc::Rc;
87
use std::sync::atomic::{AtomicBool, Ordering};
8+
use std::sync::Arc;
9+
use std::thread;
10+
use std::time::{Duration, Instant};
11+
use timely::communication::Allocator;
12+
use timely::dataflow::operators::*;
13+
use timely::dataflow::ProbeHandle;
14+
use timely::worker::Worker;
915

1016
pub struct MonitorThread {
1117
handle: thread::JoinHandle<Vec<(Duration, SystemUsage)>>,
@@ -33,30 +39,73 @@ impl MonitorThread {
3339

3440
samples
3541
});
36-
Some(MonitorThread{handle, running})
42+
Some(MonitorThread { handle, running })
3743
}
3844

3945
pub fn join(self) -> Vec<(Duration, SystemUsage)> {
4046
self.running.store(false, Ordering::SeqCst);
41-
self.handle.join().expect("failed to join monitoring thread")
47+
self.handle
48+
.join()
49+
.expect("failed to join monitoring thread")
4250
}
4351
}
4452

45-
4653
#[derive(Abomonation, Clone, Copy, Debug)]
4754
pub struct SystemUsage {
4855
cpu: CpuUsage,
4956
net: NetworkUsage,
50-
mem: MemorySample
57+
mem: MemorySample,
5158
}
5259

5360
impl SystemUsage {
61+
/// sets up a small dataflow to exchange information about the network exchanges
62+
pub fn collect_from_workers(
63+
worker: &mut Worker<Allocator>,
64+
usage: Vec<(Duration, SystemUsage)>,
65+
) -> Vec<(Duration, String, SystemUsage)> {
66+
use timely::dataflow::channels::pact::Pipeline;
67+
68+
let result = Rc::new(RefCell::new(Vec::new()));
69+
let result_read = Rc::clone(&result);
70+
71+
let (mut input, probe) = worker.dataflow::<u32, _, _>(move |scope| {
72+
let (input, stream) = scope.new_input::<(Duration, String, SystemUsage)>();
73+
let mut probe = ProbeHandle::new();
74+
stream
75+
.exchange(|_| 0)
76+
.unary(Pipeline, "collector", move |_, _| {
77+
move |input, output| {
78+
input.for_each(|t, data| {
79+
let data = data.replace(Vec::new());
80+
result.borrow_mut().extend(data.into_iter());
81+
output.session(&t).give(());
82+
});
83+
}
84+
})
85+
.probe_with(&mut probe);
86+
87+
(input, probe)
88+
});
89+
90+
let host = get_hostname();
91+
for (d, su) in usage {
92+
input.send((d, host.clone(), su));
93+
}
94+
95+
input.close();
96+
worker.step_while(|| !probe.done());
97+
98+
result_read.replace(Vec::new())
99+
}
100+
54101
pub fn compute(prev: &SystemSample, current: &SystemSample, elapsed: Duration) -> SystemUsage {
55102
let secs = elapsed.as_secs_f64();
56103
SystemUsage {
57104
cpu: CpuUsage {
58-
user: (current.cpu.user - prev.cpu.user) as f64 / (current.cpu.total - prev.cpu.total) as f64,
59-
system: (current.cpu.system - prev.cpu.system) as f64 / (current.cpu.total - prev.cpu.total) as f64,
105+
user: (current.cpu.user - prev.cpu.user) as f64
106+
/ (current.cpu.total - prev.cpu.total) as f64,
107+
system: (current.cpu.system - prev.cpu.system) as f64
108+
/ (current.cpu.total - prev.cpu.total) as f64,
60109
},
61110
net: NetworkUsage {
62111
tx: (current.net.tx - prev.net.tx) as f64 / secs,
@@ -122,8 +171,7 @@ impl CpuSample {
122171
.unwrap()
123172
.split_whitespace()
124173
.skip(1)
125-
.map(|s| {
126-
s.parse::<u64>().expect("error parsing CPU stat")}),
174+
.map(|s| s.parse::<u64>().expect("error parsing CPU stat")),
127175
);
128176
let buf = buf.borrow();
129177
let total: u64 = buf.iter().sum();

0 commit comments

Comments
 (0)