Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
)

if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil {
namespace, _ := matches["namespace"]
namespace := matches["namespace"]
resp, err = s.controller.ClusterStatus(matches["team"], namespace, matches["cluster"])
} else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil {
teamClusters := s.controller.TeamClusterList()
Expand All @@ -174,10 +174,10 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {

resp, err = clusterNames, nil
} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
namespace, _ := matches["namespace"]
namespace := matches["namespace"]
resp, err = s.controller.ClusterLogs(matches["team"], namespace, matches["cluster"])
} else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil {
namespace, _ := matches["namespace"]
namespace := matches["namespace"]
resp, err = s.controller.ClusterHistory(matches["team"], namespace, matches["cluster"])
} else if req.URL.Path == clustersURL {
clusterNamesPerTeam := make(map[string][]string)
Expand All @@ -194,8 +194,8 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
s.respond(resp, err, w)
}

func mustConvertToUint32(s string) uint32{
result, err := strconv.Atoi(s);
func mustConvertToUint32(s string) uint32 {
result, err := strconv.Atoi(s)
if err != nil {
panic(fmt.Errorf("mustConvertToUint32 called for %s: %v", s, err))
}
Expand Down Expand Up @@ -244,8 +244,6 @@ func (s *Server) databases(w http.ResponseWriter, req *http.Request) {

databaseNamesPerCluster := s.controller.ClusterDatabasesMap()
s.respond(databaseNamesPerCluster, nil, w)
return

}

func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
Expand Down
20 changes: 11 additions & 9 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,16 +450,16 @@ func (c *Cluster) compareContainers(setA, setB *v1beta1.StatefulSet) (bool, []st
return needsRollUpdate, reasons
}

func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (equal bool) {
equal = true
func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool {
equal := true
if a != nil {
equal = compareResoucesAssumeFirstNotNil(a, b)
}
if equal && (b != nil) {
equal = compareResoucesAssumeFirstNotNil(b, a)
}

return
return equal
}

func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool {
Expand Down Expand Up @@ -786,15 +786,16 @@ func (c *Cluster) initInfrastructureRoles() error {
}

// resolves naming conflicts between existing and new roles by chosing either of them.
func (c *Cluster) resolveNameConflict(currentRole, newRole *spec.PgUser) (result spec.PgUser) {
func (c *Cluster) resolveNameConflict(currentRole, newRole *spec.PgUser) spec.PgUser {
var result spec.PgUser
if newRole.Origin >= currentRole.Origin {
result = *newRole
} else {
result = *currentRole
}
c.logger.Debugf("resolved a conflict of role %q between %s and %s to %s",
newRole.Name, newRole.Origin, currentRole.Origin, result.Origin)
return
return result
}

func (c *Cluster) shouldAvoidProtectedOrSystemRole(username, purpose string) bool {
Expand Down Expand Up @@ -838,8 +839,9 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
}

// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) (err error) {
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {

var err error
c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate)

var wg sync.WaitGroup
Expand All @@ -858,8 +860,8 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) (

select {
case <-stopCh:
case podLabelErr <- func() (err error) {
_, err = c.waitForPodLabel(ch, stopCh, &role)
case podLabelErr <- func() (err2 error) {
_, err2 = c.waitForPodLabel(ch, stopCh, &role)
return
}():
}
Expand All @@ -882,7 +884,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) (
// close the label waiting channel no sooner than the waiting goroutine terminates.
close(podLabelErr)

return
return err

}

Expand Down
28 changes: 14 additions & 14 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (

"github.com/Sirupsen/logrus"

"k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
policybeta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/api/core/v1"
"k8s.io/api/apps/v1beta1"
policybeta1 "k8s.io/api/policy/v1beta1"

"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util"
Expand Down Expand Up @@ -90,7 +90,7 @@ func (c *Cluster) makeDefaultResources() spec.Resources {
defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest}
defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit}

return spec.Resources{ResourceRequest:defaultRequests, ResourceLimits:defaultLimits}
return spec.Resources{ResourceRequest: defaultRequests, ResourceLimits: defaultLimits}
}

func generateResourceRequirements(resources spec.Resources, defaultResources spec.Resources) (*v1.ResourceRequirements, error) {
Expand Down Expand Up @@ -366,7 +366,7 @@ func generateSidecarContainers(sidecars []spec.Sidecar,
volumeMounts []v1.VolumeMount, defaultResources spec.Resources,
superUserName string, credentialsSecretName string, logger *logrus.Entry) ([]v1.Container, error) {

if sidecars != nil && len(sidecars) > 0 {
if len(sidecars) > 0 {
result := make([]v1.Container, 0)
for index, sidecar := range sidecars {

Expand Down Expand Up @@ -699,7 +699,7 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu
// generate sidecar containers
if sidecarContainers, err = generateSidecarContainers(sideCars, volumeMounts, defaultResources,
c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger); err != nil {
return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
}

tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)
Expand All @@ -716,7 +716,7 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu
int64(c.OpConfig.PodTerminateGracePeriod.Seconds()),
c.OpConfig.PodServiceAccountName,
c.OpConfig.KubeIAMRole,
effectivePodPriorityClassName); err != nil{
effectivePodPriorityClassName); err != nil {
return nil, fmt.Errorf("could not generate pod template: %v", err)
}

Expand All @@ -726,7 +726,7 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu

if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(spec.Volume.Size,
spec.Volume.StorageClass); err != nil {
return nil, fmt.Errorf("could not generate volume claim template: %v", err)
return nil, fmt.Errorf("could not generate volume claim template: %v", err)
}

numberOfInstances := c.getNumberOfInstances(spec)
Expand Down Expand Up @@ -804,11 +804,11 @@ func (c *Cluster) mergeSidecars(sidecars []spec.Sidecar) []spec.Sidecar {
return result
}

func (c *Cluster) getNumberOfInstances(spec *spec.PostgresSpec) (newcur int32) {
func (c *Cluster) getNumberOfInstances(spec *spec.PostgresSpec) int32 {
min := c.OpConfig.MinInstances
max := c.OpConfig.MaxInstances
cur := spec.NumberOfInstances
newcur = cur
newcur := cur

if max >= 0 && newcur > max {
newcur = max
Expand All @@ -820,7 +820,7 @@ func (c *Cluster) getNumberOfInstances(spec *spec.PostgresSpec) (newcur int32) {
c.logger.Infof("adjusted number of instances from %d to %d (min: %d, max: %d)", cur, newcur, min, max)
}

return
return newcur
}

func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
Expand Down Expand Up @@ -860,8 +860,8 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string
return volumeClaim, nil
}

func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) {
secrets = make(map[string]*v1.Secret, len(c.pgUsers))
func (c *Cluster) generateUserSecrets() map[string]*v1.Secret {
secrets := make(map[string]*v1.Secret, len(c.pgUsers))
namespace := c.Namespace
for username, pgUser := range c.pgUsers {
//Skip users with no password i.e. human users (they'll be authenticated using pam)
Expand All @@ -878,7 +878,7 @@ func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) {
}
}

return
return secrets
}

func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret {
Expand Down
24 changes: 11 additions & 13 deletions pkg/cluster/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,32 +183,30 @@ func (c *Cluster) getDatabases() (dbs map[string]string, err error) {
dbs[datname] = owner
}

return
return dbs, err
}

// executeCreateDatabase creates new database with the given owner.
// The caller is responsible for openinging and closing the database connection.
func (c *Cluster) executeCreateDatabase(datname, owner string) error {
if !c.databaseNameOwnerValid(datname, owner) {
return nil
}
c.logger.Infof("creating database %q with owner %q", datname, owner)

if _, err := c.pgDb.Exec(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
return fmt.Errorf("could not execute create database: %v", err)
}
return nil
return c.execCreateOrAlterDatabase(datname, owner, createDatabaseSQL,
"creating database", "create database")
}

// executeCreateDatabase changes the owner of the given database.
// The caller is responsible for openinging and closing the database connection.
func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error {
return c.execCreateOrAlterDatabase(datname, owner, alterDatabaseOwnerSQL,
"changing owner for database", "alter database owner")
}

func (c *Cluster) execCreateOrAlterDatabase(datname, owner, statement, doing, operation string) error {
if !c.databaseNameOwnerValid(datname, owner) {
return nil
}
c.logger.Infof("changing database %q owner to %q", datname, owner)
if _, err := c.pgDb.Exec(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil {
return fmt.Errorf("could not execute alter database owner: %v", err)
c.logger.Infof("%s %q owner %q", doing, datname, owner)
if _, err := c.pgDb.Exec(fmt.Sprintf(statement, datname, owner)); err != nil {
return fmt.Errorf("could not execute %s: %v", operation, err)
}
return nil
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/cluster/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) {
func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
var (
masterCandidatePod *v1.Pod
err error
eol bool
err error
eol bool
)

oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -212,7 +212,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
var sset *v1beta1.StatefulSet
if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(),
metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not retrieve cluster statefulset: %v", err)
return fmt.Errorf("could not retrieve cluster statefulset: %v", err)
}
c.Statefulset = sset
}
Expand All @@ -225,7 +225,6 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName())
}


// there are two cases for each postgres cluster that has its master pod on the node to migrate from:
// - the cluster has some replicas - migrate one of those if necessary and failover to it
// - there are no replicas - just terminate the master and wait until it respawns
Expand Down
24 changes: 0 additions & 24 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,27 +633,3 @@ func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet {
func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
return c.PodDisruptionBudget
}

func (c *Cluster) createDatabases() error {
c.setProcessName("creating databases")

if len(c.Spec.Databases) == 0 {
return nil
}

if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection")
}
defer func() {
if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()

for datname, owner := range c.Spec.Databases {
if err := c.executeCreateDatabase(datname, owner); err != nil {
return err
}
}
return nil
}
Loading