This repository has been archived by the owner on May 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
event.go
146 lines (128 loc) · 3.48 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package ultron
import (
"fmt"
"os"
"sort"
"strings"
"go.uber.org/zap"
)
type (
// ResultHandleFunc .
ResultHandleFunc func(*Result)
// ReportHandleFunc .
ReportHandleFunc func(Report)
eventHook struct {
retFuncs []ResultHandleFunc
repFuncs []ReportHandleFunc
Concurrency int // 控制ResultHandleFunc执行时的最大并发数,避免造成竞争
ch chan struct{}
}
)
var (
// LocalEventHook 单机运行时使用的钩子
LocalEventHook *eventHook
// SlaveEventHook 分布式执行时,节点钩子
SlaveEventHook *eventHook
// MasterEventHook 分布式执行时,主控钩子
MasterEventHook *eventHook
// LocalEventHookConcurrency .
LocalEventHookConcurrency = 200
// SlaveEventHookConcurrency .
SlaveEventHookConcurrency = 200
// MasterEventHookConcurrency 默认不用控制
MasterEventHookConcurrency = 0
cutLine = strings.Repeat("-", 126)
)
func newEventHook(c int) *eventHook {
return &eventHook{
retFuncs: []ResultHandleFunc{},
repFuncs: []ReportHandleFunc{},
Concurrency: c,
}
}
func (eh *eventHook) AddResultHandleFunc(retFunc ...ResultHandleFunc) {
for _, f := range retFunc {
eh.retFuncs = append(eh.retFuncs, f)
}
}
func (eh *eventHook) AddReportHandleFunc(repFunc ...ReportHandleFunc) {
for _, f := range repFunc {
eh.repFuncs = append(eh.repFuncs, f)
}
}
func (eh *eventHook) listen(retC resultPipeline, repC reportPipeline) {
if eh.Concurrency > 0 {
eh.ch = make(chan struct{}, eh.Concurrency)
}
if retC != nil {
go func(c resultPipeline) {
for r := range c {
if eh.Concurrency > 0 {
eh.ch <- struct{}{}
}
go func(r *Result) {
defer func() {
if eh.Concurrency > 0 {
<-eh.ch
}
}()
for _, f := range eh.retFuncs {
f(r)
}
}(r)
}
}(retC)
}
if repC != nil {
go func(c reportPipeline) {
for rep := range c {
go func(r Report) {
for _, f := range eh.repFuncs {
f(r)
}
}(rep)
}
}(repC)
}
}
func printReportToConsole(report Report) {
var full bool
var keys []string
var f *os.File
var err error
for k, r := range report {
keys = append(keys, k)
if r.FullHistory {
full = r.FullHistory
}
}
sort.Strings(keys)
s := fmt.Sprintf("|%-48s|%12s|%12s|%12s|%8s|%9s|%8s|%8s|\n", "Name", "Requests", "Failures", "QPS", "Min", "Max", "Avg", "Median")
d := fmt.Sprintf("\nPercentage of the requests completed within given times: \n\n|%-48s|%12s|%8s|%8s|%8s|%8s|%8s|%8s|%8s|\n", "Name", "Requests", "60%", "70%", "80%", "90%", "95%", "98%", "99%")
for _, key := range keys {
r := report[key]
s += fmt.Sprintf("|%-48s|%12d|%12d|%12d|%8d|%9d|%8d|%8d|\n", r.Name, r.Requests, r.Failures, r.QPS, r.Min, r.Max, r.Average, r.Median)
d += fmt.Sprintf("|%-48s|%12d|%8d|%8d|%8d|%8d|%8d|%8d|%8d|\n", r.Name, r.Requests, r.Distributions["0.60"], r.Distributions["0.70"], r.Distributions["0.80"], r.Distributions["0.90"], r.Distributions["0.95"], r.Distributions["0.98"], r.Distributions["0.99"])
}
op := cutLine + "\n" + s + d + cutLine + "\n"
fmt.Println(op)
if additionalOutput != "" {
f, err = os.OpenFile(additionalOutput, os.O_APPEND|os.O_WRONLY, 0666)
if err == nil {
defer f.Close()
f.WriteString(op)
}
}
if full {
data, err := json.MarshalIndent(report, "", " ")
if err == nil {
op = "============= Summary Report =============\n\n" + string(data) + "\n"
fmt.Printf(op)
if additionalOutput != "" {
f.WriteString(op)
}
} else {
Logger.Error("marshel report object failed", zap.Error(err))
}
}
}