Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ability to write code that is agnostic of blocking vs async I/O #1778

Open
andrewrk opened this issue Nov 24, 2018 · 11 comments

Comments

Projects
None yet
5 participants
@andrewrk
Copy link
Member

commented Nov 24, 2018

This is a pretty popular blog post from 2015 that helps illustrate
one way of thinking about concurrency and async I/O:
What Color is Your Function?

Here's a proposal along these lines. I think we should at least consider it.

Depends on:

  • #287 (copy elision)
  • (lots of issues) (coroutine rewrite)
  • #1764 (single threaded vs multithreaded build option)
  • #1047 (remove namespace type and make every file an empty struct)
  • #924 (thread local variables)
  • I'll type up a separate proposal later, but let's just put the root source file aliased as @import("root") in all packages, so that the standard library and third party packages can access it.

Goals:

  • Ability to write code that works in an event-driven or blocking context.
    For example, libraries should be able to use OS features such as the
    file system and networking without having an opinion about whether to
    use blocking or event-based APIs.
  • In an application that never creates an event loop, there should be
    no code generated to deal with event loops.
  • In an application that never uses blocking I/O, there should be no
    runtime overhead associated with detecting the event loop, and no code
    generated at all to do blocking I/O.
  • An OS kernel should be able to use this mechanism effectively.
    Potentially even would be able to use external Zig packages that make
    standard library API calls, and have those directed to functions in the
    root source file.
  • Code can even express concurrency, such as two independent writes,
    and then wait for them both to be done, and then if this code is used
    from a blocking I/O application, with --single-threaded, it is as if
    it were implemented fully in a blocking fashion.

In summary, writing Zig code should be maximally portable. That means proper
Zig libraries could work in constrained memory environments, multi-threaded environments,
single-threaded environments, blocking I/O applications, evented I/O applications,
inside OS kernels, and userland applications. And not only work correctly in these environments, but work optimally.

Implementation:

  • Drop the async notation from functions. A function is a coroutine if it has a
    await or suspend in it. This is still part of the function's prototype; however
    whether it is a coroutine or not is inferred.
    • Note that this gives functions the property that, depending on comptime values,
      a function could either be a coroutine or a normal function.
  • A function is also a coroutine if it calls a coroutine naively, without storing the coroutine frame somewhere, because this implies an await.

Standard library functions that perform I/O, such as std.os.File.write, have bodies that look like
this:

pub fn write(file: *File, bytes: []const u8) usize {
    // Note that this is an if with comptime-known condition.
    if (std.event.loop.instance) |event_loop| {
        const msg = std.event.fs.Msg{
            .Write = std.event.fs.Msg.Write{
                .handle = file.handle,
                .ptr = bytes.ptr,
                .len = bytes.len,
                .result = undefined,
            },
        };
        suspend {
            event_loop.queueFsWrite(@handle(), &msg);
        }
        return msg.Write.result;
    } else {
        // blocking call
        return std.os.linux.write(file.handle, bytes.ptr, bytes.len);
    }
}

In std/event/loop.zig:

