Skip to content

Commit c357d18

Browse files
committed
Faster renew the subscription
1 parent 49aac99 commit c357d18

File tree

3 files changed

+20
-13
lines changed

3 files changed

+20
-13
lines changed

internal/pool/pool.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
163163
}
164164
}
165165
p.connsMu.Unlock()
166+
166167
return cn, nil
167168
}
168169

@@ -408,16 +409,17 @@ func (p *ConnPool) closed() bool {
408409
}
409410

410411
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
411-
var firstErr error
412412
p.connsMu.Lock()
413+
defer p.connsMu.Unlock()
414+
415+
var firstErr error
413416
for _, cn := range p.conns {
414417
if fn(cn) {
415418
if err := p.closeConn(cn); err != nil && firstErr == nil {
416419
firstErr = err
417420
}
418421
}
419422
}
420-
p.connsMu.Unlock()
421423
return firstErr
422424
}
423425

pubsub.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"github.com/go-redis/redis/v8/internal/proto"
1414
)
1515

16-
const pingTimeout = 30 * time.Second
16+
const (
17+
pingTimeout = time.Second
18+
chanSendTimeout = time.Minute
19+
)
1720

1821
var errPingTimeout = errors.New("redis: ping timeout")
1922

@@ -454,15 +457,14 @@ func (c *PubSub) getContext() context.Context {
454457
if c.cmd != nil {
455458
return c.cmd.ctx
456459
}
457-
458460
return context.Background()
459461
}
460462

461463
func (c *PubSub) initPing() {
462464
ctx := context.TODO()
463465
c.ping = make(chan struct{}, 1)
464466
go func() {
465-
timer := time.NewTimer(pingTimeout)
467+
timer := time.NewTimer(time.Minute)
466468
timer.Stop()
467469

468470
healthy := true
@@ -499,7 +501,7 @@ func (c *PubSub) initMsgChan(size int) {
499501
ctx := context.TODO()
500502
c.msgCh = make(chan *Message, size)
501503
go func() {
502-
timer := time.NewTimer(pingTimeout)
504+
timer := time.NewTimer(time.Minute)
503505
timer.Stop()
504506

505507
var errCount int
@@ -531,7 +533,7 @@ func (c *PubSub) initMsgChan(size int) {
531533
case *Pong:
532534
// Ignore.
533535
case *Message:
534-
timer.Reset(pingTimeout)
536+
timer.Reset(chanSendTimeout)
535537
select {
536538
case c.msgCh <- msg:
537539
if !timer.Stop() {
@@ -540,7 +542,10 @@ func (c *PubSub) initMsgChan(size int) {
540542
case <-timer.C:
541543
internal.Logger.Printf(
542544
c.getContext(),
543-
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
545+
"redis: %s channel is full for %s (message is dropped)",
546+
c,
547+
chanSendTimeout,
548+
)
544549
}
545550
default:
546551
internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)

sentinel_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ var _ = Describe("Sentinel", func() {
7070
return client.Get(ctx, "foo").Err()
7171
}, "15s", "100ms").ShouldNot(HaveOccurred())
7272

73-
// Publish message to check if subscription is renewed.
74-
err = client.Publish(ctx, "foo", "hello").Err()
75-
Expect(err).NotTo(HaveOccurred())
76-
73+
// Check if subscription is renewed.
7774
var msg *redis.Message
78-
Eventually(ch, "15s").Should(Receive(&msg))
75+
Eventually(func() <-chan *redis.Message {
76+
_ = client.Publish(ctx, "foo", "hello").Err()
77+
return ch
78+
}, "15s").Should(Receive(&msg))
7979
Expect(msg.Channel).To(Equal("foo"))
8080
Expect(msg.Payload).To(Equal("hello"))
8181
})

0 commit comments

Comments
 (0)