From 44c00881b2c9e6019abd1149ffd9778f06a12d7f Mon Sep 17 00:00:00 2001 From: Yorick Peterse Date: Tue, 20 Sep 2022 17:45:20 +0200 Subject: [PATCH] Support multiple network pollers Instead of all threads using a single network poller, threads are assigned a network poller in a round-robin fashion. By default we spawn just a single network poller thread, but the amount is configurable using the INKO_NETPOLL_THREADS environment variable. In the future we may increase the default to e.g. 2 or 4, but for now this should prove good enough. The benefit of using multiple network poller threads is being able to spread the load of polling many sockets across different threads. While epoll and friends are usually fast enough, programs that make heavy use of sockets can benefit from using more than just a single thread. This fixes https://gitlab.com/inko-lang/inko/-/issues/274. Changelog: performance --- docs/source/internals/vm.md | 12 ++++++ vm/src/builtin_functions/socket.rs | 61 ++++++++++++++++-------------- vm/src/config.rs | 42 ++++++++++++++++++++ vm/src/machine.rs | 12 +++--- vm/src/network_poller.rs | 8 ++-- vm/src/scheduler/process.rs | 56 +++++++++++++++++---------- vm/src/socket.rs | 51 +++++++++++++++++-------- vm/src/state.rs | 27 +++++++------ 8 files changed, 186 insertions(+), 83 deletions(-) diff --git a/docs/source/internals/vm.md b/docs/source/internals/vm.md index d077b6bb4..4d6e568c5 100644 --- a/docs/source/internals/vm.md +++ b/docs/source/internals/vm.md @@ -60,12 +60,24 @@ compiler to insert these instructions in the right place. ## IO operations +### Sockets + For network IO the VM uses non-blocking sockets. When performing an operation that would block, the process and its socket are registered with "the network poller". This is a system/thread that polls a list of sockets until they are ready, rescheduling their corresponding processes. Polling is done using APIs such as epoll on Linux, kqueue on macOS/BSD, and IO completion ports on Windows. +By default a single network poller thread is spawned, and each process thread +uses the same poller. The number of poller threads is configured using the +`INKO_NETPOLL_THREADS` environment variable. This variable can be set to a value +between 1 and 127. When the value is greater than one, network poller threads +are assigned to process threads in a round-robin fashion. Most programs won't +need more than a single thread, but if you make heavy use of (many) sockets you +may want to increase this value. + +### Blocking IO + For blocking operations, such as file IO, Inko uses a fixed amount of backup threads. When an OS thread is about to enter a blocking operation, it sets a flag indicating when it did so. This is implemented such that it in most cases diff --git a/vm/src/builtin_functions/socket.rs b/vm/src/builtin_functions/socket.rs index 80152de2d..b16773752 100644 --- a/vm/src/builtin_functions/socket.rs +++ b/vm/src/builtin_functions/socket.rs @@ -8,16 +8,21 @@ use crate::socket::Socket; use crate::state::State; use std::io::Write; -macro_rules! ret { - ($result:expr, $state:expr, $proc:expr, $sock:expr, $interest:expr) => {{ - if let Err(ref err) = $result { - if err.should_poll() { - $sock.register($proc, &$state.network_poller, $interest)?; - } +fn ret( + result: Result, + state: &State, + thread: &Thread, + process: ProcessPointer, + socket: &mut Socket, + interest: Interest, +) -> Result { + if let Err(ref err) = result { + if err.should_poll() { + socket.register(state, process, thread.network_poller, interest)?; } + } - $result - }}; + result } pub(crate) fn socket_allocate_ipv4( @@ -58,7 +63,7 @@ pub(crate) fn socket_allocate_unix( pub(crate) fn socket_write_string( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -69,12 +74,12 @@ pub(crate) fn socket_write_string( .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)) .map_err(RuntimeError::from); - ret!(res, state, process, sock, Interest::Write) + ret(res, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_write_bytes( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -85,12 +90,12 @@ pub(crate) fn socket_write_bytes( .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)) .map_err(RuntimeError::from); - ret!(res, state, process, sock, Interest::Write) + ret(res, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_read( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -102,7 +107,7 @@ pub(crate) fn socket_read( .read(buffer, amount) .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_listen( @@ -120,7 +125,7 @@ pub(crate) fn socket_listen( pub(crate) fn socket_bind( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -129,12 +134,12 @@ pub(crate) fn socket_bind( let port = unsafe { Int::read(arguments[2]) } as u16; let result = sock.bind(addr, port).map(|_| Pointer::nil_singleton()); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_connect( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -143,36 +148,36 @@ pub(crate) fn socket_connect( let port = unsafe { Int::read(arguments[2]) } as u16; let result = sock.connect(addr, port).map(|_| Pointer::nil_singleton()); - ret!(result, state, process, sock, Interest::Write) + ret(result, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_accept_ip( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { let sock = unsafe { arguments[0].get_mut::() }; let result = sock.accept().map(Pointer::boxed); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_accept_unix( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { let sock = unsafe { arguments[0].get_mut::() }; let result = sock.accept().map(Pointer::boxed); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_receive_from( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -183,12 +188,12 @@ pub(crate) fn socket_receive_from( .recv_from(buffer, amount) .map(|(addr, port)| allocate_address_pair(state, addr, port)); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_send_bytes_to( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -200,12 +205,12 @@ pub(crate) fn socket_send_bytes_to( .send_to(buffer, address, port) .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)); - ret!(result, state, process, sock, Interest::Write) + ret(result, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_send_string_to( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -217,7 +222,7 @@ pub(crate) fn socket_send_string_to( .send_to(buffer, address, port) .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)); - ret!(result, state, process, sock, Interest::Write) + ret(result, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_shutdown_read( diff --git a/vm/src/config.rs b/vm/src/config.rs index f2294cdc1..c43f2e107 100644 --- a/vm/src/config.rs +++ b/vm/src/config.rs @@ -17,8 +17,19 @@ macro_rules! set_from_env { }}; } +/// The default number of reductions to consume before a process suspends +/// itself. const DEFAULT_REDUCTIONS: u16 = 1000; +/// The default number of network poller threads to use. +/// +/// We default to one thread because for most setups this is probably more than +/// enough. +const DEFAULT_NETPOLL_THREADS: u8 = 1; + +/// The maximum number of netpoll threads that are allowed. +const MAX_NETPOLL_THREADS: u8 = 127; + /// Structure containing the configuration settings for the virtual machine. pub struct Config { /// The number of process threads to run. @@ -27,6 +38,14 @@ pub struct Config { /// The number of backup process threads to spawn. pub backup_threads: u16, + /// The number of network poller threads to use. + /// + /// While this value is stored as an u8, it's limited to a maximum of 127. + /// This is because internally we use an i8 to store registered poller IDs, + /// and use the value -1 to signal a file descriptor isn't registered with + /// any poller. + pub netpoll_threads: u8, + /// The number of reductions a process can perform before being suspended. pub reductions: u16, } @@ -38,6 +57,7 @@ impl Config { Config { process_threads: cpu_count, backup_threads: cpu_count * 4, + netpoll_threads: DEFAULT_NETPOLL_THREADS, reductions: DEFAULT_REDUCTIONS, } } @@ -48,9 +68,17 @@ impl Config { set_from_env!(config, process_threads, "PROCESS_THREADS", u16); set_from_env!(config, backup_threads, "BACKUP_THREADS", u16); set_from_env!(config, reductions, "REDUCTIONS", u16); + set_from_env!(config, netpoll_threads, "NETPOLL_THREADS", u8); + config.verify(); config } + + fn verify(&mut self) { + if self.netpoll_threads > MAX_NETPOLL_THREADS { + self.netpoll_threads = MAX_NETPOLL_THREADS; + } + } } #[cfg(test)] @@ -61,6 +89,7 @@ mod tests { match key { "INKO_FOO" => Ok("1"), "INKO_BAR" => Ok("0"), + "INKO_NETPOLL_THREADS" => Ok("4"), _ => Err(()), } } @@ -87,4 +116,17 @@ mod tests { assert_eq!(cfg.reductions, DEFAULT_REDUCTIONS); } + + #[test] + fn test_verify() { + let mut cfg = Config::new(); + + cfg.netpoll_threads = 64; + cfg.verify(); + assert_eq!(cfg.netpoll_threads, 64); + + cfg.netpoll_threads = 130; + cfg.verify(); + assert_eq!(cfg.netpoll_threads, MAX_NETPOLL_THREADS); + } } diff --git a/vm/src/machine.rs b/vm/src/machine.rs index 780ba3d8f..0d9fa434e 100644 --- a/vm/src/machine.rs +++ b/vm/src/machine.rs @@ -122,12 +122,14 @@ impl<'a> Machine<'a> { } { - let state = state.clone(); + for id in 0..state.network_pollers.len() { + let state = state.clone(); - thread::Builder::new() - .name("network poller".to_string()) - .spawn(move || NetworkPollerWorker::new(state).run()) - .unwrap(); + thread::Builder::new() + .name(format!("netpoll {}", id)) + .spawn(move || NetworkPollerWorker::new(id, state).run()) + .unwrap(); + } } state.scheduler.run(&*state, entry_class, entry_method); diff --git a/vm/src/network_poller.rs b/vm/src/network_poller.rs index 06b032966..4074cf23b 100644 --- a/vm/src/network_poller.rs +++ b/vm/src/network_poller.rs @@ -59,19 +59,21 @@ impl NetworkPoller { /// A thread that polls a poller and reschedules processes. pub(crate) struct Worker { + id: usize, state: RcState, } impl Worker { - pub(crate) fn new(state: RcState) -> Self { - Worker { state } + pub(crate) fn new(id: usize, state: RcState) -> Self { + Worker { id, state } } pub(crate) fn run(&self) { let mut events = Vec::new(); + let poller = &self.state.network_pollers[self.id]; loop { - if let Err(err) = self.state.network_poller.poll(&mut events) { + if let Err(err) = poller.poll(&mut events) { if err.kind() != io::ErrorKind::Interrupted { // It's not entirely clear if/when we ever run into this, // but should we run into any error that's _not_ an diff --git a/vm/src/scheduler/process.rs b/vm/src/scheduler/process.rs index 7cff8454a..5d08adcb8 100644 --- a/vm/src/scheduler/process.rs +++ b/vm/src/scheduler/process.rs @@ -124,12 +124,19 @@ pub(crate) struct Thread<'a> { /// A value of 0 indicates the thread isn't blocked. blocked_at: u64, + /// The ID of the network poller assigned to this thread. + /// + /// Threads are each assigned a network poller in a round-robin fashion. + /// This is useful for programs that heavily rely on sockets, as a single + /// network poller thread may not be able to complete its work fast enough. + pub(crate) network_poller: usize, + /// A random number generator to use for the current thread. pub(crate) rng: ThreadRng, } impl<'a> Thread<'a> { - fn new(id: usize, pool: &'a Pool) -> Thread { + fn new(id: usize, network_poller: usize, pool: &'a Pool) -> Thread { Self { id, work: pool.threads[id].queue.clone(), @@ -137,11 +144,12 @@ impl<'a> Thread<'a> { pool, backup: false, blocked_at: NOT_BLOCKING, + network_poller, rng: thread_rng(), } } - fn backup(pool: &'a Pool) -> Thread { + fn backup(network_poller: usize, pool: &'a Pool) -> Thread { Self { // For backup threads the ID/queue doesn't matter, because we won't // use them until we're turned into a regular thread. @@ -151,6 +159,7 @@ impl<'a> Thread<'a> { pool, backup: true, blocked_at: NOT_BLOCKING, + network_poller, rng: thread_rng(), } } @@ -783,6 +792,7 @@ impl Scheduler { method: MethodPointer, ) { let process = Process::main(class, method); + let pollers = state.network_pollers.len(); let _ = scope(move |s| { s.builder() .name("proc monitor".to_string()) @@ -790,16 +800,24 @@ impl Scheduler { .unwrap(); for id in 0..self.primary { + let poll_id = id % pollers; + s.builder() .name(format!("proc {}", id)) - .spawn(move |_| Thread::new(id, &*self.pool).run(state)) + .spawn(move |_| { + Thread::new(id, poll_id, &*self.pool).run(state) + }) .unwrap(); } for id in 0..self.backup { + let poll_id = id % pollers; + s.builder() .name(format!("backup {}", id)) - .spawn(move |_| Thread::backup(&*self.pool).run(state)) + .spawn(move |_| { + Thread::backup(poll_id, &*self.pool).run(state) + }) .unwrap(); } @@ -823,7 +841,7 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class).take_and_forget(); let scheduler = Scheduler::new(1, 1); - let mut thread = Thread::new(0, &scheduler.pool); + let mut thread = Thread::new(0, 0, &scheduler.pool); thread.schedule(process); @@ -836,7 +854,7 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class).take_and_forget(); let scheduler = Scheduler::new(1, 1); - let mut thread = Thread::new(0, &scheduler.pool); + let mut thread = Thread::new(0, 0, &scheduler.pool); scheduler.pool.sleeping.fetch_add(1, Ordering::AcqRel); @@ -860,7 +878,7 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class).take_and_forget(); let scheduler = Scheduler::new(1, 1); - let mut thread = Thread::new(0, &scheduler.pool); + let mut thread = Thread::new(0, 0, &scheduler.pool); thread.schedule_priority(process); @@ -873,7 +891,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); thread.schedule(process); thread.run(&state); @@ -889,7 +907,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); thread.schedule_priority(process); thread.run(&state); @@ -906,8 +924,8 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread0 = Thread::new(0, &state.scheduler.pool); - let mut thread1 = Thread::new(1, &state.scheduler.pool); + let mut thread0 = Thread::new(0, 0, &state.scheduler.pool); + let mut thread1 = Thread::new(1, 0, &state.scheduler.pool); thread1.schedule(process); thread0.run(&state); @@ -924,7 +942,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); state.scheduler.pool.schedule(process); thread.run(&state); @@ -941,7 +959,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); thread.backup = true; @@ -972,7 +990,7 @@ mod tests { // sleep. This test ensures it wakes up during termination. let _ = scope(|s| { s.spawn(|_| { - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.backup = true; thread.schedule(process); @@ -1000,7 +1018,7 @@ mod tests { let proc2 = new_process(*class).take_and_forget(); let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); pool.epoch.store(4, Ordering::Release); pool.monitor.status.store(MonitorStatus::Sleeping); @@ -1020,7 +1038,7 @@ mod tests { fn test_thread_finish_blocking() { let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.start_blocking(); thread.finish_blocking(); @@ -1039,7 +1057,7 @@ mod tests { fn test_thread_blocking() { let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.blocking(|| { pool.threads[0].blocked_at.store(NOT_BLOCKING, Ordering::Release) @@ -1055,7 +1073,7 @@ mod tests { let proc2 = new_process(*class).take_and_forget(); let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.schedule(proc1); thread.schedule_priority(proc2); @@ -1081,7 +1099,7 @@ mod tests { #[test] fn test_scheduler_terminate() { let scheduler = Scheduler::new(1, 1); - let thread = Thread::new(0, &scheduler.pool); + let thread = Thread::new(0, 0, &scheduler.pool); scheduler.pool.sleeping.fetch_add(1, Ordering::Release); scheduler.terminate(); diff --git a/vm/src/socket.rs b/vm/src/socket.rs index 01fd60335..91257050d 100644 --- a/vm/src/socket.rs +++ b/vm/src/socket.rs @@ -1,10 +1,10 @@ pub mod socket_address; use crate::network_poller::Interest; -use crate::network_poller::NetworkPoller; use crate::process::ProcessPointer; use crate::runtime_error::RuntimeError; use crate::socket::socket_address::SocketAddress; +use crate::state::State; use socket2::{Domain, SockAddr, Socket as RawSocket, Type}; use std::io; use std::io::Read; @@ -12,9 +12,13 @@ use std::mem::transmute; use std::net::Shutdown; use std::net::{IpAddr, SocketAddr}; use std::slice; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicI8, Ordering}; use std::time::Duration; +/// The registered value to use to signal a socket isn't registered with a +/// network poller. +const NOT_REGISTERED: i8 = -1; + #[cfg(unix)] use libc::{EINPROGRESS, EISCONN}; @@ -164,14 +168,16 @@ pub(crate) struct Socket { /// The raw socket. inner: RawSocket, - /// A flag indicating that this socket has been registered with a poller. + /// The ID of the network poller we're registered with. + /// + /// A value of -1 indicates the socket isn't registered with any poller. /// /// This flag is necessary because the system's polling mechanism may not /// allow overwriting existing registrations without setting some additional /// flags. For example, epoll requires the use of EPOLL_CTL_MOD when /// overwriting a registration, as using EPOLL_CTL_ADD will produce an error /// if a file descriptor is already registered. - registered: AtomicBool, + registered: AtomicI8, /// A flag indicating if we're dealing with a UNIX socket or not. unix: bool, @@ -187,7 +193,11 @@ impl Socket { socket.set_nonblocking(true)?; - Ok(Socket { inner: socket, registered: AtomicBool::new(false), unix }) + Ok(Socket { + inner: socket, + registered: AtomicI8::new(NOT_REGISTERED), + unix, + }) } pub(crate) fn ipv4(kind_int: i64) -> Result { @@ -268,10 +278,13 @@ impl Socket { pub(crate) fn register( &mut self, + state: &State, process: ProcessPointer, - poller: &NetworkPoller, + thread_poller_id: usize, interest: Interest, ) -> Result<(), RuntimeError> { + let existing_id = self.registered.load(Ordering::Acquire); + // Once registered, the process might be rescheduled immediately if // there is data available. This means that once we (re)register the // socket, it is not safe to use "self" anymore. @@ -280,14 +293,20 @@ impl Socket { // // 1. Set "registered" _first_ (if necessary) // 2. Add the socket to the poller - if self.registered.load(Ordering::Acquire) { - Ok(poller.modify(process, &self.inner, interest)?) + let result = if existing_id == NOT_REGISTERED { + let poller = &state.network_pollers[thread_poller_id]; + + self.registered.store(thread_poller_id as i8, Ordering::Release); + poller.add(process, &self.inner, interest) } else { - self.registered.store(true, Ordering::Release); - Ok(poller.add(process, &self.inner, interest)?) - } + let poller = &state.network_pollers[existing_id as usize]; + + poller.modify(process, &self.inner, interest) + }; - // *DO NOT* use "self" from here on. + // *DO NOT* use "self" from here on, as the socket/process may already + // be running on a different thread. + result.map_err(|e| e.into()) } pub(crate) fn accept(&self) -> Result { @@ -299,7 +318,7 @@ impl Socket { Ok(Socket { inner: socket, - registered: AtomicBool::new(false), + registered: AtomicI8::new(NOT_REGISTERED), unix: self.unix, }) } @@ -430,7 +449,7 @@ impl Socket { pub(crate) fn try_clone(&self) -> Result { let sock = Socket { inner: self.inner.try_clone()?, - registered: AtomicBool::new(false), + registered: AtomicI8::new(NOT_REGISTERED), unix: self.unix, }; @@ -462,11 +481,11 @@ mod tests { fn test_try_clone() { let socket1 = Socket::ipv4(0).unwrap(); - socket1.registered.store(true, Ordering::Release); + socket1.registered.store(2, Ordering::Release); let socket2 = socket1.try_clone().unwrap(); - assert!(!socket2.registered.load(Ordering::Acquire)); + assert_eq!(socket2.registered.load(Ordering::Acquire), NOT_REGISTERED); assert!(!socket2.unix); } } diff --git a/vm/src/state.rs b/vm/src/state.rs index 5b2679f21..34757903d 100644 --- a/vm/src/state.rs +++ b/vm/src/state.rs @@ -20,13 +20,13 @@ pub(crate) type RcState = ArcWithoutWeak; /// The state of a virtual machine. pub(crate) struct State { /// The virtual machine's configuration. - pub config: Config, + pub(crate) config: Config, /// The start time of the VM (more or less). - pub start_time: time::Instant, + pub(crate) start_time: time::Instant, /// The commandline arguments passed to an Inko program. - pub arguments: Vec, + pub(crate) arguments: Vec, /// The environment variables defined when the VM started. /// @@ -34,25 +34,25 @@ pub(crate) struct State { /// (or through libraries) may call `setenv()` concurrently with `getenv()` /// calls, which is unsound. Caching the variables also means we can safely /// use `localtime_r()` (which internally may call `setenv()`). - pub environment: HashMap, + pub(crate) environment: HashMap, /// The exit status to use when the VM terminates. - pub exit_status: Mutex, + pub(crate) exit_status: Mutex, /// The scheduler to use for executing Inko processes. - pub scheduler: Scheduler, + pub(crate) scheduler: Scheduler, /// A task used for handling timeouts, such as message and IO timeouts. - pub timeout_worker: TimeoutWorker, + pub(crate) timeout_worker: TimeoutWorker, - /// The system polling mechanism to use for polling non-blocking sockets. - pub network_poller: NetworkPoller, + /// The network pollers to use for process threads. + pub(crate) network_pollers: Vec, /// All builtin functions that a compiler can use. - pub builtin_functions: BuiltinFunctions, + pub(crate) builtin_functions: BuiltinFunctions, /// A type for allocating and storing blocks and permanent objects. - pub permanent_space: PermanentSpace, + pub(crate) permanent_space: PermanentSpace, /// The random state to use for building hashers. /// @@ -104,6 +104,9 @@ impl State { config.backup_threads as usize, ); + let network_pollers = + (0..config.netpoll_threads).map(|_| NetworkPoller::new()).collect(); + let state = State { scheduler, environment, @@ -112,7 +115,7 @@ impl State { exit_status: Mutex::new(0), timeout_worker: TimeoutWorker::new(), arguments, - network_poller: NetworkPoller::new(), + network_pollers, builtin_functions: BuiltinFunctions::new(), permanent_space, hash_state,