Skip to content

Commit

Permalink
Adopt changes to Read and Diff
Browse files Browse the repository at this point in the history
1. Return resource inputs as well as resource state from Read()
2. Return a list of properties that changed from Diff()

We implement the former by copying all properties from a resource's
state to the input bag and removing the following well-known read-only
fields:

- metadata.{uid,resourceVersion,generation,selfLink,continue,
  creationTimestamp,deletionTimestamp,deletionGracePeriodSeconds}
- status

The result is a set of inputs that is sufficient (if perhaps not
necessary) to create a resource with a state that is identical to the
resource's current state. This is used to detect drift during `refresh`
and to implement resource import.

We implement the latter by scanning the JSON diff and recording the
names of the top-level properties that changed. The engine uses this
information to filter diffs to only those that are semantically
meaningful.

These changes required a couple of bugfixes:
- Old names are only adopted if the old resource was auto-named. This
  ensures that a name must be specified when importing a resource that
  was not autonamed.
- URN to GVK conversion was fixed for resources in the "core" group.
  These resources have no group part in the GVK. Parsing was also
  simplified through the use of pulumi/pulumi's token manipulation
  functions.
- When reading a resource, the GVK for the resource to read is now
  pulled from the URN if it is absent from the inputs.
  • Loading branch information
pgavlin committed May 20, 2019
1 parent ff9e655 commit 82b2f58
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 51 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ As such, we are rev'ing the minor version of the package from 0.16 to 0.17. Rec

### Improvements

- None
- The Kubernetes provider now supports the internal features necessary for the Pulumi engine to detect diffs between
the actual and desired state of a resource after a `pulumi refresh` (https://github.com/pulumi/pulumi-kubernetes/pull/477)

### Bug fixes

Expand Down
10 changes: 4 additions & 6 deletions pkg/metadata/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ func AssignNameIfAutonamable(obj *unstructured.Unstructured, base tokens.QName)
}
}

