Skip to content

Commit

Permalink
fixes #96 NewConnPipe can wait indefinitely on peer
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Aug 2, 2019
1 parent 12bbb93 commit 1c3fb54
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 118 deletions.
2 changes: 1 addition & 1 deletion test/survey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func surveyCases() []TestCase {
surv.MsgSize = 8
surv.WantTx = 1
surv.WantRx = int32(nresp)
surv.txdelay = time.Second / 7
surv.txdelay = time.Second / 5
surv.Synch = true
surv.NReply = int(nresp)
cases[0] = surv
Expand Down
105 changes: 98 additions & 7 deletions transport/conn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -118,8 +118,8 @@ func (p *conn) GetOption(n string) (interface{}, error) {
}

// NewConnPipe allocates a new Pipe using the supplied net.Conn, and
// initializes it. It performs the handshake required at the SP layer,
// only returning the Pipe once the SP layer negotiation is complete.
// initializes it. It performs no negotiation -- use a Handshaker to
// arrange for that.
//
// Stream oriented transports can utilize this to implement a Transport.
// The implementation will also need to implement PipeDialer, PipeAccepter,
Expand All @@ -141,10 +141,6 @@ func NewConnPipe(c net.Conn, proto ProtocolInfo, options map[string]interface{})
}
p.maxrx = p.options[mangos.OptionMaxRecvSize].(int)

if err := p.handshake(); err != nil {
return nil, err
}

return p, nil
}

Expand Down Expand Up @@ -191,3 +187,98 @@ func (p *conn) handshake() error {
p.open = true
return nil
}

type connHandshakerPipe interface {
handshake() error

Pipe
}

type connHandshakerItem struct {
c connHandshakerPipe
e error
}
type connHandshaker struct {
workq map[connHandshakerPipe]bool
doneq []*connHandshakerItem
closed bool
cv *sync.Cond
sync.Mutex
}

// NewConnHandshaker returns a Handshaker that works with
// Pipes created via NewConnPipe or NewConnPipeIPC.
func NewConnHandshaker() Handshaker {
h := &connHandshaker{
workq: make(map[connHandshakerPipe]bool),
closed: false,
}
h.cv = sync.NewCond(h)
return h
}

func (h *connHandshaker) Wait() (Pipe, error) {
h.Lock()
defer h.Unlock()
for len(h.doneq) == 0 && !h.closed {
h.cv.Wait()
}
if h.closed {
return nil, mangos.ErrClosed
}
item := h.doneq[0]
h.doneq = h.doneq[1:]
return item.c, item.e
}

func (h *connHandshaker) Start(p Pipe) error {
conn, ok := p.(connHandshakerPipe)
if !ok {
p.Close()
return mangos.ErrBadTran
}
h.Lock()
defer h.Unlock()
if h.closed {
p.Close()
return mangos.ErrClosed
}
h.workq[conn] = true
go h.worker(conn)
return nil
}

func (h *connHandshaker) Close() error {
h.Lock()
h.closed = true
h.cv.Broadcast()
for conn := range h.workq {
conn.Close()
}
for len(h.doneq) != 0 {
item := h.doneq[0]
h.doneq = h.doneq[1:]
item.c.Close()
}
return nil
}

func (h *connHandshaker) worker(conn connHandshakerPipe) {
item := &connHandshakerItem{c: conn}
item.e = conn.handshake()
h.Lock()
defer h.Unlock()

delete(h.workq, conn)

if item.e != nil {
item.c.Close()
item.c = nil
} else if h.closed {
item.e = mangos.ErrClosed
item.c.Close()
item.c = nil
}
h.doneq = append(h.doneq, item)
h.cv.Broadcast()
}
6 changes: 1 addition & 5 deletions transport/connipc_posix.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !windows

// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -39,10 +39,6 @@ func NewConnPipeIPC(c net.Conn, proto ProtocolInfo, options map[string]interface
}
p.maxrx = p.options[mangos.OptionMaxRecvSize].(int)

if err := p.handshake(); err != nil {
return nil, err
}

return p, nil
}

Expand Down
6 changes: 1 addition & 5 deletions transport/connipc_windows.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build windows

// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -39,10 +39,6 @@ func NewConnPipeIPC(c net.Conn, proto ProtocolInfo, options map[string]interface
}
p.maxrx = p.options[mangos.OptionMaxRecvSize].(int)

if err := p.handshake(); err != nil {
return nil, err
}

return p, nil
}

Expand Down
37 changes: 37 additions & 0 deletions transport/handshaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

