Skip to content

Commit 660c1a5

Browse files
committed
chore: wip
1 parent 1b4e662 commit 660c1a5

File tree

7 files changed

+185
-8
lines changed

7 files changed

+185
-8
lines changed

examples/key-rate-limiting.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { Queue, RateLimiter } from '../src'
2+
3+
interface EmailData {
4+
userId: string
5+
to: string
6+
subject: string
7+
content: string
8+
}
9+
10+
async function main() {
11+
console.log('🔒 Key-based Rate Limiting Example')
12+
13+
// Create an email queue with key-based rate limiting
14+
const emailQueue = new Queue<EmailData>('emails', {
15+
verbose: true,
16+
logLevel: 'info',
17+
})
18+
19+
// Create a rate limiter with key-based options
20+
// This limits each user to 2 emails per 10 seconds
21+
const emailRateLimiter = new RateLimiter(emailQueue, {
22+
max: 2, // 2 emails
23+
duration: 10000, // per 10 seconds
24+
keyPrefix: 'userId' // Rate limit by userId field
25+
})
26+
27+
// Demonstrate sending emails for different users
28+
const users = ['user1', 'user2', 'user3']
29+
30+
// Send multiple emails from different users
31+
for (let i = 0; i < 10; i++) {
32+
const userId = users[i % users.length]
33+
34+
// Send email
35+
console.log(`📧 Attempting to send email #${i+1} for user ${userId}`)
36+
37+
// Check if rate limited first
38+
const rateLimitCheck = await emailQueue.isRateLimited(undefined, { userId, to: '', subject: '', content: '' })
39+
40+
if (rateLimitCheck.limited) {
41+
console.log(`⛔ Rate limit reached for user ${userId}. Try again in ${rateLimitCheck.resetIn}ms`)
42+
continue
43+
}
44+
45+
// Add job to queue
46+
const job = await emailQueue.add({
47+
userId,
48+
to: `${userId}@example.com`,
49+
subject: `Test email #${i+1}`,
50+
content: `This is test email #${i+1} for ${userId}`
51+
})
52+
53+
console.log(`✅ Email #${i+1} queued for ${userId} with job ID: ${job.id}`)
54+
55+
// Wait a bit between requests to see the rate limiting in action
56+
await new Promise(resolve => setTimeout(resolve, 1000))
57+
}
58+
59+
// Process queue with a handler that logs emails
60+
emailQueue.process(1, async (job) => {
61+
const { userId, to, subject } = job.data
62+
console.log(`🚀 Processing email for ${userId} to ${to}: "${subject}"`)
63+
64+
// Simulate sending email
65+
await new Promise(resolve => setTimeout(resolve, 500))
66+
67+
return { success: true, sentAt: new Date() }
68+
})
69+
70+
// Let the queue process for a while
71+
await new Promise(resolve => setTimeout(resolve, 15000))
72+
73+
// Close the queue
74+
await emailQueue.close()
75+
console.log('👋 Queue closed')
76+
}
77+
78+
main().catch(error => {
79+
console.error('Error in example:', error)
80+
process.exit(1)
81+
})

examples/priority-queue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,4 @@ async function main() {
7575
console.log('\n👋 All tasks completed, queue closed')
7676
}
7777

