
broken pipe if session is closed "early" #23

Closed
linde12 opened this issue Jul 1, 2017 · 7 comments
linde12 commented Jul 1, 2017

Hello!

EDIT: See the test case in the next comment. I also tested with yamux and it behaves the same, which makes me more unsure whether it's a bug or not. If not, please point me in the right direction 👍

I noticed that once my server sent its data and closed the session, I would sometimes get a broken pipe on the client, which would cause data loss.

What happens is that the server sends a file of ~300 kB and immediately closes the stream, returns, and closes the session. The client receives the file, but the <-s.die case in stream.go's Read method fires before the internal stream buffer has been emptied.

I added some print statements to the <-s.die case, and there was still data in the buffer (see the code block at the bottom).

Is this by design? If so, how should I work around it? I would expect the internal buffer of a stream to be emptied first, rather than getting a "broken pipe" error, but there seems to be some race?

I even verified with Wireshark that all bytes are sent over the network and received by the client, but my data is still sometimes corrupted because of this.

READ:
	select {
	case <-s.die:
		fmt.Println("smux: read broken pipe")
		s.bufferLock.Lock()
		n, err = s.buffer.Read(b)
		fmt.Printf("smux: buffer still had %v bytes to be read\n", n)
		s.bufferLock.Unlock()
		return 0, errors.New(errBrokenPipe)
	case <-deadline:
		return n, errTimeout
	default:
	}
  ...

linde12 commented Jul 1, 2017

I wrote a test for this, and if you add my modification (see the code block above) to stream.go you will see that there is still data in the internal buffer to be read.

I temporarily replaced everything in session_test.go with this, then ran go test -test.v -run TestRace many times. Sometimes it passes, sometimes it fails.

Here is the code:

package smux

import (
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"testing"
)

func init() {
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	ln, err := net.Listen("tcp", "127.0.0.1:19999")
	if err != nil {
		// handle error
		panic(err)
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				// skip this connection rather than passing a nil conn on
				continue
			}
			go handleConnection(conn)
		}
	}()
}

func handleConnection(conn net.Conn) {
	session, _ := Server(conn, nil)
	for {
		if stream, err := session.AcceptStream(); err == nil {
			go func(s io.ReadWriteCloser) {
				buf := make([]byte, 50000)
				read := 0
				for read != len(buf) {
					n, err := s.Read(buf[read:])
					if err != nil {
						log.Fatalf("server read err: %v\n", err)
					}
					read += n
				}
				s.Write(buf)
				// Server closes stream properly
				s.Close()
				// And immediately after closes the session,
				// which still causes the race to happen
				session.Close()
			}(stream)
		} else {
			return
		}
	}
}

func TestRace(t *testing.T) {
	cli, err := net.Dial("tcp", "127.0.0.1:19999")
	if err != nil {
		t.Fatal(err)
	}
	session, _ := Client(cli, nil)
	stream, _ := session.OpenStream()
	data := make([]byte, 50000)
	for i := range data {
		data[i] = byte(rand.Int())
	}
	if _, err = stream.Write(data); err != nil {
		t.Fatalf("write to stream failed: %v\n", err)
	}

	buf := make([]byte, len(data))
	read := 0
	for read != len(buf) {
		if n, err := stream.Read(buf[read:]); err == nil {
			read += n
		} else {
			t.Fatalf("read error: %v\n", err)
		}
	}

	if string(buf) != string(data) {
		t.Fatal("buf and data are not equal")
	}
}


xtaci commented Jul 1, 2017

https://github.com/xtaci/smux/blob/master/stream.go#L55

Are you referring to this?

AFAIK, the behavior of reading after close is not specified for net.Conn.

Also, it's completely OK to remove this select.


linde12 commented Jul 1, 2017

Yes, exactly. I would expect the internal buffer of the stream to be completely read (via stream.Read) by a consumer before it reports that the pipe is broken. On the server, session.Close() was run after stream.Write(...), so I would expect to receive all data from the write operation before seeing the "broken pipe" error.

With a normal net.Conn, if you do this

buf := make([]byte, LEN)
conn.Write(buf)
conn.Close()

the client will first receive all 50,000 bytes and then the connection will be closed.

But with smux it becomes a race because of the multiplexing: the client might receive some of the data, and then a signal saying the session was closed, before the rest of the buffered data has been read/consumed.

xtaci added a commit that referenced this issue Jul 1, 2017

xtaci commented Jul 1, 2017

try the commit pls


linde12 commented Jul 1, 2017

Works like a charm for me!


xtaci commented Jul 1, 2017

happy to hear


linde12 commented Jul 1, 2017

Thanks for your effort, closing! 🥇

@linde12 linde12 closed this as completed Jul 1, 2017
ccsexyz pushed a commit to ccsexyz/smux that referenced this issue Oct 9, 2017