Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 214 additions & 5 deletions internal/controller/postgrescluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package controller
import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -52,6 +54,26 @@ import (
* PC-09 ignores no-op updates
*/

// CollectEvents drains every event currently buffered on the fake
// recorder's channel and appends each one to the caller-supplied slice.
// It never blocks: once the channel is empty it returns immediately.
func CollectEvents(events *[]string, recorder *record.FakeRecorder) {
	drained := false
	for !drained {
		select {
		case evt := <-recorder.Events:
			*events = append(*events, evt)
		default:
			// Channel is empty right now; stop draining.
			drained = true
		}
	}
}

// ContainsEvent reports whether any entry in events mentions both the
// given event type (e.g. "Normal"/"Warning") and the given event reason.
// Matching is a plain substring check on each recorded event string.
func ContainsEvent(events []string, eventType string, event string) bool {
	for _, entry := range events {
		hasType := strings.Contains(entry, eventType)
		hasReason := strings.Contains(entry, event)
		if hasType && hasReason {
			return true
		}
	}
	return false
}

var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {

const (
Expand Down Expand Up @@ -79,6 +101,7 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
pgClusterClassKey types.NamespacedName
reconciler *PostgresClusterReconciler
req reconcile.Request
fakeRecorder *record.FakeRecorder
)

reconcileNTimes := func(times int) {
Expand Down Expand Up @@ -162,13 +185,17 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
Spec: enterprisev4.PostgresClusterSpec{
Class: className,
ClusterDeletionPolicy: ptr.To(deletePolicy),
ManagedRoles: []enterprisev4.ManagedRole{
{Name: "app_user", Exists: true},
{Name: "app_user_rw", Exists: true},
},
},
}

fakeRecorder = record.NewFakeRecorder(100)
reconciler = &PostgresClusterReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Recorder: record.NewFakeRecorder(100),
Recorder: fakeRecorder,
Metrics: &pgprometheus.NoopRecorder{},
FleetCollector: pgprometheus.NewFleetCollector(),
}
Expand Down Expand Up @@ -255,24 +282,93 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
cond := meta.FindStatusCondition(pc.Status.Conditions, "ClusterReady")
Expect(cond).NotTo(BeNil())
Expect(cond.Status).To(Equal(metav1.ConditionFalse))
Expect(cond.Reason).To(Equal("ClusterBuildSucceeded"))
Expect(cond.Reason).To(Equal("CNPGClusterProvisioning"))

secretCond := meta.FindStatusCondition(pc.Status.Conditions, "SecretsReady")
Expect(secretCond).NotTo(BeNil())
Expect(secretCond.Status).To(Equal(metav1.ConditionTrue))
Expect(secretCond.Reason).To(Equal("SuperUserSecretReady"))

configMapCond := meta.FindStatusCondition(pc.Status.Conditions, "ConfigMapsReady")
// ConfigMap converge runs in the runtime phase; at this point reconcile may
// still be returning from provisioner pending and not have written it yet.
Expect(configMapCond).To(BeNil())

// Simulate external CNPG controller status progression.
// Simulate CNPG becoming healthy first, but without managed roles status published yet.
cnpg := &cnpgv1.Cluster{}
Expect(k8sClient.Get(ctx, pgClusterKey, cnpg)).To(Succeed())
cnpg.Status.Phase = cnpgv1.PhaseHealthy
Expect(k8sClient.Status().Update(ctx, cnpg)).To(Succeed())
reconcileNTimes(1)

Expect(k8sClient.Get(ctx, pgClusterKey, pc)).To(Succeed())
managedRolesCond := meta.FindStatusCondition(pc.Status.Conditions, "ManagedRolesReady")
Expect(managedRolesCond).NotTo(BeNil())
Expect(managedRolesCond.Status).To(Equal(metav1.ConditionFalse))
Expect(managedRolesCond.Reason).To(Equal("ManagedRolesPending"))

// Simulate external CNPG controller publishing managed roles status.
Expect(k8sClient.Get(ctx, pgClusterKey, cnpg)).To(Succeed())
cnpg.Status.ManagedRolesStatus = cnpgv1.ManagedRoles{
ByStatus: map[cnpgv1.RoleStatus][]string{
cnpgv1.RoleStatusReconciled: {"app_user", "app_user_rw"},
},
}
Expect(k8sClient.Status().Update(ctx, cnpg)).To(Succeed())
reconcileNTimes(1)

// Expect cnpg status progression propagation
Expect(k8sClient.Get(ctx, pgClusterKey, pc)).To(Succeed())
cond = meta.FindStatusCondition(pc.Status.Conditions, "ClusterReady")
Expect(cond).NotTo(BeNil())
Expect(cond.Status).To(Equal(metav1.ConditionTrue))
Expect(cond.Reason).To(Equal("CNPGClusterHealthy"))

secretCond = meta.FindStatusCondition(pc.Status.Conditions, "SecretsReady")
Expect(secretCond).NotTo(BeNil())
Expect(secretCond.Status).To(Equal(metav1.ConditionTrue))
Expect(secretCond.Reason).To(Equal("SuperUserSecretReady"))

configMapCond = meta.FindStatusCondition(pc.Status.Conditions, "ConfigMapsReady")
Expect(configMapCond).NotTo(BeNil())
Expect(configMapCond.Status).To(Equal(metav1.ConditionTrue))
Expect(configMapCond.Reason).To(Equal("ConfigMapReconciled"))

managedRolesCond = meta.FindStatusCondition(pc.Status.Conditions, "ManagedRolesReady")
Expect(managedRolesCond).NotTo(BeNil())
Expect(managedRolesCond.Status).To(Equal(metav1.ConditionTrue))
Expect(managedRolesCond.Reason).To(Equal("ManagedRolesReconciled"))

// Pooler is disabled in this suite fixture, but converge publishes PoolerReady=True with disabled message.
poolerCond := meta.FindStatusCondition(pc.Status.Conditions, "PoolerReady")
Expect(poolerCond).NotTo(BeNil())
Expect(poolerCond.Status).To(Equal(metav1.ConditionTrue))
Expect(poolerCond.Reason).To(Equal("AllInstancesReady"))
Expect(poolerCond.Message).To(Equal("Connection pooler disabled"))

Expect(pc.Status.ManagedRolesStatus).NotTo(BeNil())
Expect(pc.Status.ManagedRolesStatus.Reconciled).To(ContainElements("app_user", "app_user_rw"))

Expect(pc.Status.Phase).NotTo(BeNil())
Expect(*pc.Status.Phase).To(Equal("Ready"))
Expect(pc.Status.ProvisionerRef).NotTo(BeNil())
Expect(pc.Status.ProvisionerRef.Kind).To(Equal("Cluster"))
Expect(pc.Status.ProvisionerRef.Name).To(Equal(clusterName))

Expect(pc.Status.Resources).NotTo(BeNil())
Expect(pc.Status.Resources.SuperUserSecretRef).NotTo(BeNil())
Expect(pc.Status.Resources.ConfigMapRef).NotTo(BeNil())

received := make([]string, 0, 8)
CollectEvents(&received, fakeRecorder)
Expect(ContainsEvent(
received,
v1.EventTypeNormal, core.EventConfigMapReconciled,
)).To(BeTrue(), "events seen: %v", received)
Expect(ContainsEvent(
received,
v1.EventTypeNormal, core.EventClusterReady,
)).To(BeTrue(), "events seen: %v", received)
})

