Skip to content

Commit

Permalink
Remove alive tracking from the network poller
Browse files Browse the repository at this point in the history
This is done in preparation for
https://gitlab.com/inko-lang/inko/-/issues/274, as it removes the need
for explicitly shutting down network pollers. Once upon a time embedding
Inko was something I was toying with, and in such an environment it
makes sense to explicitly and cleanly shut down threads. But in light of
wanting to turn Inko into a compiled language, this isn't relevant
anymore, and we can simplify things by just aborting.

Changelog: changed
  • Loading branch information
yorickpeterse committed Sep 15, 2022
1 parent 1e0b003 commit 1c225b4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 45 deletions.
19 changes: 9 additions & 10 deletions vm/src/machine.rs
Expand Up @@ -113,22 +113,21 @@ impl<'a> Machine<'a> {

{
let state = state.clone();
let _ = thread::Builder::new()

thread::Builder::new()
.name("timeout worker".to_string())
.spawn(move || state.timeout_worker.run(&state.scheduler))
.unwrap();
};
}

let poller_guard = {
let thread_state = state.clone();
{
let state = state.clone();

thread::Builder::new()
.name("network poller".to_string())
.spawn(move || {
NetworkPollerWorker::new(thread_state).run();
})
.unwrap()
};
.spawn(move || NetworkPollerWorker::new(state).run())
.unwrap();
}

// Starting the primary threads will block this thread, as the main
// worker will run directly onto the current thread. As such, we must
Expand All @@ -145,7 +144,7 @@ impl<'a> Machine<'a> {

// Joining the pools only fails in case of a panic. In this case we
// don't want to re-panic as this clutters the error output.
if primary_guard.join().is_err() || poller_guard.join().is_err() {
if primary_guard.join().is_err() {
state.set_exit_status(1);
}

Expand Down
45 changes: 11 additions & 34 deletions vm/src/network_poller.rs
Expand Up @@ -3,7 +3,6 @@ use crate::process::{Process, ProcessPointer};
use crate::state::RcState;
use polling::{Event, Poller, Source};
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};

/// The type of event a poller should wait for.
pub(crate) enum Interest {
Expand All @@ -17,20 +16,17 @@ pub(crate) enum Interest {
/// A poller for non-blocking sockets.
pub(crate) struct NetworkPoller {
poller: Poller,
alive: AtomicBool,
}

impl NetworkPoller {
pub(crate) fn new() -> Self {
NetworkPoller {
poller: Poller::new().expect("Failed to set up the network poller"),
alive: AtomicBool::new(true),
}
}

pub(crate) fn poll(&self, events: &mut Vec<Event>) -> io::Result<bool> {
self.poller.wait(events, None)?;
Ok(self.is_alive())
pub(crate) fn poll(&self, events: &mut Vec<Event>) -> io::Result<usize> {
self.poller.wait(events, None)
}

pub(crate) fn add(
Expand All @@ -51,15 +47,6 @@ impl NetworkPoller {
self.poller.modify(source, self.event(process, interest))
}

pub(crate) fn terminate(&self) {
self.alive.store(false, Ordering::Release);
self.poller.notify().expect("Failed to notify the poller to terminate");
}

pub(crate) fn is_alive(&self) -> bool {
self.alive.load(Ordering::Acquire)
}

fn event(&self, process: ProcessPointer, interest: Interest) -> Event {
let key = process.identifier();

Expand All @@ -84,9 +71,14 @@ impl Worker {
let mut events = Vec::new();

loop {
if !self.state.network_poller.poll(&mut events).unwrap_or(false) {
// The poller is no longer alive, so we should shut down.
return;
if let Err(err) = self.state.network_poller.poll(&mut events) {
if err.kind() != io::ErrorKind::Interrupted {
// It's not entirely clear if/when we ever run into this,
// but should we run into any error that's _not_ an
// interrupt then there's probably more going on, and all we
// can do is abort.
panic!("Polling for IO events failed: {:?}", err);
}
}

for event in &events {
Expand Down Expand Up @@ -155,24 +147,9 @@ mod tests {

poller.add(*process, &sock1, Interest::Write).unwrap();
poller.add(*process, &sock2, Interest::Write).unwrap();
poller.poll(&mut events).unwrap();

assert!(poller.poll(&mut events).is_ok());
assert!(events.capacity() >= 2);
assert_eq!(events.len(), 2);
}

#[test]
fn test_terminate() {
let poller = NetworkPoller::new();
let mut events = Vec::with_capacity(1);

assert!(poller.is_alive());

poller.terminate();

assert!(!poller.poll(&mut events).unwrap());
assert_eq!(events.capacity(), 1);
assert_eq!(events.len(), 0);
assert!(!poller.is_alive());
}
}
1 change: 0 additions & 1 deletion vm/src/state.rs
Expand Up @@ -118,7 +118,6 @@ impl State {

pub(crate) fn terminate(&self) {
self.scheduler.terminate();
self.network_poller.terminate();
}

pub(crate) fn set_exit_status(&self, new_status: i32) {
Expand Down

0 comments on commit 1c225b4

Please sign in to comment.