Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storing upgrade status into events #707

Merged
merged 10 commits into from
Feb 22, 2022
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -228,7 +229,12 @@ func addDependencies(_ context.Context, mgr ctrl.Manager, cfg config.Config, v v

// adds the upgrade mechanism to be executed once the manager is ready
err = mgr.Add(manager.RunnableFunc(func(c context.Context) error {
return collectorupgrade.ManagedInstances(c, ctrl.Log.WithName("collector-upgrade"), v, mgr.GetClient())
return collectorupgrade.ManagedInstances(c, collectorupgrade.Params{
Log: ctrl.Log.WithName("collector-upgrade"),
Version: v,
Client: mgr.GetClient(),
Recorder: record.NewFakeRecorder(10),
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
})
}))
if err != nil {
return fmt.Errorf("failed to upgrade OpenTelemetryCollector instances: %w", err)
Expand Down
40 changes: 24 additions & 16 deletions pkg/collector/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,42 @@ import (

semver "github.com/Masterminds/semver/v3"
"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/version"
)

type Params struct {
Log logr.Logger
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
Version version.Version
Client client.Client
Recorder record.EventRecorder
}

// ManagedInstances finds all the otelcol instances for the current operator and upgrades them, if necessary.
func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Version, cl client.Client) error {
logger.Info("looking for managed instances to upgrade")
func ManagedInstances(ctx context.Context, params Params) error {
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
params.Log.Info("looking for managed instances to upgrade")

opts := []client.ListOption{
client.MatchingLabels(map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
}),
}
list := &v1alpha1.OpenTelemetryCollectorList{}
if err := cl.List(ctx, list, opts...); err != nil {
if err := params.Client.List(ctx, list, opts...); err != nil {
return fmt.Errorf("failed to list: %w", err)
}

for i := range list.Items {
original := list.Items[i]
itemLogger := logger.WithValues("name", original.Name, "namespace", original.Namespace)
itemLogger := params.Log.WithValues("name", original.Name, "namespace", original.Namespace)
if original.Spec.UpgradeStrategy == v1alpha1.UpgradeStrategyNone {
itemLogger.Info("skipping instance upgrade due to UpgradeStrategy")
continue
}
upgraded, err := ManagedInstance(ctx, logger, ver, cl, original)
upgraded, err := ManagedInstance(ctx, params, original)
if err != nil {
// nothing to do at this level, just go to the next instance
continue
Expand All @@ -59,14 +67,14 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
// the resource update overrides the status, so, keep it so that we can reset it later
st := upgraded.Status
patch := client.MergeFrom(&original)
if err := cl.Patch(ctx, &upgraded, patch); err != nil {
if err := params.Client.Patch(ctx, &upgraded, patch); err != nil {
itemLogger.Error(err, "failed to apply changes to instance")
continue
}

// the status object requires its own update
upgraded.Status = st
if err := cl.Status().Patch(ctx, &upgraded, patch); err != nil {
if err := params.Client.Status().Patch(ctx, &upgraded, patch); err != nil {
itemLogger.Error(err, "failed to apply changes to instance's status object")
continue
}
Expand All @@ -76,48 +84,48 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
}

if len(list.Items) == 0 {
logger.Info("no instances to upgrade")
params.Log.Info("no instances to upgrade")
}

return nil
}

// ManagedInstance performs the necessary changes to bring the given otelcol instance to the current version.
func ManagedInstance(ctx context.Context, logger logr.Logger, currentV version.Version, cl client.Client, otelcol v1alpha1.OpenTelemetryCollector) (v1alpha1.OpenTelemetryCollector, error) {
func ManagedInstance(ctx context.Context, params Params, otelcol v1alpha1.OpenTelemetryCollector) (v1alpha1.OpenTelemetryCollector, error) {
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
// this is likely a new instance, assume it's already up to date
if otelcol.Status.Version == "" {
return otelcol, nil
}

instanceV, err := semver.NewVersion(otelcol.Status.Version)
if err != nil {
logger.Error(err, "failed to parse version for OpenTelemetry Collector instance", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
params.Log.Error(err, "failed to parse version for OpenTelemetry Collector instance", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
return otelcol, err
}

if instanceV.GreaterThan(&Latest.Version) {
logger.Info("skipping upgrade for OpenTelemetry Collector instance, as it's newer than our latest version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version, "latest", Latest.Version.String())
params.Log.Info("skipping upgrade for OpenTelemetry Collector instance, as it's newer than our latest version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version, "latest", Latest.Version.String())
return otelcol, nil
}

for _, available := range versions {
if available.GreaterThan(instanceV) {
upgraded, err := available.upgrade(cl, &otelcol)
upgraded, err := available.upgrade(params, &otelcol)

if err != nil {
logger.Error(err, "failed to upgrade managed otelcol instances", "name", otelcol.Name, "namespace", otelcol.Namespace)
params.Log.Error(err, "failed to upgrade managed otelcol instances", "name", otelcol.Name, "namespace", otelcol.Namespace)
return otelcol, err
}

logger.V(1).Info("step upgrade", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", available.String())
params.Log.V(1).Info("step upgrade", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", available.String())
upgraded.Status.Version = available.String()
otelcol = *upgraded
}
}

// at the end of the process, we are up to date with the latest known version, which is what we have from versions.txt
otelcol.Status.Version = currentV.OpenTelemetryCollector
otelcol.Status.Version = params.Version.OpenTelemetryCollector

logger.V(1).Info("final version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
params.Log.V(1).Info("final version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
return otelcol, nil
}
22 changes: 19 additions & 3 deletions pkg/collector/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand Down Expand Up @@ -62,7 +63,12 @@ func TestShouldUpgradeAllToLatestBasedOnUpgradeStrategy(t *testing.T) {
require.Equal(t, beginV, persisted.Status.Version)

// test
err = upgrade.ManagedInstances(context.Background(), logger, currentV, k8sClient)
err = upgrade.ManagedInstances(context.Background(), upgrade.Params{
Log: logger,
Version: currentV,
Client: k8sClient,
Recorder: record.NewFakeRecorder(10),
})
assert.NoError(t, err)

// verify
Expand All @@ -86,7 +92,12 @@ func TestUpgradeUpToLatestKnownVersion(t *testing.T) {
currentV.OpenTelemetryCollector = "0.10.0" // we don't have a 0.10.0 upgrade, but we have a 0.9.0

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, currentV, k8sClient, existing)
res, err := upgrade.ManagedInstance(context.Background(), upgrade.Params{
Log: logger,
Version: currentV,
Client: k8sClient,
Recorder: record.NewFakeRecorder(10),
}, existing)

// verify
assert.NoError(t, err)
Expand Down Expand Up @@ -114,7 +125,12 @@ func TestVersionsShouldNotBeChanged(t *testing.T) {
currentV.OpenTelemetryCollector = upgrade.Latest.String()

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, currentV, k8sClient, existing)
res, err := upgrade.ManagedInstance(context.Background(), upgrade.Params{
Log: logger,
Version: currentV,
Client: k8sClient,
Recorder: record.NewFakeRecorder(10),
}, existing)
if tt.failureExpected {
assert.Error(t, err)
} else {
Expand Down
10 changes: 7 additions & 3 deletions pkg/collector/upgrade/v0_15_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
package upgrade

import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"

corev1 "k8s.io/api/core/v1"
)

func upgrade0_15_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
func upgrade0_15_0(params Params, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
delete(otelcol.Spec.Args, "--new-metrics")
delete(otelcol.Spec.Args, "--legacy-metrics")
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
params.Recorder.Event(updated, "Normal", "Upgrade", "upgrade to v0.15.0 dropped the deprecated metrics arguments")

return otelcol, nil
}
8 changes: 7 additions & 1 deletion pkg/collector/upgrade/v0_15_0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/version"
Expand Down Expand Up @@ -54,7 +55,12 @@ func TestRemoveMetricsTypeFlags(t *testing.T) {
require.Contains(t, existing.Spec.Args, "--legacy-metrics")

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
res, err := upgrade.ManagedInstance(context.Background(), upgrade.Params{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(10),
}, existing)
assert.NoError(t, err)

// verify
Expand Down
17 changes: 12 additions & 5 deletions pkg/collector/upgrade/v0_19_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
"strings"

"gopkg.in/yaml.v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"

corev1 "k8s.io/api/core/v1"
)

func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
func upgrade0_19_0(params Params, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
if len(otelcol.Spec.Config) == 0 {
return otelcol, nil
}
Expand All @@ -47,7 +48,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (
// Remove deprecated queued_retry processor
if strings.HasPrefix(k.(string), "queued_retry") {
delete(processors, k)
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 removed the processor %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
params.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 removed the processor %q", k))
continue
}

Expand All @@ -71,7 +74,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (

processor["attributes"] = attributes
delete(processor, "type")
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 migrated the property 'type' for processor %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
params.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 migrated the property 'type' for processor %q", k))
}

// handle labels
Expand All @@ -95,7 +100,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (

processor["attributes"] = attributes
delete(processor, "labels")
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 migrated the property 'labels' for processor %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
params.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 migrated the property 'labels' for processor %q", k))
}

processors[k] = processor
Expand Down
28 changes: 22 additions & 6 deletions pkg/collector/upgrade/v0_19_0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/version"
Expand Down Expand Up @@ -58,15 +59,20 @@ func TestRemoveQueuedRetryProcessor(t *testing.T) {
require.Contains(t, existing.Spec.Config, "num_workers: 123") // checking one property is sufficient

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
res, err := upgrade.ManagedInstance(context.Background(), upgrade.Params{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(10),
}, existing)
assert.NoError(t, err)

// verify
assert.NotContains(t, res.Spec.Config, "queued_retry:")
assert.Contains(t, res.Spec.Config, "otherprocessor:")
assert.NotContains(t, res.Spec.Config, "queued_retry/second:")
assert.NotContains(t, res.Spec.Config, "num_workers: 123") // checking one property is sufficient
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 removed the processor")
//assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 removed the processor")
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
}

func TestMigrateResourceType(t *testing.T) {
Expand All @@ -90,7 +96,12 @@ func TestMigrateResourceType(t *testing.T) {
existing.Status.Version = "0.18.0"

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
res, err := upgrade.ManagedInstance(context.Background(), upgrade.Params{
Log: logger,
Version: version.Get(),
Client: nil,
Recorder: record.NewFakeRecorder(10),
}, existing)
assert.NoError(t, err)

// verify
Expand All @@ -101,7 +112,7 @@ func TestMigrateResourceType(t *testing.T) {
key: opencensus.type
value: some-type
`, res.Spec.Config)
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'type' for processor")
//assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'type' for processor")
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
}

func TestMigrateLabels(t *testing.T) {
Expand All @@ -127,7 +138,12 @@ func TestMigrateLabels(t *testing.T) {
existing.Status.Version = "0.18.0"

// test
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
res, err := upgrade.ManagedInstance(context.Background(), upgrade.Params{
Log: logger,
Version: version.Version{},
Client: nil,
Recorder: record.NewFakeRecorder(10),
}, existing)
assert.NoError(t, err)

actual, err := adapters.ConfigFromString(res.Spec.Config)
Expand All @@ -139,5 +155,5 @@ func TestMigrateLabels(t *testing.T) {
// verify
assert.Len(t, actualAttrs, 2)
assert.Nil(t, actualProcessor["labels"])
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'labels' for processor")
//assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'labels' for processor")
yuriolisa marked this conversation as resolved.
Show resolved Hide resolved
}
9 changes: 6 additions & 3 deletions pkg/collector/upgrade/v0_24_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
"strings"

"gopkg.in/yaml.v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"

corev1 "k8s.io/api/core/v1"
)

func upgrade0_24_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
func upgrade0_24_0(params Params, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
if len(otelcol.Spec.Config) == 0 {
return otelcol, nil
}
Expand All @@ -48,7 +49,9 @@ func upgrade0_24_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (
if port, ok := extension["port"]; ok {
delete(extension, "port")
extension["endpoint"] = fmt.Sprintf("0.0.0.0:%d", port)
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.24.0 migrated the property 'port' to 'endpoint' for extension %q", k))
existing := &corev1.ConfigMap{}
updated := existing.DeepCopy()
params.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.24.0 migrated the property 'port' to 'endpoint' for extension %q", k))
}
case string:
if len(extension) == 0 {
Expand Down
Loading