Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #96 NewConnPipe can wait indefinitely on peer #102

Merged
merged 1 commit into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/survey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func surveyCases() []TestCase {
surv.MsgSize = 8
surv.WantTx = 1
surv.WantRx = int32(nresp)
surv.txdelay = time.Second / 7
surv.txdelay = time.Second / 5
surv.Synch = true
surv.NReply = int(nresp)
cases[0] = surv
Expand Down
105 changes: 98 additions & 7 deletions transport/conn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -118,8 +118,8 @@ func (p *conn) GetOption(n string) (interface{}, error) {
}

// NewConnPipe allocates a new Pipe using the supplied net.Conn, and
// initializes it. It performs the handshake required at the SP layer,
// only returning the Pipe once the SP layer negotiation is complete.
// initializes it. It performs no negotiation -- use a Handshaker to
// arrange for that.
//
// Stream oriented transports can utilize this to implement a Transport.
// The implementation will also need to implement PipeDialer, PipeAccepter,
Expand All @@ -141,10 +141,6 @@ func NewConnPipe(c net.Conn, proto ProtocolInfo, options map[string]interface{})
}
p.maxrx = p.options[mangos.OptionMaxRecvSize].(int)

if err := p.handshake(); err != nil {
return nil, err
}

return p, nil
}

Expand Down Expand Up @@ -191,3 +187,98 @@ func (p *conn) handshake() error {
p.open = true
return nil
}

type connHandshakerPipe interface {
handshake() error

Pipe
}

type connHandshakerItem struct {
c connHandshakerPipe
e error
}
type connHandshaker struct {
workq map[connHandshakerPipe]bool
doneq []*connHandshakerItem
closed bool
cv *sync.Cond
sync.Mutex
}

// NewConnHandshaker returns a Handshaker that works with
// Pipes created via NewConnPipe or NewConnPipeIPC.
func NewConnHandshaker() Handshaker {
h := &connHandshaker{
workq: make(map[connHandshakerPipe]bool),
closed: false,
}
h.cv = sync.NewCond(h)
return h
}

func (h *connHandshaker) Wait() (Pipe, error) {
h.Lock()
defer h.Unlock()
for len(h.doneq) == 0 && !h.closed {
h.cv.Wait()
}
if h.closed {
return nil, mangos.ErrClosed
}
item := h.doneq[0]
h.doneq = h.doneq[1:]
return item.c, item.e
}

func (h *connHandshaker) Start(p Pipe) error {
conn, ok := p.(connHandshakerPipe)
if !ok {
p.Close()
return mangos.ErrBadTran
}
h.Lock()
defer h.Unlock()
if h.closed {
p.Close()
return mangos.ErrClosed
}
h.workq[conn] = true
go h.worker(conn)
return nil
}

func (h *connHandshaker) Close() error {
h.Lock()
h.closed = true
h.cv.Broadcast()
for conn := range h.workq {
conn.Close()
}
for len(h.doneq) != 0 {
item := h.doneq[0]
h.doneq = h.doneq[1:]
item.c.Close()
}
return nil
}

func (h *connHandshaker) worker(conn connHandshakerPipe) {
item := &connHandshakerItem{c: conn}
item.e = conn.handshake()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the conn.handshake() never returns due to a bad connection (as in #96), the worker() goroutine would leak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handshake should fail eventually. Arguably this might be a case where the timeouts would be useful ... to detect a stuck TCP session. That said, normally that shouldn't be an issue. Typically the kernel will eventually detect that the TCP peer is dead.

Having said that, I'd be willing to set some kind of reasonable timeout on the handshake in particular. That's probably a different case. (Probably the timeout should be a small number of seconds -- under normal circumstances the handshake should complete within a few round trip times.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to handle setting the timeout for this (to close the potential leak of the file and associated go routine) separately. That "leak" (I'd argue its not really a leak, as the peer could actually wake up and complete the operation some long time - days even! - later) is a far less bad situation than we have where a the bad peer would actually gum up everything immediately, which this PR fixes.

h.Lock()
defer h.Unlock()

delete(h.workq, conn)

if item.e != nil {
item.c.Close()
item.c = nil
} else if h.closed {
item.e = mangos.ErrClosed
item.c.Close()
item.c = nil
}
h.doneq = append(h.doneq, item)
h.cv.Broadcast()
}
6 changes: 1 addition & 5 deletions transport/connipc_posix.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !windows

// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -39,10 +39,6 @@ func NewConnPipeIPC(c net.Conn, proto ProtocolInfo, options map[string]interface
}
p.maxrx = p.options[mangos.OptionMaxRecvSize].(int)

if err := p.handshake(); err != nil {
return nil, err
}

return p, nil
}

Expand Down
6 changes: 1 addition & 5 deletions transport/connipc_windows.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build windows

// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -39,10 +39,6 @@ func NewConnPipeIPC(c net.Conn, proto ProtocolInfo, options map[string]interface
}
p.maxrx = p.options[mangos.OptionMaxRecvSize].(int)

