-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
metrics.go
129 lines (108 loc) ยท 3.95 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package serverplugin
import (
"context"
"net"
"time"
"github.com/rcrowley/go-metrics"
"github.com/rcrowley/go-metrics/exp"
"github.com/smallnest/rpcx/protocol"
"github.com/smallnest/rpcx/server"
)
// MetricsPlugin has an issue. It changes seq of requests and it is wrong!!!!
// we should use other methods to map requests and responses not but seq.
// MetricsPlugin collects metrics of a rpc server.
// You can report metrics to log, syslog, Graphite, InfluxDB or others to display them in Dashboard such as grafana, Graphite.
type MetricsPlugin struct {
Registry metrics.Registry
Prefix string
}
//NewMetricsPlugin creates a new MetricsPlugirn
func NewMetricsPlugin(registry metrics.Registry) *MetricsPlugin {
return &MetricsPlugin{Registry: registry}
}
func (p *MetricsPlugin) withPrefix(m string) string {
return p.Prefix + m
}
// Register handles registering event.
func (p *MetricsPlugin) Register(name string, rcvr interface{}, metadata string) error {
serviceCounter := metrics.GetOrRegisterCounter(p.withPrefix("serviceCounter"), p.Registry)
serviceCounter.Inc(1)
return nil
}
// HandleConnAccept handles connections from clients
func (p *MetricsPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
clientMeter := metrics.GetOrRegisterMeter(p.withPrefix("clientMeter"), p.Registry)
clientMeter.Mark(1)
return conn, true
}
// PreReadRequest marks start time of calling service
func (p *MetricsPlugin) PreReadRequest(ctx context.Context) error {
return nil
}
// PostReadRequest counts read
func (p *MetricsPlugin) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error {
sp := r.ServicePath
sm := r.ServiceMethod
if sp == "" {
return nil
}
m := metrics.GetOrRegisterMeter(p.withPrefix("service."+sp+"."+sm+".Read_Qps"), p.Registry)
m.Mark(1)
return nil
}
// PostWriteResponse count write
func (p *MetricsPlugin) PostWriteResponse(ctx context.Context, req *protocol.Message, res *protocol.Message, e error) error {
sp := res.ServicePath
sm := res.ServiceMethod
if sp == "" {
return nil
}
m := metrics.GetOrRegisterMeter(p.withPrefix("service."+sp+"."+sm+".Write_Qps"), p.Registry)
m.Mark(1)
t := ctx.Value(server.StartRequestContextKey).(int64)
if t > 0 {
t = time.Now().UnixNano() - t
if t < 30*time.Minute.Nanoseconds() { //it is impossible that calltime exceeds 30 minute
//Historgram
h := metrics.GetOrRegisterHistogram(p.withPrefix("service."+sp+"."+sm+".CallTime"), p.Registry,
metrics.NewExpDecaySample(1028, 0.015))
h.Update(t)
}
}
return nil
}
// Log reports metrics into logs.
//
// p.Log( 5 * time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
//
func (p *MetricsPlugin) Log(freq time.Duration, l metrics.Logger) {
go metrics.Log(p.Registry, freq, l)
}
// Graphite reports metrics into graphite.
//
// addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:2003")
// p.Graphite(10e9, "metrics", addr)
//
func (p *MetricsPlugin) Graphite(freq time.Duration, prefix string, addr *net.TCPAddr) {
go metrics.Graphite(p.Registry, freq, prefix, addr)
}
// InfluxDB reports metrics into influxdb.
//
// p.InfluxDB(10e9, "http://127.0.0.1:8086","metrics", "test","test"})
//
func (p *MetricsPlugin) InfluxDB(freq time.Duration, url, database, username, password string) {
go InfluxDB(p.Registry, freq, url, database, username, password)
}
// InfluxDBWithTags reports metrics into influxdb with tags.
// you can set node info into tags.
//
// p.InfluxDBWithTags(10e9, "http://127.0.0.1:8086","metrics", "test","test", map[string]string{"host":"127.0.0.1"})
//
func (p *MetricsPlugin) InfluxDBWithTags(freq time.Duration, url, database, username, password string, tags map[string]string) {
go InfluxDBWithTags(p.Registry, freq, url, database, username, password, tags)
}
// Exp uses the same mechanism as the official expvar but exposed under /debug/metrics,
// which shows a json representation of all your usual expvars as well as all your go-metrics.
func (p *MetricsPlugin) Exp() {
exp.Exp(metrics.DefaultRegistry)
}