Skip to content

Commit

Permalink
Moved the status bar and plugin error count to the controller
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenengler committed Jul 5, 2022
1 parent 7271b11 commit 7a79718
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 183 deletions.
2 changes: 0 additions & 2 deletions src/main/bindings/c/bindings-opaque.h
Expand Up @@ -124,8 +124,6 @@ typedef struct Random Random;
// Routing information for paths between nodes.
typedef struct RoutingInfo_u32 RoutingInfo_u32;

typedef struct StatusLogger_ShadowStatusBarState StatusLogger_ShadowStatusBarState;

typedef struct SyscallHandler SyscallHandler;

// Mostly for interoperability with C APIs.
Expand Down
16 changes: 2 additions & 14 deletions src/main/bindings/c/bindings.h
Expand Up @@ -127,8 +127,6 @@ typedef struct Random Random;
// Routing information for paths between nodes.
typedef struct RoutingInfo_u32 RoutingInfo_u32;

typedef struct StatusLogger_ShadowStatusBarState StatusLogger_ShadowStatusBarState;

typedef struct SyscallHandler SyscallHandler;

// Mostly for interoperability with C APIs.
Expand Down Expand Up @@ -283,18 +281,6 @@ uint32_t random_nextU32(struct Random *rng);
// Fills the buffer with pseudo-random bytes.
void random_nextNBytes(struct Random *rng, uint8_t *buf, uintptr_t len);

struct StatusLogger_ShadowStatusBarState *statusBar_new(uint64_t end);

struct StatusLogger_ShadowStatusBarState *statusPrinter_new(uint64_t end);

void statusLogger_free(struct StatusLogger_ShadowStatusBarState *status_logger);

void statusLogger_updateEmuTime(const struct StatusLogger_ShadowStatusBarState *status_logger,
uint64_t current);

void statusLogger_updateNumFailedProcesses(const struct StatusLogger_ShadowStatusBarState *status_logger,
uint32_t num_failed_processes);

// Get the backtrace. This function is slow. The string must be freed using `backtrace_free()`.
char *backtrace(void);

