diff --git a/cluster_test.go b/cluster_test.go index 67129cb72..bfc1c2598 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -677,6 +677,30 @@ var _ = Describe("ClusterClient", func() { Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) }) + It("should CLUSTER LINKS", func() { + res, err := client.ClusterLinks(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).NotTo(BeEmpty()) + + // Iterate over the ClusterLink results and validate the map keys. + for _, link := range res { + + Expect(link.Direction).NotTo(BeEmpty()) + Expect([]string{"from", "to"}).To(ContainElement(link.Direction)) + Expect(link.Node).NotTo(BeEmpty()) + Expect(link.CreateTime).To(BeNumerically(">", 0)) + + Expect(link.Events).NotTo(BeEmpty()) + validEventChars := []rune{'r', 'w'} + for _, eventChar := range link.Events { + Expect(validEventChars).To(ContainElement(eventChar)) + } + + Expect(link.SendBufferAllocated).To(BeNumerically(">=", 0)) + Expect(link.SendBufferUsed).To(BeNumerically(">=", 0)) + } + }) + It("should cluster client setname", func() { err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error { return c.Ping(ctx).Err() diff --git a/command.go b/command.go index 6eba56404..04aebe93d 100644 --- a/command.go +++ b/command.go @@ -4246,3 +4246,92 @@ func (cmd *KeyFlagsCmd) readReply(rd *proto.Reader) error { return nil } + +// --------------------------------------------------------------------------------------------------- + +type ClusterLink struct { + Direction string + Node string + CreateTime int64 + Events string + SendBufferAllocated int64 + SendBufferUsed int64 +} + +type ClusterLinksCmd struct { + baseCmd + + val []ClusterLink +} + +var _ Cmder = (*ClusterLinksCmd)(nil) + +func NewClusterLinksCmd(ctx context.Context, args ...interface{}) *ClusterLinksCmd { + return &ClusterLinksCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *ClusterLinksCmd) SetVal(val []ClusterLink) { + cmd.val = val +} + +func (cmd *ClusterLinksCmd) Val() []ClusterLink { + return cmd.val +} + +func (cmd *ClusterLinksCmd) Result() ([]ClusterLink, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *ClusterLinksCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ClusterLinksCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + cmd.val = make([]ClusterLink, n) + + for i := 0; i < len(cmd.val); i++ { + m, err := rd.ReadMapLen() + if err != nil { + return err + } + + for j := 0; j < m; j++ { + key, err := rd.ReadString() + if err != nil { + return err + } + + switch key { + case "direction": + cmd.val[i].Direction, err = rd.ReadString() + case "node": + cmd.val[i].Node, err = rd.ReadString() + case "create-time": + cmd.val[i].CreateTime, err = rd.ReadInt() + case "events": + cmd.val[i].Events, err = rd.ReadString() + case "send-buffer-allocated": + cmd.val[i].SendBufferAllocated, err = rd.ReadInt() + case "send-buffer-used": + cmd.val[i].SendBufferUsed, err = rd.ReadInt() + default: + return fmt.Errorf("redis: unexpected key %q in CLUSTER LINKS reply", key) + } + + if err != nil { + return err + } + } + } + + return nil +} diff --git a/commands.go b/commands.go index bcabbafe0..a494fdd00 100644 --- a/commands.go +++ b/commands.go @@ -422,6 +422,7 @@ type Cmdable interface { PubSubShardNumSub(ctx context.Context, channels ...string) *MapStringIntCmd ClusterSlots(ctx context.Context) *ClusterSlotsCmd + ClusterLinks(ctx context.Context) *ClusterLinksCmd ClusterNodes(ctx context.Context) *StringCmd ClusterMeet(ctx context.Context, host, port string) *StatusCmd ClusterForget(ctx context.Context, nodeID string) *StatusCmd @@ -3497,6 +3498,12 @@ func (c cmdable) ClusterSlots(ctx context.Context) *ClusterSlotsCmd { return cmd } +func (c cmdable) ClusterLinks(ctx context.Context) *ClusterLinksCmd { + cmd := NewClusterLinksCmd(ctx, "cluster", "links") + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) ClusterNodes(ctx context.Context) *StringCmd { cmd := NewStringCmd(ctx, "cluster", "nodes") _ = c(ctx, cmd)