Support multiple network pollers
Instead of all threads using a single network poller, threads are
assigned a network poller in a round-robin fashion. By default we spawn
just a single network poller thread, but the number is configurable
using the INKO_NETPOLL_THREADS environment variable. In the future we
may increase the default to e.g. 2 or 4, but for now this should prove
good enough.

The benefit of using multiple network poller threads is that the load
of polling many sockets can be spread across different threads. While
epoll and friends are usually fast enough, programs that make heavy use
of sockets can benefit from more than a single polling thread.

This fixes https://gitlab.com/inko-lang/inko/-/issues/274.

Changelog: performance
yorickpeterse committed Sep 20, 2022
1 parent 6163239 commit 44c0088
Showing 8 changed files with 186 additions and 83 deletions.
12 changes: 12 additions & 0 deletions docs/source/internals/vm.md
@@ -60,12 +60,24 @@ compiler to insert these instructions in the right place.

## IO operations

### Sockets

For network IO the VM uses non-blocking sockets. When performing an operation
that would block, the process and its socket are registered with "the network
poller". This is a thread that polls a list of sockets until they are ready,
then reschedules the corresponding processes. Polling is done using APIs such
as epoll on Linux, kqueue on macOS/BSD, and IO completion ports on Windows.
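
The flow when an operation would block looks roughly like the sketch below.
All names and types are stand-ins for illustration, not the VM's actual
implementation (which lives in the socket built-in functions changed by this
commit):

```rust
use std::io;

// Stand-in for the VM's network poller handle; illustrative only.
struct NetworkPoller;

impl NetworkPoller {
    fn register_read(&self, fd: i32) {
        // The real VM forwards this to epoll/kqueue/IO completion ports.
        let _ = fd;
    }
}

// Pretend this wraps a non-blocking recv(2) on the socket.
fn try_read(_fd: i32, _buf: &mut [u8]) -> io::Result<usize> {
    Err(io::ErrorKind::WouldBlock.into())
}

// On WouldBlock, the socket is registered with the poller so the owning
// process can be rescheduled once the socket becomes readable.
fn read_or_register(
    fd: i32,
    buf: &mut [u8],
    poller: &NetworkPoller,
) -> io::Result<usize> {
    match try_read(fd, buf) {
        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
            poller.register_read(fd);
            Err(err)
        }
        result => result,
    }
}

fn main() {
    let mut buf = [0_u8; 16];
    let _ = read_or_register(3, &mut buf, &NetworkPoller);
}
```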

By default a single network poller thread is spawned, and every process thread
uses the same poller. The number of poller threads is configured using the
`INKO_NETPOLL_THREADS` environment variable, which accepts a value between 1
and 127. When the value is greater than one, network poller threads are
assigned to process threads in a round-robin fashion. Most programs won't need
more than a single poller thread, but programs that make heavy use of many
sockets may benefit from increasing this value.
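
A minimal sketch of this round-robin assignment (the helper below is
illustrative, not part of the VM's API):

```rust
// Each process thread is pinned to one poller; assignments wrap around
// once every poller has been handed out.
fn poller_for(process_thread_id: usize, netpoll_threads: usize) -> usize {
    process_thread_id % netpoll_threads
}

fn main() {
    // With INKO_NETPOLL_THREADS=2 and four process threads, the process
    // threads alternate between pollers 0 and 1.
    for thread_id in 0..4 {
        println!("thread {} -> poller {}", thread_id, poller_for(thread_id, 2));
    }
}
```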

### Blocking IO

For blocking operations, such as file IO, Inko uses a fixed number of backup
threads. When an OS thread is about to enter a blocking operation, it sets a
flag indicating when it did so. This is implemented such that it in most cases
61 changes: 33 additions & 28 deletions vm/src/builtin_functions/socket.rs
@@ -8,16 +8,21 @@ use crate::socket::Socket;
use crate::state::State;
use std::io::Write;

-macro_rules! ret {
-    ($result:expr, $state:expr, $proc:expr, $sock:expr, $interest:expr) => {{
-        if let Err(ref err) = $result {
-            if err.should_poll() {
-                $sock.register($proc, &$state.network_poller, $interest)?;
-            }
+fn ret(
+    result: Result<Pointer, RuntimeError>,
+    state: &State,
+    thread: &Thread,
+    process: ProcessPointer,
+    socket: &mut Socket,
+    interest: Interest,
+) -> Result<Pointer, RuntimeError> {
+    if let Err(ref err) = result {
+        if err.should_poll() {
+            socket.register(state, process, thread.network_poller, interest)?;
+        }
    }

-        $result
-    }};
+    result
}

pub(crate) fn socket_allocate_ipv4(
@@ -58,7 +63,7 @@ pub(crate) fn socket_allocate_unix(

pub(crate) fn socket_write_string(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -69,12 +74,12 @@ pub(crate) fn socket_write_string(
.map(|size| Int::alloc(state.permanent_space.int_class(), size as i64))
.map_err(RuntimeError::from);

-    ret!(res, state, process, sock, Interest::Write)
+    ret(res, state, thread, process, sock, Interest::Write)
}

pub(crate) fn socket_write_bytes(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -85,12 +90,12 @@ pub(crate) fn socket_write_bytes(
.map(|size| Int::alloc(state.permanent_space.int_class(), size as i64))
.map_err(RuntimeError::from);

-    ret!(res, state, process, sock, Interest::Write)
+    ret(res, state, thread, process, sock, Interest::Write)
}

pub(crate) fn socket_read(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -102,7 +107,7 @@ pub(crate) fn socket_read(
.read(buffer, amount)
.map(|size| Int::alloc(state.permanent_space.int_class(), size as i64));

-    ret!(result, state, process, sock, Interest::Read)
+    ret(result, state, thread, process, sock, Interest::Read)
}

pub(crate) fn socket_listen(
@@ -120,7 +125,7 @@

pub(crate) fn socket_bind(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -129,12 +134,12 @@ pub(crate) fn socket_bind(
let port = unsafe { Int::read(arguments[2]) } as u16;
let result = sock.bind(addr, port).map(|_| Pointer::nil_singleton());

-    ret!(result, state, process, sock, Interest::Read)
+    ret(result, state, thread, process, sock, Interest::Read)
}

pub(crate) fn socket_connect(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -143,36 +148,36 @@ pub(crate) fn socket_connect(
let port = unsafe { Int::read(arguments[2]) } as u16;
let result = sock.connect(addr, port).map(|_| Pointer::nil_singleton());

-    ret!(result, state, process, sock, Interest::Write)
+    ret(result, state, thread, process, sock, Interest::Write)
}

pub(crate) fn socket_accept_ip(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
let sock = unsafe { arguments[0].get_mut::<Socket>() };
let result = sock.accept().map(Pointer::boxed);

-    ret!(result, state, process, sock, Interest::Read)
+    ret(result, state, thread, process, sock, Interest::Read)
}

pub(crate) fn socket_accept_unix(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
let sock = unsafe { arguments[0].get_mut::<Socket>() };
let result = sock.accept().map(Pointer::boxed);

-    ret!(result, state, process, sock, Interest::Read)
+    ret(result, state, thread, process, sock, Interest::Read)
}

pub(crate) fn socket_receive_from(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -183,12 +188,12 @@ pub(crate) fn socket_receive_from(
.recv_from(buffer, amount)
.map(|(addr, port)| allocate_address_pair(state, addr, port));

-    ret!(result, state, process, sock, Interest::Read)
+    ret(result, state, thread, process, sock, Interest::Read)
}

pub(crate) fn socket_send_bytes_to(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -200,12 +205,12 @@ pub(crate) fn socket_send_bytes_to(
.send_to(buffer, address, port)
.map(|size| Int::alloc(state.permanent_space.int_class(), size as i64));

-    ret!(result, state, process, sock, Interest::Write)
+    ret(result, state, thread, process, sock, Interest::Write)
}

pub(crate) fn socket_send_string_to(
state: &State,
-    _: &mut Thread,
+    thread: &mut Thread,
process: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
@@ -217,7 +222,7 @@ pub(crate) fn socket_send_string_to(
.send_to(buffer, address, port)
.map(|size| Int::alloc(state.permanent_space.int_class(), size as i64));

-    ret!(result, state, process, sock, Interest::Write)
+    ret(result, state, thread, process, sock, Interest::Write)
}

pub(crate) fn socket_shutdown_read(
42 changes: 42 additions & 0 deletions vm/src/config.rs
@@ -17,8 +17,19 @@ macro_rules! set_from_env {
}};
}

/// The default number of reductions to consume before a process suspends
/// itself.
const DEFAULT_REDUCTIONS: u16 = 1000;

/// The default number of network poller threads to use.
///
/// We default to one thread because for most setups this is probably more than
/// enough.
const DEFAULT_NETPOLL_THREADS: u8 = 1;

/// The maximum number of netpoll threads that are allowed.
const MAX_NETPOLL_THREADS: u8 = 127;

/// Structure containing the configuration settings for the virtual machine.
pub struct Config {
/// The number of process threads to run.
@@ -27,6 +38,14 @@ pub struct Config {
/// The number of backup process threads to spawn.
pub backup_threads: u16,

/// The number of network poller threads to use.
///
/// While this value is stored as a u8, it's limited to a maximum of 127.
/// This is because internally we use an i8 to store registered poller IDs,
/// using the value -1 to signal that a file descriptor isn't registered
/// with any poller.
pub netpoll_threads: u8,

/// The number of reductions a process can perform before being suspended.
pub reductions: u16,
}
@@ -38,6 +57,7 @@ impl Config {
Config {
process_threads: cpu_count,
backup_threads: cpu_count * 4,
netpoll_threads: DEFAULT_NETPOLL_THREADS,
reductions: DEFAULT_REDUCTIONS,
}
}
@@ -48,9 +68,17 @@ impl Config {
set_from_env!(config, process_threads, "PROCESS_THREADS", u16);
set_from_env!(config, backup_threads, "BACKUP_THREADS", u16);
set_from_env!(config, reductions, "REDUCTIONS", u16);
set_from_env!(config, netpoll_threads, "NETPOLL_THREADS", u8);

config.verify();
config
}

fn verify(&mut self) {
if self.netpoll_threads > MAX_NETPOLL_THREADS {
self.netpoll_threads = MAX_NETPOLL_THREADS;
}
}
}

#[cfg(test)]
@@ -61,6 +89,7 @@ mod tests {
match key {
"INKO_FOO" => Ok("1"),
"INKO_BAR" => Ok("0"),
"INKO_NETPOLL_THREADS" => Ok("4"),
_ => Err(()),
}
}
@@ -87,4 +116,17 @@

assert_eq!(cfg.reductions, DEFAULT_REDUCTIONS);
}

#[test]
fn test_verify() {
let mut cfg = Config::new();

cfg.netpoll_threads = 64;
cfg.verify();
assert_eq!(cfg.netpoll_threads, 64);

cfg.netpoll_threads = 130;
cfg.verify();
assert_eq!(cfg.netpoll_threads, MAX_NETPOLL_THREADS);
}
}
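
The 127 limit follows from the i8 sentinel described in the `netpoll_threads`
doc comment above; a minimal sketch of that encoding, using illustrative names
rather than the VM's own:

```rust
// A socket stores the ID of the poller it's registered with in an i8,
// with -1 meaning "not registered yet". This caps valid poller IDs at
// 0..=127, hence MAX_NETPOLL_THREADS = 127.
struct SocketState {
    registered_poller: i8,
}

impl SocketState {
    fn new() -> Self {
        SocketState { registered_poller: -1 }
    }

    fn is_registered(&self) -> bool {
        self.registered_poller >= 0
    }

    fn register(&mut self, poller_id: u8) {
        // Config::verify caps poller IDs at 127 (= i8::MAX), so this cast
        // can't wrap around to a negative value.
        self.registered_poller = poller_id as i8;
    }
}

fn main() {
    let mut sock = SocketState::new();

    assert!(!sock.is_registered());
    sock.register(126);
    assert!(sock.is_registered());
}
```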
12 changes: 7 additions & 5 deletions vm/src/machine.rs
@@ -122,12 +122,14 @@ impl<'a> Machine<'a> {
}

{
-    let state = state.clone();
+    for id in 0..state.network_pollers.len() {
+        let state = state.clone();

-    thread::Builder::new()
-        .name("network poller".to_string())
-        .spawn(move || NetworkPollerWorker::new(state).run())
-        .unwrap();
+        thread::Builder::new()
+            .name(format!("netpoll {}", id))
+            .spawn(move || NetworkPollerWorker::new(id, state).run())
+            .unwrap();
+    }
}

state.scheduler.run(&*state, entry_class, entry_method);
8 changes: 5 additions & 3 deletions vm/src/network_poller.rs
@@ -59,19 +59,21 @@ impl NetworkPoller {

/// A thread that polls a poller and reschedules processes.
pub(crate) struct Worker {
+    id: usize,
state: RcState,
}

impl Worker {
-    pub(crate) fn new(state: RcState) -> Self {
-        Worker { state }
+    pub(crate) fn new(id: usize, state: RcState) -> Self {
+        Worker { id, state }
}

pub(crate) fn run(&self) {
let mut events = Vec::new();
+        let poller = &self.state.network_pollers[self.id];

loop {
-            if let Err(err) = self.state.network_poller.poll(&mut events) {
+            if let Err(err) = poller.poll(&mut events) {
if err.kind() != io::ErrorKind::Interrupted {
// It's not entirely clear if/when we ever run into this,
// but should we run into any error that's _not_ an
