Merge pull request #7957 from axw/global-clock-leases
Introduce and use global clock in lease manager

## Description of change

This PR introduces a global clock document to the database. Each controller machine agent runs a worker that periodically (every 1s) attempts to advance the global time by that same period. Concurrent updates are prevented, and a worker whose update fails backs off for a short (10s) delay. We guarantee that the global time is monotonically increasing, and that it increases at a rate no faster than wall clock time.

We pass a global clock reader into the state/lease client, so that it can compute expiry times based on the global time. Lease documents store the start time as a global time, together with the requested duration; we then make the expiry time relative to the local time for use by clients. The local time is expected to contain a monotonic component (i.e. Go 1.9+), so it can be compared to time.Now().

We refresh the global clock time when a claim or extension is made, to guarantee that the lease is held for at least the specified duration. When a lease is to be expired, we first attempt to use the most recent global clock time, and then refresh if the lease expires after the cached time, only failing if the lease expires after the refreshed time.

## QA steps

1. juju bootstrap localhost
2. juju enable-ha
3. juju deploy ubuntu -n 3

Identify the lease holders for "singular-controller" and "ubuntu", and stop those agents (controller machine agent and unit agent respectively). Another agent should eventually claim the leases.

Also:

1. juju bootstrap localhost (with juju 2.2.5)
2. juju deploy postgresql -n 3
(wait for postgresql units to become idle, take note of the leader)
3. juju enable-ha
(wait for additional controllers to be voting)
4. juju upgrade-juju -m controller (with this branch)

Verify that:
 - there are no old docs in the leases collection
 - the leases are kept by the previous holders across upgrade

## Documentation changes

None.

## Bug reference