threadlocal var per_thread_instance: ?*Loop = null;
var global_state: Loop = undefined;
const io_mode = @fieldOrDefault(@import("root"), "io_mode", IoMode.Blocking);
const default_instance: ?*Loop = switch (io_mode) {
    .Blocking => null,
    .Evented => &global_state,
    .Mixed => per_thread_instance,
};
const instance: ?*Loop = @fieldOrDefault(@import("root", "event_loop", default_instance);

In the root source file, pub const io_mode determines whether
the application is 100% blocking, 100% evented, or mixed (per-thread).
If nothing is specified then the application is 100% blocking.

Or an application can take even more control, and set the event loop instance directly.
This would potentially be used for OS kernels, which need an event loop specific to their
own code.

When the IO method is Mixed, in the standard library event loop implementation,
worker threads get a thread local variable per_thread_instance set to the event loop
instance pointer. The "main thread" which calls run also sets this thread local
variable to the event loop instance pointer. In this way, sections of the codebase
can be isolated from each other; threads which are not in the thread pool of the
event loop get blocking I/O (or potentially belong to a different event loop) while
threads belonging to a given event loop's thread pool, find their owner instance.

Now let's look at some everyday code that wants to call write:

fn foo() void {
    const rc = file.write("hello\n");
}

// assume this is in root source file
pub const io_mode = .Evented;

Because io_mode is Evented the write function ends up calling suspend
and is therefore a coroutine. And following this, foo is also therefore a
coroutine, since it calls write.

When a coroutine calls a coroutine in this manner, it does a tail-async-await
coroutine call using its own stack.

But what if the code wants to express possible concurrency?

fn foo() void {
    var future1 = async file1.write("hello1\n");
    var future2 = async file2.write("hello2\n");
    const rc1 = await future1;
    const rc2 = await future2;
}

async is a keyword that takes any expression, which could be a block, but in
this case is a function call. The async expression itself returns a coroutine frame
type, which supports await. The inner expression result is stored in the frame,
and is the result when using await. I'll elaborate more on async blocks later.

If an application is compiled with IoMode.Blocking, then the write function is
blocking. How async interacts with an expression that is all blocking,
is to have the result of the async expression be the result of the inner expression.
So then the type of future1 and future2 remain the same as before for consistency,
but in the code generated, they are just the result values, and then the await
expressions are no-ops. The function is essentially rewritten as:

fn foo() void {
    const rc1 = file1.write("hello1\n");
    const rc2 = file2.write("hello2\n");
}

Which makes foo blocking as well.

What about a CPU bound task?

fn areTheyEqual() bool {
    var pi_frame = async blk: {
        std.event.loop.startCpuTask();
        break :blk calculatePi();
    };
    var e_frame = async blk: {
        std.event.loop.startCpuTask();
        break :blk calculateE();
    };
    const pi = await pi_frame;
    const e = await e_frame;
    return pi == e;
}

Here, startCpuTask is defined as:

fn startCpuTask() void {
    if (@import("builtin").is_single_threaded) {
        return;
    } else if (std.event.loop.instance) |event_loop| {
        suspend {
            event_loop.onNextTick(@handle());
        }
    }
}

So, if you build this function in multi-threaded mode, with io_mode != IoMode.Blocking,
the async expression suspends in the startCpuTask and then gets resumed by the event
loop on another thread. areTheyEqual becomes a coroutine. So even though
calculatePi and calculateE are blocking functions, they end up executing in different
threads.

If you build this application in --single-threaded mode, startCpuTask ends up being
return;. It is thus not a coroutine. And so the async expressions in
areTheyEqual have only blocking calls, which means they turn into normal expressions,
and after the function is analyzed, it looks like this:

fn areTheyEqual() bool {
    const pi = calculatePi();
    const e = calculateE();
    return pi == e;
}

@andrewrk andrewrk added the proposal label Nov 24, 2018

@andrewrk andrewrk added this to the 0.4.0 milestone Nov 24, 2018

@daurnimator

This comment has been minimized.

Copy link
Contributor

commented Nov 24, 2018

Ability to write code that works in an event-driven or blocking context.
For example, libraries should be able to use OS features such as the
file system and networking without having an opinion about whether to
use blocking or event-based APIs.

This is a great goal. Being able to write a library that e.g. talks to redis, without caring if it's calling async or sync functions is great: it means that the library becomes a non-blocking redis library if run from the right place!

In an application that never creates an event loop, there should be
no code generated to deal with event loops.

I think the answer here depends: some code will need an event loop. However this can be made transparent: the caller doesn't necessarily need to know. One example I had recently was DNS lookups: you want to do both A and AAAA lookups, and would like to parallelize them:

fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();
    var t1 = newthread { r.lookupA(host) };
    var t2 = newthread { r.lookupAAAA(host) };
    waitFor(t1, t2);
    return r;
}

This example has no need to yield back to the main loop: the event loop can happen directly inside of this function. However in an existing main loop, waiting on the records should be done at the top level.

