/
cronjob.go
143 lines (127 loc) · 4.41 KB
/
cronjob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
Copyright 2021 The KodeRover Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workflow
import (
"encoding/json"
"go.uber.org/zap"
"github.com/koderover/zadig/pkg/microservice/aslan/config"
commonmodels "github.com/koderover/zadig/pkg/microservice/aslan/core/common/repository/models"
commonrepo "github.com/koderover/zadig/pkg/microservice/aslan/core/common/repository/mongodb"
commonservice "github.com/koderover/zadig/pkg/microservice/aslan/core/common/service"
"github.com/koderover/zadig/pkg/microservice/aslan/core/common/service/nsq"
"github.com/koderover/zadig/pkg/setting"
e "github.com/koderover/zadig/pkg/tool/errors"
)
func HandleCronjob(workflow *commonmodels.Workflow, log *zap.SugaredLogger) error {
workflowSchedule := workflow.Schedules
if workflowSchedule != nil {
workflow.Schedules = nil
workflowSchedule.Enabled = workflow.ScheduleEnabled
payload := &commonservice.CronjobPayload{
Name: workflow.Name,
JobType: config.WorkflowCronjob,
}
if workflowSchedule.Enabled {
deleteList, err := UpdateCronjob(workflow.Name, config.WorkflowCronjob, "", workflowSchedule, log)
if err != nil {
log.Errorf("Failed to update cronjob, the error is: %v", err)
return e.ErrUpsertCronjob.AddDesc(err.Error())
}
payload.Action = setting.TypeEnableCronjob
payload.DeleteList = deleteList
payload.JobList = workflowSchedule.Items
} else {
payload.Action = setting.TypeDisableCronjob
}
pl, _ := json.Marshal(payload)
err := nsq.Publish(setting.TopicCronjob, pl)
if err != nil {
log.Errorf("Failed to publish to nsq topic: %s, the error is: %v", setting.TopicCronjob, err)
return e.ErrUpsertCronjob.AddDesc(err.Error())
}
}
return nil
}
func UpdateCronjob(parentName, parentType, productName string, schedule *commonmodels.ScheduleCtrl, log *zap.SugaredLogger) (deleteList []string, err error) {
idMap := make(map[string]bool)
deleteList = make([]string, 0)
jobList, err := commonrepo.NewCronjobColl().List(&commonrepo.ListCronjobParam{
ParentName: parentName,
ParentType: parentType,
})
if err != nil {
log.Errorf("cannot get cron job list from mongodb, the error is: %v", err)
return nil, err
}
// 把id扔到一个map里面方便统计管理
for _, cron := range jobList {
idMap[cron.ID.Hex()] = true
}
for _, tasks := range schedule.Items {
// 非空ID:修改cronjob,保留这个cronjob 空ID: 直接新建条目
job := &commonmodels.Cronjob{
Name: parentName,
Type: parentType,
Number: tasks.Number,
Frequency: tasks.Frequency,
Time: tasks.Time,
Cron: tasks.Cron,
MaxFailure: tasks.MaxFailures,
TaskArgs: tasks.TaskArgs,
WorkflowArgs: tasks.WorkflowArgs,
TestArgs: tasks.TestArgs,
JobType: string(tasks.Type),
Enabled: true,
}
if !tasks.ID.IsZero() {
job.ID = tasks.ID
if parentType == config.TestingCronjob {
job.ProductName = productName
}
err := commonrepo.NewCronjobColl().Update(job)
if err != nil {
log.Errorf("Failed to update task of id %s, the error is: %v", tasks.ID.Hex(), err)
return nil, err
}
delete(idMap, tasks.ID.Hex())
} else {
if parentType == config.TestingCronjob {
job.ProductName = productName
}
err := commonrepo.NewCronjobColl().Create(job)
if err != nil {
log.Errorf("Failed to create task, error: %v", err)
return nil, err
}
tasks.ID = job.ID
}
}
//统计需要删除的cronjob列表
for k := range idMap {
deleteList = append(deleteList, k)
}
err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{
IDList: deleteList,
})
if err != nil {
log.Errorf("Failed to delete cronjobs: %v from mongodb, the error is: %v", deleteList, err)
return nil, err
}
return deleteList, nil
}
func DeleteCronjob(parentName, parentType string) error {
return commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{
ParentName: parentName,
ParentType: parentType,
})
}