forked from yuval-k/harbor
-
Notifications
You must be signed in to change notification settings - Fork 4
/
controller.go
255 lines (240 loc) · 8.31 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operation
import (
"fmt"
"strings"
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
hjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/replication/dao/models"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/operation/execution"
"github.com/goharbor/harbor/src/replication/operation/flow"
"github.com/goharbor/harbor/src/replication/operation/scheduler"
)
// Controller handles the replication-related operations: start,
// stop, query, etc.
type Controller interface {
// trigger is used to specify what this replication is triggered by
StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error)
StopReplication(int64) error
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
GetExecution(int64) (*models.Execution, error)
ListTasks(...*models.TaskQuery) (int64, []*models.Task, error)
GetTask(int64) (*models.Task, error)
UpdateTaskStatus(id int64, status string, statusCondition ...string) error
GetTaskLog(int64) ([]byte, error)
}
const (
maxReplicators = 1024
)
var (
jobNotFoundErrorMsg = "object is not found"
)
// NewController returns a controller implementation
func NewController(js job.Client) Controller {
ctl := &controller{
replicators: make(chan struct{}, maxReplicators),
executionMgr: execution.NewDefaultManager(),
scheduler: scheduler.NewScheduler(js),
flowCtl: flow.NewController(),
}
for i := 0; i < maxReplicators; i++ {
ctl.replicators <- struct{}{}
}
return ctl
}
type controller struct {
replicators chan struct{}
flowCtl flow.Controller
executionMgr execution.Manager
scheduler scheduler.Scheduler
}
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
if !policy.Enabled {
return 0, fmt.Errorf("the policy %d is disabled", policy.ID)
}
if len(trigger) == 0 {
trigger = model.TriggerTypeManual
}
id, err := createExecution(c.executionMgr, policy.ID, trigger)
if err != nil {
return 0, err
}
// control the count of concurrent replication requests
log.Debugf("waiting for the available replicator ...")
<-c.replicators
log.Debugf("got an available replicator, starting the replication ...")
go func() {
defer func() {
c.replicators <- struct{}{}
}()
flow := c.createFlow(id, policy, resource)
if n, err := c.flowCtl.Start(flow); err != nil {
// only update the execution when got error.
// if got no error, it will be updated automatically
// when listing the execution records
if e := c.executionMgr.Update(&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: err.Error(),
Total: n,
Failed: n,
}, "Status", "StatusText", "Total", "Failed"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
}
log.Errorf("the execution %d failed: %v", id, err)
}
}()
return id, nil
}
// create different replication flows according to the input parameters
func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
// replicate the deletion operation, so create a deletion flow
if resource != nil && resource.Deleted {
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, resource)
}
resources := []*model.Resource{}
if resource != nil {
resources = append(resources, resource)
}
return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy, resources...)
}
func (c *controller) StopReplication(executionID int64) error {
_, tasks, err := c.ListTasks(&models.TaskQuery{
ExecutionID: executionID,
})
if err != nil {
return err
}
// no tasks, just set its status to "stopped"
if len(tasks) == 0 {
execution, err := c.executionMgr.Get(executionID)
if err != nil {
return err
}
if execution == nil {
return fmt.Errorf("the execution %d not found", executionID)
}
if execution.Status != models.ExecutionStatusInProgress {
log.Debugf("the execution %d isn't in progress, no need to stop", executionID)
return nil
}
if err = c.executionMgr.Update(&models.Execution{
ID: executionID,
Status: models.ExecutionStatusStopped,
EndTime: time.Now(),
}, models.ExecutionPropsName.Status, models.ExecutionPropsName.EndTime); err != nil {
return err
}
log.Debugf("the status of execution %d is set to stopped", executionID)
}
// got tasks, stopping the tasks one by one
for _, task := range tasks {
if isTaskInFinalStatus(task) {
log.Debugf("the task %d(job ID: %s) is in final status, its status is %s, skip", task.ID, task.JobID, task.Status)
continue
}
if err = c.scheduler.Stop(task.JobID); err != nil {
isStatusBehindError, ok := err.(*job.StatusBehindError)
if ok {
status := isStatusBehindError.Status()
switch hjob.Status(status) {
case hjob.ErrorStatus:
status = models.TaskStatusFailed
case hjob.SuccessStatus:
status = models.TaskStatusSucceed
}
e := c.executionMgr.UpdateTask(&models.Task{
ID: task.ID,
Status: status,
}, "Status")
if e != nil {
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
} else {
log.Debugf("got status behind error for task %d, update it's status to %s directly", task.ID, status)
}
continue
}
if isJobNotFoundError(err) {
e := c.executionMgr.UpdateTask(&models.Task{
ID: task.ID,
Status: models.ExecutionStatusStopped,
}, "Status")
if e != nil {
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
} else {
log.Debugf("got job not found error for task %d, update it's status to %s directly", task.ID, models.ExecutionStatusStopped)
}
continue
}
log.Errorf("failed to stop the task %d(job ID: %s): %v", task.ID, task.JobID, err)
continue
}
log.Debugf("the stop request for task %d(job ID: %s) sent", task.ID, task.JobID)
}
return nil
}
func isTaskInFinalStatus(task *models.Task) bool {
if task == nil {
return false
}
switch task.Status {
case models.TaskStatusSucceed,
models.TaskStatusStopped,
models.TaskStatusFailed:
return true
}
return false
}
func isJobNotFoundError(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), jobNotFoundErrorMsg)
}
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return c.executionMgr.List(query...)
}
func (c *controller) GetExecution(executionID int64) (*models.Execution, error) {
return c.executionMgr.Get(executionID)
}
func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) {
return c.executionMgr.ListTasks(query...)
}
func (c *controller) GetTask(id int64) (*models.Task, error) {
return c.executionMgr.GetTask(id)
}
func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
}
func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
return c.executionMgr.GetTaskLog(taskID)
}
// create the execution record in database
func createExecution(mgr execution.Manager, policyID int64, trigger model.TriggerType) (int64, error) {
id, err := mgr.Create(&models.Execution{
PolicyID: policyID,
Trigger: trigger,
Status: models.ExecutionStatusInProgress,
StartTime: time.Now(),
})
if err != nil {
return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policyID, err)
}
log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id)
return id, nil
}