// PC-07
Expand Down Expand Up @@ -308,6 +404,7 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
Context("with PostgreSQL metrics enabled in class", func() {
BeforeEach(func() {
pgCluster.Spec.Class = classNameMetrics
pgCluster.Spec.ManagedRoles = nil
})

It("adds scrape annotations to the CNPG Cluster", func() {
Expand All @@ -316,6 +413,17 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {

cnpg := &cnpgv1.Cluster{}
Expect(k8sClient.Get(ctx, pgClusterKey, cnpg)).To(Succeed())

cnpg.Status.Phase = cnpgv1.PhaseHealthy
cnpg.Status.ManagedRolesStatus = cnpgv1.ManagedRoles{
ByStatus: map[cnpgv1.RoleStatus][]string{
cnpgv1.RoleStatusReconciled: {"app_user", "app_user_rw"},
},
}
Expect(k8sClient.Status().Update(ctx, cnpg)).To(Succeed())

reconcileNTimes(1)

Expect(cnpg.Spec.InheritedMetadata).NotTo(BeNil())
Expect(cnpg.Spec.InheritedMetadata.Annotations).To(HaveKeyWithValue(scrapeAnnotationKey, "true"))
Expect(cnpg.Spec.InheritedMetadata.Annotations).To(HaveKeyWithValue(pathAnnotationKey, metricsPath))
Expand Down Expand Up @@ -353,6 +461,7 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
Context("with connection pooler metrics enabled in class", func() {
BeforeEach(func() {
pgCluster.Spec.Class = classNamePooler
pgCluster.Spec.ManagedRoles = nil
})

It("adds scrape annotations to poolers only after the CNPG cluster becomes healthy", func() {
Expand All @@ -365,6 +474,13 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
Expect(apierrors.IsNotFound(k8sClient.Get(ctx, rwKey, &cnpgv1.Pooler{}))).To(BeTrue())
Expect(apierrors.IsNotFound(k8sClient.Get(ctx, roKey, &cnpgv1.Pooler{}))).To(BeTrue())

pc := &enterprisev4.PostgresCluster{}
Expect(k8sClient.Get(ctx, pgClusterKey, pc)).To(Succeed())
poolerCond := meta.FindStatusCondition(pc.Status.Conditions, "PoolerReady")
// Pooler component is gated behind provisioner readiness, so before CNPG
// becomes healthy the condition may not be written yet.
Expect(poolerCond).To(BeNil())

cnpg := &cnpgv1.Cluster{}
Expect(k8sClient.Get(ctx, pgClusterKey, cnpg)).To(Succeed())
cnpg.Status.Phase = cnpgv1.PhaseHealthy
Expand All @@ -387,6 +503,28 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
g.Expect(ro.Spec.Template.ObjectMeta.Annotations).To(HaveKeyWithValue(scrapeAnnotationKey, "true"))
g.Expect(ro.Spec.Template.ObjectMeta.Annotations).To(HaveKeyWithValue(pathAnnotationKey, metricsPath))
g.Expect(ro.Spec.Template.ObjectMeta.Annotations).To(HaveKeyWithValue(portAnnotationKey, poolerPort))

// Simulate CNPG pooler controller publishing status progression.
if rw.Status.Instances < 2 {
rw.Status.Instances = 2
g.Expect(k8sClient.Status().Update(ctx, rw)).To(Succeed())
}
if ro.Status.Instances < 2 {
ro.Status.Instances = 2
g.Expect(k8sClient.Status().Update(ctx, ro)).To(Succeed())
}
}, "20s", "250ms").Should(Succeed())

Eventually(func(g Gomega) {
_, err := reconciler.Reconcile(ctx, req)
g.Expect(err).NotTo(HaveOccurred())

updated := &enterprisev4.PostgresCluster{}
g.Expect(k8sClient.Get(ctx, pgClusterKey, updated)).To(Succeed())
poolerReadyCond := meta.FindStatusCondition(updated.Status.Conditions, "PoolerReady")
g.Expect(poolerReadyCond).NotTo(BeNil())
g.Expect(poolerReadyCond.Status).To(Equal(metav1.ConditionTrue))
g.Expect(poolerReadyCond.Reason).To(Equal("AllInstancesReady"))
}, "20s", "250ms").Should(Succeed())
})
})
Expand Down Expand Up @@ -439,7 +577,7 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
When("reconciling with invalid or drifted dependencies", func() {
// PC-05
Context("when referenced class does not exist", func() {
It("fails with class-not-found condition", func() {
It("fails with class-not-found condition and emits a warning event", func() {
badName := "bad-" + clusterName
badKey := types.NamespacedName{Name: badName, Namespace: namespace}

Expand All @@ -464,6 +602,13 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
cond := meta.FindStatusCondition(current.Status.Conditions, "ClusterReady")
return cond != nil && cond.Reason == "ClusterClassNotFound"
}, "20s", "250ms").Should(BeTrue())

received := make([]string, 0, 8)
CollectEvents(&received, fakeRecorder)
Expect(ContainsEvent(
received,
v1.EventTypeWarning, core.EventClusterClassNotFound,
)).To(BeTrue(), "events seen: %v", received)
})
})

Expand All @@ -483,5 +628,69 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() {
Expect(cnpg.Spec.Instances).To(Equal(int(clusterMemberCount)))
})
})

Context("when a configmap spec changes", func() {
BeforeEach(func() {
// Keep this test focused on ConfigMap behavior; otherwise reconcile can
// stop on ManagedRolesPending before ConfigMap status is written.
pgCluster.Spec.ManagedRoles = nil
})

It("emits ConfigMapReconciled event on configmap update", func() {
Expect(k8sClient.Create(ctx, pgCluster)).To(Succeed())
reconcileNTimes(2)

// Make sure runtime can proceed (if needed in your fixture)
cnpg := &cnpgv1.Cluster{}
Expect(k8sClient.Get(ctx, pgClusterKey, cnpg)).To(Succeed())
cnpg.Status.Phase = cnpgv1.PhaseHealthy
Expect(k8sClient.Status().Update(ctx, cnpg)).To(Succeed())
reconcileNTimes(1)

// Drain baseline events so we don't match the initial "created" event.
received := make([]string, 0, 16)
CollectEvents(&received, fakeRecorder)
received = received[:0]

// Drift the managed ConfigMap.
pc := &enterprisev4.PostgresCluster{}
Eventually(func() bool {
if err := k8sClient.Get(ctx, pgClusterKey, pc); err != nil {
return false
}
return pc.Status.Resources != nil && pc.Status.Resources.ConfigMapRef != nil
}, "5s", "100ms").Should(BeTrue())

cmKey := types.NamespacedName{
Name: pc.Status.Resources.ConfigMapRef.Name,
Namespace: namespace,
}
cm := &v1.ConfigMap{}
Expect(k8sClient.Get(ctx, cmKey, cm)).To(Succeed())
delete(cm.Data, "CLUSTER_RW_ENDPOINT") // force reconciliation update
Expect(k8sClient.Update(ctx, cm)).To(Succeed())

// Reconcile and assert updated event.
reconcileNTimes(1)

Eventually(func() bool {
CollectEvents(&received, fakeRecorder)

// reason match
if !ContainsEvent(received, v1.EventTypeNormal, core.EventConfigMapReconciled) {
return false
}
// message-level match for update (not create)
for _, e := range received {
if strings.Contains(e, v1.EventTypeNormal) &&
strings.Contains(e, core.EventConfigMapReconciled) &&
strings.Contains(e, "updated") {
return true
}
}
return false
}, "5s", "100ms").Should(BeTrue(), "events seen: %v", received)
})
})
})
})
Loading
Loading