Expand Down Expand Up @@ -326,6 +312,8 @@ bool controller_managerFinishedCurrentRound(const struct Controller *controller,
void controller_updateMinRunahead(const struct Controller *controller,
SimulationTime min_path_latency);

void controller_incrementPluginErrors(const struct Controller *controller);

// Flush Rust's log::logger().
void rustlogger_flush(void);

Expand Down
2 changes: 1 addition & 1 deletion src/main/bindings/rust/wrapper.rs
Expand Up @@ -1633,7 +1633,7 @@ extern "C" {
) -> *mut Manager;
}
extern "C" {
pub fn manager_free(manager: *mut Manager) -> gint;
pub fn manager_free(manager: *mut Manager);
}
extern "C" {
pub fn manager_run(arg1: *mut Manager);
Expand Down
130 changes: 125 additions & 5 deletions src/main/core/controller.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
use std::sync::atomic::AtomicU32;
use std::sync::RwLock;
use std::time::Duration;

Expand All @@ -10,9 +11,12 @@ use rand_xoshiro::Xoshiro256PlusPlus;
use crate::core::sim_config::{Bandwidth, HostInfo, SimConfig};
use crate::core::support::configuration::ConfigOptions;
use crate::core::support::configuration::Flatten;
use crate::core::support::emulated_time::EmulatedTime;
use crate::core::support::simulation_time::SimulationTime;
use crate::cshadow as c;
use crate::routing::network_graph::{IpAssignment, RoutingInfo};
use crate::utility::status_bar::{StatusBar, StatusBarState, StatusPrinter};
use crate::utility::time::TimeParts;

pub struct Controller<'a> {
// general options and user configuration for the simulation
Expand All @@ -28,6 +32,12 @@ pub struct Controller<'a> {
dns: *mut c::DNS,
is_runahead_dynamic: bool,

// number of plugins that failed with a non-zero exit code
num_plugin_errors: AtomicU32,

// logs the status of the simulation
status_logger: Option<StatusLogger<ShadowStatusBarState>>,

hosts: Vec<HostInfo>,
scheduling_data: RwLock<ControllerScheduling>,
}
Expand All @@ -48,6 +58,17 @@ impl<'a> Controller<'a> {
let smallest_latency =
SimulationTime::from_nanos(sim_config.routing_info.get_smallest_latency_ns().unwrap());

let status_logger = config.general.progress.unwrap().then(|| {
let state = ShadowStatusBarState::new(EmulatedTime::from_abs_simtime(end_time));

if nix::unistd::isatty(libc::STDERR_FILENO).unwrap() {
let redraw_interval = Duration::from_millis(1000);
StatusLogger::Bar(StatusBar::new(state, redraw_interval))
} else {
StatusLogger::Printer(StatusPrinter::new(state))
}
});

Self {
is_runahead_dynamic: config.experimental.use_dynamic_runahead.unwrap(),
config,
Expand All @@ -57,6 +78,8 @@ impl<'a> Controller<'a> {
routing_info: sim_config.routing_info,
host_bandwidths: sim_config.host_bandwidths,
dns,
num_plugin_errors: AtomicU32::new(0),
status_logger,
scheduling_data: RwLock::new(ControllerScheduling {
min_runahead_config,
smallest_latency,
Expand All @@ -80,7 +103,7 @@ impl<'a> Controller<'a> {
let _fake_ref_for_manager = &self;

// scope used to prevent manager from being accessed after it's freed
let rv = {
{
// The controller will be responsible for distributing the actions to the managers so that
// they all have a consistent view of the simulation, topology, etc. For now we only have
// one manager so send it everything.
Expand Down Expand Up @@ -213,11 +236,16 @@ impl<'a> Controller<'a> {
unsafe { c::manager_run(manager) };
log::info!("Finished simulation, cleaning up now");

unsafe { c::manager_free(manager) }
};
unsafe { c::manager_free(manager) };
}

if rv != 0 {
return Err(anyhow::anyhow!("Manager exited with code {}", rv));
let num_plugin_errors = self
.num_plugin_errors
.load(std::sync::atomic::Ordering::SeqCst);
if num_plugin_errors > 0 {
return Err(anyhow::anyhow!(
"{num_plugin_errors} managed processes exited with a non-zero error code"
));
}

// access the immutable reference to make sure we haven't mutated self
Expand All @@ -231,6 +259,9 @@ impl std::ops::Drop for Controller<'_> {
/// Releases the resources the controller holds when it goes out of scope.
fn drop(&mut self) {
    // free the DNS object through the C API, then null the pointer so a stale value is never
    // left behind in the struct
    unsafe { c::dns_free(self.dns) };
    self.dns = std::ptr::null_mut();

    // stop the status logger, if one was created; `if let` is used instead of `Option::map`
    // because the call is made purely for its side effect
    if let Some(status_logger) = self.status_logger.as_mut() {
        status_logger.stop();
    }
}
}

Expand All @@ -247,6 +278,7 @@ trait SimController {
min_next_event_time: SimulationTime,
) -> Option<(SimulationTime, SimulationTime)>;
fn update_min_runahead(&self, min_path_latency: SimulationTime);
fn increment_plugin_errors(&self);
}

impl SimController for Controller<'_> {
Expand Down Expand Up @@ -304,6 +336,15 @@ impl SimController for Controller<'_> {
let scheduling_data = self.scheduling_data.read().unwrap();
let (new_start, new_end) = scheduling_data.next_interval_window(min_next_event_time);

// update the status logger
let display_time = std::cmp::min(new_start, new_end);
if let Some(status_logger) = &self.status_logger {
let display_time = EmulatedTime::from_abs_simtime(display_time);
status_logger.mutate_state(|state| {
state.current = display_time;
});
};

let continue_running = new_start < new_end;
continue_running.then(|| (new_start, new_end))
}
Expand Down Expand Up @@ -364,6 +405,18 @@ impl SimController for Controller<'_> {
min_runahead_config.map(|x| x.as_nanos())
);
}

/// Records one more failed managed process and mirrors the new total into the status logger
/// (when a status logger exists).
fn increment_plugin_errors(&self) {
    // `fetch_add` returns the value *before* the increment, so add one to get the new total
    let new_count = self
        .num_plugin_errors
        .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
        + 1;

    if let Some(status_logger) = &self.status_logger {
        status_logger.mutate_state(|state| {
            // The counter bump above and this write are two separate steps, so callers racing
            // on different threads may get here out of order. Keep the larger value so the
            // displayed count never goes backwards.
            state.num_failed_processes = state.num_failed_processes.max(new_count);
        });
    }
}
}

// the min runahead time is updated by workers, so needs to be locked
Expand Down Expand Up @@ -405,6 +458,67 @@ impl ControllerScheduling {
}
}

/// State shown by the simulation's status logger; its `Display` impl renders the one-line
/// progress message.
struct ShadowStatusBarState {
    // wall-clock instant captured when the state was constructed; used to report elapsed realtime
    start: std::time::Instant,
    // most recent emulated time reported to the status logger
    current: EmulatedTime,
    // emulated time that `current` is compared against to compute the progress percentage
    end: EmulatedTime,
    // number of failed processes shown in the status line
    num_failed_processes: u32,
}

/// Renders the status line: progress percentage, simulated time elapsed out of the total,
/// wall-clock time elapsed, and the number of failed processes.
impl std::fmt::Display for ShadowStatusBarState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // both durations are measured from the start of the simulation
        let elapsed_sim = self.current.duration_since(&EmulatedTime::SIMULATION_START);
        let total_sim = self.end.duration_since(&EmulatedTime::SIMULATION_START);

        // progress as a whole-number percentage, computed at millisecond granularity
        let percent = {
            let elapsed_ms = elapsed_sim.as_millis() as f32;
            let total_ms = total_sim.as_millis() as f32;
            ((elapsed_ms / total_ms) * 100.0).round() as i8
        };

        let elapsed_parts = TimeParts::from_nanos(elapsed_sim.as_nanos().into());
        let total_parts = TimeParts::from_nanos(total_sim.as_nanos().into());
        // wall-clock time since this state was created
        let wall_parts = TimeParts::from_nanos(self.start.elapsed().as_nanos());

        write!(
            f,
            "{}% — simulated: {}/{}, realtime: {}, processes failed: {}",
            percent,
            elapsed_parts.fmt_hr_min_sec(),
            total_parts.fmt_hr_min_sec(),
            wall_parts.fmt_hr_min_sec(),
            self.num_failed_processes,
        )
    }
}

