diff --git a/test/survey_test.go b/test/survey_test.go index 3241568..274298c 100644 --- a/test/survey_test.go +++ b/test/survey_test.go @@ -124,7 +124,7 @@ func surveyCases() []TestCase { surv.MsgSize = 8 surv.WantTx = 1 surv.WantRx = int32(nresp) - surv.txdelay = time.Second / 7 + surv.txdelay = time.Second / 5 surv.Synch = true surv.NReply = int(nresp) cases[0] = surv diff --git a/transport/conn.go b/transport/conn.go index 742eeaa..3821ad8 100644 --- a/transport/conn.go +++ b/transport/conn.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -118,8 +118,8 @@ func (p *conn) GetOption(n string) (interface{}, error) { } // NewConnPipe allocates a new Pipe using the supplied net.Conn, and -// initializes it. It performs the handshake required at the SP layer, -// only returning the Pipe once the SP layer negotiation is complete. +// initializes it. It performs no negotiation -- use a Handshaker to +// arrange for that. // // Stream oriented transports can utilize this to implement a Transport. // The implementation will also need to implement PipeDialer, PipeAccepter, @@ -141,10 +141,6 @@ func NewConnPipe(c net.Conn, proto ProtocolInfo, options map[string]interface{}) } p.maxrx = p.options[mangos.OptionMaxRecvSize].(int) - if err := p.handshake(); err != nil { - return nil, err - } - return p, nil } @@ -191,3 +187,98 @@ func (p *conn) handshake() error { p.open = true return nil } + +type connHandshakerPipe interface { + handshake() error + + Pipe +} + +type connHandshakerItem struct { + c connHandshakerPipe + e error +} +type connHandshaker struct { + workq map[connHandshakerPipe]bool + doneq []*connHandshakerItem + closed bool + cv *sync.Cond + sync.Mutex +} + +// NewConnHandshaker returns a Handshaker that works with +// Pipes created via NewConnPipe or NewConnPipeIPC. +func NewConnHandshaker() Handshaker { + h := &connHandshaker{ + workq: make(map[connHandshakerPipe]bool), + closed: false, + } + h.cv = sync.NewCond(h) + return h +} + +func (h *connHandshaker) Wait() (Pipe, error) { + h.Lock() + defer h.Unlock() + for len(h.doneq) == 0 && !h.closed { + h.cv.Wait() + } + if h.closed { + return nil, mangos.ErrClosed + } + item := h.doneq[0] + h.doneq = h.doneq[1:] + return item.c, item.e +} + +func (h *connHandshaker) Start(p Pipe) error { + conn, ok := p.(connHandshakerPipe) + if !ok { + p.Close() + return mangos.ErrBadTran + } + h.Lock() + defer h.Unlock() + if h.closed { + p.Close() + return mangos.ErrClosed + } + h.workq[conn] = true + go h.worker(conn) + return nil +} + +func (h *connHandshaker) Close() error { + h.Lock() + h.closed = true + h.cv.Broadcast() + for conn := range h.workq { + conn.Close() + } + for len(h.doneq) != 0 { + item := h.doneq[0] + h.doneq = h.doneq[1:] + item.c.Close() + } + return nil +} + +func (h *connHandshaker) worker(conn connHandshakerPipe) { + item := &connHandshakerItem{c: conn} + item.e = conn.handshake() + h.Lock() + defer h.Unlock() + + delete(h.workq, conn) + + if item.e != nil { + item.c.Close() + item.c = nil + } else if h.closed { + item.e = mangos.ErrClosed + item.c.Close() + item.c = nil + } + h.doneq = append(h.doneq, item) + h.cv.Broadcast() +} diff --git a/transport/connipc_posix.go b/transport/connipc_posix.go index bcbbb58..a881c13 100644 --- a/transport/connipc_posix.go +++ b/transport/connipc_posix.go @@ -1,6 +1,6 @@ // +build !windows -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -39,10 +39,6 @@ func NewConnPipeIPC(c net.Conn, proto ProtocolInfo, options map[string]interface } p.maxrx = p.options[mangos.OptionMaxRecvSize].(int) - if err := p.handshake(); err != nil { - return nil, err - } - return p, nil } diff --git a/transport/connipc_windows.go b/transport/connipc_windows.go index 299a2a1..304ed1f 100644 --- a/transport/connipc_windows.go +++ b/transport/connipc_windows.go @@ -1,6 +1,6 @@ // +build windows -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -39,10 +39,6 @@ func NewConnPipeIPC(c net.Conn, proto ProtocolInfo, options map[string]interface } p.maxrx = p.options[mangos.OptionMaxRecvSize].(int) - if err := p.handshake(); err != nil { - return nil, err - } - return p, nil } diff --git a/transport/handshaker.go b/transport/handshaker.go new file mode 100644 index 0000000..9cc37d7 --- /dev/null +++ b/transport/handshaker.go @@ -0,0 +1,37 @@ +// Copyright 2019 The Mangos Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use file except in compliance with the License. +// You may obtain a copy of the license at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +// Handshaker is used to support dealing with asynchronous +// handshaking used for some transports. This allows the +// initial handshaking to be done in the background, without +// stalling the server's accept queue. This is important to +// ensure that a slow remote peer cannot bog down the server +// or effect a denial-of-service for new connections. +type Handshaker interface { + // Start injects a pipe into the handshaker. The + // handshaking is done asynchronously on a Go routine. + Start(Pipe) error + + // Waits for until a pipe has completely finished the + // handshaking and returns it. + Wait() (Pipe, error) + + // Close is used to close the handshaker. Any existing + // negotiations will be canceled, and the underlying + // transport sockets will be closed. Any new attempts + // to start will return mangos.ErrClosed. + Close() error +} diff --git a/transport/ipc/ipc_test.go b/transport/ipc/ipc_test.go index c9cf621..791a0ca 100644 --- a/transport/ipc/ipc_test.go +++ b/transport/ipc/ipc_test.go @@ -21,7 +21,7 @@ import ( "nanomsg.org/go/mangos/v2/test" ) -var tt = test.NewTranTest(Transport, "ipc://test1234") +var tt = test.NewTranTest(Transport, "ipc:///tmp/test1234") func TestIpcListenAndAccept(t *testing.T) { switch runtime.GOOS { diff --git a/transport/ipc/ipc_unix.go b/transport/ipc/ipc_unix.go index 8edf2e4..1758e03 100644 --- a/transport/ipc/ipc_unix.go +++ b/transport/ipc/ipc_unix.go @@ -1,6 +1,6 @@ // +build !windows,!nacl,!plan9 -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -64,9 +64,10 @@ func (o options) set(name string, val interface{}) error { } type dialer struct { - addr *net.UnixAddr - proto transport.ProtocolInfo - opts options + addr *net.UnixAddr + proto transport.ProtocolInfo + opts options + handshaker transport.Handshaker } // Dial implements the Dialer Dial method @@ -76,7 +77,16 @@ func (d *dialer) Dial() (transport.Pipe, error) { if err != nil { return nil, err } - return transport.NewConnPipeIPC(conn, d.proto, d.opts) + p, err := transport.NewConnPipeIPC(conn, d.proto, d.opts) + if err != nil { + conn.Close() + return nil, err + } + if err = d.handshaker.Start(p); err != nil { + conn.Close() + return nil, err + } + return d.handshaker.Wait() } // SetOption implements Dialer SetOption method. @@ -90,10 +100,12 @@ func (d *dialer) GetOption(n string) (interface{}, error) { } type listener struct { - addr *net.UnixAddr - proto transport.ProtocolInfo - listener *net.UnixListener - opts options + addr *net.UnixAddr + proto transport.ProtocolInfo + listener *net.UnixListener + opts options + handshaker transport.Handshaker + closeq chan struct{} } // Listen implements the PipeListener Listen method. @@ -102,7 +114,31 @@ func (l *listener) Listen() error { if err != nil { return err } + closeq := make(chan struct{}) + l.closeq = closeq l.listener = listener + go func() { + for { + conn, err := l.listener.AcceptUnix() + if err != nil { + select { + case <-closeq: + return + default: + continue + } + } + p, err := transport.NewConnPipeIPC(conn, l.proto, l.opts) + if err != nil { + conn.Close() + continue + } + if err = l.handshaker.Start(p); err != nil { + conn.Close() + continue + } + } + }() return nil } @@ -113,16 +149,15 @@ func (l *listener) Address() string { // Accept implements the the PipeListener Accept method. func (l *listener) Accept() (transport.Pipe, error) { - conn, err := l.listener.AcceptUnix() - if err != nil { - return nil, err - } - return transport.NewConnPipeIPC(conn, l.proto, l.opts) + return l.handshaker.Wait() } // Close implements the PipeListener Close method. func (l *listener) Close() error { - l.listener.Close() + if l.listener != nil { + l.listener.Close() + } + l.handshaker.Close() return nil } @@ -152,8 +187,9 @@ func (t ipcTran) NewDialer(addr string, sock mangos.Socket) (transport.Dialer, e } d := &dialer{ - proto: sock.Info(), - opts: make(map[string]interface{}), + proto: sock.Info(), + opts: make(map[string]interface{}), + handshaker: transport.NewConnHandshaker(), } d.opts[mangos.OptionMaxRecvSize] = 0 if d.addr, err = net.ResolveUnixAddr("unix", addr); err != nil { @@ -179,5 +215,7 @@ func (t ipcTran) NewListener(addr string, sock mangos.Socket) (transport.Listene return nil, err } + l.handshaker = transport.NewConnHandshaker() + return l, nil } diff --git a/transport/ipc/ipc_windows.go b/transport/ipc/ipc_windows.go index c69016d..3c0ad57 100644 --- a/transport/ipc/ipc_windows.go +++ b/transport/ipc/ipc_windows.go @@ -1,6 +1,6 @@ // +build windows -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -55,9 +55,10 @@ const ( ) type dialer struct { - path string - proto transport.ProtocolInfo - opts map[string]interface{} + path string + proto transport.ProtocolInfo + opts map[string]interface{} + handshaker transport.Handshaker } // Dial implements the PipeDialer Dial method. @@ -67,7 +68,16 @@ func (d *dialer) Dial() (transport.Pipe, error) { if err != nil { return nil, err } - return transport.NewConnPipeIPC(conn, d.proto, d.opts) + p, err := transport.NewConnPipeIPC(conn, d.proto, d.opts) + if err != nil { + conn.Close() + return nil, err + } + if err = d.handshaker.Start(p); err != nil { + conn.Close() + return nil, err + } + return d.handshaker.Wait() } // SetOption implements a stub PipeDialer SetOption method. @@ -81,10 +91,12 @@ func (d *dialer) GetOption(n string) (interface{}, error) { } type listener struct { - path string - proto transport.ProtocolInfo - listener net.Listener - opts map[string]interface{} + path string + proto transport.ProtocolInfo + listener net.Listener + opts map[string]interface{} + handshaker transport.Handshaker + closeq chan struct{} } // Listen implements the PipeListener Listen method. @@ -97,11 +109,35 @@ func (l *listener) Listen() error { MessageMode: false, } + closeq := make(chan struct{}) listener, err := winio.ListenPipe("\\\\.\\pipe\\"+l.path, config) if err != nil { return err } l.listener = listener + l.closeq = closeq + go func() { + for { + conn, err := l.listener.Accept() + if err != nil { + select { + case <-closeq: + return + default: + continue + } + } + p, err := transport.NewConnPipeIPC(conn, l.proto, l.opts) + if err != nil { + conn.Close() + continue + } + if err = l.handshaker.Start(p); err != nil { + conn.Close() + continue + } + } + }() return nil } @@ -112,11 +148,7 @@ func (l *listener) Address() string { // Accept implements the the PipeListener Accept method. func (l *listener) Accept() (mangos.TranPipe, error) { - conn, err := l.listener.Accept() - if err != nil { - return nil, err - } - return transport.NewConnPipeIPC(conn, l.proto, l.opts) + return l.handshaker.Wait() } // Close implements the PipeListener Close method. @@ -124,6 +156,7 @@ func (l *listener) Close() error { if l.listener != nil { l.listener.Close() } + l.handshaker.Close() return nil } @@ -181,9 +214,10 @@ func (t ipcTran) NewDialer(address string, sock mangos.Socket) (mangos.TranDiale } d := &dialer{ - proto: sock.Info(), - path: address, - opts: make(map[string]interface{}), + proto: sock.Info(), + path: address, + opts: make(map[string]interface{}), + handshaker: transport.NewConnHandshaker(), } d.opts[mangos.OptionMaxRecvSize] = 0 @@ -200,9 +234,10 @@ func (t ipcTran) NewListener(address string, sock mangos.Socket) (transport.List } l := &listener{ - proto: sock.Info(), - path: address, - opts: make(map[string]interface{}), + proto: sock.Info(), + path: address, + opts: make(map[string]interface{}), + handshaker: transport.NewConnHandshaker(), } l.opts[OptionInputBufferSize] = int32(4096) diff --git a/transport/tcp/tcp.go b/transport/tcp/tcp.go index 31210f7..98cfc56 100644 --- a/transport/tcp/tcp.go +++ b/transport/tcp/tcp.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -102,9 +102,10 @@ func (o options) configTCP(conn *net.TCPConn) error { } type dialer struct { - addr string - proto transport.ProtocolInfo - opts options + addr string + proto transport.ProtocolInfo + opts options + handshaker transport.Handshaker } func (d *dialer) Dial() (_ transport.Pipe, err error) { @@ -125,7 +126,16 @@ func (d *dialer) Dial() (_ transport.Pipe, err error) { return nil, err } - return transport.NewConnPipe(conn, d.proto, d.opts) + p, err := transport.NewConnPipe(conn, d.proto, d.opts) + if err != nil { + conn.Close() + return nil, err + } + if err = d.handshaker.Start(p); err != nil { + conn.Close() + return nil, err + } + return d.handshaker.Wait() } func (d *dialer) SetOption(n string, v interface{}) error { @@ -137,11 +147,13 @@ func (d *dialer) GetOption(n string) (interface{}, error) { } type listener struct { - addr *net.TCPAddr - bound net.Addr - proto transport.ProtocolInfo - listener *net.TCPListener - opts options + addr *net.TCPAddr + bound net.Addr + proto transport.ProtocolInfo + listener *net.TCPListener + opts options + handshaker transport.Handshaker + closeq chan struct{} } func (l *listener) Accept() (transport.Pipe, error) { @@ -149,22 +161,43 @@ func (l *listener) Accept() (transport.Pipe, error) { if l.listener == nil { return nil, mangos.ErrClosed } - conn, err := l.listener.AcceptTCP() - if err != nil { - return nil, err - } - if err = l.opts.configTCP(conn); err != nil { - conn.Close() - return nil, err - } - return transport.NewConnPipe(conn, l.proto, l.opts) + return l.handshaker.Wait() } func (l *listener) Listen() (err error) { l.listener, err = net.ListenTCP("tcp", l.addr) - if err == nil { - l.bound = l.listener.Addr() + if err != nil { + return } + closeq := make(chan struct{}) + l.closeq = closeq + l.bound = l.listener.Addr() + go func() { + for { + conn, err := l.listener.AcceptTCP() + if err != nil { + select { + case <-closeq: + return + default: + continue + } + } + if err = l.opts.configTCP(conn); err != nil { + conn.Close() + continue + } + p, err := transport.NewConnPipe(conn, l.proto, l.opts) + if err != nil { + conn.Close() + continue + } + if err = l.handshaker.Start(p); err != nil { + conn.Close() + continue + } + } + }() return } @@ -176,7 +209,11 @@ func (l *listener) Address() string { } func (l *listener) Close() error { - l.listener.Close() + if l.listener != nil { + close(l.closeq) + l.listener.Close() + } + l.handshaker.Close() return nil } @@ -205,7 +242,11 @@ func (t tcpTran) NewDialer(addr string, sock mangos.Socket) (transport.Dialer, e return nil, err } - d := &dialer{addr: addr, proto: sock.Info(), opts: newOptions()} + d := &dialer{addr: addr, + proto: sock.Info(), + opts: newOptions(), + handshaker: transport.NewConnHandshaker(), + } return d, nil } @@ -222,5 +263,6 @@ func (t tcpTran) NewListener(addr string, sock mangos.Socket) (transport.Listene return nil, err } + l.handshaker = transport.NewConnHandshaker() return l, nil } diff --git a/transport/tlstcp/tlstcp.go b/transport/tlstcp/tlstcp.go index c9f360d..3cdb34e 100644 --- a/transport/tlstcp/tlstcp.go +++ b/transport/tlstcp/tlstcp.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Mangos Authors +// Copyright 2019 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -105,9 +105,10 @@ func newOptions(t tlsTran) options { } type dialer struct { - addr string - proto transport.ProtocolInfo - opts options + addr string + proto transport.ProtocolInfo + opts options + handshaker transport.Handshaker } func (d *dialer) Dial() (transport.Pipe, error) { @@ -142,7 +143,16 @@ func (d *dialer) Dial() (transport.Pipe, error) { opts[n] = v } opts[mangos.OptionTLSConnState] = conn.ConnectionState() - return transport.NewConnPipe(conn, d.proto, opts) + p, err := transport.NewConnPipe(conn, d.proto, opts) + if err != nil { + conn.Close() + return nil, err + } + if err = d.handshaker.Start(p); err != nil { + conn.Close() + return nil, err + } + return d.handshaker.Wait() } func (d *dialer) SetOption(n string, v interface{}) error { @@ -154,12 +164,14 @@ func (d *dialer) GetOption(n string) (interface{}, error) { } type listener struct { - addr *net.TCPAddr - bound net.Addr - listener *net.TCPListener - proto transport.ProtocolInfo - opts options - config *tls.Config + addr *net.TCPAddr + bound net.Addr + listener *net.TCPListener + proto transport.ProtocolInfo + opts options + config *tls.Config + handshaker transport.Handshaker + closeq chan struct{} } func (l *listener) Listen() error { @@ -176,12 +188,53 @@ func (l *listener) Listen() error { return mangos.ErrTLSNoCert } + closeq := make(chan struct{}) if l.listener, err = net.ListenTCP("tcp", l.addr); err != nil { return err } - + l.closeq = closeq l.bound = l.listener.Addr() + go func() { + for { + tconn, err := l.listener.AcceptTCP() + if err != nil { + select { + case <-closeq: + return + default: + continue + } + } + + if err = l.opts.configTCP(tconn); err != nil { + tconn.Close() + continue + } + + conn := tls.Server(tconn, l.config) + if err = conn.Handshake(); err != nil { + conn.Close() + continue + } + opts := make(map[string]interface{}) + for n, v := range l.opts { + opts[n] = v + } + opts[mangos.OptionTLSConnState] = conn.ConnectionState() + p, err := transport.NewConnPipe(conn, l.proto, opts) + if err != nil { + conn.Close() + continue + } + + if err = l.handshaker.Start(p); err != nil { + conn.Close() + continue + } + } + }() + return nil } @@ -193,32 +246,14 @@ func (l *listener) Address() string { } func (l *listener) Accept() (transport.Pipe, error) { - - tconn, err := l.listener.AcceptTCP() - if err != nil { - return nil, err - } - - if err = l.opts.configTCP(tconn); err != nil { - tconn.Close() - return nil, err - } - - conn := tls.Server(tconn, l.config) - if err = conn.Handshake(); err != nil { - conn.Close() - return nil, err - } - opts := make(map[string]interface{}) - for n, v := range l.opts { - opts[n] = v - } - opts[mangos.OptionTLSConnState] = conn.ConnectionState() - return transport.NewConnPipe(conn, l.proto, opts) + return l.handshaker.Wait() } func (l *listener) Close() error { - l.listener.Close() + if l.listener != nil { + l.listener.Close() + } + l.handshaker.Close() return nil } @@ -249,11 +284,11 @@ func (t tlsTran) NewDialer(addr string, sock mangos.Socket) (transport.Dialer, e } d := &dialer{ - proto: sock.Info(), - opts: newOptions(t), - addr: addr, + proto: sock.Info(), + opts: newOptions(t), + addr: addr, + handshaker: transport.NewConnHandshaker(), } - return d, nil } @@ -271,6 +306,7 @@ func (t tlsTran) NewListener(addr string, sock mangos.Socket) (transport.Listene if l.addr, err = transport.ResolveTCPAddr(addr); err != nil { return nil, err } + l.handshaker = transport.NewConnHandshaker() return l, nil }