Skip to content
This repository has been archived by the owner on Dec 21, 2019. It is now read-only.

Commit

Permalink
fixes #86 Reconnect interval too long, needs backoff handling
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Feb 9, 2015
1 parent c15321d commit b0c40a4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
12 changes: 11 additions & 1 deletion compat/compat.go
Expand Up @@ -157,6 +157,12 @@ func NewSocket(d Domain, p Protocol) (*Socket, error) {
return nil, err
}

// Compat mode sockets should timeout on send if we don't have any pipes
if err = s.sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
s.sock.Close()
return nil, err
}

s.rto = -1
s.sto = -1
all.AddTransports(s.sock)
Expand Down Expand Up @@ -185,7 +191,11 @@ func (s *Socket) Bind(addr string) (*Endpoint, error) {
// to a remote peer. The client will attempt to keep reconnecting.
// This wraps around mangos' Dial() socket inteface.
func (s *Socket) Connect(addr string) (*Endpoint, error) {
if err := s.sock.Dial(addr); err != nil {
d, err := s.sock.NewDialer(addr, nil)
if err != nil {
return nil, err
}
if err := d.Dial(); err != nil {
return nil, err
}
return &Endpoint{Address: addr}, nil
Expand Down
2 changes: 1 addition & 1 deletion compat/compat_test.go
Expand Up @@ -234,7 +234,7 @@ func TestCompatInp(t *testing.T) {
// timeout when no transport was present. But that semantic is diametrically
// opposed with guaranteed delivery. The only time we block or timeout is
// if there is backpressure.
func xTestCompatSendTimeout(t *testing.T) {
func TestCompatSendTimeout(t *testing.T) {

addr := "tcp://127.0.0.1:19"
push, err := NewSocket(AF_SP, PUSH)
Expand Down
14 changes: 12 additions & 2 deletions core.go
Expand Up @@ -41,6 +41,7 @@ type socket struct {
rdeadline time.Duration
wdeadline time.Duration
reconntime time.Duration // reconnect time after error or disconnect
reconnmax time.Duration // max reconnect interval
linger time.Duration

pipes []*pipe
Expand Down Expand Up @@ -106,7 +107,8 @@ func newSocket(proto Protocol) *socket {
sock.uwq = make(chan *Message, sock.uwqLen)
sock.urq = make(chan *Message, sock.urqLen)
sock.closeq = make(chan struct{})
sock.reconntime = time.Second * 1 // make it a tunable?
sock.reconntime = time.Millisecond * 100 // make it a tunable?
sock.reconnmax = time.Minute
sock.proto = proto
sock.transports = make(map[string]Transport)
sock.linger = time.Second
Expand Down Expand Up @@ -501,9 +503,13 @@ func (d *dialer) Address() string {

// dialer is used to dial or redial from a goroutine.
func (d *dialer) dialer() {
rtime := d.sock.reconntime
rtmax := d.sock.reconnmax
for {
p, err := d.d.Dial()
if err == nil {
// reset retry time
rtime = d.sock.reconntime
d.sock.Lock()
if d.closed {
p.Close()
Expand All @@ -525,7 +531,11 @@ func (d *dialer) dialer() {
return
case <-d.sock.closeq: // exit if parent socket closed
return
case <-time.After(d.sock.reconntime):
case <-time.After(rtime):
rtime *= 2
if rtime > rtmax {
rtime = rtmax
}
continue
}
}
Expand Down

0 comments on commit b0c40a4

Please sign in to comment.