/
writer.go
152 lines (129 loc) · 3.29 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package clickhousespanstore
import (
"context"
"database/sql"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/prometheus/client_golang/prometheus"
)
// Encoding is the wire format used to serialize spans before they are
// written to ClickHouse.
type Encoding string

const (
	// EncodingJSON is used for spans encoded as JSON.
	EncodingJSON Encoding = "json"
	// EncodingProto is used for spans encoded as Protobuf.
	EncodingProto Encoding = "protobuf"
)
// Counters distinguishing why a batch was flushed to ClickHouse: the batch
// reached its size cap vs. the flush interval elapsed. Registered once via
// registerMetrics.
var (
	numWritesWithBatchSize = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "jaeger_clickhouse_writes_with_batch_size_total",
		Help: "Number of clickhouse writes due to batch size criteria",
	})
	numWritesWithFlushInterval = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "jaeger_clickhouse_writes_with_flush_interval_total",
		Help: "Number of clickhouse writes due to flush interval criteria",
	})
)
// SpanWriter for writing spans to ClickHouse
type SpanWriter struct {
	workerParams WorkerParams     // shared settings handed to the worker pool (db, tables, encoding, delay)
	size         int64            // batch size cap; also the spans channel buffer capacity
	spans        chan *model.Span // incoming spans, consumed by backgroundWriter
	finish       chan bool        // Close sends here to ask backgroundWriter to flush and exit
	done         sync.WaitGroup   // tracks backgroundWriter loop iterations; Close waits on it
}

// registerWriterMetrics ensures the Prometheus counters are registered at
// most once per process, even if several SpanWriters are constructed.
var registerWriterMetrics sync.Once

// Compile-time check that SpanWriter satisfies spanstore.Writer.
var _ spanstore.Writer = (*SpanWriter)(nil)
// NewSpanWriter returns a SpanWriter for the database.
//
// It registers the writer metrics (once per process) and starts the
// background goroutine that batches and flushes spans; maxSpanCount bounds
// the worker pool created by that goroutine.
func NewSpanWriter(
	logger hclog.Logger,
	db *sql.DB,
	indexTable,
	spansTable TableName,
	tenant string,
	encoding Encoding,
	delay time.Duration,
	size int64,
	maxSpanCount int,
) *SpanWriter {
	// Bundle the per-worker configuration first, then assemble the writer.
	params := WorkerParams{
		logger:     logger,
		db:         db,
		indexTable: indexTable,
		spansTable: spansTable,
		tenant:     tenant,
		encoding:   encoding,
		delay:      delay,
	}
	w := &SpanWriter{
		workerParams: params,
		size:         size,
		spans:        make(chan *model.Span, size),
		finish:       make(chan bool),
	}
	w.registerMetrics()
	go w.backgroundWriter(maxSpanCount)
	return w
}
// registerMetrics registers the writer's Prometheus counters exactly once
// for the whole process, regardless of how many SpanWriters exist.
func (w *SpanWriter) registerMetrics() {
	registerWriterMetrics.Do(func() {
		// MustRegister is variadic; register both counters in one call.
		prometheus.MustRegister(numWritesWithBatchSize, numWritesWithFlushInterval)
	})
}
// backgroundWriter is the single consumer of w.spans. It accumulates spans
// into a batch and hands the batch to a worker pool when either the batch
// reaches w.size or the flush interval elapses; a send on w.finish triggers
// a final flush, closes the pool, and exits the loop.
//
// Each loop iteration is tracked on w.done so Close can wait for the final
// iteration (including the shutdown flush) to complete.
func (w *SpanWriter) backgroundWriter(maxSpanCount int) {
	pool := NewWorkerPool(&w.workerParams, maxSpanCount)
	go pool.Work()

	batch := make([]*model.Span, 0, w.size)
	// One reusable ticker instead of calling time.After every iteration:
	// time.After allocates a fresh timer per loop pass, which is needless
	// churn in this hot select. The ticker fires every delay, matching the
	// original re-arm-after-fire cadence.
	ticker := time.NewTicker(w.workerParams.delay)
	defer ticker.Stop()
	last := time.Now()

	for {
		w.done.Add(1)
		flush := false
		finish := false
		select {
		case span := <-w.spans:
			batch = append(batch, span)
			flush = len(batch) == cap(batch)
			if flush {
				w.workerParams.logger.Debug("Flush due to batch size", "size", len(batch))
				numWritesWithBatchSize.Inc()
			}
		case <-ticker.C:
			// Only flush if a full delay has passed since the last flush
			// and there is something to write.
			flush = time.Since(last) > w.workerParams.delay && len(batch) > 0
			if flush {
				w.workerParams.logger.Debug("Flush due to timer")
				numWritesWithFlushInterval.Inc()
			}
		case <-w.finish:
			finish = true
			flush = len(batch) > 0
			w.workerParams.logger.Debug("Finish channel")
		}
		if flush {
			pool.WriteBatch(batch)
			batch = make([]*model.Span, 0, w.size)
			last = time.Now()
		}
		if finish {
			pool.Close()
		}
		w.done.Done()
		if finish {
			break
		}
	}
}
// WriteSpan writes the encoded span
//
// The span is handed to the buffered spans channel and persisted
// asynchronously by backgroundWriter; this blocks only when the buffer
// (capacity = batch size) is full. Always returns nil — enqueueing cannot
// fail here.
// NOTE(review): a WriteSpan issued after Close can block forever once the
// background goroutine has exited — confirm callers never interleave these.
func (w *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
	w.spans <- span
	return nil
}
// Close Implements io.Closer and closes the underlying storage
//
// It signals backgroundWriter via the finish channel (triggering a final
// flush and pool shutdown) and then waits on the WaitGroup until that last
// iteration has completed. Always returns nil.
// NOTE(review): Close is not idempotent — a second call would block on the
// finish send with no receiver; verify it is called at most once.
func (w *SpanWriter) Close() error {
	w.finish <- true
	w.done.Wait()
	return nil
}