Skip to content

Commit

Permalink
Automatically replace blocking threads
Browse files Browse the repository at this point in the history
This changes the scheduler such that it supports replacing blocking
threads with backup threads. Prior to performing a blocking operation,
such as reading a file, a thread signals that it's about to enter a
blocking operation. When it's done with the operation it signals this
accordingly. In the background a monitor thread periodically checks for
blocking threads. If this monitor thread finds any threads that have
been blocking for too long, it signals them to become a backup thread.

The thread counts are fixed as this is easier to implement, and removes
the need for synchronisation when stealing work from queues. The pool is
now divided into two groups of threads: primary and backup threads. The
primary thread count (P) defaults to the number of CPU cores, while the
backup thread count (B) defaults to four times the number of CPU cores.

When the blocked thread returns from the blocking call it continues
running the process. When it would normally pick up new work, it
transitions to a blocking thread. We don't transition to a backup thread
immediately after returning from the blocking call, as this could result
in the process taking even longer to complete.

The monitor thread runs at an interval of 100 µsec, but the exact
interval may be slightly higher depending on the timer resolution.
Windows in particular is quite bad at this, with a default resolution of
around 16 milliseconds. In general the interval is just a best-case
effort to prevent Inko processes from blocking OS threads forever,
rather than a hard guarantee.

The cost of a thread marking itself as a blocking thread is somewhere
between 100 nanoseconds and 1 µsec, depending a bit on system load and
if the monitor thread needs to be woken up from a deep sleep.

In this setup, there may be more than P threads performing non-bocking
work, albeit only for a brief period of time. This is fine and the
system eventually settles back at P number of threads. The maximum
capacity is P+B. One crucial detail is that while the total capacity is
P+B, there are only P queues. This means that no matter what value B is
set to, threads only need to consider stealing from up to P queues. This
means you can increase B without this resulting in work stealing
becoming more expensive.

As part of this work I considered simply having a fixed size pool for
_only_ blocking processes, similar to the setup we had before version
0.10.0. I ultimately didn't go with this approach, as moving processes
between pools for _every_ blocking operation would result in increased
latency and poorer cache utilisation.

This fixes https://gitlab.com/inko-lang/inko/-/issues/247.

Changelog: performance
  • Loading branch information
yorickpeterse committed Sep 20, 2022
1 parent 959a2f8 commit 89467af
Show file tree
Hide file tree
Showing 26 changed files with 1,024 additions and 173 deletions.
20 changes: 16 additions & 4 deletions docs/source/internals/vm.md
Expand Up @@ -66,10 +66,22 @@ poller". This is a system/thread that polls a list of sockets until they are
ready, rescheduling their corresponding processes. Polling is done using APIs
such as epoll on Linux, kqueue on macOS/BSD, and IO completion ports on Windows.

