Skip to content

Commit 850509a

Browse files
committed
chore: wip
1 parent bef9330 commit 850509a

File tree

1 file changed

+106
-1
lines changed

1 file changed

+106
-1
lines changed

src/metrics.ts

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,106 @@
1-
export { Metrics, type QueueMetrics }
1+
import type { Queue } from './queue'
2+
3+
export interface QueueMetrics {
4+
completed: number[]
5+
failed: number[]
6+
delayed: number[]
7+
active: number[]
8+
waiting: number[]
9+
added: number
10+
processedRate: number
11+
}
12+
13+
export class Metrics {
14+
private queue: Queue
15+
private collectInterval: number
16+
private intervalId: number | null = null
17+
private metrics: QueueMetrics = {
18+
completed: [],
19+
failed: [],
20+
delayed: [],
21+
active: [],
22+
waiting: [],
23+
added: 0,
24+
processedRate: 0,
25+
}
26+
27+
constructor(queue: Queue) {
28+
this.queue = queue
29+
this.collectInterval = 30000 // Default 30 seconds
30+
this.startCollecting()
31+
}
32+
33+
/**
34+
* Start collecting metrics at regular intervals
35+
*/
36+
private startCollecting(): void {
37+
// Clear existing interval if any
38+
if (this.intervalId) {
39+
clearInterval(this.intervalId)
40+
}
41+
42+
// Set up new collection interval
43+
this.intervalId = setInterval(async () => {
44+
await this.collectMetrics()
45+
}, this.collectInterval) as unknown as number
46+
}
47+
48+
/**
49+
* Stop collecting metrics
50+
*/
51+
stop(): void {
52+
if (this.intervalId) {
53+
clearInterval(this.intervalId)
54+
this.intervalId = null
55+
}
56+
}
57+
58+
/**
59+
* Collect current metrics from Redis
60+
*/
61+
private async collectMetrics(): Promise<void> {
62+
try {
63+
// Get current job counts
64+
const counts = await this.queue.getJobCounts()
65+
66+
// Update metrics with current counts
67+
this.metrics.completed.unshift(counts.completed)
68+
this.metrics.failed.unshift(counts.failed)
69+
this.metrics.delayed.unshift(counts.delayed)
70+
this.metrics.active.unshift(counts.active)
71+
this.metrics.waiting.unshift(counts.waiting)
72+
73+
// Keep only the last 100 data points for each metric
74+
const maxDataPoints = 100
75+
this.metrics.completed = this.metrics.completed.slice(0, maxDataPoints)
76+
this.metrics.failed = this.metrics.failed.slice(0, maxDataPoints)
77+
this.metrics.delayed = this.metrics.delayed.slice(0, maxDataPoints)
78+
this.metrics.active = this.metrics.active.slice(0, maxDataPoints)
79+
this.metrics.waiting = this.metrics.waiting.slice(0, maxDataPoints)
80+
81+
// Calculate processing rate (jobs per minute)
82+
const totalProcessed = counts.completed + counts.failed
83+
const elapsedMinutes = this.collectInterval / 60000
84+
this.metrics.processedRate = totalProcessed / elapsedMinutes
85+
}
86+
catch (err) {
87+
console.error('Error collecting metrics:', err)
88+
}
89+
}
90+
91+
/**
92+
* Track a job being added to the queue
93+
*/
94+
trackJobAdded(): void {
95+
this.metrics.added++
96+
}
97+
98+
/**
99+
* Get the current metrics
100+
*/
101+
async getMetrics(): Promise<QueueMetrics> {
102+
// Collect fresh metrics before returning
103+
await this.collectMetrics()
104+
return { ...this.metrics }
105+
}
106+
}

0 commit comments

Comments
 (0)