forked from miaokobot/miaospeed
/
testingpollitem.go
114 lines (93 loc) · 2.85 KB
/
testingpollitem.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package service
import (
"sync"
"time"
"github.com/moshaoli688/miaospeed/interfaces"
"github.com/moshaoli688/miaospeed/service/macros"
"github.com/moshaoli688/miaospeed/service/macros/invalid"
"github.com/moshaoli688/miaospeed/service/matrices"
"github.com/moshaoli688/miaospeed/service/taskpoll"
"github.com/moshaoli688/miaospeed/utils"
"github.com/moshaoli688/miaospeed/utils/structs"
"github.com/moshaoli688/miaospeed/vendors"
)
type TestingPollItem struct {
id string
name string
request *interfaces.SlaveRequest
matrices []interfaces.SlaveRequestMatrixEntry
macros []interfaces.SlaveRequestMacroType
results *structs.AsyncArr[interfaces.SlaveEntrySlot]
onProcess func(self *TestingPollItem, idx int, result interfaces.SlaveEntrySlot)
onExit func(self *TestingPollItem, exitCode taskpoll.TaskPollExitCode)
onProcessLock sync.Mutex
exitOnce sync.Once
}
func (tpi *TestingPollItem) ID() string {
return tpi.id
}
func (tpi *TestingPollItem) TaskName() string {
return tpi.name
}
func (tpi *TestingPollItem) Weight() uint {
// TODO: could arrange weight based on task size
// or customized rules
return 1
}
func (tpi *TestingPollItem) Count() int {
return len(tpi.request.Nodes)
}
func (tpi *TestingPollItem) Yield(idx int, tpc *taskpoll.TaskPollController) {
result := interfaces.SlaveEntrySlot{
ProxyInfo: interfaces.ProxyInfo{},
InvokeDuration: -1,
Matrices: []interfaces.MatrixResponse{},
}
defer func() {
utils.WrapErrorPure("Task yield error", recover())
tpi.results.Push(result)
tpi.onProcessLock.Lock()
defer tpi.onProcessLock.Unlock()
tpi.onProcess(tpi, idx, result)
}()
node := tpi.request.Nodes[idx]
vendor := vendors.Find(tpi.request.Vendor).Build(node.Name, node.Payload)
result.ProxyInfo = vendor.ProxyInfo()
macroMap := structs.NewAsyncMap[interfaces.SlaveRequestMacroType, interfaces.SlaveRequestMacro]()
startTime := time.Now().UnixMilli()
wg := sync.WaitGroup{}
wg.Add(len(tpi.macros))
for _, macro := range tpi.macros {
macroName := macro
go func() {
macro := macros.Find(macroName)
macro.Run(vendor, tpi.request)
macroMap.Set(macroName, macro)
wg.Done()
}()
}
wg.Wait()
endTime := time.Now().UnixMilli()
result.InvokeDuration = endTime - startTime
result.Matrices = structs.Map(tpi.matrices, func(me interfaces.SlaveRequestMatrixEntry) interfaces.MatrixResponse {
m := matrices.Find(me.Type)
macro := macroMap.MustGet(m.MacroJob())
if macro == nil {
macro = &invalid.Invalid{}
}
m.Extract(me, macro)
return interfaces.MatrixResponse{
Type: m.Type(),
Payload: utils.ToJSON(m),
}
})
}
func (tpi *TestingPollItem) OnExit(exitCode taskpoll.TaskPollExitCode) {
tpi.exitOnce.Do(func() {
tpi.onExit(tpi, exitCode)
})
}
func (tpi *TestingPollItem) Init() taskpoll.TaskPollItem {
tpi.results = structs.NewAsyncArr[interfaces.SlaveEntrySlot]()
return tpi
}