forked from grafana/loki
/
dque.go
152 lines (127 loc) · 3.12 KB
/
dque.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"fmt"
"os"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/joncrlsn/dque"
"github.com/prometheus/common/model"
"github.com/pao214/loki/v2/clients/pkg/promtail/api"
"github.com/pao214/loki/v2/clients/pkg/promtail/client"
"github.com/pao214/loki/v2/pkg/logproto"
)
type dqueConfig struct {
queueDir string
queueSegmentSize int
queueSync bool
queueName string
}
var defaultDqueConfig = dqueConfig{
queueDir: "/tmp/flb-storage/loki",
queueSegmentSize: 500,
queueSync: false,
queueName: "dque",
}
type dqueEntry struct {
Lbs model.LabelSet
Ts time.Time
Line string
}
func dqueEntryBuilder() interface{} {
return &dqueEntry{}
}
type dqueClient struct {
logger log.Logger
queue *dque.DQue
loki client.Client
once sync.Once
wg sync.WaitGroup
entries chan api.Entry
}
// New makes a new dque loki client
func newDque(cfg *config, logger log.Logger, metrics *client.Metrics, streamLagLabels []string) (client.Client, error) {
var err error
q := &dqueClient{
logger: log.With(logger, "component", "queue", "name", cfg.bufferConfig.dqueConfig.queueName),
}
err = os.MkdirAll(cfg.bufferConfig.dqueConfig.queueDir, 0644)
if err != nil {
return nil, fmt.Errorf("cannot create queue directory: %s", err)
}
q.queue, err = dque.NewOrOpen(cfg.bufferConfig.dqueConfig.queueName, cfg.bufferConfig.dqueConfig.queueDir, cfg.bufferConfig.dqueConfig.queueSegmentSize, dqueEntryBuilder)
if err != nil {
return nil, err
}
if !cfg.bufferConfig.dqueConfig.queueSync {
_ = q.queue.TurboOn()
}
q.loki, err = client.New(metrics, cfg.clientConfig, streamLagLabels, logger)
if err != nil {
return nil, err
}
q.entries = make(chan api.Entry)
q.wg.Add(2)
go q.enqueuer()
go q.dequeuer()
return q, nil
}
func (c *dqueClient) dequeuer() {
defer c.wg.Done()
for {
// Dequeue the next item in the queue
entry, err := c.queue.DequeueBlock()
if err != nil {
switch err {
case dque.ErrQueueClosed:
return
default:
level.Error(c.logger).Log("msg", "error dequeuing record", "error", err)
continue
}
}
// Assert type of the response to an Item pointer so we can work with it
record, ok := entry.(*dqueEntry)
if !ok {
level.Error(c.logger).Log("msg", "error dequeued record is not an valid type", "error")
continue
}
c.loki.Chan() <- api.Entry{
Labels: record.Lbs,
Entry: logproto.Entry{
Timestamp: record.Ts,
Line: record.Line,
},
}
}
}
// Stop the client
func (c *dqueClient) Stop() {
c.once.Do(func() {
close(c.entries)
c.queue.Close()
c.loki.Stop()
c.wg.Wait()
})
}
func (c *dqueClient) Chan() chan<- api.Entry {
return c.entries
}
// Stop the client
func (c *dqueClient) StopNow() {
c.once.Do(func() {
close(c.entries)
c.queue.Close()
c.loki.StopNow()
c.wg.Wait()
})
}
func (c *dqueClient) enqueuer() {
defer c.wg.Done()
for e := range c.entries {
if err := c.queue.Enqueue(&dqueEntry{e.Labels, e.Timestamp, e.Line}); err != nil {
level.Warn(c.logger).Log("msg", fmt.Sprintf("cannot enqueue record %s:", e.Line), "err", err)
}
}
}