impl ShadowStatusBarState {
    /// Creates a fresh state: the wall clock starts now, the current emulated time is the
    /// simulation start, and no processes have failed yet. `end` is stored as given.
    pub fn new(end: EmulatedTime) -> Self {
        let start = std::time::Instant::now();
        let current = EmulatedTime::SIMULATION_START;

        Self {
            start,
            current,
            end,
            num_failed_processes: 0,
        }
    }
}

/// A status logger backed by either a `StatusPrinter` or a `StatusBar`, both holding the same
/// state type `T`.
enum StatusLogger<T: StatusBarState> {
    // wraps a `StatusPrinter`; the controller picks this when stderr is not a terminal
    Printer(StatusPrinter<T>),
    // wraps a `StatusBar`; the controller picks this when stderr is a terminal
    Bar(StatusBar<T>),
}

impl<T: 'static + StatusBarState> StatusLogger<T> {
    /// Applies `f` to the logger's state by forwarding to the wrapped printer or bar.
    pub fn mutate_state(&self, f: impl FnOnce(&mut T)) {
        match self {
            Self::Bar(bar) => bar.mutate_state(f),
            Self::Printer(printer) => printer.mutate_state(f),
        }
    }

    /// Stops the wrapped printer or bar.
    pub fn stop(&mut self) {
        match self {
            Self::Bar(bar) => bar.stop(),
            Self::Printer(printer) => printer.stop(),
        }
    }
}

