From 1c225b487207015068af46f0a42933f175954056 Mon Sep 17 00:00:00 2001 From: Yorick Peterse Date: Thu, 15 Sep 2022 20:14:02 +0200 Subject: [PATCH] Remove alive tracking from the network poller This is done in preparation for https://gitlab.com/inko-lang/inko/-/issues/274, as it removes the need for explicitly shutting down network pollers. Once upon a time embedding Inko was something I was toying with, and in such an environment it makes sense to explicitly and cleanly shut down threads. But in light of wanting to turn Inko into a compiled language, this isn't relevant anymore, and we can simplify things by just aborting. Changelog: changed --- vm/src/machine.rs | 19 ++++++++--------- vm/src/network_poller.rs | 45 ++++++++++------------------------------ vm/src/state.rs | 1 - 3 files changed, 20 insertions(+), 45 deletions(-) diff --git a/vm/src/machine.rs b/vm/src/machine.rs index e9eb3e5c2..983c9184d 100644 --- a/vm/src/machine.rs +++ b/vm/src/machine.rs @@ -113,22 +113,21 @@ impl<'a> Machine<'a> { { let state = state.clone(); - let _ = thread::Builder::new() + + thread::Builder::new() .name("timeout worker".to_string()) .spawn(move || state.timeout_worker.run(&state.scheduler)) .unwrap(); - }; + } - let poller_guard = { - let thread_state = state.clone(); + { + let state = state.clone(); thread::Builder::new() .name("network poller".to_string()) - .spawn(move || { - NetworkPollerWorker::new(thread_state).run(); - }) - .unwrap() - }; + .spawn(move || NetworkPollerWorker::new(state).run()) + .unwrap(); + } // Starting the primary threads will block this thread, as the main // worker will run directly onto the current thread. As such, we must @@ -145,7 +144,7 @@ impl<'a> Machine<'a> { // Joining the pools only fails in case of a panic. In this case we // don't want to re-panic as this clutters the error output. - if primary_guard.join().is_err() || poller_guard.join().is_err() { + if primary_guard.join().is_err() { state.set_exit_status(1); } diff --git a/vm/src/network_poller.rs b/vm/src/network_poller.rs index 1100f8fda..f253db37d 100644 --- a/vm/src/network_poller.rs +++ b/vm/src/network_poller.rs @@ -3,7 +3,6 @@ use crate::process::{Process, ProcessPointer}; use crate::state::RcState; use polling::{Event, Poller, Source}; use std::io; -use std::sync::atomic::{AtomicBool, Ordering}; /// The type of event a poller should wait for. pub(crate) enum Interest { @@ -17,20 +16,17 @@ pub(crate) enum Interest { /// A poller for non-blocking sockets. pub(crate) struct NetworkPoller { poller: Poller, - alive: AtomicBool, } impl NetworkPoller { pub(crate) fn new() -> Self { NetworkPoller { poller: Poller::new().expect("Failed to set up the network poller"), - alive: AtomicBool::new(true), } } - pub(crate) fn poll(&self, events: &mut Vec) -> io::Result { - self.poller.wait(events, None)?; - Ok(self.is_alive()) + pub(crate) fn poll(&self, events: &mut Vec) -> io::Result { + self.poller.wait(events, None) } pub(crate) fn add( @@ -51,15 +47,6 @@ impl NetworkPoller { self.poller.modify(source, self.event(process, interest)) } - pub(crate) fn terminate(&self) { - self.alive.store(false, Ordering::Release); - self.poller.notify().expect("Failed to notify the poller to terminate"); - } - - pub(crate) fn is_alive(&self) -> bool { - self.alive.load(Ordering::Acquire) - } - fn event(&self, process: ProcessPointer, interest: Interest) -> Event { let key = process.identifier(); @@ -84,9 +71,14 @@ impl Worker { let mut events = Vec::new(); loop { - if !self.state.network_poller.poll(&mut events).unwrap_or(false) { - // The poller is no longer alive, so we should shut down. - return; + if let Err(err) = self.state.network_poller.poll(&mut events) { + if err.kind() != io::ErrorKind::Interrupted { + // It's not entirely clear if/when we ever run into this, + // but should we run into any error that's _not_ an + // interrupt then there's probably more going on, and all we + // can do is abort. + panic!("Polling for IO events failed: {:?}", err); + } } for event in &events { @@ -155,24 +147,9 @@ mod tests { poller.add(*process, &sock1, Interest::Write).unwrap(); poller.add(*process, &sock2, Interest::Write).unwrap(); - poller.poll(&mut events).unwrap(); + assert!(poller.poll(&mut events).is_ok()); assert!(events.capacity() >= 2); assert_eq!(events.len(), 2); } - - #[test] - fn test_terminate() { - let poller = NetworkPoller::new(); - let mut events = Vec::with_capacity(1); - - assert!(poller.is_alive()); - - poller.terminate(); - - assert!(!poller.poll(&mut events).unwrap()); - assert_eq!(events.capacity(), 1); - assert_eq!(events.len(), 0); - assert!(!poller.is_alive()); - } } diff --git a/vm/src/state.rs b/vm/src/state.rs index 3cfbd49f5..1ef600908 100644 --- a/vm/src/state.rs +++ b/vm/src/state.rs @@ -118,7 +118,6 @@ impl State { pub(crate) fn terminate(&self) { self.scheduler.terminate(); - self.network_poller.terminate(); } pub(crate) fn set_exit_status(&self, new_status: i32) {