diff --git a/docs/source/internals/vm.md b/docs/source/internals/vm.md index d077b6bb4..4d6e568c5 100644 --- a/docs/source/internals/vm.md +++ b/docs/source/internals/vm.md @@ -60,12 +60,24 @@ compiler to insert these instructions in the right place. ## IO operations +### Sockets + For network IO the VM uses non-blocking sockets. When performing an operation that would block, the process and its socket are registered with "the network poller". This is a system/thread that polls a list of sockets until they are ready, rescheduling their corresponding processes. Polling is done using APIs such as epoll on Linux, kqueue on macOS/BSD, and IO completion ports on Windows. +By default a single network poller thread is spawned, and each process thread +uses the same poller. The number of poller threads is configured using the +`INKO_NETPOLL_THREADS` environment variable. This variable can be set to a value +between 1 and 127. When the value is greater than one, network poller threads +are assigned to process threads in a round-robin fashion. Most programs won't +need more than a single thread, but if you make heavy use of (many) sockets you +may want to increase this value. + +### Blocking IO + For blocking operations, such as file IO, Inko uses a fixed amount of backup threads. When an OS thread is about to enter a blocking operation, it sets a flag indicating when it did so. This is implemented such that it in most cases diff --git a/vm/src/builtin_functions/socket.rs b/vm/src/builtin_functions/socket.rs index 80152de2d..b16773752 100644 --- a/vm/src/builtin_functions/socket.rs +++ b/vm/src/builtin_functions/socket.rs @@ -8,16 +8,21 @@ use crate::socket::Socket; use crate::state::State; use std::io::Write; -macro_rules! ret { - ($result:expr, $state:expr, $proc:expr, $sock:expr, $interest:expr) => {{ - if let Err(ref err) = $result { - if err.should_poll() { - $sock.register($proc, &$state.network_poller, $interest)?; - } +fn ret( + result: Result, + state: &State, + thread: &Thread, + process: ProcessPointer, + socket: &mut Socket, + interest: Interest, +) -> Result { + if let Err(ref err) = result { + if err.should_poll() { + socket.register(state, process, thread.network_poller, interest)?; } + } - $result - }}; + result } pub(crate) fn socket_allocate_ipv4( @@ -58,7 +63,7 @@ pub(crate) fn socket_allocate_unix( pub(crate) fn socket_write_string( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -69,12 +74,12 @@ pub(crate) fn socket_write_string( .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)) .map_err(RuntimeError::from); - ret!(res, state, process, sock, Interest::Write) + ret(res, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_write_bytes( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -85,12 +90,12 @@ pub(crate) fn socket_write_bytes( .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)) .map_err(RuntimeError::from); - ret!(res, state, process, sock, Interest::Write) + ret(res, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_read( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -102,7 +107,7 @@ pub(crate) fn socket_read( .read(buffer, amount) .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_listen( @@ -120,7 +125,7 @@ pub(crate) fn socket_listen( pub(crate) fn socket_bind( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -129,12 +134,12 @@ pub(crate) fn socket_bind( let port = unsafe { Int::read(arguments[2]) } as u16; let result = sock.bind(addr, port).map(|_| Pointer::nil_singleton()); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_connect( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -143,36 +148,36 @@ pub(crate) fn socket_connect( let port = unsafe { Int::read(arguments[2]) } as u16; let result = sock.connect(addr, port).map(|_| Pointer::nil_singleton()); - ret!(result, state, process, sock, Interest::Write) + ret(result, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_accept_ip( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { let sock = unsafe { arguments[0].get_mut::() }; let result = sock.accept().map(Pointer::boxed); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_accept_unix( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { let sock = unsafe { arguments[0].get_mut::() }; let result = sock.accept().map(Pointer::boxed); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_receive_from( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -183,12 +188,12 @@ pub(crate) fn socket_receive_from( .recv_from(buffer, amount) .map(|(addr, port)| allocate_address_pair(state, addr, port)); - ret!(result, state, process, sock, Interest::Read) + ret(result, state, thread, process, sock, Interest::Read) } pub(crate) fn socket_send_bytes_to( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -200,12 +205,12 @@ pub(crate) fn socket_send_bytes_to( .send_to(buffer, address, port) .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)); - ret!(result, state, process, sock, Interest::Write) + ret(result, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_send_string_to( state: &State, - _: &mut Thread, + thread: &mut Thread, process: ProcessPointer, arguments: &[Pointer], ) -> Result { @@ -217,7 +222,7 @@ pub(crate) fn socket_send_string_to( .send_to(buffer, address, port) .map(|size| Int::alloc(state.permanent_space.int_class(), size as i64)); - ret!(result, state, process, sock, Interest::Write) + ret(result, state, thread, process, sock, Interest::Write) } pub(crate) fn socket_shutdown_read( diff --git a/vm/src/config.rs b/vm/src/config.rs index f2294cdc1..c43f2e107 100644 --- a/vm/src/config.rs +++ b/vm/src/config.rs @@ -17,8 +17,19 @@ macro_rules! set_from_env { }}; } +/// The default number of reductions to consume before a process suspends +/// itself. const DEFAULT_REDUCTIONS: u16 = 1000; +/// The default number of network poller threads to use. +/// +/// We default to one thread because for most setups this is probably more than +/// enough. +const DEFAULT_NETPOLL_THREADS: u8 = 1; + +/// The maximum number of netpoll threads that are allowed. +const MAX_NETPOLL_THREADS: u8 = 127; + /// Structure containing the configuration settings for the virtual machine. pub struct Config { /// The number of process threads to run. @@ -27,6 +38,14 @@ pub struct Config { /// The number of backup process threads to spawn. pub backup_threads: u16, + /// The number of network poller threads to use. + /// + /// While this value is stored as an u8, it's limited to a maximum of 127. + /// This is because internally we use an i8 to store registered poller IDs, + /// and use the value -1 to signal a file descriptor isn't registered with + /// any poller. + pub netpoll_threads: u8, + /// The number of reductions a process can perform before being suspended. pub reductions: u16, } @@ -38,6 +57,7 @@ impl Config { Config { process_threads: cpu_count, backup_threads: cpu_count * 4, + netpoll_threads: DEFAULT_NETPOLL_THREADS, reductions: DEFAULT_REDUCTIONS, } } @@ -48,9 +68,17 @@ impl Config { set_from_env!(config, process_threads, "PROCESS_THREADS", u16); set_from_env!(config, backup_threads, "BACKUP_THREADS", u16); set_from_env!(config, reductions, "REDUCTIONS", u16); + set_from_env!(config, netpoll_threads, "NETPOLL_THREADS", u8); + config.verify(); config } + + fn verify(&mut self) { + if self.netpoll_threads > MAX_NETPOLL_THREADS { + self.netpoll_threads = MAX_NETPOLL_THREADS; + } + } } #[cfg(test)] @@ -61,6 +89,7 @@ mod tests { match key { "INKO_FOO" => Ok("1"), "INKO_BAR" => Ok("0"), + "INKO_NETPOLL_THREADS" => Ok("4"), _ => Err(()), } } @@ -87,4 +116,17 @@ mod tests { assert_eq!(cfg.reductions, DEFAULT_REDUCTIONS); } + + #[test] + fn test_verify() { + let mut cfg = Config::new(); + + cfg.netpoll_threads = 64; + cfg.verify(); + assert_eq!(cfg.netpoll_threads, 64); + + cfg.netpoll_threads = 130; + cfg.verify(); + assert_eq!(cfg.netpoll_threads, MAX_NETPOLL_THREADS); + } } diff --git a/vm/src/machine.rs b/vm/src/machine.rs index 780ba3d8f..0d9fa434e 100644 --- a/vm/src/machine.rs +++ b/vm/src/machine.rs @@ -122,12 +122,14 @@ impl<'a> Machine<'a> { } { - let state = state.clone(); + for id in 0..state.network_pollers.len() { + let state = state.clone(); - thread::Builder::new() - .name("network poller".to_string()) - .spawn(move || NetworkPollerWorker::new(state).run()) - .unwrap(); + thread::Builder::new() + .name(format!("netpoll {}", id)) + .spawn(move || NetworkPollerWorker::new(id, state).run()) + .unwrap(); + } } state.scheduler.run(&*state, entry_class, entry_method); diff --git a/vm/src/network_poller.rs b/vm/src/network_poller.rs index 06b032966..4074cf23b 100644 --- a/vm/src/network_poller.rs +++ b/vm/src/network_poller.rs @@ -59,19 +59,21 @@ impl NetworkPoller { /// A thread that polls a poller and reschedules processes. pub(crate) struct Worker { + id: usize, state: RcState, } impl Worker { - pub(crate) fn new(state: RcState) -> Self { - Worker { state } + pub(crate) fn new(id: usize, state: RcState) -> Self { + Worker { id, state } } pub(crate) fn run(&self) { let mut events = Vec::new(); + let poller = &self.state.network_pollers[self.id]; loop { - if let Err(err) = self.state.network_poller.poll(&mut events) { + if let Err(err) = poller.poll(&mut events) { if err.kind() != io::ErrorKind::Interrupted { // It's not entirely clear if/when we ever run into this, // but should we run into any error that's _not_ an diff --git a/vm/src/scheduler/process.rs b/vm/src/scheduler/process.rs index 7cff8454a..5d08adcb8 100644 --- a/vm/src/scheduler/process.rs +++ b/vm/src/scheduler/process.rs @@ -124,12 +124,19 @@ pub(crate) struct Thread<'a> { /// A value of 0 indicates the thread isn't blocked. blocked_at: u64, + /// The ID of the network poller assigned to this thread. + /// + /// Threads are each assigned a network poller in a round-robin fashion. + /// This is useful for programs that heavily rely on sockets, as a single + /// network poller thread may not be able to complete its work fast enough. + pub(crate) network_poller: usize, + /// A random number generator to use for the current thread. pub(crate) rng: ThreadRng, } impl<'a> Thread<'a> { - fn new(id: usize, pool: &'a Pool) -> Thread { + fn new(id: usize, network_poller: usize, pool: &'a Pool) -> Thread { Self { id, work: pool.threads[id].queue.clone(), @@ -137,11 +144,12 @@ impl<'a> Thread<'a> { pool, backup: false, blocked_at: NOT_BLOCKING, + network_poller, rng: thread_rng(), } } - fn backup(pool: &'a Pool) -> Thread { + fn backup(network_poller: usize, pool: &'a Pool) -> Thread { Self { // For backup threads the ID/queue doesn't matter, because we won't // use them until we're turned into a regular thread. @@ -151,6 +159,7 @@ impl<'a> Thread<'a> { pool, backup: true, blocked_at: NOT_BLOCKING, + network_poller, rng: thread_rng(), } } @@ -783,6 +792,7 @@ impl Scheduler { method: MethodPointer, ) { let process = Process::main(class, method); + let pollers = state.network_pollers.len(); let _ = scope(move |s| { s.builder() .name("proc monitor".to_string()) @@ -790,16 +800,24 @@ impl Scheduler { .unwrap(); for id in 0..self.primary { + let poll_id = id % pollers; + s.builder() .name(format!("proc {}", id)) - .spawn(move |_| Thread::new(id, &*self.pool).run(state)) + .spawn(move |_| { + Thread::new(id, poll_id, &*self.pool).run(state) + }) .unwrap(); } for id in 0..self.backup { + let poll_id = id % pollers; + s.builder() .name(format!("backup {}", id)) - .spawn(move |_| Thread::backup(&*self.pool).run(state)) + .spawn(move |_| { + Thread::backup(poll_id, &*self.pool).run(state) + }) .unwrap(); } @@ -823,7 +841,7 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class).take_and_forget(); let scheduler = Scheduler::new(1, 1); - let mut thread = Thread::new(0, &scheduler.pool); + let mut thread = Thread::new(0, 0, &scheduler.pool); thread.schedule(process); @@ -836,7 +854,7 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class).take_and_forget(); let scheduler = Scheduler::new(1, 1); - let mut thread = Thread::new(0, &scheduler.pool); + let mut thread = Thread::new(0, 0, &scheduler.pool); scheduler.pool.sleeping.fetch_add(1, Ordering::AcqRel); @@ -860,7 +878,7 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class).take_and_forget(); let scheduler = Scheduler::new(1, 1); - let mut thread = Thread::new(0, &scheduler.pool); + let mut thread = Thread::new(0, 0, &scheduler.pool); thread.schedule_priority(process); @@ -873,7 +891,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); thread.schedule(process); thread.run(&state); @@ -889,7 +907,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); thread.schedule_priority(process); thread.run(&state); @@ -906,8 +924,8 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread0 = Thread::new(0, &state.scheduler.pool); - let mut thread1 = Thread::new(1, &state.scheduler.pool); + let mut thread0 = Thread::new(0, 0, &state.scheduler.pool); + let mut thread1 = Thread::new(1, 0, &state.scheduler.pool); thread1.schedule(process); thread0.run(&state); @@ -924,7 +942,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); state.scheduler.pool.schedule(process); thread.run(&state); @@ -941,7 +959,7 @@ mod tests { let main_method = empty_async_method(); let process = new_main_process(*class, main_method).take_and_forget(); let state = setup(); - let mut thread = Thread::new(0, &state.scheduler.pool); + let mut thread = Thread::new(0, 0, &state.scheduler.pool); thread.backup = true; @@ -972,7 +990,7 @@ mod tests { // sleep. This test ensures it wakes up during termination. let _ = scope(|s| { s.spawn(|_| { - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.backup = true; thread.schedule(process); @@ -1000,7 +1018,7 @@ mod tests { let proc2 = new_process(*class).take_and_forget(); let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); pool.epoch.store(4, Ordering::Release); pool.monitor.status.store(MonitorStatus::Sleeping); @@ -1020,7 +1038,7 @@ mod tests { fn test_thread_finish_blocking() { let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.start_blocking(); thread.finish_blocking(); @@ -1039,7 +1057,7 @@ mod tests { fn test_thread_blocking() { let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.blocking(|| { pool.threads[0].blocked_at.store(NOT_BLOCKING, Ordering::Release) @@ -1055,7 +1073,7 @@ mod tests { let proc2 = new_process(*class).take_and_forget(); let state = setup(); let pool = &state.scheduler.pool; - let mut thread = Thread::new(0, pool); + let mut thread = Thread::new(0, 0, pool); thread.schedule(proc1); thread.schedule_priority(proc2); @@ -1081,7 +1099,7 @@ mod tests { #[test] fn test_scheduler_terminate() { let scheduler = Scheduler::new(1, 1); - let thread = Thread::new(0, &scheduler.pool); + let thread = Thread::new(0, 0, &scheduler.pool); scheduler.pool.sleeping.fetch_add(1, Ordering::Release); scheduler.terminate(); diff --git a/vm/src/socket.rs b/vm/src/socket.rs index 01fd60335..91257050d 100644 --- a/vm/src/socket.rs +++ b/vm/src/socket.rs @@ -1,10 +1,10 @@ pub mod socket_address; use crate::network_poller::Interest; -use crate::network_poller::NetworkPoller; use crate::process::ProcessPointer; use crate::runtime_error::RuntimeError; use crate::socket::socket_address::SocketAddress; +use crate::state::State; use socket2::{Domain, SockAddr, Socket as RawSocket, Type}; use std::io; use std::io::Read; @@ -12,9 +12,13 @@ use std::mem::transmute; use std::net::Shutdown; use std::net::{IpAddr, SocketAddr}; use std::slice; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicI8, Ordering}; use std::time::Duration; +/// The registered value to use to signal a socket isn't registered with a +/// network poller. +const NOT_REGISTERED: i8 = -1; + #[cfg(unix)] use libc::{EINPROGRESS, EISCONN}; @@ -164,14 +168,16 @@ pub(crate) struct Socket { /// The raw socket. inner: RawSocket, - /// A flag indicating that this socket has been registered with a poller. + /// The ID of the network poller we're registered with. + /// + /// A value of -1 indicates the socket isn't registered with any poller. /// /// This flag is necessary because the system's polling mechanism may not /// allow overwriting existing registrations without setting some additional /// flags. For example, epoll requires the use of EPOLL_CTL_MOD when /// overwriting a registration, as using EPOLL_CTL_ADD will produce an error /// if a file descriptor is already registered. - registered: AtomicBool, + registered: AtomicI8, /// A flag indicating if we're dealing with a UNIX socket or not. unix: bool, @@ -187,7 +193,11 @@ impl Socket { socket.set_nonblocking(true)?; - Ok(Socket { inner: socket, registered: AtomicBool::new(false), unix }) + Ok(Socket { + inner: socket, + registered: AtomicI8::new(NOT_REGISTERED), + unix, + }) } pub(crate) fn ipv4(kind_int: i64) -> Result { @@ -268,10 +278,13 @@ impl Socket { pub(crate) fn register( &mut self, + state: &State, process: ProcessPointer, - poller: &NetworkPoller, + thread_poller_id: usize, interest: Interest, ) -> Result<(), RuntimeError> { + let existing_id = self.registered.load(Ordering::Acquire); + // Once registered, the process might be rescheduled immediately if // there is data available. This means that once we (re)register the // socket, it is not safe to use "self" anymore. @@ -280,14 +293,20 @@ impl Socket { // // 1. Set "registered" _first_ (if necessary) // 2. Add the socket to the poller - if self.registered.load(Ordering::Acquire) { - Ok(poller.modify(process, &self.inner, interest)?) + let result = if existing_id == NOT_REGISTERED { + let poller = &state.network_pollers[thread_poller_id]; + + self.registered.store(thread_poller_id as i8, Ordering::Release); + poller.add(process, &self.inner, interest) } else { - self.registered.store(true, Ordering::Release); - Ok(poller.add(process, &self.inner, interest)?) - } + let poller = &state.network_pollers[existing_id as usize]; + + poller.modify(process, &self.inner, interest) + }; - // *DO NOT* use "self" from here on. + // *DO NOT* use "self" from here on, as the socket/process may already + // be running on a different thread. + result.map_err(|e| e.into()) } pub(crate) fn accept(&self) -> Result { @@ -299,7 +318,7 @@ impl Socket { Ok(Socket { inner: socket, - registered: AtomicBool::new(false), + registered: AtomicI8::new(NOT_REGISTERED), unix: self.unix, }) } @@ -430,7 +449,7 @@ impl Socket { pub(crate) fn try_clone(&self) -> Result { let sock = Socket { inner: self.inner.try_clone()?, - registered: AtomicBool::new(false), + registered: AtomicI8::new(NOT_REGISTERED), unix: self.unix, }; @@ -462,11 +481,11 @@ mod tests { fn test_try_clone() { let socket1 = Socket::ipv4(0).unwrap(); - socket1.registered.store(true, Ordering::Release); + socket1.registered.store(2, Ordering::Release); let socket2 = socket1.try_clone().unwrap(); - assert!(!socket2.registered.load(Ordering::Acquire)); + assert_eq!(socket2.registered.load(Ordering::Acquire), NOT_REGISTERED); assert!(!socket2.unix); } } diff --git a/vm/src/state.rs b/vm/src/state.rs index 5b2679f21..34757903d 100644 --- a/vm/src/state.rs +++ b/vm/src/state.rs @@ -20,13 +20,13 @@ pub(crate) type RcState = ArcWithoutWeak; /// The state of a virtual machine. pub(crate) struct State { /// The virtual machine's configuration. - pub config: Config, + pub(crate) config: Config, /// The start time of the VM (more or less). - pub start_time: time::Instant, + pub(crate) start_time: time::Instant, /// The commandline arguments passed to an Inko program. - pub arguments: Vec, + pub(crate) arguments: Vec, /// The environment variables defined when the VM started. /// @@ -34,25 +34,25 @@ pub(crate) struct State { /// (or through libraries) may call `setenv()` concurrently with `getenv()` /// calls, which is unsound. Caching the variables also means we can safely /// use `localtime_r()` (which internally may call `setenv()`). - pub environment: HashMap, + pub(crate) environment: HashMap, /// The exit status to use when the VM terminates. - pub exit_status: Mutex, + pub(crate) exit_status: Mutex, /// The scheduler to use for executing Inko processes. - pub scheduler: Scheduler, + pub(crate) scheduler: Scheduler, /// A task used for handling timeouts, such as message and IO timeouts. - pub timeout_worker: TimeoutWorker, + pub(crate) timeout_worker: TimeoutWorker, - /// The system polling mechanism to use for polling non-blocking sockets. - pub network_poller: NetworkPoller, + /// The network pollers to use for process threads. + pub(crate) network_pollers: Vec, /// All builtin functions that a compiler can use. - pub builtin_functions: BuiltinFunctions, + pub(crate) builtin_functions: BuiltinFunctions, /// A type for allocating and storing blocks and permanent objects. - pub permanent_space: PermanentSpace, + pub(crate) permanent_space: PermanentSpace, /// The random state to use for building hashers. /// @@ -104,6 +104,9 @@ impl State { config.backup_threads as usize, ); + let network_pollers = + (0..config.netpoll_threads).map(|_| NetworkPoller::new()).collect(); + let state = State { scheduler, environment, @@ -112,7 +115,7 @@ impl State { exit_status: Mutex::new(0), timeout_worker: TimeoutWorker::new(), arguments, - network_poller: NetworkPoller::new(), + network_pollers, builtin_functions: BuiltinFunctions::new(), permanent_space, hash_state,