mod export {
use super::*;

Expand Down Expand Up @@ -520,4 +634,10 @@ mod export {

controller.update_min_runahead(min_path_latency);
}

/// C entry point that records one more failed managed process on `controller`.
///
/// Panics if `controller` is null.
#[no_mangle]
pub extern "C" fn controller_incrementPluginErrors(controller: *const Controller) {
    // NOTE(review): assumes the C caller passes either null or a pointer to a live
    // `Controller` — confirm against the call sites.
    unsafe { controller.as_ref() }
        .unwrap()
        .increment_plugin_errors();
}
}
34 changes: 2 additions & 32 deletions src/main/core/manager.c
Expand Up @@ -61,8 +61,6 @@ struct _Manager {
/* the last time we logged heartbeat information */
SimulationTime simClockLastHeartbeat;

guint numPluginErrors;

gchar* dataPath;
gchar* hostsPath;

Expand All @@ -76,8 +74,6 @@ struct _Manager {
// Path to the openssl crypto lib that we preload for managed processes.
gchar* preloadOpensslCryptoPath;

StatusLogger_ShadowStatusBarState* statusLogger;

time_t timeOfLastUsageCheck;
bool checkFdUsage;
bool checkMemUsage;
Expand Down Expand Up @@ -303,14 +299,6 @@ Manager* manager_new(const Controller* controller, const ConfigOptions* config,
}
g_free(configFilename);

if (config_getProgress(config)) {
if (isatty(STDERR_FILENO) == 1) {
manager->statusLogger = statusBar_new(endTime);
} else {
manager->statusLogger = statusPrinter_new(endTime);
}
}

manager->checkFdUsage = true;
manager->checkMemUsage = true;

Expand All @@ -319,9 +307,8 @@ Manager* manager_new(const Controller* controller, const ConfigOptions* config,
return manager;
}

gint manager_free(Manager* manager) {
void manager_free(Manager* manager) {
MAGIC_ASSERT(manager);
gint returnCode = (manager->numPluginErrors > 0) ? -1 : 0;

if (manager->watcher) {
childpidwatcher_free(manager->watcher);
Expand Down Expand Up @@ -394,14 +381,8 @@ gint manager_free(Manager* manager) {
g_free(manager->preloadOpensslCryptoPath);
}

if (manager->statusLogger) {
statusLogger_free(manager->statusLogger);
}

MAGIC_CLEAR(manager);
g_free(manager);

return returnCode;
}

guint manager_getRawCPUFrequency(Manager* manager) {
Expand Down Expand Up @@ -751,10 +732,6 @@ void manager_run(Manager* manager) {
*/
minNextEventTime = scheduler_awaitNextRound(manager->scheduler);

if (manager->statusLogger != NULL) {
statusLogger_updateEmuTime(manager->statusLogger, windowEnd);
}

/* we are in control now, the workers are waiting for the next round */
debug("finished execution window [%" G_GUINT64_FORMAT "--%" G_GUINT64_FORMAT
"] next event at %" G_GUINT64_FORMAT,
Expand All @@ -771,14 +748,7 @@ void manager_run(Manager* manager) {

void manager_incrementPluginError(Manager* manager) {
MAGIC_ASSERT(manager);
_manager_lock(manager);

manager->numPluginErrors++;
if (manager->statusLogger != NULL) {
statusLogger_updateNumFailedProcesses(manager->statusLogger, manager->numPluginErrors);
}

_manager_unlock(manager);
controller_incrementPluginErrors(manager->controller);
}

const gchar* manager_getHostsRootPath(Manager* manager) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/core/manager.h
Expand Up @@ -33,7 +33,7 @@ typedef struct _Manager Manager;

Manager* manager_new(const Controller* controller, const ConfigOptions* config,
SimulationTime endTime, guint randomSeed);
gint manager_free(Manager* manager);
void manager_free(Manager* manager);

ChildPidWatcher* manager_childpidwatcher(Manager* manager);

Expand Down

0 comments on commit 7a79718

Please sign in to comment.