/
handler.go
155 lines (120 loc) · 3.11 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package stats
/**
* handler.go - server stats handler
*
* @author Yaroslav Pogrebnyak <yyyaroslav@gmail.com>
*/
import (
"fmt"
"time"
"github.com/yyyar/gobetween/core"
"github.com/yyyar/gobetween/metrics"
"github.com/yyyar/gobetween/stats/counters"
)
const (
/* Stats update interval */
INTERVAL = 2 * time.Second
)
/**
* Handler processess data from server
*/
type Handler struct {
/* Server's name */
Name string
/* Server counter */
serverCounter *counters.BandwidthCounter
/* Backends counters */
BackendsCounter *counters.BackendsBandwidthCounter
/* Current stats */
latestStats Stats
/* ----- channels ----- */
/* Server traffic data */
Traffic chan core.ReadWriteCount
/* Server current connections count */
Connections chan uint
/* Current backends pool */
Backends chan []core.Backend
/* Channel for indicating stop request */
stopChan chan bool
/* Input channel for latest stats */
ServerStats chan counters.BandwidthStats
}
/**
* Creates new stats handler for the server
* with name 'name'
*/
func NewHandler(name string) *Handler {
handler := &Handler{
Name: name,
ServerStats: make(chan counters.BandwidthStats, 1),
Traffic: make(chan core.ReadWriteCount),
Connections: make(chan uint),
Backends: make(chan []core.Backend),
stopChan: make(chan bool),
latestStats: Stats{
RxTotal: 0,
TxTotal: 0,
RxSecond: 0,
TxSecond: 0,
Backends: []core.Backend{},
},
}
handler.serverCounter = counters.NewBandwidthCounter(INTERVAL, handler.ServerStats)
handler.BackendsCounter = counters.NewBackendsBandwidthCounter()
Store.Lock()
Store.handlers[name] = handler
Store.Unlock()
return handler
}
/**
* Start handler work asynchroniously
*/
func (this *Handler) Start() {
this.serverCounter.Start()
this.BackendsCounter.Start()
go func() {
for {
select {
/* stop stats processor requested */
case <-this.stopChan:
this.serverCounter.Stop()
this.BackendsCounter.Stop()
Store.Lock()
delete(Store.handlers, this.Name)
Store.Unlock()
// close channels
close(this.ServerStats)
close(this.Traffic)
close(this.Connections)
return
/* New server stats available */
case b := <-this.ServerStats:
this.latestStats.RxTotal = b.RxTotal
this.latestStats.TxTotal = b.TxTotal
this.latestStats.RxSecond = b.RxSecond
this.latestStats.TxSecond = b.TxSecond
metrics.ReportHandleStatsChange(fmt.Sprintf("%s", this.Name), b)
/* New server backends with stats available */
case backends := <-this.Backends:
this.latestStats.Backends = backends
/* New sever connections count available */
case connections := <-this.Connections:
this.latestStats.ActiveConnections = connections
metrics.ReportHandleConnectionsChange(fmt.Sprintf("%s", this.Name), connections)
/* New traffic stats available */
case rwc := <-this.Traffic:
// forward to counters
go func() {
this.serverCounter.Traffic <- rwc
this.BackendsCounter.Traffic <- rwc
}()
}
}
}()
}
/**
* Request handler stop and clear resources
*/
func (this *Handler) Stop() {
this.stopChan <- true
}