Skip to content

Commit

Permalink
Fixes for the case of re-creating the cluster after deletion.
Browse files Browse the repository at this point in the history
- make sure that the secrets for the system users (superuser, replication)
  are not deleted when the main cluster is. Therefore, we can re-create
  the cluster, potentially forcing Patroni to restore it from the backup
  and enable Patroni to connect, since it will use the old password, not
  the newly generated random one.

- when syncing users, always check whether they are already in the DB.
  Previously, we did this only for the sync cluster case, but the new
  cluster could be actually the one restored from the backup by Patroni,
  having all or some of the users already in place.

 - delete endpoints last. Patroni uses the $clustername endpoint in order
   to store the leader related metadata. If we remove it before removing
   all pods, one of those pods running Patroni will re-create it and the
   next attempt to create the cluster with the same name will stumble on
   the existing endpoint.

 - Use db.Exec instead of db.Query for queries that expect no result.
   This also fixes the issue with the DB creation: since we didn't
   release an empty Row object, it was not possible to create more than
   one database for a cluster.
  • Loading branch information
alexeyklyukin committed Dec 13, 2017
1 parent 1fb8cf7 commit 7e4d93f
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 45 deletions.
47 changes: 30 additions & 17 deletions pkg/cluster/cluster.go
Expand Up @@ -466,7 +466,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {

if !c.databaseAccessDisabled() {
c.logger.Debugf("syncing roles")
if err := c.syncRoles(true); err != nil {
if err := c.syncRoles(); err != nil {
c.logger.Errorf("could not sync roles: %v", err)
updateFailed = true
}
Expand All @@ -488,14 +488,14 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
func() {
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate old statefulset spec")
c.logger.Errorf("could not generate old statefulset spec: %v", err)
updateFailed = true
return
}

newSs, err := c.generateStatefulSet(&newSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate new statefulset spec")
c.logger.Errorf("could not generate new statefulset spec: %v", err)
updateFailed = true
return
}
Expand Down Expand Up @@ -523,29 +523,23 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
}

// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error {
c.mu.Lock()
defer c.mu.Unlock()

for _, role := range []PostgresRole{Master, Replica} {
if role == Replica && !c.Spec.ReplicaLoadBalancer {
continue
}

if err := c.deleteEndpoint(role); err != nil {
return fmt.Errorf("could not delete %s endpoint: %v", role, err)
}

if err := c.deleteService(role); err != nil {
return fmt.Errorf("could not delete %s service: %v", role, err)
}
}

if err := c.deleteStatefulSet(); err != nil {
return fmt.Errorf("could not delete statefulset: %v", err)
}

for _, obj := range c.Secrets {
if delete, user := c.shouldDeleteSecret(obj); !delete {
c.logger.Infof("not removing secret %q for the system user %q", obj.GetName(), user)
continue
}
if err := c.deleteSecret(obj); err != nil {
return fmt.Errorf("could not delete secret: %v", err)
}
Expand All @@ -555,6 +549,20 @@ func (c *Cluster) Delete() error {
return fmt.Errorf("could not delete pod disruption budget: %v", err)
}

for _, role := range []PostgresRole{Master, Replica} {
if role == Replica && !c.Spec.ReplicaLoadBalancer {
continue
}

if err := c.deleteEndpoint(role); err != nil {
return fmt.Errorf("could not delete %s endpoint: %v", role, err)
}

if err := c.deleteService(role); err != nil {
return fmt.Errorf("could not delete %s service: %v", role, err)
}
}

return nil
}

Expand Down Expand Up @@ -784,3 +792,8 @@ func (c *Cluster) Lock() {
// Unlock calls Unlock on the cluster's mu field, releasing the lock that
// guards the cluster (the same mu that Delete locks for its whole duration).
func (c *Cluster) Unlock() {
c.mu.Unlock()
}

// shouldDeleteSecret decides whether a secret may be removed together with
// the cluster. The user name is read from the secret's "username" data key
// and is always returned as userName. delete is false when that name matches
// the configured replication user or superuser, so the credentials of the
// system users survive the cluster deletion; it is true for every other user.
func (c *Cluster) shouldDeleteSecret(secret *v1.Secret) (delete bool, userName string) {
	userName = string(secret.Data["username"])

	switch userName {
	case c.OpConfig.ReplicationUsername, c.OpConfig.SuperUsername:
		// System user: keep the secret.
		return false, userName
	}

	return true, userName
}
44 changes: 40 additions & 4 deletions pkg/cluster/cluster_test.go
Expand Up @@ -7,14 +7,20 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
"k8s.io/client-go/pkg/api/v1"
"reflect"
"testing"
)

const (
superUserName = "postgres"
replicationUserName = "standby"
)

var logger = logrus.New().WithField("test", "cluster")
var cl = New(Config{OpConfig: config.Config{ProtectedRoles: []string{"admin"},
Auth: config.Auth{SuperUsername: "postgres",
ReplicationUsername: "standby"}}},
Auth: config.Auth{SuperUsername: superUserName,
ReplicationUsername: replicationUserName}}},
k8sutil.KubernetesClient{}, spec.Postgresql{}, logger)

