-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
59 lines (51 loc) · 1.61 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package ledgerexporter
import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/shantanu-hashcash/go/support/datastore"
)
// UploadQueue is a queue of LedgerMetaArchive objects which are scheduled for upload
type UploadQueue struct {
metaArchiveCh chan *datastore.LedgerMetaArchive
queueLengthMetric prometheus.Gauge
}
// NewUploadQueue constructs a new UploadQueue
func NewUploadQueue(size int, prometheusRegistry *prometheus.Registry) UploadQueue {
queueLengthMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ledger_exporter",
Subsystem: "upload_queue",
Name: "length",
Help: "The number of objects queued for upload",
})
prometheusRegistry.MustRegister(queueLengthMetric)
return UploadQueue{
metaArchiveCh: make(chan *datastore.LedgerMetaArchive, size),
queueLengthMetric: queueLengthMetric,
}
}
// Enqueue will add an upload task to the queue. Enqueue may block if the queue is full.
func (u UploadQueue) Enqueue(ctx context.Context, archive *datastore.LedgerMetaArchive) error {
u.queueLengthMetric.Inc()
select {
case u.metaArchiveCh <- archive:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Dequeue will pop a task off the queue. Dequeue may block if the queue is empty.
func (u UploadQueue) Dequeue(ctx context.Context) (*datastore.LedgerMetaArchive, bool, error) {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case metaObject, ok := <-u.metaArchiveCh:
if ok {
u.queueLengthMetric.Dec()
}
return metaObject, ok, nil
}
}
// Close will close the queue.
func (u UploadQueue) Close() {
close(u.metaArchiveCh)
}