Skip to content

Commit

Permalink
Merge pull request #685 from jzelinskie/updater-cleanup
Browse files Browse the repository at this point in the history
updater: remove FindLock(), use errgroup to avoid races
  • Loading branch information
jzelinskie committed Feb 14, 2019
2 parents aa86829 + dd91597 commit cafe097
Show file tree
Hide file tree
Showing 76 changed files with 3,584 additions and 539 deletions.
17 changes: 5 additions & 12 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,16 @@ type Session interface {
// FindKeyValue retrieves a value from the given key.
FindKeyValue(key string) (value string, found bool, err error)

// Lock creates or renew a Lock in the database with the given name, owner
// and duration.
// Lock acquires or renews a lock in the database with the given name, owner
// and duration without blocking. After the specified duration, the lock
// expires if it hasn't already been unlocked in order to prevent a deadlock.
//
// After the specified duration, the Lock expires by itself if it hasn't been
// unlocked, and thus, let other users create a Lock with the same name.
// However, the owner can renew its Lock by setting renew to true.
// Lock should not block, it should instead returns whether the Lock has been
// successfully acquired/renewed. If it's the case, the expiration time of
// that Lock is returned as well.
// If the acquisition of a lock is not successful, expiration should be
// the time that existing lock expires.
Lock(name string, owner string, duration time.Duration, renew bool) (success bool, expiration time.Time, err error)

// Unlock releases an existing Lock.
Unlock(name, owner string) error

// FindLock returns the owner of a Lock specified by the name, and its
// expiration time if it exists.
FindLock(name string) (owner string, expiration time.Time, found bool, err error)
}

// Datastore represents a persistent data store
Expand Down
46 changes: 46 additions & 0 deletions database/dbutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package database

import (
"time"

"github.com/deckarep/golang-set"
)

Expand Down Expand Up @@ -304,3 +306,47 @@ func MergeLayers(l *Layer, new *Layer) *Layer {

return l
}

// AcquireLock acquires a named global lock for a duration.
//
// If renewal is true, the lock is extended as long as the same owner is
// attempting to renew the lock.
func AcquireLock(datastore Datastore, name, owner string, duration time.Duration, renewal bool) (success bool, expiration time.Time) {
// any error will cause the function to catch the error and return false.
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}

defer tx.Rollback()

locked, t, err := tx.Lock(name, owner, duration, renewal)
if err != nil {
return false, time.Time{}
}

if locked {
if err := tx.Commit(); err != nil {
return false, time.Time{}
}
}

return locked, t
}

// ReleaseLock releases a named global lock.
func ReleaseLock(datastore Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}

defer tx.Rollback()

if err := tx.Unlock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {
return
}
}
8 changes: 0 additions & 8 deletions database/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type MockSession struct {
FctFindKeyValue func(key string) (string, bool, error)
FctLock func(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error)
FctUnlock func(name, owner string) error
FctFindLock func(name string) (string, time.Time, bool, error)
}

