-
Notifications
You must be signed in to change notification settings - Fork 390
/
server.go
122 lines (103 loc) · 2.52 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package telemetry
import (
"context"
"log"
"net"
"syscall"
"github.com/zeebo/admission"
"github.com/zeebo/admission/admproto"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
var (
mon = monkit.Package()
)
// Handler is called every time a new metric comes in
type Handler interface {
Metric(application, instance string, key []byte, val float64)
}
// HandlerFunc turns a func into a Handler
type HandlerFunc func(application, instance string, key []byte, val float64)
// Metric implements the Handler interface
func (f HandlerFunc) Metric(a, i string, k []byte, v float64) { f(a, i, k, v) }
// Server listens for incoming metrics
type Server struct {
conn net.PacketConn
}
// Addr returns the address the server is serving on
func (s *Server) Addr() string {
return s.conn.LocalAddr().String()
}
// Listen will start listening on addr for metrics
func Listen(addr string) (*Server, error) {
conn, err := net.ListenPacket("udp", addr)
if err != nil {
return nil, err
}
return &Server{conn: conn}, nil
}
// Close will stop listening
func (s *Server) Close() error {
return s.conn.Close()
}
// Serve will wait for metrics and call Handler h as they come in
func (s *Server) Serve(ctx context.Context, h Handler) error {
scconn, ok := s.conn.(syscall.Conn)
if !ok {
return Error.New("invalid conn: %T", s.conn)
}
rc, err := scconn.SyscallConn()
if err != nil {
return err
}
return admission.Dispatcher{
Handler: handlerWrapper{h: h},
Conn: rc,
}.Run(ctx)
}
// ListenAndServe combines Listen and Serve
func ListenAndServe(ctx context.Context, addr string, h Handler) error {
s, err := Listen(addr)
if err != nil {
return err
}
defer func() {
if err := s.Close(); err != nil {
log.Printf("Failed to close Server: %s", err)
}
}()
return s.Serve(ctx, h)
}
type handlerWrapper struct {
h Handler
}
var (
handleTask = mon.Task()
)
func (h handlerWrapper) Handle(ctx context.Context, m *admission.Message) {
finish := handleTask(nil)
data, err := admproto.CheckChecksum(m.Data)
if err != nil {
finish(&err)
return
}
r := admproto.NewReaderWith(m.Scratch[:])
data, applicationB, instanceB, err := r.Begin(data)
if err != nil {
finish(&err)
return
}
application, instance := string(applicationB), string(instanceB)
var key []byte
var value float64
for len(data) > 0 {
data, key, value, err = r.Next(data)
if err != nil {
finish(&err)
return
}
h.h.Metric(application, instance, key, value)
}
finish(nil)
}