This repository has been archived by the owner on Jan 21, 2022. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
writer.go
244 lines (221 loc) · 8.76 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package stats
import (
"context"
"strings"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.uber.org/zap"
)
// Writer is a destination for collected stats.
type Writer interface {
	// Write hands a batch of stats to the implementation's destination.
	Write(stats []stat)
}
// zapWriter is the Writer implementation that emits stats as structured
// log entries through a *zap.Logger.
type zapWriter struct{ logger *zap.Logger }
// Write logs every stat as its own info-level "spanner stats" entry, using
// the fields produced by getFields for that stat.
func (w *zapWriter) Write(stats []stat) {
	for i := range stats {
		fields := w.getFields(stats[i])
		w.logger.Info("spanner stats", fields...)
	}
}
// getFields converts a stat into the zap fields that describe it.
//
// *QueryStat, *TransactionStat and *LockStat are supported; each result
// starts with a "type" field naming the concrete stat. Any other concrete
// type yields nil.
func (w *zapWriter) getFields(s stat) []zap.Field {
	var fields []zap.Field

	switch v := s.(type) {
	case *QueryStat:
		fields = []zap.Field{
			zap.String("type", "QueryStat"),
			zap.Time("IntervalEnd", v.IntervalEnd),
			zap.String("Text", v.Text),
			zap.Bool("TextTruncated", v.TextTruncated),
			zap.Int64("TextFingerprint", v.TextFingerprint),
			zap.Int64("ExecutionCount", v.ExecutionCount),
			zap.Float64("AvgLatencySeconds", v.AvgLatencySeconds),
			zap.Float64("AvgRows", v.AvgRows),
			zap.Float64("AvgBytes", v.AvgBytes),
			zap.Float64("AvgRowsScanned", v.AvgRowsScanned),
			zap.Float64("AvgCPUSeconds", v.AvgCPUSeconds),
		}
	case *TransactionStat:
		fields = []zap.Field{
			zap.String("type", "TransactionStat"),
			zap.Time("IntervalEnd", v.IntervalEnd),
			zap.Int64("Fprint", v.Fprint),
			zap.Strings("ReadColumns", v.ReadColumns),
			zap.Strings("WriteConstructiveColumns", v.WriteConstructiveColumns),
			zap.Strings("WriteDeleteTables", v.WriteDeleteTables),
			zap.Int64("CommitAttemptCount", v.CommitAttemptCount),
			zap.Int64("CommitFailedPreconditionCount", v.CommitFailedPreconditionCount),
			zap.Int64("CommitAbortCount", v.CommitAbortCount),
			zap.Float64("AvgParticipants", v.AvgParticipants),
			zap.Float64("AvgTotalLatencySeconds", v.AvgTotalLatencySeconds),
			zap.Float64("AvgCommitLatencySeconds", v.AvgCommitLatencySeconds),
			zap.Float64("AvgBytes", v.AvgBytes),
		}
	case *LockStat:
		fields = []zap.Field{
			zap.String("type", "LockStat"),
			zap.Time("IntervalEnd", v.IntervalEnd),
			zap.ByteString("RowRangeStartKey", v.RowRangeStartKey),
			zap.Float64("LockWaitSeconds", v.LockWaitSeconds),
			zap.Any("SampleLockRequests", v.SampleLockRequests),
		}
	}

	// Still nil here when s matched none of the cases above.
	return fields
}
// NewZapWriter returns a Writer that logs stats through the given *zap.Logger.
func NewZapWriter(logger *zap.Logger) Writer {
	w := new(zapWriter)
	w.logger = logger
	return w
}
// otelWriter is the Writer implementation that records stats as
// OpenTelemetry metrics. It keeps one meter/instrument bundle per kind of
// stat it handles.
type otelWriter struct {
	// query is used for *QueryStat values.
	query otelWriterQuery
	// transaction is used for *TransactionStat values.
	transaction otelWriterTransaction
	// lock is used for *LockStat values.
	lock otelWriterLock
}
// otelWriterQuery bundles the meter and the instruments used to record
// query stats.
type otelWriterQuery struct {
	// meter is the meter on which batches of query measurements are recorded.
	meter metric.Meter
	// measures holds the instruments that produce those measurements.
	measures otelWriterQueryMeasures
}
// otelWriterQueryMeasures holds one instrument per numeric value recorded
// for a query stat.
type otelWriterQueryMeasures struct {
	intervalEnd    metric.Int64ValueRecorder
	executionCount metric.Int64Counter

	// Averages are recorded as float64 values.
	avgLatencySeconds, avgRows, avgBytes, avgRowsScanned, avgCPUSeconds metric.Float64ValueRecorder
}
// otelWriterTransaction bundles the meter and the instruments used to
// record transaction stats.
type otelWriterTransaction struct {
	// meter is the meter on which batches of transaction measurements are recorded.
	meter metric.Meter
	// measures holds the instruments that produce those measurements.
	measures otelWriterTransactionMeasures
}
// otelWriterTransactionMeasures holds one instrument per numeric value
// recorded for a transaction stat.
type otelWriterTransactionMeasures struct {
	intervalEnd metric.Int64ValueRecorder

	// Commit counts are recorded through counters.
	commitAttemptCount, commitFailedPreconditionCount, commitAbortCount metric.Int64Counter

	// Averages are recorded as float64 values.
	avgParticipants, avgTotalLatencySeconds, avgCommitLatencySeconds, avgBytes metric.Float64ValueRecorder
}
// otelWriterLock bundles the meter and the instruments used to record
// lock stats.
type otelWriterLock struct {
	// meter is the meter on which batches of lock measurements are recorded.
	meter metric.Meter
	// measures holds the instruments that produce those measurements.
	measures otelWriterLockMeasures
}
// otelWriterLockMeasures holds one instrument per numeric value recorded
// for a lock stat.
type otelWriterLockMeasures struct {
	// intervalEnd receives the stat's IntervalEnd as Unix nanoseconds.
	intervalEnd metric.Int64ValueRecorder
	// lockWaitSeconds receives the stat's LockWaitSeconds.
	lockWaitSeconds metric.Float64ValueRecorder
}
// Meter names, one per kind of stat. They are passed to global.Meter and
// used as prefixes for instrument names.

