diff --git a/imports/wasi_snapshot_preview1/poll.go b/imports/wasi_snapshot_preview1/poll.go index fa8a58f737..ddd4818945 100644 --- a/imports/wasi_snapshot_preview1/poll.go +++ b/imports/wasi_snapshot_preview1/poll.go @@ -95,10 +95,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno var blockingSubs []*filePollEvent // The timeout is initialized at max Duration, the loop will find the minimum. var timeout time.Duration = 1<<63 - 1 - // Count of all the clock subscribers that have been already written back to outBuf. - clockEvents := uint32(0) - // Count of all the non-clock subscribers that have been already written back to outBuf. - readySubs := uint32(0) + // Count of all the subscriptions that have been already written back to outBuf. + nevents := uint32(0) // Layout is subscription_u: Union // https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u @@ -112,15 +110,14 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno userData := inBuf[inOffset : inOffset+8] evt := &pollEvent{ + outOffset: outOffset, eventType: eventType, userData: userData, errno: wasip1.ErrnoSuccess, - outOffset: outOffset, } switch eventType { case wasip1.EventTypeClock: // handle later - clockEvents++ newTimeout, err := processClockEvent(argBuf) if err != 0 { return err @@ -131,6 +128,7 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno } // Ack the clock event to the outBuf. writeEvent(outBuf, evt) + nevents++ case wasip1.EventTypeFdRead: fd := int32(le.Uint32(argBuf)) if fd < 0 { @@ -139,7 +137,7 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno if file, ok := fsc.LookupFile(fd); !ok { evt.errno = wasip1.ErrnoBadf writeEvent(outBuf, evt) - readySubs++ + nevents++ } else if !file.File.IsNonblock() { // If the fd is blocking, do not ack yet, // append to a slice for delayed evaluation. @@ -147,7 +145,7 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno blockingSubs = append(blockingSubs, fe) } else { writeEvent(outBuf, evt) - readySubs++ + nevents++ } case wasip1.EventTypeFdWrite: fd := int32(le.Uint32(argBuf)) @@ -159,7 +157,7 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno } else { evt.errno = wasip1.ErrnoBadf } - readySubs++ + nevents++ writeEvent(outBuf, evt) default: return sys.EINVAL @@ -167,33 +165,42 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno } sysCtx := mod.(*wasm.ModuleInstance).Sys + if nevents == nsubscriptions { + // We already wrote back all the results. We already wrote this number + // earlier to offset `resultNevents`. + // We only need to observe the timeout (nonzero if there are clock subscriptions) + // and return. + if timeout > 0 { + sysCtx.Nanosleep(int64(timeout)) + } + return 0 + } - // There are no blocking subscribers, we just wait for the given timeout. - if len(blockingSubs) == 0 { - sysCtx.Nanosleep(int64(timeout)) - } else { - // If there are blocking subscribers, check the fds using poll. - n, errno := pollFileEventsOnce(blockingSubs, outBuf) + // If nevents != nsubscriptions, then there are blocking subscribers. + // We check these fds once using poll. + n, errno := pollFileEventsOnce(blockingSubs, outBuf) + if errno != 0 { + return errno + } + nevents += n + + // If the previous poll returned n == 0 (no data) but the timeout is nonzero + // (i.e. there are clock subscriptions), we poll until either the timeout expires + // or any File.Poll() returns true ("ready"); otherwise we are done. + if n == 0 && timeout > 0 { + n, errno = pollFileEventsUntil(sysCtx, timeout, blockingSubs, outBuf) if errno != 0 { return errno } - readySubs += n - - // If there are any subscribers ready, including those we checked earlier, - // we don't need to poll further; otherwise, poll until the given timeout. - if readySubs == 0 { - readySubs, errno = pollFileEventsUntil(sysCtx, timeout, blockingSubs, outBuf) - if errno != 0 { - return errno - } - } + nevents += n } - if readySubs != nsubscriptions { - if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) { + if nevents != nsubscriptions { + if !mod.Memory().WriteUint32Le(resultNevents, nevents) { return sys.EFAULT } } + return 0 } diff --git a/imports/wasi_snapshot_preview1/poll_test.go b/imports/wasi_snapshot_preview1/poll_test.go index 6db7d6926d..233229e1ed 100644 --- a/imports/wasi_snapshot_preview1/poll_test.go +++ b/imports/wasi_snapshot_preview1/poll_test.go @@ -2,12 +2,15 @@ package wasi_snapshot_preview1_test import ( "io/fs" + "net" + "os" "strings" "testing" "time" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental/sock" experimentalsys "github.com/tetratelabs/wazero/experimental/sys" "github.com/tetratelabs/wazero/internal/sys" "github.com/tetratelabs/wazero/internal/testing/require" @@ -401,6 +404,295 @@ func setStdin(t *testing.T, mod api.Module, stdin experimentalsys.File) { f.File = stdin } +func Test_pollOneoff_Mixed(t *testing.T) { + // Test stdin (pipes) mixed with sockets. + + const listenFd = 3 + const acceptFd = 4 + + type addr interface { + Addr() *net.TCPAddr + } + + tests := []struct { + name string + skip bool + in, out, nsubscriptions, resultNevents uint32 + connected, nonblocking bool + mem []byte // at offset in + files []experimentalsys.File + expectedErrno wasip1.Errno + expectedMem []byte // at offset out + expectedLog string + expectedNevents uint32 + }{ + { + name: "Read from sock (not connected)", + nsubscriptions: 1, + expectedNevents: 0, + mem: fdReadSubFd(listenFd), // assume sock at fd 3 + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=1) +<== (nevents=0,errno=ESUCCESS) +`, + }, + { + name: "Read from sock (connected)", + connected: true, + nsubscriptions: 2, + expectedNevents: 1, + mem: append(fdReadSubFd(listenFd), fdReadSubFd(acceptFd)...), // assume sock at fd 3 + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=2) +<== (nevents=1,errno=ESUCCESS) +`, + }, + + { + name: "Read from sock (connected+nonblocking)", + connected: true, + nonblocking: true, + nsubscriptions: 2, + expectedNevents: 2, + mem: append(fdReadSubFd(listenFd), fdReadSubFd(acceptFd)...), // assume sock at fd 3 + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=2) +<== (nevents=2,errno=ESUCCESS) +`, + }, + + { + name: "Read from sock (not connected) and stdin", + nsubscriptions: 2, + expectedNevents: 1, + mem: append(fdReadSubFd(listenFd), fdReadSub...), // assume sock at fd 3 + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=2) +<== (nevents=1,errno=ESUCCESS) +`, + }, + + { + name: "Read from sock (connected) and stdin (ready)", + connected: true, + nsubscriptions: 3, + expectedNevents: 2, + mem: append(append(fdReadSubFd(listenFd), fdReadSubFd(acceptFd)...), fdReadSub...), // assume sock at fd 3 + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=3) +<== (nevents=2,errno=ESUCCESS) +`, + }, + { + name: "Read from sock (connected+nonblocking) and stdin (ready)", + connected: true, + nonblocking: true, + nsubscriptions: 3, + expectedNevents: 3, + mem: append(append(fdReadSubFd(listenFd), fdReadSubFd(acceptFd)...), fdReadSub...), // assume sock at fd 3 + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=3) +<== (nevents=3,errno=ESUCCESS) +`, + }, + } + + for _, tt := range tests { + tc := tt + + t.Run(tc.name, func(t *testing.T) { + if tc.skip { + t.Skip() + } + ctx := sock.WithConfig(testCtx, + sock.NewConfig().WithTCPListener("127.0.0.1", 0)) + + stdinReader, stdinWriter, err := os.Pipe() + require.NoError(t, err) + defer stdinReader.Close() + defer stdinWriter.Close() + + mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig().WithStdin(stdinReader)) + _, _ = stdinWriter.Write([]byte("wazero")) + + defer r.Close(ctx) + defer log.Reset() + + maskMemory(t, mod, 1024) + if tc.mem != nil { + mod.Memory().Write(tc.in, tc.mem) + } + + if tc.connected { + fsc := mod.(*wasm.ModuleInstance).Sys.FS() + ch := make(chan struct{}, 1) + file, _ := fsc.LookupFile(listenFd) + if tc.nonblocking { + _ = file.File.SetNonblock(true) + } + + go func() { + for { + _, errno := fsc.SockAccept(listenFd, false) + if errno == experimentalsys.EAGAIN { + continue + } + require.EqualErrno(t, 0, errno) + close(ch) + return + } + }() + + // Wait for the socket to accept. + sleepALittle() + + addr := file.File.(addr) + c, err := net.DialTCP("tcp", nil, addr.Addr()) + + <-ch + + require.NoError(t, err) + _, _ = c.Write([]byte("wazero")) + } + + requireErrnoResult(t, tc.expectedErrno, mod, wasip1.PollOneoffName, uint64(tc.in), uint64(tc.out), + uint64(tc.nsubscriptions), uint64(tc.resultNevents)) + require.Equal(t, tc.expectedLog, "\n"+log.String()) + + out, ok := mod.Memory().Read(tc.out, uint32(len(tc.expectedMem))) + require.True(t, ok) + require.Equal(t, tc.expectedMem, out) + + // Events should be written on success regardless of nested failure. + if tc.expectedErrno == wasip1.ErrnoSuccess { + nevents, ok := mod.Memory().ReadUint32Le(tc.resultNevents) + require.True(t, ok) + require.Equal(t, tc.expectedNevents, nevents) + _ = nevents + } + }) + } +} + func Test_pollOneoff_Zero(t *testing.T) { poller := &pollStdinFile{StdinFile: sys.StdinFile{Reader: strings.NewReader("test")}, ready: true} @@ -522,6 +814,10 @@ func fdReadSubFd(fd byte) []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, fd, 0x0, 0x0, 0x0, // valid readable FD + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, } } diff --git a/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.c b/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.c index 818b8c330a..f8afbc743c 100644 --- a/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.c +++ b/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.c @@ -54,7 +54,7 @@ void main_poll(int timeout, int millis) { tv.tv_usec = millis*1000; ret = select(1, &rfds, NULL, NULL, &tv); if ((ret > 0) && FD_ISSET(0, &rfds)) { - printf("STDIN\n"); + printf("STDIN\n", ret); } else { printf("NOINPUT\n"); } @@ -121,7 +121,7 @@ void main_open_wronly() { unlink(path); } -void main_sock() { +void main_sock_mixed(bool checkStdin) { // Get a listener from the pre-opened file descriptor. // The listener is the first pre-open, with a file-descriptor of 3. int listener_fd = 3; @@ -148,7 +148,13 @@ void main_sock() { struct timeval tv = {1, 0}; fd_set set; FD_ZERO(&set); - FD_SET(nfd, &set); + if (checkStdin) { + FD_SET(0, &set); + FD_SET(nfd, &set); + FD_SET(listener_fd, &set); + } else { + FD_SET(nfd, &set); + } int ret = select(nfd+1, &set, NULL, NULL, &tv); // If some data is available, read it. @@ -162,6 +168,14 @@ void main_sock() { } } +void main_sock() { + main_sock_mixed(false); +} + +void main_mixed() { + main_sock_mixed(true); +} + void main_nonblock(char* fpath) { struct timespec tim, tim2; tim.tv_sec = 0; @@ -212,6 +226,8 @@ int main(int argc, char** argv) { main_open_wronly(); } else if (strcmp(argv[1],"sock")==0) { main_sock(); + } else if (strcmp(argv[1],"mixed")==0) { + main_mixed(); } else if (strcmp(argv[1],"nonblock")==0) { main_nonblock(argv[2]); } else { diff --git a/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.wasm b/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.wasm index 255a198f09..e75963d132 100755 Binary files a/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.wasm and b/imports/wasi_snapshot_preview1/testdata/zig-cc/wasi.wasm differ diff --git a/imports/wasi_snapshot_preview1/wasi_stdlib_test.go b/imports/wasi_snapshot_preview1/wasi_stdlib_test.go index c4f7979523..6b17706504 100644 --- a/imports/wasi_snapshot_preview1/wasi_stdlib_test.go +++ b/imports/wasi_snapshot_preview1/wasi_stdlib_test.go @@ -635,3 +635,55 @@ func testLargeStdout(t *testing.T, tname string, bin []byte) { require.NoError(t, err, string(output)) } } + +func Test_Mixed(t *testing.T) { + toolchains := map[string][]byte{ + // TODO: "cargo-wasi": wasmCargoWasi, + "zig-cc": wasmZigCc, + } + + for toolchain, bin := range toolchains { + toolchain := toolchain + bin := bin + t.Run(toolchain, func(t *testing.T) { + testMixed(t, bin) + }) + } +} + +func testMixed(t *testing.T, bin []byte) { + // This is identical to testSock, except we also hook a pipe to stdin + // We expect poll_oneoff to be invoked successfully. + sockCfg := experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0) + ctx := experimentalsock.WithConfig(testCtx, sockCfg) + r, w, err := os.Pipe() + require.NoError(t, err) + defer r.Close() + defer w.Close() + _, err = w.Write([]byte("wazero")) + require.NoError(t, err) + moduleConfig := wazero.NewModuleConfig().WithArgs("wasi", "mixed").WithSysNanosleep().WithStdin(r) + tcpAddrCh := make(chan *net.TCPAddr, 1) + ch := make(chan string, 1) + go func() { + ch <- compileAndRunWithPreStart(t, ctx, moduleConfig, bin, func(t *testing.T, mod api.Module) { + tcpAddrCh <- requireTCPListenerAddr(t, mod) + }) + }() + tcpAddr := <-tcpAddrCh + + // Give a little time for _start to complete + sleepALittle() + + // Now dial to the initial address, which should be now held by wazero. + conn, err := net.Dial("tcp", tcpAddr.String()) + require.NoError(t, err) + defer conn.Close() + + n, err := conn.Write([]byte("wazero")) + console := <-ch + require.NotEqual(t, 0, n) + require.NoError(t, err) + // Nonblocking connections may contain error logging, we ignore those. + require.Equal(t, "wazero\n", console[len(console)-7:]) +} diff --git a/internal/sysfs/sock_unix.go b/internal/sysfs/sock_unix.go index 29b9416352..1bc2803f1c 100644 --- a/internal/sysfs/sock_unix.go +++ b/internal/sysfs/sock_unix.go @@ -41,8 +41,9 @@ var _ socketapi.TCPSock = (*tcpListenerFile)(nil) type tcpListenerFile struct { baseSockFile - fd uintptr - addr *net.TCPAddr + fd uintptr + addr *net.TCPAddr + nonblock bool } // Accept implements the same method as documented on socketapi.TCPSock @@ -62,9 +63,15 @@ func (f *tcpListenerFile) Poll(flag sys.Pflag, timeoutMillis int32) (ready bool, // SetNonblock implements the same method as documented on sys.File func (f *tcpListenerFile) SetNonblock(enabled bool) sys.Errno { + f.nonblock = enabled return sys.UnwrapOSError(setNonblock(f.fd, enabled)) } +// IsNonblock implements the same method as documented on sys.File +func (f *tcpListenerFile) IsNonblock() bool { + return f.nonblock +} + // Close implements the same method as documented on sys.File func (f *tcpListenerFile) Close() sys.Errno { return sys.UnwrapOSError(syscall.Close(int(f.fd))) @@ -80,7 +87,8 @@ var _ socketapi.TCPConn = (*tcpConnFile)(nil) type tcpConnFile struct { baseSockFile - fd uintptr + fd uintptr + nonblock bool // closed is true when closed was called. This ensures proper sys.EBADF closed bool @@ -96,9 +104,15 @@ func newTcpConn(tc *net.TCPConn) socketapi.TCPConn { // SetNonblock implements the same method as documented on sys.File func (f *tcpConnFile) SetNonblock(enabled bool) (errno sys.Errno) { + f.nonblock = enabled return sys.UnwrapOSError(setNonblock(f.fd, enabled)) } +// IsNonblock implements the same method as documented on sys.File +func (f *tcpConnFile) IsNonblock() bool { + return f.nonblock +} + // Poll implements the same method as documented on sys.File func (f *tcpConnFile) Poll(flag sys.Pflag, timeoutMillis int32) (ready bool, errno sys.Errno) { return poll(f.fd, flag, timeoutMillis) diff --git a/internal/sysfs/sock_windows.go b/internal/sysfs/sock_windows.go index 716fe4b737..3c5a1b9339 100644 --- a/internal/sysfs/sock_windows.go +++ b/internal/sysfs/sock_windows.go @@ -100,7 +100,6 @@ func syscallConnControl(conn syscall.Conn, fn func(fd uintptr) (int, sys.Errno)) // because they are sensibly different from Unix's. func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock { w := &winTcpListenerFile{tl: tl} - _ = w.SetNonblock(true) return w } @@ -117,8 +116,10 @@ type winTcpListenerFile struct { // Accept implements the same method as documented on socketapi.TCPSock func (f *winTcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) { // Ensure we have an incoming connection using winsock_select, otherwise return immediately. - if ready, errno := f.Poll(sys.POLLIN, 0); !ready || errno != 0 { - return nil, sys.EAGAIN + if f.nonblock { + if ready, errno := f.Poll(sys.POLLIN, 0); !ready || errno != 0 { + return nil, sys.EAGAIN + } } // Accept normally blocks goroutines, but we