/
worker_manager.go
159 lines (131 loc) · 3.87 KB
/
worker_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package worker
import (
"errors"
"fmt"
"github.com/tendermint/tendermint/libs/log"
"github.com/rhizomata-io/dist-daemon-tendermint/daemon/common"
"github.com/rhizomata-io/dist-daemon-tendermint/daemon/job"
"github.com/rhizomata-io/dist-daemon-tendermint/types"
)
// Manager manager for jobs
type Manager struct {
common.Context
config common.DaemonConfig
dao Repository
logger log.Logger
facReg *factoryRegistry
workers map[string]Worker
spaceRegistry types.SpaceRegistry
}
// NewManager ..
func NewManager(context common.Context, spaceRegistry types.SpaceRegistry) *Manager {
dao := NewRepository(context.GetConfig(), context, context.GetClient())
manager := Manager{Context: context, dao: dao, logger: context, spaceRegistry: spaceRegistry}
manager.facReg = NewFactoryRegistry()
manager.workers = make(map[string]Worker)
return &manager
}
func (manager *Manager) RegisterWorkerFactory(factory Factory) error {
err := manager.facReg.RegisterFactory(factory)
if err == nil {
manager.spaceRegistry.RegisterSpaceIfNotExist(factory.Space())
}
return err
}
func (manager *Manager) GetRepository() Repository {
return manager.dao
}
func (manager *Manager) Start() {
}
// ContainsWorker if worker id is registered.
func (manager *Manager) ContainsWorker(id string) bool {
return manager.workers[id] != nil
}
// GetWorker get worker for id
func (manager *Manager) GetWorker(id string) Worker {
return manager.workers[id]
}
// registerWorker ..
func (manager *Manager) registerWorker(job job.Job) error {
if manager.workers[job.ID] != nil {
return errors.New(fmt.Sprintf("worker[%s] is already registered. "+
"If you want register new one, DeregisterWorker first", job.ID))
}
worker, err := manager.newWorker(job)
if err != nil {
return err
}
manager.workers[job.ID] = worker
err = worker.Start()
return err
}
func (manager *Manager) newWorker(job job.Job) (Worker, error) {
fac, err := manager.facReg.GetFactory(job.FactoryName)
if err != nil {
return nil, err
}
helper := NewHelper(fac.Space(), manager.config, manager.logger, job, manager.dao)
if err != nil {
manager.logger.Error(fmt.Sprintf("cannot find worker factory '%s'", job.FactoryName), err)
return nil, err
}
worker, err := fac.NewWorker(helper)
if err != nil {
manager.logger.Error("cannot create worker ", err)
return nil, err
}
return worker, err
}
// deregisterWorker ..
func (manager *Manager) deregisterWorker(jobID string) error {
worker := manager.workers[jobID]
if worker == nil {
return errors.New("Worker[" + jobID + "] is not registered.")
}
err := worker.Stop()
if err == nil {
delete(manager.workers, jobID)
}
return err
}
// SetJobs ...
func (manager *Manager) SetJobs(jobs []job.Job) {
manager.logger.Info("[WorkerManager] Set Jobs:", "job_count", len(jobs))
tempWorkers := make(map[string]Worker)
newWorkers := make(map[string]Worker)
for id, worker := range manager.workers {
tempWorkers[id] = worker
}
for _, job := range jobs {
worker := tempWorkers[job.ID]
if worker != nil {
delete(tempWorkers, job.ID)
} else {
worker2, err := manager.newWorker(job)
if err != nil {
manager.logger.Error("[ERROR-WorkerMan] Cannot create worker ", err)
continue
} else {
worker = worker2
manager.logger.Info("[WARN-WorkerMan] New Worker ", "jonID", job.ID)
}
}
newWorkers[job.ID] = worker
}
// 제거된 worker 종료하기
for id, worker := range tempWorkers {
worker.Stop()
manager.logger.Info("[WARN-WorkerMan] Dispose Worker ", "jonID", id)
}
manager.workers = newWorkers
for id, worker := range manager.workers {
if !worker.IsStarted() {
go func(id string, worker Worker) {
manager.logger.Info("[WARN-WorkerMan] New Worker Starting ", "jonID", id)
worker.Start()
}(id, worker)
} else {
manager.logger.Info("[WARN-WorkerMan] Remained Worker ", "jonID", id)
}
}
}