In an application that never uses blocking I/O, there should be no
runtime overhead associated with detecting the event loop, and no code
generated at all to do blocking I/O.

If possible this would be good. Though maybe not possible?

Code can even express concurrency, such as two independent writes,
and then wait for them both to be done, and then if this code is used
from a blocking I/O application, with --single-threaded, it is as if
it were implemented fully in a blocking fashion.

I covered this above.


Language Level

I think you just need the following functions on coroutines at a language level.

  • create(allocator: *Allocator, stackSize: usize, continuation: fn(arg: T) void) OutOfMemory!*Coroutine: allocate a new stack and set it up however is required.
    • a private stack field
    • a resumedBy:?*Coroutine field
    • continuation: fn(arg: T) T gets pushed onto the new stack. It takes a customisable type (which is the type that can be passed to/from resuming/yielding)
    • a complete:bool field indicating if the coroutine is finished
  • isRunning(co: *Coroutine) bool returns if the passed coroutine is currently running
    i.e. co.resumedBy != null
  • running() ?*Coroutine returns the currently running coroutine (saved in thread-local storage)
    • returns null if not in a coroutine (e.g. at _init)
  • resume(co: *Coroutine, arg: T) T
    • assert that co is not currently running
    • co.resumedBy gets set to the current coroutine.
    • set the "current" coroutine in thread-local-storage to co
    • jump to co.stack
    • (the other coroutine now gets to run until return or yield)
    • return whatever was returned/yielded from
  • @yield(ret: T) T
    • assert that running().resumedBy != null
    • push a function onto the current stack that restores current registers and returns a T (which is what will get run on resumption)
    • restore the "current" coroutine in thread-local-storage to co.resumedBy
    • jump back to the coroutine that resumed us and have resume return ret

Standard Library

I'm finding it difficult to express what I'm suggesting. Hopefully this sample pseudo-zig-code makes sense:

const JobList = std.LinkedList(void);
struct Job {
  node: JobList.Node, // intrusive linked list
  co: Coroutine,
  waitingOn: []WaitArg,
  readyWaits: ArrayList(WaitArg),
  onComplete: ?fn (*Job) void,

  pub fn init(f: fn(void) void, onComplete: ?fn (*Job) void) Job {
    return Job{
      .node = JobList.Node.init(undefined),
      .co = coroutine.create(self.allocator, @stackRequired(f), startJob), // something here to pass f
      .waitingOn = null,
      .onComplete = onComplete,
    }
  }
}

threadlocal var currentLoop: ?&eventLoop = null;
threadlocal var currentJob: ?&Job = null;

struct eventLoop {
  allocator: *Allocator,
  pendingJobs: JobList,

  pub fn empty(self:*eventLoop) bool {
    return pendingJobs.len == 0;
  }

  pub fn step(self: *eventLoop) !void {
    var ready: []waitArg = undefined;
    if (currentLoop != null) {
      // Let parent event loop do the waiting
      ready = wait([]WaitArg{ { .EventLoop = &self } });
    } else {
      // There is no event loop above us, do the OS-level polling here
      ready = someOsPoll(self.miscPrivateFields); // e.g. epoll, poll, select... whatever platform API is best
    }

    for (ready) |wait| {
      for (findJobsThatWant(wait)) |job| {
        job.readyWaits.add(wait); // on failure could just ignore, if not using edge-triggered events it will trigger again on next step() call
        try self.pendingJobs.append(job);
      }
    }

    var oldLoop = currentLoop;
    currentLoop = self;
    for (self.pendingJobs) |job| {
      var oldJob = currentJob;
      currentJob = &job;
      job.waitingOn = job.co.resume(job.readyWaits.toSlice());
      currentJob = oldJob;
      job.readyWaits.shrink(0);
      if (job.co.complete) {
        self.pendingJobs.remove(job);
        job.onComplete();
        continue;
      }
      // update miscPrivateFields based on job.waitingOn
    }
    currentLoop = oldLoop;
  }

