diff --git a/compat/compat.go b/compat/compat.go index d4e7202..11d3e11 100644 --- a/compat/compat.go +++ b/compat/compat.go @@ -157,6 +157,12 @@ func NewSocket(d Domain, p Protocol) (*Socket, error) { return nil, err } + // Compat mode sockets should timeout on send if we don't have any pipes + if err = s.sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { + s.sock.Close() + return nil, err + } + s.rto = -1 s.sto = -1 all.AddTransports(s.sock) @@ -185,7 +191,11 @@ func (s *Socket) Bind(addr string) (*Endpoint, error) { // to a remote peer. The client will attempt to keep reconnecting. // This wraps around mangos' Dial() socket inteface. func (s *Socket) Connect(addr string) (*Endpoint, error) { - if err := s.sock.Dial(addr); err != nil { + d, err := s.sock.NewDialer(addr, nil) + if err != nil { + return nil, err + } + if err := d.Dial(); err != nil { return nil, err } return &Endpoint{Address: addr}, nil diff --git a/compat/compat_test.go b/compat/compat_test.go index b595cbd..8e4aa4b 100644 --- a/compat/compat_test.go +++ b/compat/compat_test.go @@ -234,7 +234,7 @@ func TestCompatInp(t *testing.T) { // timeout when no transport was present. But that semantic is diametrically // opposed with guaranteed delivery. The only time we block or timeout is // if there is backpressure. -func xTestCompatSendTimeout(t *testing.T) { +func TestCompatSendTimeout(t *testing.T) { addr := "tcp://127.0.0.1:19" push, err := NewSocket(AF_SP, PUSH) diff --git a/core.go b/core.go index d7d810a..f20b173 100644 --- a/core.go +++ b/core.go @@ -41,6 +41,7 @@ type socket struct { rdeadline time.Duration wdeadline time.Duration reconntime time.Duration // reconnect time after error or disconnect + reconnmax time.Duration // max reconnect interval linger time.Duration pipes []*pipe @@ -106,7 +107,8 @@ func newSocket(proto Protocol) *socket { sock.uwq = make(chan *Message, sock.uwqLen) sock.urq = make(chan *Message, sock.urqLen) sock.closeq = make(chan struct{}) - sock.reconntime = time.Second * 1 // make it a tunable? + sock.reconntime = time.Millisecond * 100 // make it a tunable? + sock.reconnmax = time.Minute sock.proto = proto sock.transports = make(map[string]Transport) sock.linger = time.Second @@ -501,9 +503,13 @@ func (d *dialer) Address() string { // dialer is used to dial or redial from a goroutine. func (d *dialer) dialer() { + rtime := d.sock.reconntime + rtmax := d.sock.reconnmax for { p, err := d.d.Dial() if err == nil { + // reset retry time + rtime = d.sock.reconntime d.sock.Lock() if d.closed { p.Close() @@ -525,7 +531,11 @@ func (d *dialer) dialer() { return case <-d.sock.closeq: // exit if parent socket closed return - case <-time.After(d.sock.reconntime): + case <-time.After(rtime): + rtime *= 2 + if rtime > rtmax { + rtime = rtmax + } continue } }