if err := p.handshake(); err != nil {
return nil, err
}

return p, nil
}

Expand Down
37 changes: 37 additions & 0 deletions transport/handshaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

// Handshaker is used to support dealing with asynchronous
// handshaking used for some transports. This allows the
// initial handshaking to be done in the background, without
// stalling the server's accept queue. This is important to
// ensure that a slow remote peer cannot bog down the server
// or effect a denial-of-service for new connections.
type Handshaker interface {
// Start injects a pipe into the handshaker. The
// handshaking is done asynchronously on a Go routine.
Start(Pipe) error

// Waits for until a pipe has completely finished the
// handshaking and returns it.
Wait() (Pipe, error)

// Close is used to close the handshaker. Any existing
// negotiations will be canceled, and the underlying
// transport sockets will be closed. Any new attempts
// to start will return mangos.ErrClosed.
Close() error
}
2 changes: 1 addition & 1 deletion transport/ipc/ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"nanomsg.org/go/mangos/v2/test"
)

var tt = test.NewTranTest(Transport, "ipc://test1234")
var tt = test.NewTranTest(Transport, "ipc:///tmp/test1234")

func TestIpcListenAndAccept(t *testing.T) {
switch runtime.GOOS {
Expand Down
72 changes: 55 additions & 17 deletions transport/ipc/ipc_unix.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !windows,!nacl,!plan9

// Copyright 2018 The Mangos Authors
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -64,9 +64,10 @@ func (o options) set(name string, val interface{}) error {
}

type dialer struct {
addr *net.UnixAddr
proto transport.ProtocolInfo
opts options
addr *net.UnixAddr
proto transport.ProtocolInfo
opts options
handshaker transport.Handshaker
}

// Dial implements the Dialer Dial method
Expand All @@ -76,7 +77,16 @@ func (d *dialer) Dial() (transport.Pipe, error) {
if err != nil {
return nil, err
}
return transport.NewConnPipeIPC(conn, d.proto, d.opts)
p, err := transport.NewConnPipeIPC(conn, d.proto, d.opts)
if err != nil {
conn.Close()
return nil, err
}
if err = d.handshaker.Start(p); err != nil {
conn.Close()
return nil, err
}
return d.handshaker.Wait()
}

// SetOption implements Dialer SetOption method.
Expand All @@ -90,10 +100,12 @@ func (d *dialer) GetOption(n string) (interface{}, error) {
}

type listener struct {
addr *net.UnixAddr
proto transport.ProtocolInfo
listener *net.UnixListener
opts options
addr *net.UnixAddr
proto transport.ProtocolInfo
listener *net.UnixListener
opts options
handshaker transport.Handshaker
closeq chan struct{}
}

// Listen implements the PipeListener Listen method.
Expand All @@ -102,7 +114,31 @@ func (l *listener) Listen() error {
if err != nil {
return err
}
closeq := make(chan struct{})
l.closeq = closeq
l.listener = listener
go func() {
for {
conn, err := l.listener.AcceptUnix()
if err != nil {
select {
case <-closeq:
return
default:
continue
}
}
p, err := transport.NewConnPipeIPC(conn, l.proto, l.opts)
if err != nil {
conn.Close()
continue
}
if err = l.handshaker.Start(p); err != nil {
conn.Close()
continue
}
}
}()
return nil
}

Expand All @@ -113,16 +149,15 @@ func (l *listener) Address() string {
// Accept implements the the PipeListener Accept method.
func (l *listener) Accept() (transport.Pipe, error) {

conn, err := l.listener.AcceptUnix()
if err != nil {
return nil, err
}
return transport.NewConnPipeIPC(conn, l.proto, l.opts)
return l.handshaker.Wait()
}

// Close implements the PipeListener Close method.
func (l *listener) Close() error {
l.listener.Close()
if l.listener != nil {
l.listener.Close()
}
l.handshaker.Close()
return nil
}

Expand Down Expand Up @@ -152,8 +187,9 @@ func (t ipcTran) NewDialer(addr string, sock mangos.Socket) (transport.Dialer, e
}

d := &dialer{
proto: sock.Info(),
opts: make(map[string]interface{}),
proto: sock.Info(),
opts: make(map[string]interface{}),
handshaker: transport.NewConnHandshaker(),
}
d.opts[mangos.OptionMaxRecvSize] = 0
if d.addr, err = net.ResolveUnixAddr("unix", addr); err != nil {
Expand All @@ -179,5 +215,7 @@ func (t ipcTran) NewListener(addr string, sock mangos.Socket) (transport.Listene
return nil, err
}

l.handshaker = transport.NewConnHandshaker()

return l, nil
}