  pub fn loop(self: *eventLoop) !void {
    while (!self.empty()) {
      try self.step();
    }
  }

  pub fn addJob(self: *eventLoop, job: *Job) void {
    self.pendingJobs.append(job);
  }

  pub fn newJob(self: *eventLoop, f: fn(void) void) !*Job {
    var job = try self.allocator.create(Job.init(f, allocator.destroy));
    addJob(self, job);
    return job;
  }
}

const WaitType = enum { Timeout, PollFD, EventLoop }; // + any other OS primitives to wait on
const WaitArg = union(WaitType) {
  Timeout: f64,
  PollFD: struct {
    fd: i32,
    events: i32, // mask of POLLIN|POLLOUT|POLLPRI and possibly other poll() flags
  },
  EventLoop: *eventLoop, // We build in support so that event loops are themselves wait-able. This allows multiple layers of loops to be composable.
};
pub fn wait(arg: []WaitArg) ![]WaitArg {
  if (currentJob.co == coroutine.running()) { // if we are inside of a managed coroutine then let the loop do the work
    return @yield(arg);
  } else {
    // otherwise create a new single-use loop. could have a global one prepared.
    var loop = eventLoop.init(); // stack allocated
    defer loop.destroy();
    var job = Job.init(wait); // stack allocated job
    loop.addJob(&job);
    try loop.loop();
    // filter arg based on job.readyWaits
    return arg;
  }
}

Then the current std.os.posixWrite could be written as (changed the EAGAIN branch, previously was unreachable):

pub fn posixWrite(socket: *Socket, bytes: []const u8) usize {
    const max_bytes_len = 0x7ffff000;

    var index: usize = 0;
    while (index < bytes.len) {
        const amt_to_write = math.min(bytes.len - index, usize(max_bytes_len));
        const rc = posix.write(fd, bytes.ptr + index, amt_to_write);
        const write_err = posix.getErrno(rc);
        switch (write_err) {
            0 => {
                index += rc;
                continue;
            },
            posix.EINTR => continue,
            posix.EAGAIN => {
              var _ = try wait([]WaitArg{ { .PollFD = { .fd = fd, .events=POLLOUT } }});
              continue;
            }
            posix.EINVAL => unreachable,
            posix.EFAULT => unreachable,
            posix.EBADF => unreachable, // always a race condition
            posix.EDESTADDRREQ => unreachable, // connect was never called
            posix.EDQUOT => return PosixWriteError.DiskQuota,
            posix.EFBIG => return PosixWriteError.FileTooBig,
            posix.EIO => return PosixWriteError.InputOutput,
            posix.ENOSPC => return PosixWriteError.NoSpaceLeft,
            posix.EPERM => return PosixWriteError.AccessDenied,
            posix.EPIPE => return PosixWriteError.BrokenPipe,
            else => return unexpectedErrorPosix(write_err),
        }
    }
}
@andrewrk

This comment has been minimized.

Copy link
Member Author

commented Nov 24, 2018

I don't think this code can accomplish the stated goals:

fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();
    var t1 = newthread { r.lookupA(host) };
    var t2 = newthread { r.lookupAAAA(host) };
    waitFor(t1, t2);
    return r;
}

I'd need to understand what newthread and waitFor are doing. It looks like they allocate resources, and have the potential for failure, yet the parameters accept no allocator and the return type indicates no error.

If this code were to be used by a target which had no event loop abilities, this function needs to be purely blocking, which means that it does the lookups serially, and there should be no possibility of OutOfMemory error. Zig code cannot use an event loop without one being set up, in userland. There is no event loop runtime set by the language.

In an application that never uses blocking I/O, there should be no
runtime overhead associated with detecting the event loop, and no code
generated at all to do blocking I/O.

If possible this would be good. Though maybe not possible?

I just outlined how this is possible. Do you see a flaw that would be prevent me from implementing it?


It looks like you're advocating for stackful coroutines, while my proposal is built on the premise of stackless coroutines.

