wasi: use File.Poll for all blocking FDs in poll_oneoff #1606

Status: Closed · wants to merge 6 commits
75 changes: 37 additions & 38 deletions RATIONALE.md
@@ -1376,58 +1376,54 @@ as socket handles.
### Clock Subscriptions

As detailed above in [sys.Nanosleep](#sysnanosleep), `poll_oneoff` handles
relative clock subscriptions. In our implementation we use `sys.Nanosleep()`
for this purpose in most cases, except when polling for interactive input
from `os.Stdin` (see more details below).
relative clock subscriptions. In our implementation we use `sys.Nanosleep()`.

### FdRead and FdWrite Subscriptions

When subscribing a file descriptor (except `Stdin`) for reads or writes,
the implementation will generally return immediately with success, unless
the file descriptor is unknown. The file descriptor is not checked further
for new incoming data. Any timeout is cancelled, and the API call is able
to return, unless there are subscriptions to `Stdin`: these are handled
separately.
to return, unless there are subscriptions to blocking file descriptors:
these are handled separately.

### FdRead and FdWrite Subscription to Stdin
### FdRead and FdWrite Subscription to Blocking File Descriptors

Subscribing `Stdin` for reads (writes make no sense and cause an error)
requires extra care: wazero allows configuring a custom reader for `Stdin`.
Subscribing a file descriptor for reads requires extra care:
wazero allows plugging in an entire custom virtual file system,
and it also allows configuring custom readers and writers for standard I/O
descriptors.

In general, if a custom reader is found, the behavior will be the same
as for regular file descriptors: data is assumed to be present and
a success is written back to the result buffer.
In general, if the file reports itself to be in non-blocking mode,
the behavior will be the same as for regular file descriptors:
data is assumed to be present and a success is written back to the result buffer.

However, if the reader is detected to read from `os.Stdin`,
a special code path is followed, invoking `sysfs.poll()`.
However, if the file is reported to be in blocking mode (the default),
the `fsapi.File.Poll()` method is invoked.

`sysfs.poll()` is a wrapper for `poll(2)` on POSIX systems,
and it is emulated on Windows.
For regular files, stdin, pipes and sockets, `fsapi.File.Poll()`
is a wrapper for `poll(2)` on POSIX systems, and it is emulated on Windows.
Virtual file systems may provide their own custom implementation.

### Poll on POSIX

On POSIX systems, `poll(2)` allows waiting for incoming data on a file
descriptor, blocking until either data becomes available or the timeout
expires.

Usage of `sysfs.poll()` is currently only reserved for standard input, because

1. it is really only necessary to handle interactive input: otherwise,
there is no way in Go to peek from Standard Input without actually
reading (and thus consuming) from it;

2. if `Stdin` is connected to a pipe, it is ok in most cases to return
with success immediately;

3. `sysfs.poll()` is currently a blocking call, irrespective of goroutines,
because the underlying syscall is; thus, it is better to limit its usage.
Usage of `sysfs.poll()` is reserved for blocking I/O. In particular,
it is used most often with pipes (such as `os.Stdin`) and TCP sockets.

So, if the subscription is for `os.Stdin` and the handle is detected
to correspond to an interactive session, then `sysfs.poll()` will be
invoked with the `Stdin` handle *and* the timeout.
invoked with the `Stdin` file descriptor.

This also means that in this specific case, the timeout is uninterruptible,
unless data becomes available on `Stdin` itself.
In order to avoid a blocking call, the underlying `sysfs.poll()` call
is repeatedly invoked with a 0 timeout at given intervals (currently 100 ms)
until the given timeout expires.

The timeout and the tick both honor the settings for `sys.Nanosleep()`.
This also implies that `sys.Nanosleep()` has to be properly configured.

### Select on Windows

@@ -1457,15 +1453,18 @@ which plays nicely with the rest of the Go runtime.

### Impact of blocking

Because this is a blocking syscall, it will also block the carrier thread of
the goroutine, preventing any means to support context cancellation directly.

There are ways to obviate this issue. We outline here one idea, that is however
not currently implemented. A common approach to support context cancellation is
to add a signal file descriptor to the set, e.g. the read-end of a pipe or an
eventfd on Linux. When the context is canceled, we may unblock a Select call by
writing to the fd, causing it to return immediately. This however requires
a bit of housekeeping to hide the "special" FD from the end-user.
Because this is a blocking syscall, invoking it with a nonzero timeout will also
block the carrier thread of the goroutine, preventing any means
to support context cancellation directly.

We obviate this by invoking `poll` with a 0 timeout repeatedly,
at given intervals (currently, 100 ms). We outline here another idea:
a common approach to support context cancellation is to add a signal
file descriptor to the set, e.g. the read-end of a pipe or an
eventfd on Linux. When the context is canceled, we may unblock a Select
call by writing to the fd, causing it to return immediately.
This however requires a bit of housekeeping to hide the "special" FD
from the end-user.

> **Contributor:** I don't fully understand this. Why not poll with 100 ms vs. poll zero + sleep? Are you suggesting that the poll implementation blocks too long even if you put 100 ms? If so, maybe the above paragraph needs to clarify this.
>
> **Contributor Author:** Yes, if we put a 100 ms timeout, then the syscall will block for 100 ms, which means it will also block the underlying OS thread. I will add a clarification.

[poll_oneoff]: https://github.com/WebAssembly/wasi-poll#why-is-the-function-called-poll_oneoff
[async-io-windows]: https://tinyclouds.org/iocp_links
165 changes: 109 additions & 56 deletions imports/wasi_snapshot_preview1/poll.go
@@ -28,6 +28,7 @@ import (
// - sys.ENOTSUP: a parameter is valid, but not yet supported.
// - sys.EFAULT: there is not enough memory to read the subscriptions or
// write results.
// - sys.EINTR: an OS interrupt has occurred while invoking the syscall.
//
// # Notes
//
@@ -42,11 +43,15 @@ var pollOneoff = newHostFunc(
"in", "out", "nsubscriptions", "result.nevents",
)

type event struct {
type pollEvent struct {
eventType byte
userData []byte
errno wasip1.Errno
outOffset uint32
}

type filePollEvent struct {
f *internalsys.FileEntry
e *pollEvent
}

func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno {
@@ -86,36 +91,34 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// Slice of events that are processed out of the loop (blocking subscribers).
var blockingSubs []*filePollEvent
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
clockEvents := uint32(0)
// Count of all the non-clock subscribers that have been already written back to outBuf.
readySubs := uint32(0)
// Count of all the subscriptions that have been already written back to outBuf.
// nevents*32 returns at all times the offset where the next event should be written:
// this way we ensure that there are no gaps between records.
nevents := uint32(0)

// Layout is subscription_u: Union
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u
for i := uint32(0); i < nsubscriptions; i++ {
inOffset := i * 48
outOffset := i * 32
outOffset := nevents * 32

eventType := inBuf[inOffset+8] // +8 past userdata
// +8 past userdata +8 contents_offset
argBuf := inBuf[inOffset+8+8:]
userData := inBuf[inOffset : inOffset+8]

evt := &event{
evt := &pollEvent{
eventType: eventType,
userData: userData,
errno: wasip1.ErrnoSuccess,
outOffset: outOffset,
}

switch eventType {
case wasip1.EventTypeClock: // handle later
clockEvents++
newTimeout, err := processClockEvent(argBuf)
if err != 0 {
return err
@@ -125,24 +128,25 @@
timeout = newTimeout
}
// Ack the clock event to the outBuf.
writeEvent(outBuf, evt)
writeEvent(outBuf[outOffset:], evt)
nevents++
case wasip1.EventTypeFdRead:
fd := int32(le.Uint32(argBuf))
if fd < 0 {
return sys.EBADF
}
if file, ok := fsc.LookupFile(fd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in non-blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
writeEvent(outBuf[outOffset:], evt)
> **Contributor Author:** `writeEvent` has been simplified; we pass the buffer at the right offset already.
nevents++
} else if file.File.IsNonblock() {
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
writeEvent(outBuf, evt)
readySubs++
// If the fd is blocking, do not ack yet,
// append to a slice for delayed evaluation.
fe := &filePollEvent{f: file, e: evt}
blockingSubs = append(blockingSubs, fe)
> **Contributor Author** (on lines +142 to +149): these have been reordered for clarity; first we avoid the double negation (`!IsNonblock()`); second, the two immediate writes are in the `if` + `else if`, while `else` handles the special case of "delayed" processing.
}
case wasip1.EventTypeFdWrite:
fd := int32(le.Uint32(argBuf))
@@ -154,47 +158,46 @@
} else {
evt.errno = wasip1.ErrnoBadf
}
readySubs++
writeEvent(outBuf, evt)
nevents++
writeEvent(outBuf[outOffset:], evt)
default:
return sys.EINVAL
}
}

// If there are subscribers with data ready, we have already written them to outBuf,
// and we don't need to wait for the timeout: clear it.
if readySubs != 0 {
timeout = 0
sysCtx := mod.(*wasm.ModuleInstance).Sys
> **Contributor Author:** the section below has been reordered for clarity.
if nevents == nsubscriptions {
// We already wrote back all the results. We already wrote this number
// earlier to offset `resultNevents`.
// We only need to observe the timeout (nonzero if there are clock subscriptions)
// and return.
if timeout > 0 {
sysCtx.Nanosleep(int64(timeout))
}
return 0
}

// If nevents != nsubscriptions, then there are blocking subscribers.
// We check these fds once using poll.
n, errno := pollFileEventsOnce(blockingSubs, outBuf[nevents*32:])
if errno != 0 {
return errno
}
nevents += n

// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
}
// Wait for the timeout to expire, or for some data to become available on Stdin.
stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds()))
// If the previous poll returned n == 0 (no data) but the timeout is nonzero
// (i.e. there are clock subscriptions), we poll until either the timeout expires
// or any File.Poll() returns true ("ready"); otherwise we are done.
if n == 0 && timeout > 0 {
n, errno = pollFileEventsUntil(sysCtx, timeout, blockingSubs, outBuf[nevents*32:])
if errno != 0 {
return errno
}
if stdinReady {
// stdin has data ready for reading, write back all the events
for i := range blockingStdinSubs {
readySubs++
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
}
} else {
// No subscribers, just wait for the given timeout.
sysCtx := mod.(*wasm.ModuleInstance).Sys
sysCtx.Nanosleep(int64(timeout))
nevents += n
}

if readySubs != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) {
if nevents != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, nevents) {
return sys.EFAULT
}
}
@@ -233,10 +236,60 @@ func processClockEvent(inBuf []byte) (time.Duration, sys.Errno) {

// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
copy(outBuf[evt.outOffset:], evt.userData) // userdata
outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[evt.outOffset+9] = 0
le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType))
func writeEvent(outBuf []byte, evt *pollEvent) {
copy(outBuf, evt.userData) // userdata
outBuf[8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[9] = 0
le.PutUint32(outBuf[10:], uint32(evt.eventType))
// TODO: When FD events are supported, write outOffset+16
}

// closeChAfter closes a channel after the given timeout.
// It is similar to time.After but it uses sysCtx.Nanosleep.
func closeChAfter(sysCtx *internalsys.Context, timeout time.Duration, timeoutCh chan struct{}) {
sysCtx.Nanosleep(int64(timeout))
close(timeoutCh)
}

// pollFileEventsOnce invokes Poll on each sys.FileEntry in the given slice
// and writes back the result to outBuf for each file reported "ready";
// i.e., when Poll() returns true, and no error.
func pollFileEventsOnce(evts []*filePollEvent, outBuf []byte) (n uint32, errno sys.Errno) {
// For simplicity, we assume that there are no multiple subscriptions for the same file.
for _, e := range evts {
isReady, errno := e.f.File.Poll(sys.POLLIN, 0)
if errno != 0 {
return 0, errno
}
if isReady {
e.e.errno = 0
writeEvent(outBuf[n*32:], e.e)
n++
}
}
return
}

// pollFileEventsUntil repeatedly invokes pollFileEventsOnce until the given timeout is reached.
// The poll interval is currently fixed at 100 ms.
func pollFileEventsUntil(sysCtx *internalsys.Context, timeout time.Duration, blockingSubs []*filePollEvent, outBuf []byte) (n uint32, errno sys.Errno) {
timeoutCh := make(chan struct{}, 1)
go closeChAfter(sysCtx, timeout, timeoutCh)

pollInterval := 100 * time.Millisecond
ticker := time.NewTicker(pollInterval)
> **Contributor:** FYI, in `closeChAfter` we are intentionally using the context clock, but this will use a real one…
>
> **Contributor Author:** whoops!
defer ticker.Stop()

for {
select {
case <-timeoutCh:
// Give one last chance before returning.
return pollFileEventsOnce(blockingSubs, outBuf)
case <-ticker.C:
n, errno = pollFileEventsOnce(blockingSubs, outBuf)
if errno != 0 || n > 0 {
return
}
}
}
}