From e7fbb32e501cf8cdf38d42b321a2bf277dd6c592 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Wed, 8 Oct 2025 13:40:53 -0400 Subject: [PATCH 1/9] Accept deadline in write args and callback for GC results Part of RUNT-372 --- queue/client.go | 147 +++++++++++++++++++++++++++++++++++++--- queue/client_test.go | 12 +++- queue/queue.go | 49 -------------- queue/types.go | 5 +- queue/writetracking.lua | 12 ++-- 5 files changed, 157 insertions(+), 68 deletions(-) diff --git a/queue/client.go b/queue/client.go index bf6cf6a..ab1e765 100644 --- a/queue/client.go +++ b/queue/client.go @@ -7,10 +7,12 @@ import ( "errors" "fmt" "regexp" + "strconv" "strings" "time" "github.com/redis/go-redis/v9" + "go.uber.org/multierr" "github.com/replicate/go/shuffleshard" ) @@ -20,10 +22,16 @@ var ( ErrInvalidWriteArgs = errors.New("queue: invalid write arguments") ErrNoMatchingMessageInStream = errors.New("queue: no matching message in stream") ErrInvalidMetaCancelation = errors.New("queue: invalid meta cancelation") + ErrStopGC = errors.New("queue: stop garbage collection") streamSuffixPattern = regexp.MustCompile(`\A:s(\d+)\z`) ) +const ( + metaCancelationGCBatchSize = 100 + metaCancelationGCFuncTrackValuesSize = 20 +) + type Client struct { rdb redis.Cmdable ttl time.Duration // ttl for all keys in queue @@ -52,14 +60,120 @@ func (c *Client) Prepare(ctx context.Context) error { return prepare(ctx, c.rdb) } +// OnGCFunc is called periodically during GC with variadic "track values" as extracted +// from the meta cancelation key. +type OnGCFunc func(ctx context.Context, trackValues []string) error + // GC performs all garbage collection operations that cannot be automatically -// performed via key expiry. -func (c *Client) GC(ctx context.Context) error { - if _, err := gcMetaCancelation(ctx, c.rdb); err != nil { +// performed via key expiry, which is the "meta:cancelation" hash at the time of this +// writing. +func (c *Client) GC(ctx context.Context, f OnGCFunc) error { + now, err := c.rdb.Time(ctx).Result() + if err != nil { return err } - return nil + nowUnix := now.Unix() + nonFatalErrors := []error{} + idsToDelete := []string{} + keysToDelete := []string{} + iter := c.rdb.HScan(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() + + for iter.Next(ctx) { + key := iter.Val() + + if len(idsToDelete) >= metaCancelationGCFuncTrackValuesSize { + if err := c.callOnGC(ctx, f, idsToDelete); err != nil { + if errors.Is(err, ErrStopGC) { + return err + } + + nonFatalErrors = append(nonFatalErrors, err) + } + + idsToDelete = []string{} + } + + keyParts := strings.Split(key, ":") + if len(keyParts) != 3 { + continue + } + + keyTime, err := strconv.ParseInt(keyParts[2], 0, 64) + if err != nil { + nonFatalErrors = append(nonFatalErrors, err) + continue + } + + if keyTime > nowUnix { + keysToDelete = append(keysToDelete, key, keyParts[0]) + idsToDelete = append(idsToDelete, keyParts[0]) + } + } + + if err := c.callOnGC(ctx, f, idsToDelete); err != nil { + if errors.Is(err, ErrStopGC) { + return err + } + + nonFatalErrors = append(nonFatalErrors, err) + } + + if err := iter.Err(); err != nil { + return err + } + + for i := 0; i < len(keysToDelete); i += metaCancelationGCBatchSize { + sliceEnd := i + metaCancelationGCBatchSize + if sliceEnd > len(keysToDelete) { + sliceEnd = len(keysToDelete) + } + + if err := c.rdb.HDel( + ctx, + MetaCancelationHash, + keysToDelete[i:sliceEnd]..., + ).Err(); err != nil { + return err + } + } + + return multierr.Combine(nonFatalErrors...) +} + +func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string) error { + if f == nil { + return nil + } + + pipe := c.rdb.Pipeline() + hValCmds := make([]*redis.StringCmd, len(idsToDelete)) + + for i, idToDelete := range idsToDelete { + hValCmds[i] = pipe.HGet(ctx, MetaCancelationHash, idToDelete) + } + + if _, err := pipe.Exec(ctx); err != nil { + return err + } + + trackValues := make([]string, len(idsToDelete)) + + for i, hValCmd := range hValCmds { + msgBytes, err := hValCmd.Bytes() + if err != nil { + return err + } + + msg := &metaCancelation{} + if err := json.Unmarshal(msgBytes, msg); err != nil { + return err + } + + trackValues[i] = msg.TrackValue + } + + return f(ctx, trackValues) } // Len calculates the aggregate length (XLEN) of the queue. It adds up the @@ -267,12 +381,24 @@ func (c *Client) write(ctx context.Context, args *WriteArgs) (string, error) { // Capacity: 3 (for seconds, streams, n) + len(shard) + 2*len(values) cmdArgs := make([]any, 0, 3+len(shard)+2*len(args.Values)) - cmdArgs = append(cmdArgs, int(c.ttl.Seconds())) - cmdArgs = append(cmdArgs, args.Streams) - cmdArgs = append(cmdArgs, len(shard)) + cmdArgs = append( + cmdArgs, + int(c.ttl.Seconds()), + args.Streams, + len(shard), + ) if c.trackField != "" { - cmdArgs = append(cmdArgs, c.trackField) + deadline := args.Deadline + if deadline.IsZero() { + deadline = time.Now().Add(25 * time.Hour) + } + + cmdArgs = append( + cmdArgs, + c.trackField, + deadline.Unix(), + ) } for _, s := range shard { @@ -290,8 +416,9 @@ func (c *Client) write(ctx context.Context, args *WriteArgs) (string, error) { } type metaCancelation struct { - StreamID string `json:"stream_id"` - MsgID string `json:"msg_id"` + StreamID string `json:"stream_id"` + MsgID string `json:"msg_id"` + TrackValue string `json:"track_value"` } // Del supports removal of a message when the given `fieldValue` matches a "meta diff --git a/queue/client_test.go b/queue/client_test.go index 84f2678..602c088 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -462,7 +462,17 @@ func TestClientGCIntegration(t *testing.T) { runClientWriteIntegrationTest(ctx, t, rdb, client, true) - require.NoError(t, client.GC(ctx)) + gcTrackedFields := []string{} + + onGCFunc := func(_ context.Context, trackedFields []string) error { + gcTrackedFields = append(gcTrackedFields, trackedFields...) + + return nil + } + + require.NoError(t, client.GC(ctx, onGCFunc)) + + require.Len(t, gcTrackedFields, 15) } // TestPickupLatencyIntegration runs a test with a mostly-empty queue -- by diff --git a/queue/queue.go b/queue/queue.go index 0ede775..c69aafe 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -35,9 +35,7 @@ package queue import ( "context" _ "embed" // to provide go:embed support - "strconv" "strings" - "time" "github.com/redis/go-redis/v9" ) @@ -76,8 +74,6 @@ var ( const ( MetaCancelationHash = "meta:cancelation" - - metaCancelationGCBatchSize = 100 ) func prepare(ctx context.Context, rdb redis.Cmdable) error { @@ -101,48 +97,3 @@ func prepare(ctx context.Context, rdb redis.Cmdable) error { } return nil } - -func gcMetaCancelation(ctx context.Context, rdb redis.Cmdable) (int, error) { - now := time.Now().UTC().Unix() - keysToDelete := []string{} - iter := rdb.HScan(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() - - for iter.Next(ctx) { - key := iter.Val() - - keyParts := strings.Split(key, ":") - if len(keyParts) != 3 { - continue - } - - keyTime, err := strconv.ParseInt(keyParts[2], 0, 64) - if err != nil { - continue - } - - if keyTime > now { - keysToDelete = append(keysToDelete, key, keyParts[0]) - } - } - - if err := iter.Err(); err != nil { - return 0, err - } - - for i := 0; i < len(keysToDelete); i += metaCancelationGCBatchSize { - sliceEnd := i + metaCancelationGCBatchSize - if sliceEnd > len(keysToDelete) { - sliceEnd = len(keysToDelete) - } - - if err := rdb.HDel( - ctx, - MetaCancelationHash, - keysToDelete[i:sliceEnd]..., - ).Err(); err != nil { - return 0, err - } - } - - return len(keysToDelete), nil -} diff --git a/queue/types.go b/queue/types.go index a5c0105..960a75f 100644 --- a/queue/types.go +++ b/queue/types.go @@ -15,8 +15,9 @@ func (e queueError) Error() string { const Empty = queueError("queue: empty") type WriteArgs struct { - Name string // queue name - Values map[string]any // message values + Name string // queue name + Values map[string]any // message values + Deadline time.Time // time after which message will be cancel (only when tracked) Streams int // total number of streams StreamsPerShard int // number of streams in each shard diff --git a/queue/writetracking.lua b/queue/writetracking.lua index ad13689..aa2ddd9 100644 --- a/queue/writetracking.lua +++ b/queue/writetracking.lua @@ -1,6 +1,6 @@ -- Write commands take the form -- --- EVALSHA sha 1 key seconds streams n track_field sid [sid ...] field value [field value ...] +-- EVALSHA sha 1 key seconds streams n track_field deadline sid [sid ...] field value [field value ...] -- -- - `key` is the base key for the queue, e.g. "prediction:input:abcd1234" -- - `seconds` determines the expiry timeout for all keys that make up the @@ -12,6 +12,7 @@ -- or equal to `streams`. -- - `track_field` is the name of the key in `fields` used for tracking the stream -- message ID for cancelation. +-- - `deadline` is the unix timestamp used in the cancelation key. -- - `sid` are the stream IDs to consider writing to. They must be in the range -- [0, `streams`). The message will be written to the shortest of the selected -- streams. @@ -27,8 +28,9 @@ local ttl = tonumber(ARGV[1], 10) local writestreams = tonumber(ARGV[2], 10) local n = tonumber(ARGV[3], 10) local track_field = ARGV[4] -local sids = { unpack(ARGV, 5, 5 + n - 1) } -local fields = { unpack(ARGV, 5 + n, #ARGV) } +local deadline = ARGV[5] +local sids = { unpack(ARGV, 6, 6 + n - 1) } +local fields = { unpack(ARGV, 6 + n, #ARGV) } local key_meta = base .. ':meta' local key_notifications = base .. ':notifications' @@ -117,9 +119,7 @@ redis.call('XADD', key_notifications, 'MAXLEN', '1', '*', 's', selected_sid) if track_value ~= '' then local cancelation_key = redis.sha1hex(track_value) - local server_time = redis.call('TIME') - local expiry_unixtime = tonumber(server_time[1]) + 90000 -- 25 hours - local cancelation_expiry_key = cancelation_key .. ':expiry:' .. tostring(expiry_unixtime) + local cancelation_expiry_key = cancelation_key .. ':expiry:' .. tostring(deadline) redis.call( 'HSET', From 49348da4b87f5277e19d73385e58482987256c96 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Thu, 9 Oct 2025 17:05:52 -0400 Subject: [PATCH 2/9] Do both GC callback and HDel in batches --- queue/client.go | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/queue/client.go b/queue/client.go index ab1e765..c41335e 100644 --- a/queue/client.go +++ b/queue/client.go @@ -28,8 +28,7 @@ var ( ) const ( - metaCancelationGCBatchSize = 100 - metaCancelationGCFuncTrackValuesSize = 20 + metaCancelationGCBatchSize = 100 ) type Client struct { @@ -60,8 +59,8 @@ func (c *Client) Prepare(ctx context.Context) error { return prepare(ctx, c.rdb) } -// OnGCFunc is called periodically during GC with variadic "track values" as extracted -// from the meta cancelation key. +// OnGCFunc is called periodically during GC *before* deleting the expired keys. The +// argument given is the "track values" as extracted from the meta cancelation key. type OnGCFunc func(ctx context.Context, trackValues []string) error // GC performs all garbage collection operations that cannot be automatically @@ -82,8 +81,8 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { for iter.Next(ctx) { key := iter.Val() - if len(idsToDelete) >= metaCancelationGCFuncTrackValuesSize { - if err := c.callOnGC(ctx, f, idsToDelete); err != nil { + if len(idsToDelete) >= metaCancelationGCBatchSize { + if err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete); err != nil { if errors.Is(err, ErrStopGC) { return err } @@ -92,6 +91,7 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { } idsToDelete = []string{} + keysToDelete = []string{} } keyParts := strings.Split(key, ":") @@ -111,7 +111,7 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { } } - if err := c.callOnGC(ctx, f, idsToDelete); err != nil { + if err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete); err != nil { if errors.Is(err, ErrStopGC) { return err } @@ -123,22 +123,27 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { return err } - for i := 0; i < len(keysToDelete); i += metaCancelationGCBatchSize { - sliceEnd := i + metaCancelationGCBatchSize - if sliceEnd > len(keysToDelete) { - sliceEnd = len(keysToDelete) - } + return multierr.Combine(nonFatalErrors...) +} - if err := c.rdb.HDel( - ctx, - MetaCancelationHash, - keysToDelete[i:sliceEnd]..., - ).Err(); err != nil { +func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, keysToDelete []string) error { + if err := c.callOnGC(ctx, f, idsToDelete); err != nil { + // NOTE: The client `OnGCFunc` may request interruption via the `ErrStopGC` + // error as a way to prevent the `HDel`. + if errors.Is(err, ErrStopGC) { return err } } - return multierr.Combine(nonFatalErrors...) + if err := c.rdb.HDel( + ctx, + MetaCancelationHash, + keysToDelete..., + ).Err(); err != nil { + return err + } + + return nil } func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string) error { From aea7d3e6ab036a1a5712dcc6b68c16696007ca1a Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Thu, 9 Oct 2025 20:17:35 -0400 Subject: [PATCH 3/9] Calculate expiry better; get server time for each batch --- queue/client.go | 27 ++++++++++++++++++++++----- queue/client_test.go | 3 ++- queue/writetracking.lua | 7 ++++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/queue/client.go b/queue/client.go index c41335e..65b9e23 100644 --- a/queue/client.go +++ b/queue/client.go @@ -73,9 +73,12 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { } nowUnix := now.Unix() + nonFatalErrors := []error{} + idsToDelete := []string{} keysToDelete := []string{} + iter := c.rdb.HScan(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() for iter.Next(ctx) { @@ -92,6 +95,13 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { idsToDelete = []string{} keysToDelete = []string{} + + now, err = c.rdb.Time(ctx).Result() + if err != nil { + return err + } + + nowUnix = now.Unix() } keyParts := strings.Split(key, ":") @@ -105,7 +115,7 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { continue } - if keyTime > nowUnix { + if nowUnix > keyTime { keysToDelete = append(keysToDelete, key, keyParts[0]) idsToDelete = append(idsToDelete, keyParts[0]) } @@ -127,6 +137,10 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { } func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, keysToDelete []string) error { + if len(idsToDelete) == 0 || len(keysToDelete) == 0 { + return nil + } + if err := c.callOnGC(ctx, f, idsToDelete); err != nil { // NOTE: The client `OnGCFunc` may request interruption via the `ErrStopGC` // error as a way to prevent the `HDel`. @@ -394,15 +408,18 @@ func (c *Client) write(ctx context.Context, args *WriteArgs) (string, error) { ) if c.trackField != "" { - deadline := args.Deadline - if deadline.IsZero() { - deadline = time.Now().Add(25 * time.Hour) + deadlineUnix := int64(0) + if !args.Deadline.IsZero() { + deadlineUnix = args.Deadline.Unix() } cmdArgs = append( cmdArgs, c.trackField, - deadline.Unix(), + // NOTE: Deadline is an optional field in WriteArgs, so the Unix value may be + // passed as zero so that the writeTrackingScript uses a default value of the + // server time + ttl. + deadlineUnix, ) } diff --git a/queue/client_test.go b/queue/client_test.go index 602c088..04aeb5b 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -312,6 +312,7 @@ func runClientWriteIntegrationTest(ctx context.Context, t *testing.T, rdb *redis "name": "panda", "tracketytrack": trackID.String(), }, + Deadline: time.Now().Add(-1 * time.Hour), }) require.NoError(t, err) } @@ -472,7 +473,7 @@ func TestClientGCIntegration(t *testing.T) { require.NoError(t, client.GC(ctx, onGCFunc)) - require.Len(t, gcTrackedFields, 15) + require.Len(t, gcTrackedFields, 10) } // TestPickupLatencyIntegration runs a test with a mostly-empty queue -- by diff --git a/queue/writetracking.lua b/queue/writetracking.lua index aa2ddd9..a294d88 100644 --- a/queue/writetracking.lua +++ b/queue/writetracking.lua @@ -28,7 +28,7 @@ local ttl = tonumber(ARGV[1], 10) local writestreams = tonumber(ARGV[2], 10) local n = tonumber(ARGV[3], 10) local track_field = ARGV[4] -local deadline = ARGV[5] +local deadline = tonumber(ARGV[5], 10) local sids = { unpack(ARGV, 6, 6 + n - 1) } local fields = { unpack(ARGV, 6 + n, #ARGV) } @@ -118,6 +118,11 @@ local id = redis.call('XADD', key_stream, '*', unpack(fields)) redis.call('XADD', key_notifications, 'MAXLEN', '1', '*', 's', selected_sid) if track_value ~= '' then + if deadline == 0 then + local server_time = redis.call('TIME') + deadline = tonumber(server_time[1], 10) + ttl + end + local cancelation_key = redis.sha1hex(track_value) local cancelation_expiry_key = cancelation_key .. ':expiry:' .. tostring(deadline) From 0e065f65e562f23055af48a088fe4ff1707bf072 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 10 Oct 2025 08:29:33 -0400 Subject: [PATCH 4/9] Include deadline in meta cancelation value --- queue/client.go | 1 + queue/writetracking.lua | 1 + 2 files changed, 2 insertions(+) diff --git a/queue/client.go b/queue/client.go index 65b9e23..ea69818 100644 --- a/queue/client.go +++ b/queue/client.go @@ -441,6 +441,7 @@ type metaCancelation struct { StreamID string `json:"stream_id"` MsgID string `json:"msg_id"` TrackValue string `json:"track_value"` + Deadline int64 `json:"deadline"` } // Del supports removal of a message when the given `fieldValue` matches a "meta diff --git a/queue/writetracking.lua b/queue/writetracking.lua index a294d88..05a5cdf 100644 --- a/queue/writetracking.lua +++ b/queue/writetracking.lua @@ -134,6 +134,7 @@ if track_value ~= '' then ['stream_id'] = key_stream, ['track_value'] = track_value, ['msg_id'] = id, + ['deadline'] = deadline, }), cancelation_expiry_key, '1' From 0b455595efe9f51fab3853a9c9ce740240d93fcb Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 10 Oct 2025 10:57:14 -0400 Subject: [PATCH 5/9] Use HSCAN without values since we don't need them --- queue/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue/client.go b/queue/client.go index ea69818..c75ada2 100644 --- a/queue/client.go +++ b/queue/client.go @@ -79,7 +79,7 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { idsToDelete := []string{} keysToDelete := []string{} - iter := c.rdb.HScan(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() + iter := c.rdb.HScanNoValues(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() for iter.Next(ctx) { key := iter.Val() From e4aee231042776982eba8d86eb88198717074d25 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 10 Oct 2025 11:28:39 -0400 Subject: [PATCH 6/9] Return total from GC --- queue/client.go | 16 +++++++++------- queue/client_test.go | 4 +++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/queue/client.go b/queue/client.go index c75ada2..c14304e 100644 --- a/queue/client.go +++ b/queue/client.go @@ -66,10 +66,10 @@ type OnGCFunc func(ctx context.Context, trackValues []string) error // GC performs all garbage collection operations that cannot be automatically // performed via key expiry, which is the "meta:cancelation" hash at the time of this // writing. -func (c *Client) GC(ctx context.Context, f OnGCFunc) error { +func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, error) { now, err := c.rdb.Time(ctx).Result() if err != nil { - return err + return 0, err } nowUnix := now.Unix() @@ -80,14 +80,16 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { keysToDelete := []string{} iter := c.rdb.HScanNoValues(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() + total := uint64(0) for iter.Next(ctx) { key := iter.Val() + total++ if len(idsToDelete) >= metaCancelationGCBatchSize { if err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete); err != nil { if errors.Is(err, ErrStopGC) { - return err + return total, err } nonFatalErrors = append(nonFatalErrors, err) @@ -98,7 +100,7 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { now, err = c.rdb.Time(ctx).Result() if err != nil { - return err + return total, err } nowUnix = now.Unix() @@ -123,17 +125,17 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) error { if err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete); err != nil { if errors.Is(err, ErrStopGC) { - return err + return total, err } nonFatalErrors = append(nonFatalErrors, err) } if err := iter.Err(); err != nil { - return err + return total, err } - return multierr.Combine(nonFatalErrors...) + return total, multierr.Combine(nonFatalErrors...) } func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, keysToDelete []string) error { diff --git a/queue/client_test.go b/queue/client_test.go index 04aeb5b..1e96282 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -471,7 +471,9 @@ func TestClientGCIntegration(t *testing.T) { return nil } - require.NoError(t, client.GC(ctx, onGCFunc)) + total, err := client.GC(ctx, onGCFunc) + require.NoError(t, err) + require.Equal(t, uint64(15), total) require.Len(t, gcTrackedFields, 10) } From 67a4f7f6edbeef4b9949b24caba4b292d913ebad Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 10 Oct 2025 12:46:45 -0400 Subject: [PATCH 7/9] Use field values directly instead of sha1 because the sha1 thing is only there for paranoia reasons and it's a pain to translate while debugging. --- queue/client.go | 8 ++------ queue/client_test.go | 7 ++----- queue/writetracking.lua | 5 ++--- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/queue/client.go b/queue/client.go index c14304e..9bc8cd0 100644 --- a/queue/client.go +++ b/queue/client.go @@ -2,7 +2,6 @@ package queue import ( "context" - "crypto/sha1" "encoding/json" "errors" "fmt" @@ -449,9 +448,7 @@ type metaCancelation struct { // Del supports removal of a message when the given `fieldValue` matches a "meta // cancelation" key as written when using a client with tracking support. func (c *Client) Del(ctx context.Context, fieldValue string) error { - metaCancelationKey := fmt.Sprintf("%x", sha1.Sum([]byte(fieldValue))) - - msgBytes, err := c.rdb.HGet(ctx, MetaCancelationHash, metaCancelationKey).Bytes() + msgBytes, err := c.rdb.HGet(ctx, MetaCancelationHash, fieldValue).Bytes() if err != nil { return err } @@ -476,8 +473,7 @@ func (c *Client) Del(ctx context.Context, fieldValue string) error { if n == 0 { return fmt.Errorf( - "key=%q field-value=%q stream=%q message-id=%q: %w", - metaCancelationKey, + "field-value=%q stream=%q message-id=%q: %w", fieldValue, msg.StreamID, msg.MsgID, diff --git a/queue/client_test.go b/queue/client_test.go index 1e96282..0f3a300 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -3,7 +3,6 @@ package queue_test import ( "context" crand "crypto/rand" - "crypto/sha1" "errors" "fmt" "math/rand" @@ -440,12 +439,10 @@ func TestClientDelIntegration(t *testing.T) { require.Error(t, client.Del(ctx, trackIDs[0]+"oops")) require.Error(t, client.Del(ctx, "bogustown")) - metaCancelationKey := fmt.Sprintf("%x", sha1.Sum([]byte(trackIDs[1]))) - - metaCancel, err := rdb.HGet(ctx, queue.MetaCancelationHash, metaCancelationKey).Result() + metaCancel, err := rdb.HGet(ctx, queue.MetaCancelationHash, trackIDs[1]).Result() require.NoError(t, err) - rdb.HSet(ctx, queue.MetaCancelationHash, metaCancelationKey, "{{[,bogus"+metaCancel) + rdb.HSet(ctx, queue.MetaCancelationHash, trackIDs[1], "{{[,bogus"+metaCancel) require.Error(t, client.Del(ctx, trackIDs[1])) diff --git a/queue/writetracking.lua b/queue/writetracking.lua index 05a5cdf..340a03f 100644 --- a/queue/writetracking.lua +++ b/queue/writetracking.lua @@ -123,13 +123,12 @@ if track_value ~= '' then deadline = tonumber(server_time[1], 10) + ttl end - local cancelation_key = redis.sha1hex(track_value) - local cancelation_expiry_key = cancelation_key .. ':expiry:' .. tostring(deadline) + local cancelation_expiry_key = track_value .. ':expiry:' .. tostring(deadline) redis.call( 'HSET', '__META_CANCELATION_HASH__', - cancelation_key, + track_value, cjson.encode({ ['stream_id'] = key_stream, ['track_value'] = track_value, From c16aef1b26bba47acc70486049b0b4eef77059cf Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 10 Oct 2025 14:08:54 -0400 Subject: [PATCH 8/9] Also return count deleted from GC --- queue/client.go | 39 +++++++++++++++++++++------------------ queue/client_test.go | 3 ++- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/queue/client.go b/queue/client.go index 9bc8cd0..a66856c 100644 --- a/queue/client.go +++ b/queue/client.go @@ -65,10 +65,10 @@ type OnGCFunc func(ctx context.Context, trackValues []string) error // GC performs all garbage collection operations that cannot be automatically // performed via key expiry, which is the "meta:cancelation" hash at the time of this // writing. -func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, error) { +func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, uint64, error) { now, err := c.rdb.Time(ctx).Result() if err != nil { - return 0, err + return 0, 0, err } nowUnix := now.Unix() @@ -80,26 +80,30 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, error) { iter := c.rdb.HScanNoValues(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() total := uint64(0) + nDeleted := uint64(0) for iter.Next(ctx) { key := iter.Val() total++ if len(idsToDelete) >= metaCancelationGCBatchSize { - if err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete); err != nil { + n, err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete) + if err != nil { if errors.Is(err, ErrStopGC) { - return total, err + return total, nDeleted, err } nonFatalErrors = append(nonFatalErrors, err) } + nDeleted += uint64(n / 2) + idsToDelete = []string{} keysToDelete = []string{} now, err = c.rdb.Time(ctx).Result() if err != nil { - return total, err + return total, nDeleted, err } nowUnix = now.Unix() @@ -122,43 +126,42 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, error) { } } - if err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete); err != nil { + n, err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete) + if err != nil { if errors.Is(err, ErrStopGC) { - return total, err + return total, nDeleted, err } nonFatalErrors = append(nonFatalErrors, err) } + nDeleted += uint64(n / 2) + if err := iter.Err(); err != nil { - return total, err + return total, nDeleted, err } - return total, multierr.Combine(nonFatalErrors...) + return total, nDeleted, multierr.Combine(nonFatalErrors...) } -func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, keysToDelete []string) error { +func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, keysToDelete []string) (int64, error) { if len(idsToDelete) == 0 || len(keysToDelete) == 0 { - return nil + return 0, nil } if err := c.callOnGC(ctx, f, idsToDelete); err != nil { // NOTE: The client `OnGCFunc` may request interruption via the `ErrStopGC` // error as a way to prevent the `HDel`. if errors.Is(err, ErrStopGC) { - return err + return 0, err } } - if err := c.rdb.HDel( + return c.rdb.HDel( ctx, MetaCancelationHash, keysToDelete..., - ).Err(); err != nil { - return err - } - - return nil + ).Result() } func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string) error { diff --git a/queue/client_test.go b/queue/client_test.go index 0f3a300..01df4fc 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -468,9 +468,10 @@ func TestClientGCIntegration(t *testing.T) { return nil } - total, err := client.GC(ctx, onGCFunc) + total, nDeleted, err := client.GC(ctx, onGCFunc) require.NoError(t, err) require.Equal(t, uint64(15), total) + require.Equal(t, uint64(10), nDeleted) require.Len(t, gcTrackedFields, 10) } From 28aa39ba001e8103b4e484d8f201e4fceaf8b614 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 10 Oct 2025 14:20:21 -0400 Subject: [PATCH 9/9] Minor rework of deletion count --- queue/client.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/queue/client.go b/queue/client.go index a66856c..c22d26a 100644 --- a/queue/client.go +++ b/queue/client.go @@ -80,7 +80,7 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, uint64, error) { iter := c.rdb.HScanNoValues(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator() total := uint64(0) - nDeleted := uint64(0) + twiceDeleted := uint64(0) for iter.Next(ctx) { key := iter.Val() @@ -90,20 +90,20 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, uint64, error) { n, err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete) if err != nil { if errors.Is(err, ErrStopGC) { - return total, nDeleted, err + return total, twiceDeleted / 2, err } nonFatalErrors = append(nonFatalErrors, err) } - nDeleted += uint64(n / 2) + twiceDeleted += uint64(n) idsToDelete = []string{} keysToDelete = []string{} now, err = c.rdb.Time(ctx).Result() if err != nil { - return total, nDeleted, err + return total, twiceDeleted / 2, err } nowUnix = now.Unix() @@ -129,19 +129,19 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, uint64, error) { n, err := c.gcProcessBatch(ctx, f, idsToDelete, keysToDelete) if err != nil { if errors.Is(err, ErrStopGC) { - return total, nDeleted, err + return total, twiceDeleted / 2, err } nonFatalErrors = append(nonFatalErrors, err) } - nDeleted += uint64(n / 2) + twiceDeleted += uint64(n) if err := iter.Err(); err != nil { - return total, nDeleted, err + return total, twiceDeleted / 2, err } - return total, nDeleted, multierr.Combine(nonFatalErrors...) + return total, twiceDeleted / 2, multierr.Combine(nonFatalErrors...) } func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, keysToDelete []string) (int64, error) {