diff --git a/client.go b/client.go index 6576aaf..b99ab89 100644 --- a/client.go +++ b/client.go @@ -103,7 +103,7 @@ func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handle return err case msg := <-eventChan: handler(msg) - case <- ctx.Done(): + case <-ctx.Done(): return ctx.Err() } } @@ -111,10 +111,18 @@ func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handle // Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method var err error + + bCtx, cancel := context.WithCancel(ctx) + defer cancel() if c.ReconnectStrategy != nil { - err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify) + err = backoff.RetryNotify( + operation, + backoff.WithContext(c.ReconnectStrategy, bCtx), c.ReconnectNotify) } else { - err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify) + err = backoff.RetryNotify( + operation, + backoff.WithContext(backoff.NewExponentialBackOff(), bCtx), + c.ReconnectNotify) } return err } @@ -184,10 +192,17 @@ func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch defer c.cleanup(ch) // Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method var err error + + bCtx, cancel := context.WithCancel(ctx) + defer cancel() if c.ReconnectStrategy != nil { - err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify) + err = backoff.RetryNotify(operation, + backoff.WithContext(c.ReconnectStrategy, bCtx), + c.ReconnectNotify) } else { - err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify) + err = backoff.RetryNotify(operation, + backoff.WithContext(backoff.NewExponentialBackOff(), bCtx), + c.ReconnectNotify) } // channel closed once connected