Introduce and use global clock in lease manager #7957

Merged
merged 5 commits into from Oct 26, 2017
Prev

upgrades: replace old lease documents

On upgrade to 2.3, remove old lease documents,
and replace with new ones. The new documents
will have a 60 second expiry time, relative
to the global time epoch.
  • Loading branch information...
commit 54d403ce8feb2ef80b58aca4f05af2890769716b @axw axw committed Oct 25, 2017
@@ -121,3 +121,9 @@ func readClock(coll mongo.Collection) (time.Time, error) {
}
return doc.time(), nil
}
+
+// GlobalEpoch returns the global clock's epoch, an arbitrary reference time
+// at which the global clock started.
+func GlobalEpoch() time.Time {
+ return globalEpoch
+}
View
@@ -237,27 +237,49 @@ func (client *client) claimLeaseOps(name string, request lease.Request) ([]txn.O
if err != nil {
return nil, entry{}, errors.Annotate(err, "refreshing global time")
}
- nextEntry := entry{
- holder: request.Holder,
+
+ return claimLeaseOps(
+ client.config.Namespace, name, request.Holder,
+ client.config.Id, client.config.Collection,
+ globalTime, request.Duration,
+ )
+}
+
+// ClaimLeaseOps returns txn.Ops to write a new lease. The txn.Ops
+// will fail if the lease document exists, regardless of whether it
+// has expired.
+func ClaimLeaseOps(
+ namespace, name, holder, writer, collection string,
+ globalTime time.Time, duration time.Duration,
+) ([]txn.Op, error) {
+ ops, _, err := claimLeaseOps(
+ namespace, name, holder, writer, collection,
+ globalTime, duration,
+ )
+ return ops, errors.Trace(err)
+}
+
+func claimLeaseOps(
+ namespace, name, holder, writer, collection string,
+ globalTime time.Time, duration time.Duration,
+) ([]txn.Op, entry, error) {
+ newEntry := entry{
+ holder: holder,
start: globalTime,
- duration: request.Duration,
- writer: client.config.Id,
+ duration: duration,
+ writer: writer,
}
-
- // We need to write the entry to the database in a specific format.
- leaseDoc, err := newLeaseDoc(client.config.Namespace, name, nextEntry)
+ leaseDoc, err := newLeaseDoc(namespace, name, newEntry)
if err != nil {
return nil, entry{}, errors.Trace(err)
}
claimLeaseOp := txn.Op{
- C: client.config.Collection,
+ C: collection,
Id: leaseDoc.Id,
Assert: txn.DocMissing,
Insert: leaseDoc,
}
-
- ops := []txn.Op{claimLeaseOp}
- return ops, nextEntry, nil
+ return []txn.Op{claimLeaseOp}, newEntry, nil
}
// extendLeaseOps returns the []txn.Op necessary to extend the supplied lease
View
@@ -20,6 +20,8 @@ import (
"github.com/juju/juju/cloud"
"github.com/juju/juju/controller"
"github.com/juju/juju/environs/config"
+ "github.com/juju/juju/state/globalclock"
+ "github.com/juju/juju/state/lease"
"github.com/juju/juju/storage/provider"
)
@@ -1198,3 +1200,64 @@ func AddModelType(st *State) error {
}
return st.db().RunTransaction(ops)
}
+
+// MigrateLeasesToGlobalTime removes old (<2.3-beta2) lease/clock-skew
+// documents, replacing the lease documents with new ones for the
+// existing lease holders.
+func MigrateLeasesToGlobalTime(st *State) error {
+ return runForAllModelStates(st, migrateModelLeasesToGlobalTime)
+}
+
+func migrateModelLeasesToGlobalTime(st *State) error {
+ coll, closer := st.db().GetCollection(leasesC)
+ defer closer()
+
+ var doc struct {
+ DocID string `bson:"_id"`
+ Type string `bson:"type"`
+ Namespace string `bson:"namespace"`
+ Name string `bson:"name"`
+ Holder string `bson:"holder"`
+ Expiry int64 `bson:"expiry"`
+ Writer string `bson:"writer"`
+ }
+
+ // Find all old lease/clock-skew documents, remove them
+ // and create replacement lease docs in the new format.
+ //
+ // Replacement leases are created with a duration of a
+ // minute, relative to the global time epoch.
+ var ops []txn.Op
+ iter := coll.Find(bson.D{{"type", bson.D{{"$exists", true}}}}).Iter()
+ for iter.Next(&doc) {
+ ops = append(ops, txn.Op{
+ C: leasesC,
+ Id: st.localID(doc.DocID),
+ Assert: txn.DocExists,
+ Remove: true,
+ })
+ if doc.Type != "lease" {
+ continue
+ }
+ claimOps, err := lease.ClaimLeaseOps(
+ doc.Namespace,
+ doc.Name,
+ doc.Holder,
+ doc.Writer,
+ coll.Name(),
+ globalclock.GlobalEpoch(),
+ initialLeaderClaimTime,
+ )
+ if err != nil {
+ return errors.Trace(err)
+ }
+ ops = append(ops, claimOps...)
+ }
+ if err := iter.Close(); err != nil {
+ return errors.Trace(err)
+ }
+ if ops == nil {
+ return nil
+ }
+ return st.db().RunTransaction(ops)
+}
View
@@ -1854,3 +1854,54 @@ func (s *upgradesSuite) checkAddPruneSettings(c *gc.C, ageProp, sizeProp, defaul
expectUpgradedData{settingsColl, expectedSettings},
)
}
+
+func (s *upgradesSuite) TestMigrateLeasesToGlobalTime(c *gc.C) {
+ leases, closer := s.state.db().GetRawCollection(leasesC)
+ defer closer()
+
+ // Use the non-controller model to ensure we can run the function
+ // across multiple models.
+ otherState := s.makeModel(c, "crack-up", testing.Attrs{})
+ defer otherState.Close()
+
+ uuid := otherState.ModelUUID()
+
+ err := leases.Insert(bson.M{
+ "_id": uuid + ":some-garbage",
+ "model-uuid": uuid,
+ }, bson.M{
+ "_id": uuid + ":clock#some-namespace#some-name#",
+ "model-uuid": uuid,
+ "type": "clock",
+ }, bson.M{
+ "_id": uuid + ":lease#some-namespace#some-name#",
+ "model-uuid": uuid,
+ "type": "lease",
+ "namespace": "some-namespace",
+ "name": "some-name",
+ "holder": "hand",
+ "expiry": "later",
+ "writer": "ghost",
+ })
+ c.Assert(err, jc.ErrorIsNil)
+
+ // - garbage doc is left alone has it has no "type" field
+ // - clock doc is removed, but no replacement required
+ // - lease doc is removed and replaced
+ expectedLeases := []bson.M{{
+ "_id": uuid + ":some-garbage",
+ "model-uuid": uuid,
+ }, bson.M{
+ "_id": uuid + ":some-namespace#some-name#",
+ "model-uuid": uuid,
+ "namespace": "some-namespace",
+ "name": "some-name",
+ "holder": "hand",
+ "start": int64(0),
+ "duration": int64(time.Minute),
+ "writer": "ghost",
+ }}
+ s.assertUpgradedData(c, MigrateLeasesToGlobalTime,
+ expectUpgradedData{leases, expectedLeases},
+ )
+}
View
@@ -32,6 +32,7 @@ type StateBackend interface {
CorrectRelationUnitCounts() error
AddModelEnvironVersion() error
AddModelType() error
+ MigrateLeasesToGlobalTime() error
}
// Model is an interface providing access to the details of a model within the
@@ -122,6 +123,10 @@ func (s stateBackend) AddModelType() error {
return state.AddModelType(s.st)
}
+func (s stateBackend) MigrateLeasesToGlobalTime() error {
+ return state.MigrateLeasesToGlobalTime(s.st)
+}
+
type modelShim struct {
st *state.State
m *state.Model
View
@@ -13,5 +13,12 @@ func stateStepsFor23() []Step {
return context.State().AddModelType()
},
},
+ &upgradeStep{
+ description: "migrate old leases",
+ targets: []Target{DatabaseMaster},
+ run: func(context Context) error {
+ return context.State().MigrateLeasesToGlobalTime()
+ },
+ },
}
}
@@ -25,3 +25,9 @@ func (s *steps23Suite) TestAddModelType(c *gc.C) {
// Logic for step itself is tested in state package.
c.Assert(step.Targets(), jc.DeepEquals, []upgrades.Target{upgrades.DatabaseMaster})
}
+
+func (s *steps23Suite) TestMigrateLeases(c *gc.C) {
+ step := findStateStep(c, v23, "migrate old leases")
+ // Logic for step itself is tested in state package.
+ c.Assert(step.Targets(), jc.DeepEquals, []upgrades.Target{upgrades.DatabaseMaster})
+}