From 2af2ef5b654628b01bd17df0b13aab3bedf6981e Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Thu, 1 Dec 2022 10:49:07 +0100 Subject: [PATCH 01/10] Adds model_uuid to lease table and adds lease_pin table. We need to record entities that require a lease to remain pinned, not just the fact that it is. --- database/schema/controller.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/database/schema/controller.go b/database/schema/controller.go index 0f285bbd17a..4123d928836 100644 --- a/database/schema/controller.go +++ b/database/schema/controller.go @@ -17,28 +17,44 @@ CREATE UNIQUE INDEX idx_lease_type_type ON lease_type (type); INSERT INTO lease_type VALUES - (0, 'controller'), - (1, 'model' ), - (2, 'application'); + (0, 'controller'), -- The controller running singular controller workers. + (1, 'model' ), -- The controller running singular workers for a model. + (2, 'application'); -- The unit that holds leadership for an application. -CREATE TABLE IF NOT EXISTS lease ( +CREATE TABLE lease ( uuid TEXT PRIMARY KEY, lease_type_id INT NOT NULL, + model_uuid TEXT, name TEXT, holder TEXT, start TIMESTAMP, expiry TIMESTAMP, - pinned BOOLEAN, CONSTRAINT fk_lease_lease_type FOREIGN KEY (lease_type_id) REFERENCES lease_type(id) ); -CREATE UNIQUE INDEX idx_lease_type_name -ON lease (lease_type_id, name); +CREATE UNIQUE INDEX idx_lease_model_type_name +ON lease (model_uuid, lease_type_id, name); CREATE INDEX idx_lease_expiry -ON lease (expiry);`[1:] +ON lease (expiry); + +CREATE TABLE lease_pin ( + -- The presence of entries in this table for a particular lease_uuid + -- implies that the lease in question is pinned and cannot expire. + uuid TEXT PRIMARY KEY, + lease_uuid TEXT, + entity_id TEXT, + CONSTRAINT fk_lease_pin_lease + FOREIGN KEY (lease_uuid) + REFERENCES lease(uuid) +); + +CREATE INDEX idx_lease_pin_lease +ON lease_pin (lease_uuid); + +`[1:] return strings.Split(delta, ";\n\n") } From b7dbdf46094c9f35d779458ac9c487d7a4de2ab2 Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Thu, 1 Dec 2022 17:23:51 +0100 Subject: [PATCH 02/10] Adds initial DB-backed lease store with implementations for claims and lease query variants. --- core/lease/package_test.go | 18 +++++ core/lease/storedb.go | 160 +++++++++++++++++++++++++++++++++++++ core/lease/storedb_test.go | 137 +++++++++++++++++++++++++++++++ 3 files changed, 315 insertions(+) create mode 100644 core/lease/package_test.go create mode 100644 core/lease/storedb.go create mode 100644 core/lease/storedb_test.go diff --git a/core/lease/package_test.go b/core/lease/package_test.go new file mode 100644 index 00000000000..26d0b71a4cf --- /dev/null +++ b/core/lease/package_test.go @@ -0,0 +1,18 @@ +// Copyright 2022 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package lease + +import ( + "testing" + + gc "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + gc.TestingT(t) +} + +type StubLogger struct{} + +func (StubLogger) Errorf(string, ...interface{}) {} diff --git a/core/lease/storedb.go b/core/lease/storedb.go new file mode 100644 index 00000000000..7e8bebe32e8 --- /dev/null +++ b/core/lease/storedb.go @@ -0,0 +1,160 @@ +// Copyright 2022 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package lease + +import ( + "context" + "database/sql" + "fmt" + "math" + + "github.com/juju/utils/v3" + + "github.com/juju/errors" +) + +// StoreLogger describes methods for logging lease store concerns. +type StoreLogger interface { + Errorf(string, ...interface{}) +} + +// DBStore implements lease.Store using a database +// supporting SQLite-compatible dialects. +type DBStore struct { + db *sql.DB + logger StoreLogger +} + +func NewDBStore(db *sql.DB, logger StoreLogger) *DBStore { + return &DBStore{ + db: db, + logger: logger, + } +} + +// Leases (lease.Store) returns all leases in the database, +// optionally filtering using the input keys. +func (s *DBStore) Leases(keys ...Key) (map[Key]Info, error) { + q := ` +SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry +FROM lease l JOIN lease_type t ON l.lease_type_id = t.id`[1:] + + var args []any + + if len(keys) == 1 { + q += ` +WHERE t.type = ? +AND l.model_uuid = ? +AND l.name = ?` + + key := keys[0] + args = []any{key.Namespace, key.ModelUUID, key.Lease} + } + + rows, err := s.db.Query(q, args...) + if err != nil { + return nil, errors.Trace(err) + } + + result, err := leasesFromRows(rows) + + // TODO (manadart 2022-11-30): We expect the variadic `keys` argument to be + // length 0 or 1. It was a work-around for design constraints at the time + // that it was instituted. Either filter the result here for len(keys) > 1, + // or fix the design. + + return result, errors.Trace(rows.Err()) +} + +// ClaimLease (lease.Store) claims the lease indicated by the input key, +// for the holder indicated by the input duration. +// The lease must not already be held, otherwise an error is returned. +func (s *DBStore) ClaimLease(lease Key, request Request, stop <-chan struct{}) error { + if err := request.Validate(); err != nil { + return errors.Trace(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +INSERT INTO lease (uuid, lease_type_id, model_uuid, name, holder, start, expiry) +SELECT ?, id, ?, ?, ?, datetime('now'), datetime('now', ?) +FROM lease_type +WHERE type = ?`[1:] + + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(request.Duration.Seconds()))) + + _, err := s.db.ExecContext( + ctx, q, utils.MustNewUUID().String(), lease.ModelUUID, lease.Lease, request.Holder, d, lease.Namespace) + + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.New("claim cancelled") + case err := <-errCh: + cancel() + // TODO (manadart 2022-12-01): Interpret this such that a UK violation means ErrHeld. + return errors.Trace(err) + } +} + +func (s *DBStore) ExtendLease(lease Key, request Request, stop <-chan struct{}) error { + panic("implement me") +} + +func (s *DBStore) RevokeLease(lease Key, holder string, stop <-chan struct{}) error { + panic("implement me") +} + +// LeaseGroup returns all leases for the input namespace and model. +func (s *DBStore) LeaseGroup(namespace, modelUUID string) (map[Key]Info, error) { + q := ` +SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry +FROM lease l JOIN lease_type t ON l.lease_type_id = t.id +WHERE t.type = ? +AND l.model_uuid = ?`[1:] + + rows, err := s.db.Query(q, namespace, modelUUID) + if err != nil { + return nil, errors.Trace(err) + } + + result, err := leasesFromRows(rows) + return result, errors.Trace(err) +} + +func (s *DBStore) PinLease(lease Key, entity string, stop <-chan struct{}) error { + panic("implement me") +} + +func (s *DBStore) UnpinLease(lease Key, entity string, stop <-chan struct{}) error { + panic("implement me") +} + +func (s *DBStore) Pinned() (map[Key][]string, error) { + panic("implement me") +} + +// leasesFromRows returns lease info from rows returned from the backing DB. +func leasesFromRows(rows *sql.Rows) (map[Key]Info, error) { + result := map[Key]Info{} + + for rows.Next() { + var key Key + var info Info + + if err := rows.Scan(&key.Namespace, &key.ModelUUID, &key.Lease, &info.Holder, &info.Expiry); err != nil { + _ = rows.Close() + return nil, errors.Trace(err) + } + result[key] = info + } + + return result, errors.Trace(rows.Err()) +} diff --git a/core/lease/storedb_test.go b/core/lease/storedb_test.go new file mode 100644 index 00000000000..ca05c062ec9 --- /dev/null +++ b/core/lease/storedb_test.go @@ -0,0 +1,137 @@ +// Copyright 2022 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package lease_test + +import ( + "database/sql" + "time" + + "github.com/juju/testing" + jc "github.com/juju/testing/checkers" + _ "github.com/mattn/go-sqlite3" + gc "gopkg.in/check.v1" + + "github.com/juju/juju/core/lease" + "github.com/juju/juju/database/schema" +) + +type storeDBSuite struct { + testing.IsolationSuite + + db *sql.DB + store *lease.DBStore + stopCh chan struct{} +} + +var _ = gc.Suite(&storeDBSuite{}) + +func (s *storeDBSuite) SetUpTest(c *gc.C) { + s.IsolationSuite.SetUpTest(c) + + var err error + s.db, err = sql.Open("sqlite3", ":memory:") + c.Assert(err, jc.ErrorIsNil) + + s.primeDB(c) + s.store = lease.NewDBStore(s.db, lease.StubLogger{}) + s.stopCh = make(chan struct{}) +} + +func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseListings(c *gc.C) { + pgKey := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + pgReq := lease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + // Add 2 leases. + err := s.store.ClaimLease(pgKey, pgReq, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + mmKey := pgKey + mmKey.Lease = "mattermost" + + mmReq := pgReq + mmReq.Holder = "mattermost/0" + + err = s.store.ClaimLease(mmKey, mmReq, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // Check all the leases. + leases, err := s.store.Leases() + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 2) + c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") + c.Check(leases[pgKey].Expiry.After(time.Now().UTC()), jc.IsTrue) + c.Check(leases[mmKey].Holder, gc.Equals, "mattermost/0") + c.Check(leases[mmKey].Expiry.After(time.Now().UTC()), jc.IsTrue) + + // Check with a filter. + leases, err = s.store.Leases(pgKey) + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 1) + c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") + + // Add a lease from a different group, + // and check that the group returns the application leases. + err = s.store.ClaimLease( + lease.Key{ + Namespace: "controller", + ModelUUID: "controller-model-uuid", + Lease: "singular", + }, + lease.Request{ + Holder: "machine/0", + Duration: time.Minute, + }, + s.stopCh, + ) + c.Assert(err, jc.ErrorIsNil) + + leases, err = s.store.LeaseGroup("application", "model-uuid") + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 2) + c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") + c.Check(leases[pgKey].Expiry.After(time.Now().UTC()), jc.IsTrue) + c.Check(leases[mmKey].Holder, gc.Equals, "mattermost/0") + c.Check(leases[mmKey].Expiry.After(time.Now().UTC()), jc.IsTrue) +} + +func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { + key := lease.Key{ + Namespace: "controller", + ModelUUID: "controller-model-uuid", + Lease: "singular", + } + + req := lease.Request{ + Holder: "machine/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + err = s.store.ClaimLease(key, req, s.stopCh) + // TODO (manadart 2022-12-01): Check for the right type; ErrHeld? + c.Assert(err, gc.NotNil) +} + +func (s *storeDBSuite) primeDB(c *gc.C) { + tx, err := s.db.Begin() + c.Assert(err, jc.ErrorIsNil) + + for _, stmt := range schema.ControllerDDL() { + _, err := tx.Exec(stmt) + c.Assert(err, jc.ErrorIsNil) + } + + err = tx.Commit() + c.Assert(err, jc.ErrorIsNil) +} From df3c53046313ab36020ae134fe2b0d1d3d7a5e24 Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Mon, 5 Dec 2022 13:12:01 +0100 Subject: [PATCH 03/10] Adds lease pinning methods to the database-backed lease store implementation. --- core/lease/storedb.go | 126 ++++++++++++++++++++++++++++++++-- core/lease/storedb_test.go | 50 ++++++++++++-- database/schema/controller.go | 4 +- 3 files changed, 169 insertions(+), 11 deletions(-) diff --git a/core/lease/storedb.go b/core/lease/storedb.go index 7e8bebe32e8..6c33d361ff1 100644 --- a/core/lease/storedb.go +++ b/core/lease/storedb.go @@ -9,9 +9,10 @@ import ( "fmt" "math" - "github.com/juju/utils/v3" - + "github.com/juju/collections/set" "github.com/juju/errors" + "github.com/juju/utils/v3" + "github.com/mattn/go-sqlite3" ) // StoreLogger describes methods for logging lease store concerns. @@ -112,7 +113,8 @@ func (s *DBStore) RevokeLease(lease Key, holder string, stop <-chan struct{}) er panic("implement me") } -// LeaseGroup returns all leases for the input namespace and model. +// LeaseGroup (lease.Store) returns all leases +// for the input namespace and model. func (s *DBStore) LeaseGroup(namespace, modelUUID string) (map[Key]Info, error) { q := ` SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry @@ -129,16 +131,116 @@ AND l.model_uuid = ?`[1:] return result, errors.Trace(err) } +// PinLease (lease.Store) adds the input entity into the lease_pin table +// to indicate that the lease indicated by the input key must not expire, +// and that this entity requires such behaviour. func (s *DBStore) PinLease(lease Key, entity string, stop <-chan struct{}) error { - panic("implement me") + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +INSERT INTO lease_pin (uuid, lease_uuid, entity_id) +SELECT ?, l.uuid, ? +FROM lease l JOIN lease_type t ON l.lease_type_id = t.id +WHERE t.type = ? +AND l.model_uuid = ? +AND l.name = ?`[1:] + + _, err := s.db.ExecContext( + ctx, q, utils.MustNewUUID().String(), entity, lease.Namespace, lease.ModelUUID, lease.Lease) + + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.New("pin lease cancelled") + case err := <-errCh: + cancel() + if isUniquenessViolation(err) { + return nil + } + return errors.Trace(err) + } } +// UnpinLease (lease.Store) removes the record indicated by the input +// key and entity from the lease pin table, indicating that the entity +// no longer requires the lease to be pinned. +// When there are no entities associated with a particular lease, +// it is determined not to be pinned, and can expire normally. func (s *DBStore) UnpinLease(lease Key, entity string, stop <-chan struct{}) error { - panic("implement me") + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +DELETE FROM lease_pin +WHERE uuid = ( + SELECT p.uuid + FROM lease_pin p + JOIN lease l ON l.uuid = p.lease_uuid + JOIN lease_type t ON l.lease_type_id = t.id + WHERE t.type = ? + AND l.model_uuid = ? + AND l.name = ? + AND p.entity_id = ? +)`[1:] + + _, err := s.db.ExecContext( + ctx, q, lease.Namespace, lease.ModelUUID, lease.Lease, entity) + + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.New("unpin lease cancelled") + case err := <-errCh: + cancel() + return errors.Trace(err) + } } +// Pinned (lease.Store) returns all leases that are currently pinned, +// and the entities requiring such behaviour for them. func (s *DBStore) Pinned() (map[Key][]string, error) { - panic("implement me") + q := ` +SELECT l.uuid, t.type, l.model_uuid, l.name, p.entity_id +FROM lease l + JOIN lease_type t ON l.lease_type_id = t.id + JOIN lease_pin p on l.uuid = p.lease_uuid +ORDER BY l.uuid`[1:] + + rows, err := s.db.Query(q) + if err != nil { + return nil, errors.Trace(err) + } + + seen := set.NewStrings() + result := make(map[Key][]string) + for rows.Next() { + var leaseUUID string + var key Key + var entity string + + if err := rows.Scan(&leaseUUID, &key.Namespace, &key.ModelUUID, &key.Lease, &entity); err != nil { + _ = rows.Close() + return nil, errors.Trace(err) + } + + if !seen.Contains(leaseUUID) { + result[key] = []string{entity} + seen.Add(leaseUUID) + } else { + result[key] = append(result[key], entity) + } + } + + return result, errors.Trace(rows.Err()) } // leasesFromRows returns lease info from rows returned from the backing DB. @@ -158,3 +260,15 @@ func leasesFromRows(rows *sql.Rows) (map[Key]Info, error) { return result, errors.Trace(rows.Err()) } + +// TODO (manadart 2022-12-05): Utilities like this will reside in +// the database package for general use. +func isUniquenessViolation(err error) bool { + var sqliteErr sqlite3.Error + if errors.As(err, &sqliteErr) { + if errors.Is(sqliteErr.ExtendedCode, sqlite3.ErrConstraintUnique) { + return true + } + } + return false +} diff --git a/core/lease/storedb_test.go b/core/lease/storedb_test.go index ca05c062ec9..b57134f4179 100644 --- a/core/lease/storedb_test.go +++ b/core/lease/storedb_test.go @@ -35,10 +35,12 @@ func (s *storeDBSuite) SetUpTest(c *gc.C) { s.primeDB(c) s.store = lease.NewDBStore(s.db, lease.StubLogger{}) - s.stopCh = make(chan struct{}) + + // Single-buffered to allow us to queue up a stoppage. + s.stopCh = make(chan struct{}, 1) } -func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseListings(c *gc.C) { +func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseQueries(c *gc.C) { pgKey := lease.Key{ Namespace: "application", ModelUUID: "model-uuid", @@ -98,9 +100,7 @@ func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseListings(c *gc.C) { c.Assert(err, jc.ErrorIsNil) c.Assert(leases, gc.HasLen, 2) c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") - c.Check(leases[pgKey].Expiry.After(time.Now().UTC()), jc.IsTrue) c.Check(leases[mmKey].Holder, gc.Equals, "mattermost/0") - c.Check(leases[mmKey].Expiry.After(time.Now().UTC()), jc.IsTrue) } func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { @@ -123,6 +123,48 @@ func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { c.Assert(err, gc.NotNil) } +func (s *storeDBSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { + pgKey := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + pgReq := lease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(pgKey, pgReq, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // One entity pins the lease. + err = s.store.PinLease(pgKey, "machine/6", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // The same lease/entity is a no-op without error. + err = s.store.PinLease(pgKey, "machine/6", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // Another entity pinning the same lease. + err = s.store.PinLease(pgKey, "machine/7", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + pins, err := s.store.Pinned() + c.Assert(err, jc.ErrorIsNil) + c.Assert(pins, gc.HasLen, 1) + c.Check(pins[pgKey], jc.SameContents, []string{"machine/6", "machine/7"}) + + // Unpin and check the leases. + err = s.store.UnpinLease(pgKey, "machine/7", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + pins, err = s.store.Pinned() + c.Assert(err, jc.ErrorIsNil) + c.Assert(pins, gc.HasLen, 1) + c.Check(pins[pgKey], jc.SameContents, []string{"machine/6"}) +} + func (s *storeDBSuite) primeDB(c *gc.C) { tx, err := s.db.Begin() c.Assert(err, jc.ErrorIsNil) diff --git a/database/schema/controller.go b/database/schema/controller.go index 4123d928836..d8786aeb3ad 100644 --- a/database/schema/controller.go +++ b/database/schema/controller.go @@ -51,9 +51,11 @@ CREATE TABLE lease_pin ( REFERENCES lease(uuid) ); +CREATE UNIQUE INDEX idx_lease_pin_lease_entity +ON lease_pin (lease_uuid, entity_id); + CREATE INDEX idx_lease_pin_lease ON lease_pin (lease_uuid); - `[1:] return strings.Split(delta, ";\n\n") From 2eab6166a36bf13f531207cf43fbc3e28a2a0f52 Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Mon, 5 Dec 2022 14:32:44 +0100 Subject: [PATCH 04/10] Ensures that a unique key violation during ClaimLease returns ErrHeld. --- core/lease/storedb.go | 5 ++++- core/lease/storedb_test.go | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/lease/storedb.go b/core/lease/storedb.go index 6c33d361ff1..d46fb1c28c3 100644 --- a/core/lease/storedb.go +++ b/core/lease/storedb.go @@ -27,6 +27,7 @@ type DBStore struct { logger StoreLogger } +// NewDBStore returns a reference to a new database-backed lease sore. func NewDBStore(db *sql.DB, logger StoreLogger) *DBStore { return &DBStore{ db: db, @@ -100,7 +101,9 @@ WHERE type = ?`[1:] return errors.New("claim cancelled") case err := <-errCh: cancel() - // TODO (manadart 2022-12-01): Interpret this such that a UK violation means ErrHeld. + if isUniquenessViolation(err) { + return ErrHeld + } return errors.Trace(err) } } diff --git a/core/lease/storedb_test.go b/core/lease/storedb_test.go index b57134f4179..19ee9082af2 100644 --- a/core/lease/storedb_test.go +++ b/core/lease/storedb_test.go @@ -7,6 +7,8 @@ import ( "database/sql" "time" + "github.com/juju/errors" + "github.com/juju/testing" jc "github.com/juju/testing/checkers" _ "github.com/mattn/go-sqlite3" @@ -119,8 +121,7 @@ func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { c.Assert(err, jc.ErrorIsNil) err = s.store.ClaimLease(key, req, s.stopCh) - // TODO (manadart 2022-12-01): Check for the right type; ErrHeld? - c.Assert(err, gc.NotNil) + c.Assert(errors.Is(err, lease.ErrHeld), jc.IsTrue) } func (s *storeDBSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { From e98fa376c2ea025673e8fcdf420c1e7d1383befe Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Mon, 5 Dec 2022 14:36:49 +0100 Subject: [PATCH 05/10] Ensures that we return an error for an attempt to call Leases with more than one filter key. --- core/lease/storedb.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/lease/storedb.go b/core/lease/storedb.go index d46fb1c28c3..9157ec2110d 100644 --- a/core/lease/storedb.go +++ b/core/lease/storedb.go @@ -38,6 +38,15 @@ func NewDBStore(db *sql.DB, logger StoreLogger) *DBStore { // Leases (lease.Store) returns all leases in the database, // optionally filtering using the input keys. func (s *DBStore) Leases(keys ...Key) (map[Key]Info, error) { + // TODO (manadart 2022-11-30): We expect the variadic `keys` argument to be + // length 0 or 1. It was a work-around for design constraints at the time. + // Either filter the result here for len(keys) > 1, or fix the design. + // As it is, there are no upstream usages for more than on key, + // so we just lock in that behaviour. + if len(keys) > 1 { + return nil, errors.NotSupportedf("filtering with more than one lease key") + } + q := ` SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry FROM lease l JOIN lease_type t ON l.lease_type_id = t.id`[1:] @@ -60,12 +69,6 @@ AND l.name = ?` } result, err := leasesFromRows(rows) - - // TODO (manadart 2022-11-30): We expect the variadic `keys` argument to be - // length 0 or 1. It was a work-around for design constraints at the time - // that it was instituted. Either filter the result here for len(keys) > 1, - // or fix the design. - return result, errors.Trace(rows.Err()) } From 28998f1699875af7338ae10b8392844d58ce6b46 Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Tue, 6 Dec 2022 11:53:24 +0100 Subject: [PATCH 06/10] Adds lease extension, revocation and tests for cancellation to the database-backed lease store. --- core/lease/storedb.go | 89 +++++++++++++++++++++++++++++++++- core/lease/storedb_test.go | 99 +++++++++++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 3 deletions(-) diff --git a/core/lease/storedb.go b/core/lease/storedb.go index 9157ec2110d..aca8da14991 100644 --- a/core/lease/storedb.go +++ b/core/lease/storedb.go @@ -111,12 +111,97 @@ WHERE type = ?`[1:] } } +// ExtendLease (lease.Store) ensures the input lease will be held for at least +// the requested duration starting from now. +// If the input holder does not currently hold the lease, as error is returned. func (s *DBStore) ExtendLease(lease Key, request Request, stop <-chan struct{}) error { - panic("implement me") + if err := request.Validate(); err != nil { + return errors.Trace(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +UPDATE lease +SET expiry = datetime('now', ?) +WHERE uuid = ( + SELECT l.uuid + FROM lease l JOIN lease_type t ON l.lease_type_id = t.id + WHERE t.type = ? + AND l.model_uuid = ? + AND l.name = ? + AND l.holder = ? +)`[1:] + + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(request.Duration.Seconds()))) + + result, err := s.db.ExecContext( + ctx, q, d, lease.Namespace, lease.ModelUUID, lease.Lease, request.Holder) + + // If no rows were affected, then either this lease does not exist or + // it is not held by the input holder, constituting an invalid request. + if err == nil { + var affected int64 + affected, err = result.RowsAffected() + if affected == 0 && err == nil { + err = ErrInvalid + } + } + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.New("extend cancelled") + case err := <-errCh: + cancel() + return errors.Trace(err) + } } +// RevokeLease deletes the lease from the store, +// provided it exists and is held by the input holder. +// If either of these conditions is false, an error is returned. func (s *DBStore) RevokeLease(lease Key, holder string, stop <-chan struct{}) error { - panic("implement me") + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +DELETE FROM lease +WHERE uuid = ( + SELECT l.uuid + FROM lease l JOIN lease_type t ON l.lease_type_id = t.id + WHERE t.type = ? + AND l.model_uuid = ? + AND l.name = ? + AND l.holder = ? +)`[1:] + + result, err := s.db.ExecContext( + ctx, q, lease.Namespace, lease.ModelUUID, lease.Lease, holder) + + if err == nil { + var affected int64 + affected, err = result.RowsAffected() + if affected == 0 && err == nil { + err = ErrInvalid + } + } + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.New("revoke cancelled") + case err := <-errCh: + cancel() + return errors.Trace(err) + } } // LeaseGroup (lease.Store) returns all leases diff --git a/core/lease/storedb_test.go b/core/lease/storedb_test.go index 19ee9082af2..fff2b70e612 100644 --- a/core/lease/storedb_test.go +++ b/core/lease/storedb_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/juju/errors" - "github.com/juju/testing" jc "github.com/juju/testing/checkers" _ "github.com/mattn/go-sqlite3" @@ -124,6 +123,86 @@ func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { c.Assert(errors.Is(err, lease.ErrHeld), jc.IsTrue) } +func (s *storeDBSuite) TestExtendLeaseSuccess(c *gc.C) { + key := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := lease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + leases, err := s.store.Leases(key) + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 1) + + // Save the expiry for later comparison. + originalExpiry := leases[key].Expiry + + req.Duration = 2 * time.Minute + err = s.store.ExtendLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + leases, err = s.store.Leases(key) + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 1) + + // Check that we extended. + c.Check(leases[key].Expiry.After(originalExpiry), jc.IsTrue) +} + +func (s *storeDBSuite) TestExtendLeaseNotHeldInvalid(c *gc.C) { + key := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := lease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ExtendLease(key, req, s.stopCh) + c.Assert(errors.Is(err, lease.ErrInvalid), jc.IsTrue) +} + +func (s *storeDBSuite) TestRevokeLeaseSuccess(c *gc.C) { + key := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := lease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + err = s.store.RevokeLease(key, req.Holder, s.stopCh) + c.Assert(err, jc.ErrorIsNil) +} + +func (s *storeDBSuite) TestRevokeLeaseNotHeldInvalid(c *gc.C) { + key := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + err := s.store.RevokeLease(key, "not-the-holder", s.stopCh) + c.Assert(errors.Is(err, lease.ErrInvalid), jc.IsTrue) +} + func (s *storeDBSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { pgKey := lease.Key{ Namespace: "application", @@ -166,6 +245,24 @@ func (s *storeDBSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { c.Check(pins[pgKey], jc.SameContents, []string{"machine/6"}) } +func (s *storeDBSuite) TestLeaseOperationCancellation(c *gc.C) { + s.stopCh <- struct{}{} + + key := lease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := lease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, gc.ErrorMatches, "claim cancelled") +} + func (s *storeDBSuite) primeDB(c *gc.C) { tx, err := s.db.Begin() c.Assert(err, jc.ErrorIsNil) From d082c427a2584d6ef3fcb8a85e7fcf0e213730c7 Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Tue, 6 Dec 2022 12:11:35 +0100 Subject: [PATCH 07/10] Fixes comment grammar in lease store comments. --- core/lease/storedb.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/lease/storedb.go b/core/lease/storedb.go index aca8da14991..7af78a46b2f 100644 --- a/core/lease/storedb.go +++ b/core/lease/storedb.go @@ -27,7 +27,7 @@ type DBStore struct { logger StoreLogger } -// NewDBStore returns a reference to a new database-backed lease sore. +// NewDBStore returns a reference to a new database-backed lease store. func NewDBStore(db *sql.DB, logger StoreLogger) *DBStore { return &DBStore{ db: db, @@ -41,7 +41,7 @@ func (s *DBStore) Leases(keys ...Key) (map[Key]Info, error) { // TODO (manadart 2022-11-30): We expect the variadic `keys` argument to be // length 0 or 1. It was a work-around for design constraints at the time. // Either filter the result here for len(keys) > 1, or fix the design. - // As it is, there are no upstream usages for more than on key, + // As it is, there are no upstream usages for more than one key, // so we just lock in that behaviour. if len(keys) > 1 { return nil, errors.NotSupportedf("filtering with more than one lease key") @@ -73,7 +73,7 @@ AND l.name = ?` } // ClaimLease (lease.Store) claims the lease indicated by the input key, -// for the holder indicated by the input duration. +// for the holder and duration indicated by the input request. // The lease must not already be held, otherwise an error is returned. func (s *DBStore) ClaimLease(lease Key, request Request, stop <-chan struct{}) error { if err := request.Validate(); err != nil { @@ -113,7 +113,7 @@ WHERE type = ?`[1:] // ExtendLease (lease.Store) ensures the input lease will be held for at least // the requested duration starting from now. -// If the input holder does not currently hold the lease, as error is returned. +// If the input holder does not currently hold the lease, an error is returned. func (s *DBStore) ExtendLease(lease Key, request Request, stop <-chan struct{}) error { if err := request.Validate(); err != nil { return errors.Trace(err) @@ -162,7 +162,7 @@ WHERE uuid = ( } } -// RevokeLease deletes the lease from the store, +// RevokeLease (lease.Store) deletes the lease from the store, // provided it exists and is held by the input holder. // If either of these conditions is false, an error is returned. func (s *DBStore) RevokeLease(lease Key, holder string, stop <-chan struct{}) error { From d54a0dd4de230c88b5626b49314b560455ee95ed Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Wed, 7 Dec 2022 12:41:53 +0100 Subject: [PATCH 08/10] Moves the database-backed lease store implementation out of the core/lease package and into worker/lease, where it will be used by the lease manager. The constraint error detection function was moved into the database package. This rearrangement fixes build issues encountered previously. --- core/lease/package_test.go | 18 ---- database/errors.go | 16 ++++ worker/lease/package_test.go | 6 +- .../lease/storedb.go => worker/lease/store.go | 84 ++++++++----------- .../lease/store_test.go | 71 ++++++++-------- 5 files changed, 94 insertions(+), 101 deletions(-) delete mode 100644 core/lease/package_test.go create mode 100644 database/errors.go rename core/lease/storedb.go => worker/lease/store.go (77%) rename core/lease/storedb_test.go => worker/lease/store_test.go (79%) diff --git a/core/lease/package_test.go b/core/lease/package_test.go deleted file mode 100644 index 26d0b71a4cf..00000000000 --- a/core/lease/package_test.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2022 Canonical Ltd. -// Licensed under the AGPLv3, see LICENCE file for details. - -package lease - -import ( - "testing" - - gc "gopkg.in/check.v1" -) - -func Test(t *testing.T) { - gc.TestingT(t) -} - -type StubLogger struct{} - -func (StubLogger) Errorf(string, ...interface{}) {} diff --git a/database/errors.go b/database/errors.go new file mode 100644 index 00000000000..3b0a985f4b7 --- /dev/null +++ b/database/errors.go @@ -0,0 +1,16 @@ +package database + +import ( + "github.com/juju/errors" + "github.com/mattn/go-sqlite3" +) + +// IsErrConstraintUnique returns true if the input error was +// returned by SQLite due to violation of a unique constraint. +func IsErrConstraintUnique(err error) bool { + var sqliteErr sqlite3.Error + if errors.As(err, &sqliteErr) && sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique { + return true + } + return false +} diff --git a/worker/lease/package_test.go b/worker/lease/package_test.go index 3c1fafce365..9a89e281feb 100644 --- a/worker/lease/package_test.go +++ b/worker/lease/package_test.go @@ -1,7 +1,7 @@ // Copyright 2015 Canonical Ltd. // Licensed under the AGPLv3, see LICENCE file for details. -package lease_test +package lease import ( "testing" @@ -12,3 +12,7 @@ import ( func TestPackage(t *testing.T) { gc.TestingT(t) } + +type StubLogger struct{} + +func (StubLogger) Errorf(string, ...interface{}) {} diff --git a/core/lease/storedb.go b/worker/lease/store.go similarity index 77% rename from core/lease/storedb.go rename to worker/lease/store.go index 7af78a46b2f..444de34e264 100644 --- a/core/lease/storedb.go +++ b/worker/lease/store.go @@ -12,7 +12,9 @@ import ( "github.com/juju/collections/set" "github.com/juju/errors" "github.com/juju/utils/v3" - "github.com/mattn/go-sqlite3" + + "github.com/juju/juju/core/lease" + "github.com/juju/juju/database" ) // StoreLogger describes methods for logging lease store concerns. @@ -20,16 +22,16 @@ type StoreLogger interface { Errorf(string, ...interface{}) } -// DBStore implements lease.Store using a database +// Store implements lease.Store using a database // supporting SQLite-compatible dialects. -type DBStore struct { +type Store struct { db *sql.DB logger StoreLogger } -// NewDBStore returns a reference to a new database-backed lease store. -func NewDBStore(db *sql.DB, logger StoreLogger) *DBStore { - return &DBStore{ +// NewStore returns a reference to a new database-backed lease store. +func NewStore(db *sql.DB, logger StoreLogger) *Store { + return &Store{ db: db, logger: logger, } @@ -37,7 +39,7 @@ func NewDBStore(db *sql.DB, logger StoreLogger) *DBStore { // Leases (lease.Store) returns all leases in the database, // optionally filtering using the input keys. -func (s *DBStore) Leases(keys ...Key) (map[Key]Info, error) { +func (s *Store) Leases(keys ...lease.Key) (map[lease.Key]lease.Info, error) { // TODO (manadart 2022-11-30): We expect the variadic `keys` argument to be // length 0 or 1. It was a work-around for design constraints at the time. // Either filter the result here for len(keys) > 1, or fix the design. @@ -75,8 +77,8 @@ AND l.name = ?` // ClaimLease (lease.Store) claims the lease indicated by the input key, // for the holder and duration indicated by the input request. // The lease must not already be held, otherwise an error is returned. -func (s *DBStore) ClaimLease(lease Key, request Request, stop <-chan struct{}) error { - if err := request.Validate(); err != nil { +func (s *Store) ClaimLease(key lease.Key, req lease.Request, stop <-chan struct{}) error { + if err := req.Validate(); err != nil { return errors.Trace(err) } @@ -90,10 +92,10 @@ SELECT ?, id, ?, ?, ?, datetime('now'), datetime('now', ?) FROM lease_type WHERE type = ?`[1:] - d := fmt.Sprintf("+%d seconds", int64(math.Ceil(request.Duration.Seconds()))) + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) _, err := s.db.ExecContext( - ctx, q, utils.MustNewUUID().String(), lease.ModelUUID, lease.Lease, request.Holder, d, lease.Namespace) + ctx, q, utils.MustNewUUID().String(), key.ModelUUID, key.Lease, req.Holder, d, key.Namespace) errCh <- err }() @@ -104,8 +106,8 @@ WHERE type = ?`[1:] return errors.New("claim cancelled") case err := <-errCh: cancel() - if isUniquenessViolation(err) { - return ErrHeld + if database.IsErrConstraintUnique(err) { + return lease.ErrHeld } return errors.Trace(err) } @@ -114,8 +116,8 @@ WHERE type = ?`[1:] // ExtendLease (lease.Store) ensures the input lease will be held for at least // the requested duration starting from now. // If the input holder does not currently hold the lease, an error is returned. -func (s *DBStore) ExtendLease(lease Key, request Request, stop <-chan struct{}) error { - if err := request.Validate(); err != nil { +func (s *Store) ExtendLease(key lease.Key, req lease.Request, stop <-chan struct{}) error { + if err := req.Validate(); err != nil { return errors.Trace(err) } @@ -135,18 +137,18 @@ WHERE uuid = ( AND l.holder = ? )`[1:] - d := fmt.Sprintf("+%d seconds", int64(math.Ceil(request.Duration.Seconds()))) + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) result, err := s.db.ExecContext( - ctx, q, d, lease.Namespace, lease.ModelUUID, lease.Lease, request.Holder) + ctx, q, d, key.Namespace, key.ModelUUID, key.Lease, req.Holder) - // If no rows were affected, then either this lease does not exist or + // If no rows were affected, then either this key does not exist or // it is not held by the input holder, constituting an invalid request. if err == nil { var affected int64 affected, err = result.RowsAffected() if affected == 0 && err == nil { - err = ErrInvalid + err = lease.ErrInvalid } } errCh <- err @@ -165,7 +167,7 @@ WHERE uuid = ( // RevokeLease (lease.Store) deletes the lease from the store, // provided it exists and is held by the input holder. // If either of these conditions is false, an error is returned. -func (s *DBStore) RevokeLease(lease Key, holder string, stop <-chan struct{}) error { +func (s *Store) RevokeLease(key lease.Key, holder string, stop <-chan struct{}) error { ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error) @@ -182,13 +184,13 @@ WHERE uuid = ( )`[1:] result, err := s.db.ExecContext( - ctx, q, lease.Namespace, lease.ModelUUID, lease.Lease, holder) + ctx, q, key.Namespace, key.ModelUUID, key.Lease, holder) if err == nil { var affected int64 affected, err = result.RowsAffected() if affected == 0 && err == nil { - err = ErrInvalid + err = lease.ErrInvalid } } errCh <- err @@ -206,7 +208,7 @@ WHERE uuid = ( // LeaseGroup (lease.Store) returns all leases // for the input namespace and model. -func (s *DBStore) LeaseGroup(namespace, modelUUID string) (map[Key]Info, error) { +func (s *Store) LeaseGroup(namespace, modelUUID string) (map[lease.Key]lease.Info, error) { q := ` SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry FROM lease l JOIN lease_type t ON l.lease_type_id = t.id @@ -225,7 +227,7 @@ AND l.model_uuid = ?`[1:] // PinLease (lease.Store) adds the input entity into the lease_pin table // to indicate that the lease indicated by the input key must not expire, // and that this entity requires such behaviour. -func (s *DBStore) PinLease(lease Key, entity string, stop <-chan struct{}) error { +func (s *Store) PinLease(key lease.Key, entity string, stop <-chan struct{}) error { ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error) @@ -239,7 +241,7 @@ AND l.model_uuid = ? AND l.name = ?`[1:] _, err := s.db.ExecContext( - ctx, q, utils.MustNewUUID().String(), entity, lease.Namespace, lease.ModelUUID, lease.Lease) + ctx, q, utils.MustNewUUID().String(), entity, key.Namespace, key.ModelUUID, key.Lease) errCh <- err }() @@ -250,7 +252,7 @@ AND l.name = ?`[1:] return errors.New("pin lease cancelled") case err := <-errCh: cancel() - if isUniquenessViolation(err) { + if database.IsErrConstraintUnique(err) { return nil } return errors.Trace(err) @@ -262,7 +264,7 @@ AND l.name = ?`[1:] // no longer requires the lease to be pinned. // When there are no entities associated with a particular lease, // it is determined not to be pinned, and can expire normally. -func (s *DBStore) UnpinLease(lease Key, entity string, stop <-chan struct{}) error { +func (s *Store) UnpinLease(key lease.Key, entity string, stop <-chan struct{}) error { ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error) @@ -281,7 +283,7 @@ WHERE uuid = ( )`[1:] _, err := s.db.ExecContext( - ctx, q, lease.Namespace, lease.ModelUUID, lease.Lease, entity) + ctx, q, key.Namespace, key.ModelUUID, key.Lease, entity) errCh <- err }() @@ -298,7 +300,7 @@ WHERE uuid = ( // Pinned (lease.Store) returns all leases that are currently pinned, // and the entities requiring such behaviour for them. -func (s *DBStore) Pinned() (map[Key][]string, error) { +func (s *Store) Pinned() (map[lease.Key][]string, error) { q := ` SELECT l.uuid, t.type, l.model_uuid, l.name, p.entity_id FROM lease l @@ -312,10 +314,10 @@ ORDER BY l.uuid`[1:] } seen := set.NewStrings() - result := make(map[Key][]string) + result := make(map[lease.Key][]string) for rows.Next() { var leaseUUID string - var key Key + var key lease.Key var entity string if err := rows.Scan(&leaseUUID, &key.Namespace, &key.ModelUUID, &key.Lease, &entity); err != nil { @@ -335,12 +337,12 @@ ORDER BY l.uuid`[1:] } // leasesFromRows returns lease info from rows returned from the backing DB. -func leasesFromRows(rows *sql.Rows) (map[Key]Info, error) { - result := map[Key]Info{} +func leasesFromRows(rows *sql.Rows) (map[lease.Key]lease.Info, error) { + result := map[lease.Key]lease.Info{} for rows.Next() { - var key Key - var info Info + var key lease.Key + var info lease.Info if err := rows.Scan(&key.Namespace, &key.ModelUUID, &key.Lease, &info.Holder, &info.Expiry); err != nil { _ = rows.Close() @@ -351,15 +353,3 @@ func leasesFromRows(rows *sql.Rows) (map[Key]Info, error) { return result, errors.Trace(rows.Err()) } - -// TODO (manadart 2022-12-05): Utilities like this will reside in -// the database package for general use. -func isUniquenessViolation(err error) bool { - var sqliteErr sqlite3.Error - if errors.As(err, &sqliteErr) { - if errors.Is(sqliteErr.ExtendedCode, sqlite3.ErrConstraintUnique) { - return true - } - } - return false -} diff --git a/core/lease/storedb_test.go b/worker/lease/store_test.go similarity index 79% rename from core/lease/storedb_test.go rename to worker/lease/store_test.go index fff2b70e612..61cae335965 100644 --- a/core/lease/storedb_test.go +++ b/worker/lease/store_test.go @@ -13,21 +13,22 @@ import ( _ "github.com/mattn/go-sqlite3" gc "gopkg.in/check.v1" - "github.com/juju/juju/core/lease" + corelease "github.com/juju/juju/core/lease" "github.com/juju/juju/database/schema" + "github.com/juju/juju/worker/lease" ) -type storeDBSuite struct { +type storeSuite struct { testing.IsolationSuite db *sql.DB - store *lease.DBStore + store *lease.Store stopCh chan struct{} } -var _ = gc.Suite(&storeDBSuite{}) +var _ = gc.Suite(&storeSuite{}) -func (s *storeDBSuite) SetUpTest(c *gc.C) { +func (s *storeSuite) SetUpTest(c *gc.C) { s.IsolationSuite.SetUpTest(c) var err error @@ -35,20 +36,20 @@ func (s *storeDBSuite) SetUpTest(c *gc.C) { c.Assert(err, jc.ErrorIsNil) s.primeDB(c) - s.store = lease.NewDBStore(s.db, lease.StubLogger{}) + s.store = lease.NewStore(s.db, lease.StubLogger{}) // Single-buffered to allow us to queue up a stoppage. s.stopCh = make(chan struct{}, 1) } -func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseQueries(c *gc.C) { - pgKey := lease.Key{ +func (s *storeSuite) TestClaimLeaseSuccessAndLeaseQueries(c *gc.C) { + pgKey := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } - pgReq := lease.Request{ + pgReq := corelease.Request{ Holder: "postgresql/0", Duration: time.Minute, } @@ -84,12 +85,12 @@ func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseQueries(c *gc.C) { // Add a lease from a different group, // and check that the group returns the application leases. err = s.store.ClaimLease( - lease.Key{ + corelease.Key{ Namespace: "controller", ModelUUID: "controller-model-uuid", Lease: "singular", }, - lease.Request{ + corelease.Request{ Holder: "machine/0", Duration: time.Minute, }, @@ -104,14 +105,14 @@ func (s *storeDBSuite) TestClaimLeaseSuccessAndLeaseQueries(c *gc.C) { c.Check(leases[mmKey].Holder, gc.Equals, "mattermost/0") } -func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { - key := lease.Key{ +func (s *storeSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { + key := corelease.Key{ Namespace: "controller", ModelUUID: "controller-model-uuid", Lease: "singular", } - req := lease.Request{ + req := corelease.Request{ Holder: "machine/0", Duration: time.Minute, } @@ -120,17 +121,17 @@ func (s *storeDBSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { c.Assert(err, jc.ErrorIsNil) err = s.store.ClaimLease(key, req, s.stopCh) - c.Assert(errors.Is(err, lease.ErrHeld), jc.IsTrue) + c.Assert(errors.Is(err, corelease.ErrHeld), jc.IsTrue) } -func (s *storeDBSuite) TestExtendLeaseSuccess(c *gc.C) { - key := lease.Key{ +func (s *storeSuite) TestExtendLeaseSuccess(c *gc.C) { + key := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } - req := lease.Request{ + req := corelease.Request{ Holder: "postgresql/0", Duration: time.Minute, } @@ -157,30 +158,30 @@ func (s *storeDBSuite) TestExtendLeaseSuccess(c *gc.C) { c.Check(leases[key].Expiry.After(originalExpiry), jc.IsTrue) } -func (s *storeDBSuite) TestExtendLeaseNotHeldInvalid(c *gc.C) { - key := lease.Key{ +func (s *storeSuite) TestExtendLeaseNotHeldInvalid(c *gc.C) { + key := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } - req := lease.Request{ + req := corelease.Request{ Holder: "postgresql/0", Duration: time.Minute, } err := s.store.ExtendLease(key, req, s.stopCh) - c.Assert(errors.Is(err, lease.ErrInvalid), jc.IsTrue) + c.Assert(errors.Is(err, corelease.ErrInvalid), jc.IsTrue) } -func (s *storeDBSuite) TestRevokeLeaseSuccess(c *gc.C) { - key := lease.Key{ +func (s *storeSuite) TestRevokeLeaseSuccess(c *gc.C) { + key := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } - req := lease.Request{ + req := corelease.Request{ Holder: "postgresql/0", Duration: time.Minute, } @@ -192,25 +193,25 @@ func (s *storeDBSuite) TestRevokeLeaseSuccess(c *gc.C) { c.Assert(err, jc.ErrorIsNil) } -func (s *storeDBSuite) TestRevokeLeaseNotHeldInvalid(c *gc.C) { - key := lease.Key{ +func (s *storeSuite) TestRevokeLeaseNotHeldInvalid(c *gc.C) { + key := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } err := s.store.RevokeLease(key, "not-the-holder", s.stopCh) - c.Assert(errors.Is(err, lease.ErrInvalid), jc.IsTrue) + c.Assert(errors.Is(err, corelease.ErrInvalid), jc.IsTrue) } -func (s *storeDBSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { - pgKey := lease.Key{ +func (s *storeSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { + pgKey := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } - pgReq := lease.Request{ + pgReq := corelease.Request{ Holder: "postgresql/0", Duration: time.Minute, } @@ -245,16 +246,16 @@ func (s *storeDBSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { c.Check(pins[pgKey], jc.SameContents, []string{"machine/6"}) } -func (s *storeDBSuite) TestLeaseOperationCancellation(c *gc.C) { +func (s *storeSuite) TestLeaseOperationCancellation(c *gc.C) { s.stopCh <- struct{}{} - key := lease.Key{ + key := corelease.Key{ Namespace: "application", ModelUUID: "model-uuid", Lease: "postgresql", } - req := lease.Request{ + req := corelease.Request{ Holder: "postgresql/0", Duration: time.Minute, } @@ -263,7 +264,7 @@ func (s *storeDBSuite) TestLeaseOperationCancellation(c *gc.C) { c.Assert(err, gc.ErrorMatches, "claim cancelled") } -func (s *storeDBSuite) primeDB(c *gc.C) { +func (s *storeSuite) primeDB(c *gc.C) { tx, err := s.db.Begin() c.Assert(err, jc.ErrorIsNil) From 08b351ab2659618575517788959300782ee4b38f Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Wed, 7 Dec 2022 14:02:48 +0100 Subject: [PATCH 09/10] Ensures that we wait for lease operation Goroutines to complete upon context cancellation. --- worker/lease/store.go | 10 +++++----- worker/lease/store_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/worker/lease/store.go b/worker/lease/store.go index 444de34e264..cee5569862c 100644 --- a/worker/lease/store.go +++ b/worker/lease/store.go @@ -103,7 +103,7 @@ WHERE type = ?`[1:] select { case <-stop: cancel() - return errors.New("claim cancelled") + return errors.Trace(<-errCh) case err := <-errCh: cancel() if database.IsErrConstraintUnique(err) { @@ -157,7 +157,7 @@ WHERE uuid = ( select { case <-stop: cancel() - return errors.New("extend cancelled") + return errors.Trace(<-errCh) case err := <-errCh: cancel() return errors.Trace(err) @@ -199,7 +199,7 @@ WHERE uuid = ( select { case <-stop: cancel() - return errors.New("revoke cancelled") + return errors.Trace(<-errCh) case err := <-errCh: cancel() return errors.Trace(err) @@ -249,7 +249,7 @@ AND l.name = ?`[1:] select { case <-stop: cancel() - return errors.New("pin lease cancelled") + return errors.Trace(<-errCh) case err := <-errCh: cancel() if database.IsErrConstraintUnique(err) { @@ -291,7 +291,7 @@ WHERE uuid = ( select { case <-stop: cancel() - return errors.New("unpin lease cancelled") + return errors.Trace(<-errCh) case err := <-errCh: cancel() return errors.Trace(err) diff --git a/worker/lease/store_test.go b/worker/lease/store_test.go index 61cae335965..883c4289ad1 100644 --- a/worker/lease/store_test.go +++ b/worker/lease/store_test.go @@ -261,7 +261,7 @@ func (s *storeSuite) TestLeaseOperationCancellation(c *gc.C) { } err := s.store.ClaimLease(key, req, s.stopCh) - c.Assert(err, gc.ErrorMatches, "claim cancelled") + c.Assert(err, gc.ErrorMatches, "context canceled") } func (s *storeSuite) primeDB(c *gc.C) { From 98b70fc934e15e64785dd97bae83856ce77dc11a Mon Sep 17 00:00:00 2001 From: Joseph Phillips Date: Wed, 7 Dec 2022 15:04:03 +0100 Subject: [PATCH 10/10] Adds a statement cache to the lease store so that each DML/DDL statement is prepared exactly once for the store's lifetime. --- worker/lease/store.go | 95 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 14 deletions(-) diff --git a/worker/lease/store.go b/worker/lease/store.go index cee5569862c..8bba64a913e 100644 --- a/worker/lease/store.go +++ b/worker/lease/store.go @@ -8,6 +8,7 @@ import ( "database/sql" "fmt" "math" + "sync" "github.com/juju/collections/set" "github.com/juju/errors" @@ -27,6 +28,9 @@ type StoreLogger interface { type Store struct { db *sql.DB logger StoreLogger + + cache map[string]*sql.Stmt + cacheMu sync.RWMutex } // NewStore returns a reference to a new database-backed lease store. @@ -34,6 +38,7 @@ func NewStore(db *sql.DB, logger StoreLogger) *Store { return &Store{ db: db, logger: logger, + cache: make(map[string]*sql.Stmt), } } @@ -49,6 +54,7 @@ func (s *Store) Leases(keys ...lease.Key) (map[lease.Key]lease.Info, error) { return nil, errors.NotSupportedf("filtering with more than one lease key") } + name := "Leases" q := ` SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry FROM lease l JOIN lease_type t ON l.lease_type_id = t.id`[1:] @@ -61,11 +67,17 @@ WHERE t.type = ? AND l.model_uuid = ? AND l.name = ?` + name = "LeasesForKey" key := keys[0] args = []any{key.Namespace, key.ModelUUID, key.Lease} } - rows, err := s.db.Query(q, args...) + stmt, err := s.getPrepared(context.Background(), name, q) + if err != nil { + return nil, errors.Trace(err) + } + + rows, err := stmt.Query(args...) if err != nil { return nil, errors.Trace(err) } @@ -92,11 +104,16 @@ SELECT ?, id, ?, ?, ?, datetime('now'), datetime('now', ?) FROM lease_type WHERE type = ?`[1:] - d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) + stmt, err := s.getPrepared(ctx, "ClaimLease", q) + if err != nil { + errCh <- err + return + } - _, err := s.db.ExecContext( - ctx, q, utils.MustNewUUID().String(), key.ModelUUID, key.Lease, req.Holder, d, key.Namespace) + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) + _, err = stmt.ExecContext( + ctx, utils.MustNewUUID().String(), key.ModelUUID, key.Lease, req.Holder, d, key.Namespace) errCh <- err }() @@ -137,10 +154,15 @@ WHERE uuid = ( AND l.holder = ? )`[1:] + stmt, err := s.getPrepared(ctx, "ExtendLease", q) + if err != nil { + errCh <- err + return + } + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) - result, err := s.db.ExecContext( - ctx, q, d, key.Namespace, key.ModelUUID, key.Lease, req.Holder) + result, err := stmt.ExecContext(ctx, d, key.Namespace, key.ModelUUID, key.Lease, req.Holder) // If no rows were affected, then either this key does not exist or // it is not held by the input holder, constituting an invalid request. @@ -183,9 +205,13 @@ WHERE uuid = ( AND l.holder = ? )`[1:] - result, err := s.db.ExecContext( - ctx, q, key.Namespace, key.ModelUUID, key.Lease, holder) + stmt, err := s.getPrepared(ctx, "RevokeLease", q) + if err != nil { + errCh <- err + return + } + result, err := stmt.ExecContext(ctx, key.Namespace, key.ModelUUID, key.Lease, holder) if err == nil { var affected int64 affected, err = result.RowsAffected() @@ -215,7 +241,12 @@ FROM lease l JOIN lease_type t ON l.lease_type_id = t.id WHERE t.type = ? AND l.model_uuid = ?`[1:] - rows, err := s.db.Query(q, namespace, modelUUID) + stmt, err := s.getPrepared(context.Background(), "LeaseGroup", q) + if err != nil { + return nil, errors.Trace(err) + } + + rows, err := stmt.Query(namespace, modelUUID) if err != nil { return nil, errors.Trace(err) } @@ -240,9 +271,13 @@ WHERE t.type = ? AND l.model_uuid = ? AND l.name = ?`[1:] - _, err := s.db.ExecContext( - ctx, q, utils.MustNewUUID().String(), entity, key.Namespace, key.ModelUUID, key.Lease) + stmt, err := s.getPrepared(ctx, "PinLease", q) + if err != nil { + errCh <- err + return + } + _, err = stmt.ExecContext(ctx, utils.MustNewUUID().String(), entity, key.Namespace, key.ModelUUID, key.Lease) errCh <- err }() @@ -282,9 +317,13 @@ WHERE uuid = ( AND p.entity_id = ? )`[1:] - _, err := s.db.ExecContext( - ctx, q, key.Namespace, key.ModelUUID, key.Lease, entity) + stmt, err := s.getPrepared(ctx, "UnpinLease", q) + if err != nil { + errCh <- err + return + } + _, err = stmt.ExecContext(ctx, key.Namespace, key.ModelUUID, key.Lease, entity) errCh <- err }() @@ -308,7 +347,12 @@ FROM lease l JOIN lease_pin p on l.uuid = p.lease_uuid ORDER BY l.uuid`[1:] - rows, err := s.db.Query(q) + stmt, err := s.getPrepared(context.Background(), "Pinned", q) + if err != nil { + return nil, errors.Trace(err) + } + + rows, err := stmt.Query() if err != nil { return nil, errors.Trace(err) } @@ -336,6 +380,29 @@ ORDER BY l.uuid`[1:] return result, errors.Trace(rows.Err()) } +// getPrepared returns a prepared statement for the input name, +// ensuring that the first call for a given name caches the statement. +// thereafter the statement is returned from the cache. +func (s *Store) getPrepared(ctx context.Context, name string, stmt string) (*sql.Stmt, error) { + s.cacheMu.RLock() + if cachedStmt, ok := s.cache[name]; ok { + s.cacheMu.RUnlock() + return cachedStmt, nil + } + s.cacheMu.RUnlock() + + s.cacheMu.Lock() + defer s.cacheMu.Unlock() + + prepared, err := s.db.PrepareContext(ctx, stmt) + if err != nil { + return nil, errors.Trace(err) + } + + s.cache[name] = prepared + return prepared, nil +} + // leasesFromRows returns lease info from rows returned from the backing DB. func leasesFromRows(rows *sql.Rows) (map[lease.Key]lease.Info, error) { result := map[lease.Key]lease.Info{}