Fixes https://bugs.launchpad.net/juju/+bug/1706340
jujubot committed Oct 26, 2017
2 parents 2e50e5c + 54d403c commit a5b1704
Showing 44 changed files with 1,722 additions and 791 deletions.
1 change: 1 addition & 0 deletions cmd/jujud/agent/engine_test.go
@@ -105,6 +105,7 @@ var (
"agent",
"api-caller",
"api-config-watcher",
"clock",
"migration-fortress",
"migration-inactive-flag",
"migration-minion",
11 changes: 11 additions & 0 deletions cmd/jujud/agent/machine.go
@@ -1925,6 +1925,17 @@ type MongoSessioner interface {
	MongoSession() *mgo.Session
}

// TODO(axw) 2017-10-24 #1726680
//
// We are still using MongoDB mastership to ensure that we
// run a single txnlogpruner worker, and a single dblogpruner
// worker. We should update worker/singular and API facade to
// support claiming for the entire controller, rather
// than a specific model, and use that to run controller-wide
// singular workers.
//
// When we move over to worker/singular, remove the Mongo bits
// from worker/singular that shouldn't be there anyway.
func newSingularStateRunner(runner *worker.Runner, st MongoSessioner, m *state.Machine) (jworker.Runner, error) {
	singularStateConn := singularStateConn{st.MongoSession(), m}
	singularRunner, err := newSingularRunner(runner, singularStateConn)
37 changes: 37 additions & 0 deletions cmd/jujud/agent/machine/manifolds.go
@@ -40,6 +40,7 @@ import (
	"github.com/juju/juju/worker/fanconfigurer"
	"github.com/juju/juju/worker/fortress"
	"github.com/juju/juju/worker/gate"
	"github.com/juju/juju/worker/globalclockupdater"
	"github.com/juju/juju/worker/hostkeyreporter"
	"github.com/juju/juju/worker/identityfilewriter"
	"github.com/juju/juju/worker/logger"
@@ -61,6 +62,16 @@ import (
	"github.com/juju/juju/worker/upgradesteps"
)

const (
	// globalClockUpdaterUpdateInterval is the interval between
	// global clock updates.
	globalClockUpdaterUpdateInterval = 1 * time.Second

	// globalClockUpdaterBackoffDelay is the amount of time to
	// delay when a concurrent global clock update is detected.
	globalClockUpdaterBackoffDelay = 10 * time.Second
)

// ManifoldsConfig allows specialisation of the result of Manifolds.
type ManifoldsConfig struct {

@@ -212,6 +223,8 @@ func Manifolds(config ManifoldsConfig) dependency.Manifolds {
		// with this code.
		terminationName: terminationworker.Manifold(),

		clockName: clockManifold(config.Clock),

		// The stateconfigwatcher manifold watches the machine agent's
		// configuration and reports if state serving info is
		// present. It will bounce itself if state serving info is
@@ -387,6 +400,19 @@ func Manifolds(config ManifoldsConfig) dependency.Manifolds {
			NewWorker: migrationminion.NewWorker,
		}),

		// We run a global clock updater for every controller machine.
		//
		// The global clock updater is responsible for detecting and
		// preventing concurrent updates, to ensure global time is
		// monotonic and increases at a rate no faster than real time.
		globalClockUpdaterName: globalclockupdater.Manifold(globalclockupdater.ManifoldConfig{
			ClockName:      clockName,
			StateName:      stateName,
			NewWorker:      globalclockupdater.NewWorker,
			UpdateInterval: globalClockUpdaterUpdateInterval,
			BackoffDelay:   globalClockUpdaterBackoffDelay,
		}),

		// The serving-info-setter manifold grabs the state
		// serving info from the API connection and writes it to the
		// agent config.
@@ -546,6 +572,15 @@ func Manifolds(config ManifoldsConfig) dependency.Manifolds {
	}
}

func clockManifold(clock clock.Clock) dependency.Manifold {
	return dependency.Manifold{
		Start: func(_ dependency.Context) (worker.Worker, error) {
			return engine.NewValueWorker(clock)
		},
		Output: engine.ValueWorkerOutput,
	}
}

var ifFullyUpgraded = engine.Housing{
	Flags: []string{
		upgradeStepsFlagName,
@@ -571,6 +606,7 @@ const (
	apiConfigWatcherName = "api-config-watcher"
	centralHubName = "central-hub"
	pubSubName = "pubsub-forwarder"
	clockName = "clock"

	upgraderName = "upgrader"
	upgradeStepsName = "upgrade-steps-runner"
@@ -602,4 +638,5 @@ const (
	hostKeyReporterName = "host-key-reporter"
	fanConfigurerName = "fan-configurer"
	externalControllerUpdaterName = "external-controller-updater"
	globalClockUpdaterName = "global-clock-updater"
)
4 changes: 4 additions & 0 deletions cmd/jujud/agent/machine/manifolds_test.go
@@ -47,9 +47,11 @@ func (*ManifoldsSuite) TestManifoldNames(c *gc.C) {
"api-caller",
"api-config-watcher",
"central-hub",
"clock",
"disk-manager",
"external-controller-updater",
"fan-configurer",
"global-clock-updater",
"host-key-reporter",
"log-sender",
"logging-config-updater",
@@ -98,6 +100,8 @@ func (*ManifoldsSuite) TestMigrationGuardsUsed(c *gc.C) {
"api-caller",
"api-config-watcher",
"central-hub",
"clock",
"global-clock-updater",
"log-forwarder",
"pubsub-forwarder",
"state",
34 changes: 34 additions & 0 deletions core/globalclock/interface.go
@@ -0,0 +1,34 @@
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package globalclock

import (
	"time"

	"github.com/juju/errors"
)

var (
	// ErrConcurrentUpdate is returned by Updater.Advance when the
	// clock value has been changed since the last read.
	ErrConcurrentUpdate = errors.New("clock was updated concurrently, retry")
)

// Updater provides a means of updating the global clock time.
type Updater interface {
	// Advance adds the given duration to the global clock, ensuring
	// that the clock has not been updated concurrently.
	//
	// Advance will return ErrConcurrentUpdate if another updater
	// updates the clock concurrently. In this case, the updater
	// will refresh its view of the clock, and the caller can
	// attempt Advance later.
	//
	// If Advance returns any error other than ErrConcurrentUpdate,
	// the Updater should be considered invalid, and the caller
	// should obtain a new Updater. Failing to do so could lead
	// to non-monotonic time, since there is no way of knowing in
	// general whether or not the database was updated.
	Advance(d time.Duration) error
}
5 changes: 5 additions & 0 deletions state/allcollections.go
@@ -49,6 +49,10 @@ func allCollections() collectionSchema {
		// Infrastructure collections
		// ==========================

		globalClockC: {
			global:    true,
			rawAccess: true,
		},
		txnsC: {
			// This collection is used exclusively by mgo/txn to record transactions.
			global: true,
@@ -487,6 +491,7 @@ const (
	controllerUsersC = "controllerusers"
	filesystemAttachmentsC = "filesystemAttachments"
	filesystemsC = "filesystems"
	globalClockC = "globalclock"
	globalSettingsC = "globalSettings"
	guimetadataC = "guimetadata"
	guisettingsC = "guisettings"
9 changes: 8 additions & 1 deletion state/dump_test.go
@@ -4,6 +4,8 @@
package state_test

import (
	"time"

	jc "github.com/juju/testing/checkers"
	"github.com/juju/utils/set"
	gc "gopkg.in/check.v1"
@@ -22,9 +24,14 @@ func (s *dumpSuite) TestDumpAll(c *gc.C) {
	// collections, so make sure they've started before running
	// the dump.
	state.EnsureWorkersStarted(s.State)
	value, err := s.State.DumpAll()

	// Make a leadership claim. No leases until we do.
	claimer := s.State.LeadershipClaimer()
	err := claimer.ClaimLeadership("app", "app/0", time.Minute)
	c.Assert(err, jc.ErrorIsNil)

	value, err := s.State.DumpAll()
	c.Assert(err, jc.ErrorIsNil)
	models, ok := value["models"].(map[string]interface{})
	c.Assert(ok, jc.IsTrue)
	c.Assert(models["name"], gc.Equals, "testenv")
51 changes: 51 additions & 0 deletions state/globalclock/config.go
@@ -0,0 +1,51 @@
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package globalclock

import (
	"github.com/juju/errors"

	"github.com/juju/juju/mongo"
)

// UpdaterConfig contains the resources and information required to
// create an Updater.
type UpdaterConfig struct {
	Config
}

// ReaderConfig contains the resources and information required to
// create a Reader.
type ReaderConfig struct {
	Config
}

// Config contains the common resources and information required to
// create an Updater or Reader.
type Config struct {
	// Collection names the MongoDB collection in which the clock
	// documents are stored.
	Collection string

	// Mongo exposes the mgo capabilities required by a Client
	// for updating and reading the clock.
	Mongo Mongo
}

// Mongo exposes MongoDB operations for use by the globalclock package.
type Mongo interface {
	// GetCollection should probably call the mongo.CollectionFromName func
	GetCollection(name string) (collection mongo.Collection, closer func())
}

// validate returns an error if the supplied config is not valid.
func (config Config) validate() error {
	if config.Collection == "" {
		return errors.New("missing collection")
	}
	if config.Mongo == nil {
		return errors.New("missing mongo client")
	}
	return nil
}
23 changes: 23 additions & 0 deletions state/globalclock/doc.go
@@ -0,0 +1,23 @@
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

/*
Package globalclock provides clients for updating and reading the
global virtual time, stored in the MongoDB database.

Multiple global clock updaters may run concurrently, but concurrent
updates will fail. This simplifies failover in a multi-node controller,
while preserving the invariant that a global clock second is at least
as long as a wall-clock second.

Schema design
-------------

We maintain a single collection, with a single document containing
the current global time. Whenever time is to be advanced, we update
the document while ensuring that the global time has not been advanced
by any other updater.
*/
package globalclock
14 changes: 14 additions & 0 deletions state/globalclock/package_test.go
@@ -0,0 +1,14 @@
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package globalclock_test

import (
	"testing"

	coretesting "github.com/juju/juju/testing"
)

func TestPackage(t *testing.T) {
	coretesting.MgoTestPackage(t)
}
45 changes: 45 additions & 0 deletions state/globalclock/reader.go
@@ -0,0 +1,45 @@
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package globalclock

import (
	"time"

	"github.com/juju/errors"
	"gopkg.in/mgo.v2"
)

// Reader provides a means of reading the global clock time.
//
// Reader is not goroutine-safe.
type Reader struct {
	config ReaderConfig
}

// NewReader returns a new Reader using the supplied config, or an error.
//
// Readers will not function past the lifetime of their configured Mongo.
func NewReader(config ReaderConfig) (*Reader, error) {
	if err := config.validate(); err != nil {
		return nil, errors.Trace(err)
	}
	r := &Reader{config: config}
	return r, nil
}

// Now returns the current global time.
func (r *Reader) Now() (time.Time, error) {
	coll, closer := r.config.Mongo.GetCollection(r.config.Collection)
	defer closer()

	t, err := readClock(coll)
	if errors.Cause(err) == mgo.ErrNotFound {
		// No time written yet. When it is written
		// for the first time, it'll be globalEpoch.
		t = globalEpoch
	} else if err != nil {
		return time.Time{}, errors.Trace(err)
	}
	return t, nil
}
