feat: add prefix to interaction with redis#61
Conversation
Summary by CodeRabbit
WalkthroughAdds an optional Changes
Sequence Diagram(s)sequenceDiagram
participant Worker
participant JobHandler
participant Redis
Worker->>JobHandler: process job
JobHandler->>Redis: getKey(job)
alt Error occurs
JobHandler->>JobHandler: if RetryFailures <= 0
alt Terminal failure
JobHandler->>Redis: SET failureKey = failureReason
JobHandler->>JobHandler: LOG final failure
else Retry available
JobHandler->>JobHandler: RetryFailures -= 1
JobHandler->>JobHandler: set DelayedStatus
JobHandler->>JobHandler: LOG retry attempt
end
end
sequenceDiagram
participant Component as Caller
participant QueueModule
participant Redis
Component->>QueueModule: request limit check (queueName, options.Prefix)
QueueModule->>QueueModule: key = getKey(Prefix, queueName).ToLower()
QueueModule->>Redis: GET/INCR/EXPIRE key
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20–30 minutes Possibly related issues
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
d1186fc to
e8e66a4
Compare
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
module.go (2)
41-45: Update the comment to reflect the new naming convention.The comment states the name is in the form
"<name>Queue", but the implementation now produces"<name>_QUEUE".Apply this diff to correct the comment:
// getQueueName generates a unique name for a queue provider. // -// The name is in the form "<name>Queue". +// The name is in the form "<name>_QUEUE". func getQueueName(name string) core.Provide { return core.Provide(fmt.Sprintf("%s_QUEUE", name)) }
48-51: Update the comment to reflect the new naming convention.The comment states the queue is exported under the name
"<name>Queue", but the implementation now uses"<name>_QUEUE".Apply this diff to correct the comment:
// Register registers a new queue module with the given name and options. The // registered module creates a new queue with the given name and options, and -// exports the queue under the name "<name>Queue". +// exports the queue under the name "<name>_QUEUE". func Register(name string, opts ...*Options) core.Modules {
🧹 Nitpick comments (2)
queue.go (1)
42-55: Consider adding documentation for the Prefix field.The new
Prefixfield lacks inline documentation explaining its purpose and behavior. Adding a comment would help users understand how it affects Redis key generation.Apply this diff to add documentation:
RemoveOnComplete bool RemoveOnFail bool Delay time.Duration Timeout time.Duration // Default: 1 minutes + Prefix string // Prefix is prepended to queue names for Redis keys - Prefix string }job.go (1)
129-135: Consider extracting common key derivation logic.Both
Queue.getKey()(queue.go:425-431) andJob.getKey()implement similar prefix+name lowercasing logic. Consider extracting the shared prefix logic to reduce duplication.For example, Job.getKey() could delegate to Queue.getKey():
func (job *Job) getKey() string { - if job.queue.config.Prefix != "" { - prefix := job.queue.config.Prefix - return fmt.Sprintf("%s:%s", strings.ToLower(prefix+job.queue.Name), job.Id) - } - return fmt.Sprintf("%s:%s", strings.ToLower(job.queue.Name), job.Id) + queueKey := job.queue.getKey() + return fmt.Sprintf("%s:%s", queueKey, job.Id) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
job.go(2 hunks)module.go(1 hunks)queue.go(4 hunks)queue_test.go(1 hunks)
🔇 Additional comments (4)
queue_test.go (1)
17-31: LGTM! Good test coverage for the new Prefix feature.The test appropriately validates queue behavior with a prefix configured.
queue.go (2)
344-365: LGTM! Correct use of derived keys for rate limiting.The
IsLimitmethod correctly uses thegetKey()helper to derive Redis keys, ensuring consistency with the new prefix feature. The key is used appropriately for Get, Incr, and Expire operations.
425-431: LGTM! Clean key derivation logic.The
getKeyhelper correctly derives Redis keys by incorporating the prefix when configured, with appropriate lowercasing for consistency.job.go (1)
93-112: Good separation of terminal failure vs retry logic.The refactored error handling cleanly separates the terminal failure path (storing to Redis) from the retry path (decrementing retry count), improving code clarity.
| if job.RetryFailures <= 0 { | ||
| client := job.queue.client | ||
| key := job.getKey() | ||
| _, err := client.Set(context.Background(), key, job.FailedReason, 0).Result() | ||
| if err != nil { | ||
| job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error()) | ||
| } | ||
| job.queue.formatLog(LoggerError, "Failed job %s ", job.Id) | ||
| return | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
Consider adding TTL to error storage keys to prevent unbounded growth.
Failed job errors are stored in Redis without expiration (TTL = 0), which will cause these keys to persist indefinitely. For high-volume queues, this could lead to unbounded Redis memory growth.
Apply this diff to add a configurable TTL:
// Store error
if job.RetryFailures <= 0 {
client := job.queue.client
key := job.getKey()
- _, err := client.Set(context.Background(), key, job.FailedReason, 0).Result()
+ // Store error with 24 hour expiration (configurable via queue options if needed)
+ ttl := 24 * time.Hour
+ _, err := client.Set(context.Background(), key, job.FailedReason, ttl).Result()
if err != nil {
job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error())
}Alternatively, verify whether there's existing cleanup logic for these keys:
🏁 Script executed:
#!/bin/bash
# Description: Check for Redis key cleanup or expiration logic
# Search for Del operations on job keys
rg -n '\.Del\(' --type=go -C3
# Search for cleanup or garbage collection logic
rg -n 'cleanup|Clean|expire|Expire' --type=go -C3
# Search for error key retrieval (to understand lifecycle)
ast-grep --pattern $'client.Get($$$, $KEY)'Length of output: 253
I need to gather more information about how error keys are specifically managed. Let me run a targeted verification script.
🏁 Script executed:
#!/bin/bash
# Check the context of the Expire call found
grep -n "Expire" queue.go -B5 -A5
# Find all Set operations storing errors
rg -n 'FailedReason' --type=go -B2 -A2
# Search for job.getKey() to understand the key pattern
rg -n 'getKey\(' --type=go -B2 -A2
# Check if there's queue configuration for error TTL
rg -n 'config|Config' job.go -C2 | head -30Length of output: 2072
Apply TTL to error storage keys to prevent unbounded Redis memory growth.
Error messages are stored at job.go:101 with TTL=0, causing them to persist indefinitely in Redis. The only Expire() call found (queue.go:361) applies only to rate limiter keys, not error keys. For high-volume queues, this creates unbounded memory growth.
if job.RetryFailures <= 0 {
client := job.queue.client
key := job.getKey()
- _, err := client.Set(context.Background(), key, job.FailedReason, 0).Result()
+ // Store error with 24 hour expiration (configurable via queue options if needed)
+ ttl := 24 * time.Hour
+ _, err := client.Set(context.Background(), key, job.FailedReason, ttl).Result()
if err != nil {
job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error())
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if job.RetryFailures <= 0 { | |
| client := job.queue.client | |
| key := job.getKey() | |
| _, err := client.Set(context.Background(), key, job.FailedReason, 0).Result() | |
| if err != nil { | |
| job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error()) | |
| } | |
| job.queue.formatLog(LoggerError, "Failed job %s ", job.Id) | |
| return | |
| } | |
| if job.RetryFailures <= 0 { | |
| client := job.queue.client | |
| key := job.getKey() | |
| // Store error with 24 hour expiration (configurable via queue options if needed) | |
| ttl := 24 * time.Hour | |
| _, err := client.Set(context.Background(), key, job.FailedReason, ttl).Result() | |
| if err != nil { | |
| job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error()) | |
| } | |
| job.queue.formatLog(LoggerError, "Failed job %s ", job.Id) | |
| return | |
| } |
🤖 Prompt for AI Agents
In job.go around lines 98 to 107, the failure message is stored in Redis with
TTL=0 (persisting forever); change the Set call to apply a non-zero expiration
so error keys are evicted automatically (e.g., use a configured duration like
queue.config.ErrorTTL or a sensible default such as 24h), or call
client.SetEX/Set with the chosen TTL instead of 0; ensure the TTL value is
configurable on the queue and used here so high-volume queues do not grow Redis
memory unbounded.
| func getQueueName(name string) core.Provide { | ||
| return core.Provide(fmt.Sprintf("%sQueue", name)) | ||
| return core.Provide(fmt.Sprintf("%s_QUEUE", name)) | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
Verify that all references to queue provider names have been updated.
The provider naming format has changed from "<name>Queue" to "<name>_QUEUE". This is a breaking change that will affect all code that injects or references queue providers.
Run the following script to identify potential references that may need updating:
🏁 Script executed:
#!/bin/bash
# Description: Find potential references to the old queue naming pattern
# Search for string literals that might reference the old pattern
rg -n 'Queue"' --type=go -C2
# Search for potential queue injection sites
rg -n 'Inject\(' --type=go -C3
# Search for Ref calls that might use queue names
rg -n '\.Ref\(' --type=go -C2Length of output: 3088
Update stale documentation comments to reflect the new queue naming convention.
The verification confirms that the implementation correctly uses the new %s_QUEUE format and all code references are abstracted through the Inject() function, which automatically handles the naming. However, two documentation comments are outdated and should be corrected:
- module.go:43: Comment states
"<name>Queue"but should state"<name>_QUEUE" - module.go:50: Comment states
"<name>Queue"but should state"<name>_QUEUE"
All functional code already references queues through Inject(), which calls getQueueName(), so the actual breaking change risk is mitigated by the abstraction layer. Only these comments need updating to prevent developer confusion.
🤖 Prompt for AI Agents
In module.go around lines 43–50, two documentation comments still refer to the
old "<name>Queue" naming; update both comments (at ~line 43 and ~line 50) to use
the new "<name>_QUEUE" format so they match the implemented
getQueueName(fmt.Sprintf("%s_QUEUE", name)) convention and avoid developer
confusion; no code logic changes required since Inject() already abstracts
naming.
No description provided.