I have a strong stance against stackful coroutines. This kind of concurrency is not really better than creating threads, from a performance and scheduling perspective. One may as well use the OS for what it was designed for. But my main issue with it is that it makes parallelism too intentional. We're back to the threading model for concurrency rather than a more powerful abstraction. Stackless coroutines are also known as "continuation passing style". This allows us to have coroutines without heap allocation, or at least with limited heap allocation. In Zig we don't get to allocate memory for free; it comes at the cost of an extra allocator parameter, and a possible OutOfMemory error, neither of which are typically required for blocking calls.

What I have outlined here is a way that will work in Zig, and it even allows expressing "optional concurrency". It's not clear to me what problems you are pointing out in my proposal, or what you are trying to solve with your pseudocode.

I think in order to be convincing here, you'd have to show me a use case of userland code you would want to write, but using my proposal find yourself unable to express it, however with your counter-proposal the use case would be rectified.

@daurnimator

This comment has been minimized.

Copy link
Contributor

commented Nov 25, 2018

I'd need to understand what newthread and waitFor are doing. It looks like they allocate resources, and have the potential for failure, yet the parameters accept no allocator and the return type indicates no error.

If this code were to be used by a target which had no event loop abilities, this function needs to be purely blocking, which means that it does the lookups serially, and there should be no possibility of OutOfMemory error. Zig code cannot use an event loop without one being set up, in userland. There is no event loop runtime set by the language.

I've rewritten that example to use my earlier imaginary library code:

fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();

    // create a stack allocated loop
    var loop = eventLoop.init();
    defer loop.destroy();

    // create a stack allocated Jobs
    // need to figure out style for passing a callback + argument(s)
    // for now I'm pretending we have closures with some sort of block syntax.
    var j1 = Job.init({ r.lookupA(host) }, null);
    var j2 = Job.init({ r.lookupAAAA(host) }, null);

    // Add our new jobs to the loop
    loop.addJob(&j1);
    loop.addJob(&j2);

    // run loop until no jobs left.
    // as our loop only has 2 jobs, this is equivalent to waiting for our two jobs
    // 
    // it could throw an error if e.g. epoll_wait() syscall fails.
    // I also would propagate any errors in a job to here.
    // i.e. if a Job returns an error, it should be returned from .loop
    // preferably along with a pointer to the Job that errored.
    try loop.loop();

    return r;
}

I have a strong stance against stackful coroutines. This kind of concurrency is not really better than creating threads, from a performance and scheduling perspective.

I disagree. It's been widely shown that OS threads are too expensive to have e.g. one per HTTP request.

But my main issue with it is that it makes parallelism too intentional.

From the zen of zig: "Communicate intent precisely."

However with my proposal, the user only needs to know about the abstraction when they wish to have things happening in parallel. The rest of the code may remain oblivious.

@daurnimator

This comment has been minimized.

Copy link
Contributor

commented Nov 25, 2018

What I have outlined here is a way that will work in Zig, and it even allows expressing "optional concurrency". It's not clear to me what problems you are pointing out in my proposal, or what you are trying to solve with your pseudocode.

I'm slowly coming around to your proposal.

I'm wondering how it would work for interop with non-zig code:

  • How would a coroutine be exposed to C?
    To call a coroutine from C you'd need a continue function to be exported?
  • all C functions would be marked as not-a-coroutine?

The global io_mode seems a bit weird to me. Especially as zig is often used to create libraries called from other lanaguages.
Could it be a per-function flag?

@roobie

This comment has been minimized.

Copy link

commented Nov 29, 2018

I think using a per-function flag defeats the mitigation of the red/blue color discrepancy of functions. If I understand correctly, the io_mode would be relevant mostly for exes? Since libraries would either:

  • Not implement support for all types of io_mode which any consumer must take into account when choosing, or
  • Implement support for all io_modes making the library usable by any type of application.
@hcff

This comment has been minimized.

Copy link
Contributor

commented Dec 10, 2018

