-
Notifications
You must be signed in to change notification settings - Fork 34
/
trigger.go
111 lines (102 loc) · 3.13 KB
/
trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package pipeline
import (
"database/sql"
"errors"
"github.com/kubespace/kubespace/pkg/informer/listwatcher"
listwatcherconfig "github.com/kubespace/kubespace/pkg/informer/listwatcher/config"
pipelinelistwatcher "github.com/kubespace/kubespace/pkg/informer/listwatcher/pipeline"
"github.com/kubespace/kubespace/pkg/model/types"
"gorm.io/gorm"
"k8s.io/klog/v2"
"time"
)
// PipelineTriggerManager provides database access to pipeline triggers and
// forwards trigger notifications through a list-watcher.
type PipelineTriggerManager struct {
	// db is the GORM handle every query in this manager is built from.
	db *gorm.DB

	// pipelineTriggerListWatcher receives a Notify call for triggers whose
	// next trigger time has been set to a moment in the past.
	pipelineTriggerListWatcher listwatcher.Interface
}
// NewPipelineTriggerManager builds a PipelineTriggerManager that queries
// through db and notifies through a pipeline trigger list-watcher created
// from listwatcherConfig.
//
// NOTE(review): the second argument to NewPipelineTriggerListWatcher is passed
// as nil; its meaning is not visible from this file.
func NewPipelineTriggerManager(db *gorm.DB, listwatcherConfig *listwatcherconfig.ListWatcherConfig) *PipelineTriggerManager {
	mgr := &PipelineTriggerManager{db: db}
	mgr.pipelineTriggerListWatcher = pipelinelistwatcher.NewPipelineTriggerListWatcher(listwatcherConfig, nil)
	return mgr
}
// Update applies the fields carried by updates to the pipeline trigger whose
// primary key equals id and returns the error reported by GORM, if any.
//
// NOTE(review): the query runs through Debug(), so GORM's debug logging is
// active for this statement; and because updates is a struct, GORM's Updates
// is documented to skip zero-value fields — confirm both are intended.
func (r *PipelineTriggerManager) Update(id uint, updates *types.PipelineTrigger) error {
	byID := r.db.Debug().Model(types.PipelineTrigger{}).Where("id=?", id)
	result := byID.Updates(updates)
	return result.Error
}
// List returns the pipeline triggers selected by cond.
//
// When conditionQuery produces no query for cond, List returns (nil, nil).
// Otherwise it runs the query and returns the matching triggers, or the
// error reported by GORM.
func (r *PipelineTriggerManager) List(cond *PipelineTriggerCondition) ([]*types.PipelineTrigger, error) {
	query := r.conditionQuery(cond)
	if query == nil {
		// No usable filter: report "nothing found" rather than scanning the table.
		return nil, nil
	}

	var found []*types.PipelineTrigger
	err := query.Find(&found).Error
	if err != nil {
		return nil, err
	}
	return found, nil
}
// PipelineTriggerCondition selects pipeline triggers for conditionQuery.
//
// A zero value means "not set". Only one field takes effect: conditionQuery
// checks Id first, then PipelineId, then WorkspaceId, and uses the first one
// that is non-zero.
type PipelineTriggerCondition struct {
	// Id matches a single trigger by its primary key.
	Id uint
	// WorkspaceId matches the triggers of every pipeline in the workspace.
	WorkspaceId uint
	// PipelineId matches the triggers that belong to one pipeline.
	PipelineId uint
}
// conditionQuery builds a query on the pipeline trigger model from cond.
//
// Exactly one filter is applied, chosen by the first non-zero field in this
// order: Id, PipelineId, WorkspaceId. For WorkspaceId the pipelines of that
// workspace are loaded first and the triggers are filtered by their IDs.
//
// It returns nil when cond is nil or when no field is set; callers treat nil
// as "nothing to do". If loading the workspace's pipelines fails, the error is
// attached to the returned query, so the caller's finisher (Find, Updates)
// reports it through .Error instead of the failure being silently dropped.
func (r *PipelineTriggerManager) conditionQuery(cond *PipelineTriggerCondition) *gorm.DB {
	if cond == nil {
		return nil
	}

	tx := r.db.Model(types.PipelineTrigger{})
	switch {
	case cond.Id != 0:
		return tx.Where("id = ?", cond.Id)

	case cond.PipelineId != 0:
		return tx.Where("pipeline_id = ?", cond.PipelineId)

	case cond.WorkspaceId != 0:
		var pipelines []*types.Pipeline
		if err := r.db.Find(&pipelines, "workspace_id = ?", cond.WorkspaceId).Error; err != nil {
			// Returning nil here would make a database failure look like
			// "no triggers matched". Carry the error on the query instead;
			// the return value of AddError is the same error and is not needed.
			_ = tx.AddError(err)
			return tx
		}
		pipelineIDs := make([]uint, 0, len(pipelines))
		for _, p := range pipelines {
			pipelineIDs = append(pipelineIDs, p.ID)
		}
		return tx.Where("pipeline_id in ?", pipelineIDs)

	default:
		// No filter field set.
		return nil
	}
}
// UpdateTriggerTime sets NextTriggerTime to triggerTime (and UpdateTime to the
// current time) on every pipeline trigger selected by condition. When
// triggerTime is already in the past, it then lists the selected triggers and
// calls Notify on the pipeline trigger list-watcher for each of them.
//
// It returns nil without touching the database when conditionQuery produces
// no query for condition. An error from the update itself is returned.
// Failures while listing or notifying afterwards are only logged and nil is
// returned.
func (r *PipelineTriggerManager) UpdateTriggerTime(triggerTime time.Time, condition *PipelineTriggerCondition) error {
	tx := r.conditionQuery(condition)
	if tx == nil {
		return nil
	}
	// The same filter is used for two finisher calls (Updates, then Find).
	// A chained *gorm.DB is not safe to reuse after a finisher, so derive a
	// session from it: each finisher below then works on its own copy of the
	// statement instead of sharing mutated state.
	tx = tx.Session(&gorm.Session{})

	if err := tx.Updates(&types.PipelineTrigger{
		NextTriggerTime: &sql.NullTime{Time: triggerTime, Valid: true},
		UpdateTime:      time.Now(),
	}).Error; err != nil {
		return err
	}

	if !triggerTime.Before(time.Now()) {
		return nil
	}

	// The trigger time is earlier than the current time, so send a
	// notification to the controller for every affected trigger.
	var triggers []*types.PipelineTrigger
	if err := tx.Find(&triggers).Error; err != nil {
		klog.Warningf("list triggers error: %s", err.Error())
		return nil
	}
	for _, t := range triggers {
		if err := r.pipelineTriggerListWatcher.Notify(t); err != nil {
			klog.Warningf("notify pipeline trigger id=%d error: %s", t.ID, err.Error())
		}
	}
	return nil
}
// Get loads the pipeline trigger whose primary key equals id.
//
// A missing row is not an error: Get returns (nil, nil) when GORM reports
// gorm.ErrRecordNotFound. Any other error is returned as-is.
func (r *PipelineTriggerManager) Get(id uint) (*types.PipelineTrigger, error) {
	trigger := new(types.PipelineTrigger)
	err := r.db.First(trigger, "id = ?", id).Error
	switch {
	case err == nil:
		return trigger, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return nil, nil
	default:
		return nil, err
	}
}