-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
179 lines (150 loc) · 4.44 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lcm
import (
"context"
"encoding/json"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"sync"
"time"
)
const (
	// shortLoopInterval is how long the restore loop pauses after an
	// unexpected error before it tries again.
	shortLoopInterval = time.Second * 5

	// longLoopInterval is how long the restore loop pauses when the retry
	// queue has no elements to restore.
	longLoopInterval = time.Minute * 5
)
// Controller controls the life cycle of jobs: it runs any background
// process it needs and hands out trackers for new or existing jobs.
type Controller interface {
	// Serve starts the controller's daemon process, if it needs one.
	Serve() error

	// New creates a tracker from newly provided job stats.
	New(stats *job.Stats) (job.Tracker, error)

	// Track attaches a tracker to the existing job identified by jobID.
	Track(jobID string) (job.Tracker, error)
}
// basicController is the default, redis-backed implementation of Controller.
type basicController struct {
	// context is the system context; its cancellation stops the restore loop.
	context context.Context
	// namespace is passed to every tracker and used to build redis keys such
	// as the status-update retry queue key.
	namespace string
	// pool supplies redis connections.
	pool *redis.Pool
	// callback is handed to every tracker created by New or Track.
	callback job.HookCallback
	// wg accounts for the background loop started by Serve.
	wg *sync.WaitGroup
}
// NewController builds the redis-backed Controller.
//
// The system context and the shared wait group are taken from ctx. ns is the
// namespace used for trackers and redis keys, pool provides redis
// connections, and callback is forwarded to every tracker the controller
// creates.
func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.HookCallback) Controller {
	sysCtx, waitGroup := ctx.SystemContext, ctx.WG

	controller := &basicController{
		namespace: ns,
		pool:      pool,
		callback:  callback,
		context:   sysCtx,
		wg:        waitGroup,
	}

	return controller
}
// Serve launches the background loop that restores dead job statuses and
// returns immediately with a nil error.
//
// The loop is registered with the shared wait group before its goroutine is
// started, and it marks the wait group done when it exits.
func (bc *basicController) Serve() error {
	// Add must happen before the goroutine starts so a concurrent Wait
	// cannot miss it.
	bc.wg.Add(1)
	go bc.loopForRestoreDeadStatus()

	logger.Info("Status restoring loop is started")

	return nil
}
// New creates a tracker for the provided job stats and persists it.
//
// It returns an error if stats is nil or fails validation. Errors returned
// by the tracker's Save are passed through unchanged.
func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
	if stats == nil {
		return nil, errors.New("nil stats when creating job tracker")
	}

	if err := stats.Validate(); err != nil {
		// Wrap yields the same text as the former Errorf("...: %s", err)
		// but keeps the validation error as the cause (errors.Cause).
		return nil, errors.Wrap(err, "error occurred when creating job tracker")
	}

	bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback)
	if err := bt.Save(); err != nil {
		return nil, err
	}

	return bt, nil
}
// Track attaches a tracker to the existing job identified by jobID.
//
// The tracker loads the job's current state via Load. Any error from
// loading is returned as-is, with a nil tracker.
func (bc *basicController) Track(jobID string) (job.Tracker, error) {
	tracker := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback)

	err := tracker.Load()
	if err != nil {
		return nil, err
	}

	return tracker, nil
}
// loopForRestoreDeadStatus is the daemon loop that drains the status-update
// retry queue and re-applies the recorded target statuses.
//
// Each iteration restores one dead status:
//   - on success, the next element is processed immediately;
//   - on rds.ErrNoElements, the loop waits longLoopInterval;
//   - on any other error, the error is logged and the loop waits
//     shortLoopInterval.
//
// The loop exits once the system context is cancelled. On exit it logs and
// marks the wait group registered by Serve as done.
func (bc *basicController) loopForRestoreDeadStatus() {
	defer func() {
		logger.Info("Status restoring loop is stopped")
		bc.wg.Done()
	}()

	for {
		// Check for termination before every attempt. Previously cancellation
		// was only observed while waiting after an error, so a run of
		// successful restores kept the loop alive past shutdown.
		select {
		case <-bc.context.Done():
			return
		default:
		}

		err := bc.restoreDeadStatus()
		if err == nil {
			continue
		}

		waitInterval := shortLoopInterval
		if err == rds.ErrNoElements {
			// No elements: back off for a long while.
			waitInterval = longLoopInterval
		} else {
			logger.Errorf("restore dead status error: %s, put it back to the retrying Q later again", err)
		}

		// Wait for a while or be terminated. An explicit timer is stopped on
		// cancellation instead of leaving a time.After timer pending for up
		// to longLoopInterval.
		timer := time.NewTimer(waitInterval)
		select {
		case <-timer.C:
		case <-bc.context.Done():
			timer.Stop()
			return
		}
	}
}
// restoreDeadStatus pops a single entry from the retry queue and tries to
// apply its recorded target status to the corresponding job.
//
// Errors are returned unchanged from each step:
//   - popOneDead (e.g. rds.ErrNoElements, which the restore loop treats as
//     an empty queue);
//   - loading the job's tracker;
//   - the status update itself.
//
// NOTE(review): if Track fails, the popped entry is not re-queued here.
// Whether UpdateStatusWithRetry re-queues on failure is not visible from
// this file.
func (bc *basicController) restoreDeadStatus() error {
	change, err := bc.popOneDead()
	if err != nil {
		return err
	}

	tracker, err := bc.Track(change.JobID)
	if err != nil {
		return err
	}

	target := job.Status(change.TargetStatus)

	return tracker.UpdateStatusWithRetry(target)
}
// popOneDead removes one entry from this namespace's status-update retry
// queue via rds.ZPopMin (lowest score first) and decodes it as a
// job.SimpleStatusChange.
//
// It returns one of three kinds of error:
//   - the error from rds.ZPopMin, unwrapped, because the restore loop
//     compares it with rds.ErrNoElements by identity;
//   - a "bad result reply" error if the reply is not a byte slice;
//   - that same message wrapping the JSON error if decoding fails.
func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) {
	conn := bc.pool.Get()
	defer func() {
		// Best effort: a close failure only affects the pooled connection
		// and must not mask the result of the pop.
		_ = conn.Close()
	}()

	key := rds.KeyStatusUpdateRetryQueue(bc.namespace)
	v, err := rds.ZPopMin(conn, key)
	if err != nil {
		return nil, err
	}

	raw, ok := v.([]byte)
	if !ok {
		return nil, errors.New("pop one dead error: bad result reply")
	}

	ssc := &job.SimpleStatusChange{}
	if err := json.Unmarshal(raw, ssc); err != nil {
		// Previously the decode error was dropped. Keep it as the cause so
		// the log shows why the entry was rejected.
		return nil, errors.Wrap(err, "pop one dead error: bad result reply")
	}

	return ssc, nil
}