Skip to content

Commit

Permalink
updater: refactor to use errgroup
Browse files Browse the repository at this point in the history
This addresses a race condition and makes this code much more
understandable.
  • Loading branch information
jzelinskie committed Jan 10, 2019
1 parent 399deab commit 6c5be7e
Showing 1 changed file with 131 additions and 23 deletions.
154 changes: 131 additions & 23 deletions updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@
package clair

import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/coreos/clair/database"
"github.com/coreos/clair/ext/vulnmdsrc"
"github.com/coreos/clair/ext/vulnsrc"
"github.com/coreos/clair/pkg/stopper"
"github.com/coreos/clair/pkg/timeutil"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -76,9 +78,8 @@ type vulnerabilityChange struct {
new *database.VulnerabilityWithAffected
}

// RunUpdater begins a process that updates the vulnerability database at
// regular intervals.
func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) {
/*
func runUpdaterOld(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) {
defer st.End()
// Do not run the updater if there is no config or if the interval is 0.
Expand Down Expand Up @@ -168,23 +169,129 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper
log.Info("updater service stopped")
}
*/

// sleepUpdater sleeps the updater for an approximate duration, but remains
// able to be cancelled by a stopper.
func sleepUpdater(approxWakeup time.Time, st *stopper.Stopper) (stopped bool) {
waitUntil := approxWakeup.Add(time.Duration(rand.ExpFloat64()/0.5) * time.Second)
log.WithField("scheduled time", waitUntil).Debug("updater sleeping")
if !waitUntil.Before(time.Now().UTC()) {
if !st.Sleep(waitUntil.Sub(time.Now())) {
return true
// RunUpdater begins a process that updates the vulnerability database at
// regular intervals.
//
// It always calls st.End on return. If config is nil, the interval is zero, or
// no updaters are enabled, it logs that the service is disabled and returns
// immediately. Otherwise it loops until either the sleep between iterations is
// interrupted by the stopper or updateWhileRenewingLock reports
// errReceivedStopSignal. On exit it calls Clean on every registered appender
// and updater.
func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) {
	defer st.End()

	// Do not run the updater if there is no config or if the interval is 0.
	if config == nil || config.Interval == 0 || len(config.EnabledUpdaters) == 0 {
		log.Info("updater service is disabled.")
		return
	}

	// Clean up any resources the updater left behind.
	defer func() {
		for _, appender := range vulnmdsrc.Appenders() {
			appender.Clean()
		}
		for _, updater := range vulnsrc.Updaters() {
			updater.Clean()
		}

		log.Info("updater service stopped")
	}()

	// Create a new unique identity for tracking who owns global locks.
	whoAmI := uuid.New()
	log.WithField("owner", whoAmI).Info("updater service started")

	sleepDuration := updaterSleepBetweenLoopsDuration
	for {
		// Determine if this is the first update and define the next update time.
		// The next update time is (last update time + interval) or now if this is the first update.
		nextUpdate := time.Now().UTC()
		lastUpdate, isFirstUpdate, err := GetLastUpdateTime(datastore)
		if err != nil {
			log.WithError(err).Error("an error occurred while getting the last update time")
			// NOTE(review): this adjustment is overwritten a few lines below
			// whenever isFirstUpdate is false — confirm what
			// GetLastUpdateTime returns alongside a non-nil error.
			nextUpdate = nextUpdate.Add(config.Interval)
		}

		log.WithFields(log.Fields{
			"firstUpdate": isFirstUpdate,
			"nextUpdate":  nextUpdate,
		}).Debug("fetched last update time")
		if !isFirstUpdate {
			nextUpdate = lastUpdate.Add(config.Interval)
		}

		// If the next update time is in the past, then try to update.
		if nextUpdate.Before(time.Now().UTC()) {
			// Attempt to get a lock on the update.
			log.Debug("attempting to obtain update lock")
			acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false)
			if lockExpiration.IsZero() {
				// Any failures to acquire the lock should instantly expire.
				// NOTE(review): both branches below reassign sleepDuration,
				// so this value is never observed — confirm the intent.
				var instantExpiration time.Duration
				sleepDuration = instantExpiration
			}

			if acquiredLock {
				sleepDuration, err = updateWhileRenewingLock(datastore, whoAmI, isFirstUpdate, st)
				if err != nil {
					if err == errReceivedStopSignal {
						return
					}
					// The lock was held at this point; what failed is the
					// update itself or the renewal of the lock.
					log.WithError(err).Debug("failed to update while renewing lock")
					sleepDuration = timeutil.ExpBackoff(sleepDuration, config.Interval)
				}
			} else {
				sleepDuration = updaterSleepBetweenLoopsDuration
			}
		} else {
			// Nothing to do yet: sleep until the next update is due.
			sleepDuration = time.Until(nextUpdate)
		}

		if stopped := timeutil.ApproxSleep(time.Now().Add(sleepDuration), st); stopped {
			return
		}
	}
}

// errReceivedStopSignal is the sentinel error returned by
// updateWhileRenewingLock when a receive on the stopper's channel completes.
// RunUpdater compares against it to leave its loop.
var errReceivedStopSignal = errors.New("stopped")

// updateWhileRenewingLock runs a single update while concurrently keeping the
// updater lock alive and watching for a stop signal.
//
// Three goroutines cooperate through a shared context:
//   - one runs update and cancels the context as soon as update returns,
//     whether or not it succeeded;
//   - one periodically re-acquires the lock with the renew flag set and
//     releases it once the context is cancelled;
//   - one converts a stop signal from st into errReceivedStopSignal.
//
// The returned error is the first non-nil error produced by any of them:
// errReceivedStopSignal, a lock-extension failure, or whatever update
// returned. The returned sleepDuration is never assigned and is always zero.
func updateWhileRenewingLock(datastore database.Datastore, whoAmI string, isFirstUpdate bool, st *stopper.Stopper) (sleepDuration time.Duration, err error) {
	// errgroup cancels its context only on the first non-nil error or once
	// Wait returns. Without an explicit cancel, a successful update would
	// leave the two helper goroutines running and Wait blocked indefinitely.
	parent, cancel := context.WithCancel(context.Background())
	defer cancel()

	g, ctx := errgroup.WithContext(parent)
	g.Go(func() error {
		// Signal the helpers to wind down as soon as the update is over.
		defer cancel()
		// TODO: update does not yet react to ctx being cancelled.
		return update(ctx, datastore, isFirstUpdate)
	})

	g.Go(func() error {
		refreshDuration := updaterLockRefreshDuration
		// A single reusable timer avoids leaving a pending time.After timer
		// behind each time the select exits through ctx.Done.
		timer := time.NewTimer(timeutil.FractionalDuration(0.9, refreshDuration))
		defer timer.Stop()
		for {
			select {
			case <-timer.C:
				success, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration, true)
				if !success {
					return errors.New("failed to extend lock")
				}
				// Schedule the next refresh shortly before the new expiration.
				refreshDuration = time.Until(lockExpiration)
				timer.Reset(timeutil.FractionalDuration(0.9, refreshDuration))
			case <-ctx.Done():
				database.ReleaseLock(datastore, updaterLockName, whoAmI)
				return nil
			}
		}
	})

	g.Go(func() error {
		select {
		case <-st.Chan():
			return errReceivedStopSignal
		case <-ctx.Done():
			return nil
		}
	})

	err = g.Wait()
	return sleepDuration, err
}

// update fetches all the vulnerabilities from the registered fetchers, updates
// the vulnerabilities and updater flags, and logs notes from updaters.
func update(datastore database.Datastore, firstUpdate bool) {
func update(ctx context.Context, datastore database.Datastore, firstUpdate bool) error {
defer setUpdaterDuration(time.Now())

log.Info("updating vulnerabilities")
Expand All @@ -209,7 +316,7 @@ func update(datastore database.Datastore, firstUpdate bool) {

if err := database.PersistNamespacesAndCommit(datastore, namespaces); err != nil {
log.WithError(err).Error("Unable to insert namespaces")
return
return err
}

changes, err := updateVulnerabilities(datastore, vulnerabilities)
Expand All @@ -222,21 +329,21 @@ func update(datastore database.Datastore, firstUpdate bool) {

if err != nil {
log.WithError(err).Error("Unable to update vulnerabilities")
return
return err
}

if !firstUpdate {
err = createVulnerabilityNotifications(datastore, changes)
if err != nil {
log.WithError(err).Error("Unable to create notifications")
return
return err
}
}

err = updateUpdaterFlags(datastore, flags)
if err != nil {
log.WithError(err).Error("Unable to update updater flags")
return
return err
}

for _, note := range notes {
Expand All @@ -248,11 +355,12 @@ func update(datastore database.Datastore, firstUpdate bool) {
err = setLastUpdateTime(datastore)
if err != nil {
log.WithError(err).Error("Unable to set last update time")
return
return err
}
}

log.Info("update finished")
return nil
}

func setUpdaterDuration(start time.Time) {
Expand Down

0 comments on commit 6c5be7e

Please sign in to comment.