/
write_buffer.go
130 lines (116 loc) · 4.39 KB
/
write_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
Copyright 2017 Simon J Mudd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dtstruct
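/*
This file defines WriteBufferMetric, which records, for one flush of the
write buffer, the time spent waiting before the flush and the time spent
writing to the backend, together with AggregatedSince, which summarises
those metrics over a period.
*/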
import (
"gitee.com/opengauss/ham4db/go/config"
"github.com/montanaflynn/stats"
"time"
)
// WriteBufferMetric records one flush of backend writes that go through a
// sized channel. The time spent waiting is kept separate from the time spent
// writing so the two can be compared; with a sized channel the wait may be
// significant, which makes it worth measuring.
type WriteBufferMetric struct {
	// Timestamp is the time the metric was started.
	Timestamp time.Time
	// Instances is the number of flushed instances.
	Instances int
	// WaitLatency is the time spent waiting before the flush.
	WaitLatency time.Duration
	// WriteLatency is the time spent writing to the backend.
	WriteLatency time.Duration
}

// When returns the metric's Timestamp, i.e. the time at which the recording
// was started.
func (m WriteBufferMetric) When() time.Time {
	return m.Timestamp
}
// AggregatedWriteBufferMetric summarises a set of WriteBufferMetric values.
// Next to two configuration settings it holds the total number of instances
// and, for each of instances per flush, wait time and write time, the
// maximum, mean, median and 95th percentile.
type AggregatedWriteBufferMetric struct {
	// Configuration settings.
	InstanceWriteBufferSize           int
	InstanceFlushIntervalMilliseconds int

	// CountInstances is the sum of Instances over all aggregated metrics.
	CountInstances int

	// Statistics over the number of instances per metric.
	MaxInstances, MeanInstances, MedianInstances, P95Instances float64

	// Statistics over the wait latency, in seconds.
	MaxWaitSeconds, MeanWaitSeconds, MedianWaitSeconds, P95WaitSeconds float64

	// Statistics over the write latency, in seconds.
	MaxWriteSeconds, MeanWriteSeconds, MedianWriteSeconds, P95WriteSeconds float64
}
// AggregatedSince summarises the write buffer metrics held in c that
// c.Since(t) returns.
//
// The result always carries the InstanceWriteBufferSize and
// InstanceFlushIntervalMilliseconds configuration settings. If c.Since
// returns an error, every other field is left at zero. Otherwise
// CountInstances is the sum of Instances over the returned metrics, and the
// maximum, mean, median and 95th percentile are computed for the number of
// instances, the wait latency (seconds) and the write latency (seconds).
// A statistic the stats package cannot compute (it returns an error) is left
// at zero.
//
// Entries that are not a non-nil *WriteBufferMetric are skipped instead of
// causing a panic.
func AggregatedSince(c *Collection, t time.Time) AggregatedWriteBufferMetric {
	// Retrieve values since the time specified.
	values, err := c.Since(t)

	a := AggregatedWriteBufferMetric{
		InstanceWriteBufferSize:           config.Config.InstanceWriteBufferSize,
		InstanceFlushIntervalMilliseconds: config.Config.InstanceFlushIntervalMilliseconds,
	}
	if err != nil {
		return a // empty data
	}

	// The final length is known up front, so size the sample slices once.
	instances := make([]float64, 0, len(values))
	waits := make([]float64, 0, len(values))
	writes := make([]float64, 0, len(values))

	// Collect the samples, asserting the concrete type once per entry.
	for _, v := range values {
		m, ok := v.(*WriteBufferMetric)
		if !ok || m == nil {
			continue // not a write buffer metric: nothing to aggregate
		}
		instances = append(instances, float64(m.Instances))
		waits = append(waits, m.WaitLatency.Seconds())
		writes = append(writes, m.WriteLatency.Seconds())
		a.CountInstances += m.Instances
	}

	// summarize returns the maximum, mean, median and 95th percentile of
	// data. A statistic for which the stats package returns an error is
	// reported as zero, matching the zero value of the aggregate's fields.
	summarize := func(data []float64) (highest, mean, median, p95 float64) {
		d := stats.Float64Data(data)
		if s, serr := stats.Max(d); serr == nil {
			highest = s
		}
		if s, serr := stats.Mean(d); serr == nil {
			mean = s
		}
		if s, serr := stats.Median(d); serr == nil {
			median = s
		}
		if s, serr := stats.Percentile(d, 95); serr == nil {
			p95 = s
		}
		return highest, mean, median, p95
	}

	// Generate aggregate values.
	a.MaxInstances, a.MeanInstances, a.MedianInstances, a.P95Instances = summarize(instances)
	a.MaxWaitSeconds, a.MeanWaitSeconds, a.MedianWaitSeconds, a.P95WaitSeconds = summarize(waits)
	a.MaxWriteSeconds, a.MeanWriteSeconds, a.MedianWriteSeconds, a.P95WriteSeconds = summarize(writes)

	return a
}