/
hub.go
98 lines (86 loc) · 1.94 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package server_register
import (
"fmt"
"sync"
"sync/atomic"
"go.uber.org/zap"
)
// Message texts for the errors returned by Hub methods. These are untyped
// string constants, not error values; the methods wrap them with fmt.Errorf.
const (
// ErrHostnameEmpty is the message of the error Register returns when it is
// called with an empty hostname.
ErrHostnameEmpty = "error: node's hostname is empty"
// ErrTaskIdNoExisted is the message of the error CompleteTask returns when it
// cannot complete the given task id.
ErrTaskIdNoExisted = "error: task_id was not existed"
)
// Hub tracks the registered nodes, the tasks created for them and the output
// the nodes report back.
type Hub struct {
// mu is held by Register while it reads and writes nodes.
mu sync.RWMutex
// nodes holds the registered nodes, keyed by hostname.
nodes map[string]*Node
// taskMu is held by PullTask, CompleteTask and NewTask while they touch
// tasks, ret and taskId.
taskMu sync.RWMutex
// taskId is the id handed to the most recently created task; NewTask
// increments it with atomic.AddInt32.
taskId int32
// maxTaskCount stores the value passed to NewHub.
// NOTE(review): no method in the code shown here reads it — confirm where
// (or whether) the limit is enforced.
maxTaskCount int
// tasks lists the created tasks in creation order (NewTask appends).
tasks []*Task
// ret maps a hostname to the Return recorded for it by CompleteTask.
ret map[string]*Return
}
// NewNode allocates a Node for hostname and sets its status to NodeOnline.
func NewNode(hostname string) *Node {
	node := new(Node)
	node.Hostname = hostname
	node.Status = NodeOnline
	return node
}
// NewHub returns a Hub with empty, ready-to-use node, task and return
// collections. max is stored as the hub's maxTaskCount; the task id counter
// starts at zero.
func NewHub(max int) *Hub {
	hub := &Hub{maxTaskCount: max}
	hub.nodes = map[string]*Node{}
	hub.tasks = []*Task{}
	hub.ret = map[string]*Return{}
	return hub
}
// Register adds a node for hostname to the hub unless one is already present.
// An existing node is left untouched, so registering twice is harmless.
//
// It returns an error carrying ErrHostnameEmpty when hostname is empty and nil
// otherwise.
func (h *Hub) Register(hostname string) error {
	if len(hostname) == 0 {
		return fmt.Errorf(ErrHostnameEmpty)
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	if _, known := h.nodes[hostname]; known {
		return nil
	}
	h.nodes[hostname] = NewNode(hostname)
	return nil
}
// PullTask returns the most recently created task for the node identified by
// hostname. If that node has no result recorded for the task yet, the task's
// result for the node is set to "wait output".
//
// When the hub holds no task, a placeholder task with Id 0 and an empty,
// non-nil Command slice is returned instead.
//
// The returned *Task is the hub's own instance, not a copy.
func (h *Hub) PullTask(hostname string) *Task {
	// The exclusive lock is required: this method writes to the task's Result
	// map, and under the shared read lock several nodes pulling at once would
	// perform concurrent map writes.
	h.taskMu.Lock()
	defer h.taskMu.Unlock()

	if len(h.tasks) == 0 {
		return &Task{Id: 0, Command: make([]string, 0)}
	}

	latest := h.tasks[len(h.tasks)-1]
	if latest.Result == nil {
		// A task built without a Result map would make the write below panic.
		latest.Result = make(map[string]string)
	}
	if latest.Result[hostname] == "" {
		latest.Result[hostname] = "wait output"
	}
	return latest
}
// CompleteTask records output as the result that the node hostname produced
// for the task with id taskId.
//
// On success the output is stored in the task's Result map under hostname, the
// hub's per-host return entry is replaced with a Return holding taskId and
// output, and nil is returned. If no task with that id exists, nothing is
// stored and an error carrying ErrTaskIdNoExisted is returned.
func (h *Hub) CompleteTask(hostname string, taskId int32, output string) error {
	// Infof, not Info: the message is a printf-style format string.
	zap.S().Infof("CompleteTask hostname:%s taskId:%d output:(%s)", hostname, taskId, output)

	h.taskMu.Lock()
	defer h.taskMu.Unlock()

	for _, task := range h.tasks {
		// Match on the exact id only. Stopping at the first task whose id is
		// merely <= taskId would end the scan before the target is reached and
		// would report success for ids that do not exist.
		if task.Id != taskId {
			continue
		}
		if task.Result == nil {
			// A task built without a Result map would make the write below panic.
			task.Result = make(map[string]string)
		}
		task.Result[hostname] = output

		if h.ret == nil {
			// Keeps a Hub that was not built by NewHub from panicking here.
			h.ret = make(map[string]*Return)
		}
		h.ret[hostname] = &Return{TaskId: taskId, Output: output}
		return nil
	}
	return fmt.Errorf(ErrTaskIdNoExisted)
}
// NewTask creates a task for commands and appends it to the hub's task list.
// The task receives the next id from the hub's counter and starts with an
// empty Result map. The commands slice is stored as given, not copied.
func (h *Hub) NewTask(commands []string) {
	h.taskMu.Lock()
	defer h.taskMu.Unlock()

	nextID := atomic.AddInt32(&h.taskId, 1)
	h.tasks = append(h.tasks, &Task{
		Id:      nextID,
		Command: commands,
		Result:  map[string]string{},
	})
}