Maybe I'm wrong, but I don't think hiding the asynchronicity of the function is in line with Zig's goals.
Knowing if an operation is async or not is important, because if the event loop is single-threaded, a blocking operation means that nothing else can happen during that operation.

For example, imagine that you're working on an async web application written in Zig, and you're looking at the account creation part.

fn handle_account_creation(login: []const u8, password: []const u8) !void {
    if (try sql("SELECT count(*) FROM users WHERE login = :?", login) != 0) {
        return error.UserAlreadyExist;
    }
    try sql("INSERT INTO users VALUES (:?, ...)", login, ...);
}

If the sql function is async, then this (pseudo-)code has a potential race condition if two account creation requests with the same login happen at the same time.
In that case, the check if the user already exist may pass for both requests, and then you end up with two users with the same login.
If asynchronicity is implicit, then it's hard to know if sql will actually suspend: You have to check its implementation, and if it depends on io_mode, then you have to check that value too.
If it's explicit, then the await is a clear indication that some other code may get run while the function is suspended.

@andrewrk

This comment has been minimized.

Copy link
Member Author

commented Dec 11, 2018

Thanks for the specific example code.

If the sql function is async, then this (pseudo-)code has a potential race condition if two account creation requests with the same login happen at the same time.

Isn't this the same problem is the code is blocking? If you do a thread per request, then handle_account_creation could be called from more than one thread, so you have this race condition either way.

From the perspective of the handle_account_creation, the sql operations are equivalent to using a global variable for a temporary variable, since the sql database is global to multiple calls to handle_account_creation. Although technically they would be OK in a single-threaded, blocking environment, functions which use global state as temporary values should generally be avoided, and when used, should be carefully documented under what conditions it is appropriate to call them.

More bluntly, I would reject this as a valid use case because the code is inherently flawed, and thus I don't think it should really be considered a guiding example for this proposal.

@roobie

This comment has been minimized.

Copy link

commented Dec 16, 2018

Most systems that has to manage external state (SQL, KV-stores etc.) that is distributed in some fashion has this problem, irrespective of if it's single threaded/blocking calls or full async, since it's possible to run more than one instance that is talking to the data APIs. Handling the case where one user might send two requests for being created in the database should be done in the database with e.g. UNIQUE indices and stuff like that, because that's one of the responsibilities of the RDBMS.

An interesting example with regards to this discussion is full async coroutines in Lua that are used in a fully procedural manner.

@roobie

This comment has been minimized.

Copy link

commented Dec 18, 2018

One thing I am uncertain of with regards to the Lua example, is that it is it not obvious how or if it is possible to run the async functions in parallel. I mean, in the example, the two database queries are sequentially executed, whereas, I can think of situations I would like to execute them in parallel, so that they both wait on I/O simultaneously.

Doing a bit of digging in the pgmoon library, it does support parallel execution (from the README):

This method also supports sending multiple queries at once by separating them with a ;. The number of queries executed is returned as a second return value after the result object. When more than one query is executed then the result object changes slightly. It becomes a array table holding all the individual results:

local res, num_queries = pg:query([[
  select id, name from users;
  select id, title from posts
]])
@daurnimator

This comment has been minimized.

Copy link
Contributor

commented Dec 18, 2018

One thing I am uncertain of with regards to the Lua example, is that it is it not obvious how or if it is possible to run the async functions in parallel. I mean, in the example, the two database queries are sequentially executed, whereas, I can think of situations I would like to execute them in parallel, so that they both wait on I/O simultaneously.

In openresty (which is where pgmoon runs), you create a new stackful coroutine with e.g. ngx.timer.at(0, function() ...... end)

Doing a bit of digging in the pgmoon library, it does support parallel execution

I think this is a postgres feature rather than a pgmoon one.

@andrewrk andrewrk modified the milestones: 0.4.0, 0.5.0 Jan 31, 2019

@galvez

This comment has been minimized.

Copy link

commented Feb 12, 2019

Really like the proposal @andrewrk -- am new to Zig but falling in love very quickly :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.