// AdoptOldNameIfUnnamed checks if `newObj` has a name, and if not, "adopts" the name of `oldObj`
// AdoptOldAutonameIfUnnamed checks if `newObj` has a name, and if not, "adopts" the name of `oldObj`
// instead. If `oldObj` was autonamed, then we mark `newObj` as autonamed, too.
func AdoptOldNameIfUnnamed(newObj, oldObj *unstructured.Unstructured) {
func AdoptOldAutonameIfUnnamed(newObj, oldObj *unstructured.Unstructured) {
contract.Assert(oldObj.GetName() != "")
if newObj.GetName() == "" {
if newObj.GetName() == "" && IsAutonamed(oldObj) {
newObj.SetName(oldObj.GetName())
if IsAutonamed(oldObj) {
SetAnnotationTrue(newObj, AnnotationAutonamed)
}
SetAnnotationTrue(newObj, AnnotationAutonamed)
}
}

Expand Down
22 changes: 19 additions & 3 deletions pkg/metadata/naming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func TestAdoptName(t *testing.T) {
// NOTE: annotations needs to be a `map[string]interface{}` rather than `map[string]string`
// or the k8s utility functions fail.
"annotations": map[string]interface{}{AnnotationAutonamed: "true"},
}},
},
},
}
new1 := &unstructured.Unstructured{
Object: map[string]interface{}{"metadata": map[string]interface{}{"name": "new1"}},
}
AdoptOldNameIfUnnamed(new1, old1)
AdoptOldAutonameIfUnnamed(new1, old1)
assert.Equal(t, "old1", old1.GetName())
assert.True(t, IsAutonamed(old1))
assert.Equal(t, "new1", new1.GetName())
Expand All @@ -63,7 +64,22 @@ func TestAdoptName(t *testing.T) {
new2 := &unstructured.Unstructured{
Object: map[string]interface{}{},
}
AdoptOldNameIfUnnamed(new2, old1)
AdoptOldAutonameIfUnnamed(new2, old1)
assert.Equal(t, "old1", new2.GetName())
assert.True(t, IsAutonamed(new2))

// old2 is not autonamed, so new3 DOES NOT adopt old2's name.
new3 := &unstructured.Unstructured{
Object: map[string]interface{}{},
}
old2 := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": "old1",
},
},
}
AdoptOldAutonameIfUnnamed(new3, old2)
assert.Equal(t, "", new3.GetName())
assert.False(t, IsAutonamed(new3))
}
137 changes: 96 additions & 41 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/golang/glog"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/golang/protobuf/ptypes/struct"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/grpc/grpc-go/status"
"github.com/pulumi/pulumi-kubernetes/pkg/await"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
Expand All @@ -35,7 +35,7 @@ import (
"github.com/pulumi/pulumi/pkg/resource/provider"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/proto/go"
pulumirpc "github.com/pulumi/pulumi/sdk/proto/go"
"github.com/yudai/gojsondiff"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -58,10 +58,6 @@ import (

// --------------------------------------------------------------------------

const (
gvkDelimiter = ":"
)

type cancellationContext struct {
context context.Context
cancel context.CancelFunc
Expand All @@ -84,7 +80,7 @@ type kubeProvider struct {
canceler *cancellationContext
name string
version string
providerPrefix string
providerPackage string
opts kubeOpts
overrideNamespace string

Expand All @@ -97,11 +93,11 @@ func makeKubeProvider(
host *provider.HostClient, name, version string,
) (pulumirpc.ResourceProviderServer, error) {
return &kubeProvider{
host: host,
canceler: makeCancellationContext(),
name: name,
version: version,
providerPrefix: name + gvkDelimiter,
host: host,
canceler: makeCancellationContext(),
name: name,
version: version,
providerPackage: name,
}, nil
}

Expand Down Expand Up @@ -265,7 +261,7 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
// NOTE: If old inputs exist, they have a name, either provided by the user or filled in with a
// previous run of `Check`.
contract.Assert(oldInputs.GetName() != "")
metadata.AdoptOldNameIfUnnamed(newInputs, oldInputs)
metadata.AdoptOldAutonameIfUnnamed(newInputs, oldInputs)
} else {
metadata.AssignNameIfAutonamable(newInputs, urn.Name())
}
Expand Down Expand Up @@ -422,8 +418,24 @@ func (k *kubeProvider) Diff(
// Pack up PB, ship response back.
hasChanges := pulumirpc.DiffResponse_DIFF_NONE
diff := gojsondiff.New().CompareObjects(oldInputs.Object, newInputs.Object)
var changes []string
if len(diff.Deltas()) > 0 {
hasChanges = pulumirpc.DiffResponse_DIFF_SOME
for _, d := range diff.Deltas() {
var position gojsondiff.Position
switch d := d.(type) {
case gojsondiff.PostDelta:
position = d.PostPosition()
case gojsondiff.PreDelta:
position = d.PrePosition()
default:
contract.Failf("unexpected diff position type %T", d)
}

if k, ok := position.(gojsondiff.Name); ok {
changes = append(changes, string(k))
}
}
}

// Delete before replacement if we are forced to replace the old object, and the new version of
Expand All @@ -445,6 +457,7 @@ func (k *kubeProvider) Diff(
Replaces: replaces,
Stables: []string{},
DeleteBeforeReplace: deleteBeforeReplace,
Diffs: changes,
}, nil
}

Expand Down Expand Up @@ -521,7 +534,7 @@ func (k *kubeProvider) Create(
if awaitErr != nil {
// Resource was created but failed to initialize. Return live version of object so it can be
// checkpointed.
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed)
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed, nil)
}

// Invalidate the client cache if this was a CRD. This will require subsequent CR creations to
Expand Down Expand Up @@ -570,16 +583,27 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
oldInputs, newInputs := parseCheckpointObject(oldState)

if oldInputs.GroupVersionKind().Empty() {
oldInputs.SetGroupVersionKind(newInputs.GroupVersionKind())
if newInputs.GroupVersionKind().Empty() {
gvk, err := k.gvkFromURN(urn)
if err != nil {
return nil, err
}
oldInputs.SetGroupVersionKind(gvk)
} else {
oldInputs.SetGroupVersionKind(newInputs.GroupVersionKind())
}
}

_, name := ParseFqName(req.GetId())
namespace, name := ParseFqName(req.GetId())
if name == "" {
return nil, fmt.Errorf("failed to parse resource name from request ID: %s", req.GetId())
}
if oldInputs.GetName() == "" {
oldInputs.SetName(name)
}
if oldInputs.GetNamespace() == "" {
oldInputs.SetNamespace(namespace)
}

config := await.ReadConfig{
ProviderConfig: await.ProviderConfig{
Expand Down Expand Up @@ -624,16 +648,27 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
// initialize.
}

// Estimate the inputs for this object.
liveInputs := estimateLiveInputs(liveObj)

// TODO(lblackstone): not sure why this is needed
id := FqObjName(liveObj)
if reqID := req.GetId(); len(reqID) > 0 {
id = reqID
}

// Return a new "checkpoint object".
inputsAndComputed, err := plugin.MarshalProperties(
checkpointObject(oldInputs, liveObj), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label), KeepUnknowns: true, SkipNulls: true,
state, err := plugin.MarshalProperties(
checkpointObject(liveInputs, liveObj), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.state", label), KeepUnknowns: true, SkipNulls: true,
})
if err != nil {
return nil, err
}

inputs, err := plugin.MarshalProperties(
resource.NewPropertyMapFromMap(liveInputs.Object), plugin.MarshalOptions{
Label: label + ".inputs", KeepUnknowns: true, SkipNulls: true,
})
if err != nil {
return nil, err
Expand All @@ -642,11 +677,11 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
if readErr != nil {
// Resource was created but failed to initialize. Return live version of object so it can be
// checkpointed.
glog.V(3).Infof("%v", partialError(id, readErr, inputsAndComputed))
return nil, partialError(id, readErr, inputsAndComputed)
glog.V(3).Infof("%v", partialError(id, readErr, state, inputs))
return nil, partialError(id, readErr, state, inputs)
}

return &pulumirpc.ReadResponse{Id: id, Properties: inputsAndComputed}, nil
return &pulumirpc.ReadResponse{Id: id, Properties: state, Inputs: inputs}, nil
}

// Update updates an existing resource with new values. Currently this client supports the
Expand Down Expand Up @@ -782,7 +817,7 @@ func (k *kubeProvider) Update(
if awaitErr != nil {
// Resource was updated/created but failed to initialize. Return live version of object so it
// can be checkpointed.
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed)
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed, nil)
}

return &pulumirpc.UpdateResponse{Properties: inputsAndComputed}, nil
Expand Down Expand Up @@ -847,7 +882,7 @@ func (k *kubeProvider) Delete(

// Resource delete was issued, but failed to complete. Return live version of object so it can be
// checkpointed.
return nil, partialError(FqObjName(lastKnownState), awaitErr, inputsAndComputed)
return nil, partialError(FqObjName(lastKnownState), awaitErr, inputsAndComputed, nil)
}

return &pbempty.Empty{}, nil
Expand Down Expand Up @@ -881,27 +916,24 @@ func (k *kubeProvider) label() string {
}

func (k *kubeProvider) gvkFromURN(urn resource.URN) (schema.GroupVersionKind, error) {
// Strip prefix.
s := string(urn.Type())
contract.Assertf(strings.HasPrefix(s, k.providerPrefix),
"Expected prefix: %q, Kubernetes GVK is: %q", k.providerPrefix, string(urn))
s = s[len(k.providerPrefix):]
contract.Assertf(string(urn.Type().Package()) == k.providerPackage, "Kubernetes GVK is: %q", string(urn))

// Emit GVK.
gvk := strings.Split(s, gvkDelimiter)
gv := strings.Split(gvk[0], "/")
if len(gvk) < 2 {
return schema.GroupVersionKind{},
fmt.Errorf("GVK must have both an apiVersion and a Kind: %q", s)
} else if len(gv) != 2 {
kind := string(urn.Type().Name())
gv := strings.Split(string(urn.Type().Module().Name()), "/")
if len(gv) != 2 {
return schema.GroupVersionKind{},
fmt.Errorf("apiVersion does not have both a group and a version: %q", s)
fmt.Errorf("apiVersion does not have both a group and a version: %q", urn.Type().Module().Name())
}
group, version := gv[0], gv[1]
if group == "core" {
group = ""
}

return schema.GroupVersionKind{
Group: gv[0],
Version: gv[1],
Kind: gvk[1],
Group: group,
Version: version,
Kind: kind,
}, nil
}

Expand Down Expand Up @@ -949,15 +981,16 @@ func parseCheckpointObject(obj resource.PropertyMap) (oldInputs, live *unstructu

// partialError creates an error for resources that did not complete an operation in progress.
// The last known state of the object is included in the error so that it can be checkpointed.
func partialError(id string, err error, inputsAndComputed *structpb.Struct) error {
func partialError(id string, err error, state *structpb.Struct, inputs *structpb.Struct) error {
reasons := []string{err.Error()}
if aggregate, isAggregate := err.(await.AggregatedError); isAggregate {
reasons = append(reasons, aggregate.SubErrors()...)
}
detail := pulumirpc.ErrorResourceInitFailed{
Id: id,
Properties: inputsAndComputed,
Properties: state,
Reasons: reasons,
Inputs: inputs,
}
return rpcerror.WithDetails(rpcerror.New(codes.Unknown, err.Error()), &detail)
}
Expand All @@ -973,3 +1006,25 @@ func canonicalNamespace(ns string) string {

// deleteResponse causes the resource to be deleted from the state.
var deleteResponse = &pulumirpc.ReadResponse{Id: "", Properties: nil}

// estimateLiveInputs computes an estimate of the provider inputs that produced the given live object. This is used by
// Read.
func estimateLiveInputs(live *unstructured.Unstructured) *unstructured.Unstructured {
// Start with a copy of the live object's state.
inputs := live.DeepCopy()

// Remove all known system-populated metadata.
inputs.SetUID("")
inputs.SetResourceVersion("")
inputs.SetGeneration(0)
inputs.SetSelfLink("")
inputs.SetContinue("")
inputs.SetCreationTimestamp(metav1.Time{})
inputs.SetDeletionTimestamp(nil)
inputs.SetDeletionGracePeriodSeconds(nil)

// Remove the status field.
delete(inputs.Object, "status")

return inputs
}

0 comments on commit 82b2f58

Please sign in to comment.