Skip to content
This repository has been archived by the owner on Nov 9, 2022. It is now read-only.

Commit

Permalink
client: fix context handling.
Browse files Browse the repository at this point in the history
* fixes r3labs#131.
  • Loading branch information
peteut committed Apr 27, 2022
1 parent 9a15ccd commit 1f7e179
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,26 @@ func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handle
return err
case msg := <-eventChan:
handler(msg)
case <- ctx.Done():
case <-ctx.Done():
return ctx.Err()
}
}
}

// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
var err error

bCtx, cancel := context.WithCancel(ctx)
defer cancel()
if c.ReconnectStrategy != nil {
err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify)
err = backoff.RetryNotify(
operation,
backoff.WithContext(c.ReconnectStrategy, bCtx), c.ReconnectNotify)
} else {
err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify)
err = backoff.RetryNotify(
operation,
backoff.WithContext(backoff.NewExponentialBackOff(), bCtx),
c.ReconnectNotify)
}
return err
}
Expand Down Expand Up @@ -184,10 +192,17 @@ func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch
defer c.cleanup(ch)
// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
var err error

bCtx, cancel := context.WithCancel(ctx)
defer cancel()
if c.ReconnectStrategy != nil {
err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify)
err = backoff.RetryNotify(operation,
backoff.WithContext(c.ReconnectStrategy, bCtx),
c.ReconnectNotify)
} else {
err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify)
err = backoff.RetryNotify(operation,
backoff.WithContext(backoff.NewExponentialBackOff(), bCtx),
c.ReconnectNotify)
}

// channel closed once connected
Expand Down

0 comments on commit 1f7e179

Please sign in to comment.