Skip to content

Commit

Permalink
fix[jobservice]:job status is hung after restart
Browse files Browse the repository at this point in the history
- improve the status hook sending/resending approach
- improve the status compare and set approach
- simplify the relevant flow
- add reaper to fix the out of sync jobs
- fix goharbor#10244 , fix goharbor#9963

Signed-off-by: Steven Zou <szou@vmware.com>
  • Loading branch information
steven-zou committed Jan 8, 2020
1 parent 6d80803 commit 1de11f2
Show file tree
Hide file tree
Showing 27 changed files with 1,562 additions and 902 deletions.
64 changes: 64 additions & 0 deletions src/jobservice/common/list/list.go
@@ -0,0 +1,64 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package list

import (
"container/list"
"sync"
)

// SyncList is a sync list based on the container/list
type SyncList struct {
// For synchronization
lock *sync.RWMutex
// Use interface slice as the backend data struct
l *list.List
}

// New a sync list
func New() *SyncList {
return &SyncList{
lock: &sync.RWMutex{},
l: list.New(),
}
}

// Iterate the list
func (l *SyncList) Iterate(f func(ele interface{}) bool) {
l.lock.RLock()
defer l.lock.RUnlock()

// Get the front pointer
for e := l.l.Front(); e != nil; {
// Keep the next one
next := e.Next()

if f(e.Value) {
l.l.Remove(e)
}

e = next
}
}

// Push the element to the back of the list
func (l *SyncList) Push(ele interface{}) {
if ele != nil {
l.lock.Lock()
defer l.lock.Unlock()

l.l.PushBack(ele)
}
}
57 changes: 57 additions & 0 deletions src/jobservice/common/list/list_test.go
@@ -0,0 +1,57 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package list

import (
"strings"
"testing"

"github.com/stretchr/testify/suite"
)

type ListSuite struct {
suite.Suite

l *SyncList
}

func TestListSuite(t *testing.T) {
suite.Run(t, &ListSuite{})
}

func (suite *ListSuite) SetupSuite() {
suite.l = New()

suite.l.Push("a0")
suite.l.Push("a1")
suite.l.Push("b0")
suite.l.Push("a2")

suite.Equal(4, suite.l.l.Len())
}

func (suite *ListSuite) TestIterate() {
suite.l.Iterate(func(ele interface{}) bool {
if s, ok := ele.(string); ok {
if strings.HasPrefix(s, "b") {
return true
}
}

return false
})

suite.Equal(3, suite.l.l.Len())
}
30 changes: 30 additions & 0 deletions src/jobservice/common/rds/keys.go
Expand Up @@ -86,3 +86,33 @@ func KeyHookEventRetryQueue(namespace string) string {
func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")
}

// KeyJobTrackInProgress returns the key of in progress jobs tracking queue
func KeyJobTrackInProgress(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "job_track:inprogress")
}

// KeyJobs returns the key of the specified job queue
func KeyJobs(namespace, jobType string) string {
return fmt.Sprintf("%sjobs:%s", KeyNamespacePrefix(namespace), jobType)
}

// KeyJobLock returns the key of lock for the specified job type.
func KeyJobLock(namespace string, jobType string) string {
return fmt.Sprintf("%s:lock", KeyJobs(namespace, jobType))
}

// KeyJobLockInfo returns the key of lock_info for the specified job type.
func KeyJobLockInfo(namespace string, jobType string) string {
return fmt.Sprintf("%s:lock_info", KeyJobs(namespace, jobType))
}

// KeyInProgressQueue returns the key of the in progress queue for the specified job type.
func KeyInProgressQueue(namespace string, jobType string, workerPoolID string) string {
return fmt.Sprintf("%s:%s:inprogress", KeyJobs(namespace, jobType), workerPoolID)
}

// KeyWorkerPools returns the key of the worker pool
func KeyWorkerPools(namespace string) string {
return KeyNamespacePrefix(namespace) + "worker_pools"
}

0 comments on commit 1de11f2

Please sign in to comment.