diff --git a/message.go b/message.go index 7b49c501..0e966fec 100644 --- a/message.go +++ b/message.go @@ -201,6 +201,22 @@ func (r RedisResult) AsIntSlice() ([]int64, error) { return r.val.AsIntSlice() } +// AsXMessage delegates to RedisMessage.AsXMessage +func (r RedisResult) AsXMessage() (XMessage, error) { + if err := r.Error(); err != nil { + return XMessage{}, err + } + return r.val.AsXMessage() +} + +// AsXMessageSlice delegates to RedisMessage.AsXMessageSlice +func (r RedisResult) AsXMessageSlice() ([]XMessage, error) { + if err := r.Error(); err != nil { + return nil, err + } + return r.val.AsXMessageSlice() +} + // AsMap delegates to RedisMessage.AsMap func (r RedisResult) AsMap() (map[string]RedisMessage, error) { if err := r.Error(); err != nil { @@ -402,6 +418,52 @@ func (m *RedisMessage) AsIntSlice() ([]int64, error) { return s, nil } +type XMessage struct { + ID string + KeyValues map[string]string +} + +// AsXMessage check if message is a redis array/set response of length 2, and convert to XMessage +func (m *RedisMessage) AsXMessage() (XMessage, error) { + values, err := m.ToArray() + if err != nil { + return XMessage{}, err + } + if len(values) != 2 { + return XMessage{}, fmt.Errorf("got %d, wanted 2", len(values)) + } + id, err := values[0].ToString() + if err != nil { + return XMessage{}, err + } + keyValues, err := values[1].AsStrMap() + if err != nil { + return XMessage{}, err + } + return XMessage{ + ID: id, + KeyValues: keyValues, + }, nil +} + +// AsXMessageSlice check if message is a redis array/set response, and convert to []XMessage +func (m *RedisMessage) AsXMessageSlice() ([]XMessage, error) { + values, err := m.ToArray() + if err != nil { + return nil, err + } + msgs := make([]XMessage, 0, len(values)) + for _, v := range values { + msg, err := v.AsXMessage() + if err != nil { + return nil, err + } + msgs = append(msgs, msg) + } + return msgs, nil + +} + // AsMap check if message is a redis array/set response, and convert to map[string]RedisMessage func (m *RedisMessage) AsMap() (map[string]RedisMessage, error) { values, err := m.ToArray() diff --git a/message_test.go b/message_test.go index 06f2fc11..d255b8c1 100644 --- a/message_test.go +++ b/message_test.go @@ -170,6 +170,19 @@ func TestRedisResult(t *testing.T) { } }) + t.Run("AsIntSlice", func(t *testing.T) { + if _, err := (RedisResult{err: errors.New("other")}).AsIntSlice(); err == nil { + t.Fatal("AsIntSlice not failed as expected") + } + if _, err := (RedisResult{val: RedisMessage{typ: '-'}}).AsIntSlice(); err == nil { + t.Fatal("AsIntSlice not failed as expected") + } + values := []RedisMessage{{integer: 2, typ: ':'}} + if ret, _ := (RedisResult{val: RedisMessage{typ: '*', values: values}}).AsIntSlice(); !reflect.DeepEqual(ret, []int64{2}) { + t.Fatal("AsIntSlice not get value as expected") + } + }) + t.Run("AsMap", func(t *testing.T) { if _, err := (RedisResult{err: errors.New("other")}).AsMap(); err == nil { t.Fatal("AsMap not failed as expected") @@ -373,6 +386,19 @@ func TestRedisMessage(t *testing.T) { (&RedisMessage{typ: 't'}).AsStrSlice() }) + t.Run("AsIntSlice", func(t *testing.T) { + if _, err := (&RedisMessage{typ: '_'}).AsIntSlice(); err == nil { + t.Fatal("AsIntSlice not failed as expected") + } + + defer func() { + if !strings.Contains(recover().(string), "redis message type t is not a array") { + t.Fatal("AsIntSlice not panic as expected") + } + }() + (&RedisMessage{typ: 't'}).AsIntSlice() + }) + t.Run("AsMap", func(t *testing.T) { if _, err := (&RedisMessage{typ: '_'}).AsMap(); err == nil { t.Fatal("AsMap not failed as expected") diff --git a/rueidiscompat/adapter.go b/rueidiscompat/adapter.go index 00b56df0..ae4c0b3d 100644 --- a/rueidiscompat/adapter.go +++ b/rueidiscompat/adapter.go @@ -124,52 +124,53 @@ type Cmdable interface { RPushX(ctx context.Context, key string, elements ...string) *IntCmd LMove(ctx context.Context, source, destination, srcpos, destpos string) *StringCmd BLMove(ctx context.Context, source, destination, srcpos, destpos string, timeout time.Duration) *StringCmd + + SAdd(ctx context.Context, key string, members ...string) *IntCmd + SCard(ctx context.Context, key string) *IntCmd + SDiff(ctx context.Context, keys ...string) *StringSliceCmd + SDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd + SInter(ctx context.Context, keys ...string) *StringSliceCmd + SInterStore(ctx context.Context, destination string, keys ...string) *IntCmd + SIsMember(ctx context.Context, key string, member string) *BoolCmd + SMIsMember(ctx context.Context, key string, members ...string) *BoolSliceCmd + SMembers(ctx context.Context, key string) *StringSliceCmd + SMembersMap(ctx context.Context, key string) *StringStructMapCmd + SMove(ctx context.Context, source, destination string, member string) *BoolCmd + SPop(ctx context.Context, key string) *StringCmd + SPopN(ctx context.Context, key string, count int64) *StringSliceCmd + SRandMember(ctx context.Context, key string) *StringCmd + SRandMemberN(ctx context.Context, key string, count int64) *StringSliceCmd + SRem(ctx context.Context, key string, members ...string) *IntCmd + SUnion(ctx context.Context, keys ...string) *StringSliceCmd + SUnionStore(ctx context.Context, destination string, keys ...string) *IntCmd + + XAdd(ctx context.Context, a XAddArgs) *StringCmd + XDel(ctx context.Context, stream string, ids ...string) *IntCmd + XLen(ctx context.Context, stream string) *IntCmd + XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd + XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd + XRevRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd + XRevRangeN(ctx context.Context, stream string, start, stop string, count int64) *XMessageSliceCmd + XRead(ctx context.Context, a XReadArgs) *XStreamSliceCmd + XReadStreams(ctx context.Context, streams []string, ids []string) *XStreamSliceCmd + XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd + XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd + XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd + XGroupDestroy(ctx context.Context, stream, group string) *IntCmd + XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd + XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd + XReadGroup(ctx context.Context, a XReadGroupArgs) *XStreamSliceCmd + XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd + XPending(ctx context.Context, stream, group string) *XPendingCmd + XPendingExt(ctx context.Context, a XPendingExtArgs) *XPendingExtCmd + XClaim(ctx context.Context, a XClaimArgs) *XMessageSliceCmd + XClaimJustID(ctx context.Context, a XClaimArgs) *StringSliceCmd + XAutoClaim(ctx context.Context, a XAutoClaimArgs) *XAutoClaimCmd + XAutoClaimJustID(ctx context.Context, a XAutoClaimArgs) *XAutoClaimJustIDCmd + // Implemented until here. // TODO: // - // SAdd(ctx context.Context, key string, members ...interface{}) *IntCmd - // SCard(ctx context.Context, key string) *IntCmd - // SDiff(ctx context.Context, keys ...string) *StringSliceCmd - // SDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd - // SInter(ctx context.Context, keys ...string) *StringSliceCmd - // SInterStore(ctx context.Context, destination string, keys ...string) *IntCmd - // SIsMember(ctx context.Context, key string, member interface{}) *BoolCmd - // SMIsMember(ctx context.Context, key string, members ...interface{}) *BoolSliceCmd - // SMembers(ctx context.Context, key string) *StringSliceCmd - // SMembersMap(ctx context.Context, key string) *StringStructMapCmd - // SMove(ctx context.Context, source, destination string, member interface{}) *BoolCmd - // SPop(ctx context.Context, key string) *StringCmd - // SPopN(ctx context.Context, key string, count int64) *StringSliceCmd - // SRandMember(ctx context.Context, key string) *StringCmd - // SRandMemberN(ctx context.Context, key string, count int64) *StringSliceCmd - // SRem(ctx context.Context, key string, members ...interface{}) *IntCmd - // SUnion(ctx context.Context, keys ...string) *StringSliceCmd - // SUnionStore(ctx context.Context, destination string, keys ...string) *IntCmd - - // XAdd(ctx context.Context, a *XAddArgs) *StringCmd - // XDel(ctx context.Context, stream string, ids ...string) *IntCmd - // XLen(ctx context.Context, stream string) *IntCmd - // XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd - // XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd - // XRevRange(ctx context.Context, stream string, start, stop string) *XMessageSliceCmd - // XRevRangeN(ctx context.Context, stream string, start, stop string, count int64) *XMessageSliceCmd - // XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd - // XReadStreams(ctx context.Context, streams ...string) *XStreamSliceCmd - // XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd - // XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd - // XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd - // XGroupDestroy(ctx context.Context, stream, group string) *IntCmd - // XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd - // XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd - // XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd - // XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd - // XPending(ctx context.Context, stream, group string) *XPendingCmd - // XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd - // XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd - // XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd - // XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd - // XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd - // // TODO: XTrim and XTrimApprox remove in v9. // XTrim(ctx context.Context, key string, maxLen int64) *IntCmd // XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd @@ -656,11 +657,11 @@ func (c *Compat) MGet(ctx context.Context, keys ...string) *SliceCmd { func (c *Compat) MSet(ctx context.Context, keys []string, values []string) *StatusCmd { if len(keys) != len(values) { - panic(fmt.Sprintf("keys and values must be same lengthL %d != %d", len(keys), len(values))) + panic(fmt.Sprintf("keys and values must be same length %d != %d", len(keys), len(values))) } partial := c.client.B().Mset().KeyValue() - for i := 0; i < len(keys); i++ { - partial = partial.KeyValue(keys[i], values[i]) + for i, v := range keys { + partial = partial.KeyValue(v, values[i]) } cmd := partial.Build() resp := c.client.Do(ctx, cmd) @@ -672,8 +673,8 @@ func (c *Compat) MSetNX(ctx context.Context, keys []string, values []string) *Bo panic(fmt.Sprintf("keys and values must be same length %d != %d", len(keys), len(values))) } partial := c.client.B().Msetnx().KeyValue() - for i := 0; i < len(keys); i++ { - partial = partial.KeyValue(keys[i], values[i]) + for i, v := range keys { + partial = partial.KeyValue(v, values[i]) } cmd := partial.Build() resp := c.client.Do(ctx, cmd) @@ -943,7 +944,7 @@ func (c *Compat) HMGet(ctx context.Context, key string, fields ...string) *Slice // HSet requires Redis v4 for multiple field/value pairs support. func (c *Compat) HSet(ctx context.Context, key string, keys []string, values []string) *IntCmd { if len(keys) != len(values) { - panic(fmt.Sprintf("keys and values must be same lengthL %d != %d", len(keys), len(values))) + panic(fmt.Sprintf("keys and values must be same length %d != %d", len(keys), len(values))) } partial := c.client.B().Hset().Key(key).FieldValue() for i := 0; i < len(keys); i++ { @@ -957,7 +958,7 @@ func (c *Compat) HSet(ctx context.Context, key string, keys []string, values []s // HMSet is a deprecated version of HSet left for compatibility with Redis 3. func (c *Compat) HMSet(ctx context.Context, key string, keys []string, values []string) *BoolCmd { if len(keys) != len(values) { - panic(fmt.Sprintf("keys and values must be same lengthL %d != %d", len(keys), len(values))) + panic(fmt.Sprintf("keys and values must be same length %d != %d", len(keys), len(values))) } partial := c.client.B().Hset().Key(key).FieldValue() for i := 0; i < len(keys); i++ { @@ -1180,3 +1181,324 @@ func (c *Compat) BLMove(ctx context.Context, source, destination, srcpos, destpo } return newStringCmd(resp) } + +func (c *Compat) SAdd(ctx context.Context, key string, members ...string) *IntCmd { + cmd := c.client.B().Sadd().Key(key).Member(members...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) SCard(ctx context.Context, key string) *IntCmd { + cmd := c.client.B().Scard().Key(key).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) SDiff(ctx context.Context, keys ...string) *StringSliceCmd { + cmd := c.client.B().Sdiff().Key(keys...).Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) SDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd { + cmd := c.client.B().Sdiffstore().Destination(destination).Key(keys...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) SInter(ctx context.Context, keys ...string) *StringSliceCmd { + cmd := c.client.B().Sinter().Key(keys...).Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) SInterStore(ctx context.Context, destination string, keys ...string) *IntCmd { + cmd := c.client.B().Sinterstore().Destination(destination).Key(keys...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) SIsMember(ctx context.Context, key string, member string) *BoolCmd { + cmd := c.client.B().Sismember().Key(key).Member(member).Build() + resp := c.client.Do(ctx, cmd) + return newBoolCmd(resp) +} + +func (c *Compat) SMIsMember(ctx context.Context, key string, members ...string) *BoolSliceCmd { + cmd := c.client.B().Smismember().Key(key).Member(members...).Build() + resp := c.client.Do(ctx, cmd) + return newBoolSliceCmd(resp) +} + +func (c *Compat) SMembers(ctx context.Context, key string) *StringSliceCmd { + cmd := c.client.B().Smembers().Key(key).Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) SMembersMap(ctx context.Context, key string) *StringStructMapCmd { + cmd := c.client.B().Smembers().Key(key).Build() + resp := c.client.Do(ctx, cmd) + return newStringStructMapCmd(resp) +} + +func (c *Compat) SMove(ctx context.Context, source, destination string, member string) *BoolCmd { + cmd := c.client.B().Smove().Source(source).Destination(destination).Member(member).Build() + resp := c.client.Do(ctx, cmd) + return newBoolCmd(resp) +} + +func (c *Compat) SPop(ctx context.Context, key string) *StringCmd { + cmd := c.client.B().Spop().Key(key).Build() + resp := c.client.Do(ctx, cmd) + return newStringCmd(resp) +} + +func (c *Compat) SPopN(ctx context.Context, key string, count int64) *StringSliceCmd { + cmd := c.client.B().Spop().Key(key).Count(count).Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) SRandMember(ctx context.Context, key string) *StringCmd { + cmd := c.client.B().Srandmember().Key(key).Build() + resp := c.client.Do(ctx, cmd) + return newStringCmd(resp) +} + +func (c *Compat) SRandMemberN(ctx context.Context, key string, count int64) *StringSliceCmd { + cmd := c.client.B().Srandmember().Key(key).Count(count).Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) SRem(ctx context.Context, key string, members ...string) *IntCmd { + cmd := c.client.B().Srem().Key(key).Member(members...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) SUnion(ctx context.Context, keys ...string) *StringSliceCmd { + cmd := c.client.B().Sunion().Key(keys...).Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) SUnionStore(ctx context.Context, destination string, keys ...string) *IntCmd { + cmd := c.client.B().Sunionstore().Destination(destination).Key(keys...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XAdd(ctx context.Context, a XAddArgs) *StringCmd { + if len(a.Fields) != len(a.Values) { + panic(fmt.Sprintf("fields and values must be same length %d != %d", len(a.Fields), len(a.Values))) + } + cmd := c.client.B().Arbitrary("XADD").Keys(a.Stream) + if a.NoMkStream { + cmd = cmd.Args("NOMKSTREAM") + } + switch { + case a.MaxLen > 0: + if a.Approx { + cmd = cmd.Args("MAXLEN", "~", strconv.FormatInt(a.MaxLen, 10)) + } else { + cmd = cmd.Args("MAXLEN", strconv.FormatInt(a.MaxLen, 10)) + } + case a.MinID != "": + if a.Approx { + cmd = cmd.Args("MINID", "~", a.MinID) + } else { + cmd = cmd.Args("MINID", a.MinID) + } + } + if a.Limit > 0 { + cmd = cmd.Args("LIMIT", strconv.FormatInt(a.Limit, 10)) + } + if a.ID != "" { + cmd = cmd.Args(a.ID) + } else { + cmd = cmd.Args("*") + } + for i, v := range a.Fields { + cmd = cmd.Args(v, a.Values[i]) + } + resp := c.client.Do(ctx, cmd.Build()) + return newStringCmd(resp) +} + +func (c *Compat) XDel(ctx context.Context, stream string, ids ...string) *IntCmd { + cmd := c.client.B().Xdel().Key(stream).Id(ids...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XLen(ctx context.Context, stream string) *IntCmd { + cmd := c.client.B().Xlen().Key(stream).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd { + cmd := c.client.B().Xrange().Key(stream).Start(start).End(stop).Build() + resp := c.client.Do(ctx, cmd) + return newXMessageSliceCmd(resp) +} + +func (c *Compat) XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := c.client.B().Xrange().Key(stream).Start(start).End(stop).Count(count).Build() + resp := c.client.Do(ctx, cmd) + return newXMessageSliceCmd(resp) +} + +func (c *Compat) XRevRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd { + cmd := c.client.B().Xrevrange().Key(stream).End(stop).Start(start).Build() + resp := c.client.Do(ctx, cmd) + return newXMessageSliceCmd(resp) +} + +func (c *Compat) XRevRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := c.client.B().Xrevrange().Key(stream).End(stop).Start(start).Count(count).Build() + resp := c.client.Do(ctx, cmd) + return newXMessageSliceCmd(resp) +} + +func (c *Compat) XRead(ctx context.Context, a XReadArgs) *XStreamSliceCmd { + if len(a.Streams) != len(a.IDs) { + panic(fmt.Sprintf("streams and ids must be same length %d != %d", len(a.Streams), len(a.IDs))) + } + cmd := c.client.B().Arbitrary("XREAD") + if a.Count > 0 { + cmd = cmd.Args("COUNT", strconv.FormatInt(a.Count, 10)) + } + if a.Block >= 0 { + cmd = cmd.Args("BLOCK", strconv.FormatInt(formatMs(a.Block), 10)) + } + cmd = cmd.Args("STREAMS") + for i, v := range a.Streams { + cmd = cmd.Args(v, a.IDs[i]) + } + resp := c.client.Do(ctx, cmd.Build()) + return newXStreamSliceCmd(resp) +} + +func (c *Compat) XReadStreams(ctx context.Context, streams []string, ids []string) *XStreamSliceCmd { + // XRead handles len(streams) != len(ids) + return c.XRead(ctx, XReadArgs{Streams: streams, IDs: ids, Block: -1}) +} + +func (c *Compat) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd { + cmd := c.client.B().XgroupCreate().Key(stream).Groupname(group).Id(start).Build() + resp := c.client.Do(ctx, cmd) + return newStatusCmd(resp) +} + +func (c *Compat) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd { + cmd := c.client.B().XgroupCreate().Key(stream).Groupname(group).Id(start).Mkstream().Build() + resp := c.client.Do(ctx, cmd) + return newStatusCmd(resp) +} + +func (c *Compat) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd { + cmd := c.client.B().XgroupSetid().Key(stream).Groupname(group).Id(start).Build() + resp := c.client.Do(ctx, cmd) + return newStatusCmd(resp) +} + +func (c *Compat) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd { + cmd := c.client.B().XgroupDestroy().Key(stream).Groupname(group).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { + cmd := c.client.B().XgroupCreateconsumer().Key(stream).Groupname(group).Consumername(consumer).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { + cmd := c.client.B().XgroupDelconsumer().Key(stream).Groupname(group).Consumername(consumer).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XReadGroup(ctx context.Context, a XReadGroupArgs) *XStreamSliceCmd { + if len(a.Streams) != len(a.IDs) { + panic(fmt.Sprintf("streams and ids must be same length %d != %d", len(a.Streams), len(a.IDs))) + } + cmd := c.client.B().Arbitrary("XREADGROUP") + cmd = cmd.Args("GROUP", a.Group, a.Consumer) + if a.Count > 0 { + cmd = cmd.Args("COUNT", strconv.FormatInt(a.Count, 10)) + } + if a.Block >= 0 { + cmd = cmd.Args("BLOCK", strconv.FormatInt(formatMs(a.Block), 10)) + } + if a.NoAck { + cmd = cmd.Args("NOACK") + } + cmd = cmd.Args("STREAMS") + for i, v := range a.Streams { + cmd = cmd.Args(v, a.IDs[i]) + } + resp := c.client.Do(ctx, cmd.Build()) + return newXStreamSliceCmd(resp) +} + +func (c *Compat) XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd { + cmd := c.client.B().Xack().Key(stream).Group(group).Id(ids...).Build() + resp := c.client.Do(ctx, cmd) + return newIntCmd(resp) +} + +func (c *Compat) XPending(ctx context.Context, stream, group string) *XPendingCmd { + cmd := c.client.B().Xpending().Key(stream).Group(group).Build() + resp := c.client.Do(ctx, cmd) + return newXPendingCmd(resp) +} + +func (c *Compat) XPendingExt(ctx context.Context, a XPendingExtArgs) *XPendingExtCmd { + cmd := c.client.B().Arbitrary("XPENDING").Keys(a.Stream, a.Group) + if a.Idle != 0 { + cmd = cmd.Args("IDLE", strconv.FormatInt(formatMs(a.Idle), 10)) + } + cmd = cmd.Args(a.Start, a.End, strconv.FormatInt(a.Count, 10)) + if a.Consumer != "" { + cmd = cmd.Args(a.Consumer) + } + resp := c.client.Do(ctx, cmd.Build()) + return newXPendingExtCmd(resp) +} + +func (c *Compat) XClaim(ctx context.Context, a XClaimArgs) *XMessageSliceCmd { + cmd := c.client.B().Xclaim().Key(a.Stream).Group(a.Group).Consumer(a.Consumer).MinIdleTime(strconv.FormatInt(formatMs(a.MinIdle), 10)).Id(a.Messages...).Build() + resp := c.client.Do(ctx, cmd) + return newXMessageSliceCmd(resp) +} + +func (c *Compat) XClaimJustID(ctx context.Context, a XClaimArgs) *StringSliceCmd { + cmd := c.client.B().Xclaim().Key(a.Stream).Group(a.Group).Consumer(a.Consumer).MinIdleTime(strconv.FormatInt(formatMs(a.MinIdle), 10)).Id(a.Messages...).Justid().Build() + resp := c.client.Do(ctx, cmd) + return newStringSliceCmd(resp) +} + +func (c *Compat) XAutoClaim(ctx context.Context, a XAutoClaimArgs) *XAutoClaimCmd { + var resp rueidis.RedisResult + if a.Count > 0 { + resp = c.client.Do(ctx, c.client.B().Xautoclaim().Key(a.Stream).Group(a.Group).Consumer(a.Consumer).MinIdleTime(strconv.FormatInt(formatMs(a.MinIdle), 10)).Start(a.Start).Count(a.Count).Build()) + } else { + resp = c.client.Do(ctx, c.client.B().Xautoclaim().Key(a.Stream).Group(a.Group).Consumer(a.Consumer).MinIdleTime(strconv.FormatInt(formatMs(a.MinIdle), 10)).Start(a.Start).Build()) + } + return newXAutoClaimCmd(resp) +} + +func (c *Compat) XAutoClaimJustID(ctx context.Context, a XAutoClaimArgs) *XAutoClaimJustIDCmd { + var resp rueidis.RedisResult + if a.Count > 0 { + resp = c.client.Do(ctx, c.client.B().Xautoclaim().Key(a.Stream).Group(a.Group).Consumer(a.Consumer).MinIdleTime(strconv.FormatInt(formatMs(a.MinIdle), 10)).Start(a.Start).Count(a.Count).Justid().Build()) + } else { + resp = c.client.Do(ctx, c.client.B().Xautoclaim().Key(a.Stream).Group(a.Group).Consumer(a.Consumer).MinIdleTime(strconv.FormatInt(formatMs(a.MinIdle), 10)).Start(a.Start).Justid().Build()) + } + return newXAutoClaimJustIDCmd(resp) +} diff --git a/rueidiscompat/command.go b/rueidiscompat/command.go index b48f2682..131690e8 100644 --- a/rueidiscompat/command.go +++ b/rueidiscompat/command.go @@ -1,6 +1,7 @@ package rueidiscompat import ( + "fmt" "strconv" "time" @@ -247,6 +248,40 @@ func (cmd *IntSliceCmd) String() (string, error) { return cmd.res.ToString() } +type BoolSliceCmd struct { + res rueidis.RedisResult + val []bool + err error +} + +func newBoolSliceCmd(res rueidis.RedisResult) *BoolSliceCmd { + ints, err := res.AsIntSlice() + if err != nil { + return &BoolSliceCmd{res: res, err: err} + } + val := make([]bool, 0, len(ints)) + for i := 0; i < len(ints); i++ { + val = append(val, i == 1) + } + return &BoolSliceCmd{res: res, val: val, err: err} +} + +func (cmd *BoolSliceCmd) SetVal(val []bool) { + cmd.val = val +} + +func (cmd *BoolSliceCmd) Val() []bool { + return cmd.val +} + +func (cmd *BoolSliceCmd) Result() ([]bool, error) { + return cmd.val, cmd.err +} + +func (cmd *BoolSliceCmd) String() (string, error) { + return cmd.res.ToString() +} + type FloatCmd struct { res rueidis.RedisResult val float64 @@ -277,35 +312,34 @@ func (cmd *FloatCmd) String() (string, error) { type ScanCmd struct { res rueidis.RedisResult cursor uint64 - page []string + keys []string err error } func newScanCmd(res rueidis.RedisResult) *ScanCmd { - cursor_list, err := res.ToArray() + ret, err := res.ToArray() if err != nil { return &ScanCmd{res: res, err: err} } - raw_cursor, raw_page := cursor_list[0], cursor_list[1] - cursor, err := raw_cursor.ToInt64() + cursor, err := ret[0].ToInt64() if err != nil { return &ScanCmd{res: res, err: err} } - page, err := raw_page.AsStrSlice() - return &ScanCmd{res: res, cursor: uint64(cursor), page: page, err: err} + keys, err := ret[1].AsStrSlice() + return &ScanCmd{res: res, cursor: uint64(cursor), keys: keys, err: err} } -func (cmd *ScanCmd) SetVal(page []string, cursor uint64) { - cmd.page = page +func (cmd *ScanCmd) SetVal(keys []string, cursor uint64) { + cmd.keys = keys cmd.cursor = cursor } func (cmd *ScanCmd) Val() (keys []string, cursor uint64) { - return cmd.page, cmd.cursor + return cmd.keys, cmd.cursor } func (cmd *ScanCmd) Result() (keys []string, cursor uint64, err error) { - return cmd.page, cmd.cursor, cmd.err + return cmd.keys, cmd.cursor, cmd.err } func (cmd *ScanCmd) String() (string, error) { @@ -339,6 +373,358 @@ func (cmd *StringStringMapCmd) String() (string, error) { return cmd.res.ToString() } +type StringStructMapCmd struct { + res rueidis.RedisResult + val map[string]struct{} + err error +} + +func newStringStructMapCmd(res rueidis.RedisResult) *StringStructMapCmd { + strSlice, err := res.AsStrSlice() + if err != nil { + return &StringStructMapCmd{res: res, err: err} + } + val := make(map[string]struct{}, len(strSlice)) + for _, v := range strSlice { + val[v] = struct{}{} + } + return &StringStructMapCmd{res: res, val: val, err: err} +} + +func (cmd *StringStructMapCmd) SetVal(val map[string]struct{}) { + cmd.val = val +} + +func (cmd *StringStructMapCmd) Val() map[string]struct{} { + return cmd.val +} + +func (cmd *StringStructMapCmd) Result() (map[string]struct{}, error) { + return cmd.val, cmd.err +} + +func (cmd *StringStructMapCmd) String() (string, error) { + return cmd.res.ToString() +} + +type XMessageSliceCmd struct { + res rueidis.RedisResult + val []rueidis.XMessage + err error +} + +func newXMessageSliceCmd(res rueidis.RedisResult) *XMessageSliceCmd { + val, err := res.AsXMessageSlice() + return &XMessageSliceCmd{res: res, val: val, err: err} +} + +func (cmd *XMessageSliceCmd) SetVal(val []rueidis.XMessage) { + cmd.val = val +} + +func (cmd *XMessageSliceCmd) Val() []rueidis.XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]rueidis.XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() (string, error) { + return cmd.res.ToString() +} + +type XStream struct { + Stream string + Messages []rueidis.XMessage +} + +type XStreamSliceCmd struct { + res rueidis.RedisResult + val []XStream + err error +} + +func newXStreamSliceCmd(res rueidis.RedisResult) *XStreamSliceCmd { + arrs, err := res.ToArray() + if err != nil { + return &XStreamSliceCmd{res: res, err: err} + } + val := make([]XStream, 0, len(arrs)) + for _, v := range arrs { + arr, err := v.ToArray() + if err != nil { + return &XStreamSliceCmd{res: res, err: err} + } + if len(arr) != 2 { + return &XStreamSliceCmd{res: res, err: fmt.Errorf("got %d, wanted 2", len(arr))} + } + stream, err := arr[0].ToString() + if err != nil { + return &XStreamSliceCmd{res: res, err: err} + } + msgs, err := arr[1].AsXMessageSlice() + if err != nil { + return &XStreamSliceCmd{res: res, err: err} + } + val = append(val, XStream{Stream: stream, Messages: msgs}) + } + return &XStreamSliceCmd{res: res, val: val, err: err} +} + +func (cmd *XStreamSliceCmd) SetVal(val []XStream) { + cmd.val = val +} + +func (cmd *XStreamSliceCmd) Val() []XStream { + return cmd.val +} + +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { + return cmd.val, cmd.err +} + +func (cmd *XStreamSliceCmd) String() (string, error) { + return cmd.res.ToString() +} + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + res rueidis.RedisResult + val XPending + err error +} + +func newXPendingCmd(res rueidis.RedisResult) *XPendingCmd { + arr, err := res.ToArray() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + if len(arr) != 4 { + return &XPendingCmd{res: res, err: fmt.Errorf("got %d, wanted 4", len(arr))} + } + count, err := arr[0].ToInt64() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + lower, err := arr[1].ToString() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + higher, err := arr[2].ToString() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + val := XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + consumerArr, err := arr[3].ToArray() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + for _, v := range consumerArr { + consumer, err := v.ToArray() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + if len(arr) != 2 { + return &XPendingCmd{res: res, err: fmt.Errorf("got %d, wanted 2", len(arr))} + } + consumerName, err := consumer[0].ToString() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + consumerPending, err := consumer[1].AsInt64() + if err != nil { + return &XPendingCmd{res: res, err: err} + } + if val.Consumers == nil { + val.Consumers = make(map[string]int64) + } + val.Consumers[consumerName] = consumerPending + } + return &XPendingCmd{res: res, val: val, err: err} +} + +func (cmd *XPendingCmd) SetVal(val XPending) { + cmd.val = val +} + +func (cmd *XPendingCmd) Val() XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() (string, error) { + return cmd.res.ToString() +} + +type XPendingExt struct { + ID string + Consumer string + Idle time.Duration + RetryCount int64 +} + +type XPendingExtCmd struct { + res rueidis.RedisResult + val []XPendingExt + err error +} + +func newXPendingExtCmd(res rueidis.RedisResult) *XPendingExtCmd { + arrs, err := res.ToArray() + if err != nil { + return &XPendingExtCmd{res: res, err: err} + } + val := make([]XPendingExt, 0, len(arrs)) + for _, v := range arrs { + arr, err := v.ToArray() + if err != nil { + return &XPendingExtCmd{res: res, err: err} + } + if len(arr) != 4 { + return &XPendingExtCmd{res: res, err: fmt.Errorf("got %d, wanted 4", len(arr))} + } + id, err := arr[0].ToString() + if err != nil { + return &XPendingExtCmd{res: res, err: err} + } + consumer, err := arr[1].ToString() + if err != nil { + return &XPendingExtCmd{res: res, err: err} + } + idle, err := arr[2].ToInt64() + if err != nil { + return &XPendingExtCmd{res: res, err: err} + } + retryCount, err := arr[3].ToInt64() + if err != nil { + return &XPendingExtCmd{res: res, err: err} + } + val = append(val, XPendingExt{ + ID: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + } + return &XPendingExtCmd{res: res, val: val, err: err} +} + +func (cmd *XPendingExtCmd) SetVal(val []XPendingExt) { + cmd.val = val +} + +func (cmd *XPendingExtCmd) Val() []XPendingExt { + return cmd.val +} + +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingExtCmd) String() (string, error) { + return cmd.res.ToString() +} + +type XAutoClaimCmd struct { + res rueidis.RedisResult + start string + val []rueidis.XMessage + err error +} + +func newXAutoClaimCmd(res rueidis.RedisResult) *XAutoClaimCmd { + arr, err := res.ToArray() + if err != nil { + return &XAutoClaimCmd{res: res, err: err} + } + if len(arr) != 2 { + return &XAutoClaimCmd{res: res, err: fmt.Errorf("got %d, wanted 2", len(arr))} + } + start, err := arr[0].ToString() + if err != nil { + return &XAutoClaimCmd{res: res, err: err} + } + val, err := arr[1].AsXMessageSlice() + if err != nil { + return &XAutoClaimCmd{res: res, err: err} + } + return &XAutoClaimCmd{res: res, val: val, start: start, err: err} +} + +func (cmd *XAutoClaimCmd) SetVal(val []rueidis.XMessage, start string) { + cmd.val = val + cmd.start = start +} + +func (cmd *XAutoClaimCmd) Val() (messages []rueidis.XMessage, start string) { + return cmd.val, cmd.start +} + +func (cmd *XAutoClaimCmd) Result() (messages []rueidis.XMessage, start string, err error) { + return cmd.val, cmd.start, cmd.err +} + +func (cmd *XAutoClaimCmd) String() (string, error) { + return cmd.res.ToString() +} + +type XAutoClaimJustIDCmd struct { + res rueidis.RedisResult + start string + val []string + err error +} + +func newXAutoClaimJustIDCmd(res rueidis.RedisResult) *XAutoClaimJustIDCmd { + arr, err := res.ToArray() + if err != nil { + return &XAutoClaimJustIDCmd{res: res, err: err} + } + if len(arr) != 2 { + return &XAutoClaimJustIDCmd{res: res, err: fmt.Errorf("got %d, wanted 2", len(arr))} + } + start, err := arr[0].ToString() + if err != nil { + return &XAutoClaimJustIDCmd{res: res, err: err} + } + val, err := arr[1].AsStrSlice() + if err != nil { + return &XAutoClaimJustIDCmd{res: res, err: err} + } + return &XAutoClaimJustIDCmd{res: res, val: val, start: start, err: err} +} + +func (cmd *XAutoClaimJustIDCmd) SetVal(val []string, start string) { + cmd.val = val + cmd.start = start +} + +func (cmd *XAutoClaimJustIDCmd) Val() (ids []string, start string) { + return cmd.val, cmd.start +} + +func (cmd *XAutoClaimJustIDCmd) Result() (ids []string, start string, err error) { + return cmd.val, cmd.start, cmd.err +} + +func (cmd *XAutoClaimJustIDCmd) String() (string, error) { + return cmd.res.ToString() +} + type Sort struct { By string Offset, Count int64 @@ -390,6 +776,68 @@ type LPosArgs struct { Rank, MaxLen int64 } +// Note: len(Fields) and len(Values) must be the same. +// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used. +type XAddArgs struct { + Stream string + NoMkStream bool + MaxLen int64 // MAXLEN N + + MinID string + // Approx causes MaxLen and MinID to use "~" matcher (instead of "="). + Approx bool + Limit int64 + ID string + Fields []string + Values []string +} + +// Note: len(Streams) and len(IDs) must be the same. +type XReadArgs struct { + Streams []string // list of streams + IDs []string // list of ids + Count int64 + Block time.Duration +} + +// Note: len(Streams) and len(IDs) must be the same. +type XReadGroupArgs struct { + Group string + Consumer string + Streams []string // list of streams + IDs []string // list of ids + Count int64 + Block time.Duration + NoAck bool +} + +type XPendingExtArgs struct { + Stream string + Group string + Idle time.Duration + Start string + End string + Count int64 + Consumer string +} + +type XClaimArgs struct { + Stream string + Group string + Consumer string + MinIdle time.Duration + Messages []string +} + +type XAutoClaimArgs struct { + Stream string + Group string + MinIdle time.Duration + Start string + Count int64 + Consumer string +} + func usePrecise(dur time.Duration) bool { return dur < time.Second || dur%time.Second != 0 }