Skip to content

Commit ea9da7c

Browse files
committed
Rework ReceiveMessage
1 parent f7e97f0 commit ea9da7c

File tree

10 files changed

+225
-199
lines changed

10 files changed

+225
-199
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.
66
- Cluster client was optimized to use much less memory when reloading cluster state.
7+
- ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres.
78

89
## v6.12
910

cluster.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1500,11 +1500,9 @@ func (c *ClusterClient) txPipelineReadQueued(
15001500
}
15011501

15021502
func (c *ClusterClient) pubSub(channels []string) *PubSub {
1503-
opt := c.opt.clientOptions()
1504-
15051503
var node *clusterNode
1506-
return &PubSub{
1507-
opt: opt,
1504+
pubsub := &PubSub{
1505+
opt: c.opt.clientOptions(),
15081506

15091507
newConn: func(channels []string) (*pool.Conn, error) {
15101508
if node == nil {
@@ -1527,6 +1525,8 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
15271525
return node.Client.connPool.CloseConn(cn)
15281526
},
15291527
}
1528+
pubsub.init()
1529+
return pubsub
15301530
}
15311531

15321532
// Subscribe subscribes the client to the specified channels.

cluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ var _ = Describe("ClusterClient", func() {
453453

454454
ttl := cmds[(i*2)+1].(*redis.DurationCmd)
455455
dur := time.Duration(i+1) * time.Hour
456-
Expect(ttl.Val()).To(BeNumerically("~", dur, 10*time.Second))
456+
Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
457457
}
458458
})
459459

export_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package redis
33
import (
44
"fmt"
55
"net"
6-
"time"
76

87
"github.com/go-redis/redis/internal/hashtag"
98
"github.com/go-redis/redis/internal/pool"
@@ -17,10 +16,6 @@ func (c *PubSub) SetNetConn(netConn net.Conn) {
1716
c.cn = pool.NewConn(netConn)
1817
}
1918

20-
func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) {
21-
return c.receiveMessage(timeout)
22-
}
23-
2419
func (c *ClusterClient) LoadState() (*clusterState, error) {
2520
return c.loadState()
2621
}

0 commit comments

Comments
 (0)