From 118f85d1a36c73f808483df21d6ae286f2472e14 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Mon, 17 Jul 2023 20:59:26 +0100 Subject: [PATCH] Adds tidigest commands (tdigest.merge missing) --- probabilistic.go | 436 +++++++++++++++++++++++++++++++++++------- probabilistic_test.go | 122 ++++++++++++ 2 files changed, 486 insertions(+), 72 deletions(-) diff --git a/probabilistic.go b/probabilistic.go index da3c3fd37..52fbcf427 100644 --- a/probabilistic.go +++ b/probabilistic.go @@ -7,46 +7,64 @@ import ( ) type ProbabilisticCmdble interface { - BFAdd(ctx context.Context, key, item interface{}) *IntCmd + BFAdd(ctx context.Context, key, element interface{}) *IntCmd BFCard(ctx context.Context, key string) *IntCmd - BFExists(ctx context.Context, key, item interface{}) *IntCmd + BFExists(ctx context.Context, key, element interface{}) *IntCmd BFInfo(ctx context.Context, key string) *BFInfoCmd BFInfoArg(ctx context.Context, key string, option BFInfo) *BFInfoCmd - BFInsert(ctx context.Context, key string, options *BFReserveOptions, items ...interface{}) *IntSliceCmd - BFMAdd(ctx context.Context, key string, items ...interface{}) *IntSliceCmd - BFMExists(ctx context.Context, key string, items ...interface{}) *IntSliceCmd + BFInsert(ctx context.Context, key string, options *BFReserveOptions, elements ...interface{}) *IntSliceCmd + BFMAdd(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd + BFMExists(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd BFReserve(ctx context.Context, key string, errorRate float64, capacity int64) *StatusCmd BFReserveExpansion(ctx context.Context, key string, errorRate float64, capacity, expansion int64) *StatusCmd BFReserveNonScaling(ctx context.Context, key string, errorRate float64, capacity int64) *StatusCmd BFReserveArgs(ctx context.Context, key string, options *BFReserveOptions) *StatusCmd - //TODO Loadchunk and scandump missing + //TODO LoadChunk and ScanDump missing - CFAdd(ctx context.Context, key, item interface{}) *IntCmd - CFAddNX(ctx context.Context, key, item interface{}) *IntCmd - CFCount(ctx context.Context, key, item interface{}) *IntCmd + CFAdd(ctx context.Context, key, element interface{}) *IntCmd + CFAddNX(ctx context.Context, key, element interface{}) *IntCmd + CFCount(ctx context.Context, key, element interface{}) *IntCmd CFDel(ctx context.Context, key string) *IntCmd - CFExists(ctx context.Context, key, item interface{}) *IntCmd + CFExists(ctx context.Context, key, element interface{}) *IntCmd CFInfo(ctx context.Context, key string) *CFInfoCmd + CFInsert(ctx context.Context, key string, options *CFInsertOptions, elements ...interface{}) *IntSliceCmd + CFInsertNx(ctx context.Context, key string, options *CFInsertOptions, elements ...interface{}) *IntSliceCmd + CFMExists(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd CFReserve(ctx context.Context, key string, capacity int64) *StatusCmd - CFInsert(ctx context.Context, key string, options *CFInsertOptions, items ...interface{}) *IntSliceCmd - CFInsertNx(ctx context.Context, key string, options *CFInsertOptions, items ...interface{}) *IntSliceCmd - CFMExists(ctx context.Context, key string, items ...interface{}) *IntSliceCmd + //TODO LoadChunk and ScanDump missing - CMSIncrBy(ctx context.Context, key string, items ...interface{}) *IntSliceCmd + CMSIncrBy(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd CMSInfo(ctx context.Context, key string) *CMSInfoCmd CMSInitByDim(ctx context.Context, key string, width, height int64) *StatusCmd CMSInitByProb(ctx context.Context, key string, errorRate, probability float64) *StatusCmd CMSMerge(ctx context.Context, destKey string, sourceKeys ...string) *StatusCmd CMSMergeWithWeight(ctx context.Context, destKey string, sourceKeys map[string]int) *StatusCmd - CMSQuery(ctx context.Context, key string, items ...interface{}) *IntSliceCmd + CMSQuery(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd - TOPKAdd(ctx context.Context, key string, items ...interface{}) *StringSliceCmd + TOPKAdd(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd + TOPKCount(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd + TOPKIncrBy(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd + TOPKInfo(ctx context.Context, key string) *TOPKInfoCmd + TOPKList(ctx context.Context, key string) *StringSliceCmd + TOPKListWithCount(ctx context.Context, key string) *MapStringIntCmd + TOPKQuery(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd TOPKReserve(ctx context.Context, key string, k int) *StatusCmd TOPKReserveWithOptions(ctx context.Context, key string, k int, width, depth int64, decay float64) *StatusCmd - TOPKInfo(ctx context.Context, key string) *TOPKInfoCmd - TOPKQuery(ctx context.Context, key string, items ...interface{}) *BoolSliceCmd - TOPKCount(ctx context.Context, key string, items ...interface{}) *IntSliceCmd - TOPKIncrBy(ctx context.Context, key string, items ...interface{}) *StringSliceCmd + + TDigestAdd(ctx context.Context, key string, elements ...float64) *StatusCmd + TDigestByRank(ctx context.Context, key string, rank ...uint) *FloatSliceCmd + TDigestByRevRank(ctx context.Context, key string, rank ...uint) *FloatSliceCmd + TDigestCDF(ctx context.Context, key string, elements ...float64) *FloatSliceCmd + TDigestCreate(ctx context.Context, key string) *StatusCmd + TDigestCreateWithCompression(ctx context.Context, key string, compression int) *StatusCmd + TDigestInfo(ctx context.Context, key string) *TDigestInfoCmd + TDigestMax(ctx context.Context, key string) *FloatCmd + TDigestMin(ctx context.Context, key string) *FloatCmd + TDigestQuantile(ctx context.Context, key string, elements ...float64) *FloatSliceCmd + TDigestRank(ctx context.Context, key string, values ...float64) *IntSliceCmd + TDigestReset(ctx context.Context, key string) *StatusCmd + TDigestRevRank(ctx context.Context, key string, values ...float64) *IntSliceCmd + TDigestTrimmedMean(ctx context.Context, key string, lowCutQuantile, highCutQuantile float64) *FloatCmd } type BFReserveOptions struct { @@ -87,7 +105,7 @@ func (b BFInfoArgs) String() string { case BFFILTERS: return "filters" case BFITEMS: - return "items" + return "elements" case BFEXPANSION: return "expansion" } @@ -140,8 +158,8 @@ func (c cmdable) BFReserveArgs(ctx context.Context, key string, options *BFReser return cmd } -func (c cmdable) BFAdd(ctx context.Context, key, item interface{}) *BoolCmd { - args := []interface{}{"bf.add", key, item} +func (c cmdable) BFAdd(ctx context.Context, key, element interface{}) *BoolCmd { + args := []interface{}{"bf.add", key, element} cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd @@ -154,8 +172,8 @@ func (c cmdable) BFCard(ctx context.Context, key string) *IntCmd { return cmd } -func (c cmdable) BFExists(ctx context.Context, key, item string) *BoolCmd { - args := []interface{}{"bf.exists", key, item} +func (c cmdable) BFExists(ctx context.Context, key, element string) *BoolCmd { + args := []interface{}{"bf.exists", key, element} cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd @@ -228,7 +246,7 @@ func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) { result.Size, err = rd.ReadInt() case "Number of filters": result.NumFilters, err = rd.ReadInt() - case "Number of items inserted": + case "Number of elements inserted": result.NumItemsInserted, err = rd.ReadInt() case "Expansion rate": result.ExpansionRate, err = rd.ReadInt() @@ -252,7 +270,7 @@ func (c cmdable) BFInfoArg(ctx context.Context, key string, option BFInfoArgs) * return cmd } -func (c cmdable) BFInsert(ctx context.Context, key string, options *BFReserveOptions, items ...string) *BoolSliceCmd { +func (c cmdable) BFInsert(ctx context.Context, key string, options *BFReserveOptions, elements ...string) *BoolSliceCmd { args := []interface{}{"bf.insert", key} if options != nil { if options.Error != 0 { @@ -268,8 +286,8 @@ func (c cmdable) BFInsert(ctx context.Context, key string, options *BFReserveOpt args = append(args, "nonscaling") } } - args = append(args, "items") - for _, s := range items { + args = append(args, "elements") + for _, s := range elements { args = append(args, s) } @@ -278,9 +296,9 @@ func (c cmdable) BFInsert(ctx context.Context, key string, options *BFReserveOpt return cmd } -func (c cmdable) BFMAdd(ctx context.Context, key string, items ...string) *BoolSliceCmd { +func (c cmdable) BFMAdd(ctx context.Context, key string, elements ...string) *BoolSliceCmd { args := []interface{}{"bf.madd", key} - for _, s := range items { + for _, s := range elements { args = append(args, s) } cmd := NewBoolSliceCmd(ctx, args...) @@ -288,9 +306,9 @@ func (c cmdable) BFMAdd(ctx context.Context, key string, items ...string) *BoolS return cmd } -func (c cmdable) BFMExists(ctx context.Context, key string, items ...string) *BoolSliceCmd { +func (c cmdable) BFMExists(ctx context.Context, key string, elements ...string) *BoolSliceCmd { args := []interface{}{"bf.mexists", key} - for _, s := range items { + for _, s := range elements { args = append(args, s) } cmd := NewBoolSliceCmd(ctx, args...) @@ -325,36 +343,36 @@ func (c cmdable) CFReserveArgs(ctx context.Context, key string, options *CFReser return cmd } -func (c cmdable) CFAdd(ctx context.Context, key, item string) *BoolCmd { - args := []interface{}{"cf.add", key, item} +func (c cmdable) CFAdd(ctx context.Context, key, element string) *BoolCmd { + args := []interface{}{"cf.add", key, element} cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) CFAddNX(ctx context.Context, key, item string) *BoolCmd { - args := []interface{}{"cf.addnx", key, item} +func (c cmdable) CFAddNX(ctx context.Context, key, element string) *BoolCmd { + args := []interface{}{"cf.addnx", key, element} cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) CFCount(ctx context.Context, key, item string) *IntCmd { - args := []interface{}{"cf.count", key, item} +func (c cmdable) CFCount(ctx context.Context, key, element string) *IntCmd { + args := []interface{}{"cf.count", key, element} cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) CFDel(ctx context.Context, key string, item string) *BoolCmd { - args := []interface{}{"cf.del", key, item} +func (c cmdable) CFDel(ctx context.Context, key string, element string) *BoolCmd { + args := []interface{}{"cf.del", key, element} cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) CFExists(ctx context.Context, key, item string) *BoolCmd { - args := []interface{}{"cf.exists", key, item} +func (c cmdable) CFExists(ctx context.Context, key, element string) *BoolCmd { + args := []interface{}{"cf.exists", key, element} cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd @@ -423,9 +441,9 @@ func (cmd *CFInfoCmd) readReply(rd *proto.Reader) (err error) { result.NumBuckets, err = rd.ReadInt() case "Number of filters": result.NumFilters, err = rd.ReadInt() - case "Number of items inserted": + case "Number of elements inserted": result.NumItemsInserted, err = rd.ReadInt() - case "Number of items deleted": + case "Number of elements deleted": result.NumItemsDeleted, err = rd.ReadInt() case "Bucket size": result.BucketSize, err = rd.ReadInt() @@ -454,25 +472,25 @@ func (c cmdable) CFInfo(ctx context.Context, key string) *CFInfoCmd { return cmd } -func (c cmdable) CFInsert(ctx context.Context, key string, options *CFInsertOptions, items ...string) *BoolSliceCmd { +func (c cmdable) CFInsert(ctx context.Context, key string, options *CFInsertOptions, elements ...string) *BoolSliceCmd { args := []interface{}{"cf.insert", key} - args = c.getCfInsertArgs(args, options, items...) + args = c.getCfInsertArgs(args, options, elements...) cmd := NewBoolSliceCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) CFInsertNx(ctx context.Context, key string, options *CFInsertOptions, items ...string) *IntSliceCmd { +func (c cmdable) CFInsertNx(ctx context.Context, key string, options *CFInsertOptions, elements ...string) *IntSliceCmd { args := []interface{}{"cf.insertnx", key} - args = c.getCfInsertArgs(args, options, items...) + args = c.getCfInsertArgs(args, options, elements...) cmd := NewIntSliceCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) getCfInsertArgs(args []interface{}, options *CFInsertOptions, items ...string) []interface{} { +func (c cmdable) getCfInsertArgs(args []interface{}, options *CFInsertOptions, elements ...string) []interface{} { if options != nil { if options.Capacity != 0 { args = append(args, "capacity", options.Capacity) @@ -481,16 +499,16 @@ func (c cmdable) getCfInsertArgs(args []interface{}, options *CFInsertOptions, i args = append(args, "nocreate") } } - args = append(args, "items") - for _, s := range items { + args = append(args, "elements") + for _, s := range elements { args = append(args, s) } return args } -func (c cmdable) CFMExists(ctx context.Context, key string, items ...string) *BoolSliceCmd { +func (c cmdable) CFMExists(ctx context.Context, key string, elements ...string) *BoolSliceCmd { args := []interface{}{"cf.mexists", key} - for _, s := range items { + for _, s := range elements { args = append(args, s) } cmd := NewBoolSliceCmd(ctx, args...) @@ -502,11 +520,11 @@ func (c cmdable) CFMExists(ctx context.Context, key string, items ...string) *Bo // CMS commands //------------------------------------------- -func (c cmdable) CMSIncrBy(ctx context.Context, key string, items ...interface{}) *IntSliceCmd { - args := make([]interface{}, 2, 2+len(items)) +func (c cmdable) CMSIncrBy(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) args[0] = "cms.incrby" args[1] = key - args = appendArgs(args, items) + args = appendArgs(args, elements) cmd := NewIntSliceCmd(ctx, args...) _ = c(ctx, cmd) @@ -640,9 +658,9 @@ func (c cmdable) CMSMergeWithWeight(ctx context.Context, destKey string, sourceK return cmd } -func (c cmdable) CMSQuery(ctx context.Context, key string, items ...interface{}) *IntSliceCmd { +func (c cmdable) CMSQuery(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { args := []interface{}{"cms.query", key} - for _, s := range items { + for _, s := range elements { args = append(args, s) } cmd := NewIntSliceCmd(ctx, args...) @@ -652,13 +670,13 @@ func (c cmdable) CMSQuery(ctx context.Context, key string, items ...interface{}) // ------------------------------------------- // TOPK commands -//------------------------------------------- +//-------------------------------------------- -func (c cmdable) TOPKAdd(ctx context.Context, key string, items ...interface{}) *StringSliceCmd { - args := make([]interface{}, 2, 2+len(items)) +func (c cmdable) TOPKAdd(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) args[0] = "topk.add" args[1] = key - args = appendArgs(args, items) + args = appendArgs(args, elements) cmd := NewStringSliceCmd(ctx, args...) _ = c(ctx, cmd) @@ -765,33 +783,33 @@ func (c cmdable) TOPKInfo(ctx context.Context, key string) *TOPKInfoCmd { return cmd } -func (c cmdable) TOPKQuery(ctx context.Context, key string, items ...interface{}) *BoolSliceCmd { - args := make([]interface{}, 2, 2+len(items)) +func (c cmdable) TOPKQuery(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) args[0] = "topk.query" args[1] = key - args = appendArgs(args, items) + args = appendArgs(args, elements) cmd := NewBoolSliceCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) TOPKCount(ctx context.Context, key string, items ...interface{}) *IntSliceCmd { - args := make([]interface{}, 2, 2+len(items)) +func (c cmdable) TOPKCount(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) args[0] = "topk.count" args[1] = key - args = appendArgs(args, items) + args = appendArgs(args, elements) cmd := NewIntSliceCmd(ctx, args...) _ = c(ctx, cmd) return cmd } -func (c cmdable) TOPKIncrBy(ctx context.Context, key string, items ...interface{}) *StringSliceCmd { - args := make([]interface{}, 2, 2+len(items)) +func (c cmdable) TOPKIncrBy(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) args[0] = "topk.incrby" args[1] = key - args = appendArgs(args, items) + args = appendArgs(args, elements) cmd := NewStringSliceCmd(ctx, args...) _ = c(ctx, cmd) @@ -813,3 +831,277 @@ func (c cmdable) TOPKListWithCount(ctx context.Context, key string) *MapStringIn _ = c(ctx, cmd) return cmd } + +// ------------------------------------------- +// t-digest commands +// -------------------------------------------- + +func (c cmdable) TDigestAdd(ctx context.Context, key string, elements ...float64) *StatusCmd { + args := make([]interface{}, 2, 2+len(elements)) + args[0] = "tdigest.add" + args[1] = key + + // Convert floatSlice to []interface{} + interfaceSlice := make([]interface{}, len(elements)) + for i, v := range elements { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestByRank(ctx context.Context, key string, rank ...uint) *FloatSliceCmd { + args := make([]interface{}, 2, 2+len(rank)) + args[0] = "tdigest.byrank" + args[1] = key + + // Convert uint slice to []interface{} + interfaceSlice := make([]interface{}, len(rank)) + for i, v := range rank { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewFloatSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestByRevRank(ctx context.Context, key string, rank ...uint) *FloatSliceCmd { + args := make([]interface{}, 2, 2+len(rank)) + args[0] = "tdigest.byrevrank" + args[1] = key + + // Convert uint slice to []interface{} + interfaceSlice := make([]interface{}, len(rank)) + for i, v := range rank { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewFloatSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestCDF(ctx context.Context, key string, elements ...float64) *FloatSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) + args[0] = "tdigest.cdf" + args[1] = key + + // Convert floatSlice to []interface{} + interfaceSlice := make([]interface{}, len(elements)) + for i, v := range elements { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewFloatSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestCreate(ctx context.Context, key string) *StatusCmd { + args := []interface{}{"tdigest.create", key} + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestCreateWithCompression(ctx context.Context, key string, compression int) *StatusCmd { + args := []interface{}{"tdigest.create", key, "compression", compression} + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +type TDigestInfo struct { + Compression int64 + Capacity int64 + MergedNodes int64 + UnmergedNodes int64 + MergedWeight int64 + UnmergedWeight int64 + Observations int64 + TotalCompressions int64 + MemoryUsage int64 +} + +type TDigestInfoCmd struct { + baseCmd + + val TDigestInfo +} + +func NewTDigestInfoCmd(ctx context.Context, args ...interface{}) *TDigestInfoCmd { + return &TDigestInfoCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *TDigestInfoCmd) SetVal(val TDigestInfo) { + cmd.val = val +} + +func (cmd *TDigestInfoCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *TDigestInfoCmd) Val() TDigestInfo { + return cmd.val +} + +func (cmd *TDigestInfoCmd) Result() (TDigestInfo, error) { + return cmd.val, cmd.err +} + +func (cmd *TDigestInfoCmd) readReply(rd *proto.Reader) (err error) { + n, err := rd.ReadMapLen() + if err != nil { + return err + } + + var key string + var result TDigestInfo + for f := 0; f < n; f++ { + key, err = rd.ReadString() + if err != nil { + return err + } + + switch key { + case "Compression": + result.Compression, err = rd.ReadInt() + case "Capacity": + result.Capacity, err = rd.ReadInt() + case "Merged nodes": + result.MergedNodes, err = rd.ReadInt() + case "Unmerged nodes": + result.UnmergedNodes, err = rd.ReadInt() + case "Merged weight": + result.MergedWeight, err = rd.ReadInt() + case "Unmerged weight": + result.UnmergedWeight, err = rd.ReadInt() + case "Observations": + result.Observations, err = rd.ReadInt() + case "Total compressions": + result.TotalCompressions, err = rd.ReadInt() + case "Memory usage": + result.MemoryUsage, err = rd.ReadInt() + default: + return fmt.Errorf("redis: tdigest.info unexpected key %s", key) + } + + if err != nil { + return err + } + } + + cmd.val = result + return nil +} + +func (c cmdable) TDigestInfo(ctx context.Context, key string) *TDigestInfoCmd { + args := []interface{}{"tdigest.info", key} + + cmd := NewTDigestInfoCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestMax(ctx context.Context, key string) *FloatCmd { + args := []interface{}{"tdigest.max", key} + + cmd := NewFloatCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestMin(ctx context.Context, key string) *FloatCmd { + args := []interface{}{"tdigest.min", key} + + cmd := NewFloatCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestQuantile(ctx context.Context, key string, elements ...float64) *FloatSliceCmd { + args := make([]interface{}, 2, 2+len(elements)) + args[0] = "tdigest.quantile" + args[1] = key + + // Convert floatSlice to []interface{} + interfaceSlice := make([]interface{}, len(elements)) + for i, v := range elements { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewFloatSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} +func (c cmdable) TDigestRank(ctx context.Context, key string, values ...float64) *IntSliceCmd { + args := make([]interface{}, 2, 2+len(values)) + args[0] = "tdigest.rank" + args[1] = key + + // Convert floatSlice to []interface{} + interfaceSlice := make([]interface{}, len(values)) + for i, v := range values { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestReset(ctx context.Context, key string) *StatusCmd { + args := []interface{}{"tdigest.reset", key} + + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestRevRank(ctx context.Context, key string, values ...float64) *IntSliceCmd { + args := make([]interface{}, 2, 2+len(values)) + args[0] = "tdigest.revrank" + args[1] = key + + // Convert floatSlice to []interface{} + interfaceSlice := make([]interface{}, len(values)) + for i, v := range values { + interfaceSlice[i] = v + } + + args = append(args, interfaceSlice...) + + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TDigestTrimmedMean(ctx context.Context, key string, lowCutQuantile, highCutQuantile float64) *FloatCmd { + args := []interface{}{"tdigest.trimmed_mean", key, lowCutQuantile, highCutQuantile} + + cmd := NewFloatCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} diff --git a/probabilistic_test.go b/probabilistic_test.go index 6a661ab05..448328249 100644 --- a/probabilistic_test.go +++ b/probabilistic_test.go @@ -5,6 +5,7 @@ import ( . "github.com/bsm/ginkgo/v2" . "github.com/bsm/gomega" "github.com/redis/go-redis/v9" + "math" ) var _ = Describe("Probabilistic commands", Label("probabilistic"), func() { @@ -476,4 +477,125 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() { }) }) + + Describe("t-digest", Label("tdigest"), func() { + It("should TDigestCreate, TDigestAdd, TDigestQuantile, TDigestCDF, TDigestMerge, TDigestInfo", Label("tdigest", "tdigestcreate", "tdigestadd", "tdigestquantile", "tdigestcdf", "tdigestmerge", "tdigestinfo"), func() { + err := client.TDigestCreate(ctx, "tdigest1").Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := client.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Observations).To(BeEquivalentTo(int64(0))) + + // Test with empty sketch + byRank, err := client.TDigestByRank(ctx, "tdigest1", 0, 1, 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRank)).To(BeEquivalentTo(4)) + + byRevRank, err := client.TDigestByRevRank(ctx, "tdigest1", 0, 1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRevRank)).To(BeEquivalentTo(3)) + + cdf, err := client.TDigestCDF(ctx, "tdigest1", 15, 35, 70).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cdf)).To(BeEquivalentTo(3)) + + max, err := client.TDigestMax(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(math.IsNaN(max)).To(BeTrue()) + + min, err := client.TDigestMin(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(math.IsNaN(min)).To(BeTrue()) + + quantile, err := client.TDigestQuantile(ctx, "tdigest1", 0.1, 0.2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(quantile)).To(BeEquivalentTo(2)) + + rank, err := client.TDigestRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rank)).To(BeEquivalentTo(2)) + + revRank, err := client.TDigestRevRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(revRank)).To(BeEquivalentTo(2)) + + trimmedMean, err := client.TDigestTrimmedMean(ctx, "tdigest1", 0.1, 0.6).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(math.IsNaN(trimmedMean)).To(BeTrue()) + + // Add elements + err = client.TDigestAdd(ctx, "tdigest1", 10, 20, 30, 40, 50, 60, 70, 80, 90, 100).Err() + Expect(err).NotTo(HaveOccurred()) + + info, err = client.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Observations).To(BeEquivalentTo(int64(10))) + + byRank, err = client.TDigestByRank(ctx, "tdigest1", 0, 1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRank)).To(BeEquivalentTo(3)) + Expect(byRank[0]).To(BeEquivalentTo(float64(10))) + Expect(byRank[1]).To(BeEquivalentTo(float64(20))) + Expect(byRank[2]).To(BeEquivalentTo(float64(30))) + + byRevRank, err = client.TDigestByRevRank(ctx, "tdigest1", 0, 1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRevRank)).To(BeEquivalentTo(3)) + Expect(byRevRank[0]).To(BeEquivalentTo(float64(100))) + Expect(byRevRank[1]).To(BeEquivalentTo(float64(90))) + Expect(byRevRank[2]).To(BeEquivalentTo(float64(80))) + + cdf, err = client.TDigestCDF(ctx, "tdigest1", 15, 35, 70).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cdf)).To(BeEquivalentTo(3)) + Expect(cdf[0]).To(BeEquivalentTo(float64(0.1))) + Expect(cdf[1]).To(BeEquivalentTo(float64(0.3))) + Expect(cdf[2]).To(BeEquivalentTo(float64(0.65))) + + max, err = client.TDigestMax(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(max).To(BeEquivalentTo(float64(100))) + + min, err = client.TDigestMin(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(min).To(BeEquivalentTo(float64(10))) + + quantile, err = client.TDigestQuantile(ctx, "tdigest1", 0.1, 0.2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(quantile)).To(BeEquivalentTo(2)) + Expect(quantile[0]).To(BeEquivalentTo(float64(20))) + Expect(quantile[1]).To(BeEquivalentTo(float64(30))) + + rank, err = client.TDigestRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rank)).To(BeEquivalentTo(2)) + Expect(rank[0]).To(BeEquivalentTo(int64(0))) + Expect(rank[1]).To(BeEquivalentTo(int64(1))) + + revRank, err = client.TDigestRevRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(revRank)).To(BeEquivalentTo(2)) + Expect(revRank[0]).To(BeEquivalentTo(int64(9))) + Expect(revRank[1]).To(BeEquivalentTo(int64(8))) + + trimmedMean, err = client.TDigestTrimmedMean(ctx, "tdigest1", 0.1, 0.6).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(trimmedMean).To(BeEquivalentTo(float64(40))) + + reset, err := client.TDigestReset(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(reset).To(BeEquivalentTo("OK")) + + }) + + It("should TDigestCreateWithCompression", Label("tdigest", "tcreatewithcompression"), func() { + err := client.TDigestCreateWithCompression(ctx, "tdigest1", 2000).Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := client.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Compression).To(BeEquivalentTo(int64(2000))) + }) + }) })