For file IO we block the OS thread. Inko used to use a dedicated pool of OS
threads for blocking operations, but we removed this to simplify the VM. An
alternative and better approach is discussed in [this
issue](https://gitlab.com/inko-lang/inko/-/issues/247).
For blocking operations, such as file IO, Inko uses a fixed amount of backup
threads. When an OS thread is about to enter a blocking operation, it sets a
flag indicating when it did so. This is implemented such that it in most cases
it won't take more than 100-200 nanoseconds.

In the background a monitor thread periodically examines all OS threads. If it
finds an OS thread is blocking for too long, it wakes up a backup thread to take
over the work of this blocking OS thread. When the blocking OS thread finishes
the blocking call it continues running its process. When the process is
rescheduled and the OS thread would pick up new work, it becomes a backup thread
instead.

The number of backup threads is controlled using the environment variable
`INKO_BACKUP_THREADS` and defaults to four times the number of CPU cores. The
monitor thread runs at an interval of 100 microseconds, though the exact
interval may differ between platforms. This interval can't be changed.

## Timeouts

Expand Down
9 changes: 7 additions & 2 deletions vm/src/builtin_functions.rs
Expand Up @@ -2,6 +2,7 @@
use crate::mem::Pointer;
use crate::process::ProcessPointer;
use crate::runtime_error::RuntimeError;
use crate::scheduler::process::Thread;
use crate::state::State;
use std::io::Read;

Expand All @@ -21,8 +22,12 @@ mod sys;
mod time;

/// A builtin function that can be called from Inko source code.
pub(crate) type BuiltinFunction =
fn(&State, ProcessPointer, &[Pointer]) -> Result<Pointer, RuntimeError>;
pub(crate) type BuiltinFunction = fn(
&State,
&mut Thread,
ProcessPointer,
&[Pointer],
) -> Result<Pointer, RuntimeError>;

/// Reads a number of bytes from a buffer into a Vec.
pub(crate) fn read_into<T: Read>(
Expand Down
3 changes: 3 additions & 0 deletions vm/src/builtin_functions/array.rs
@@ -1,10 +1,12 @@
use crate::mem::{Array, Int, Pointer};
use crate::process::ProcessPointer;
use crate::runtime_error::RuntimeError;
use crate::scheduler::process::Thread;
use crate::state::State;

pub(crate) fn array_reserve(
_: &State,
_: &mut Thread,
_: ProcessPointer,
args: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -17,6 +19,7 @@ pub(crate) fn array_reserve(

pub(crate) fn array_capacity(
state: &State,
_: &mut Thread,
_: ProcessPointer,
args: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand Down
3 changes: 3 additions & 0 deletions vm/src/builtin_functions/byte_array.rs
Expand Up @@ -2,10 +2,12 @@ use crate::immutable_string::ImmutableString;
use crate::mem::{ByteArray, Pointer, String as InkoString};
use crate::process::ProcessPointer;
use crate::runtime_error::RuntimeError;
use crate::scheduler::process::Thread;
use crate::state::State;

pub(crate) fn byte_array_to_string(
state: &State,
_: &mut Thread,
_: ProcessPointer,
args: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -21,6 +23,7 @@ pub(crate) fn byte_array_to_string(

pub(crate) fn byte_array_drain_to_string(
state: &State,
_: &mut Thread,
_: ProcessPointer,
args: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand Down
10 changes: 10 additions & 0 deletions vm/src/builtin_functions/env.rs
Expand Up @@ -4,12 +4,14 @@ use crate::mem::{Array, Pointer, String as InkoString};
use crate::platform;
use crate::process::ProcessPointer;
use crate::runtime_error::RuntimeError;
use crate::scheduler::process::Thread;
use crate::state::State;
use std::env;
use std::path::PathBuf;

pub(crate) fn env_get(
state: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -25,6 +27,7 @@ pub(crate) fn env_get(

pub(crate) fn env_variables(
state: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -41,6 +44,7 @@ pub(crate) fn env_variables(

pub(crate) fn env_home_directory(
state: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -58,6 +62,7 @@ pub(crate) fn env_home_directory(

pub(crate) fn env_temp_directory(
state: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -68,6 +73,7 @@ pub(crate) fn env_temp_directory(

pub(crate) fn env_get_working_directory(
state: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -78,6 +84,7 @@ pub(crate) fn env_get_working_directory(

pub(crate) fn env_set_working_directory(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -89,6 +96,7 @@ pub(crate) fn env_set_working_directory(

pub(crate) fn env_arguments(
state: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -100,6 +108,7 @@ pub(crate) fn env_arguments(

pub(crate) fn env_platform(
_: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -108,6 +117,7 @@ pub(crate) fn env_platform(

pub(crate) fn env_executable(
state: &State,
_: &mut Thread,
_: ProcessPointer,
_: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand Down
13 changes: 13 additions & 0 deletions vm/src/builtin_functions/ffi.rs
Expand Up @@ -5,10 +5,12 @@ use crate::ffi::{
use crate::mem::{Array, Int, Pointer, String as InkoString};
use crate::process::ProcessPointer;
use crate::runtime_error::RuntimeError;
use crate::scheduler::process::Thread;
use crate::state::State;

pub(crate) fn ffi_library_open(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -22,6 +24,7 @@ pub(crate) fn ffi_library_open(

pub(crate) fn ffi_function_attach(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -39,6 +42,7 @@ pub(crate) fn ffi_function_attach(

pub(crate) fn ffi_function_call(
state: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -50,6 +54,7 @@ pub(crate) fn ffi_function_call(

pub(crate) fn ffi_pointer_attach(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -66,6 +71,7 @@ pub(crate) fn ffi_pointer_attach(

pub(crate) fn ffi_pointer_read(
state: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -79,6 +85,7 @@ pub(crate) fn ffi_pointer_read(

pub(crate) fn ffi_pointer_write(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -96,6 +103,7 @@ pub(crate) fn ffi_pointer_write(

pub(crate) fn ffi_pointer_from_address(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -106,6 +114,7 @@ pub(crate) fn ffi_pointer_from_address(

pub(crate) fn ffi_pointer_address(
state: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -116,6 +125,7 @@ pub(crate) fn ffi_pointer_address(

pub(crate) fn ffi_type_size(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -126,6 +136,7 @@ pub(crate) fn ffi_type_size(

pub(crate) fn ffi_type_alignment(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -136,6 +147,7 @@ pub(crate) fn ffi_type_alignment(

pub(crate) fn ffi_library_drop(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -148,6 +160,7 @@ pub(crate) fn ffi_library_drop(

pub(crate) fn ffi_function_drop(
_: &State,
_: &mut Thread,
_: ProcessPointer,
arguments: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand Down
3 changes: 3 additions & 0 deletions vm/src/builtin_functions/float.rs
@@ -1,10 +1,12 @@
use crate::mem::{Float, Int, Pointer};
use crate::process::ProcessPointer;
use crate::runtime_error::RuntimeError;
use crate::scheduler::process::Thread;
use crate::state::State;

pub(crate) fn float_to_bits(
state: &State,
_: &mut Thread,
_: ProcessPointer,
args: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand All @@ -15,6 +17,7 @@ pub(crate) fn float_to_bits(

pub(crate) fn float_from_bits(
state: &State,
_: &mut Thread,
_: ProcessPointer,
args: &[Pointer],
) -> Result<Pointer, RuntimeError> {
Expand Down

0 comments on commit 89467af

Please sign in to comment.