From e899d659f3c01d44269d353898766ef5853af3e8 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 8 Jan 2020 18:09:43 +0800 Subject: [PATCH] fix[jobservice]:job status is hung after restart - improve the status hook sending/resending approach - improve the status compare and set approach - simplify the relevant flow - add reaper to fix the out of sync jobs - fix #10244 , fix #9963 Signed-off-by: Steven Zou --- src/jobservice/common/list/list.go | 64 +++ src/jobservice/common/list/list_test.go | 57 +++ src/jobservice/common/rds/keys.go | 30 ++ src/jobservice/common/rds/scripts.go | 273 +++++++++++++ src/jobservice/common/rds/utils.go | 5 +- src/jobservice/hook/hook_agent.go | 207 ++++------ src/jobservice/hook/hook_agent_test.go | 235 ++++++------ src/jobservice/hook/hook_client_test.go | 9 +- src/jobservice/job/impl/context_test.go | 3 + src/jobservice/job/impl/sample/job.go | 4 +- src/jobservice/job/models.go | 9 + src/jobservice/job/tracker.go | 208 +++++----- src/jobservice/job/tracker_test.go | 65 ++-- src/jobservice/lcm/controller.go | 137 ++++--- src/jobservice/lcm/controller_test.go | 60 ++- src/jobservice/mgt/manager.go | 17 +- src/jobservice/mgt/manager_test.go | 18 +- src/jobservice/period/basic_scheduler.go | 63 +-- src/jobservice/period/basic_scheduler_test.go | 22 +- src/jobservice/period/enqueuer.go | 55 ++- src/jobservice/period/enqueuer_test.go | 58 ++- src/jobservice/period/policy_store.go | 245 +----------- src/jobservice/period/policy_store_test.go | 79 +--- src/jobservice/period/scheduler.go | 10 +- src/jobservice/runner/redis.go | 121 +++--- src/jobservice/runner/redis_test.go | 5 +- src/jobservice/runtime/bootstrap.go | 9 +- src/jobservice/worker/cworker/c_worker.go | 33 +- src/jobservice/worker/cworker/reaper.go | 363 ++++++++++++++++++ src/jobservice/worker/cworker/reaper_test.go | 243 ++++++++++++ 30 files changed, 1684 insertions(+), 1023 deletions(-) create mode 100644 src/jobservice/common/list/list.go create mode 100644 src/jobservice/common/list/list_test.go create mode 100644 src/jobservice/common/rds/scripts.go create mode 100644 src/jobservice/worker/cworker/reaper.go create mode 100644 src/jobservice/worker/cworker/reaper_test.go diff --git a/src/jobservice/common/list/list.go b/src/jobservice/common/list/list.go new file mode 100644 index 00000000000..9f18d6648f0 --- /dev/null +++ b/src/jobservice/common/list/list.go @@ -0,0 +1,64 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package list + +import ( + "container/list" + "sync" +) + +// SyncList is a sync list based on the container/list +type SyncList struct { + // For synchronization + lock *sync.RWMutex + // Use interface slice as the backend data struct + l *list.List +} + +// New a sync list +func New() *SyncList { + return &SyncList{ + lock: &sync.RWMutex{}, + l: list.New(), + } +} + +// Iterate the list +func (l *SyncList) Iterate(f func(ele interface{}) bool) { + l.lock.RLock() + defer l.lock.RUnlock() + + // Get the front pointer + for e := l.l.Front(); e != nil; { + // Keep the next one + next := e.Next() + + if f(e.Value) { + l.l.Remove(e) + } + + e = next + } +} + +// Push the element to the back of the list +func (l *SyncList) Push(ele interface{}) { + if ele != nil { + l.lock.Lock() + defer l.lock.Unlock() + + l.l.PushBack(ele) + } +} diff --git a/src/jobservice/common/list/list_test.go b/src/jobservice/common/list/list_test.go new file mode 100644 index 00000000000..5ecd783524e --- /dev/null +++ b/src/jobservice/common/list/list_test.go @@ -0,0 +1,57 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package list + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/suite" +) + +type ListSuite struct { + suite.Suite + + l *SyncList +} + +func TestListSuite(t *testing.T) { + suite.Run(t, &ListSuite{}) +} + +func (suite *ListSuite) SetupSuite() { + suite.l = New() + + suite.l.Push("a0") + suite.l.Push("a1") + suite.l.Push("b0") + suite.l.Push("a2") + + suite.Equal(4, suite.l.l.Len()) +} + +func (suite *ListSuite) TestIterate() { + suite.l.Iterate(func(ele interface{}) bool { + if s, ok := ele.(string); ok { + if strings.HasPrefix(s, "b") { + return true + } + } + + return false + }) + + suite.Equal(3, suite.l.l.Len()) +} diff --git a/src/jobservice/common/rds/keys.go b/src/jobservice/common/rds/keys.go index 4f9f09ba472..38b496d3453 100644 --- a/src/jobservice/common/rds/keys.go +++ b/src/jobservice/common/rds/keys.go @@ -86,3 +86,33 @@ func KeyHookEventRetryQueue(namespace string) string { func KeyStatusUpdateRetryQueue(namespace string) string { return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events") } + +// KeyJobTrackInProgress returns the key of in progress jobs tracking queue +func KeyJobTrackInProgress(namespace string) string { + return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "job_track:inprogress") +} + +// KeyJobs returns the key of the specified job queue +func KeyJobs(namespace, jobType string) string { + return fmt.Sprintf("%sjobs:%s", KeyNamespacePrefix(namespace), jobType) +} + +// KeyJobLock returns the key of lock for the specified job type. +func KeyJobLock(namespace string, jobType string) string { + return fmt.Sprintf("%s:lock", KeyJobs(namespace, jobType)) +} + +// KeyJobLockInfo returns the key of lock_info for the specified job type. 
+func KeyJobLockInfo(namespace string, jobType string) string { + return fmt.Sprintf("%s:lock_info", KeyJobs(namespace, jobType)) +} + +// KeyInProgressQueue returns the key of the in progress queue for the specified job type. +func KeyInProgressQueue(namespace string, jobType string, workerPoolID string) string { + return fmt.Sprintf("%s:%s:inprogress", KeyJobs(namespace, jobType), workerPoolID) +} + +// KeyWorkerPools returns the key of the worker pool +func KeyWorkerPools(namespace string) string { + return KeyNamespacePrefix(namespace) + "worker_pools" +} diff --git a/src/jobservice/common/rds/scripts.go b/src/jobservice/common/rds/scripts.go new file mode 100644 index 00000000000..ff844adfb1d --- /dev/null +++ b/src/jobservice/common/rds/scripts.go @@ -0,0 +1,273 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rds + +import ( + "fmt" + + "github.com/gomodule/redigo/redis" +) + +const ( + requeueKeysPerJob = 4 +) + +// luaFuncStCodeText is common lua script function +var luaFuncStCodeText = ` +-- for easily compare status +local stMap = { ['Pending'] = 0, ['Scheduled'] = 1, ['Running'] = 2, ['Success'] = 3, ['Stopped'] = 3, ['Error'] = 3 } + +local function stCode(status) + return stMap[status] +end +` + +// luaFuncCompareText is common lua script function +var luaFuncCompareText = ` +local function compare(status, revision, checkInT) + local sCode = stCode(status) + local aCode = stCode(ARGV[1]) + local aRev = tonumber(ARGV[2]) + local aCheckInT = tonumber(ARGV[3]) + + if revision < aRev or + ( revision == aRev and sCode < aCode ) or + ( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT)) + then + return 'ok' + end + + return 'no' +end +` + +// Script used to set the status of the job +// +// KEY[1]: key of job stats +// KEY[2]: key of inprogress track +// ARGV[1]: status text +// ARGV[2]: stats revision +// ARGV[3]: update timestamp +// ARGV[4]: job ID +var setStatusScriptText = fmt.Sprintf(` +%s + +local res, st, rev, aCode, aRev + +res = redis.call('hmget', KEYS[1], 'status', 'revision') +if res then + st = res[1] + aCode = stCode(ARGV[1]) + rev = tonumber(res[2]) + aRev = tonumber(ARGV[2]) + + -- set same status repeatedly is allowed + if rev < aRev or ( rev == aRev and (stCode(st) < aCode or st == ARGV[1])) then + redis.call('hmset', KEYS[1], 'status', ARGV[1], 'update_time', ARGV[3]) + -- update inprogress track if necessary + if aCode == 3 then + -- final status + local c = redis.call('hincrby', KEYS[2], ARGV[4], -1) + -- lock count is 0, del it + if c <= 0 then + redis.call('hdel', KEYS[2], ARGV[4]) + end + + if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then + -- expire the job stats with shorter interval + redis.call('expire', KEYS[1], 86400) + end + end + + return 'ok' + end +end + +return st +`, luaFuncStCodeText) + +// SetStatusScript is lua script for setting job status atomically +var SetStatusScript = redis.NewScript(2, setStatusScriptText) + +// Used to check if the status info 
provided is still validate +// +// KEY[1]: key of job stats +// ARGV[1]: job status +// ARGV[2]: revision of job stats +// ARGV[3]: check in timestamp +var isStatusMatchScriptText = fmt.Sprintf(` +%s + +%s + +local res, st, rev, checkInAt, ack + +res = redis.call('hmget', KEYS[1], 'status', 'revision', 'check_in_at', 'ack') +if res then + st = res[1] + rev = tonumber(res[2]) + checkInAt = tonumber(res[3]) + ack = res[4] + + local reply = compare(st, rev, checkInAt) + + if reply == 'ok' then + if not ack then + return 'ok' + end + -- ack exists, compare with ack + local a = cjson.decode(ack) + + st = a['status'] + rev = a['revision'] + checkInAt = a['check_in_at'] + + local reply2 = compare(st, rev, checkInAt) + if reply2 == 'ok' then + return 'ok' + end + end +end + +return 'no' +`, luaFuncStCodeText, luaFuncCompareText) + +// CheckStatusMatchScript is lua script for checking if the provided status is still matching +// the backend status. +var CheckStatusMatchScript = redis.NewScript(1, isStatusMatchScriptText) + +// Used to set the hook ACK +// +// KEY[1]: key of job stats +// KEY[2]: key of inprogress track +// ARGV[1]: job status +// ARGV[2]: revision of job stats +// ARGV[3]: check in timestamp +// ARGV[4]: job ID +var hookAckScriptText = fmt.Sprintf(` +%s + +%s + +local function canSetAck(jk, nrev) + local res = redis.call('hmget', jk, 'revision', 'ack') + if res then + local rev = tonumber(res[1]) + local ackv = res[2] + + if ackv then + -- ack existing + local ack = cjson.decode(ackv) + local cmp = compare(ack['status'], ack['revision'], ack['check_in_at']) + if cmp == 'ok' then + return 'ok' + end + else + -- no ack yet + if rev <= nrev then + return 'ok' + end + end + end + + return nil +end + +if canSetAck(KEYS[1], tonumber(ARGV[2])) ~= 'ok' then + return 'none' +end + +-- can-set-ack check is ok +-- set new ack +local newAck = {['status'] = ARGV[1], ['revision'] = tonumber(ARGV[2]), ['check_in_at'] = tonumber(ARGV[3])} +local ackJson = cjson.encode(newAck) + +redis.call('hset', KEYS[1], 'ack', ackJson) + +-- update the inprogress track +if stCode(ARGV[1]) == 3 then + -- final status + local c = redis.call('hincrby', KEYS[2], ARGV[4], -1) + -- lock count is 0, del it + if c <= 0 then + redis.call('hdel', KEYS[2], ARGV[4]) + end +end + +return 'ok' +`, luaFuncStCodeText, luaFuncCompareText) + +// HookAckScript is defined to set the hook event ACK in the job stats map +var HookAckScript = redis.NewScript(2, hookAckScriptText) + +// Used to reset job status +// +// KEYS[1]: key of job stats +// KEYS[2]: key of inprogress job track +// ARGV[1]: job ID +// ARGV[2]: start status +// ARGV[3]: revision of job stats +var statusResetScriptText = ` +local now = tonumber(ARGV[3]) + +-- reset status and revision +redis.call('hmset', KEYS[1], 'status', ARGV[2], 'revision', now, 'update_time', now) +redis.call('hdel', KEYS[1], 'ack', 'check_in', 'check_in_at') + +-- reset inprogress track +redis.call('hset', KEYS[2], ARGV[1], 2) +` + +// StatusResetScript is lua script to reset the job stats +var StatusResetScript = redis.NewScript(2, statusResetScriptText) + +// Copy from upstream worker framework +// Used by the reaper to re-enqueue jobs that were in progress +// +// KEYS[1] = the 1st job's in progress queue +// KEYS[2] = the 1st job's job queue +// KEYS[3] = the 2nd job's in progress queue +// KEYS[4] = the 2nd job's job queue +// ... 
+// KEYS[N] = the last job's in progress queue +// KEYS[N+1] = the last job's job queue +// ARGV[1] = workerPoolID for job queue +var redisLuaReenqueueJob = fmt.Sprintf(` +local function releaseLock(lockKey, lockInfoKey, workerPoolID) + redis.call('decr', lockKey) + redis.call('hincrby', lockInfoKey, workerPoolID, -1) +end + +local keylen = #KEYS +local res, jobQueue, inProgQueue, workerPoolID, lockKey, lockInfoKey +workerPoolID = ARGV[1] + +for i=1,keylen,%d do + inProgQueue = KEYS[i] + jobQueue = KEYS[i+1] + lockKey = KEYS[i+2] + lockInfoKey = KEYS[i+3] + res = redis.call('rpoplpush', inProgQueue, jobQueue) + if res then + releaseLock(lockKey, lockInfoKey, workerPoolID) + return {res, inProgQueue, jobQueue} + end +end +return nil`, requeueKeysPerJob) + +// RedisLuaReenqueueScript returns redis script of redisLuaReenqueueJob +func RedisLuaReenqueueScript(jobTypesCount int) *redis.Script { + return redis.NewScript(jobTypesCount*requeueKeysPerJob, redisLuaReenqueueJob) +} diff --git a/src/jobservice/common/rds/utils.go b/src/jobservice/common/rds/utils.go index fcd009be395..ff6482c94d9 100644 --- a/src/jobservice/common/rds/utils.go +++ b/src/jobservice/common/rds/utils.go @@ -2,10 +2,11 @@ package rds import ( "fmt" - "github.com/garyburd/redigo/redis" + "time" + "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/gomodule/redigo/redis" "github.com/pkg/errors" - "time" ) // ErrNoElements is a pre defined error to describe the case that no elements got diff --git a/src/jobservice/hook/hook_agent.go b/src/jobservice/hook/hook_agent.go index ff68f0f9bad..eaf637ac7a4 100644 --- a/src/jobservice/hook/hook_agent.go +++ b/src/jobservice/hook/hook_agent.go @@ -17,31 +17,22 @@ package hook import ( "context" "encoding/json" - "math/rand" "net/url" - "time" - - "github.com/pkg/errors" - - "github.com/goharbor/harbor/src/jobservice/job" - "sync" + "time" "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/env" - "github.com/goharbor/harbor/src/jobservice/lcm" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/gomodule/redigo/redis" + "github.com/pkg/errors" ) const ( - // Influenced by the worker number setting - maxEventChanBuffer = 1024 - // Max concurrent client handlers - maxHandlers = 5 // The max time for expiring the retrying events - // 180 days - maxEventExpireTime = 3600 * 24 * 180 + // 1 day + maxEventExpireTime = 3600 * 24 // Waiting a short while if any errors occurred shortLoopInterval = 5 * time.Second // Waiting for long while if no retrying elements found @@ -52,10 +43,9 @@ const ( type Agent interface { // Trigger hooks Trigger(evt *Event) error - // Serves events now + + // Serves retry loop now Serve() error - // Attach a job life cycle controller - Attach(ctl lcm.Controller) } // Event contains the hook URL and the data @@ -95,47 +85,53 @@ type basicAgent struct { context context.Context namespace string client Client - ctl lcm.Controller - events chan *Event - tokens chan bool redisPool *redis.Pool wg *sync.WaitGroup } // NewAgent is constructor of basic agent func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent { - tks := make(chan bool, maxHandlers) - // Put tokens - for i := 0; i < maxHandlers; i++ { - tks <- true - } return &basicAgent{ context: ctx.SystemContext, namespace: ns, client: NewClient(ctx.SystemContext), - events: make(chan *Event, maxEventChanBuffer), - tokens: tks, redisPool: redisPool, wg: ctx.WG, } } -// Attach a 
job life cycle controller -func (ba *basicAgent) Attach(ctl lcm.Controller) { - ba.ctl = ctl -} - // Trigger implements the same method of interface @Agent func (ba *basicAgent) Trigger(evt *Event) error { if evt == nil { - return errors.New("nil event") + return errors.New("nil web hook event") } if err := evt.Validate(); err != nil { - return err + return errors.Wrap(err, "trigger error") } - ba.events <- evt + // Treat hook event is success if it is successfully sent or cached in the retry queue. + if err := ba.client.SendEvent(evt); err != nil { + // Push event to the retry queue + if er := ba.pushForRetry(evt); er != nil { + // Failed to push to the hook event retry queue, return error with all context + return errors.Wrap(er, err.Error()) + } + + logger.Warningf("Send hook event '%s' to '%s' failed with error: %s; push hook event to the queue for retrying later", evt.Message, evt.URL, err) + // Treat as successful hook event as the event has been put into the retry queue for future resending. + return nil + } + + // Mark event hook ACK including "revision", "status" and "check_in_at" in the job stats to indicate + // the related hook event has been successfully fired. + // The ACK can be used by the reaper to justify if the hook event should be resent again. + // The failure of persisting this ACK may cause duplicated hook event resending issue, which + // can be ignored. + if err := ba.ack(evt); err != nil { + // Just log error + logger.Error(errors.Wrap(err, "trigger")) + } return nil } @@ -144,64 +140,14 @@ func (ba *basicAgent) Trigger(evt *Event) error { // Termination depends on the system context // Blocking call func (ba *basicAgent) Serve() error { - if ba.ctl == nil { - return errors.New("nil life cycle controller of hook agent") - } - ba.wg.Add(1) + go ba.loopRetry() logger.Info("Hook event retrying loop is started") - ba.wg.Add(1) - go ba.serve() - logger.Info("Basic hook agent is started") - return nil } -func (ba *basicAgent) serve() { - defer func() { - logger.Info("Basic hook agent is stopped") - ba.wg.Done() - }() - - for { - select { - case evt := <-ba.events: - // if exceed, wait here - // avoid too many request connections at the same time - <-ba.tokens - go func(evt *Event) { - defer func() { - ba.tokens <- true // return token - }() - - if err := ba.client.SendEvent(evt); err != nil { - logger.Errorf("Send hook event '%s' to '%s' failed with error: %s; push to the queue for retrying later", evt.Message, evt.URL, err) - // Push event to the retry queue - if err := ba.pushForRetry(evt); err != nil { - // Failed to push to the retry queue, let's directly push it - // to the event channel of this node with reasonable backoff time. - logger.Errorf("Failed to push hook event to the retry queue: %s", err) - - // Put to the event chan after waiting for a reasonable while, - // waiting is important, it can avoid sending large scale failure expecting - // requests in a short while. - // As 'pushForRetry' has checked the timestamp and expired event - // will be directly discarded and nil error is returned, no need to - // check it again here. 
- <-time.After(time.Duration(rand.Int31n(55)+5) * time.Second) - ba.events <- evt - } - } - }(evt) - - case <-ba.context.Done(): - return - } - } -} - func (ba *basicAgent) pushForRetry(evt *Event) error { if evt == nil { // do nothing @@ -248,11 +194,7 @@ func (ba *basicAgent) loopRetry() { ba.wg.Done() }() - token := make(chan bool, 1) - token <- true - for { - <-token if err := ba.reSend(); err != nil { waitInterval := shortLoopInterval if err == rds.ErrNoElements { @@ -270,44 +212,47 @@ func (ba *basicAgent) loopRetry() { return } } - - // Put token back - token <- true } } func (ba *basicAgent) reSend() error { - evt, err := ba.popMinOne() - if err != nil { - return err - } + conn := ba.redisPool.Get() + defer func() { + if err := conn.Close(); err != nil { + logger.Error(errors.Wrap(err, "resend")) + } + }() - jobID, status, err := extractJobID(evt.Data) + // Pick up one queued event for resending + evt, err := ba.popMinOne(conn) if err != nil { return err } - t, err := ba.ctl.Track(jobID) - if err != nil { - return err + // Args for executing script + args := []interface{}{ + rds.KeyJobStats(ba.namespace, evt.Data.JobID), + evt.Data.Status, + evt.Data.Metadata.Revision, + evt.Data.Metadata.CheckInAt, } - diff := status.Compare(job.Status(t.Job().Info.Status)) - if diff > 0 || - (diff == 0 && t.Job().Info.CheckIn != evt.Data.CheckIn) { - ba.events <- evt - return nil + // If failed to check the status matching, just ignore it, continue the resending + reply, err := redis.String(rds.CheckStatusMatchScript.Do(conn, args...)) + if err != nil { + // Log error + logger.Error(errors.Wrap(err, "resend")) + } else { + if reply != "ok" { + return errors.Errorf("outdated hook event: %s", evt.Message) + } } - return errors.Errorf("outdated hook event: %s, latest job status: %s", evt.Message, t.Job().Info.Status) + return ba.Trigger(evt) } -func (ba *basicAgent) popMinOne() (*Event, error) { - conn := ba.redisPool.Get() - defer func() { - _ = conn.Close() - }() - +// popMinOne picks up one event for retrying +func (ba *basicAgent) popMinOne(conn redis.Conn) (*Event, error) { key := rds.KeyHookEventRetryQueue(ba.namespace) minOne, err := rds.ZPopMin(conn, key) if err != nil { @@ -327,17 +272,33 @@ func (ba *basicAgent) popMinOne() (*Event, error) { return evt, nil } -// Extract the job ID and status from the event data field -// First return is job ID -// Second return is job status -// Last one is error -func extractJobID(data *job.StatusChange) (string, job.Status, error) { - if data != nil && len(data.JobID) > 0 { - status := job.Status(data.Status) - if status.Validate() == nil { - return data.JobID, status, nil +// ack hook event +func (ba *basicAgent) ack(evt *Event) error { + conn := ba.redisPool.Get() + defer func() { + if err := conn.Close(); err != nil { + logger.Error(errors.Wrap(err, "ack")) } + }() + + k := rds.KeyJobStats(ba.namespace, evt.Data.JobID) + k2 := rds.KeyJobTrackInProgress(ba.namespace) + reply, err := redis.String(rds.HookAckScript.Do( + conn, + k, + k2, + evt.Data.Status, + evt.Data.Metadata.Revision, + evt.Data.Metadata.CheckInAt, + evt.Data.JobID, + )) + if err != nil { + return errors.Wrap(err, "ack") } - return "", "", errors.New("malform job status change data") + if reply != "ok" { + return errors.Errorf("no ack done for event: %s", evt.Message) + } + + return nil } diff --git a/src/jobservice/hook/hook_agent_test.go b/src/jobservice/hook/hook_agent_test.go index 32bfb437459..313b71ad0ea 100644 --- a/src/jobservice/hook/hook_agent_test.go +++ 
b/src/jobservice/hook/hook_agent_test.go @@ -16,35 +16,34 @@ package hook import ( "context" - "fmt" - "net/http" - "net/http/httptest" - "sync/atomic" "testing" "time" + "github.com/goharbor/harbor/src/jobservice/common/list" + + "github.com/goharbor/harbor/src/jobservice/common/utils" + + "github.com/pkg/errors" + "github.com/goharbor/harbor/src/jobservice/common/rds" - "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/jobservice/lcm" "github.com/goharbor/harbor/src/jobservice/tests" "github.com/gomodule/redigo/redis" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "sync" ) // HookAgentTestSuite tests functions of hook agent type HookAgentTestSuite struct { suite.Suite - pool *redis.Pool namespace string - lcmCtl lcm.Controller + pool *redis.Pool + agent *basicAgent - envContext *env.Context - cancel context.CancelFunc + event *Event + jid string } // TestHookAgentTestSuite is entry of go test @@ -57,14 +56,11 @@ func (suite *HookAgentTestSuite) SetupSuite() { suite.pool = tests.GiveMeRedisPool() suite.namespace = tests.GiveMeTestNamespace() - ctx, cancel := context.WithCancel(context.Background()) - suite.envContext = &env.Context{ - SystemContext: ctx, - WG: new(sync.WaitGroup), + suite.agent = &basicAgent{ + context: context.TODO(), + namespace: suite.namespace, + redisPool: suite.pool, } - suite.cancel = cancel - - suite.lcmCtl = lcm.NewController(suite.envContext, suite.namespace, suite.pool, func(hookURL string, change *job.StatusChange) error { return nil }) } // TearDownSuite prepares test suites @@ -77,126 +73,121 @@ func (suite *HookAgentTestSuite) TearDownSuite() { _ = tests.ClearAll(suite.namespace, conn) } +func (suite *HookAgentTestSuite) SetupTest() { + suite.jid = utils.MakeIdentifier() + rev := time.Now().Unix() + stats := &job.Stats{ + Info: &job.StatsInfo{ + JobID: suite.jid, + Status: job.RunningStatus.String(), + Revision: rev, + JobKind: job.KindGeneric, + JobName: job.SampleJob, + }, + } + t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil, list.New()) + err := t.Save() + suite.NoError(err, "mock job stats") + + suite.event = &Event{ + URL: "http://domian.com", + Message: "HookAgentTestSuite", + Timestamp: time.Now().Unix(), + Data: &job.StatusChange{ + JobID: suite.jid, + Status: job.SuccessStatus.String(), + Metadata: &job.StatsInfo{ + JobID: suite.jid, + Status: job.SuccessStatus.String(), + Revision: rev, + JobKind: job.KindGeneric, + JobName: job.SampleJob, + }, + }, + } +} + +func (suite *HookAgentTestSuite) TearDownTest() { + conn := suite.pool.Get() + defer func() { + err := conn.Close() + suite.NoError(err, "close redis connection") + }() + + k := rds.KeyHookEventRetryQueue(suite.namespace) + _, err := conn.Do("DEL", k) + suite.NoError(err, "tear down test cases") +} + // TestEventSending ... 
func (suite *HookAgentTestSuite) TestEventSending() { - done := make(chan bool, 1) - - expected := uint32(1300) // >1024 max - count := uint32(0) - counter := &count - - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer func() { - c := atomic.AddUint32(counter, 1) - if c == expected { - done <- true - } - }() - _, _ = fmt.Fprintln(w, "ok") - })) - defer ts.Close() - - // in case test failed and avoid dead lock - go func() { - <-time.After(time.Duration(10) * time.Second) - done <- true // time out - }() + mc := &mockClient{} + mc.On("SendEvent", suite.event).Return(nil) + suite.agent.client = mc - agent := NewAgent(suite.envContext, suite.namespace, suite.pool) - agent.Attach(suite.lcmCtl) - err := agent.Serve() - require.NoError(suite.T(), err, "agent serve: nil error expected but got %s", err) - - go func() { - defer func() { - suite.cancel() - }() - - for i := uint32(0); i < expected; i++ { - changeData := &job.StatusChange{ - JobID: fmt.Sprintf("job-%d", i), - Status: "running", - } - - evt := &Event{ - URL: ts.URL, - Message: fmt.Sprintf("status of job %s change to %s", changeData.JobID, changeData.Status), - Data: changeData, - Timestamp: time.Now().Unix(), - } - - err := agent.Trigger(evt) - require.Nil(suite.T(), err, "agent trigger: nil error expected but got %s", err) - } - - // Check results - <-done - require.Equal(suite.T(), expected, count, "expected %d hook events but only got %d", expected, count) - }() + err := suite.agent.Trigger(suite.event) + require.Nil(suite.T(), err, "agent trigger: nil error expected but got %s", err) - // Wait - suite.envContext.WG.Wait() + // check + suite.checkStatus() +} + +// TestEventSending ... +func (suite *HookAgentTestSuite) TestEventSendingError() { + mc := &mockClient{} + mc.On("SendEvent", suite.event).Return(errors.New("internal server error: for testing")) + suite.agent.client = mc + + err := suite.agent.Trigger(suite.event) + + // Failed to send by client, it should be put into retry queue, check it + // The return should still be nil + suite.NoError(err, "agent trigger: nil error expected but got %s", err) + suite.checkRetryQueue(1) } // TestRetryAndPopMin ... 
func (suite *HookAgentTestSuite) TestRetryAndPopMin() { - ctx := context.Background() + mc := &mockClient{} + mc.On("SendEvent", suite.event).Return(nil) + suite.agent.client = mc - tks := make(chan bool, maxHandlers) - // Put tokens - for i := 0; i < maxHandlers; i++ { - tks <- true - } + err := suite.agent.pushForRetry(suite.event) + suite.NoError(err, "push event for retry") - agent := &basicAgent{ - context: ctx, - namespace: suite.namespace, - client: NewClient(ctx), - events: make(chan *Event, maxEventChanBuffer), - tokens: tks, - redisPool: suite.pool, - } - agent.Attach(suite.lcmCtl) + err = suite.agent.reSend() + require.NoError(suite.T(), err, "resend error: %v", err) - changeData := &job.StatusChange{ - JobID: "fake_job_ID", - Status: job.RunningStatus.String(), - } + // Check + suite.checkRetryQueue(0) + suite.checkStatus() +} - evt := &Event{ - URL: "https://fake.js.com", - Message: fmt.Sprintf("status of job %s change to %s", changeData.JobID, changeData.Status), - Data: changeData, - Timestamp: time.Now().Unix(), - } +func (suite *HookAgentTestSuite) checkStatus() { + t := job.NewBasicTrackerWithID(context.TODO(), suite.jid, suite.namespace, suite.pool, nil, list.New()) + err := t.Load() + suite.NoError(err, "load updated job stats") + suite.Equal(job.SuccessStatus.String(), t.Job().Info.HookAck.Status, "ack status") +} - // Mock job stats +func (suite *HookAgentTestSuite) checkRetryQueue(size int) { conn := suite.pool.Get() defer func() { - _ = conn.Close() + err := conn.Close() + suite.NoError(err, "close redis connection") }() - key := rds.KeyJobStats(suite.namespace, "fake_job_ID") - _, err := conn.Do("HSET", key, "status", job.SuccessStatus.String()) - require.Nil(suite.T(), err, "prepare job stats: nil error returned but got %s", err) - - err = agent.pushForRetry(evt) - require.Nil(suite.T(), err, "push for retry: nil error expected but got %s", err) - - err = agent.reSend() - require.Error(suite.T(), err, "resend: non nil error expected but got nil") - assert.Equal(suite.T(), 0, len(agent.events), "the hook event should be discard but actually not") - - // Change status - _, err = conn.Do("HSET", key, "status", job.PendingStatus.String()) - require.Nil(suite.T(), err, "prepare job stats: nil error returned but got %s", err) - - err = agent.pushForRetry(evt) - require.Nil(suite.T(), err, "push for retry: nil error expected but got %s", err) - err = agent.reSend() - require.Nil(suite.T(), err, "resend: nil error should be returned but got %s", err) + k := rds.KeyHookEventRetryQueue(suite.namespace) + c, err := redis.Int(conn.Do("ZCARD", k)) + suite.NoError(err, "check retry queue") + suite.Equal(size, c, "retry queue count") +} - <-time.After(time.Duration(1) * time.Second) +type mockClient struct { + mock.Mock +} - assert.Equal(suite.T(), 1, len(agent.events), "the hook event should be requeued but actually not: %d", len(agent.events)) +func (mc *mockClient) SendEvent(evt *Event) error { + args := mc.Called(evt) + return args.Error(0) } diff --git a/src/jobservice/hook/hook_client_test.go b/src/jobservice/hook/hook_client_test.go index 12f9b17d6fa..91bdc004ecf 100644 --- a/src/jobservice/hook/hook_client_test.go +++ b/src/jobservice/hook/hook_client_test.go @@ -17,14 +17,15 @@ import ( "context" "encoding/json" "fmt" - "github.com/goharbor/harbor/src/jobservice/job" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" "io/ioutil" "net/http" "net/http/httptest" "testing" "time" + + "github.com/goharbor/harbor/src/jobservice/job" + 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) // HookClientTestSuite tests functions of hook client @@ -57,7 +58,7 @@ func (suite *HookClientTestSuite) SetupSuite() { return } - fmt.Fprintln(w, "ok") + _, _ = fmt.Fprintln(w, "ok") })) } diff --git a/src/jobservice/job/impl/context_test.go b/src/jobservice/job/impl/context_test.go index 77c1ac734cd..7d608013cd3 100644 --- a/src/jobservice/job/impl/context_test.go +++ b/src/jobservice/job/impl/context_test.go @@ -19,6 +19,8 @@ import ( "os" "testing" + "github.com/goharbor/harbor/src/jobservice/common/list" + comcfg "github.com/goharbor/harbor/src/common/config" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/config" @@ -87,6 +89,7 @@ func (suite *ContextImplTestSuite) SetupSuite() { suite.namespace, suite.pool, nil, + list.New(), ) err := suite.tracker.Save() diff --git a/src/jobservice/job/impl/sample/job.go b/src/jobservice/job/impl/sample/job.go index be860ec0e12..e4c32cb0453 100644 --- a/src/jobservice/job/impl/sample/job.go +++ b/src/jobservice/job/impl/sample/job.go @@ -80,8 +80,8 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { ctx.Checkin("progress data: %60") // HOLD ON FOR A WHILE - logger.Warning("Holding for 10 seconds") - <-time.After(10 * time.Second) + logger.Warning("Holding for 30 seconds") + <-time.After(30 * time.Second) if cmd, ok := ctx.OPCommand(); ok { if cmd == job.StopCommand { diff --git a/src/jobservice/job/models.go b/src/jobservice/job/models.go index 1ae70679e76..35a1a122911 100644 --- a/src/jobservice/job/models.go +++ b/src/jobservice/job/models.go @@ -68,6 +68,14 @@ type StatsInfo struct { NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job Parameters Parameters `json:"parameters,omitempty"` Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job + HookAck *ACK `json:"ack,omitempty"` +} + +// ACK is the acknowledge of hook event +type ACK struct { + Status string `json:"status"` + Revision int64 `json:"revision"` + CheckInAt int64 `json:"check_in_at"` } // ActionRequest defines for triggering job action like stop/cancel. 
@@ -87,6 +95,7 @@ type StatusChange struct { type SimpleStatusChange struct { JobID string `json:"job_id"` TargetStatus string `json:"target_status"` + Revision int64 `json:"revision"` } // Validate the job stats diff --git a/src/jobservice/job/tracker.go b/src/jobservice/job/tracker.go index cbd3b2011ce..1d1b1a0a89f 100644 --- a/src/jobservice/job/tracker.go +++ b/src/jobservice/job/tracker.go @@ -21,6 +21,8 @@ import ( "strconv" "time" + "github.com/goharbor/harbor/src/jobservice/common/list" + "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/errs" @@ -32,8 +34,6 @@ import ( const ( // Try best to keep the job stats data but anyway clear it after a reasonable time statDataExpireTime = 7 * 24 * 3600 - // 1 hour to discard the job stats of success jobs - statDataExpireTimeForSuccess = 3600 ) // Tracker is designed to track the life cycle of the job described by the stats @@ -102,6 +102,9 @@ type Tracker interface { // Reset the status to `pending` Reset() error + + // Fire status hook to report the current status + FireHook() error } // basicTracker implements Tracker interface based on redis @@ -112,6 +115,7 @@ type basicTracker struct { jobID string jobStats *Stats callback HookCallback + retryList *list.SyncList } // NewBasicTrackerWithID builds a tracker with the provided job ID @@ -121,6 +125,7 @@ func NewBasicTrackerWithID( ns string, pool *redis.Pool, callback HookCallback, + retryList *list.SyncList, ) Tracker { return &basicTracker{ namespace: ns, @@ -128,6 +133,7 @@ func NewBasicTrackerWithID( pool: pool, jobID: jobID, callback: callback, + retryList: retryList, } } @@ -138,6 +144,7 @@ func NewBasicTrackerWithStats( ns string, pool *redis.Pool, callback HookCallback, + retryList *list.SyncList, ) Tracker { return &basicTracker{ namespace: ns, @@ -146,6 +153,7 @@ func NewBasicTrackerWithStats( jobStats: stats, jobID: stats.Info.JobID, callback: callback, + retryList: retryList, } } @@ -245,67 +253,44 @@ func (bt *basicTracker) Expire() error { // Run job // Either one is failed, the final return will be marked as failed. func (bt *basicTracker) Run() error { - err := bt.compareAndSet(RunningStatus) - if !errs.IsStatusMismatchError(err) { - bt.refresh(RunningStatus) - if er := bt.fireHookEvent(RunningStatus); err == nil && er != nil { - return er - } + if err := bt.setStatus(RunningStatus); err != nil { + return errors.Wrap(err, "run") } - return err + return nil } // Stop job // Stop is final status, if failed to do, retry should be enforced. // Either one is failed, the final return will be marked as failed. func (bt *basicTracker) Stop() error { - err := bt.UpdateStatusWithRetry(StoppedStatus) - if !errs.IsStatusMismatchError(err) { - bt.refresh(StoppedStatus) - if er := bt.fireHookEvent(StoppedStatus); err == nil && er != nil { - return er - } + if err := bt.setStatus(StoppedStatus); err != nil { + return errors.Wrap(err, "stop") } - return err + return nil } // Fail job // Fail is final status, if failed to do, retry should be enforced. // Either one is failed, the final return will be marked as failed. 
func (bt *basicTracker) Fail() error { - err := bt.UpdateStatusWithRetry(ErrorStatus) - if !errs.IsStatusMismatchError(err) { - bt.refresh(ErrorStatus) - if er := bt.fireHookEvent(ErrorStatus); err == nil && er != nil { - return er - } + if err := bt.setStatus(ErrorStatus); err != nil { + return errors.Wrap(err, "fail") } - return err + return nil } // Succeed job // Succeed is final status, if failed to do, retry should be enforced. // Either one is failed, the final return will be marked as failed. func (bt *basicTracker) Succeed() error { - err := bt.UpdateStatusWithRetry(SuccessStatus) - if !errs.IsStatusMismatchError(err) { - bt.refresh(SuccessStatus) - - // Expire the stat data of the successful job - if er := bt.expire(statDataExpireTimeForSuccess); er != nil { - // Only logged - logger.Errorf("Expire stat data for the success job `%s` failed with error: %s", bt.jobID, er) - } - - if er := bt.fireHookEvent(SuccessStatus); err == nil && er != nil { - return er - } + if err := bt.setStatus(SuccessStatus); err != nil { + return errors.Wrap(err, "succeed") } - return err + return nil } // Save the stats of job tracked by this tracker @@ -362,9 +347,13 @@ func (bt *basicTracker) Save() (err error) { // Set the first revision args = append(args, "revision", time.Now().Unix()) + // ACK data is saved/updated not via tracker, so ignore the ACK saving + // Do it in a transaction err = conn.Send("MULTI") err = conn.Send("HMSET", args...) + // Set inprogress track lock + err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2) // If job kind is periodic job, expire time should not be set // If job kind is scheduled job, expire time should be runAt+ @@ -404,15 +393,14 @@ func (bt *basicTracker) Save() (err error) { func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error { err := bt.compareAndSet(targetStatus) if err != nil { - // Status mismatching error will be ignored + // Status mismatching error will be directly ignored as the status has already been outdated if !errs.IsStatusMismatchError(err) { - // Push to the retrying Q - if er := bt.pushToQueueForRetry(targetStatus); er != nil { - logger.Errorf("push job status update request to retry queue error: %s", er) - // If failed to put it into the retrying Q in case, let's downgrade to retry in current process - // by recursively call in goroutines. 
- bt.retryUpdateStatus(targetStatus) - } + // Push to the retrying daemon + bt.retryList.Push(SimpleStatusChange{ + JobID: bt.jobID, + TargetStatus: targetStatus.String(), + Revision: bt.jobStats.Info.Revision, + }) } } @@ -428,15 +416,48 @@ func (bt *basicTracker) Reset() error { }() now := time.Now().Unix() - err := bt.Update( - "status", + if _, err := rds.StatusResetScript.Do( + conn, + rds.KeyJobStats(bt.namespace, bt.jobStats.Info.JobID), + rds.KeyJobTrackInProgress(bt.namespace), + bt.jobStats.Info.JobID, PendingStatus.String(), - "revision", now, + ); err != nil { + return errors.Wrap(err, "reset") + } + + // Sync current tracker + bt.jobStats.Info.Status = PendingStatus.String() + bt.jobStats.Info.Revision = now + bt.jobStats.Info.UpdateTime = now + bt.jobStats.Info.CheckIn = "" + bt.jobStats.Info.CheckInAt = 0 + + return nil +} + +// FireHook fires status hook event to report current status +func (bt *basicTracker) FireHook() error { + return bt.fireHookEvent( + Status(bt.jobStats.Info.Status), + bt.jobStats.Info.CheckIn, ) - if err == nil { - bt.refresh(PendingStatus) - bt.jobStats.Info.Revision = now +} + +// setStatus sets the job status to the target status and fire status change hook +func (bt *basicTracker) setStatus(status Status) error { + err := bt.UpdateStatusWithRetry(status) + if !errs.IsStatusMismatchError(err) { + bt.refresh(status) + if er := bt.fireHookEvent(status); er != nil { + // Add more error context + if err != nil { + return errors.Wrap(er, err.Error()) + } + + return er + } } return err @@ -480,72 +501,32 @@ func (bt *basicTracker) fireHookEvent(status Status, checkIn ...string) error { return nil } -func (bt *basicTracker) pushToQueueForRetry(targetStatus Status) error { - simpleStatusChange := &SimpleStatusChange{ - JobID: bt.jobID, - TargetStatus: targetStatus.String(), - } - - rawJSON, err := json.Marshal(simpleStatusChange) - if err != nil { - return err - } - - conn := bt.pool.Get() - defer func() { - _ = conn.Close() - }() - - key := rds.KeyStatusUpdateRetryQueue(bt.namespace) - args := []interface{}{key, "NX", time.Now().Unix(), rawJSON} - - _, err = conn.Do("ZADD", args...) 
- - return err -} - -func (bt *basicTracker) retryUpdateStatus(targetStatus Status) { - go func() { - select { - case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second): - // Check the update timestamp - if time.Now().Unix()-bt.jobStats.Info.UpdateTime < statDataExpireTime-24*3600 { - if err := bt.compareAndSet(targetStatus); err != nil { - logger.Errorf("Retry to update job status error: %s", err) - bt.retryUpdateStatus(targetStatus) - } - // Success - } - return - case <-bt.context.Done(): - return // terminated - } - }() -} - func (bt *basicTracker) compareAndSet(targetStatus Status) error { conn := bt.pool.Get() defer func() { - _ = conn.Close() + closeConn(conn) }() rootKey := rds.KeyJobStats(bt.namespace, bt.jobID) - - st, err := getStatus(conn, rootKey) + trackKey := rds.KeyJobTrackInProgress(bt.namespace) + reply, err := redis.String(rds.SetStatusScript.Do( + conn, + rootKey, + trackKey, + targetStatus.String(), + bt.jobStats.Info.Revision, + time.Now().Unix(), + bt.jobID, + )) if err != nil { - return err + return errors.Wrap(err, "compare and set status error") } - diff := st.Compare(targetStatus) - if diff > 0 { - return errs.StatusMismatchError(st.String(), targetStatus.String()) - } - if diff == 0 { - // Desired matches actual - return nil + if reply != "ok" { + return errs.StatusMismatchError(reply, targetStatus.String()) } - return setStatus(conn, rootKey, targetStatus) + return nil } // retrieve the stats of job tracked by this tracker from the backend data @@ -626,11 +607,20 @@ func (bt *basicTracker) retrieve() error { params := make(Parameters) if err := json.Unmarshal([]byte(value), ¶ms); err == nil { res.Info.Parameters = params + } else { + logger.Error(errors.Wrap(err, "retrieve: tracker")) } break case "revision": res.Info.Revision = parseInt64(value) break + case "ack": + ack := &ACK{} + if err := json.Unmarshal([]byte(value), ack); err == nil { + res.Info.HookAck = ack + } else { + logger.Error(errors.Wrap(err, "retrieve: tracker")) + } default: break } @@ -663,7 +653,7 @@ func (bt *basicTracker) expire(expireTime int64) error { func getStatus(conn redis.Conn, key string) (Status, error) { values, err := rds.HmGet(conn, key, "status") if err != nil { - return "", err + return "", errors.Wrap(err, "get status error") } if len(values) == 1 { @@ -676,10 +666,6 @@ func getStatus(conn redis.Conn, key string) (Status, error) { return "", errors.New("malformed status data returned") } -func setStatus(conn redis.Conn, key string, status Status) error { - return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix()) -} - func closeConn(conn redis.Conn) { if conn != nil { if err := conn.Close(); err != nil { diff --git a/src/jobservice/job/tracker_test.go b/src/jobservice/job/tracker_test.go index 7c08812c10c..58f7055968f 100644 --- a/src/jobservice/job/tracker_test.go +++ b/src/jobservice/job/tracker_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/goharbor/harbor/src/jobservice/common/list" + "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/tests" "github.com/gomodule/redigo/redis" @@ -77,6 +79,7 @@ func (suite *TrackerTestSuite) TestTracker() { func(hookURL string, change *StatusChange) error { return nil }, + list.New(), ) err := tracker.Save() @@ -107,12 +110,19 @@ func (suite *TrackerTestSuite) TestTracker() { assert.Error(suite.T(), err, "run: non nil error expected but got nil") err = tracker.CheckIn("check in") assert.Nil(suite.T(), 
err, "check in: nil error expected but got %s", err) + // check in is allowed to be repeated + err = tracker.CheckIn("check in2") + assert.Nil(suite.T(), err, "check in2: nil error expected but got %s", err) + err = tracker.Succeed() assert.Nil(suite.T(), err, "succeed: nil error expected but got %s", err) - err = tracker.Stop() - assert.Nil(suite.T(), err, "stop: nil error expected but got %s", err) + // same status is allowed to update + err = tracker.Succeed() + assert.Nil(suite.T(), err, "succeed again: nil error expected but got %s", err) + + // final status can be set only once err = tracker.Fail() - assert.Nil(suite.T(), err, "fail: nil error expected but got %s", err) + assert.Error(suite.T(), err, "fail: error expected but got nil") t := NewBasicTrackerWithID( context.TODO(), @@ -122,10 +132,26 @@ func (suite *TrackerTestSuite) TestTracker() { func(hookURL string, change *StatusChange) error { return nil }, + list.New(), ) err = t.Load() assert.NoError(suite.T(), err) + var st Status + err = t.Reset() + assert.NoError(suite.T(), err) + + st, err = t.Status() + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), PendingStatus, st) + + err = t.Stop() + assert.NoError(suite.T(), err) + + st, err = t.Status() + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), StoppedStatus, st) + err = t.Expire() assert.NoError(suite.T(), err) } @@ -146,7 +172,7 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() { }, } - t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil) + t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil, nil) err := t.Save() require.NoError(suite.T(), err) @@ -166,7 +192,7 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() { }, } - t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil) + t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil, nil) err = t2.Save() require.NoError(suite.T(), err) @@ -177,32 +203,3 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() { err = t2.PeriodicExecutionDone() require.NoError(suite.T(), err) } - -// TestPushForRetry tests push for retry -func (suite *TrackerTestSuite) TestPushForRetry() { - ID := utils.MakeIdentifier() - runAt := time.Now().Add(1 * time.Hour).Unix() - jobStats := &Stats{ - Info: &StatsInfo{ - JobID: ID, - Status: ScheduledStatus.String(), - JobKind: KindScheduled, - JobName: SampleJob, - IsUnique: false, - RunAt: runAt, - EnqueueTime: runAt, - }, - } - - t := &basicTracker{ - namespace: suite.namespace, - context: context.TODO(), - pool: suite.pool, - jobID: ID, - jobStats: jobStats, - callback: nil, - } - - err := t.pushToQueueForRetry(RunningStatus) - require.NoError(suite.T(), err) -} diff --git a/src/jobservice/lcm/controller.go b/src/jobservice/lcm/controller.go index 6bf990ae1e7..d3918a78a83 100644 --- a/src/jobservice/lcm/controller.go +++ b/src/jobservice/lcm/controller.go @@ -16,15 +16,21 @@ package lcm import ( "context" - "encoding/json" + "math/rand" + "sync" + "time" + + "github.com/goharbor/harbor/src/jobservice/errs" + "github.com/goharbor/harbor/src/jobservice/common/rds" + + "github.com/goharbor/harbor/src/jobservice/common/list" + "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/gomodule/redigo/redis" "github.com/pkg/errors" - "sync" - "time" ) const ( @@ -32,6 +38,10 @@ const ( shortLoopInterval = 5 * 
time.Second // Waiting for long while if no retrying elements found longLoopInterval = 5 * time.Minute + // loopInterval is the interval for the loop of restoring dead status + loopInterval = 2 * time.Minute + // shortInterval is initial interval and be as based to give random buffer to loopInterval + shortInterval = 10 ) // Controller is designed to control the life cycle of the job @@ -53,6 +63,7 @@ type basicController struct { pool *redis.Pool callback job.HookCallback wg *sync.WaitGroup + retryList *list.SyncList } // NewController is the constructor of basic controller @@ -63,6 +74,7 @@ func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.H pool: pool, callback: callback, wg: ctx.WG, + retryList: list.New(), } } @@ -86,7 +98,7 @@ func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) { return nil, errors.Errorf("error occurred when creating job tracker: %s", err) } - bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback) + bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback, bc.retryList) if err := bt.Save(); err != nil { return nil, err } @@ -96,7 +108,7 @@ func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) { // Track and attache with the job func (bc *basicController) Track(jobID string) (job.Tracker, error) { - bt := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback) + bt := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback, bc.retryList) if err := bt.Load(); err != nil { return nil, err } @@ -105,75 +117,90 @@ func (bc *basicController) Track(jobID string) (job.Tracker, error) { } // loopForRestoreDeadStatus is a loop to restore the dead states of jobs +// Obviously,this retry is a try best action. +// The retry items are not persisted and they will be gone if the job service is restart. 
func (bc *basicController) loopForRestoreDeadStatus() { + // Generate random timer duration + rd := func() time.Duration { + return longLoopInterval + time.Duration(rand.Int31n(shortInterval))*time.Second + } + defer func() { logger.Info("Status restoring loop is stopped") bc.wg.Done() }() - token := make(chan bool, 1) - token <- true + // Initialize the timer + tm := time.NewTimer(shortInterval * time.Second) + defer tm.Stop() for { - <-token - - if err := bc.restoreDeadStatus(); err != nil { - waitInterval := shortLoopInterval - if err == rds.ErrNoElements { - // No elements - waitInterval = longLoopInterval - } else { - logger.Errorf("restore dead status error: %s, put it back to the retrying Q later again", err) - } - - // wait for a while or be terminated - select { - case <-time.After(waitInterval): - case <-bc.context.Done(): - return - } + select { + case <-tm.C: + // Reset timer + tm.Reset(rd()) + + // Retry the items in the list + bc.retryLoop() + case <-bc.context.Done(): + return // terminated } - - // Return token - token <- true - } -} - -// restoreDeadStatus try to restore the dead status -func (bc *basicController) restoreDeadStatus() error { - // Get one - deadOne, err := bc.popOneDead() - if err != nil { - return err - } - // Try to update status - t, err := bc.Track(deadOne.JobID) - if err != nil { - return err } - - return t.UpdateStatusWithRetry(job.Status(deadOne.TargetStatus)) } -// popOneDead retrieves one dead status from the backend Q from lowest to highest -func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) { +// retryLoop iterates the retry queue and do retrying +func (bc *basicController) retryLoop() { + // Get connection conn := bc.pool.Get() defer func() { - _ = conn.Close() + // Return redis connection + if err := conn.Close(); err != nil { + logger.Errorf("Failed to close redis connection: %v : %s", err, "retry loop: lcm") + } }() - key := rds.KeyStatusUpdateRetryQueue(bc.namespace) - v, err := rds.ZPopMin(conn, key) + // Check the list + bc.retryList.Iterate(func(ele interface{}) bool { + if change, ok := ele.(job.SimpleStatusChange); ok { + err := retry(conn, bc.namespace, change) + if err != nil { + // Log the error + logger.Errorf("Failed to retry the status update action: %v : %s", err, "retry loop: lcm") + } + + if err == nil || errs.IsStatusMismatchError(err) { + return true + } + } + + return false + }) +} + +// retry status update action +func retry(conn redis.Conn, ns string, change job.SimpleStatusChange) error { + // Debug + logger.Debugf("Retry the status update action: %v", change) + + rootKey := rds.KeyJobStats(ns, change.JobID) + trackKey := rds.KeyJobTrackInProgress(ns) + + reply, err := redis.String(rds.SetStatusScript.Do( + conn, + rootKey, + trackKey, + change.TargetStatus, + change.Revision, + time.Now().Unix(), + change.JobID, + )) if err != nil { - return nil, err + return errors.Wrap(err, "retry") } - if bytes, ok := v.([]byte); ok { - ssc := &job.SimpleStatusChange{} - if err := json.Unmarshal(bytes, ssc); err == nil { - return ssc, nil - } + if reply != "ok" { + return errs.StatusMismatchError(reply, change.TargetStatus) } - return nil, errors.New("pop one dead error: bad result reply") + return nil } diff --git a/src/jobservice/lcm/controller_test.go b/src/jobservice/lcm/controller_test.go index 91ccd12da14..a5b64e03c3f 100644 --- a/src/jobservice/lcm/controller_test.go +++ b/src/jobservice/lcm/controller_test.go @@ -16,8 +16,10 @@ package lcm import ( "context" - "encoding/json" - 
"github.com/goharbor/harbor/src/jobservice/common/rds" + "sync" + "testing" + "time" + "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" @@ -26,9 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "sync" - "testing" - "time" ) // LcmControllerTestSuite tests functions of life cycle controller @@ -65,48 +64,36 @@ func TestLcmControllerTestSuite(t *testing.T) { suite.Run(t, new(LcmControllerTestSuite)) } -// TestNewAndTrack tests controller.New() and controller.Track() -func (suite *LcmControllerTestSuite) TestNewAndTrack() { - jobID := utils.MakeIdentifier() - suite.newsStats(jobID) - - t, err := suite.ctl.Track(jobID) - require.Nil(suite.T(), err, "lcm track: nil error expected but got %s", err) - assert.Equal(suite.T(), job.SampleJob, t.Job().Info.JobName, "lcm new: expect job name %s but got %s", job.SampleJob, t.Job().Info.JobName) -} - -// TestNew tests controller.Serve() -func (suite *LcmControllerTestSuite) TestServe() { +// TestController tests lcm controller +func (suite *LcmControllerTestSuite) TestController() { // Prepare mock data jobID := utils.MakeIdentifier() - suite.newsStats(jobID) + rev := time.Now().Unix() + suite.newsStats(jobID, rev) - conn := suite.pool.Get() - defer func() { - _ = conn.Close() - }() - simpleChange := &job.SimpleStatusChange{ + simpleChange := job.SimpleStatusChange{ JobID: jobID, TargetStatus: job.RunningStatus.String(), + Revision: rev, } - rawJSON, err := json.Marshal(simpleChange) - require.Nil(suite.T(), err, "json marshal: nil error expected but got %s", err) - key := rds.KeyStatusUpdateRetryQueue(suite.namespace) - args := []interface{}{key, "NX", time.Now().Unix(), rawJSON} - _, err = conn.Do("ZADD", args...) 
- require.Nil(suite.T(), err, "prepare mock data: nil error expected but got %s", err) - - err = suite.ctl.Serve() + + // Just test if the server can be started + err := suite.ctl.Serve() require.NoError(suite.T(), err, "lcm: nil error expected but got %s", err) - <-time.After(1 * time.Second) - count, err := redis.Int(conn.Do("ZCARD", key)) - require.Nil(suite.T(), err, "get total dead status: nil error expected but got %s", err) - assert.Equal(suite.T(), 0, count) + // Test retry loop + bc := suite.ctl.(*basicController) + bc.retryList.Push(simpleChange) + bc.retryLoop() + + t, err := suite.ctl.Track(jobID) + require.Nil(suite.T(), err, "lcm track: nil error expected but got %s", err) + assert.Equal(suite.T(), job.SampleJob, t.Job().Info.JobName, "lcm new: expect job name %s but got %s", job.SampleJob, t.Job().Info.JobName) + assert.Equal(suite.T(), job.RunningStatus.String(), t.Job().Info.Status) } // newsStats create job stats -func (suite *LcmControllerTestSuite) newsStats(jobID string) { +func (suite *LcmControllerTestSuite) newsStats(jobID string, revision int64) { stats := &job.Stats{ Info: &job.StatsInfo{ JobID: jobID, @@ -114,6 +101,7 @@ func (suite *LcmControllerTestSuite) newsStats(jobID string) { JobName: job.SampleJob, IsUnique: true, Status: job.PendingStatus.String(), + Revision: revision, }, } diff --git a/src/jobservice/mgt/manager.go b/src/jobservice/mgt/manager.go index e31947ce124..c0889fca6f6 100644 --- a/src/jobservice/mgt/manager.go +++ b/src/jobservice/mgt/manager.go @@ -17,6 +17,9 @@ package mgt import ( "context" "fmt" + "strconv" + "strings" + "github.com/gocraft/work" "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/rds" @@ -27,8 +30,6 @@ import ( "github.com/goharbor/harbor/src/jobservice/period" "github.com/gomodule/redigo/redis" "github.com/pkg/errors" - "strconv" - "strings" ) // Manager defies the related operations to handle the management of job stats. 
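
The interaction between the tracker and the controller's retry loop comes down to the `SyncList` semantics introduced earlier in this patch: trackers push failed `SimpleStatusChange` items, and `Iterate` removes an element when the callback returns true. Below is a minimal sketch of that pattern, assuming the new `common/list` package is available at the import path shown in the diff; the retry outcome is faked for illustration and is not the real Redis-backed retry.

```go
package main

import (
	"fmt"

	"github.com/goharbor/harbor/src/jobservice/common/list"
)

// SimpleStatusChange mirrors the fields the retry loop cares about.
type SimpleStatusChange struct {
	JobID        string
	TargetStatus string
	Revision     int64
}

func main() {
	l := list.New()

	// Trackers push status updates that could not be applied immediately.
	l.Push(SimpleStatusChange{JobID: "job-1", TargetStatus: "Success", Revision: 1})
	l.Push(SimpleStatusChange{JobID: "job-2", TargetStatus: "Error", Revision: 1})

	// The controller periodically iterates the list; returning true removes
	// the element (retry succeeded or the status is already outdated).
	l.Iterate(func(ele interface{}) bool {
		change, ok := ele.(SimpleStatusChange)
		if !ok {
			return true // drop malformed entries
		}
		fmt.Printf("retrying %s -> %s\n", change.JobID, change.TargetStatus)
		return change.JobID == "job-1" // pretend only job-1 succeeded; job-2 stays queued
	})
}
```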
@@ -154,7 +155,7 @@ func (bm *basicManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) statsKey := string(bytes) if i := strings.LastIndex(statsKey, ":"); i != -1 { jID := statsKey[i+1:] - t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) + t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil, nil) if err := t.Load(); err != nil { logger.Errorf("retrieve stats data of job %s error: %s", jID, err) continue @@ -174,7 +175,7 @@ func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (re return nil, 0, errors.New("nil periodic job ID") } - tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil) + tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil, nil) err = tracker.Load() if err != nil { return nil, 0, err @@ -239,7 +240,7 @@ func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (re } for _, eID := range executionIDs { - t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil) + t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil, nil) if er := t.Load(); er != nil { logger.Errorf("track job %s error: %s", eID, err) continue @@ -273,7 +274,7 @@ func (bm *basicManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int6 jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt) } } - t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) + t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil, nil) err = t.Load() if err != nil { // Just log it @@ -293,7 +294,7 @@ func (bm *basicManager) GetJob(jobID string) (*job.Stats, error) { return nil, errs.BadRequestError("empty job ID") } - t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil) + t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil, nil) if err := t.Load(); err != nil { return nil, err } @@ -307,7 +308,7 @@ func (bm *basicManager) SaveJob(j *job.Stats) error { return errs.BadRequestError("nil saving job stats") } - t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil) + t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil, nil) return t.Save() } diff --git a/src/jobservice/mgt/manager_test.go b/src/jobservice/mgt/manager_test.go index dabad18df5c..d8f40a12c05 100644 --- a/src/jobservice/mgt/manager_test.go +++ b/src/jobservice/mgt/manager_test.go @@ -16,7 +16,11 @@ package mgt import ( "context" + "testing" + "time" + "github.com/gocraft/work" + "github.com/goharbor/harbor/src/jobservice/common/list" "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/tests" @@ -24,8 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "testing" - "time" ) // BasicManagerTestSuite tests the function of basic manager @@ -56,7 +58,7 @@ func (suite *BasicManagerTestSuite) SetupSuite() { }, } - t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil) + t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil, list.New()) err := t.Save() require.NoError(suite.T(), err) @@ -71,7 +73,7 @@ func (suite *BasicManagerTestSuite) SetupSuite() { UpstreamJobID: "1000", }, } - t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil) + t = job.NewBasicTrackerWithStats(context.TODO(), execution, 
suite.namespace, suite.pool, nil, list.New()) err = t.Save() require.NoError(suite.T(), err) } @@ -117,7 +119,7 @@ func (suite *BasicManagerTestSuite) TestGetPeriodicExecutions() { assert.Equal(suite.T(), int64(1), total) assert.Equal(suite.T(), int64(1), int64(len(jobs))) - t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil) + t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil, list.New()) err = t.Load() require.NoError(suite.T(), err) err = t.PeriodicExecutionDone() @@ -147,17 +149,17 @@ func (suite *BasicManagerTestSuite) TestGetScheduledJobs() { }, } - t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil) + t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil, list.New()) err = t.Save() require.NoError(suite.T(), err) - list, total, err := suite.manager.GetScheduledJobs(&query.Parameter{ + l, total, err := suite.manager.GetScheduledJobs(&query.Parameter{ PageNumber: 1, PageSize: 10, }) require.NoError(suite.T(), err) assert.Equal(suite.T(), int64(1), total) - assert.Equal(suite.T(), int64(1), int64(len(list))) + assert.Equal(suite.T(), int64(1), int64(len(l))) } // TestGetJob tests get job diff --git a/src/jobservice/period/basic_scheduler.go b/src/jobservice/period/basic_scheduler.go index 6614d2d3ac0..34a48b47540 100644 --- a/src/jobservice/period/basic_scheduler.go +++ b/src/jobservice/period/basic_scheduler.go @@ -15,10 +15,9 @@ package period import ( - "encoding/json" + "context" "time" - "context" "github.com/gocraft/work" "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" @@ -52,27 +51,13 @@ func NewScheduler(ctx context.Context, namespace string, pool *redis.Pool, ctl l } // Start the periodic scheduling process -// Blocking call here -func (bs *basicScheduler) Start() error { - defer func() { - logger.Info("Basic scheduler is stopped") - }() - +func (bs *basicScheduler) Start() { + // Run once clean // Try best to do go bs.clearDirtyJobs() - logger.Info("Basic scheduler is started") - // start enqueuer - return bs.enqueuer.start() -} - -// Stop the periodic scheduling process -func (bs *basicScheduler) Stop() error { - // stop everything - bs.enqueuer.stopChan <- true - - return nil + bs.enqueuer.start() } // Schedule is implementation of the same method in period.Interface @@ -99,24 +84,10 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) { return -1, err } - // Prepare publish message - m := &message{ - Event: changeEventSchedule, - Data: p, - } - - msgJSON, err := json.Marshal(m) - if err != nil { - return -1, err - } - pid := time.Now().Unix() - // Save to redis db and publish notification via redis transaction - err = conn.Send("MULTI") - err = conn.Send("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON) - err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON) - if _, err := conn.Do("EXEC"); err != nil { + // Save to redis db + if _, err := conn.Do("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON); err != nil { return -1, err } @@ -166,25 +137,9 @@ func (bs *basicScheduler) UnSchedule(policyID string) error { return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID) } - notification := &message{ - Event: changeEventUnSchedule, - Data: p, - } - - msgJSON, err := json.Marshal(notification) - if err != nil { - return err - } - - // REM from redis db with transaction way - err = 
conn.Send("MULTI") - err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score - err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON) - if err != nil { - return err - } - _, err = conn.Do("EXEC") - if err != nil { + // REM from redis db + // Accurately remove the item with the specified score + if _, err := conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID); err != nil { return err } diff --git a/src/jobservice/period/basic_scheduler_test.go b/src/jobservice/period/basic_scheduler_test.go index dc5081a523d..7beaecea3f1 100644 --- a/src/jobservice/period/basic_scheduler_test.go +++ b/src/jobservice/period/basic_scheduler_test.go @@ -16,6 +16,10 @@ package period import ( "context" "fmt" + "sync" + "testing" + "time" + "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" @@ -25,9 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "sync" - "testing" - "time" ) // BasicSchedulerTestSuite tests functions of basic scheduler @@ -63,6 +64,7 @@ func (suite *BasicSchedulerTestSuite) SetupSuite() { ) suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl) + suite.scheduler.Start() } // TearDownSuite clears the test suite @@ -84,20 +86,6 @@ func TestSchedulerTestSuite(t *testing.T) { // TestScheduler tests scheduling and un-scheduling func (suite *BasicSchedulerTestSuite) TestScheduler() { - go func() { - <-time.After(1 * time.Second) - _ = suite.scheduler.Stop() - }() - - go func() { - var err error - defer func() { - require.NoError(suite.T(), err, "start scheduler: nil error expected but got %s", err) - }() - - err = suite.scheduler.Start() - }() - // Prepare one now := time.Now() minute := now.Minute() diff --git a/src/jobservice/period/enqueuer.go b/src/jobservice/period/enqueuer.go index 7de9e54dddd..9fc40c0c205 100644 --- a/src/jobservice/period/enqueuer.go +++ b/src/jobservice/period/enqueuer.go @@ -40,17 +40,14 @@ const ( ) type enqueuer struct { - namespace string - context context.Context - pool *redis.Pool - policyStore *policyStore - ctl lcm.Controller + namespace string + context context.Context + pool *redis.Pool + ctl lcm.Controller // Diff with other nodes nodeID string // Track the error of enqueuing lastEnqueueErr error - // For stop - stopChan chan bool } func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lcm.Controller) *enqueuer { @@ -61,32 +58,23 @@ func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lc } return &enqueuer{ - context: ctx, - namespace: namespace, - pool: pool, - policyStore: newPolicyStore(ctx, namespace, pool), - ctl: ctl, - stopChan: make(chan bool, 1), - nodeID: nodeID.(string), + context: ctx, + namespace: namespace, + pool: pool, + ctl: ctl, + nodeID: nodeID.(string), } } // Blocking call -func (e *enqueuer) start() error { - // Load policies first when starting - if err := e.policyStore.load(); err != nil { - return err - } - +func (e *enqueuer) start() { go e.loop() - logger.Info("Periodic enqueuer is started") - - return e.policyStore.serve() + logger.Info("Scheduler: periodic enqueuer is started") } func (e *enqueuer) loop() { defer func() { - logger.Info("Periodic enqueuer is stopped") + logger.Info("Scheduler: periodic enqueuer is stopped") }() // Do 
enqueue immediately when starting @@ -98,10 +86,8 @@ func (e *enqueuer) loop() { for { select { - case <-e.stopChan: - // Stop policy store now - e.policyStore.stopChan <- true - return + case <-e.context.Done(): + return // exit case <-timer.C: // Pause the timer for completing the processing this time timer.Reset(neverExecuted) @@ -157,10 +143,17 @@ func (e *enqueuer) enqueue() { // Reset error track e.lastEnqueueErr = nil - e.policyStore.Iterate(func(id string, p *Policy) bool { + // Load policies and schedule next jobs for them + pls, err := Load(e.namespace, conn) + if err != nil { + // Log error + logger.Errorf("%s:%s", err, "enqueue error: enqueuer") + return + } + + for _, p := range pls { e.scheduleNextJobs(p, conn) - return true - }) + } } // scheduleNextJobs schedules job for next time slots based on the policy diff --git a/src/jobservice/period/enqueuer_test.go b/src/jobservice/period/enqueuer_test.go index 5c3cc4affcf..8b38bde223b 100644 --- a/src/jobservice/period/enqueuer_test.go +++ b/src/jobservice/period/enqueuer_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" @@ -30,7 +31,6 @@ import ( "github.com/goharbor/harbor/src/jobservice/tests" "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -69,8 +69,9 @@ func (suite *EnqueuerTestSuite) SetupSuite() { func(hookURL string, change *job.StatusChange) error { return nil }, ) suite.enqueuer = newEnqueuer(ctx, suite.namespace, suite.pool, lcmCtl) - suite.prepare() + + suite.enqueuer.start() } // TearDownSuite clears the test suite @@ -87,39 +88,32 @@ func (suite *EnqueuerTestSuite) TearDownSuite() { // TestEnqueuer tests enqueuer func (suite *EnqueuerTestSuite) TestEnqueuer() { - go func() { - defer func() { - suite.enqueuer.stopChan <- true - }() - - key := rds.RedisKeyScheduled(suite.namespace) - conn := suite.pool.Get() - defer func() { - _ = conn.Close() - }() - - tk := time.NewTicker(500 * time.Millisecond) - defer tk.Stop() - - for { - select { - case <-tk.C: - count, err := redis.Int(conn.Do("ZCARD", key)) - require.Nil(suite.T(), err, "count scheduled: nil error expected but got %s", err) - if assert.Condition(suite.T(), func() (success bool) { - return count > 0 - }, "at least one job should be scheduled for the periodic job policy") { - return - } - case <-time.After(15 * time.Second): - require.NoError(suite.T(), errors.New("timeout (15s): expect at 1 scheduled job but still get nothing")) - return - } + key := rds.RedisKeyScheduled(suite.namespace) + conn := suite.pool.Get() + defer func() { + if err := conn.Close(); err != nil { + suite.NoError(err, "close redis connection") } }() - err := suite.enqueuer.start() - require.Nil(suite.T(), err, "enqueuer start: nil error expected but got %s", err) + tk := time.NewTicker(500 * time.Millisecond) + defer tk.Stop() + + for { + select { + case <-tk.C: + count, err := redis.Int(conn.Do("ZCARD", key)) + require.Nil(suite.T(), err, "count scheduled: nil error expected but got %s", err) + if assert.Condition(suite.T(), func() (success bool) { + return count > 0 + }, "at least one job should be scheduled for the periodic job policy") { + return + } + case <-time.After(15 * time.Second): + require.NoError(suite.T(), errors.New("timeout (15s): expect at 1 scheduled job but still get nothing")) + return + } + } } func 
(suite *EnqueuerTestSuite) prepare() { diff --git a/src/jobservice/period/policy_store.go b/src/jobservice/period/policy_store.go index 345f8f30176..e48c35dde9f 100644 --- a/src/jobservice/period/policy_store.go +++ b/src/jobservice/period/policy_store.go @@ -15,26 +15,15 @@ package period import ( - "context" "encoding/json" "errors" "fmt" - "sync" - "time" "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/gomodule/redigo/redis" "github.com/robfig/cron" - "strings" -) - -const ( - // changeEventSchedule : Schedule periodic job policy event - changeEventSchedule = "Schedule" - // changeEventUnSchedule : UnSchedule periodic job policy event - changeEventUnSchedule = "UnSchedule" ) // Policy ... @@ -81,184 +70,14 @@ func (p *Policy) Validate() error { return nil } -// policyStore is in-memory cache for the periodic job policies. -type policyStore struct { - // k-v pair and key is the policy ID - hash *sync.Map - namespace string - context context.Context - pool *redis.Pool - // For stop - stopChan chan bool -} - -// message is designed for sub/pub messages -type message struct { - Event string `json:"event"` - Data *Policy `json:"data"` -} - -// newPolicyStore is constructor of policyStore -func newPolicyStore(ctx context.Context, ns string, pool *redis.Pool) *policyStore { - return &policyStore{ - hash: new(sync.Map), - context: ctx, - namespace: ns, - pool: pool, - stopChan: make(chan bool, 1), - } -} - -// Blocking call -func (ps *policyStore) serve() (err error) { - defer func() { - logger.Info("Periodical job policy store is stopped") - }() - - conn := ps.pool.Get() - psc := redis.PubSubConn{ - Conn: conn, - } - defer func() { - _ = psc.Close() - }() - - // Subscribe channel - err = psc.Subscribe(redis.Args{}.AddFlat(rds.KeyPeriodicNotification(ps.namespace))...) 
- if err != nil { - return - } - - // Channels for sub/pub ctl - errChan := make(chan error, 1) - done := make(chan bool, 1) - - go func() { - for { - switch res := psc.Receive().(type) { - case error: - errChan <- fmt.Errorf("redis sub/pub chan error: %s", res.(error).Error()) - break - case redis.Message: - m := &message{} - if err := json.Unmarshal(res.Data, m); err != nil { - // logged - logger.Errorf("Read invalid message: %s\n", res.Data) - break - } - if err := ps.sync(m); err != nil { - logger.Error(err) - } - break - case redis.Subscription: - switch res.Kind { - case "subscribe": - logger.Infof("Subscribe redis channel %s", res.Channel) - break - case "unsubscribe": - // Unsubscribe all, means main goroutine is exiting - logger.Infof("Unsubscribe redis channel %s", res.Channel) - done <- true - return - } - } - } - }() - - logger.Info("Periodical job policy store is serving with policy auto sync enabled") - defer func() { - var unSubErr error - defer func() { - // Merge errors - finalErrs := make([]string, 0) - if unSubErr != nil { - finalErrs = append(finalErrs, unSubErr.Error()) - } - if err != nil { - finalErrs = append(finalErrs, err.Error()) - } - - if len(finalErrs) > 0 { - // Override returned err or do nothing - err = errors.New(strings.Join(finalErrs, ";")) - } - }() - // Unsubscribe all - if err := psc.Unsubscribe(); err != nil { - logger.Errorf("unsubscribe: %s", err) - } - // Confirm result - // Add timeout in case unsubscribe failed - select { - case unSubErr = <-errChan: - return - case <-done: - return - case <-time.After(30 * time.Second): - unSubErr = errors.New("unsubscribe time out") - return - } - }() - - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - // blocking here - for { - select { - case <-ticker.C: - err = psc.Ping("ping!") - if err != nil { - return - } - case <-ps.stopChan: - return nil - case err = <-errChan: - return - } - } -} - -// sync policy with backend list -func (ps *policyStore) sync(m *message) error { - if m == nil { - return errors.New("nil message") - } - - if m.Data == nil { - return errors.New("missing data in the policy sync message") - } - - switch m.Event { - case changeEventSchedule: - if err := ps.add(m.Data); err != nil { - return fmt.Errorf("failed to sync scheduled policy %s: %s", m.Data.ID, err) - } - case changeEventUnSchedule: - removed := ps.remove(m.Data.ID) - if removed == nil { - return fmt.Errorf("failed to sync unscheduled policy %s", m.Data.ID) - } - default: - return fmt.Errorf("message %s is not supported", m.Event) - } - - return nil -} - -// Load all the policies from the backend to store -func (ps *policyStore) load() error { - conn := ps.pool.Get() - defer func() { - _ = conn.Close() - }() - - bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(ps.namespace), 0, -1)) +// Load all the policies from the backend storage. 
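+// Malformed or invalid policy entries are skipped and logged rather than failing the whole load.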
+func Load(namespace string, conn redis.Conn) ([]*Policy, error) { + bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1)) if err != nil { - return err + return nil, err } - count := 0 + policies := make([]*Policy, 0) for i, l := 0, len(bytes); i < l; i++ { rawPolicy := bytes[i].([]byte) p := &Policy{} @@ -266,62 +85,22 @@ func (ps *policyStore) load() error { if err := p.DeSerialize(rawPolicy); err != nil { // Ignore error which means the policy data is not valid // Only logged - logger.Errorf("malform policy: %s; error: %s\n", rawPolicy, err) + logger.Errorf("Malformed policy: %s; error: %s", rawPolicy, err) continue } - // Add to cache store - if err := ps.add(p); err != nil { - // Only logged - logger.Errorf("cache periodic policies error: %s", err) + // Validate the policy object + if err := p.Validate(); err != nil { + logger.Errorf("Policy validate error: %s", err) continue } - count++ + policies = append(policies, p) logger.Debugf("Load periodic job policy: %s", string(rawPolicy)) } - logger.Infof("Load %d periodic job policies", count) - - return nil -} - -// Add one or more policy -func (ps *policyStore) add(item *Policy) error { - if item == nil { - return errors.New("nil policy to add") - } - - if utils.IsEmptyStr(item.ID) { - return errors.New("malform policy to add") - } - - v, _ := ps.hash.LoadOrStore(item.ID, item) - if v == nil { - return fmt.Errorf("failed to add policy: %s", item.ID) - } - - return nil -} + logger.Debugf("Load %d periodic job policies", len(policies)) -// Iterate all the policies in the store -func (ps *policyStore) Iterate(f func(id string, p *Policy) bool) { - ps.hash.Range(func(k, v interface{}) bool { - return f(k.(string), v.(*Policy)) - }) -} - -// Remove the specified policy from the store -func (ps *policyStore) remove(policyID string) *Policy { - if utils.IsEmptyStr(policyID) { - return nil - } - - if v, ok := ps.hash.Load(policyID); ok { - ps.hash.Delete(policyID) - return v.(*Policy) - } - - return nil + return policies, nil } diff --git a/src/jobservice/period/policy_store_test.go b/src/jobservice/period/policy_store_test.go index 6721bd379b0..c696b0b2032 100644 --- a/src/jobservice/period/policy_store_test.go +++ b/src/jobservice/period/policy_store_test.go @@ -14,25 +14,23 @@ package period import ( - "context" + "testing" + "time" + "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/tests" "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "testing" - "time" ) // PolicyStoreTestSuite tests functions of policy store type PolicyStoreTestSuite struct { suite.Suite - store *policyStore namespace string pool *redis.Pool - cancel context.CancelFunc } // TestPolicyStoreTestSuite is entry of go test @@ -44,37 +42,20 @@ func TestPolicyStoreTestSuite(t *testing.T) { func (suite *PolicyStoreTestSuite) SetupSuite() { suite.namespace = tests.GiveMeTestNamespace() suite.pool = tests.GiveMeRedisPool() - ctx, cancel := context.WithCancel(context.Background()) - suite.cancel = cancel - - suite.store = newPolicyStore(ctx, suite.namespace, suite.pool) } // TearDownSuite clears the test suite func (suite *PolicyStoreTestSuite) TearDownSuite() { - suite.cancel() - conn := suite.pool.Get() defer func() { - _ = conn.Close() - }() - - _ = tests.ClearAll(suite.namespace, conn) -} - -// TestStore tests policy store serve -func (suite *PolicyStoreTestSuite) TestServe() { - var err 
error - - defer func() { - suite.store.stopChan <- true - assert.Nil(suite.T(), err, "serve exit: nil error expected but got %s", err) + if err := conn.Close(); err != nil { + suite.NoError(err, "close redis connection") + } }() - go func() { - err = suite.store.serve() - }() - <-time.After(1 * time.Second) + if err := tests.ClearAll(suite.namespace, conn); err != nil { + suite.NoError(err, "clear redis namespace") + } } // TestLoad tests load policy from backend @@ -91,47 +72,17 @@ func (suite *PolicyStoreTestSuite) TestLoad() { conn := suite.pool.Get() defer func() { - _ = conn.Close() + if err := conn.Close(); err != nil { + suite.NoError(err, "close redis connection") + } }() _, err = conn.Do("ZADD", key, time.Now().Unix(), rawData) assert.Nil(suite.T(), err, "add data: nil error expected but got %s", err) - err = suite.store.load() - assert.Nil(suite.T(), err, "load: nil error expected but got %s", err) - - p1 := &Policy{ - ID: "fake_policy_1", - JobName: job.SampleJob, - CronSpec: "5 * * * * *", - } - m := &message{ - Event: changeEventSchedule, - Data: p1, - } - err = suite.store.sync(m) - assert.Nil(suite.T(), err, "sync schedule: nil error expected but got %s", err) - - count := 0 - suite.store.Iterate(func(id string, p *Policy) bool { - count++ - return true - }) - assert.Equal(suite.T(), 2, count, "expected 2 policies but got %d", count) - - m1 := &message{ - Event: changeEventUnSchedule, - Data: p1, - } - err = suite.store.sync(m1) - assert.Nil(suite.T(), err, "sync unschedule: nil error expected but got %s", err) - - count = 0 - suite.store.Iterate(func(id string, p *Policy) bool { - count++ - return true - }) - assert.Equal(suite.T(), 1, count, "expected 1 policies but got %d", count) + ps, err := Load(suite.namespace, conn) + suite.NoError(err, "load: nil error expected but got %s", err) + suite.Equal(1, len(ps), "count of loaded policies") } // TestPolicy tests policy itself diff --git a/src/jobservice/period/scheduler.go b/src/jobservice/period/scheduler.go index 7704b061c5f..7b315cf311b 100644 --- a/src/jobservice/period/scheduler.go +++ b/src/jobservice/period/scheduler.go @@ -18,15 +18,7 @@ package period type Scheduler interface { // Start to serve periodic job scheduling process // - // Returns: - // error if any problems happened - Start() error - - // Stop the working periodic job scheduling process - // - // Returns; - // error if any problems happened - Stop() error + Start() // Schedule the specified cron job policy. 
// diff --git a/src/jobservice/runner/redis.go b/src/jobservice/runner/redis.go index 69cc947145e..6633637531c 100644 --- a/src/jobservice/runner/redis.go +++ b/src/jobservice/runner/redis.go @@ -17,7 +17,8 @@ package runner import ( "fmt" "runtime" - "time" + + "github.com/goharbor/harbor/src/jobservice/errs" "github.com/gocraft/work" "github.com/goharbor/harbor/src/jobservice/env" @@ -51,21 +52,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { runningJob job.Interface execContext job.Context tracker job.Tracker - markStopped = bp(false) ) - // Defer to log the exit result - defer func() { - if !*markStopped { - if err == nil { - logger.Infof("|^_^| Job '%s:%s' exit with success", j.Name, j.ID) - } else { - // log error - logger.Errorf("|@_@| Job '%s:%s' exit with error: %s", j.Name, j.ID, err) - } - } - }() - // Track the running job now jID := j.ID @@ -75,60 +63,33 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { } if tracker, err = rj.ctl.Track(jID); err != nil { + // log error + logger.Errorf("Job '%s:%s' exit with error: failed to get job tracker: %s", j.Name, j.ID, err) + + // Pay attentions here, if the job stats is lost (NOTFOUND error returned), + // directly return without retry again as we have no way to restore the stats again. + if errs.IsObjectNotFoundError(err) { + j.Fails = 10000000000 // never retry + } + // ELSE: // As tracker creation failed, there is no way to mark the job status change. // Also a non nil error return consumes a fail. If all retries are failed here, // it will cause the job to be zombie one (pending forever). - // Here we will avoid the job to consume a fail and let it retry again and again. - // However, to avoid a forever retry, we will check the FailedAt timestamp. - now := time.Now().Unix() - if j.FailedAt == 0 || now-j.FailedAt < 2*24*3600 { - j.Fails-- - } + // Those zombie ones will be reaped by the reaper later. return } - // Do operation based on the job status - jStatus := job.Status(tracker.Job().Info.Status) - switch jStatus { - case job.PendingStatus, job.ScheduledStatus: - // do nothing now - break - case job.StoppedStatus: - // Probably jobs has been stopped by directly mark status to stopped. - // Directly exit and no retry - markStopped = bp(true) - return nil - case job.ErrorStatus: - if j.FailedAt > 0 && j.Fails > 0 { - // Retry job - // Reset job info - if er := tracker.Reset(); er != nil { - // Log error and return the original error if existing - er = errors.Wrap(er, fmt.Sprintf("retrying job %s:%s failed", j.Name, j.ID)) - logger.Error(er) - - if len(j.LastErr) > 0 { - return errors.New(j.LastErr) - } - - return err - } - - logger.Infof("|*_*| Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision) - } - break - default: - return errors.Errorf("mismatch status for running job: expected <%s <> got %s", job.RunningStatus.String(), jStatus.String()) - } - // Defer to switch status defer func() { // Switch job status based on the returned error. // The err happened here should not override the job run error, just log it. if err != nil { + // log error + logger.Errorf("Job '%s:%s' exit with error: %s", j.Name, j.ID, err) + if er := tracker.Fail(); er != nil { - logger.Errorf("Mark job status to failure error: %s", err) + logger.Errorf("Error occurred when marking the status of job %s:%s to failure: %s", j.Name, j.ID, er) } return @@ -136,19 +97,20 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { // Nil error might be returned by the stopped job. Check the latest status here. 
// If refresh latest status failed, let the process to go on to void missing status updating. - if latest, er := tracker.Status(); er == nil { + if latest, er := tracker.Status(); er != nil { + logger.Errorf("Error occurred when getting the status of job %s:%s: %s", j.Name, j.ID, er) + } else { if latest == job.StoppedStatus { // Logged - logger.Infof("Job %s:%s is stopped", tracker.Job().Info.JobName, tracker.Job().Info.JobID) - // Stopped job, no exit message printing. - markStopped = bp(true) + logger.Infof("Job %s:%s is stopped", j.Name, j.ID) return } } // Mark job status to success. + logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID) if er := tracker.Succeed(); er != nil { - logger.Errorf("Mark job status to success error: %s", er) + logger.Errorf("Error occurred when marking the status of job %s:%s to success: %s", j.Name, j.ID, er) } }() @@ -163,6 +125,40 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { } }() + // Do operation based on the job status + jStatus := job.Status(tracker.Job().Info.Status) + switch jStatus { + case job.PendingStatus, job.ScheduledStatus: + // do nothing now + break + case job.StoppedStatus: + // The job has probably been stopped by directly marking its status as stopped. + // Directly exit and no retry + return nil + case job.RunningStatus, job.ErrorStatus: + // Failed jobs can be put into the retry queue, and in-progress jobs may be + // interrupted by a sudden service crash; all of those jobs can be rescheduled. + // Reset job info. + if err = tracker.Reset(); err != nil { + // Log error and return the original error if existing + err = errors.Wrap(err, fmt.Sprintf("retrying %s job %s:%s failed", jStatus.String(), j.Name, j.ID)) + + if len(j.LastErr) > 0 { + err = errors.Wrap(err, j.LastErr) + } + + return + } + + logger.Infof("Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision) + break + case job.SuccessStatus: + // do nothing + return nil + default: + return errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String()) + } + // Build job context if rj.context.JobContext == nil { rj.context.JobContext = impl.NewDefaultContext(rj.context.SystemContext) @@ -189,6 +185,11 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { } // Run the job err = runningJob.Run(execContext, j.Args) + // Add error context + if err != nil { + err = errors.Wrap(err, "run error") + } + // Handle retry rj.retry(runningJob, j) // Handle periodic job execution diff --git a/src/jobservice/runner/redis_test.go b/src/jobservice/runner/redis_test.go index e7a74c0fc59..9566692d4e6 100644 --- a/src/jobservice/runner/redis_test.go +++ b/src/jobservice/runner/redis_test.go @@ -15,12 +15,13 @@ package runner import ( "context" - "github.com/stretchr/testify/assert" "os" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger/backend" @@ -154,7 +155,7 @@ func (suite *RedisRunnerTestSuite) TestJobWrapperInvalidTracker() { redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl) err := redisJob.Run(j) require.Error(suite.T(), err, "redis job: non nil error expected but got nil") - assert.Equal(suite.T(), int64(2), j.Fails) + assert.Equal(suite.T(), int64(10000000000), j.Fails) } // TestJobWrapperPanic tests job runner panic diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index 4e737de053c..f6c4a2910e1 100644 ---
a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -117,7 +117,12 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) hookCallback := func(URL string, change *job.StatusChange) error { msg := fmt.Sprintf("status change: job=%s, status=%s", change.JobID, change.Status) if !utils.IsEmptyStr(change.CheckIn) { - msg = fmt.Sprintf("%s, check_in=%s", msg, change.CheckIn) + // Ignore the real check in message to avoid too big message stream + cData := change.CheckIn + if len(cData) > 256 { + cData = fmt.Sprintf("<DATA BLOCK: %d bytes>", len(cData)) + } + msg = fmt.Sprintf("%s, check_in=%s", msg, cData) } evt := &hook.Event{ @@ -153,7 +158,6 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) // Start agent // Non blocking call - hookAgent.Attach(lcmCtl) if err = hookAgent.Serve(); err != nil { return errors.Errorf("start hook agent error: %s", err) } @@ -186,6 +190,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) terminated = true return case err = <-errChan: + logger.Errorf("Received error from error chan: %s", err) return } }(rootContext.ErrorChan) diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index ffa8428f1cf..bc58a0222d3 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -54,6 +54,7 @@ type basicWorker struct { context *env.Context scheduler period.Scheduler ctl lcm.Controller + reaper *reaper // key is name of known job // value is the type of known job @@ -92,6 +93,13 @@ func NewWorker(ctx *env.Context, namespace string, workerCount uint, redisPool * ctl: ctl, context: ctx, knownJobs: new(sync.Map), + reaper: &reaper{ + context: ctx.SystemContext, + namespace: namespace, + pool: redisPool, + lcmCtl: ctl, + jobTypes: make([]string, 0), // Append data later (at the start step) + }, } } @@ -121,16 +129,7 @@ func (w *basicWorker) Start() error { } // Start the periodic scheduler - w.context.WG.Add(1) - go func() { - defer func() { - w.context.WG.Done() - }() - // Blocking call - if err := w.scheduler.Start(); err != nil { - w.context.ErrorChan <- err - } - }() + w.scheduler.Start() // Listen to the system signal w.context.WG.Add(1) @@ -139,10 +138,8 @@ func (w *basicWorker) Start() error { w.context.WG.Done() logger.Infof("Basic worker is stopped") }() + <-w.context.SystemContext.Done() - if err := w.scheduler.Stop(); err != nil { - logger.Errorf("stop scheduler error: %s", err) - } w.pool.Stop() }() @@ -151,7 +148,15 @@ func (w *basicWorker) Start() error { w.pool.Middleware((*workerContext).logJob) // Non blocking call w.pool.Start() - logger.Infof("Redis worker is started") + logger.Infof("Basic worker is started") + + // Start the reaper + w.knownJobs.Range(func(k interface{}, v interface{}) bool { + w.reaper.jobTypes = append(w.reaper.jobTypes, k.(string)) + + return true + }) + w.reaper.start() return nil } diff --git a/src/jobservice/worker/cworker/reaper.go b/src/jobservice/worker/cworker/reaper.go new file mode 100644 index 00000000000..dd4e94b236b --- /dev/null +++ b/src/jobservice/worker/cworker/reaper.go @@ -0,0 +1,363 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cworker + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/errs" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/lcm" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/gomodule/redigo/redis" + "github.com/pkg/errors" +) + +const ( + maxUpdateDuration = 24 * time.Hour + reapLoopInterval = 1 * time.Hour + initialReapLoopInterval = 5 * time.Minute +) + +// reaper is designed to reap the outdated job stats and web hook +type reaper struct { + context context.Context + namespace string + pool *redis.Pool + lcmCtl lcm.Controller + jobTypes []string +} + +// start the reap process +// Non blocking call +func (r *reaper) start() { + // Start the interval job stats sync loop + go func() { + defer logger.Info("Reaper is stopped") + + tm := time.NewTimer(initialReapLoopInterval) + defer tm.Stop() + + logger.Info("Reaper is started") + for { + select { + case <-tm.C: + tm.Reset(reapLoopInterval) + if err := r.syncOutdatedStats(); err != nil { + // Just log + logger.Error(err) + } + case <-r.context.Done(): + return // Terminated + } + } + }() + + // Start re-enqueue in-progress jobs process. + // Only run once at the start point. + go func() { + // Wait for a short while and then start + <-time.After(5 * time.Second) + if err := r.reEnqueueInProgressJobs(); err != nil { + logger.Error(err) + } + }() +} + +// reEnqueueInProgressJobs is an enhancement for reaper process of upstream project. +// Mainly fix the issue of failing to re-enqueue the jobs in the dead worker pool. +// This process only needs to be executed once when worker pool is starting. +func (r *reaper) reEnqueueInProgressJobs() error { + // Debug + logger.Debugf("Start: Reap in-progress jobs from the dead pools") + defer logger.Debugf("End: Reap in-progress jobs") + + currentPools, err := r.getCurrentWorkerPools() + if err != nil { + return errors.Wrap(err, "re-enqueue in progress jobs") + } + + h := func(k string, v int64) (err error) { + if v <= 0 { + // Do nothing + return nil + } + + // If the worker pool is in the current pool list, ignore it as the default reap + // process will cover the re-enqueuing work. + if currentPools[k] { + // Do nothing + return nil + } + + // Re-enqueue jobs + if err := r.requeueInProgressJobs(k, r.jobTypes); err != nil { + return errors.Wrap(err, "in-progress jobs reap handler") + } + + return nil + } + + for _, jt := range r.jobTypes { + lk := rds.KeyJobLockInfo(r.namespace, jt) + if err := r.scanLocks(lk, h); err != nil { + // Log error and continue + logger.Errorf("Re-enqueue in progress jobs error: %v", err) + continue + } + } + + return nil +} + +// syncOutdatedStats ensures the job status is correctly updated and +// the related status change hook events are successfully fired. 
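+// It compares the tracked status of each in-progress job with the last ACKed status (see compare() below) and repairs any record found to be out of sync.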
+func (r *reaper) syncOutdatedStats() error { + // Debug + logger.Debugf("Start: reap outdated job stats") + defer logger.Debugf("End: reap outdated job stats") + + // Loop over all the in-progress jobs to check whether they're really in progress or + // their status is hung. + h := func(k string, v int64) (err error) { + defer func() { + if errs.IsObjectNotFoundError(err) { + // As the job stats are lost and we have no chance to restore them, directly discard the record. + logger.Errorf("Sync outdated stats error: %s", err.Error()) + // Un-track the in-progress record + err = r.unTrackInProgress(k) + } + + if err != nil { + err = errors.Wrap(err, "sync outdated stats handler") + } + }() + + var t job.Tracker + t, err = r.lcmCtl.Track(k) + if err != nil { + return + } + + // Compare and check if the status and the ACKed status are consistent + diff := compare(t.Job().Info) + if diff == 0 { + // Status and ACKed status are consistent + if job.Status(t.Job().Info.Status).Final() { + // Final status + // The in-progress track record is no longer valid as everything is done and consistent. + // This should not happen; however, if it really does, we can fix it here. + if err = r.unTrackInProgress(t.Job().Info.JobID); err != nil { + return + } + } else { + // Ongoing, check the update timestamp to make sure it is not hung + if time.Unix(t.Job().Info.UpdateTime, 0).Add(maxUpdateDuration).Before(time.Now()) { + // Status hung + // Mark job status to error state + if err = t.Fail(); err != nil { + return + } + // Exit + } + // Exit as it is still a valid ongoing job + } + } else if diff > 0 { + // The hook event of the current job status has not been ACKed + // Resend the hook event by setting the status again + if err = t.FireHook(); err != nil { + return + } + // Success and exit + } else { + // Current status is outdated, update it with ACKed status.
+ if err = t.UpdateStatusWithRetry(job.Status(t.Job().Info.HookAck.Status)); err != nil { + return + } + // Success and exit + } + + return nil + } + + if err := r.scanLocks(rds.KeyJobTrackInProgress(r.namespace), h); err != nil { + return errors.Wrap(err, "reaper") + } + + return nil +} + +// scanLocks gets the lock info from the specified key by redis scan way +func (r *reaper) scanLocks(key string, handler func(k string, v int64) error) error { + conn := r.pool.Get() + defer func() { + if err := conn.Close(); err != nil { + logger.Errorf("Failed to close redis connection: %v", err) + } + }() + + var cursor int64 + + for { + reply, err := redis.Values(conn.Do("HSCAN", key, cursor, "MATCH", "*", "COUNT", 100)) + if err != nil { + return errors.Wrap(err, "scan locks") + } + + if len(reply) != 2 { + return errors.New("expect 2 elements returned") + } + + // Get next cursor + cursor, err = strconv.ParseInt(string(reply[0].([]uint8)), 10, 16) + if err != nil { + return errors.Wrap(err, "scan locks") + } + + if values, ok := reply[1].([]interface{}); ok { + for i := 0; i < len(values); i += 2 { + k := string(values[i].([]uint8)) + lc, err := strconv.ParseInt(string(values[i+1].([]uint8)), 10, 16) + if err != nil { + // Ignore and continue + logger.Errorf("Malformed lock object for %s: %v", k, err) + continue + } + // Call handler to handle the data + if err := handler(k, lc); err != nil { + // Log and ignore the error + logger.Errorf("Failed to call reap handler: %v", err) + } + } + } + + // Check if we have reached the end + if cursor == 0 { + return nil + } + } +} + +// unTrackInProgress del the key in the progress track queue +func (r *reaper) unTrackInProgress(jobID string) error { + conn := r.pool.Get() + defer func() { + if err := conn.Close(); err != nil { + logger.Errorf("Failed to close redis connection: %s: %v", "untrack in progress job", err) + } + }() + + _, err := conn.Do("HDEL", rds.KeyJobTrackInProgress(r.namespace), jobID) + if err != nil { + return errors.Wrap(err, "untrack in progress record") + } + + return nil +} + +// getCurrentWorkerPools returns the IDs of the current worker pools +func (r *reaper) getCurrentWorkerPools() (map[string]bool, error) { + conn := r.pool.Get() + defer func() { + if err := conn.Close(); err != nil { + logger.Errorf("Failed to close redis connection: %s : %v", "get current worker pools", err) + } + }() + + // Get the current worker pools + workerPoolIDs, err := redis.Strings(conn.Do("SMEMBERS", rds.KeyWorkerPools(r.namespace))) + if err != nil { + return nil, errors.Wrap(err, "get current workpools") + } + + m := make(map[string]bool) + for _, id := range workerPoolIDs { + m[id] = true + } + + return m, nil +} + +func (r *reaper) requeueInProgressJobs(poolID string, jobTypes []string) error { + numKeys := len(jobTypes) + redisRequeueScript := rds.RedisLuaReenqueueScript(numKeys) + var scriptArgs = make([]interface{}, 0, numKeys+1) + + for _, jobType := range jobTypes { + // pops from in progress, push into job queue and decrement the queue lock + scriptArgs = append( + scriptArgs, + rds.KeyInProgressQueue(r.namespace, jobType, poolID), + rds.KeyJobs(r.namespace, jobType), + rds.KeyJobLock(r.namespace, jobType), + rds.KeyJobLockInfo(r.namespace, jobType), + ) // KEYS[1-4 * N] + } + scriptArgs = append(scriptArgs, poolID) // ARGV[1] + + conn := r.pool.Get() + defer func() { + if err := conn.Close(); err != nil { + logger.Errorf("Failed to close redis connection: %s : %s", "re enqueue in-progress jobs", err) + } + }() + + // Keep moving jobs until 
all queues are empty + for { + values, err := redis.Values(redisRequeueScript.Do(conn, scriptArgs...)) + if err == redis.ErrNil { + return nil + } else if err != nil { + return err + } + + if len(values) != 3 { + return fmt.Errorf("need 3 elements back") + } + } +} + +// compare the status and the status in the ack +// 0: status == ack.status +// >0: status > ack.status +// <0: status < ack.status +// +// compare based on: +// revision:status_code:check_in +func compare(j *job.StatsInfo) int { + // No ack existing + if j.HookAck == nil { + return 1 + } + + // Compare revision + rev := j.Revision - j.HookAck.Revision + if rev != 0 { + return (int)(rev) + } + + // Revision is same, then compare the status + st := job.Status(j.Status).Compare(job.Status(j.HookAck.Status)) + if st != 0 { + return st + } + + // Revision and status are same, then compare the checkin + return (int)(j.CheckInAt - j.HookAck.CheckInAt) +} diff --git a/src/jobservice/worker/cworker/reaper_test.go b/src/jobservice/worker/cworker/reaper_test.go new file mode 100644 index 00000000000..82890859cf3 --- /dev/null +++ b/src/jobservice/worker/cworker/reaper_test.go @@ -0,0 +1,243 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cworker + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/goharbor/harbor/src/jobservice/common/list" + + "github.com/stretchr/testify/mock" + + "github.com/goharbor/harbor/src/jobservice/job" + + "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/goharbor/harbor/src/jobservice/tests" + "github.com/gomodule/redigo/redis" + "github.com/stretchr/testify/suite" +) + +// ReaperTestSuite is used to test reaper +type ReaperTestSuite struct { + suite.Suite + + namespace string + pool *redis.Pool + r *reaper + ctl *mockLcmCtl + jid string +} + +// TestReaper is the entry func of the ReaperTestSuite +func TestReaper(t *testing.T) { + suite.Run(t, &ReaperTestSuite{}) +} + +// SetupSuite prepares the test suite environment +func (suite *ReaperTestSuite) SetupSuite() { + suite.namespace = tests.GiveMeTestNamespace() + suite.pool = tests.GiveMeRedisPool() + + ctx := context.TODO() + suite.ctl = &mockLcmCtl{} + suite.r = &reaper{ + context: ctx, + namespace: suite.namespace, + pool: suite.pool, + jobTypes: []string{job.SampleJob}, + lcmCtl: suite.ctl, + } + + conn := suite.pool.Get() + defer func() { + err := conn.Close() + suite.NoError(err, "close redis connection error") + }() + + // Mock data in the redis DB + cwp := utils.MakeIdentifier() + wpk := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "worker_pools") + _ = conn.Send("SADD", wpk, cwp) + _ = conn.Send("HSET", fmt.Sprintf("%s:%s", wpk, cwp), "heartbeat_at", time.Now().Unix()) + err := conn.Flush() + suite.NoError(err, "mock current pool error") + + // Mock lock info of job DEMO + lk := rds.KeyJobLock(suite.namespace, job.SampleJob) + _, err = conn.Do("INCR", lk) + 
suite.NoError(err, "set lock data error") + + wp := utils.MakeIdentifier() + lik := rds.KeyJobLockInfo(suite.namespace, job.SampleJob) + _, err = conn.Do("HINCRBY", lik, wp, 1) + suite.NoError(err, "set lock_info data error") + + // Mock in-progress job + ipk := rds.KeyInProgressQueue(suite.namespace, job.SampleJob, wp) + j, err := mockJobData() + suite.NoError(err, "mock job") + + _, err = conn.Do("LPUSH", ipk, j) + suite.NoError(err, "push mock job to queue") + + // Mock job stats + suite.jid = utils.MakeIdentifier() + err = mockJobStats(conn, suite.namespace, suite.jid) + suite.NoError(err, "mock job stats") + + // Mock in-progress job track + tk := rds.KeyJobTrackInProgress(suite.namespace) + _, err = conn.Do("HSET", tk, suite.jid, 1) + suite.NoError(err, "mock in-progress track") +} + +// TearDownSuite clears down the test suite environment +func (suite *ReaperTestSuite) TearDownSuite() { + conn := suite.pool.Get() + defer func() { + err := conn.Close() + suite.NoError(err, "close redis connection error") + }() + + _ = tests.ClearAll(suite.namespace, conn) +} + +func (suite *ReaperTestSuite) TestRequeueInProgressJobs() { + err := suite.r.reEnqueueInProgressJobs() + suite.NoError(err, "requeue in-progress jobs") + + conn := suite.pool.Get() + defer func() { + err := conn.Close() + suite.NoError(err, "close redis connection error") + }() + + v, err := redis.Int(conn.Do("GET", rds.KeyJobLock(suite.namespace, job.SampleJob))) + suite.NoError(err, "get job lock info") + suite.Equal(0, v, "lock should be 0") +} + +func (suite *ReaperTestSuite) TestSyncOutdatedStats() { + // Use real track to test + mt := job.NewBasicTrackerWithID( + context.TODO(), + suite.jid, + suite.namespace, + suite.pool, + func(hookURL string, change *job.StatusChange) error { + return nil + }, + list.New()) + err := mt.Load() + suite.NoError(err, "track job stats") + suite.ctl.On("Track", suite.jid).Return(mt, nil) + + err = suite.r.syncOutdatedStats() + suite.NoError(err, "sync outdated stats") + + // Check result + conn := suite.pool.Get() + defer func() { + err := conn.Close() + suite.NoError(err, "close redis connection error") + }() + + status, err := redis.String(conn.Do("HGET", rds.KeyJobStats(suite.namespace, suite.jid), "status")) + suite.NoError(err, "get status") + suite.Equal(job.SuccessStatus.String(), status, "check status") +} + +func mockJobData() (string, error) { + j := make(map[string]interface{}) + j["name"] = job.SampleJob + j["id"] = utils.MakeIdentifier() + j["t"] = time.Now().Unix() + args := make(map[string]interface{}) + j["args"] = args + args["image"] = "test suite" + + b, err := json.Marshal(&j) + if err != nil { + return "", nil + } + + return string(b), nil +} + +func mockJobStats(conn redis.Conn, ns string, jid string) error { + rev := time.Now().Unix() + sk := rds.KeyJobStats(ns, jid) + + ack := &job.ACK{ + Revision: rev, + Status: job.SuccessStatus.String(), + } + + b, err := json.Marshal(ack) + if err != nil { + return err + } + + args := []interface{}{ + sk, + "id", jid, + "status", job.RunningStatus.String(), + "name", job.SampleJob, + "kind", job.KindGeneric, + "unique", 0, + "ref_link", fmt.Sprintf("/api/v1/jobs/%s", jid), + "enqueue_time", time.Now().Unix(), + "update_time", time.Now().Unix(), + "revision", rev, + "ack", string(b), + } + + _, err = conn.Do("HMSET", args...) 
+ + return err +} + +type mockLcmCtl struct { + mock.Mock +} + +func (m *mockLcmCtl) Serve() error { + return nil +} + +// New tracker from the new provided stats +func (m *mockLcmCtl) New(stats *job.Stats) (job.Tracker, error) { + args := m.Called(stats) + if args.Get(0) != nil { + return args.Get(0).(job.Tracker), nil + } + + return nil, args.Error(1) +} + +// Track the life cycle of the specified existing job +func (m *mockLcmCtl) Track(jobID string) (job.Tracker, error) { + args := m.Called(jobID) + if args.Get(0) != nil { + return args.Get(0).(job.Tracker), nil + } + + return nil, args.Error(1) +}