// Handshaker is used to support dealing with asynchronous
// handshaking used for some transports. This allows the
// initial handshaking to be done in the background, without
// stalling the server's accept queue. This is important to
// ensure that a slow remote peer cannot bog down the server
// or effect a denial-of-service for new connections.
type Handshaker interface {
// Start injects a pipe into the handshaker. The
// handshaking is done asynchronously on a Go routine.
Start(Pipe) error

// Waits for until a pipe has completely finished the
// handshaking and returns it.
Wait() (Pipe, error)

// Close is used to close the handshaker. Any existing
// negotiations will be canceled, and the underlying
// transport sockets will be closed. Any new attempts
// to start will return mangos.ErrClosed.
Close() error
}
2 changes: 1 addition & 1 deletion transport/ipc/ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"nanomsg.org/go/mangos/v2/test"
)

var tt = test.NewTranTest(Transport, "ipc://test1234")
var tt = test.NewTranTest(Transport, "ipc:///tmp/test1234")

func TestIpcListenAndAccept(t *testing.T) {
switch runtime.GOOS {
Expand Down
72 changes: 55 additions & 17 deletions transport/ipc/ipc_unix.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !windows,!nacl,!plan9

// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -64,9 +64,10 @@ func (o options) set(name string, val interface{}) error {
}

type dialer struct {
addr *net.UnixAddr
proto transport.ProtocolInfo
opts options
addr *net.UnixAddr
proto transport.ProtocolInfo
opts options
handshaker transport.Handshaker
}

// Dial implements the Dialer Dial method
Expand All @@ -76,7 +77,16 @@ func (d *dialer) Dial() (transport.Pipe, error) {
if err != nil {
return nil, err
}
return transport.NewConnPipeIPC(conn, d.proto, d.opts)
p, err := transport.NewConnPipeIPC(conn, d.proto, d.opts)
if err != nil {
conn.Close()
return nil, err
}
if err = d.handshaker.Start(p); err != nil {
conn.Close()
return nil, err
}
return d.handshaker.Wait()
}

// SetOption implements Dialer SetOption method.
Expand All @@ -90,10 +100,12 @@ func (d *dialer) GetOption(n string) (interface{}, error) {
}

type listener struct {
addr *net.UnixAddr
proto transport.ProtocolInfo
listener *net.UnixListener
opts options
addr *net.UnixAddr
proto transport.ProtocolInfo
listener *net.UnixListener
opts options
handshaker transport.Handshaker
closeq chan struct{}
}

// Listen implements the PipeListener Listen method.
Expand All @@ -102,7 +114,31 @@ func (l *listener) Listen() error {
if err != nil {
return err
}
closeq := make(chan struct{})
l.closeq = closeq
l.listener = listener
go func() {
for {
conn, err := l.listener.AcceptUnix()
if err != nil {
select {
case <-closeq:
return
default:
continue
}
}
p, err := transport.NewConnPipeIPC(conn, l.proto, l.opts)
if err != nil {
conn.Close()
continue
}
if err = l.handshaker.Start(p); err != nil {
conn.Close()
continue
}
}
}()
return nil
}

Expand All @@ -113,16 +149,15 @@ func (l *listener) Address() string {
// Accept implements the the PipeListener Accept method.
func (l *listener) Accept() (transport.Pipe, error) {

conn, err := l.listener.AcceptUnix()
if err != nil {
return nil, err
}
return transport.NewConnPipeIPC(conn, l.proto, l.opts)
return l.handshaker.Wait()
}

// Close implements the PipeListener Close method.
func (l *listener) Close() error {
l.listener.Close()
if l.listener != nil {
l.listener.Close()
}
l.handshaker.Close()
return nil
}

Expand Down Expand Up @@ -152,8 +187,9 @@ func (t ipcTran) NewDialer(addr string, sock mangos.Socket) (transport.Dialer, e
}

d := &dialer{
proto: sock.Info(),
opts: make(map[string]interface{}),
proto: sock.Info(),
opts: make(map[string]interface{}),
handshaker: transport.NewConnHandshaker(),
}
d.opts[mangos.OptionMaxRecvSize] = 0
if d.addr, err = net.ResolveUnixAddr("unix", addr); err != nil {
Expand All @@ -179,5 +215,7 @@ func (t ipcTran) NewListener(addr string, sock mangos.Socket) (transport.Listene
return nil, err
}

l.handshaker = transport.NewConnHandshaker()

return l, nil
}

0 comments on commit 1c3fb54

Please sign in to comment.