// queue.go — generic queue abstraction wiring a transport driver to an
// item processor, with OpenTelemetry metrics and a pond worker pool.
package pipeline
import (
"context"
"github.com/alitto/pond"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// Enqueuer is the producer-side interface of a queue: it accepts an
// item for asynchronous delivery.
type Enqueuer[T any] interface {
	Enqueue(context.Context, T)
}
// QueueItemProcessor handles a single item taken off the queue.
type QueueItemProcessor[T any] interface {
	ProcessItem(context.Context, T)
}
// Listener receives items emitted by a QueueDriver. Queue itself
// implements this interface (see Queue.Listen).
type Listener[T any] interface {
	Listen(T)
}
// QueueDriver is the transport backing a Queue: it accepts items for
// delivery and pushes received items to the registered Listener.
type QueueDriver[T any] interface {
	Enqueue(T)
	SetListener(Listener[T])
}
// namedDriver is optionally implemented by drivers that expose a
// human-readable name, used as the "queue.name" metric attribute.
type namedDriver interface {
	Name() string
}
// Queue ties a transport driver to an item processor, running both the
// enqueue and the consume paths on a shared worker pool and recording
// OpenTelemetry histograms for each direction.
type Queue[T any] struct {
	// name is taken from the driver when it implements namedDriver;
	// empty otherwise.
	name          string
	driver        QueueDriver[T]
	itemProcessor QueueItemProcessor[T]
	// Histograms are nil until InitializeMetrics is called.
	enqueueHistogram metric.Int64Histogram
	listenHistogram  metric.Int64Histogram
	// EnqueuePreprocessorFn, when set, transforms an item before it is
	// handed to the driver.
	EnqueuePreprocessorFn func(context.Context, T) T
	// ListenPreprocessorFn, when set, derives the processing context
	// and transforms an item received from the driver.
	ListenPreprocessorFn func(context.Context, T) (context.Context, T)
	workerPool           *pond.WorkerPool
}
// Sizing for the shared worker pool used by every Queue instance.
const (
	QueueWorkerCount      = 20
	QueueWorkerBufferSize = QueueWorkerCount * 1_000 // 1k jobs per worker
)
// NewQueue builds a Queue that delivers through driver and hands
// consumed items to itemProcessor. If the driver exposes a Name(), it
// is captured for metric attribution. Call InitializeMetrics to enable
// metric recording.
func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T]) *Queue[T] {
	q := &Queue[T]{
		itemProcessor: itemProcessor,
		workerPool:    pond.New(QueueWorkerCount, QueueWorkerBufferSize),
	}
	if nd, ok := driver.(namedDriver); ok {
		q.name = nd.Name()
	}
	q.SetDriver(driver)
	return q
}
// InitializeMetrics creates the enqueue/listen histograms on meter.
// Until this is called, both histograms are nil and nothing is
// recorded.
// NOTE(review): creation errors are deliberately discarded here —
// confirm that a failed histogram leaves a usable nil/no-op value with
// the configured meter implementation.
func (q *Queue[T]) InitializeMetrics(meter metric.Meter) {
	q.enqueueHistogram, _ = meter.Int64Histogram("messaging.enqueue")
	q.listenHistogram, _ = meter.Int64Histogram("messaging.listen")
}
// SetDriver swaps in driver and registers the queue as its listener so
// received items flow into Listen. The driver is stored first so that
// a driver which starts delivering from SetListener already sees the
// queue fully wired.
func (q *Queue[T]) SetDriver(driver QueueDriver[T]) {
	q.driver = driver
	driver.SetListener(q)
}
// Enqueue schedules item for delivery through the driver. The call
// returns quickly: preprocessing, metric recording, and the driver
// call all run on the worker pool. Items are dropped when ctx is
// already canceled at call time; cancellation after submission is not
// observed by the worker.
func (q Queue[T]) Enqueue(ctx context.Context, item T) {
	// Idiomatic non-blocking cancellation check (replaces a
	// select{default/ctx.Done()} construct with identical semantics).
	if ctx.Err() != nil {
		return
	}
	// use a worker to enqueue the job in case the driver takes a bit to actually enqueue
	// this way we release the caller as soon as possible
	q.workerPool.Submit(func() {
		if q.EnqueuePreprocessorFn != nil {
			item = q.EnqueuePreprocessorFn(ctx, item)
		}
		// Guard against use before InitializeMetrics: Record on a nil
		// metric.Int64Histogram interface would panic.
		if q.enqueueHistogram != nil {
			q.enqueueHistogram.Record(ctx, 1, metric.WithAttributes(
				attribute.String("queue.name", q.name),
			))
		}
		q.driver.Enqueue(item)
	})
}
// Listen implements Listener: it is invoked by the driver for each
// received item. The optional ListenPreprocessorFn can replace both
// the processing context and the item; if the context it returns is
// already canceled the item is dropped. Processing itself happens on
// the worker pool.
func (q Queue[T]) Listen(item T) {
	ctx := context.Background()
	if q.ListenPreprocessorFn != nil {
		ctx, item = q.ListenPreprocessorFn(ctx, item)
	}
	// Idiomatic non-blocking cancellation check (replaces a
	// select{default/ctx.Done()} construct with identical semantics);
	// only meaningful when the preprocessor returned a cancellable ctx.
	if ctx.Err() != nil {
		return
	}
	q.workerPool.Submit(func() {
		// Guard against use before InitializeMetrics: Record on a nil
		// metric.Int64Histogram interface would panic.
		if q.listenHistogram != nil {
			q.listenHistogram.Record(ctx, 1, metric.WithAttributes(
				attribute.String("queue.name", q.name),
			))
		}
		q.itemProcessor.ProcessItem(ctx, item)
	})
}
// Stop shuts down the worker pool, blocking until all submitted tasks
// (pending enqueues and in-flight item processing) have completed.
func (q *Queue[T]) Stop() {
	q.workerPool.StopAndWait()
}