Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/component.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ regardless of whether they are managed or read-only. For each resource:
2. The resource is either applied to the cluster (managed) or fetched from it (read-only). Managed resources use
Server-Side Apply and get a controller owner reference pointing to the owner CRD, unless the resource is
cluster-scoped and the owner is namespace-scoped (see [Cluster-Scoped Resources](#cluster-scoped-resources)).
3. If the resource implements `DataExtractable`, its data extractors run immediately. This makes extracted data
3. For read-only resources that implement `ObservationRecorder`, the framework records the fetched object back onto the
resource so that subsequent inspection observes the live cluster state. Resources built from `generic.BaseResource`
implement this automatically.
4. If the resource implements `DataExtractable`, its data extractors run immediately. This makes extracted data
available to subsequent resources' guards and mutations within the same reconciliation cycle.

This means a read-only resource registered before a managed resource can extract data that feeds into the managed
Expand Down
21 changes: 15 additions & 6 deletions docs/custom-resource.md
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ import (
// - concepts.Suspendable (DeleteOnSuspend, Suspend, SuspensionStatus)
// - concepts.Guardable (GuardStatus)
// - concepts.DataExtractable (ExtractData)
// - concepts.ObservationRecorder (RecordObservation)
type Resource struct {
base *generic.WorkloadResource[*examplev1.GameServer, *Mutator]
}
Expand Down Expand Up @@ -582,16 +583,24 @@ func (r *Resource) GuardStatus() (concepts.GuardStatusWithReason, error) {
func (r *Resource) ExtractData() error {
return r.base.ExtractData()
}

func (r *Resource) RecordObservation(observed client.Object) error {
return r.base.RecordObservation(observed)
}
```

Forward `RecordObservation` whenever the resource may be registered as read-only with a data extractor. The framework
uses it to feed the fetched cluster object back to the resource before extraction runs; without it, the extractor would
see the inert base passed to the builder rather than live cluster state.

Which methods to include depends on your resource category:

| Category | Typical Methods |
| ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| Workload | `Identity`, `Object`, `Mutate`, `ConvergingStatus`, `GraceStatus`, `DeleteOnSuspend`, `Suspend`, `SuspensionStatus`, `GuardStatus`, `ExtractData` |
| Static | `Identity`, `Object`, `Mutate`, `GuardStatus`, `ExtractData` |
| Task | `Identity`, `Object`, `Mutate`, `ConvergingStatus`, `DeleteOnSuspend`, `Suspend`, `SuspensionStatus`, `GuardStatus`, `ExtractData` |
| Integration | `Identity`, `Object`, `Mutate`, `ConvergingStatus`, `GraceStatus`, `DeleteOnSuspend`, `Suspend`, `SuspensionStatus`, `GuardStatus`, `ExtractData` |
| Category | Typical Methods |
| ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Workload | `Identity`, `Object`, `Mutate`, `ConvergingStatus`, `GraceStatus`, `DeleteOnSuspend`, `Suspend`, `SuspensionStatus`, `GuardStatus`, `ExtractData`, `RecordObservation` |
| Static | `Identity`, `Object`, `Mutate`, `GuardStatus`, `ExtractData`, `RecordObservation` |
| Task | `Identity`, `Object`, `Mutate`, `ConvergingStatus`, `DeleteOnSuspend`, `Suspend`, `SuspensionStatus`, `GuardStatus`, `ExtractData`, `RecordObservation` |
| Integration | `Identity`, `Object`, `Mutate`, `ConvergingStatus`, `GraceStatus`, `DeleteOnSuspend`, `Suspend`, `SuspensionStatus`, `GuardStatus`, `ExtractData`, `RecordObservation` |

### 6. Define Feature Mutations

Expand Down
21 changes: 21 additions & 0 deletions pkg/component/concepts/observable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package concepts

import "sigs.k8s.io/controller-runtime/pkg/client"

// ObservationRecorder defines the contract for resources whose internal state must be
// updated with what was just fetched from the cluster, before any subsequent inspection
// (most notably data extraction) takes place.
//
// Read-only resources are constructed with a base object that typically carries only
// enough identifying metadata for the framework to fetch the live object. The fetched
// object must then be recorded back onto the resource so that capabilities such as
// [DataExtractable] observe the cluster state rather than the inert base.
//
// Implementations should accept any object compatible with the resource's underlying
// type and return an error when the supplied object is not assignable to that type.
type ObservationRecorder interface {
// RecordObservation stores the supplied object as the resource's most recently
// observed cluster state. Subsequent calls to capabilities that inspect the
// resource's state (such as ExtractData) operate against this object.
RecordObservation(observed client.Object) error
}
8 changes: 8 additions & 0 deletions pkg/component/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func readResource(
)
}

if recorder, ok := resource.(concepts.ObservationRecorder); ok {
if err := recorder.RecordObservation(object); err != nil {
return nil, fmt.Errorf(
"failed to record observation for resource %s: %w", resource.Identity(), err,
)
}
}

status, err := getConvergingStatus(resource, concepts.ConvergingOperationNone)
if err != nil {
return nil, fmt.Errorf(
Expand Down
67 changes: 67 additions & 0 deletions pkg/component/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (

"github.com/sourcehawk/operator-component-framework/pkg/component/concepts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

Expand Down Expand Up @@ -149,6 +151,71 @@ func TestReadResources(t *testing.T) {
resource.AssertExpectations(t)
})

t.Run("should record the fetched object when the resource implements ObservationRecorder", func(t *testing.T) {
// Given a cluster Secret with populated data and a read-only resource
// constructed with an empty base; the framework must hand the fetched
// object to RecordObservation so that subsequent data extraction sees
// the cluster state, not the inert base.
clusterSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "observed-secret",
Namespace: namespace,
},
Data: map[string][]byte{"token": []byte("from-cluster")},
}
require.NoError(t, fakeClient.Create(ctx, clusterSecret))

baseObject := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "observed-secret",
Namespace: namespace,
},
}

resource := &MockObservationRecorderResource{}
resource.On("Object").Return(baseObject, nil)
resource.On("RecordObservation", mock.MatchedBy(func(obj client.Object) bool {
secret, ok := obj.(*corev1.Secret)
if !ok {
return false
}
return string(secret.Data["token"]) == "from-cluster"
})).Return(nil)

// When
_, err := readResources(ctx, reconcileContext, []Resource{resource})

// Then
require.NoError(t, err)
resource.AssertExpectations(t)
})

t.Run("should propagate errors from RecordObservation", func(t *testing.T) {
clusterSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "record-fail-secret",
Namespace: namespace,
},
}
require.NoError(t, fakeClient.Create(ctx, clusterSecret))

resource := &MockObservationRecorderResource{}
resource.On("Object").Return(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "record-fail-secret",
Namespace: namespace,
},
}, nil)
resource.On("Identity").Return("v1/Secret/record-fail-secret")
resource.On("RecordObservation", mock.Anything).Return(errors.New("record failed"))

_, err := readResources(ctx, reconcileContext, []Resource{resource})

require.Error(t, err)
assert.Contains(t, err.Error(), "record failed")
assert.Contains(t, err.Error(), "v1/Secret/record-fail-secret")
})

t.Run("should return error and update status if resource.ConvergingStatus() fails", func(t *testing.T) {
// Given
cm := &corev1.ConfigMap{
Expand Down
9 changes: 9 additions & 0 deletions pkg/component/zz_mock_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,12 @@ func (m *MockGuardableAliveResource) GuardStatus() (concepts.GuardStatusWithReas
args := m.Called()
return args.Get(0).(concepts.GuardStatusWithReason), args.Error(1)
}

type MockObservationRecorderResource struct {
MockResource
}

func (m *MockObservationRecorderResource) RecordObservation(observed client.Object) error {
args := m.Called(observed)
return args.Error(0)
}
8 changes: 7 additions & 1 deletion pkg/generic/builder_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ func (b *BaseBuilder[T, M]) WithGuard(handler func(T) (concepts.GuardStatusWithR
b.BaseRes.GuardHandler = handler
}

// WithDataExtractor registers a typed data extractor to run after successful reconciliation.
// WithDataExtractor registers a typed data extractor to run immediately after the
// resource has been processed during reconciliation.
//
// For managed resources, the extractor receives the object as it stands after feature
// mutations have been applied. For read-only resources, it receives the object as it
// was just fetched from the cluster. Extractors must be idempotent because they run on
// every reconcile pass.
func (b *BaseBuilder[T, M]) WithDataExtractor(extractor func(T) error) {
if extractor != nil {
b.BaseRes.DataExtractors = append(b.BaseRes.DataExtractors, extractor)
Expand Down
23 changes: 23 additions & 0 deletions pkg/generic/resource_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (r *BaseResource[T, M]) PreviewObject() (T, error) {
}

// ExtractData runs all registered data extractors against a deep copy of the reconciled object.
//
// For managed resources the reconciled object is the desired state produced by Mutate.
// For read-only resources it is the object most recently supplied via RecordObservation,
// which the read flow invokes after fetching from the cluster.
func (r *BaseResource[T, M]) ExtractData() error {
copyObj, ok := r.DesiredObject.DeepCopyObject().(T)
if !ok {
Expand All @@ -99,6 +103,25 @@ func (r *BaseResource[T, M]) ExtractData() error {
return nil
}

// RecordObservation stores the supplied object as the resource's most recently observed
// cluster state. The framework invokes this on read-only resources immediately after
// fetching them, so that subsequent capabilities such as ExtractData observe the live
// object rather than the inert base used to construct the resource.
//
// RecordObservation returns an error when the supplied object is not assignable to the
// resource's underlying type.
func (r *BaseResource[T, M]) RecordObservation(observed client.Object) error {
typed, ok := observed.(T)
if !ok {
return fmt.Errorf(
"failed to record observation: expected object of type %T, got %T",
r.DesiredObject, observed,
)
}
r.DesiredObject = typed
return nil
}

// GuardStatus evaluates the resource's guard precondition.
// If no guard handler is configured, the resource is unconditionally unblocked.
// The handler receives a deep copy of the desired object to prevent accidental mutations.
Expand Down
50 changes: 50 additions & 0 deletions pkg/generic/resource_static_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,56 @@ func TestStaticResource(t *testing.T) {
assert.EqualError(t, err, "extract error")
})

t.Run("RecordObservation makes the observed object visible to ExtractData", func(t *testing.T) {
base := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "base-cm",
Namespace: "default",
},
}
readOnly := &StaticResource[*corev1.ConfigMap, *mockMutator]{
BaseResource: BaseResource[*corev1.ConfigMap, *mockMutator]{
DesiredObject: base,
IdentityFunc: identityFunc,
NewMutator: newMutator,
},
}

observed := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "base-cm",
Namespace: "default",
},
Data: map[string]string{"foo": "from-cluster"},
}
require.NoError(t, readOnly.RecordObservation(observed))

var seen string
readOnly.DataExtractors = []func(*corev1.ConfigMap) error{
func(cm *corev1.ConfigMap) error {
seen = cm.Data["foo"]
return nil
},
}
require.NoError(t, readOnly.ExtractData())
assert.Equal(t, "from-cluster", seen,
"extractor must see the observed cluster object, not the empty desired base")
})

t.Run("RecordObservation rejects an object of the wrong type", func(t *testing.T) {
readOnly := &StaticResource[*corev1.ConfigMap, *mockMutator]{
BaseResource: BaseResource[*corev1.ConfigMap, *mockMutator]{
DesiredObject: &corev1.ConfigMap{},
IdentityFunc: identityFunc,
NewMutator: newMutator,
},
}

err := readOnly.RecordObservation(&corev1.Secret{})
require.Error(t, err)
assert.Contains(t, err.Error(), "ConfigMap")
})

t.Run("GuardStatus returns unblocked when no handler is set", func(t *testing.T) {
res.GuardHandler = nil
result, err := res.GuardStatus()
Expand Down
Loading