From e9dfcc4ee9ea57cc1e3bb84b6c95cf6084fd12f6 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Fri, 17 May 2019 09:00:53 -0400 Subject: [PATCH 01/18] Improve Close UX Closes #78 --- websocket.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/websocket.go b/websocket.go index 912508d5..c0737cad 100644 --- a/websocket.go +++ b/websocket.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "os" "runtime" "sync" "sync/atomic" @@ -353,6 +354,12 @@ func (c *Conn) writePong(p []byte) error { // Close closes the WebSocket connection with the given status code and reason. // It will write a WebSocket close frame with a timeout of 5 seconds. +// The connection can only be closed once. Additional calls to Close +// are no-ops. +// The maximum length of reason must be 125 bytes otherwise an internal +// error will be sent to the peer. For this reason, you should avoid +// sending a dynamic reason. +// Close will unblock all goroutines interacting with the connection. func (c *Conn) Close(code StatusCode, reason string) error { err := c.exportedClose(code, reason) if err != nil { @@ -372,17 +379,14 @@ func (c *Conn) exportedClose(code StatusCode, reason string) error { // Definitely worth seeing what popular browsers do later. p, err := ce.bytes() if err != nil { + fmt.Fprintf(os.Stderr, "failed to marshal close frame: %v\n", err) ce = CloseError{ Code: StatusInternalError, } p, _ = ce.bytes() } - cerr := c.writeClose(p, ce) - if err != nil { - return err - } - return cerr + return c.writeClose(p, ce) } func (c *Conn) writeClose(p []byte, cerr CloseError) error { From 4b724aec198eb8f06c50806dc61d52639d30b0a0 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Wed, 29 May 2019 18:11:22 -0400 Subject: [PATCH 02/18] Rewrite websocket.go to remove writeLoop and simplify readLoop Closes #79 Closes #73 --- accept.go | 3 + ci/lint/entrypoint.sh | 2 +- export_test.go | 2 +- websocket.go | 573 +++++++++++++++++++++--------------------- 4 files changed, 297 insertions(+), 283 deletions(-) diff --git a/accept.go b/accept.go index 2cf1dc01..207ecc74 100644 --- a/accept.go +++ b/accept.go @@ -75,6 +75,7 @@ func verifyClientRequest(w http.ResponseWriter, r *http.Request) error { // Accept accepts a WebSocket handshake from a client and upgrades the // the connection to WebSocket. +// // Accept will reject the handshake if the Origin domain is not the same as the Host unless // the InsecureSkipVerify option is set. func Accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, error) { @@ -132,6 +133,8 @@ func accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, closer: netConn, } c.init() + // TODO document. + c.Context(r.Context()) return c, nil } diff --git a/ci/lint/entrypoint.sh b/ci/lint/entrypoint.sh index c539495e..09c31683 100755 --- a/ci/lint/entrypoint.sh +++ b/ci/lint/entrypoint.sh @@ -7,5 +7,5 @@ source ci/lib.sh || exit 1 shellcheck ./**/*.sh ) -go vet -composites=false ./... +go vet -composites=false -lostcancel=false ./... go run golang.org/x/lint/golint -set_exit_status ./... diff --git a/export_test.go b/export_test.go index d180e119..465ba9eb 100644 --- a/export_test.go +++ b/export_test.go @@ -14,5 +14,5 @@ import ( // exceeds the buffer size which is 4096 right now as then an extra syscall // will be necessary to complete the message. func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error { - return c.writeSingleFrame(ctx, opcode(typ), p) + return c.writeCompleteMessage(ctx, opcode(typ), p) } diff --git a/websocket.go b/websocket.go index c0737cad..6e35281a 100644 --- a/websocket.go +++ b/websocket.go @@ -8,17 +8,11 @@ import ( "os" "runtime" "sync" - "sync/atomic" "time" "golang.org/x/xerrors" ) -type frame struct { - opcode opcode - payload []byte -} - // Conn represents a WebSocket connection. // All methods except Reader can be used concurrently. // Please be sure to call Close on the connection when you @@ -34,31 +28,41 @@ type Conn struct { closeErr error closed chan struct{} - // Writers should send on write to begin sending - // a message and then follow that up with some data - // on writeBytes. - // Send on control to write a control message. - // writeDone will be sent back when the message is written - // Send on writeFlush to flush the message and wait for a - // ping on writeDone. - // writeDone will be closed if the data message write errors. - write chan MessageType - control chan frame - fastWrite chan frame - writeBytes chan []byte - writeDone chan struct{} - writeFlush chan struct{} - - // Readers should receive on read to begin reading a message. - // Then send a byte slice to readBytes to read into it. - // The n of bytes read will be sent on readDone once the read into a slice is complete. - // readDone will be closed if the read fails. - // activeReader will be set to 0 on io.EOF. - activeReader int64 - inMsg bool - read chan opcode - readBytes chan []byte - readDone chan int + writeDataLock chan struct{} + writeFrameLock chan struct{} + + readData chan header + readDone chan struct{} + + setReadTimeout chan context.Context + setWriteTimeout chan context.Context + setConnContext chan context.Context + getConnContext chan context.Context +} + +// Context returns a context derived from parent that will be cancelled +// when the connection is closed. +// If the parent context is cancelled, the connection will be closed. +// +// This is an experimental API meaning it may be remove in the future. +// Please let me know how you feel about it. +func (c *Conn) Context(parent context.Context) context.Context { + select { + case <-c.closed: + ctx, cancel := context.WithCancel(parent) + cancel() + return ctx + case c.setConnContext <- parent: + } + + select { + case <-c.closed: + ctx, cancel := context.WithCancel(parent) + cancel() + return ctx + case ctx := <-c.getConnContext: + return ctx + } } func (c *Conn) close(err error) { @@ -85,124 +89,110 @@ func (c *Conn) Subprotocol() string { func (c *Conn) init() { c.closed = make(chan struct{}) - c.write = make(chan MessageType) - c.control = make(chan frame) - c.fastWrite = make(chan frame) - c.writeBytes = make(chan []byte) - c.writeDone = make(chan struct{}) - c.writeFlush = make(chan struct{}) + c.writeDataLock = make(chan struct{}, 1) + c.writeFrameLock = make(chan struct{}, 1) + + c.readData = make(chan header) + c.readDone = make(chan struct{}) - c.read = make(chan opcode) - c.readBytes = make(chan []byte) - c.readDone = make(chan int) + c.setReadTimeout = make(chan context.Context) + c.setWriteTimeout = make(chan context.Context) + c.setConnContext = make(chan context.Context) + c.getConnContext = make(chan context.Context) runtime.SetFinalizer(c, func(c *Conn) { c.close(xerrors.New("connection garbage collected")) }) - go c.writeLoop() + go c.timeoutLoop() go c.readLoop() } // We never mask inside here because our mask key is always 0,0,0,0. // See comment on secWebSocketKey. -func (c *Conn) writeFrame(h header, p []byte) { +func (c *Conn) writeFrame(ctx context.Context, h header, p []byte) error { + err := c.acquireLock(ctx, c.writeFrameLock) + if err != nil { + return err + } + defer c.releaseLock(c.writeFrameLock) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.closed: + return c.closeErr + case c.setWriteTimeout <- ctx: + } + defer func() { + // We have to remove the write timeout, even if ctx is cancelled. + select { + case <-c.closed: + return + case c.setWriteTimeout <- context.Background(): + } + }() + + h.masked = c.client + h.payloadLength = int64(len(p)) + b2 := marshalHeader(h) - _, err := c.bw.Write(b2) + _, err = c.bw.Write(b2) if err != nil { c.close(xerrors.Errorf("failed to write to connection: %w", err)) - return + return c.closeErr } - _, err = c.bw.Write(p) if err != nil { c.close(xerrors.Errorf("failed to write to connection: %w", err)) - return + return c.closeErr + } if h.fin { err := c.bw.Flush() if err != nil { c.close(xerrors.Errorf("failed to write to connection: %w", err)) - return + return c.closeErr } } -} -func (c *Conn) writeLoopFastWrite(frame frame) { - h := header{ - fin: true, - opcode: frame.opcode, - payloadLength: int64(len(frame.payload)), - masked: c.client, - } - c.writeFrame(h, frame.payload) - select { - case <-c.closed: - case c.writeDone <- struct{}{}: - } + return nil } -func (c *Conn) writeLoop() { - defer close(c.writeDone) +func (c *Conn) timeoutLoop() { + readCtx := context.Background() + writeCtx := context.Background() + parentCtx := context.Background() + cancelCtx := func() {} + defer func() { + // We do not defer cancelCtx because its value can change. + cancelCtx() + }() -messageLoop: for { - var dataType MessageType select { case <-c.closed: return - case control := <-c.control: - c.writeLoopFastWrite(control) - continue - case frame := <-c.fastWrite: - c.writeLoopFastWrite(frame) - continue - case dataType = <-c.write: - } - - var firstSent bool - for { + case readCtx = <-c.setWriteTimeout: + case writeCtx = <-c.setReadTimeout: + case <-readCtx.Done(): + c.close(xerrors.Errorf("data read timed out: %w", readCtx.Err())) + case <-writeCtx.Done(): + c.close(xerrors.Errorf("data write timed out: %w", writeCtx.Err())) + case <-parentCtx.Done(): + c.close(xerrors.Errorf("parent context cancelled: %w", parentCtx.Err())) + return + case parentCtx = <-c.setConnContext: + var ctx context.Context + ctx, cancelCtx = context.WithCancel(parentCtx) select { case <-c.closed: return - case control := <-c.control: - c.writeLoopFastWrite(control) - case b := <-c.writeBytes: - h := header{ - fin: false, - opcode: opcode(dataType), - payloadLength: int64(len(b)), - masked: c.client, - } - - if firstSent { - h.opcode = opContinuation - } - firstSent = true - - c.writeFrame(h, b) - - select { - case <-c.closed: - return - case c.writeDone <- struct{}{}: - } - case <-c.writeFlush: - h := header{ - fin: true, - opcode: opcode(dataType), - payloadLength: 0, - masked: c.client, - } - - if firstSent { - h.opcode = opContinuation - } - - c.writeFrame(h, nil) - - continue messageLoop + case <-parentCtx.Done(): + c.close(xerrors.Errorf("parent context cancelled: %w", parentCtx.Err())) + return + case c.getConnContext <- ctx: } } } @@ -250,19 +240,20 @@ func (c *Conn) handleControl(h header) { } } -func (c *Conn) readLoop() { - defer close(c.readDone) - +func (c *Conn) readTillData() (header, error) { for { h, err := readHeader(c.br) if err != nil { - c.close(xerrors.Errorf("failed to read header: %w", err)) - return + return header{}, xerrors.Errorf("failed to read header: %w", err) } if h.rsv1 || h.rsv2 || h.rsv3 { - c.Close(StatusProtocolError, fmt.Sprintf("received header with rsv bits set: %v:%v:%v", h.rsv1, h.rsv2, h.rsv3)) - return + ce := CloseError{ + Code: StatusProtocolError, + Reason: fmt.Sprintf("received header with rsv bits set: %v:%v:%v", h.rsv1, h.rsv2, h.rsv3), + } + c.Close(ce.Code, ce.Reason) + return header{}, ce } if h.opcode.controlOp() { @@ -271,84 +262,46 @@ func (c *Conn) readLoop() { } switch h.opcode { - case opBinary, opText: - if c.inMsg { - c.Close(StatusProtocolError, "cannot read new data frame when previous frame is not finished") - return - } - - select { - case <-c.closed: - return - case c.read <- h.opcode: - c.inMsg = true - } - case opContinuation: - if !c.inMsg { - c.Close(StatusProtocolError, "continuation frame not after data or text frame") - return - } + case opBinary, opText, opContinuation: + return h, nil default: - c.Close(StatusProtocolError, fmt.Sprintf("unknown opcode %v", h.opcode)) - return + ce := CloseError{ + Code: StatusProtocolError, + Reason: fmt.Sprintf("unknown opcode %v", h.opcode), + } + c.Close(ce.Code, ce.Reason) + return header{}, ce } + } +} - err = c.dataReadLoop(h) +func (c *Conn) readLoop() { + for { + h, err := c.readTillData() if err != nil { - c.close(xerrors.Errorf("failed to read from connection: %w", err)) + c.close(err) return } - } -} -func (c *Conn) dataReadLoop(h header) error { - maskPos := 0 - left := h.payloadLength - firstReadDone := false - for left > 0 || !firstReadDone { select { case <-c.closed: - return c.closeErr - case b := <-c.readBytes: - if int64(len(b)) > left { - b = b[:left] - } - - _, err := io.ReadFull(c.br, b) - if err != nil { - return xerrors.Errorf("failed to read from connection: %w", err) - } - left -= int64(len(b)) - - if h.masked { - maskPos = fastXOR(h.maskKey, maskPos, b) - } - - // Must set this before we signal the read is done. - // The reader will use this to return io.EOF and - // c.Read will use it to check if the reader has been completed. - if left == 0 && h.fin { - atomic.StoreInt64(&c.activeReader, 0) - c.inMsg = false - } + return + case c.readData <- h: + } - select { - case <-c.closed: - return c.closeErr - case c.readDone <- len(b): - firstReadDone = true - } + select { + case <-c.closed: + return + case <-c.readDone: } } - - return nil } func (c *Conn) writePong(p []byte) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - err := c.writeSingleFrame(ctx, opPong, p) + err := c.writeCompleteMessage(ctx, opPong, p) return err } @@ -393,7 +346,7 @@ func (c *Conn) writeClose(p []byte, cerr CloseError) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - err := c.writeSingleFrame(ctx, opClose, p) + err := c.writeCompleteMessage(ctx, opClose, p) c.close(cerr) @@ -408,33 +361,40 @@ func (c *Conn) writeClose(p []byte, cerr CloseError) error { return nil } -func (c *Conn) writeSingleFrame(ctx context.Context, opcode opcode, p []byte) error { - ch := c.fastWrite - if opcode.controlOp() { - ch = c.control - } +func (c *Conn) acquireLock(ctx context.Context, lock chan struct{}) error { select { - case <-c.closed: - return c.closeErr - case ch <- frame{ - opcode: opcode, - payload: p, - }: case <-ctx.Done(): - c.close(xerrors.Errorf("control frame write timed out: %w", ctx.Err())) return ctx.Err() - } - - select { case <-c.closed: return c.closeErr - case <-c.writeDone: + case lock <- struct{}{}: return nil - case <-ctx.Done(): - return ctx.Err() } } +func (c *Conn) releaseLock(lock chan struct{}) { + <-lock +} + +func (c *Conn) writeCompleteMessage(ctx context.Context, opcode opcode, p []byte) error { + if !opcode.controlOp() { + err := c.acquireLock(ctx, c.writeDataLock) + if err != nil { + return err + } + defer c.releaseLock(c.writeDataLock) + } + + err := c.writeFrame(ctx, header{ + fin: true, + opcode: opcode, + }, p) + if err != nil { + return xerrors.Errorf("failed to write frame: %v", err) + } + return nil +} + // Writer returns a writer bounded by the context that will write // a WebSocket message of type dataType to the connection. // @@ -451,27 +411,27 @@ func (c *Conn) Writer(ctx context.Context, typ MessageType) (io.WriteCloser, err } func (c *Conn) writer(ctx context.Context, typ MessageType) (io.WriteCloser, error) { - select { - case <-c.closed: - return nil, c.closeErr - case <-ctx.Done(): - return nil, ctx.Err() - case c.write <- typ: - return messageWriter{ - ctx: ctx, - c: c, - }, nil + err := c.acquireLock(ctx, c.writeDataLock) + if err != nil { + return nil, err } + return &messageWriter{ + ctx: ctx, + opcode: opcode(typ), + c: c, + }, nil } // messageWriter enables writing to a WebSocket connection. type messageWriter struct { - ctx context.Context - c *Conn + ctx context.Context + opcode opcode + c *Conn + closed bool } // Write writes the given bytes to the WebSocket connection. -func (w messageWriter) Write(p []byte) (int, error) { +func (w *messageWriter) Write(p []byte) (int, error) { n, err := w.write(p) if err != nil { return n, xerrors.Errorf("failed to write: %w", err) @@ -479,31 +439,23 @@ func (w messageWriter) Write(p []byte) (int, error) { return n, nil } -func (w messageWriter) write(p []byte) (int, error) { - select { - case <-w.c.closed: - return 0, w.c.closeErr - case w.c.writeBytes <- p: - select { - case <-w.ctx.Done(): - w.c.close(xerrors.Errorf("data write timed out: %w", w.ctx.Err())) - // Wait for writeLoop to complete so we know p is done with. - <-w.c.writeDone - return 0, w.ctx.Err() - case _, ok := <-w.c.writeDone: - if !ok { - return 0, w.c.closeErr - } - return len(p), nil - } - case <-w.ctx.Done(): - return 0, w.ctx.Err() +func (w *messageWriter) write(p []byte) (int, error) { + if w.closed { + return 0, xerrors.Errorf("cannot use closed writer") + } + err := w.c.writeFrame(w.ctx, header{ + opcode: w.opcode, + }, p) + if err != nil { + return 0, err } + w.opcode = opContinuation + return len(p), nil } // Close flushes the frame to the connection. // This must be called for every messageWriter. -func (w messageWriter) Close() error { +func (w *messageWriter) Close() error { err := w.close() if err != nil { return xerrors.Errorf("failed to close writer: %w", err) @@ -511,15 +463,22 @@ func (w messageWriter) Close() error { return nil } -func (w messageWriter) close() error { - select { - case <-w.c.closed: - return w.c.closeErr - case <-w.ctx.Done(): - return w.ctx.Err() - case w.c.writeFlush <- struct{}{}: - return nil +func (w *messageWriter) close() error { + if w.closed { + return xerrors.Errorf("cannot use closed writer") } + w.closed = true + + err := w.c.writeFrame(w.ctx, header{ + fin: true, + opcode: w.opcode, + }, nil) + if err != nil { + return err + } + + w.c.releaseLock(w.c.writeDataLock) + return nil } // Reader will wait until there is a WebSocket data message to read from the connection. @@ -542,49 +501,70 @@ func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { } func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) { - if !atomic.CompareAndSwapInt64(&c.activeReader, 0, 1) { - // If the next read yields io.EOF we are good to go. - r := messageReader{ - ctx: ctx, - c: c, - } - _, err := r.Read(nil) - if err == nil { - return 0, nil, xerrors.New("previous message not fully read") - } - if !xerrors.Is(err, io.EOF) { - return 0, nil, xerrors.Errorf("failed to check if last message at io.EOF: %w", err) - } + // if !atomic.CompareAndSwapInt64(&c.activeReader, 0, 1) { + // // If the next read yields io.EOF we are good to go. + // r := messageReader{ + // ctx: ctx, + // c: c, + // } + // _, err := r.Read(nil) + // if err == nil { + // return 0, nil, xerrors.New("previous message not fully read") + // } + // if !xerrors.Is(err, io.EOF) { + // return 0, nil, xerrors.Errorf("failed to check if last message at io.EOF: %w", err) + // } + // + // atomic.StoreInt64(&c.activeReader, 1) + // } - atomic.StoreInt64(&c.activeReader, 1) + select { + case <-c.closed: + return 0, nil, c.closeErr + case <-ctx.Done(): + return 0, nil, ctx.Err() + case c.setReadTimeout <- ctx: } select { case <-c.closed: return 0, nil, c.closeErr - case opcode := <-c.read: - return MessageType(opcode), messageReader{ - ctx: ctx, - c: c, - }, nil case <-ctx.Done(): return 0, nil, ctx.Err() + case h := <-c.readData: + if h.opcode == opContinuation { + if h.fin && h.payloadLength == 0 { + select { + case <-c.closed: + return 0, nil, c.closeErr + case c.readDone <- struct{}{}: + return c.reader(ctx) + } + } + return 0, nil, xerrors.Errorf("previous reader was not read to EOF") + } + return MessageType(h.opcode), &messageReader{ + h: &h, + c: c, + }, nil } } // messageReader enables reading a data frame from the WebSocket connection. type messageReader struct { - ctx context.Context - c *Conn + maskPos int + h *header + c *Conn + eofed bool } // Read reads as many bytes as possible into p. -func (r messageReader) Read(p []byte) (int, error) { +func (r *messageReader) Read(p []byte) (int, error) { n, err := r.read(p) if err != nil { // Have to return io.EOF directly for now, we cannot wrap as xerrors // isn't used in stdlib. - if err == io.EOF { + if xerrors.Is(err, io.EOF) { return n, io.EOF } return n, xerrors.Errorf("failed to read: %w", err) @@ -592,31 +572,62 @@ func (r messageReader) Read(p []byte) (int, error) { return n, nil } -func (r messageReader) read(p []byte) (_ int, err error) { - if atomic.LoadInt64(&r.c.activeReader) == 0 { - return 0, io.EOF +func (r *messageReader) read(p []byte) (int, error) { + if r.eofed { + return 0, xerrors.Errorf("cannot use EOFed reader") } - select { - case <-r.c.closed: - return 0, r.c.closeErr - case r.c.readBytes <- p: + if r.h == nil { select { - case <-r.ctx.Done(): - r.c.close(xerrors.Errorf("data read timed out: %w", r.ctx.Err())) - // Wait for readLoop to complete so we know p is done. - <-r.c.readDone - return 0, r.ctx.Err() - case n, ok := <-r.c.readDone: - if !ok { - return 0, r.c.closeErr + case <-r.c.closed: + return 0, r.c.closeErr + case h := <-r.c.readData: + if h.opcode != opContinuation { + ce := CloseError{ + Code: StatusProtocolError, + Reason: "cannot read new data frame when previous frame is not finished", + } + r.c.Close(ce.Code, ce.Reason) + return 0, ce } - if atomic.LoadInt64(&r.c.activeReader) == 0 { + r.h = &h + } + } + + if int64(len(p)) > r.h.payloadLength { + p = p[:r.h.payloadLength] + } + + n, err := io.ReadFull(r.c.br, p) + + r.h.payloadLength -= int64(n) + if r.h.masked { + r.maskPos = fastXOR(r.h.maskKey, r.maskPos, p) + } + + if err != nil { + r.c.close(xerrors.Errorf("failed to read control frame payload: %w", err)) + return n, r.c.closeErr + } + + if r.h.payloadLength == 0 { + select { + case <-r.c.closed: + return n, r.c.closeErr + case r.c.readDone <- struct{}{}: + } + if r.h.fin { + r.eofed = true + select { + case <-r.c.closed: + return n, r.c.closeErr + case r.c.setReadTimeout <- context.Background(): return n, io.EOF } - return n, nil } - case <-r.ctx.Done(): - return 0, r.ctx.Err() + r.maskPos = 0 + r.h = nil } + + return n, nil } From b39ca873380498fd7ac2bb1d9fa221404cf90da8 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Wed, 29 May 2019 23:21:55 -0400 Subject: [PATCH 03/18] Fix bugs and improve docs --- README.md | 6 +-- accept.go | 9 +++- dial.go | 43 ++++++++++++++++-- example_echo_test.go | 5 ++- go.mod | 2 +- go.sum | 2 + statuscode.go | 2 +- websocket.go | 101 +++++++++++++++++++++++-------------------- websocket_test.go | 14 +++--- wsjson/wsjson.go | 14 +++++- 10 files changed, 128 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 3f427427..1b9af616 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ go get nhooyr.io/websocket@v0.2.0 - Zero dependencies outside of the stdlib for the core library - JSON and ProtoBuf helpers in the wsjson and wspb subpackages - High performance -- Concurrent writes +- Concurrent reads and writes out of the box ## Roadmap @@ -122,8 +122,8 @@ also uses net/http's Client and ResponseWriter directly for WebSocket handshakes gorilla/websocket writes its handshakes to the underlying net.Conn which means it has to reinvent hooks for TLS and proxies and prevents support of HTTP/2. -Some more advantages of nhooyr/websocket are that it supports concurrent writes and makes it -very easy to close the connection with a status code and reason. +Some more advantages of nhooyr/websocket are that it supports concurrent reads, +writes and makes it very easy to close the connection with a status code and reason. In terms of performance, there is no significant difference between the two. Will update with benchmarks soon ([#75](https://github.com/nhooyr/websocket/issues/75)). diff --git a/accept.go b/accept.go index 207ecc74..17016d23 100644 --- a/accept.go +++ b/accept.go @@ -1,8 +1,10 @@ package websocket import ( + "bytes" "crypto/sha1" "encoding/base64" + "io" "net/http" "net/textproto" "net/url" @@ -78,6 +80,9 @@ func verifyClientRequest(w http.ResponseWriter, r *http.Request) error { // // Accept will reject the handshake if the Origin domain is not the same as the Host unless // the InsecureSkipVerify option is set. +// +// The returned connection will be bound by r.Context(). Use c.Context() to change +// the bounding context. func Accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, error) { c, err := accept(w, r, opts) if err != nil { @@ -126,6 +131,9 @@ func accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, return nil, err } + b, _ := brw.Reader.Peek(brw.Reader.Buffered()) + brw.Reader.Reset(io.MultiReader(bytes.NewReader(b), netConn)) + c := &Conn{ subprotocol: w.Header().Get("Sec-WebSocket-Protocol"), br: brw.Reader, @@ -133,7 +141,6 @@ func accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, closer: netConn, } c.init() - // TODO document. c.Context(r.Context()) return c, nil diff --git a/dial.go b/dial.go index 3c7e71db..f1ad725b 100644 --- a/dial.go +++ b/dial.go @@ -5,13 +5,13 @@ import ( "bytes" "context" "encoding/base64" + "golang.org/x/xerrors" "io" "io/ioutil" "net/http" "net/url" "strings" - - "golang.org/x/xerrors" + "sync" ) // DialOptions represents the options available to pass to Dial. @@ -112,8 +112,8 @@ func dial(ctx context.Context, u string, opts DialOptions) (_ *Conn, _ *http.Res c := &Conn{ subprotocol: resp.Header.Get("Sec-WebSocket-Protocol"), - br: bufio.NewReader(rwc), - bw: bufio.NewWriter(rwc), + br: getBufioReader(rwc), + bw: getBufioWriter(rwc), closer: rwc, client: true, } @@ -140,3 +140,38 @@ func verifyServerResponse(resp *http.Response) error { return nil } + +// The below pools can only be used by the client because http.Hijacker will always +// have a bufio.Reader/Writer for us so it doesn't make sense to use a pool on top. + +var bufioReaderPool = sync.Pool{ + New: func() interface{} { + return bufio.NewReader(nil) + }, +} + +func getBufioReader(r io.Reader) *bufio.Reader { + br := bufioReaderPool.Get().(*bufio.Reader) + br.Reset(r) + return br +} + +func returnBufioReader(br *bufio.Reader) { + bufioReaderPool.Put(br) +} + +var bufioWriterPool = sync.Pool{ + New: func() interface{} { + return bufio.NewWriter(nil) + }, +} + +func getBufioWriter(w io.Writer) *bufio.Writer { + bw := bufioWriterPool.Get().(*bufio.Writer) + bw.Reset(w) + return bw +} + +func returnBufioWriter(bw *bufio.Writer) { + bufioWriterPool.Put(bw) +} diff --git a/example_echo_test.go b/example_echo_test.go index ab0e8e70..a86d5b89 100644 --- a/example_echo_test.go +++ b/example_echo_test.go @@ -51,6 +51,7 @@ func Example_echo() { // Now we dial the server, send the messages and echo the responses. err = client("ws://" + l.Addr().String()) + time.Sleep(time.Second) if err != nil { log.Fatalf("client failed: %v", err) } @@ -66,6 +67,8 @@ func Example_echo() { // It ensures the client speaks the echo subprotocol and // only allows one message every 100ms with a 10 message burst. func echoServer(w http.ResponseWriter, r *http.Request) error { + log.Printf("serving %v", r.RemoteAddr) + c, err := websocket.Accept(w, r, websocket.AcceptOptions{ Subprotocols: []string{"echo"}, }) @@ -83,7 +86,7 @@ func echoServer(w http.ResponseWriter, r *http.Request) error { for { err = echo(r.Context(), c, l) if err != nil { - return xerrors.Errorf("failed to echo: %w", err) + return xerrors.Errorf("failed to echo with %v: %w", r.RemoteAddr, err) } } } diff --git a/go.mod b/go.mod index f747eecf..cc9a865d 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,6 @@ require ( golang.org/x/text v0.3.2 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 golang.org/x/tools v0.0.0-20190429184909-35c670923e21 - golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18 + golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 mvdan.cc/sh v2.6.4+incompatible ) diff --git a/go.sum b/go.sum index 63aaa2a5..90c93463 100644 --- a/go.sum +++ b/go.sum @@ -30,5 +30,7 @@ golang.org/x/tools v0.0.0-20190429184909-35c670923e21 h1:Kjcw+D2LTzLmxOHrMK9uvYP golang.org/x/tools v0.0.0-20190429184909-35c670923e21/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18 h1:1AGvnywFL1aB5KLRxyLseWJI6aSYPo3oF7HSpXdWQdU= golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= mvdan.cc/sh v2.6.4+incompatible h1:eD6tDeh0pw+/TOTI1BBEryZ02rD2nMcFsgcvde7jffM= mvdan.cc/sh v2.6.4+incompatible/go.mod h1:IeeQbZq+x2SUGBensq/jge5lLQbS3XT2ktyp3wrt4x8= diff --git a/statuscode.go b/statuscode.go index d4223745..c7b20367 100644 --- a/statuscode.go +++ b/statuscode.go @@ -49,7 +49,7 @@ type CloseError struct { } func (ce CloseError) Error() string { - return fmt.Sprintf("websocket closed with status = %v and reason = %q", ce.Code, ce.Reason) + return fmt.Sprintf("status = %v and reason = %q", ce.Code, ce.Reason) } func parseClosePayload(p []byte) (CloseError, error) { diff --git a/websocket.go b/websocket.go index 6e35281a..0b77966e 100644 --- a/websocket.go +++ b/websocket.go @@ -14,7 +14,8 @@ import ( ) // Conn represents a WebSocket connection. -// All methods except Reader can be used concurrently. +// All methods may be called concurrently. +// // Please be sure to call Close on the connection when you // are finished with it to release resources. type Conn struct { @@ -31,8 +32,10 @@ type Conn struct { writeDataLock chan struct{} writeFrameLock chan struct{} - readData chan header - readDone chan struct{} + readDataLock chan struct{} + readData chan header + readDone chan struct{} + readLoopDone chan struct{} setReadTimeout chan context.Context setWriteTimeout chan context.Context @@ -44,7 +47,7 @@ type Conn struct { // when the connection is closed. // If the parent context is cancelled, the connection will be closed. // -// This is an experimental API meaning it may be remove in the future. +// This is an experimental API that may be remove in the future. // Please let me know how you feel about it. func (c *Conn) Context(parent context.Context) context.Context { select { @@ -77,6 +80,18 @@ func (c *Conn) close(err error) { c.closeErr = xerrors.Errorf("websocket closed: %w", cerr) close(c.closed) + + // See comment in dial.go + if c.client { + go func() { + <-c.readLoopDone + c.readDataLock <- struct{}{} + c.writeFrameLock <- struct{}{} + + returnBufioReader(c.br) + returnBufioWriter(c.bw) + }() + } }) } @@ -94,6 +109,8 @@ func (c *Conn) init() { c.readData = make(chan header) c.readDone = make(chan struct{}) + c.readDataLock = make(chan struct{}, 1) + c.readLoopDone = make(chan struct{}) c.setReadTimeout = make(chan context.Context) c.setWriteTimeout = make(chan context.Context) @@ -174,8 +191,8 @@ func (c *Conn) timeoutLoop() { select { case <-c.closed: return - case readCtx = <-c.setWriteTimeout: - case writeCtx = <-c.setReadTimeout: + case writeCtx = <-c.setWriteTimeout: + case readCtx = <-c.setReadTimeout: case <-readCtx.Done(): c.close(xerrors.Errorf("data read timed out: %w", readCtx.Err())) case <-writeCtx.Done(): @@ -276,6 +293,8 @@ func (c *Conn) readTillData() (header, error) { } func (c *Conn) readLoop() { + defer close(c.readLoopDone) + for { h, err := c.readTillData() if err != nil { @@ -487,8 +506,7 @@ func (w *messageWriter) close() error { // // Your application must keep reading messages for the Conn to automatically respond to ping // and close frames and not become stuck waiting for a data message to be read. -// Please ensure to read the full message from io.Reader. If you do not read till -// io.EOF, the connection will break unless the next read would have yielded io.EOF. +// Please ensure to read the full message from io.Reader. // // You can only read a single message at a time so do not call this method // concurrently. @@ -500,30 +518,10 @@ func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { return typ, r, nil } -func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) { - // if !atomic.CompareAndSwapInt64(&c.activeReader, 0, 1) { - // // If the next read yields io.EOF we are good to go. - // r := messageReader{ - // ctx: ctx, - // c: c, - // } - // _, err := r.Read(nil) - // if err == nil { - // return 0, nil, xerrors.New("previous message not fully read") - // } - // if !xerrors.Is(err, io.EOF) { - // return 0, nil, xerrors.Errorf("failed to check if last message at io.EOF: %w", err) - // } - // - // atomic.StoreInt64(&c.activeReader, 1) - // } - - select { - case <-c.closed: - return 0, nil, c.closeErr - case <-ctx.Done(): - return 0, nil, ctx.Err() - case c.setReadTimeout <- ctx: +func (c *Conn) reader(ctx context.Context) (_ MessageType, _ io.Reader, err error) { + err = c.acquireLock(ctx, c.readDataLock) + if err != nil { + return 0, nil, err } select { @@ -533,25 +531,24 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) { return 0, nil, ctx.Err() case h := <-c.readData: if h.opcode == opContinuation { - if h.fin && h.payloadLength == 0 { - select { - case <-c.closed: - return 0, nil, c.closeErr - case c.readDone <- struct{}{}: - return c.reader(ctx) - } + ce := CloseError{ + Code: StatusProtocolError, + Reason: "continuation frame not after data or text frame", } - return 0, nil, xerrors.Errorf("previous reader was not read to EOF") + c.Close(ce.Code, ce.Reason) + return 0, nil, ce } return MessageType(h.opcode), &messageReader{ - h: &h, - c: c, + ctx: ctx, + h: &h, + c: c, }, nil } } // messageReader enables reading a data frame from the WebSocket connection. type messageReader struct { + ctx context.Context maskPos int h *header c *Conn @@ -598,8 +595,20 @@ func (r *messageReader) read(p []byte) (int, error) { p = p[:r.h.payloadLength] } + select { + case <-r.c.closed: + return 0, r.c.closeErr + case r.c.setReadTimeout <- r.ctx: + } + n, err := io.ReadFull(r.c.br, p) + select { + case <-r.c.closed: + return 0, r.c.closeErr + case r.c.setReadTimeout <- context.Background(): + } + r.h.payloadLength -= int64(n) if r.h.masked { r.maskPos = fastXOR(r.h.maskKey, r.maskPos, p) @@ -618,12 +627,8 @@ func (r *messageReader) read(p []byte) (int, error) { } if r.h.fin { r.eofed = true - select { - case <-r.c.closed: - return n, r.c.closeErr - case r.c.setReadTimeout <- context.Background(): - return n, io.EOF - } + r.c.releaseLock(r.c.readDataLock) + return n, io.EOF } r.maskPos = 0 r.h = nil diff --git a/websocket_test.go b/websocket_test.go index 8d18c738..0ac0557d 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -293,10 +293,6 @@ func TestHandshake(t *testing.T) { if err != nil { return err } - err = write() - if err != nil { - return err - } c.Close(websocket.StatusNormalClosure, "") return nil @@ -329,11 +325,6 @@ func TestHandshake(t *testing.T) { if err != nil { return err } - // Read twice to ensure the un EOFed previous reader works correctly. - err = read() - if err != nil { - return err - } c.Close(websocket.StatusNormalClosure, "") return nil @@ -766,6 +757,11 @@ func benchConn(b *testing.B, echo, stream bool, size int) { if err != nil { b.Fatal(err) } + + _, err = r.Read(nil) + if !xerrors.Is(err, io.EOF) { + b.Fatalf("more data in reader than needed") + } } } b.StopTimer() diff --git a/wsjson/wsjson.go b/wsjson/wsjson.go index 9dd61bd0..853369e5 100644 --- a/wsjson/wsjson.go +++ b/wsjson/wsjson.go @@ -4,9 +4,8 @@ package wsjson import ( "context" "encoding/json" - "io" - "golang.org/x/xerrors" + "io" "nhooyr.io/websocket" ) @@ -41,6 +40,17 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error { return xerrors.Errorf("failed to decode json: %w", err) } + // Have to ensure we read till EOF. + // Unfortunate but necessary evil for now. Can improve later. + // The code to do this automatically gets complicated fast because + // we support concurrent reading. + // So the Reader has to synchronize with Read somehow. + // Maybe its best to bring back the old readLoop? + _, err = r.Read(nil) + if !xerrors.Is(err, io.EOF) { + return xerrors.Errorf("more data than needed in reader") + } + return nil } From 510eed537e7076101356784cd047de5cc447c8d5 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 00:02:05 -0400 Subject: [PATCH 04/18] Document bug with release of bufio read/writers for client --- dial.go | 3 ++- go.sum | 2 -- websocket.go | 1 + websocket_test.go | 2 +- wsjson/wsjson.go | 5 +++-- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/dial.go b/dial.go index f1ad725b..53acd32c 100644 --- a/dial.go +++ b/dial.go @@ -5,13 +5,14 @@ import ( "bytes" "context" "encoding/base64" - "golang.org/x/xerrors" "io" "io/ioutil" "net/http" "net/url" "strings" "sync" + + "golang.org/x/xerrors" ) // DialOptions represents the options available to pass to Dial. diff --git a/go.sum b/go.sum index 90c93463..7965958d 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190429184909-35c670923e21 h1:Kjcw+D2LTzLmxOHrMK9uvYP/NigJ0EdwMgzt6EU+Ghs= golang.org/x/tools v0.0.0-20190429184909-35c670923e21/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18 h1:1AGvnywFL1aB5KLRxyLseWJI6aSYPo3oF7HSpXdWQdU= -golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= mvdan.cc/sh v2.6.4+incompatible h1:eD6tDeh0pw+/TOTI1BBEryZ02rD2nMcFsgcvde7jffM= diff --git a/websocket.go b/websocket.go index 0b77966e..f4becd7c 100644 --- a/websocket.go +++ b/websocket.go @@ -85,6 +85,7 @@ func (c *Conn) close(err error) { if c.client { go func() { <-c.readLoopDone + // TODO this does not work if reader errors out. c.readDataLock <- struct{}{} c.writeFrameLock <- struct{}{} diff --git a/websocket_test.go b/websocket_test.go index 0ac0557d..b8d7b56c 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -758,7 +758,7 @@ func benchConn(b *testing.B, echo, stream bool, size int) { b.Fatal(err) } - _, err = r.Read(nil) + _, err = r.Read([]byte{0}) if !xerrors.Is(err, io.EOF) { b.Fatalf("more data in reader than needed") } diff --git a/wsjson/wsjson.go b/wsjson/wsjson.go index 853369e5..0ff33bf3 100644 --- a/wsjson/wsjson.go +++ b/wsjson/wsjson.go @@ -4,9 +4,10 @@ package wsjson import ( "context" "encoding/json" - "golang.org/x/xerrors" "io" + "golang.org/x/xerrors" + "nhooyr.io/websocket" ) @@ -46,7 +47,7 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error { // we support concurrent reading. // So the Reader has to synchronize with Read somehow. // Maybe its best to bring back the old readLoop? - _, err = r.Read(nil) + _, err = r.Read([]byte{0}) if !xerrors.Is(err, io.EOF) { return xerrors.Errorf("more data than needed in reader") } From 93cda77fe202184112fcc433129d984caa6eb5b9 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 00:04:12 -0400 Subject: [PATCH 05/18] Disable reuse of client buffers for now Don't want buggy code in master. --- websocket.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/websocket.go b/websocket.go index f4becd7c..25688b08 100644 --- a/websocket.go +++ b/websocket.go @@ -85,12 +85,12 @@ func (c *Conn) close(err error) { if c.client { go func() { <-c.readLoopDone - // TODO this does not work if reader errors out. - c.readDataLock <- struct{}{} - c.writeFrameLock <- struct{}{} - - returnBufioReader(c.br) - returnBufioWriter(c.bw) + // TODO this does not work if reader errors out so skip for now. + // c.readDataLock <- struct{}{} + // c.writeFrameLock <- struct{}{} + // + // returnBufioReader(c.br) + // returnBufioWriter(c.bw) }() } }) @@ -326,12 +326,15 @@ func (c *Conn) writePong(p []byte) error { } // Close closes the WebSocket connection with the given status code and reason. +// // It will write a WebSocket close frame with a timeout of 5 seconds. // The connection can only be closed once. Additional calls to Close // are no-ops. +// // The maximum length of reason must be 125 bytes otherwise an internal // error will be sent to the peer. For this reason, you should avoid // sending a dynamic reason. +// // Close will unblock all goroutines interacting with the connection. func (c *Conn) Close(code StatusCode, reason string) error { err := c.exportedClose(code, reason) From 0c12d5dbc9a15fe63351f9d50bd3df37064697d8 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 01:23:25 -0400 Subject: [PATCH 06/18] Add .codecov.yaml --- docs/.codecov.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 docs/.codecov.yaml diff --git a/docs/.codecov.yaml b/docs/.codecov.yaml new file mode 100644 index 00000000..21e130eb --- /dev/null +++ b/docs/.codecov.yaml @@ -0,0 +1,7 @@ +coverage: + status: + project: + default: + # Otherwise codecov/patch status keeps failing when there is a tiny + # change in code coverage. + threshold: 5 From cccb035c90940d3b48421450de8b8bb05bef00cd Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 01:27:13 -0400 Subject: [PATCH 07/18] Use the superior yml extension for codecov --- docs/.codecov.yaml => ci/.codecov.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/.codecov.yaml => ci/.codecov.yml (100%) diff --git a/docs/.codecov.yaml b/ci/.codecov.yml similarity index 100% rename from docs/.codecov.yaml rename to ci/.codecov.yml From 027e6af3c3d2ab86f0031832f9d738da0a3d48fa Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 01:39:56 -0400 Subject: [PATCH 08/18] Fix .codecov.yml --- ci/.codecov.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ci/.codecov.yml b/ci/.codecov.yml index 21e130eb..7d614ef3 100644 --- a/ci/.codecov.yml +++ b/ci/.codecov.yml @@ -1,7 +1,9 @@ coverage: status: + # Prevent small changes in coverage from failing CI. project: default: - # Otherwise codecov/patch status keeps failing when there is a tiny - # change in code coverage. + threshold: 5 + patch: + default: threshold: 5 From 4de90619274ae26e8b691f200bfff403cb490f85 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 03:10:19 -0400 Subject: [PATCH 09/18] Expand API Closes #1 (Ping API) Closes #62 (Read/Write convienence methods) Closes #83 (SetReadLimit) --- example_echo_test.go | 2 - export_test.go | 18 -------- websocket.go | 99 ++++++++++++++++++++++++++++++++++++++++---- websocket_test.go | 2 + wsjson/wsjson.go | 4 -- wspb/wspb.go | 21 +--------- 6 files changed, 95 insertions(+), 51 deletions(-) delete mode 100644 export_test.go diff --git a/example_echo_test.go b/example_echo_test.go index a86d5b89..405c7a41 100644 --- a/example_echo_test.go +++ b/example_echo_test.go @@ -94,7 +94,6 @@ func echoServer(w http.ResponseWriter, r *http.Request) error { // echo reads from the websocket connection and then writes // the received message back to it. // The entire function has 10s to complete. -// The received message is limited to 32768 bytes. func echo(ctx context.Context, c *websocket.Conn, l *rate.Limiter) error { ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -108,7 +107,6 @@ func echo(ctx context.Context, c *websocket.Conn, l *rate.Limiter) error { if err != nil { return err } - r = io.LimitReader(r, 32768) w, err := c.Writer(ctx, typ) if err != nil { diff --git a/export_test.go b/export_test.go deleted file mode 100644 index 465ba9eb..00000000 --- a/export_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package websocket - -import ( - "context" -) - -// Write writes p as a single data frame to the connection. This is an optimization -// method for when the entire message is in memory and does not need to be streamed -// to the peer via Writer. -// -// This prevents the allocation of the Writer. -// Furthermore Writer always has to write an additional fin frame when Close is -// called on the writer which can result in worse performance if the full message -// exceeds the buffer size which is 4096 right now as then an extra syscall -// will be necessary to complete the message. -func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error { - return c.writeCompleteMessage(ctx, opcode(typ), p) -} diff --git a/websocket.go b/websocket.go index 25688b08..00decaad 100644 --- a/websocket.go +++ b/websocket.go @@ -5,9 +5,13 @@ import ( "context" "fmt" "io" + "io/ioutil" + "math/rand" "os" "runtime" + "strconv" "sync" + "sync/atomic" "time" "golang.org/x/xerrors" @@ -25,6 +29,8 @@ type Conn struct { closer io.Closer client bool + msgReadLimit int64 + closeOnce sync.Once closeErr error closed chan struct{} @@ -41,14 +47,16 @@ type Conn struct { setWriteTimeout chan context.Context setConnContext chan context.Context getConnContext chan context.Context + + pingListener map[string]chan<- struct{} } // Context returns a context derived from parent that will be cancelled -// when the connection is closed. +// when the connection is closed or broken. // If the parent context is cancelled, the connection will be closed. // -// This is an experimental API that may be remove in the future. -// Please let me know how you feel about it. +// This is an experimental API that may be removed in the future. +// Please let me know how you feel about it in https://github.com/nhooyr/websocket/issues/79 func (c *Conn) Context(parent context.Context) context.Context { select { case <-c.closed: @@ -105,6 +113,8 @@ func (c *Conn) Subprotocol() string { func (c *Conn) init() { c.closed = make(chan struct{}) + c.msgReadLimit = 32768 + c.writeDataLock = make(chan struct{}, 1) c.writeFrameLock = make(chan struct{}, 1) @@ -118,6 +128,8 @@ func (c *Conn) init() { c.setConnContext = make(chan context.Context) c.getConnContext = make(chan context.Context) + c.pingListener = make(map[string]chan<- struct{}) + runtime.SetFinalizer(c, func(c *Conn) { c.close(xerrors.New("connection garbage collected")) }) @@ -242,6 +254,10 @@ func (c *Conn) handleControl(h header) { case opPing: c.writePong(b) case opPong: + listener, ok := c.pingListener[string(b)] + if ok { + close(listener) + } case opClose: ce, err := parseClosePayload(b) if err != nil { @@ -321,7 +337,7 @@ func (c *Conn) writePong(p []byte) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - err := c.writeCompleteMessage(ctx, opPong, p) + err := c.writeMessage(ctx, opPong, p) return err } @@ -369,7 +385,7 @@ func (c *Conn) writeClose(p []byte, cerr CloseError) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - err := c.writeCompleteMessage(ctx, opClose, p) + err := c.writeMessage(ctx, opClose, p) c.close(cerr) @@ -399,7 +415,7 @@ func (c *Conn) releaseLock(lock chan struct{}) { <-lock } -func (c *Conn) writeCompleteMessage(ctx context.Context, opcode opcode, p []byte) error { +func (c *Conn) writeMessage(ctx context.Context, opcode opcode, p []byte) error { if !opcode.controlOp() { err := c.acquireLock(ctx, c.writeDataLock) if err != nil { @@ -445,6 +461,30 @@ func (c *Conn) writer(ctx context.Context, typ MessageType) (io.WriteCloser, err }, nil } +// Read is a convenience method to read a single message from the connection. +// +// See the Reader method if you want to be able to reuse buffers or want to stream a message. +func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { + typ, r, err := c.Reader(ctx) + if err != nil { + return 0, nil, err + } + + b, err := ioutil.ReadAll(r) + if err != nil { + return typ, b, err + } + + return typ, b, nil +} + +// Write is a convenience method to write a message to the connection. +// +// See the Writer method if you want to stream a message. +func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error { + return c.writeMessage(ctx, opcode(typ), p) +} + // messageWriter enables writing to a WebSocket connection. type messageWriter struct { ctx context.Context @@ -519,7 +559,7 @@ func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { if err != nil { return 0, nil, xerrors.Errorf("failed to get reader: %w", err) } - return typ, r, nil + return typ, io.LimitReader(r, c.msgReadLimit), nil } func (c *Conn) reader(ctx context.Context) (_ MessageType, _ io.Reader, err error) { @@ -640,3 +680,48 @@ func (r *messageReader) read(p []byte) (int, error) { return n, nil } + +// SetReadLimit sets the max number of bytes to read for a single message. +// It applies to the Reader and Read methods. +// +// By default, the connection has a message read limit of 32768 bytes. +func (c *Conn) SetReadLimit(n int64) { + atomic.StoreInt64(&c.msgReadLimit, n) +} + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Ping sends a ping to the peer and waits for a pong. +// Use this to measure latency or ensure the peer is responsive. +// +// This API is experimental and subject to change. +// Please provide feedback in https://github.com/nhooyr/websocket/issues/1. +func (c *Conn) Ping(ctx context.Context) error { + err := c.ping(ctx) + if err != nil { + return xerrors.Errorf("failed to ping: %w", err) + } + return nil +} + +func (c *Conn) ping(ctx context.Context) error { + id := rand.Uint64() + p := strconv.FormatUint(id, 10) + + pong := make(chan struct{}) + c.pingListener[p] = pong + + err := c.writeMessage(ctx, opPing, []byte(p)) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-pong: + return nil + } +} diff --git a/websocket_test.go b/websocket_test.go index b8d7b56c..d982732a 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -489,6 +489,8 @@ func TestAutobahnServer(t *testing.T) { func echoLoop(ctx context.Context, c *websocket.Conn) { defer c.Close(websocket.StatusInternalError, "") + c.SetReadLimit(1 << 30) + ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() diff --git a/wsjson/wsjson.go b/wsjson/wsjson.go index 0ff33bf3..d85700bc 100644 --- a/wsjson/wsjson.go +++ b/wsjson/wsjson.go @@ -12,8 +12,6 @@ import ( ) // Read reads a json message from c into v. -// For security reasons, it will not read messages -// larger than 32768 bytes. func Read(ctx context.Context, c *websocket.Conn, v interface{}) error { err := read(ctx, c, v) if err != nil { @@ -33,8 +31,6 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error { return xerrors.Errorf("unexpected frame type for json (expected %v): %v", websocket.MessageText, typ) } - r = io.LimitReader(r, 32768) - d := json.NewDecoder(r) err = d.Decode(v) if err != nil { diff --git a/wspb/wspb.go b/wspb/wspb.go index 90a0d046..edffede1 100644 --- a/wspb/wspb.go +++ b/wspb/wspb.go @@ -3,7 +3,6 @@ package wspb import ( "context" - "io" "io/ioutil" "github.com/golang/protobuf/proto" @@ -13,8 +12,6 @@ import ( ) // Read reads a protobuf message from c into v. -// For security reasons, it will not read messages -// larger than 32768 bytes. func Read(ctx context.Context, c *websocket.Conn, v proto.Message) error { err := read(ctx, c, v) if err != nil { @@ -34,8 +31,6 @@ func read(ctx context.Context, c *websocket.Conn, v proto.Message) error { return xerrors.Errorf("unexpected frame type for protobuf (expected %v): %v", websocket.MessageBinary, typ) } - r = io.LimitReader(r, 32768) - b, err := ioutil.ReadAll(r) if err != nil { return xerrors.Errorf("failed to read message: %w", err) @@ -64,19 +59,5 @@ func write(ctx context.Context, c *websocket.Conn, v proto.Message) error { return xerrors.Errorf("failed to marshal protobuf: %w", err) } - w, err := c.Writer(ctx, websocket.MessageBinary) - if err != nil { - return err - } - - _, err = w.Write(b) - if err != nil { - return err - } - - err = w.Close() - if err != nil { - return err - } - return nil + return c.Write(ctx, websocket.MessageBinary, b) } From 47fbf33ee3cf3fb6607a6b9f5f29e3beb373fcfd Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 03:16:16 -0400 Subject: [PATCH 10/18] Update README.md now that pings are supported --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 1b9af616..dad8ec67 100644 --- a/README.md +++ b/README.md @@ -86,8 +86,6 @@ c.Close(websocket.StatusNormalClosure, "") - A minimal API is easier to maintain due to less docs, tests and bugs - A minimal API is also easier to use and learn - Context based cancellation is more ergonomic and robust than setting deadlines -- No ping support because TCP keep alives work fine for HTTP/1.1 and they do not make - sense with HTTP/2 (see [#1](https://github.com/nhooyr/websocket/issues/1)) - net.Conn is never exposed as WebSocket over HTTP/2 will not have a net.Conn. - Using net/http's Client for dialing means we do not have to reinvent dialing hooks and configurations like other WebSocket libraries From c2118360b52dcff8456319ccf2d4ab4a9ae6cd7c Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 13:35:20 -0400 Subject: [PATCH 11/18] Up test read limit --- websocket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/websocket_test.go b/websocket_test.go index d982732a..1907d516 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -489,7 +489,7 @@ func TestAutobahnServer(t *testing.T) { func echoLoop(ctx context.Context, c *websocket.Conn) { defer c.Close(websocket.StatusInternalError, "") - c.SetReadLimit(1 << 30) + c.SetReadLimit(1 << 40) ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() From 8af7761a5958aa7b9a0cbeb211982ae1ca8113bc Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 13:36:38 -0400 Subject: [PATCH 12/18] Mark Read and Write as experimental APIs --- websocket.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/websocket.go b/websocket.go index 00decaad..c4b4e017 100644 --- a/websocket.go +++ b/websocket.go @@ -464,6 +464,9 @@ func (c *Conn) writer(ctx context.Context, typ MessageType) (io.WriteCloser, err // Read is a convenience method to read a single message from the connection. // // See the Reader method if you want to be able to reuse buffers or want to stream a message. +// +// This is an experimental API, please let me know how you feel about it in +// https://github.com/nhooyr/websocket/issues/62 func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { typ, r, err := c.Reader(ctx) if err != nil { @@ -481,6 +484,9 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { // Write is a convenience method to write a message to the connection. // // See the Writer method if you want to stream a message. +// +// This is an experimental API, please let me know how you feel about it in +// https://github.com/nhooyr/websocket/issues/62 func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error { return c.writeMessage(ctx, opcode(typ), p) } From 579ce18eaf231d872272e8b00f91a3fa4da61a65 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 13:45:12 -0400 Subject: [PATCH 13/18] Cleanup ping listener map --- websocket.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/websocket.go b/websocket.go index c4b4e017..7d33a36a 100644 --- a/websocket.go +++ b/websocket.go @@ -48,7 +48,8 @@ type Conn struct { setConnContext chan context.Context getConnContext chan context.Context - pingListener map[string]chan<- struct{} + pingListenerMu sync.Mutex + pingListener map[string]chan<- struct{} } // Context returns a context derived from parent that will be cancelled @@ -254,7 +255,9 @@ func (c *Conn) handleControl(h header) { case opPing: c.writePong(b) case opPong: + c.pingListenerMu.Lock() listener, ok := c.pingListener[string(b)] + c.pingListenerMu.Unlock() if ok { close(listener) } @@ -717,7 +720,16 @@ func (c *Conn) ping(ctx context.Context) error { p := strconv.FormatUint(id, 10) pong := make(chan struct{}) + + c.pingListenerMu.Lock() c.pingListener[p] = pong + c.pingListenerMu.Unlock() + + defer func() { + c.pingListenerMu.Lock() + delete(c.pingListener, p) + c.pingListenerMu.Unlock() + }() err := c.writeMessage(ctx, opPing, []byte(p)) if err != nil { From a97030c8a15d412139af28c1cae04cde4730225a Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 14:16:32 -0400 Subject: [PATCH 14/18] Enable buffer reuse again for clients --- websocket.go | 102 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 38 deletions(-) diff --git a/websocket.go b/websocket.go index 7d33a36a..6480aed6 100644 --- a/websocket.go +++ b/websocket.go @@ -38,10 +38,10 @@ type Conn struct { writeDataLock chan struct{} writeFrameLock chan struct{} - readDataLock chan struct{} - readData chan header - readDone chan struct{} - readLoopDone chan struct{} + readMsgLock chan struct{} + readMsg chan header + readMsgDone chan struct{} + readFrameLock chan struct{} setReadTimeout chan context.Context setWriteTimeout chan context.Context @@ -90,17 +90,15 @@ func (c *Conn) close(err error) { close(c.closed) + // This ensures every goroutine that interacts + // with the conn closes before it can interact with the connection + c.readFrameLock <- struct{}{} + c.writeFrameLock <- struct{}{} + // See comment in dial.go if c.client { - go func() { - <-c.readLoopDone - // TODO this does not work if reader errors out so skip for now. - // c.readDataLock <- struct{}{} - // c.writeFrameLock <- struct{}{} - // - // returnBufioReader(c.br) - // returnBufioWriter(c.bw) - }() + returnBufioReader(c.br) + returnBufioWriter(c.bw) } }) } @@ -119,10 +117,10 @@ func (c *Conn) init() { c.writeDataLock = make(chan struct{}, 1) c.writeFrameLock = make(chan struct{}, 1) - c.readData = make(chan header) - c.readDone = make(chan struct{}) - c.readDataLock = make(chan struct{}, 1) - c.readLoopDone = make(chan struct{}) + c.readMsg = make(chan header) + c.readMsgDone = make(chan struct{}) + c.readMsgLock = make(chan struct{}, 1) + c.readFrameLock = make(chan struct{}, 1) c.setReadTimeout = make(chan context.Context) c.setWriteTimeout = make(chan context.Context) @@ -141,8 +139,8 @@ func (c *Conn) init() { // We never mask inside here because our mask key is always 0,0,0,0. // See comment on secWebSocketKey. -func (c *Conn) writeFrame(ctx context.Context, h header, p []byte) error { - err := c.acquireLock(ctx, c.writeFrameLock) +func (c *Conn) writeFrame(ctx context.Context, h header, p []byte) (err error) { + err = c.acquireLock(ctx, c.writeFrameLock) if err != nil { return err } @@ -164,27 +162,33 @@ func (c *Conn) writeFrame(ctx context.Context, h header, p []byte) error { } }() + defer func() { + if err != nil { + // We need to always release the lock first before closing the connection to ensure + // the lock can be acquired inside close. + c.releaseLock(c.writeFrameLock) + c.close(err) + } + }() + h.masked = c.client h.payloadLength = int64(len(p)) b2 := marshalHeader(h) _, err = c.bw.Write(b2) if err != nil { - c.close(xerrors.Errorf("failed to write to connection: %w", err)) - return c.closeErr + return xerrors.Errorf("failed to write to connection: %w", err) } _, err = c.bw.Write(p) if err != nil { - c.close(xerrors.Errorf("failed to write to connection: %w", err)) - return c.closeErr + return xerrors.Errorf("failed to write to connection: %w", err) } if h.fin { err := c.bw.Flush() if err != nil { - c.close(xerrors.Errorf("failed to write to connection: %w", err)) - return c.closeErr + return xerrors.Errorf("failed to write to connection: %w", err) } } @@ -279,9 +283,9 @@ func (c *Conn) handleControl(h header) { func (c *Conn) readTillData() (header, error) { for { - h, err := readHeader(c.br) + h, err := c.readHeader() if err != nil { - return header{}, xerrors.Errorf("failed to read header: %w", err) + return header{}, err } if h.rsv1 || h.rsv2 || h.rsv3 { @@ -312,9 +316,22 @@ func (c *Conn) readTillData() (header, error) { } } -func (c *Conn) readLoop() { - defer close(c.readLoopDone) +func (c *Conn) readHeader() (header, error) { + err := c.acquireLock(context.Background(), c.readFrameLock) + if err != nil { + return header{}, err + } + defer c.releaseLock(c.readFrameLock) + h, err := readHeader(c.br) + if err != nil { + return header{}, xerrors.Errorf("failed to read header: %w", err) + } + + return h, nil +} + +func (c *Conn) readLoop() { for { h, err := c.readTillData() if err != nil { @@ -325,13 +342,13 @@ func (c *Conn) readLoop() { select { case <-c.closed: return - case c.readData <- h: + case c.readMsg <- h: } select { case <-c.closed: return - case <-c.readDone: + case <-c.readMsgDone: } } } @@ -374,7 +391,7 @@ func (c *Conn) exportedClose(code StatusCode, reason string) error { // Definitely worth seeing what popular browsers do later. p, err := ce.bytes() if err != nil { - fmt.Fprintf(os.Stderr, "failed to marshal close frame: %v\n", err) + fmt.Fprintf(os.Stderr, "websocket: failed to marshal close frame: %v\n", err) ce = CloseError{ Code: StatusInternalError, } @@ -415,7 +432,11 @@ func (c *Conn) acquireLock(ctx context.Context, lock chan struct{}) error { } func (c *Conn) releaseLock(lock chan struct{}) { - <-lock + // Allow multiple releases. + select { + case <-lock: + default: + } } func (c *Conn) writeMessage(ctx context.Context, opcode opcode, p []byte) error { @@ -572,7 +593,7 @@ func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { } func (c *Conn) reader(ctx context.Context) (_ MessageType, _ io.Reader, err error) { - err = c.acquireLock(ctx, c.readDataLock) + err = c.acquireLock(ctx, c.readMsgLock) if err != nil { return 0, nil, err } @@ -582,7 +603,7 @@ func (c *Conn) reader(ctx context.Context) (_ MessageType, _ io.Reader, err erro return 0, nil, c.closeErr case <-ctx.Done(): return 0, nil, ctx.Err() - case h := <-c.readData: + case h := <-c.readMsg: if h.opcode == opContinuation { ce := CloseError{ Code: StatusProtocolError, @@ -631,7 +652,7 @@ func (r *messageReader) read(p []byte) (int, error) { select { case <-r.c.closed: return 0, r.c.closeErr - case h := <-r.c.readData: + case h := <-r.c.readMsg: if h.opcode != opContinuation { ce := CloseError{ Code: StatusProtocolError, @@ -654,7 +675,12 @@ func (r *messageReader) read(p []byte) (int, error) { case r.c.setReadTimeout <- r.ctx: } + err := r.c.acquireLock(r.ctx, r.c.readFrameLock) + if err != nil { + return 0, err + } n, err := io.ReadFull(r.c.br, p) + r.c.releaseLock(r.c.readFrameLock) select { case <-r.c.closed: @@ -676,11 +702,11 @@ func (r *messageReader) read(p []byte) (int, error) { select { case <-r.c.closed: return n, r.c.closeErr - case r.c.readDone <- struct{}{}: + case r.c.readMsgDone <- struct{}{}: } if r.h.fin { r.eofed = true - r.c.releaseLock(r.c.readDataLock) + r.c.releaseLock(r.c.readMsgLock) return n, io.EOF } r.maskPos = 0 From b3e9764758a88a932356f2314d04fceebf349b5e Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 14:25:09 -0400 Subject: [PATCH 15/18] Minor improvements --- websocket.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/websocket.go b/websocket.go index 6480aed6..db2e82e7 100644 --- a/websocket.go +++ b/websocket.go @@ -201,7 +201,7 @@ func (c *Conn) timeoutLoop() { parentCtx := context.Background() cancelCtx := func() {} defer func() { - // We do not defer cancelCtx because its value can change. + // We do not defer cancelCtx directly because its value may change. cancelCtx() }() @@ -224,9 +224,6 @@ func (c *Conn) timeoutLoop() { select { case <-c.closed: return - case <-parentCtx.Done(): - c.close(xerrors.Errorf("parent context cancelled: %w", parentCtx.Err())) - return case c.getConnContext <- ctx: } } From 3e9c7286cd364592e7a3bc64782a3d2f73aa35be Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 15:00:48 -0400 Subject: [PATCH 16/18] Add ping test --- websocket_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/websocket_test.go b/websocket_test.go index 1907d516..f1905c30 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -376,6 +376,39 @@ func TestHandshake(t *testing.T) { return nil }, }, + { + name: "ping", + server: func(w http.ResponseWriter, r *http.Request) error { + c, err := websocket.Accept(w, r, websocket.AcceptOptions{}) + if err != nil { + return err + } + defer c.Close(websocket.StatusInternalError, "") + + err = c.Ping(r.Context()) + if err != nil { + return err + } + + c.Close(websocket.StatusNormalClosure, "") + return nil + }, + client: func(ctx context.Context, u string) error { + c, _, err := websocket.Dial(ctx, u, websocket.DialOptions{}) + if err != nil { + return err + } + defer c.Close(websocket.StatusInternalError, "") + + err = c.Ping(ctx) + if err != nil { + return err + } + + c.Close(websocket.StatusNormalClosure, "") + return nil + }, + }, } for _, tc := range testCases { From a696daf40933f24d65d20163e67ab2582e3ff421 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 30 May 2019 15:09:58 -0400 Subject: [PATCH 17/18] Link to the bufio hijack net/http issue --- accept.go | 1 + 1 file changed, 1 insertion(+) diff --git a/accept.go b/accept.go index 17016d23..a80f70aa 100644 --- a/accept.go +++ b/accept.go @@ -131,6 +131,7 @@ func accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, return nil, err } + // https://github.com/golang/go/issues/32314 b, _ := brw.Reader.Peek(brw.Reader.Buffered()) brw.Reader.Reset(io.MultiReader(bytes.NewReader(b), netConn)) From 15b83658bccfd47f7aedf290f7d375eecae7b72f Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Fri, 31 May 2019 15:46:03 -0400 Subject: [PATCH 18/18] Justify lack of compression support Closes #5 --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index dad8ec67..4f184f4f 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,6 @@ go get nhooyr.io/websocket@v0.2.0 ## Roadmap - [ ] WebSockets over HTTP/2 [#4](https://github.com/nhooyr/websocket/issues/4) -- [ ] Deflate extension support [#5](https://github.com/nhooyr/websocket/issues/5) ## Examples @@ -89,6 +88,8 @@ c.Close(websocket.StatusNormalClosure, "") - net.Conn is never exposed as WebSocket over HTTP/2 will not have a net.Conn. - Using net/http's Client for dialing means we do not have to reinvent dialing hooks and configurations like other WebSocket libraries +- We do not support the compression extension because Go's compress/flate library is very memory intensive + and browsers do not handle WebSocket compression intelligently. See [#5](https://github.com/nhooyr/websocket/issues/5) ## Comparison