Skip to content

Conversation

andrewrk
Copy link
Member

@andrewrk andrewrk commented Oct 15, 2025

This patchset adds std.Io and provides two implementations for it:

  • std.Io.Threaded - based on a thread pool.
    • -fno-single-threaded - supports concurrency and cancellation.
    • -fsingle-threaded - does not support concurrency or cancellation.
  • std.Io.Evented - already bitrotted, work-in-progress, experimental. There
    is an IoUring implementation for Linux, and nothing else yet. This API is not
    ready to be used yet, but it serves to inform the evolution of std.Io API.

std.Io.Threaded has networking and file-system operations implemented.
Cancellation works beautifully, except for a known race condition that has a
couple of competing solutions already in mind.

All of std.net has been deleted in favor of std.Io.net.

std.fs has been partially updated to use std.Io - only as required so that
std.Io.Writer.sendFile could use *std.Io.File.Reader rather than
*std.fs.File.Reader.

closes #8224

Laundry List of Io Features

  • async/await - these primitives express that operations can be done
    independently, making them infallible and support execution on limited Io
    implementations that lack a concurrency mechanism.
  • concurrent - same as async except communicates that the operation
    must be done concurrently for correctness. Requires memory allocation.
  • cancel - equivalent to await except also requests the Io implementation
    to interrupt the operation and return error.Canceled. std.Io.Threaded
    supports cancellation by sending a signal to a thread, causing blocking
    syscalls to return EINTR, giving a chance to notice the cancellation request.
  • select - API for blocking on multiple futures using switch syntax
  • Group - efficiently manages many async tasks. Supports waiting for and
    cancelling all tasks in the group together.
  • Queue(T) - Many producer, many consumer, thread-safe, runtime configurable
    buffer size. When buffer is empty, consumers suspend and are resumed by
    producers. When buffer is full, producers suspend and are resumed by consumers.
    • Avoids code bloat using a type safe wrapper around TypeErasedQueue.
  • Select - for blocking on runtime-known number of tasks and handling a
    subset of them.
  • Clock, Duration, Timestamp, Timeout - type safety for units of measurement
  • Mutex, Condition - synchronization primitives

Demo

Here is an example that makes an HTTP request to a domain:

const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Io = std.Io;
const HostName = std.Io.net.HostName;

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    const gpa = debug_allocator.allocator();

    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();

    const io = threaded.io();

    const args = try std.process.argsAlloc(gpa);

    const host_name: HostName = try .init(args[1]);

    var http_client: std.http.Client = .{ .allocator = gpa, .io = io };
    defer http_client.deinit();

    var request = try http_client.request(.HEAD, .{
        .scheme = "http",
        .host = .{ .percent_encoded = host_name.bytes },
        .port = 80,
        .path = .{ .percent_encoded = "/" },
    }, .{});
    defer request.deinit();

    try request.sendBodiless();

    var redirect_buffer: [1024]u8 = undefined;
    var response = try request.receiveHead(&redirect_buffer);
    std.log.info("received {d} {s}", .{ response.head.status, response.head.reason });
}

Thanks to the fact that networking is now taking advantage of the new std.Io interface,
this code has the following properties:

  • It asynchronously sends out DNS queries to each configured nameserver
  • As each response comes in, it immediately, asynchronously tries to TCP connect to the
    returned IP address.
  • Upon the first successful TCP connection, all other in-flight connection
    attempts are canceled, including DNS queries.
  • The code also works when compiled with -fsingle-threaded even though the
    operations happen sequentially.
  • No heap allocation.

You can see how this is implemented in std.Io.net.HostName.connect:

pub fn connect(
    host_name: HostName,
    io: Io,
    port: u16,
    options: IpAddress.ConnectOptions,
) ConnectError!Stream {
    var connect_many_buffer: [32]ConnectManyResult = undefined;
    var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);

    var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
    var saw_end = false;
    defer {
        connect_many.cancel(io);
        if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) {
            .connection => |loser| if (loser) |s| s.closeConst(io) else |_| continue,
            .end => break,
        };
    }

    var aggregate_error: ConnectError = error.UnknownHostName;

    while (connect_many_queue.getOne(io)) |result| switch (result) {
        .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
            error.SystemResources,
            error.OptionUnsupported,
            error.ProcessFdQuotaExceeded,
            error.SystemFdQuotaExceeded,
            error.Canceled,
            => |e| return e,

            error.WouldBlock => return error.Unexpected,

            else => |e| aggregate_error = e,
        },
        .end => |end| {
            saw_end = true;
            try end;
            return aggregate_error;
        },
    } else |err| switch (err) {
        error.Canceled => |e| return e,
    }
}

pub const ConnectManyResult = union(enum) {
    connection: IpAddress.ConnectError!Stream,
    end: ConnectError!void,
};