// otelMeterNameQuery names the meter for query stats.
const otelMeterNameQuery = "spanner.stats.query"

// otelMeterNameTransaction names the meter for transaction stats.
const otelMeterNameTransaction = "spanner.stats.transaction"

// otelMeterNameLock names the meter for lock stats.
const otelMeterNameLock = "spanner.stats.lock"
// Write records each stat as a single OpenTelemetry batch on the meter that
// matches its concrete type (*QueryStat, *TransactionStat or *LockStat).
//
// Descriptive values (query text, fingerprints, column lists, row keys,
// sample lock requests) are attached as attributes; numeric values are
// recorded through the instruments held by the writer. Stats of any other
// concrete type are skipped. Every batch is recorded with
// context.Background().
func (w *otelWriter) Write(stats []stat) {
	// Built once per call instead of once per query stat: the replacer is
	// loop-invariant. It turns CR, LF and TAB into single spaces so the
	// query text becomes a one-line attribute value.
	flatten := strings.NewReplacer("\r", " ", "\n", " ", "\t", " ")
	ctx := context.Background()

	for _, s := range stats {
		switch s := s.(type) {
		case *QueryStat:
			w.query.meter.RecordBatch(
				ctx,
				[]attribute.KeyValue{
					attribute.String("Text", flatten.Replace(s.Text)),
					attribute.Int64("TextFingerprint", s.TextFingerprint),
				},
				w.query.measures.intervalEnd.Measurement(s.IntervalEnd.UnixNano()),
				w.query.measures.executionCount.Measurement(s.ExecutionCount),
				w.query.measures.avgLatencySeconds.Measurement(s.AvgLatencySeconds),
				w.query.measures.avgRows.Measurement(s.AvgRows),
				w.query.measures.avgBytes.Measurement(s.AvgBytes),
				w.query.measures.avgRowsScanned.Measurement(s.AvgRowsScanned),
				w.query.measures.avgCPUSeconds.Measurement(s.AvgCPUSeconds),
			)
		case *TransactionStat:
			w.transaction.meter.RecordBatch(
				ctx,
				[]attribute.KeyValue{
					attribute.String("ReadColumns", strings.Join(s.ReadColumns, ",")),
					attribute.String("WriteConstructiveColumns", strings.Join(s.WriteConstructiveColumns, ",")),
					attribute.String("WriteDeleteTables", strings.Join(s.WriteDeleteTables, ",")),
					attribute.Int64("Fprint", s.Fprint),
				},
				w.transaction.measures.intervalEnd.Measurement(s.IntervalEnd.UnixNano()),
				w.transaction.measures.commitAttemptCount.Measurement(s.CommitAttemptCount),
				w.transaction.measures.commitFailedPreconditionCount.Measurement(s.CommitFailedPreconditionCount),
				w.transaction.measures.commitAbortCount.Measurement(s.CommitAbortCount),
				w.transaction.measures.avgParticipants.Measurement(s.AvgParticipants),
				w.transaction.measures.avgTotalLatencySeconds.Measurement(s.AvgTotalLatencySeconds),
				w.transaction.measures.avgCommitLatencySeconds.Measurement(s.AvgCommitLatencySeconds),
				w.transaction.measures.avgBytes.Measurement(s.AvgBytes),
			)
		case *LockStat:
			// Each sample request is rendered as "(column,mode),". The
			// trailing comma after the last entry is kept so the emitted
			// attribute value stays exactly as before.
			var requests strings.Builder
			for _, l := range s.SampleLockRequests {
				requests.WriteRune('(')
				requests.WriteString(l.Column)
				requests.WriteRune(',')
				requests.WriteString(l.LockMode)
				requests.WriteString("),")
			}

			w.lock.meter.RecordBatch(
				ctx,
				[]attribute.KeyValue{
					attribute.String("RowRangeStartKey", string(s.RowRangeStartKey)),
					attribute.String("SampleLockRequests", requests.String()),
				},
				w.lock.measures.intervalEnd.Measurement(s.IntervalEnd.UnixNano()),
				w.lock.measures.lockWaitSeconds.Measurement(s.LockWaitSeconds),
			)
		}
	}
}
// NewOpenTelemetryWriter returns a Writer that records stats as
// OpenTelemetry metrics.
//
// It obtains one meter per kind of stat from global.Meter (query,
// transaction, lock) and creates every instrument on the meter of its own
// kind. Each instrument is named "<meter name>.<FieldName>". Instruments are
// created through metric.Must, which panics if an instrument cannot be
// created.
//
// NOTE: transaction and lock instruments were previously named with the
// query meter prefix, and lock batches were recorded on the transaction
// meter. Both were copy-paste mistakes; the names and the meter now match
// the kind of stat they belong to.
func NewOpenTelemetryWriter() Writer {
	queryMeter := global.Meter(otelMeterNameQuery)
	queryMust := metric.Must(queryMeter)
	transactionMeter := global.Meter(otelMeterNameTransaction)
	transactionMust := metric.Must(transactionMeter)
	lockMeter := global.Meter(otelMeterNameLock)
	lockMust := metric.Must(lockMeter)

	return &otelWriter{
		query: otelWriterQuery{
			meter: queryMeter,
			measures: otelWriterQueryMeasures{
				intervalEnd:       queryMust.NewInt64ValueRecorder(otelMeterNameQuery + ".IntervalEnd"),
				executionCount:    queryMust.NewInt64Counter(otelMeterNameQuery + ".ExecutionCount"),
				avgLatencySeconds: queryMust.NewFloat64ValueRecorder(otelMeterNameQuery + ".AvgLatencySeconds"),
				avgRows:           queryMust.NewFloat64ValueRecorder(otelMeterNameQuery + ".AvgRows"),
				avgBytes:          queryMust.NewFloat64ValueRecorder(otelMeterNameQuery + ".AvgBytes"),
				avgRowsScanned:    queryMust.NewFloat64ValueRecorder(otelMeterNameQuery + ".AvgRowsScanned"),
				// The "AvgCpuSeconds" spelling is kept as-is so the existing
				// metric name does not change.
				avgCPUSeconds: queryMust.NewFloat64ValueRecorder(otelMeterNameQuery + ".AvgCpuSeconds"),
			},
		},
		transaction: otelWriterTransaction{
			meter: transactionMeter,
			measures: otelWriterTransactionMeasures{
				intervalEnd:                   transactionMust.NewInt64ValueRecorder(otelMeterNameTransaction + ".IntervalEnd"),
				commitAttemptCount:            transactionMust.NewInt64Counter(otelMeterNameTransaction + ".CommitAttemptCount"),
				commitFailedPreconditionCount: transactionMust.NewInt64Counter(otelMeterNameTransaction + ".CommitFailedPreconditionCount"),
				commitAbortCount:              transactionMust.NewInt64Counter(otelMeterNameTransaction + ".CommitAbortCount"),
				avgParticipants:               transactionMust.NewFloat64ValueRecorder(otelMeterNameTransaction + ".AvgParticipants"),
				avgTotalLatencySeconds:        transactionMust.NewFloat64ValueRecorder(otelMeterNameTransaction + ".AvgTotalLatencySeconds"),
				avgCommitLatencySeconds:       transactionMust.NewFloat64ValueRecorder(otelMeterNameTransaction + ".AvgCommitLatencySeconds"),
				avgBytes:                      transactionMust.NewFloat64ValueRecorder(otelMeterNameTransaction + ".AvgBytes"),
			},
		},
		lock: otelWriterLock{
			// Lock batches are recorded on the lock meter, the same meter
			// the lock instruments are created on.
			meter: lockMeter,
			measures: otelWriterLockMeasures{
				intervalEnd:     lockMust.NewInt64ValueRecorder(otelMeterNameLock + ".IntervalEnd"),
				lockWaitSeconds: lockMust.NewFloat64ValueRecorder(otelMeterNameLock + ".LockWaitSeconds"),
			},
		},
	}
}