78-
main().catch(console.error)
78+
main().catch(console.error)

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@
6565
"example:basic": "bun examples/basic.ts",
6666
"example:advanced": "bun examples/advanced.ts",
6767
"example:advanced-features": "bun examples/advanced-features.ts",
68-
"example:priority-queue": "bun examples/priority-queue.ts"
68+
"example:priority-queue": "bun examples/priority-queue.ts",
69+
"example:key-rate-limiting": "bun examples/key-rate-limiting.ts"
6970
},
7071
"devDependencies": {
7172
"@stacksjs/docs": "^0.70.23",

src/priority-queue.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ export class PriorityQueue<T = any> {
256256

257257
for (let priority = this.priorityLevels - 1; priority >= 0; priority--) {
258258
const priorityKey = this.getPriorityKey(priority)
259-
const jobIds = await this.redisClient.send('LRANGE', [priorityKey, start, end])
259+
const jobIds = await this.redisClient.send('LRANGE', [priorityKey, start.toString(), end.toString()])
260260

261261
if (jobIds && jobIds.length > 0) {
262262
for (const jobId of jobIds) {
@@ -323,7 +323,7 @@ export class PriorityQueue<T = any> {
323323

324324
if (length && length > 0) {
325325
// Get all jobs from this priority level
326-
const jobIds = await this.redisClient.send('LRANGE', [priorityKey, 0, -1])
326+
const jobIds = await this.redisClient.send('LRANGE', [priorityKey, '0', '-1'])
327327

328328
if (jobIds && jobIds.length > 0) {
329329
// Move jobs to the front of the waiting queue in reverse order
@@ -374,7 +374,7 @@ export class PriorityQueue<T = any> {
374374

375375
for (let priority = 0; priority < this.priorityLevels; priority++) {
376376
const priorityKey = this.getPriorityKey(priority)
377-
const jobIds = await this.redisClient.send('LRANGE', [priorityKey, 0, -1])
377+
const jobIds = await this.redisClient.send('LRANGE', [priorityKey, '0', '-1'])
378378

379379
if (jobIds && jobIds.length > 0) {
380380
for (const jobId of jobIds) {

src/queue.ts

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import { Metrics } from './metrics'
1010
import { StalledJobChecker } from './stalled-checker'
1111
import { generateId, getRedisClient, mergeOptions } from './utils'
1212
import { Worker } from './worker'
13+
import { RateLimiter } from './rate-limiter'
14+
import type { RateLimitResult } from './rate-limiter'
1315

1416
export class Queue<T = any> {
1517
name: string
@@ -22,13 +24,16 @@ export class Queue<T = any> {
2224
private cleanupService: CleanupService | null = null
2325
private stalledChecker: StalledJobChecker | null = null
2426
private readonly logger = createLogger()
27+
private limiter: RateLimiter | null = null
28+
private defaultJobOptions: JobOptions | undefined
2529

2630
constructor(name: string, options?: QueueConfig) {
2731
this.name = name
2832
this.prefix = options?.prefix || config.prefix || 'queue'
2933
this.redisClient = getRedisClient(options)
3034
this.keyPrefix = `${this.prefix}:${this.name}`
3135
this.events = new JobEvents(name)
36+
this.defaultJobOptions = options?.defaultJobOptions
3237

3338
// Set logger level if specified
3439
if (options?.logLevel) {
@@ -51,6 +56,12 @@ export class Queue<T = any> {
5156
this.logger.debug(`Stalled job checker started for queue ${name}`)
5257
}
5358

59+
// Initialize rate limiter if provided
60+
if (options?.limiter) {
61+
this.limiter = new RateLimiter(this, options.limiter)
62+
this.logger.debug(`Rate limiter configured for queue ${name}`)
63+
}
64+
5465
// Initialize scripts
5566
this.init()
5667
}
@@ -71,10 +82,29 @@ export class Queue<T = any> {
7182
}
7283

7384
/**
74-
* Add a job to the queue
85+
* Add a job to the queue with rate limiting support
7586
*/
7687
async add(data: T, options?: JobOptions): Promise<Job<T>> {
7788
try {
89+
// Check rate limit if configured
90+
if (this.limiter) {
91+
// If we have keyPrefix in the limiter, check rate limit based on data
92+
const limiterResult = await this.limiter.check(data)
93+
94+
if (limiterResult.limited) {
95+
this.logger.warn(`Rate limit exceeded, retrying in ${limiterResult.resetIn}ms`)
96+
97+
// If rate limited, add to delayed queue with the reset time
98+
const opts = {
99+
...this.defaultJobOptions,
100+
...options,
101+
delay: limiterResult.resetIn
102+
}
103+
104+
return this.add(data, opts)
105+
}
106+
}
107+
78108
const opts = mergeOptions(options)
79109
const jobId = opts.jobId || generateId()
80110
const timestamp = Date.now()
@@ -492,4 +522,28 @@ export class Queue<T = any> {
492522
getJobKey(jobId: string): string {
493523
return `${this.keyPrefix}:job:${jobId}`
494524
}
525+
526+
/**
527+
* Check if the queue is rate limited for a specific key
528+
*/
529+
async isRateLimited(key?: string, data?: T): Promise<{ limited: boolean; resetIn: number }> {
530+
if (!this.limiter) {
531+
return { limited: false, resetIn: 0 }
532+
}
533+
534+
let result: RateLimitResult
535+
536+
if (key) {
537+
// Use explicit key if provided
538+
result = await this.limiter.checkByKey(key)
539+
} else {
540+
// Use data with keyPrefix from limiter options
541+
result = await this.limiter.check(data)
542+
}
543+
544+
return {
545+
limited: result.limited,
546+
resetIn: result.resetIn,
547+
}
548+
}
495549
}

src/rate-limiter.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,53 @@ export class RateLimiter {
1919
/**
2020
* Check if the rate limit has been exceeded
2121
*/
22-
async check(): Promise<RateLimitResult> {
22+
async check(data?: any): Promise<RateLimitResult> {
2323
const key = this.queue.getKey('limit')
2424
const now = Date.now()
2525

26+
// Determine the identifier based on keyPrefix if provided
27+
let identifier = this.queue.name
28+
29+
if (this.options.keyPrefix && data) {
30+
if (typeof this.options.keyPrefix === 'function') {
31+
// Use the function to generate a key from the data
32+
const keyValue = this.options.keyPrefix(data)
33+
identifier = `${this.queue.name}:${keyValue}`
34+
} else {
35+
// Use the keyPrefix as a property path in the data object
36+
const keyValue = data[this.options.keyPrefix]
37+
if (keyValue) {
38+
identifier = `${this.queue.name}:${keyValue}`
39+
}
40+
}
41+
}
42+
2643
const result = await this.queue.redisClient.send('rateLimit', [
2744
key,
28-
this.queue.name,
45+
identifier,
46+
this.options.max.toString(),
47+
this.options.duration.toString(),
48+
now.toString(),
49+
])
50+
51+
return {
52+
limited: result[0] === 1,
53+
remaining: result[1],
54+
resetIn: result[2],
55+
}
56+
}
57+
58+
/**
59+
* Check if the rate limit has been exceeded for a specific key
60+
*/
61+
async checkByKey(key: string): Promise<RateLimitResult> {
62+
const identifier = `${this.queue.name}:${key}`
63+
const limitKey = this.queue.getKey('limit')
64+
const now = Date.now()
65+
66+
const result = await this.queue.redisClient.send('rateLimit', [
67+
limitKey,
68+
identifier,
2969
this.options.max.toString(),
3070
this.options.duration.toString(),
3171
now.toString(),

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export interface JobOptions {
5353
export interface RateLimiter {
5454
max: number
5555
duration: number
56+
keyPrefix?: string | ((data: any) => string)
5657
}
5758

5859
export interface Job<T = any> {

0 commit comments

Comments
 (0)