/// Asynchronously establishes a connection to all IP addresses associated with
/// a host name, adding them to a results queue upon completion.
pub fn connectMany(
    host_name: HostName,
    io: Io,
    port: u16,
    results: *Io.Queue(ConnectManyResult),
    options: IpAddress.ConnectOptions,
) void {
    var canonical_name_buffer: [max_len]u8 = undefined;
    var lookup_buffer: [32]HostName.LookupResult = undefined;
    var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);

    host_name.lookup(io, &lookup_queue, .{
        .port = port,
        .canonical_name_buffer = &canonical_name_buffer,
    });

    var group: Io.Group = .init;

    while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
        .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
        .canonical_name => continue,
        .end => |lookup_result| {
            group.waitUncancelable(io);
            results.putOneUncancelable(io, .{ .end = lookup_result });
            return;
        },
    } else |err| switch (err) {
        error.Canceled => |e| {
            group.cancel(io);
            results.putOneUncancelable(io, .{ .end = e });
        },
    }
}

Upgrade Guide

Missing io Parameter

If you need an io parameter, and you don't have one, you can get one like this:

var threaded: Io.Threaded = .init_single_threaded;
const io = threaded.io();

This is legal as long as these functions are not called:

  • Io.VTable.async
  • Io.VTable.concurrent
  • Io.VTable.groupAsync

This is a temporary workaround - a lot like reaching for std.heap.page_allocator when
you need an Allocator and do not have one. Instead, it is better to accept an
Io parameter if you need one (or store one on a context struct for convenience).
Point is that the application's main function should generally be responsible for
constructing the Io instance used throughout.

When you're testing you can use std.testing.io (much like std.testing.allocator).

Related

Merge Blockers

  • Threaded: finish macos impl
  • Threaded: finish windows impl
  • Threaded: finish wasi impl
  • implement cancelRequest for non-linux posix
  • Threaded: implement dirStat
  • Threaded: handle have_pread_but_not_preadv
  • Threaded: implement fileSeekBy

Followup Issues

  • Threaded: glibc impl of netLookup
  • Threaded: openbsd impl of netLookup
  • Threaded: freebsd impl of netLookup
  • Threaded: darwin impl of netLookup
  • Threaded: implement netInterfaceName
  • Threaded: handle WSAEINPROGRESS and WSAEINTR in netSendOne
  • Threaded: implement netConnect with timeout
  • when concurrent tasks caused pool to be oversize, reduce pool down to cpu count upon idling
  • Threaded: eliminate dependency on std.Thread (Mutex, Condition, maybe more)
  • move max_iovecs_len to std.Io
  • address the cancelation race condition (signal received between checkCancel and syscall)
  • update signal values to be an enum
  • delete the deprecated fs.File functions
  • move fs.File.Writer to Io
  • add non-blocking flag to net and fs operations, handle EAGAIN
  • finish moving std.fs to Io
  • migrate child process into std.Io
  • eliminate std.Io.poll (it should be replaced by "select" functionality)
  • finish moving all of std.posix into Threaded
  • TCP fastopen - sends initial payload along with connection. can be done for idempotent http requests
  • Threaded: in netLookup, potentially add diagnostics to the output in several places
  • audit writeResolutionQuery - there's a suspicious assert

andrewrk and others added 30 commits October 15, 2025 13:49
which is planned to have all I/O operations in the interface, but for
now has only async and await.
sorry, something still not working correctly
When the previous fiber did not request to be registered as an awaiter,
it may not have actually been a full blown `Fiber`, so only create the
`Fiber` pointer when needed.
This avoids needing to store more sizes and alignments.  Only the result
alignment needs to be stored, because `Fiber` is at a fixed zero offset.
Something is horribly wrong with scheduling, as can be seen in the
debug output, but at least it somehow manages to exit cleanly...
How silly of me to forget that the kernel doesn't implement its own API.
The scheduling is not great, but at least doesn't deadlock or hammer.
data races on deinit tho
 1. a fiber can't put itself on a queue that allows it to be rescheduled
 2. allow the idle fiber to unlock a mutex held by another fiber by
    ignoring reschedule requests originating from the idle fiber
to bucket and free fiber allocations
`std.Io.Evented` is introduced to select an appropriate Io
implementation depending on OS
@andrewrk andrewrk added breaking Implementing this issue could cause existing code to no longer compile or have different behavior. release notes This PR should be mentioned in the release notes. labels Oct 16, 2025
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (*Group, context: *const anyopaque) void,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a particular reason for passing the *Group as a separate parameter to start (which is not needed by the majority of call sites) instead of just having Select(...).async add it to the args in context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe I'm not understanding the suggestion, but that kind of sounds like a pain in the ass and unclear that it will generate better code

before, the max length of the host name depended on the target.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

breaking Implementing this issue could cause existing code to no longer compile or have different behavior. release notes This PR should be mentioned in the release notes.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Proposal: Event loop redesign

5 participants