diff --git a/database/errors.go b/database/errors.go new file mode 100644 index 00000000000..3b0a985f4b7 --- /dev/null +++ b/database/errors.go @@ -0,0 +1,16 @@ +package database + +import ( + "github.com/juju/errors" + "github.com/mattn/go-sqlite3" +) + +// IsErrConstraintUnique returns true if the input error was +// returned by SQLite due to violation of a unique constraint. +func IsErrConstraintUnique(err error) bool { + var sqliteErr sqlite3.Error + if errors.As(err, &sqliteErr) && sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique { + return true + } + return false +} diff --git a/database/schema/controller.go b/database/schema/controller.go index 0f285bbd17a..d8786aeb3ad 100644 --- a/database/schema/controller.go +++ b/database/schema/controller.go @@ -17,28 +17,46 @@ CREATE UNIQUE INDEX idx_lease_type_type ON lease_type (type); INSERT INTO lease_type VALUES - (0, 'controller'), - (1, 'model' ), - (2, 'application'); + (0, 'controller'), -- The controller running singular controller workers. + (1, 'model' ), -- The controller running singular workers for a model. + (2, 'application'); -- The unit that holds leadership for an application. -CREATE TABLE IF NOT EXISTS lease ( +CREATE TABLE lease ( uuid TEXT PRIMARY KEY, lease_type_id INT NOT NULL, + model_uuid TEXT, name TEXT, holder TEXT, start TIMESTAMP, expiry TIMESTAMP, - pinned BOOLEAN, CONSTRAINT fk_lease_lease_type FOREIGN KEY (lease_type_id) REFERENCES lease_type(id) ); -CREATE UNIQUE INDEX idx_lease_type_name -ON lease (lease_type_id, name); +CREATE UNIQUE INDEX idx_lease_model_type_name +ON lease (model_uuid, lease_type_id, name); CREATE INDEX idx_lease_expiry -ON lease (expiry);`[1:] +ON lease (expiry); + +CREATE TABLE lease_pin ( + -- The presence of entries in this table for a particular lease_uuid + -- implies that the lease in question is pinned and cannot expire. + uuid TEXT PRIMARY KEY, + lease_uuid TEXT, + entity_id TEXT, + CONSTRAINT fk_lease_pin_lease + FOREIGN KEY (lease_uuid) + REFERENCES lease(uuid) +); + +CREATE UNIQUE INDEX idx_lease_pin_lease_entity +ON lease_pin (lease_uuid, entity_id); + +CREATE INDEX idx_lease_pin_lease +ON lease_pin (lease_uuid); +`[1:] return strings.Split(delta, ";\n\n") } diff --git a/worker/lease/package_test.go b/worker/lease/package_test.go index 3c1fafce365..9a89e281feb 100644 --- a/worker/lease/package_test.go +++ b/worker/lease/package_test.go @@ -1,7 +1,7 @@ // Copyright 2015 Canonical Ltd. // Licensed under the AGPLv3, see LICENCE file for details. -package lease_test +package lease import ( "testing" @@ -12,3 +12,7 @@ import ( func TestPackage(t *testing.T) { gc.TestingT(t) } + +type StubLogger struct{} + +func (StubLogger) Errorf(string, ...interface{}) {} diff --git a/worker/lease/store.go b/worker/lease/store.go new file mode 100644 index 00000000000..8bba64a913e --- /dev/null +++ b/worker/lease/store.go @@ -0,0 +1,422 @@ +// Copyright 2022 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package lease + +import ( + "context" + "database/sql" + "fmt" + "math" + "sync" + + "github.com/juju/collections/set" + "github.com/juju/errors" + "github.com/juju/utils/v3" + + "github.com/juju/juju/core/lease" + "github.com/juju/juju/database" +) + +// StoreLogger describes methods for logging lease store concerns. +type StoreLogger interface { + Errorf(string, ...interface{}) +} + +// Store implements lease.Store using a database +// supporting SQLite-compatible dialects. +type Store struct { + db *sql.DB + logger StoreLogger + + cache map[string]*sql.Stmt + cacheMu sync.RWMutex +} + +// NewStore returns a reference to a new database-backed lease store. +func NewStore(db *sql.DB, logger StoreLogger) *Store { + return &Store{ + db: db, + logger: logger, + cache: make(map[string]*sql.Stmt), + } +} + +// Leases (lease.Store) returns all leases in the database, +// optionally filtering using the input keys. +func (s *Store) Leases(keys ...lease.Key) (map[lease.Key]lease.Info, error) { + // TODO (manadart 2022-11-30): We expect the variadic `keys` argument to be + // length 0 or 1. It was a work-around for design constraints at the time. + // Either filter the result here for len(keys) > 1, or fix the design. + // As it is, there are no upstream usages for more than one key, + // so we just lock in that behaviour. + if len(keys) > 1 { + return nil, errors.NotSupportedf("filtering with more than one lease key") + } + + name := "Leases" + q := ` +SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry +FROM lease l JOIN lease_type t ON l.lease_type_id = t.id`[1:] + + var args []any + + if len(keys) == 1 { + q += ` +WHERE t.type = ? +AND l.model_uuid = ? +AND l.name = ?` + + name = "LeasesForKey" + key := keys[0] + args = []any{key.Namespace, key.ModelUUID, key.Lease} + } + + stmt, err := s.getPrepared(context.Background(), name, q) + if err != nil { + return nil, errors.Trace(err) + } + + rows, err := stmt.Query(args...) + if err != nil { + return nil, errors.Trace(err) + } + + result, err := leasesFromRows(rows) + return result, errors.Trace(rows.Err()) +} + +// ClaimLease (lease.Store) claims the lease indicated by the input key, +// for the holder and duration indicated by the input request. +// The lease must not already be held, otherwise an error is returned. +func (s *Store) ClaimLease(key lease.Key, req lease.Request, stop <-chan struct{}) error { + if err := req.Validate(); err != nil { + return errors.Trace(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +INSERT INTO lease (uuid, lease_type_id, model_uuid, name, holder, start, expiry) +SELECT ?, id, ?, ?, ?, datetime('now'), datetime('now', ?) +FROM lease_type +WHERE type = ?`[1:] + + stmt, err := s.getPrepared(ctx, "ClaimLease", q) + if err != nil { + errCh <- err + return + } + + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) + + _, err = stmt.ExecContext( + ctx, utils.MustNewUUID().String(), key.ModelUUID, key.Lease, req.Holder, d, key.Namespace) + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.Trace(<-errCh) + case err := <-errCh: + cancel() + if database.IsErrConstraintUnique(err) { + return lease.ErrHeld + } + return errors.Trace(err) + } +} + +// ExtendLease (lease.Store) ensures the input lease will be held for at least +// the requested duration starting from now. +// If the input holder does not currently hold the lease, an error is returned. +func (s *Store) ExtendLease(key lease.Key, req lease.Request, stop <-chan struct{}) error { + if err := req.Validate(); err != nil { + return errors.Trace(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +UPDATE lease +SET expiry = datetime('now', ?) +WHERE uuid = ( + SELECT l.uuid + FROM lease l JOIN lease_type t ON l.lease_type_id = t.id + WHERE t.type = ? + AND l.model_uuid = ? + AND l.name = ? + AND l.holder = ? +)`[1:] + + stmt, err := s.getPrepared(ctx, "ExtendLease", q) + if err != nil { + errCh <- err + return + } + + d := fmt.Sprintf("+%d seconds", int64(math.Ceil(req.Duration.Seconds()))) + + result, err := stmt.ExecContext(ctx, d, key.Namespace, key.ModelUUID, key.Lease, req.Holder) + + // If no rows were affected, then either this key does not exist or + // it is not held by the input holder, constituting an invalid request. + if err == nil { + var affected int64 + affected, err = result.RowsAffected() + if affected == 0 && err == nil { + err = lease.ErrInvalid + } + } + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.Trace(<-errCh) + case err := <-errCh: + cancel() + return errors.Trace(err) + } +} + +// RevokeLease (lease.Store) deletes the lease from the store, +// provided it exists and is held by the input holder. +// If either of these conditions is false, an error is returned. +func (s *Store) RevokeLease(key lease.Key, holder string, stop <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +DELETE FROM lease +WHERE uuid = ( + SELECT l.uuid + FROM lease l JOIN lease_type t ON l.lease_type_id = t.id + WHERE t.type = ? + AND l.model_uuid = ? + AND l.name = ? + AND l.holder = ? +)`[1:] + + stmt, err := s.getPrepared(ctx, "RevokeLease", q) + if err != nil { + errCh <- err + return + } + + result, err := stmt.ExecContext(ctx, key.Namespace, key.ModelUUID, key.Lease, holder) + if err == nil { + var affected int64 + affected, err = result.RowsAffected() + if affected == 0 && err == nil { + err = lease.ErrInvalid + } + } + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.Trace(<-errCh) + case err := <-errCh: + cancel() + return errors.Trace(err) + } +} + +// LeaseGroup (lease.Store) returns all leases +// for the input namespace and model. +func (s *Store) LeaseGroup(namespace, modelUUID string) (map[lease.Key]lease.Info, error) { + q := ` +SELECT t.type, l.model_uuid, l.name, l.holder, l.expiry +FROM lease l JOIN lease_type t ON l.lease_type_id = t.id +WHERE t.type = ? +AND l.model_uuid = ?`[1:] + + stmt, err := s.getPrepared(context.Background(), "LeaseGroup", q) + if err != nil { + return nil, errors.Trace(err) + } + + rows, err := stmt.Query(namespace, modelUUID) + if err != nil { + return nil, errors.Trace(err) + } + + result, err := leasesFromRows(rows) + return result, errors.Trace(err) +} + +// PinLease (lease.Store) adds the input entity into the lease_pin table +// to indicate that the lease indicated by the input key must not expire, +// and that this entity requires such behaviour. +func (s *Store) PinLease(key lease.Key, entity string, stop <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +INSERT INTO lease_pin (uuid, lease_uuid, entity_id) +SELECT ?, l.uuid, ? +FROM lease l JOIN lease_type t ON l.lease_type_id = t.id +WHERE t.type = ? +AND l.model_uuid = ? +AND l.name = ?`[1:] + + stmt, err := s.getPrepared(ctx, "PinLease", q) + if err != nil { + errCh <- err + return + } + + _, err = stmt.ExecContext(ctx, utils.MustNewUUID().String(), entity, key.Namespace, key.ModelUUID, key.Lease) + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.Trace(<-errCh) + case err := <-errCh: + cancel() + if database.IsErrConstraintUnique(err) { + return nil + } + return errors.Trace(err) + } +} + +// UnpinLease (lease.Store) removes the record indicated by the input +// key and entity from the lease pin table, indicating that the entity +// no longer requires the lease to be pinned. +// When there are no entities associated with a particular lease, +// it is determined not to be pinned, and can expire normally. +func (s *Store) UnpinLease(key lease.Key, entity string, stop <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + + go func() { + q := ` +DELETE FROM lease_pin +WHERE uuid = ( + SELECT p.uuid + FROM lease_pin p + JOIN lease l ON l.uuid = p.lease_uuid + JOIN lease_type t ON l.lease_type_id = t.id + WHERE t.type = ? + AND l.model_uuid = ? + AND l.name = ? + AND p.entity_id = ? +)`[1:] + + stmt, err := s.getPrepared(ctx, "UnpinLease", q) + if err != nil { + errCh <- err + return + } + + _, err = stmt.ExecContext(ctx, key.Namespace, key.ModelUUID, key.Lease, entity) + errCh <- err + }() + + select { + case <-stop: + cancel() + return errors.Trace(<-errCh) + case err := <-errCh: + cancel() + return errors.Trace(err) + } +} + +// Pinned (lease.Store) returns all leases that are currently pinned, +// and the entities requiring such behaviour for them. +func (s *Store) Pinned() (map[lease.Key][]string, error) { + q := ` +SELECT l.uuid, t.type, l.model_uuid, l.name, p.entity_id +FROM lease l + JOIN lease_type t ON l.lease_type_id = t.id + JOIN lease_pin p on l.uuid = p.lease_uuid +ORDER BY l.uuid`[1:] + + stmt, err := s.getPrepared(context.Background(), "Pinned", q) + if err != nil { + return nil, errors.Trace(err) + } + + rows, err := stmt.Query() + if err != nil { + return nil, errors.Trace(err) + } + + seen := set.NewStrings() + result := make(map[lease.Key][]string) + for rows.Next() { + var leaseUUID string + var key lease.Key + var entity string + + if err := rows.Scan(&leaseUUID, &key.Namespace, &key.ModelUUID, &key.Lease, &entity); err != nil { + _ = rows.Close() + return nil, errors.Trace(err) + } + + if !seen.Contains(leaseUUID) { + result[key] = []string{entity} + seen.Add(leaseUUID) + } else { + result[key] = append(result[key], entity) + } + } + + return result, errors.Trace(rows.Err()) +} + +// getPrepared returns a prepared statement for the input name, +// ensuring that the first call for a given name caches the statement. +// thereafter the statement is returned from the cache. +func (s *Store) getPrepared(ctx context.Context, name string, stmt string) (*sql.Stmt, error) { + s.cacheMu.RLock() + if cachedStmt, ok := s.cache[name]; ok { + s.cacheMu.RUnlock() + return cachedStmt, nil + } + s.cacheMu.RUnlock() + + s.cacheMu.Lock() + defer s.cacheMu.Unlock() + + prepared, err := s.db.PrepareContext(ctx, stmt) + if err != nil { + return nil, errors.Trace(err) + } + + s.cache[name] = prepared + return prepared, nil +} + +// leasesFromRows returns lease info from rows returned from the backing DB. +func leasesFromRows(rows *sql.Rows) (map[lease.Key]lease.Info, error) { + result := map[lease.Key]lease.Info{} + + for rows.Next() { + var key lease.Key + var info lease.Info + + if err := rows.Scan(&key.Namespace, &key.ModelUUID, &key.Lease, &info.Holder, &info.Expiry); err != nil { + _ = rows.Close() + return nil, errors.Trace(err) + } + result[key] = info + } + + return result, errors.Trace(rows.Err()) +} diff --git a/worker/lease/store_test.go b/worker/lease/store_test.go new file mode 100644 index 00000000000..883c4289ad1 --- /dev/null +++ b/worker/lease/store_test.go @@ -0,0 +1,278 @@ +// Copyright 2022 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package lease_test + +import ( + "database/sql" + "time" + + "github.com/juju/errors" + "github.com/juju/testing" + jc "github.com/juju/testing/checkers" + _ "github.com/mattn/go-sqlite3" + gc "gopkg.in/check.v1" + + corelease "github.com/juju/juju/core/lease" + "github.com/juju/juju/database/schema" + "github.com/juju/juju/worker/lease" +) + +type storeSuite struct { + testing.IsolationSuite + + db *sql.DB + store *lease.Store + stopCh chan struct{} +} + +var _ = gc.Suite(&storeSuite{}) + +func (s *storeSuite) SetUpTest(c *gc.C) { + s.IsolationSuite.SetUpTest(c) + + var err error + s.db, err = sql.Open("sqlite3", ":memory:") + c.Assert(err, jc.ErrorIsNil) + + s.primeDB(c) + s.store = lease.NewStore(s.db, lease.StubLogger{}) + + // Single-buffered to allow us to queue up a stoppage. + s.stopCh = make(chan struct{}, 1) +} + +func (s *storeSuite) TestClaimLeaseSuccessAndLeaseQueries(c *gc.C) { + pgKey := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + pgReq := corelease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + // Add 2 leases. + err := s.store.ClaimLease(pgKey, pgReq, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + mmKey := pgKey + mmKey.Lease = "mattermost" + + mmReq := pgReq + mmReq.Holder = "mattermost/0" + + err = s.store.ClaimLease(mmKey, mmReq, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // Check all the leases. + leases, err := s.store.Leases() + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 2) + c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") + c.Check(leases[pgKey].Expiry.After(time.Now().UTC()), jc.IsTrue) + c.Check(leases[mmKey].Holder, gc.Equals, "mattermost/0") + c.Check(leases[mmKey].Expiry.After(time.Now().UTC()), jc.IsTrue) + + // Check with a filter. + leases, err = s.store.Leases(pgKey) + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 1) + c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") + + // Add a lease from a different group, + // and check that the group returns the application leases. + err = s.store.ClaimLease( + corelease.Key{ + Namespace: "controller", + ModelUUID: "controller-model-uuid", + Lease: "singular", + }, + corelease.Request{ + Holder: "machine/0", + Duration: time.Minute, + }, + s.stopCh, + ) + c.Assert(err, jc.ErrorIsNil) + + leases, err = s.store.LeaseGroup("application", "model-uuid") + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 2) + c.Check(leases[pgKey].Holder, gc.Equals, "postgresql/0") + c.Check(leases[mmKey].Holder, gc.Equals, "mattermost/0") +} + +func (s *storeSuite) TestClaimLeaseAlreadyHeld(c *gc.C) { + key := corelease.Key{ + Namespace: "controller", + ModelUUID: "controller-model-uuid", + Lease: "singular", + } + + req := corelease.Request{ + Holder: "machine/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + err = s.store.ClaimLease(key, req, s.stopCh) + c.Assert(errors.Is(err, corelease.ErrHeld), jc.IsTrue) +} + +func (s *storeSuite) TestExtendLeaseSuccess(c *gc.C) { + key := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := corelease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + leases, err := s.store.Leases(key) + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 1) + + // Save the expiry for later comparison. + originalExpiry := leases[key].Expiry + + req.Duration = 2 * time.Minute + err = s.store.ExtendLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + leases, err = s.store.Leases(key) + c.Assert(err, jc.ErrorIsNil) + c.Assert(leases, gc.HasLen, 1) + + // Check that we extended. + c.Check(leases[key].Expiry.After(originalExpiry), jc.IsTrue) +} + +func (s *storeSuite) TestExtendLeaseNotHeldInvalid(c *gc.C) { + key := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := corelease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ExtendLease(key, req, s.stopCh) + c.Assert(errors.Is(err, corelease.ErrInvalid), jc.IsTrue) +} + +func (s *storeSuite) TestRevokeLeaseSuccess(c *gc.C) { + key := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := corelease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + err = s.store.RevokeLease(key, req.Holder, s.stopCh) + c.Assert(err, jc.ErrorIsNil) +} + +func (s *storeSuite) TestRevokeLeaseNotHeldInvalid(c *gc.C) { + key := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + err := s.store.RevokeLease(key, "not-the-holder", s.stopCh) + c.Assert(errors.Is(err, corelease.ErrInvalid), jc.IsTrue) +} + +func (s *storeSuite) TestPinUnpinLeaseAndPinQueries(c *gc.C) { + pgKey := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + pgReq := corelease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(pgKey, pgReq, s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // One entity pins the lease. + err = s.store.PinLease(pgKey, "machine/6", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // The same lease/entity is a no-op without error. + err = s.store.PinLease(pgKey, "machine/6", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + // Another entity pinning the same lease. + err = s.store.PinLease(pgKey, "machine/7", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + pins, err := s.store.Pinned() + c.Assert(err, jc.ErrorIsNil) + c.Assert(pins, gc.HasLen, 1) + c.Check(pins[pgKey], jc.SameContents, []string{"machine/6", "machine/7"}) + + // Unpin and check the leases. + err = s.store.UnpinLease(pgKey, "machine/7", s.stopCh) + c.Assert(err, jc.ErrorIsNil) + + pins, err = s.store.Pinned() + c.Assert(err, jc.ErrorIsNil) + c.Assert(pins, gc.HasLen, 1) + c.Check(pins[pgKey], jc.SameContents, []string{"machine/6"}) +} + +func (s *storeSuite) TestLeaseOperationCancellation(c *gc.C) { + s.stopCh <- struct{}{} + + key := corelease.Key{ + Namespace: "application", + ModelUUID: "model-uuid", + Lease: "postgresql", + } + + req := corelease.Request{ + Holder: "postgresql/0", + Duration: time.Minute, + } + + err := s.store.ClaimLease(key, req, s.stopCh) + c.Assert(err, gc.ErrorMatches, "context canceled") +} + +func (s *storeSuite) primeDB(c *gc.C) { + tx, err := s.db.Begin() + c.Assert(err, jc.ErrorIsNil) + + for _, stmt := range schema.ControllerDDL() { + _, err := tx.Exec(stmt) + c.Assert(err, jc.ErrorIsNil) + } + + err = tx.Commit() + c.Assert(err, jc.ErrorIsNil) +}