forked from FactomProject/factomd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queues.go
105 lines (94 loc) · 3.24 KB
/
queues.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package state
//
// Addressing Performance
// IQueues replace channels and monitor enqueues and dequeues
// with prometheus instrumentation. By tripping a prometheus call,
// performance is lost, but compared to the insight gained, is worth it.
// The performance does not affect our queue management.
//
// Benchmarks :: `go test -bench=. queues_test.go `
// BenchmarkChannels-4 20000000 94.7 ns/op
// BenchmarkQueues-4 10000000 153 ns/op
// BenchmarkConcurentChannels-4 10000000 138 ns/op
// BenchmarkConcurrentQueues-4 5000000 251 ns/op
// BenchmarkCompetingChannels-4 3000000 360 ns/op
// BenchmarkCompetingQueues-4 1000000 1302 ns/op
import (
"github.com/FactomProject/factomd/common/constants"
"github.com/FactomProject/factomd/common/interfaces"
"github.com/prometheus/client_golang/prometheus"
)
// Returning this is non-instrumented way
type GeneralMSGQueue chan interfaces.IMsg
// Length of underlying channel
func (q GeneralMSGQueue) Length() int {
return len(chan interfaces.IMsg(q))
}
// Cap of underlying channel
func (q GeneralMSGQueue) Cap() int {
return cap(chan interfaces.IMsg(q))
}
// Enqueue adds item to channel
func (q GeneralMSGQueue) Enqueue(t interfaces.IMsg) {
q <- t
}
// Dequeue returns the channel dequeue
func (q GeneralMSGQueue) Dequeue() interfaces.IMsg {
select {
case v := <-q:
return v
default:
return nil
}
}
func (q GeneralMSGQueue) BlockingDequeue() interfaces.IMsg {
return <-q
}
// measureMessage will increment/decrement prometheus based on type
func measureMessage(counter *prometheus.GaugeVec, msg interfaces.IMsg, increment bool) {
if msg == nil {
return
}
amt := float64(1)
if !increment {
amt = -1
}
if counter != nil {
switch msg.Type() {
case constants.EOM_MSG: // 1
counter.WithLabelValues("eom").Add(amt)
case constants.ACK_MSG: // 2
counter.WithLabelValues("ack").Add(amt)
case constants.FULL_SERVER_FAULT_MSG: // 5
counter.WithLabelValues("fault").Add(amt)
case constants.COMMIT_CHAIN_MSG: // 6
counter.WithLabelValues("commitchain").Add(amt)
case constants.COMMIT_ENTRY_MSG: // 7
counter.WithLabelValues("commitentry").Add(amt)
case constants.DIRECTORY_BLOCK_SIGNATURE_MSG: // 8
counter.WithLabelValues("dbsig").Add(amt)
case constants.FACTOID_TRANSACTION_MSG: // 10
counter.WithLabelValues("factoid").Add(amt)
case constants.HEARTBEAT_MSG: // 11
counter.WithLabelValues("heartbeat").Add(amt)
case constants.MISSING_MSG: // 13
counter.WithLabelValues("missingmsg").Add(amt)
case constants.MISSING_MSG_RESPONSE: // 14
counter.WithLabelValues("missingmsgresp").Add(amt)
case constants.MISSING_DATA: // 15
counter.WithLabelValues("missingdata").Add(amt)
case constants.DATA_RESPONSE: // 16
counter.WithLabelValues("dataresp").Add(amt)
case constants.REVEAL_ENTRY_MSG: // 17
counter.WithLabelValues("revealentry").Add(amt)
case constants.REQUEST_BLOCK_MSG: // 18
counter.WithLabelValues("requestblock").Add(amt)
case constants.DBSTATE_MISSING_MSG: // 19
counter.WithLabelValues("dbstatmissing").Add(amt)
case constants.DBSTATE_MSG: // 20
counter.WithLabelValues("dbstate").Add(amt)
default: // 23
counter.WithLabelValues("misc").Add(amt)
}
}
}