func (ms *MockSession) Commit() error {
Expand Down Expand Up @@ -220,13 +219,6 @@ func (ms *MockSession) Unlock(name, owner string) error {
panic("required mock function not implemented")
}

func (ms *MockSession) FindLock(name string) (string, time.Time, bool, error) {
if ms.FctFindLock != nil {
return ms.FctFindLock(name)
}
panic("required mock function not implemented")
}

// MockDatastore implements Datastore and enables overriding each available method.
// The default behavior of each method is to simply panic.
type MockDatastore struct {
Expand Down
25 changes: 4 additions & 21 deletions database/pgsql/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

const (
soiLock = `INSERT INTO lock(name, owner, until) VALUES ($1, $2, $3)`
searchLock = `SELECT owner, until FROM Lock WHERE name = $1`
searchLock = `SELECT until FROM Lock WHERE name = $1`
updateLock = `UPDATE Lock SET until = $3 WHERE name = $1 AND owner = $2`
removeLock = `DELETE FROM Lock WHERE name = $1 AND owner = $2`
removeLockExpired = `DELETE FROM LOCK WHERE until < CURRENT_TIMESTAMP`
Expand Down Expand Up @@ -67,7 +67,9 @@ func (tx *pgSession) Lock(name string, owner string, duration time.Duration, ren
_, err := tx.Exec(soiLock, name, owner, until)
if err != nil {
if isErrUniqueViolation(err) {
return false, until, nil
// Return the existing locks expiration.
err := tx.QueryRow(searchLock, name).Scan(&until)
return false, until, handleError("searchLock", err)
}
return false, until, handleError("insertLock", err)
}
Expand All @@ -86,25 +88,6 @@ func (tx *pgSession) Unlock(name, owner string) error {
return err
}

// FindLock returns the owner of a lock specified by its name and its
// expiration time.
func (tx *pgSession) FindLock(name string) (string, time.Time, bool, error) {
if name == "" {
return "", time.Time{}, false, commonerr.NewBadRequestError("could not find an invalid lock")
}

defer observeQueryTime("FindLock", "all", time.Now())

var owner string
var until time.Time
err := tx.QueryRow(searchLock, name).Scan(&owner, &until)
if err != nil {
return owner, until, false, handleError("searchLock", err)
}

return owner, until, true, nil
}

// pruneLocks removes every expired locks from the database
func (tx *pgSession) pruneLocks() error {
defer observeQueryTime("pruneLocks", "all", time.Now())
Expand Down
11 changes: 1 addition & 10 deletions database/pgsql/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func TestLock(t *testing.T) {
defer datastore.Close()

var l bool
var et time.Time

// Create a first lock.
l, _, err := tx.Lock("test1", "owner1", time.Minute, false)
Expand Down Expand Up @@ -62,19 +61,11 @@ func TestLock(t *testing.T) {
assert.Nil(t, err)
tx = restartSession(t, datastore, tx, true)

l, et, err = tx.Lock("test1", "owner2", time.Minute, false)
l, _, err = tx.Lock("test1", "owner2", time.Minute, false)
assert.Nil(t, err)
assert.True(t, l)
tx = restartSession(t, datastore, tx, true)

// LockInfo
o, et2, ok, err := tx.FindLock("test1")
assert.True(t, ok)
assert.Nil(t, err)
assert.Equal(t, "owner2", o)
assert.Equal(t, et.Second(), et2.Second())
tx = restartSession(t, datastore, tx, true)

// Create a second lock which is actually already expired ...
l, _, err = tx.Lock("test2", "owner1", -time.Minute, false)
assert.Nil(t, err)
Expand Down
8 changes: 8 additions & 0 deletions ext/vulnmdsrc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,11 @@ func Appenders() map[string]Appender {

return ret
}

// CleanAll is a utility function that calls Clean() on every registered
// Appender.
func CleanAll() {
for _, appender := range Appenders() {
appender.Clean()
}
}
8 changes: 8 additions & 0 deletions ext/vulnsrc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,11 @@ func ListUpdaters() []string {
}
return r
}

// CleanAll is a utility function that calls Clean() on every registered
// Updater.
func CleanAll() {
for _, updater := range Updaters() {
updater.Clean()
}
}
16 changes: 10 additions & 6 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ import:
version: ^1.7.1
- package: gopkg.in/yaml.v2
version: ^2.2.1
- package: golang.org/x/sync/errgroup
47 changes: 3 additions & 44 deletions notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
if interrupted {
running = false
}
unlock(datastore, notification.Name, whoAmI)
database.ReleaseLock(datastore, notification.Name, whoAmI)
done <- true
}()

Expand All @@ -113,7 +113,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
case <-done:
break outer
case <-time.After(notifierLockRefreshDuration):
lock(datastore, notification.Name, whoAmI, notifierLockDuration, true)
database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, true)
case <-stopper.Chan():
running = false
break
Expand Down Expand Up @@ -141,7 +141,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA
}

// Lock the notification.
if hasLock, _ := lock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock {
if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock {
log.WithField(logNotiName, notification.Name).Info("found and locked a notification")
return &notification
}
Expand Down Expand Up @@ -208,44 +208,3 @@ func markNotificationAsRead(datastore database.Datastore, name string) error {
}
return tx.Commit()
}

// unlock removes a lock with provided name, owner. Internally, it handles
// database transaction and catches error.
func unlock(datastore database.Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}

defer tx.Rollback()

if err := tx.Unlock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {
return
}
}

func lock(datastore database.Datastore, name string, owner string, duration time.Duration, renew bool) (bool, time.Time) {
// any error will cause the function to catch the error and return false.
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}

defer tx.Rollback()

locked, t, err := tx.Lock(name, owner, duration, renew)
if err != nil {
return false, time.Time{}
}

if locked {
if err := tx.Commit(); err != nil {
return false, time.Time{}
}
}

return locked, t
}
60 changes: 60 additions & 0 deletions pkg/timeutil/timeutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2018 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package timeutil implements extra utilities dealing with time not found
// in the standard library.
package timeutil

import (
"math"
"math/rand"
"time"

log "github.com/sirupsen/logrus"

"github.com/coreos/clair/pkg/stopper"
)

// ApproxSleep is a stoppable time.Sleep that adds a slight random variation to
// the wakeup time in order to prevent thundering herds.
func ApproxSleep(approxWakeup time.Time, st *stopper.Stopper) (stopped bool) {
waitUntil := approxWakeup.Add(time.Duration(rand.ExpFloat64()/0.5) * time.Second)
log.WithField("wakeup", waitUntil).Debug("updater sleeping")
now := time.Now().UTC()
if !waitUntil.Before(now) {
if !st.Sleep(waitUntil.Sub(now)) {
return true
}
}
return false
}

// ExpBackoff doubles the backoff time, if the result is longer than the
// parameter max, max will be returned.
func ExpBackoff(prev, max time.Duration) time.Duration {
t := 2 * prev
if t > max {
t = max
}
if t == 0 {
return time.Second
}
return t
}

// FractionalDuration calculates the fraction of a Duration rounding half way
// from zero.
func FractionalDuration(fraction float64, d time.Duration) time.Duration {
return time.Duration(math.Round(float64(d) * fraction))
}
Loading

0 comments on commit cafe097

Please sign in to comment.