Skip to content

Commit

Permalink
Merge pull request #43 from simonferquel/deadlock-on-concurrent-rw
Browse files Browse the repository at this point in the history
Flush outgoing content before sending EOF message
  • Loading branch information
John Howard authored Apr 26, 2017
2 parents fff283a + 7c7d6b4 commit f3b1913
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 1 deletion.
4 changes: 4 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,7 @@ func (f *win32File) SetWriteDeadline(t time.Time) error {
f.writeDeadline = t
return nil
}

func (f *win32File) Flush() error {
return syscall.FlushFileBuffers(f.handle)
}
6 changes: 5 additions & 1 deletion pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (f *win32MessageBytePipe) CloseWrite() error {
if f.writeClosed {
return errPipeWriteClosed
}
_, err := f.win32File.Write(nil)
err := f.win32File.Flush()
if err != nil {
return err
}
_, err = f.win32File.Write(nil)
if err != nil {
return err
}
Expand Down
65 changes: 65 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,68 @@ func TestDialTimesOutByDefault(t *testing.T) {
t.Fatalf("expected ErrTimeout, got %v", err)
}
}

type CloseWriter interface {
CloseWrite() error
}

func TestEchoWithMessaging(t *testing.T) {
c := PipeConfig{
MessageMode: true, // Use message mode so that CloseWrite() is supported
InputBufferSize: 65536, // Use 64KB buffers to improve performance
OutputBufferSize: 65536,
}
l, err := ListenPipe(testPipeName, &c)

if err != nil {
t.Fatal(err)
}
defer l.Close()
listenerDone := make(chan bool)
clientDone := make(chan bool)
go func() {
// server echo
conn, e := l.Accept()
if e != nil {
t.Fatal(e)
}
time.Sleep(500 * time.Millisecond) // make *sure* we don't begin to read before eof signal is sent
io.Copy(conn, conn)
conn.(CloseWriter).CloseWrite()
close(listenerDone)
}()
timeout := 1 * time.Second
client, err := DialPipe(testPipeName, &timeout)
if err != nil {
t.Fatal(err)
}
defer client.Close()

go func() {
// client read back
bytes := make([]byte, 2)
n, e := client.Read(bytes)
if e != nil {
t.Fatal(e)
}
if n != 2 {
t.Fatalf("expected 2 bytes, got %v", n)
}
close(clientDone)
}()

payload := make([]byte, 2)
payload[0] = 0
payload[1] = 1

n, err := client.Write(payload)
if err != nil {
t.Fatal(err)
}
if n != 2 {
t.Fatalf("expected 2 bytes, got %v", n)
}
client.(CloseWriter).CloseWrite()
<-listenerDone
<-clientDone
}

0 comments on commit f3b1913

Please sign in to comment.