19 changes: 17 additions & 2 deletions controllers/coherence_controller.go
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
*/
@@ -54,6 +54,12 @@ type CoherenceReconciler struct {
reconcilers []reconciler.SecondaryResourceReconciler
}

// Failure is a simple holder for a named error
type Failure struct {
Name string
Error error
}

// blank assignment to verify that CoherenceReconciler implements reconcile.Reconciler
// There will be a compile-time error here if this breaks
var _ reconcile.Reconciler = &CoherenceReconciler{}
@@ -211,15 +217,24 @@ func (in *CoherenceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
}

// process the secondary resources in the order they should be created
var failures []Failure
for _, rec := range in.reconcilers {
log.Info("Reconciling Coherence resource secondary resources", "controller", rec.GetControllerName())
r, err := rec.ReconcileAllResourceOfKind(ctx, request, deployment, storage)
if err != nil {
return reconcile.Result{}, err
failures = append(failures, Failure{Name: rec.GetControllerName(), Error: err})
}
result.Requeue = result.Requeue || r.Requeue
}

if len(failures) > 0 {
// one or more reconcilers failed:
for _, failure := range failures {
log.Error(failure.Error, "Secondary Reconciler failed", "Reconciler", failure.Name)
}
return reconcile.Result{}, fmt.Errorf("one or more secondary resource reconcilers failed to reconcile")
}

// if replica count is zero update the status to Stopped
if deployment.GetReplicas() == 0 {
if err = in.UpdateDeploymentStatusPhase(ctx, request.NamespacedName, coh.ConditionTypeStopped); err != nil {
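For context when reading the hunk above: instead of returning on the first secondary-reconciler error, the Reconcile loop now records every failure, logs each one, and returns a single aggregate error so that all reconcilers still get a chance to run. Below is a minimal standalone sketch of that pattern; the reconcilerFunc type and the names in main are invented for illustration and are not the operator's real API.

package main

import (
	"errors"
	"fmt"
	"log"
)

// Failure pairs a reconciler name with the error it returned.
type Failure struct {
	Name  string
	Error error
}

// reconcilerFunc is a hypothetical stand-in for a secondary resource reconciler.
type reconcilerFunc struct {
	name string
	run  func() error
}

// reconcileAll runs every reconciler even when earlier ones fail, collects the
// failures, logs each one, and reports a single aggregate error at the end.
func reconcileAll(recs []reconcilerFunc) error {
	var failures []Failure
	for _, r := range recs {
		if err := r.run(); err != nil {
			failures = append(failures, Failure{Name: r.name, Error: err})
		}
	}
	if len(failures) == 0 {
		return nil
	}
	for _, f := range failures {
		log.Printf("secondary reconciler %q failed: %v", f.Name, f.Error)
	}
	return fmt.Errorf("%d secondary resource reconciler(s) failed to reconcile", len(failures))
}

func main() {
	err := reconcileAll([]reconcilerFunc{
		{name: "secrets", run: func() error { return nil }},
		{name: "services", run: func() error { return errors.New("simulated failure") }},
	})
	fmt.Println(err)
}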
45 changes: 40 additions & 5 deletions controllers/servicemonitor/servicemonitor_controller.go
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
*/
@@ -214,11 +214,46 @@ func (in *ReconcileServiceMonitor) UpdateServiceMonitor(ctx context.Context, nam

logger.Info("Patching ServiceMonitor")
_, err = in.monClient.ServiceMonitors(namespace).Patch(ctx, name, in.GetPatchType(), data, metav1.PatchOptions{})
if hashMatches {
logger.Info("Patch applied to ServiceMonitor even though hashes matched (possible external update)")
}
if err != nil {
return errors.Wrapf(err, "cannot patch ServiceMonitor %s/%s", namespace, name)
// Patch or update failed - fall back to an update with retry, as some custom resources (such as ServiceMonitor) cannot always be patched
count := 1
reason := "patch"
for err != nil && count <= 5 {
logger.Info(fmt.Sprintf("Failed to %s ServiceMonitor - retrying update", reason),
"Attempt", count, "Error", err.Error())
count++
// re-fetch the current spec
current, err = in.monClient.ServiceMonitors(namespace).Get(ctx, current.Name, metav1.GetOptions{})
switch {
case err != nil && apierrors.IsNotFound(err):
// not found error so try creating the ServiceMonitor (shouldn't really get here!)
reason = "create"
_, err = in.monClient.ServiceMonitors(namespace).Create(ctx, desired.Spec.(*monitoring.ServiceMonitor), metav1.CreateOptions{})
case err != nil:
// Error re-reading the ServiceMonitor. We can't call the error handler as we do not
// even have an owning Coherence resource, so just log the error and let the loop retry.
logger.Info("Failed to re-fetch ServiceMonitor")
default:
// update the current spec
reason = "update"
current.Spec = desired.Spec.(*monitoring.ServiceMonitor).Spec
_, err = in.monClient.ServiceMonitors(namespace).Update(ctx, current, metav1.UpdateOptions{})
}
}

if err != nil {
logger.Info(fmt.Sprintf("Failed to %s ServiceMonitor %s - Gave up after %d attempts.", reason, name, count),
"Error", err.Error())
}
}

if err == nil {
if hashMatches {
logger.Info("Update applied to ServiceMonitor even though hashes matched (possible external update)")
} else {
logger.Info("Update applied to ServiceMonitor")
}
}

return nil
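The retry loop added above falls back from a Patch to a Get-then-Update cycle, because some custom resources (such as ServiceMonitor) occasionally reject patches. A simplified, self-contained sketch of that fallback shape follows; the fakeClient type is invented for illustration, whereas the real controller uses the prometheus-operator typed client.

package main

import (
	"errors"
	"fmt"
)

// fakeClient is a hypothetical stand-in for a typed Kubernetes client.
type fakeClient struct{ failPatch bool }

func (c *fakeClient) Patch(name string) error {
	if c.failPatch {
		return errors.New("the server rejected the patch")
	}
	return nil
}
func (c *fakeClient) Get(name string) error    { return nil }
func (c *fakeClient) Update(name string) error { return nil }

// applyWithFallback tries a patch first and, while it keeps failing, retries
// with a re-fetch followed by an update, up to maxAttempts times - the same
// shape as the controller's fallback above.
func applyWithFallback(c *fakeClient, name string, maxAttempts int) error {
	err := c.Patch(name)
	for attempt := 1; err != nil && attempt <= maxAttempts; attempt++ {
		fmt.Printf("attempt %d: patch/update failed (%v), falling back to update\n", attempt, err)
		if err = c.Get(name); err != nil {
			continue // could not re-fetch the current state; try again
		}
		err = c.Update(name)
	}
	return err
}

func main() {
	fmt.Println(applyWithFallback(&fakeClient{failPatch: true}, "test-metrics", 5))
}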
84 changes: 80 additions & 4 deletions test/e2e/prometheus/prometheus_test.go
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
*/
@@ -9,10 +9,16 @@ package prometheus
import (
"encoding/json"
"fmt"
monitoring "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
client "github.com/coreos/prometheus-operator/pkg/client/versioned/typed/monitoring/v1"
. "github.com/onsi/gomega"
coh "github.com/oracle/coherence-operator/api/v1"
"github.com/oracle/coherence-operator/test/e2e/helper"
"io/ioutil"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"net/http"
"strings"
@@ -29,20 +35,32 @@ func TestPrometheus(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(ok).To(BeTrue(), "Cannot find any Prometheus Pods - this test requires Prometheus to have been installed")

AssertPrometheus(t, "prometheus-test.yaml", promPod)
promClient, err := client.NewForConfig(testContext.Config)
g.Expect(err).NotTo(HaveOccurred())

AssertPrometheus(t, "prometheus-test.yaml", promPod, promClient)
}

func AssertPrometheus(t *testing.T, yamlFile string, promPod corev1.Pod) {
func AssertPrometheus(t *testing.T, yamlFile string, promPod corev1.Pod, promClient *client.MonitoringV1Client) {
g := NewGomegaWithT(t)

ShouldGetPrometheusConfig(t, promPod)

// Deploy the Coherence cluster
_, cohPods := helper.AssertDeployments(testContext, t, yamlFile)
deployments, cohPods := helper.AssertDeployments(testContext, t, yamlFile)
deployment := deployments["test"]

err := ShouldEventuallyHaveServiceMonitor(t, deployment.Namespace, "test-metrics", promClient, 10*time.Second, 5*time.Minute)
g.Expect(err).NotTo(HaveOccurred())

// Wait for Coherence metrics to appear in Prometheus
ShouldEventuallySeeClusterMetrics(t, promPod, cohPods)

// Ensure that we can see the deployments size metric
ShouldGetClusterSizeMetric(t, promPod)

// Ensure we can update the Coherence deployment and cause the ServiceMonitor to be updated
ShouldPatchServiceMonitor(t, deployment, promClient)
}

func IsPrometheusInstalled() (bool, corev1.Pod, error) {
@@ -107,6 +125,64 @@ func ShouldGetClusterSizeMetric(t *testing.T, pod corev1.Pod) {
g.Expect(err).NotTo(HaveOccurred())
}

func ShouldPatchServiceMonitor(t *testing.T, deployment coh.Coherence, promClient *client.MonitoringV1Client) {
g := NewGomegaWithT(t)

current := &coh.Coherence{}
err := testContext.Client.Get(testContext.Context, types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}, current)
g.Expect(err).NotTo(HaveOccurred())

// update the ServiceMonitor interval to cause an update
current.Spec.Ports[0].ServiceMonitor.Interval = "10s"
err = testContext.Client.Update(testContext.Context, current)
g.Expect(err).NotTo(HaveOccurred())

err = ShouldEventuallyHaveServiceMonitorWithState(t, deployment.Namespace, "test-metrics", hasInterval, promClient, 10*time.Second, 5*time.Minute)
g.Expect(err).NotTo(HaveOccurred())

}

func ShouldEventuallyHaveServiceMonitor(t *testing.T, namespace, name string, promClient *client.MonitoringV1Client, retryInterval, timeout time.Duration) error {
return ShouldEventuallyHaveServiceMonitorWithState(t, namespace, name, alwaysTrue, promClient, retryInterval, timeout)
}

type ServiceMonitorPredicate func(*testing.T, *monitoring.ServiceMonitor) bool

func alwaysTrue(*testing.T, *monitoring.ServiceMonitor) bool {
return true
}

func hasInterval(t *testing.T, sm *monitoring.ServiceMonitor) bool {
if len(sm.Spec.Endpoints) > 0 && sm.Spec.Endpoints[0].Interval == "10s" {
return true
}
t.Logf("Waiting for availability of ServiceMonitor resource %s - with endpoint interval of 10s", sm.Name)
return false
}

func ShouldEventuallyHaveServiceMonitorWithState(t *testing.T, namespace, name string, predicate ServiceMonitorPredicate, promClient *client.MonitoringV1Client, retryInterval, timeout time.Duration) error {
var sm *monitoring.ServiceMonitor

err := wait.Poll(retryInterval, timeout, func() (done bool, err error) {
sm, err = promClient.ServiceMonitors(namespace).Get(testContext.Context, name, v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
t.Logf("Waiting for availability of ServiceMonitor resource %s - NotFound", name)
return false, nil
}
t.Logf("Waiting for availability of ServiceMonitor resource %s - %s", name, err.Error())
return false, nil
}
if predicate(t, sm) {
return true, nil
}
t.Logf("Waiting for availability of ServiceMonitor resource %s - %s to match predicate", name, err.Error())
return false, nil
})

return err
}

func PrometheusQuery(t *testing.T, pod corev1.Pod, query string, result interface{}) error {
r, err := PrometheusAPIRequest(pod, "/api/v1/query?query="+query)
if err != nil {
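The new test helpers above share one shape: poll with wait.Poll until a fetched resource satisfies a predicate, treating fetch errors (for example NotFound) as "not ready yet". A generic sketch of that shape, decoupled from ServiceMonitors, is below; the pollUntil helper and the toy fetch function in main are invented for illustration.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollUntil repeatedly calls fetch until predicate accepts the result or the
// timeout expires, mirroring the ServiceMonitor polling helpers above.
func pollUntil[T any](interval, timeout time.Duration, fetch func() (T, error), predicate func(T) bool) (T, error) {
	var last T
	err := wait.Poll(interval, timeout, func() (bool, error) {
		v, fetchErr := fetch()
		if fetchErr != nil {
			// Treat fetch errors as "not ready yet" rather than fatal,
			// the same way the test helpers above do.
			fmt.Printf("waiting: %v\n", fetchErr)
			return false, nil
		}
		last = v
		return predicate(v), nil
	})
	return last, err
}

func main() {
	start := time.Now()
	elapsed, err := pollUntil(100*time.Millisecond, 2*time.Second,
		func() (time.Duration, error) { return time.Since(start), nil },
		func(d time.Duration) bool { return d > 500*time.Millisecond },
	)
	fmt.Println(elapsed, err)
}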