Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,12 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
type XMessage struct {
ID string
Values map[string]interface{}
// MillisElapsedFromDelivery is the number of milliseconds since the entry was last delivered.
// Only populated when using XREADGROUP with CLAIM argument for claimed entries.
MillisElapsedFromDelivery int64
// DeliveredCount is the number of times the entry was delivered.
// Only populated when using XREADGROUP with CLAIM argument for claimed entries.
DeliveredCount int64
}

type XMessageSliceCmd struct {
Expand Down Expand Up @@ -1641,10 +1647,16 @@ func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) {
}

func readXMessage(rd *proto.Reader) (XMessage, error) {
if err := rd.ReadFixedArrayLen(2); err != nil {
// Read array length can be 2 or 4 (with CLAIM metadata)
n, err := rd.ReadArrayLen()
if err != nil {
return XMessage{}, err
}

if n != 2 && n != 4 {
return XMessage{}, fmt.Errorf("redis: got %d elements in the XMessage array, expected 2 or 4", n)
}

id, err := rd.ReadString()
if err != nil {
return XMessage{}, err
Expand All @@ -1657,10 +1669,24 @@ func readXMessage(rd *proto.Reader) (XMessage, error) {
}
}

return XMessage{
msg := XMessage{
ID: id,
Values: v,
}, nil
}

if n == 4 {
msg.MillisElapsedFromDelivery, err = rd.ReadInt()
if err != nil {
return XMessage{}, err
}

msg.DeliveredCount, err = rd.ReadInt()
if err != nil {
return XMessage{}, err
}
}

return msg, nil
}

func stringInterfaceMapParser(rd *proto.Reader) (map[string]interface{}, error) {
Expand Down
236 changes: 236 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6749,6 +6749,242 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(2)))
})

It("should XReadGroup with CLAIM argument", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

time.Sleep(100 * time.Millisecond)

res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer2",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(1))
Expect(res[0].Stream).To(Equal("stream"))

messages := res[0].Messages
Expect(len(messages)).To(BeNumerically(">=", 1))

for _, msg := range messages {
if msg.MillisElapsedFromDelivery > 0 {
Expect(msg.MillisElapsedFromDelivery).To(BeNumerically(">=", 50))
Expect(msg.DeliveredCount).To(BeNumerically(">=", 1))
}
}
})

It("should XReadGroup with CLAIM and COUNT", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

time.Sleep(100 * time.Millisecond)

res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer3",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
Count: 2,
}).Result()
Expect(err).NotTo(HaveOccurred())

if len(res) > 0 && len(res[0].Messages) > 0 {
Expect(len(res[0].Messages)).To(BeNumerically("<=", 2))
}
})

It("should XReadGroup with CLAIM and NOACK", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

time.Sleep(100 * time.Millisecond)

res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer4",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
NoAck: true,
}).Result()
Expect(err).NotTo(HaveOccurred())

if len(res) > 0 {
Expect(res[0].Stream).To(Equal("stream"))
}
})

It("should XReadGroup CLAIM empties PEL after acknowledgment", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

time.Sleep(100 * time.Millisecond)

res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer5",
Streams: []string{"stream", ">"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())

if len(res) > 0 && len(res[0].Messages) > 0 {
ids := make([]string, len(res[0].Messages))
for i, msg := range res[0].Messages {
ids[i] = msg.ID
}

n, err := client.XAck(ctx, "stream", "group", ids...).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 1))

pending, err := client.XPending(ctx, "stream", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(pending.Count).To(BeNumerically("<", 3))
}
})

It("should XReadGroup backward compatibility without CLAIM", func() {
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer_compat",
Streams: []string{"stream", "0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(1))
Expect(res[0].Stream).To(Equal("stream"))

for _, msg := range res[0].Messages {
Expect(msg.MillisElapsedFromDelivery).To(Equal(int64(0)))
Expect(msg.DeliveredCount).To(Equal(int64(0)))
}
})

It("should XReadGroup CLAIM with multiple streams", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream2",
ID: "1-0",
Values: map[string]interface{}{"field1": "value1"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("1-0"))

id, err = client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream2",
ID: "2-0",
Values: map[string]interface{}{"field2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("2-0"))

err = client.XGroupCreate(ctx, "stream2", "group2", "0").Err()
Expect(err).NotTo(HaveOccurred())

_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group2",
Consumer: "consumer1",
Streams: []string{"stream2", ">"},
}).Result()
Expect(err).NotTo(HaveOccurred())

time.Sleep(100 * time.Millisecond)

res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group2",
Consumer: "consumer2",
Streams: []string{"stream2", ">"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())

if len(res) > 0 {
Expect(res[0].Stream).To(Equal("stream2"))
if len(res[0].Messages) > 0 {
for _, msg := range res[0].Messages {
if msg.MillisElapsedFromDelivery > 0 {
Expect(msg.DeliveredCount).To(BeNumerically(">=", 1))
}
}
}
}
})

