Skip to content

Commit

Permalink
Open and close DB connections on-demand.
Browse files Browse the repository at this point in the history
Previously, we used to leave the DB connection open while the
cluster was registered with the operator, potentially resutling
in dangled connections if the operator terminates abnormally.

Small refactoring around the role syncing code.
  • Loading branch information
alexeyklyukin committed Aug 9, 2017
1 parent 5dd3ec3 commit 09adc5e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 38 deletions.
9 changes: 3 additions & 6 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
}
}

// initUsers populates c.systemUsers and c.pgUsers maps.
func (c *Cluster) initUsers() error {
c.initSystemUsers()

Expand Down Expand Up @@ -204,7 +205,7 @@ func (c *Cluster) Create() error {
if err = c.initUsers(); err != nil {
return err
}
c.logger.Infof("User secrets have been initialized")
c.logger.Infof("Users have been initialized")

if err = c.applySecrets(); err != nil {
return fmt.Errorf("could not create secrets: %v", err)
Expand All @@ -226,11 +227,7 @@ func (c *Cluster) Create() error {
c.logger.Infof("pods are ready")

if !(c.masterLess || c.databaseAccessDisabled()) {
err = c.initDbConn()
if err != nil {
return fmt.Errorf("could not init db connection: %v", err)
}
err = c.createUsers()
err = c.createRoles()
if err != nil {
return fmt.Errorf("could not create users: %v", err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *Cluster) initDbConn() (err error) {
if err != nil {
return err
}
c.logger.Debug("new database connection")
err = conn.Ping()
if err != nil {
if err2 := conn.Close(); err2 != nil {
Expand All @@ -60,6 +61,15 @@ func (c *Cluster) initDbConn() (err error) {
return nil
}

func (c *Cluster) closeDbConn() (err error) {
if c.pgDb != nil {
c.logger.Debug("closing database connection")
return c.pgDb.Close()
}
c.logger.Warning("attempted to close an empty db connection object")
return nil
}

func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) {
var rows *sql.Rows
users = make(spec.PgUserMap)
Expand Down
7 changes: 2 additions & 5 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,7 @@ func (c *Cluster) deleteSecret(secret *v1.Secret) error {
return err
}

func (c *Cluster) createUsers() (err error) {
func (c *Cluster) createRoles() (err error) {
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
reqs := c.userSyncStrategy.ProduceSyncRequests(nil, c.pgUsers)
err = c.userSyncStrategy.ExecuteSyncRequests(reqs, c.pgDb)

return err
return c.syncRoles(false)
}
57 changes: 30 additions & 27 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"fmt"

"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
Expand All @@ -19,8 +20,14 @@ func (c *Cluster) Sync() error {
c.logger.Errorf("could not load resources: %v", err)
}

if err = c.initUsers(); err != nil {
return err
}

c.logger.Debugf("Syncing secrets")
if err := c.syncSecrets(); err != nil {

//TODO: mind the secrets of the deleted/new users
if err := c.applySecrets(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not sync secrets: %v", err)
}
Expand Down Expand Up @@ -59,11 +66,8 @@ func (c *Cluster) Sync() error {
}

if !c.databaseAccessDisabled() {
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init db connection: %v", err)
}
c.logger.Debugf("Syncing roles")
if err := c.syncRoles(); err != nil {
if err := c.syncRoles(true); err != nil {
return fmt.Errorf("could not sync roles: %v", err)
}
}
Expand All @@ -76,17 +80,6 @@ func (c *Cluster) Sync() error {
return nil
}

func (c *Cluster) syncSecrets() error {
//TODO: mind the secrets of the deleted/new users
if err := c.initUsers(); err != nil {
return err
}

err := c.applySecrets()

return err
}

func (c *Cluster) syncService(role postgresRole) error {
cSpec := c.Spec
if c.Service[role] == nil {
Expand Down Expand Up @@ -193,21 +186,31 @@ func (c *Cluster) syncStatefulSet() error {
return nil
}

func (c *Cluster) syncRoles() error {
var userNames []string
func (c *Cluster) syncRoles(readFromDatabase bool) error {
var (
err error
dbUsers spec.PgUserMap
userNames []string
)

if err := c.initUsers(); err != nil {
return err
}
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
dbUsers, err := c.readPgUsersFromDatabase(userNames)
err = c.initDbConn()
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)
return fmt.Errorf("could not init db connection: %v", err)
}
defer c.closeDbConn()

if readFromDatabase {
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
dbUsers, err = c.readPgUsersFromDatabase(userNames)
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)
}
}

pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
if err := c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
return fmt.Errorf("error executing sync statements: %v", err)
}
return nil
Expand Down

0 comments on commit 09adc5e

Please sign in to comment.