func TestInitRobotUsers(t *testing.T) {
Expand Down Expand Up @@ -52,7 +58,7 @@ func TestInitRobotUsers(t *testing.T) {
`conflicting user flags: "NOINHERIT" and "INHERIT"`),
},
{
manifestUsers: map[string]spec.UserFlags{"admin": {"superuser"}, "postgres": {"createdb"}},
manifestUsers: map[string]spec.UserFlags{"admin": {"superuser"}, superUserName: {"createdb"}},
infraRoles: map[string]spec.PgUser{},
result: map[string]spec.PgUser{},
err: nil,
Expand Down Expand Up @@ -121,7 +127,7 @@ func TestInitHumanUsers(t *testing.T) {
},
{
existingRoles: map[string]spec.PgUser{},
teamRoles: []string{"admin", "standby"},
teamRoles: []string{"admin", replicationUserName},
result: map[string]spec.PgUser{},
},
}
Expand All @@ -138,3 +144,33 @@ func TestInitHumanUsers(t *testing.T) {
}
}
}

// TestShouldDeleteSecret checks that shouldDeleteSecret allows the deletion
// of a secret belonging to an ordinary user and refuses it for the secrets
// of the superuser and the replication user.
func TestShouldDeleteSecret(t *testing.T) {
	const testName = "TestShouldDeleteSecret"

	// secretFor builds a secret that carries only the "username" data key.
	secretFor := func(username string) *v1.Secret {
		return &v1.Secret{Data: map[string][]byte{"username": []byte(username)}}
	}

	cases := []struct {
		secret  *v1.Secret
		outcome bool
	}{
		{secret: secretFor("foobar"), outcome: true},
		{secret: secretFor(superUserName), outcome: false},
		{secret: secretFor(replicationUserName), outcome: false},
	}

	for _, tc := range cases {
		got, username := cl.shouldDeleteSecret(tc.secret)
		if got == tc.outcome {
			continue
		}
		t.Errorf("%s expects the check for deletion of the username %q secret to return %t, got %t",
			testName, username, tc.outcome, got)
	}
}
4 changes: 2 additions & 2 deletions pkg/cluster/pg.go
Expand Up @@ -191,7 +191,7 @@ func (c *Cluster) executeCreateDatabase(datname, owner string) error {
}
c.logger.Infof("creating database %q with owner %q", datname, owner)

if _, err := c.pgDb.Query(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
if _, err := c.pgDb.Exec(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
return fmt.Errorf("could not execute create database: %v", err)
}
return nil
Expand All @@ -204,7 +204,7 @@ func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error
return nil
}
c.logger.Infof("changing database %q owner to %q", datname, owner)
if _, err := c.pgDb.Query(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil {
if _, err := c.pgDb.Exec(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil {
return fmt.Errorf("could not execute alter database owner: %v", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/resources.go
Expand Up @@ -473,7 +473,7 @@ func (c *Cluster) deleteSecret(secret *v1.Secret) error {

func (c *Cluster) createRoles() (err error) {
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
return c.syncRoles(false)
return c.syncRoles()
}

// GetServiceMaster returns cluster's kubernetes master Service
Expand Down
18 changes: 8 additions & 10 deletions pkg/cluster/sync.go
Expand Up @@ -59,7 +59,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {

if !c.databaseAccessDisabled() {
c.logger.Debugf("syncing roles")
if err = c.syncRoles(true); err != nil {
if err = c.syncRoles(); err != nil {
err = fmt.Errorf("could not sync roles: %v", err)
return
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func (c *Cluster) syncSecrets() error {
return nil
}

func (c *Cluster) syncRoles(readFromDatabase bool) error {
func (c *Cluster) syncRoles() error {
c.setProcessName("syncing roles")

var (
Expand All @@ -365,14 +365,12 @@ func (c *Cluster) syncRoles(readFromDatabase bool) error {
}
}()

if readFromDatabase {
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
dbUsers, err = c.readPgUsersFromDatabase(userNames)
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)
}
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
dbUsers, err = c.readPgUsersFromDatabase(userNames)
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)
}

pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
Expand Down
14 changes: 3 additions & 11 deletions pkg/util/users/users.go
Expand Up @@ -95,7 +95,7 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {
queries := produceAlterRoleSetStmts(user)
query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
if err = runQueryDiscardResult(db, query); err != nil {
if _, err = db.Exec(query); err != nil {
err = fmt.Errorf("dB error: %v, query: %s", err, query)
return
}
Expand All @@ -120,7 +120,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err
}
query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword)

err = runQueryDiscardResult(db, query) // TODO: Try several times
_, err = db.Exec(query) // TODO: Try several times
if err != nil {
err = fmt.Errorf("dB error: %v, query: %s", err, query)
return
Expand All @@ -146,7 +146,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err

query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";"))

err = runQueryDiscardResult(db, query) // TODO: Try several times
_, err = db.Exec(query) // TODO: Try several times
if err != nil {
err = fmt.Errorf("dB error: %v query %s", err, query)
return
Expand Down Expand Up @@ -215,11 +215,3 @@ func quoteParameterValue(name, val string) string {
}
return fmt.Sprintf(`'%s'`, strings.Trim(val, " "))
}

// runQueryDiscardResult executes the statement with db.Query and throws the
// result set away: when Query hands back a non-nil rows object it is closed
// immediately. The error reported by Query is returned unchanged; the result
// of closing the rows is ignored.
func runQueryDiscardResult(db *sql.DB, sql string) error {
	result, queryErr := db.Query(sql)
	if result == nil {
		// Nothing to release.
		return queryErr
	}

	result.Close()
	return queryErr
}

0 comments on commit 7e4d93f

Please sign in to comment.