It("should XReadGroup CLAIM work consistently on RESP2 and RESP3", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

streamName := "stream-resp-test"
err := client.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{"field1": "value1"},
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{"field2": "value2"},
}).Err()
Expect(err).NotTo(HaveOccurred())

groupName := "resp-test-group"
err = client.XGroupCreate(ctx, streamName, groupName, "0").Err()
Expect(err).NotTo(HaveOccurred())

_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer1",
Streams: []string{streamName, ">"},
}).Result()
Expect(err).NotTo(HaveOccurred())

time.Sleep(100 * time.Millisecond)

// Test with RESP2 (protocol 2)
resp2Client := redis.NewClient(&redis.Options{
Addr: redisAddr,
Protocol: 2,
})
defer resp2Client.Close()

resp2Result, err := resp2Client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer2",
Streams: []string{streamName, "0"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resp2Result).To(HaveLen(1))

// Test with RESP3 (protocol 3)
resp3Client := redis.NewClient(&redis.Options{
Addr: redisAddr,
Protocol: 3,
})
defer resp3Client.Close()

resp3Result, err := resp3Client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer3",
Streams: []string{streamName, "0"},
Claim: 50 * time.Millisecond,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resp3Result).To(HaveLen(1))

Expect(len(resp2Result[0].Messages)).To(Equal(len(resp3Result[0].Messages)))

for i := range resp2Result[0].Messages {
msg2 := resp2Result[0].Messages[i]
msg3 := resp3Result[0].Messages[i]

Expect(msg2.ID).To(Equal(msg3.ID))

if msg2.MillisElapsedFromDelivery > 0 {
Expect(msg3.MillisElapsedFromDelivery).To(BeNumerically(">", 0))
Expect(msg2.DeliveredCount).To(Equal(msg3.DeliveredCount))
}
}
})
})

Describe("xinfo", func() {
Expand Down
32 changes: 16 additions & 16 deletions doctests/stream_tutorial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func ExampleClient_racefrance1() {
// REMOVE_END

// Output:
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]}]}]
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0}]}]
// 4
}

Expand Down Expand Up @@ -467,13 +467,13 @@ func ExampleClient_racefrance2() {
// STEP_END

// Output:
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}]
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]
// [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0}]
// [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]
// [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
// []
// [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]}]
// [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}]
// [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]}]
}

func ExampleClient_xgroupcreate() {
Expand Down Expand Up @@ -999,18 +999,18 @@ func ExampleClient_raceitaly() {
// REMOVE_END

// Output:
// [{race:italy [{1692632639151-0 map[rider:Castilla]}]}]
// [{race:italy [{1692632639151-0 map[rider:Castilla] 0 0}]}]
// 1
// [{race:italy []}]
// [{race:italy [{1692632647899-0 map[rider:Royce]} {1692632662819-0 map[rider:Sam-Bodden]}]}]
// [{race:italy [{1692632647899-0 map[rider:Royce] 0 0} {1692632662819-0 map[rider:Sam-Bodden] 0 0}]}]
// &{2 1692632647899-0 1692632662819-0 map[Bob:2]}
// [{1692632647899-0 map[rider:Royce]}]
// [{1692632647899-0 map[rider:Royce]}]
// [{1692632647899-0 map[rider:Royce]}]
// [{1692632647899-0 map[rider:Royce] 0 0}]
// [{1692632647899-0 map[rider:Royce] 0 0}]
// [{1692632647899-0 map[rider:Royce] 0 0}]
// 1692632662819-0
// []
// 0-0
// &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla]} {1692632678249-0 map[rider:Norem]} 1692632639151-0}
// &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla] 0 0} {1692632678249-0 map[rider:Norem] 0 0} 1692632639151-0}
// [{italy_riders 3 2 1692632662819-0 3 2}]
// 2
// 0
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func ExampleClient_xdel() {
// STEP_END

// Output:
// [{1692633198206-0 map[rider:Wood]} {1692633208557-0 map[rider:Henshaw]}]
// [{1692633198206-0 map[rider:Wood] 0 0} {1692633208557-0 map[rider:Henshaw] 0 0}]
// 1
// [{1692633198206-0 map[rider:Wood]}]
// [{1692633198206-0 map[rider:Wood] 0 0}]
}
5 changes: 5 additions & 0 deletions stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type XReadGroupArgs struct {
Count int64
Block time.Duration
NoAck bool
Claim time.Duration // Claim idle pending entries older than this duration
}

func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
Expand All @@ -282,6 +283,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
args = append(args, "noack")
keyPos++
}
if a.Claim > 0 {
args = append(args, "claim", int64(a.Claim/time.Millisecond))
keyPos += 2
}
args = append(args, "streams")
keyPos++
for _, s := range a.Streams {
Expand Down
Loading