/
receiver.go
100 lines (77 loc) · 2.24 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package receiver
import (
"bytes"
"fmt"
"log"
"sync"
"github.com/BurntSushi/toml"
"github.com/lomik/go-carbon/helper"
"github.com/lomik/go-carbon/points"
)
type Receiver interface {
Stop()
Stat(helper.StatCallback)
}
type protocolRecord struct {
newOptions func() interface{}
newReceiver func(name string, options interface{}, store func(*points.Points)) (Receiver, error)
}
var protocolMap = map[string]*protocolRecord{}
var protocolMapMutex sync.Mutex
func Register(protocol string,
newOptions func() interface{},
newReceiver func(name string, options interface{}, store func(*points.Points)) (Receiver, error)) {
protocolMapMutex.Lock()
defer protocolMapMutex.Unlock()
_, ok := protocolMap[protocol]
if ok {
log.Fatalf("protocol %#v already registered", protocol)
}
protocolMap[protocol] = &protocolRecord{
newOptions: newOptions,
newReceiver: newReceiver,
}
}
// WithProtocol marshal options to toml, unmarshal to map[string]interface{} and add "protocol" key to them
func WithProtocol(options interface{}, protocol string) (map[string]interface{}, error) {
buf := new(bytes.Buffer)
encoder := toml.NewEncoder(buf)
encoder.Indent = ""
if err := encoder.Encode(options); err != nil {
return nil, err
}
res := make(map[string]interface{})
if _, err := toml.Decode(buf.String(), &res); err != nil {
return nil, err
}
res["protocol"] = protocol
return res, nil
}
func New(name string, opts map[string]interface{}, store func(*points.Points)) (Receiver, error) {
protocolNameObj, ok := opts["protocol"]
if !ok {
return nil, fmt.Errorf("protocol unspecified for receiver %#v", name)
}
protocolName, ok := protocolNameObj.(string)
if !ok {
return nil, fmt.Errorf("bad protocol option %#v", protocolNameObj)
}
delete(opts, "protocol")
protocolMapMutex.Lock()
protocol, ok := protocolMap[protocolName]
protocolMapMutex.Unlock()
if !ok {
return nil, fmt.Errorf("unknown protocol %#v", protocolName)
}
buf := new(bytes.Buffer)
encoder := toml.NewEncoder(buf)
encoder.Indent = ""
if err := encoder.Encode(opts); err != nil {
return nil, err
}
options := protocol.newOptions()
if _, err := toml.Decode(buf.String(), options); err != nil {
return nil, err
}
return protocol.newReceiver(name, options, store)
}