/
api.go
102 lines (83 loc) · 2.13 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package udp
import (
"cluster"
. "common"
"coordinator"
"encoding/json"
"net"
"protocol"
log "code.google.com/p/log4go"
)
type Server struct {
listenAddress string
database string
coordinator coordinator.Coordinator
clusterConfig *cluster.ClusterConfiguration
conn *net.UDPConn
user *cluster.ClusterAdmin
shutdown chan bool
}
func NewServer(listenAddress string, database string, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}
self.listenAddress = listenAddress
self.database = database
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig
return self
}
func (self *Server) getAuth() {
// just use any (the first) of the list of admins.
names := self.clusterConfig.GetClusterAdmins()
self.user = self.clusterConfig.GetClusterAdmin(names[0])
}
func (self *Server) ListenAndServe() {
var err error
self.getAuth()
addr, err := net.ResolveUDPAddr("udp4", self.listenAddress)
if err != nil {
log.Error("UDPServer: ResolveUDPAddr: ", err)
return
}
if self.listenAddress != "" {
self.conn, err = net.ListenUDP("udp", addr)
if err != nil {
log.Error("UDPServer: Listen: ", err)
return
}
}
defer self.conn.Close()
self.HandleSocket(self.conn)
}
func (self *Server) HandleSocket(socket *net.UDPConn) {
buffer := make([]byte, 2048)
for {
n, _, err := socket.ReadFromUDP(buffer)
if err != nil || n == 0 {
log.Error("UDP ReadFromUDP error: %s", err)
continue
}
serializedSeries := []*SerializedSeries{}
err = json.Unmarshal(buffer[0:n], &serializedSeries)
if err != nil {
log.Error("UDP json error: %s", err)
continue
}
for _, s := range serializedSeries {
if len(s.Points) == 0 {
continue
}
series, err := ConvertToDataStoreSeries(s, SecondPrecision)
if err != nil {
log.Error("UDP cannot convert received data: %s", err)
continue
}
serie := []*protocol.Series{series}
err = self.coordinator.WriteSeriesData(self.user, self.database, serie)
if err != nil {
log.Error("UDP cannot write data: %s", err)
continue
}
}
}
}