Skip to content

Commit

Permalink
Adopt changes to Read and Diff
Browse files Browse the repository at this point in the history
1. Return resource inputs as well as resource state from Read()
2. Return a list of properties that changed from Diff()

We implement the former by reading the last applied configuration
from the `kubectl.kubernetes.io/last-applied-configuration`
annotation in the live object state. If this key is not present, no
inputs are populated and the old inputs are retained. These changes
also update the provider to set this field during `Create` and
`Update`.

We implement the latter by scanning the JSON diff and recording the
names of the top-level properties that changed. The engine uses this
information to filter diffs to only those that are semantically
meaningful.

These changes required a couple of bugfixes:
- Old names are only adopted if the old resource was auto-named. This
  ensures that a name must be specified when importing a resource that
  was not autonamed.
- URN to GVK conversion was fixed for resources in the "core" group.
  These resources have no group part in the GVK. Parsing was also
  simplified through the use of pulumi/pulumi's token manipulation
  functions.
- When reading a resource, the GVK for the resource to read is now
  pulled from the URN if it is absent from the inputs.
  • Loading branch information
pgavlin committed Jun 21, 2019
1 parent c52daa9 commit 158d54e
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 48 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ As such, we are rev'ing the minor version of the package from 0.16 to 0.17. Rec

### Improvements

- None
- The Kubernetes provider now supports the internal features necessary for the Pulumi engine to detect diffs between
the actual and desired state of a resource after a `pulumi refresh` (https://github.com/pulumi/pulumi-kubernetes/pull/477)

### Bug fixes

Expand Down
10 changes: 4 additions & 6 deletions pkg/metadata/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ func AssignNameIfAutonamable(obj *unstructured.Unstructured, base tokens.QName)
}
}

// AdoptOldNameIfUnnamed checks if `newObj` has a name, and if not, "adopts" the name of `oldObj`
// AdoptOldAutonameIfUnnamed checks if `newObj` has a name, and if not, "adopts" the name of `oldObj`
// instead. If `oldObj` was autonamed, then we mark `newObj` as autonamed, too.
func AdoptOldNameIfUnnamed(newObj, oldObj *unstructured.Unstructured) {
func AdoptOldAutonameIfUnnamed(newObj, oldObj *unstructured.Unstructured) {
contract.Assert(oldObj.GetName() != "")
if newObj.GetName() == "" {
if newObj.GetName() == "" && IsAutonamed(oldObj) {
newObj.SetName(oldObj.GetName())
if IsAutonamed(oldObj) {
SetAnnotationTrue(newObj, AnnotationAutonamed)
}
SetAnnotationTrue(newObj, AnnotationAutonamed)
}
}

Expand Down
22 changes: 19 additions & 3 deletions pkg/metadata/naming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func TestAdoptName(t *testing.T) {
// NOTE: annotations needs to be a `map[string]interface{}` rather than `map[string]string`
// or the k8s utility functions fail.
"annotations": map[string]interface{}{AnnotationAutonamed: "true"},
}},
},
},
}
new1 := &unstructured.Unstructured{
Object: map[string]interface{}{"metadata": map[string]interface{}{"name": "new1"}},
}
AdoptOldNameIfUnnamed(new1, old1)
AdoptOldAutonameIfUnnamed(new1, old1)
assert.Equal(t, "old1", old1.GetName())
assert.True(t, IsAutonamed(old1))
assert.Equal(t, "new1", new1.GetName())
Expand All @@ -63,7 +64,22 @@ func TestAdoptName(t *testing.T) {
new2 := &unstructured.Unstructured{
Object: map[string]interface{}{},
}
AdoptOldNameIfUnnamed(new2, old1)
AdoptOldAutonameIfUnnamed(new2, old1)
assert.Equal(t, "old1", new2.GetName())
assert.True(t, IsAutonamed(new2))

// old2 is not autonamed, so new3 DOES NOT adopt old2's name.
new3 := &unstructured.Unstructured{
Object: map[string]interface{}{},
}
old2 := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": "old1",
},
},
}
AdoptOldAutonameIfUnnamed(new3, old2)
assert.Equal(t, "", new3.GetName())
assert.False(t, IsAutonamed(new3))
}
169 changes: 131 additions & 38 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ import (
// --------------------------------------------------------------------------

const (
gvkDelimiter = ":"
invokeKubectlReplace = "kubernetes:kubernetes:kubectlReplace"
lastAppliedConfigKey = "kubectl.kubernetes.io/last-applied-configuration"
)

type cancellationContext struct {
Expand All @@ -86,7 +86,7 @@ type kubeProvider struct {
canceler *cancellationContext
name string
version string
providerPrefix string
providerPackage string
opts kubeOpts
defaultNamespace string

Expand All @@ -99,11 +99,11 @@ func makeKubeProvider(
host *provider.HostClient, name, version string,
) (pulumirpc.ResourceProviderServer, error) {
return &kubeProvider{
host: host,
canceler: makeCancellationContext(),
name: name,
version: version,
providerPrefix: name + gvkDelimiter,
host: host,
canceler: makeCancellationContext(),
name: name,
version: version,
providerPackage: name,
}, nil
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
// NOTE: If old inputs exist, they have a name, either provided by the user or filled in with a
// previous run of `Check`.
contract.Assert(oldInputs.GetName() != "")
metadata.AdoptOldNameIfUnnamed(newInputs, oldInputs)
metadata.AdoptOldAutonameIfUnnamed(newInputs, oldInputs)
} else {
metadata.AssignNameIfAutonamable(newInputs, urn.Name())
}
Expand Down Expand Up @@ -623,6 +623,11 @@ func (k *kubeProvider) Create(
}
newInputs := propMapToUnstructured(newResInputs)

annotatedInputs, err := withLastAppliedConfig(newInputs)
if err != nil {
return nil, err
}

config := await.CreateConfig{
ProviderConfig: await.ProviderConfig{
Context: k.canceler.context,
Expand All @@ -631,7 +636,7 @@ func (k *kubeProvider) Create(
ClientSet: k.clientSet,
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
},
Inputs: newInputs,
Inputs: annotatedInputs,
}

initialized, awaitErr := await.Creation(config)
Expand Down Expand Up @@ -665,7 +670,7 @@ func (k *kubeProvider) Create(
if awaitErr != nil {
// Resource was created but failed to initialize. Return live version of object so it can be
// checkpointed.
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed)
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed, nil)
}

// Invalidate the client cache if this was a CRD. This will require subsequent CR creations to
Expand Down Expand Up @@ -714,16 +719,27 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
oldInputs, newInputs := parseCheckpointObject(oldState)

if oldInputs.GroupVersionKind().Empty() {
oldInputs.SetGroupVersionKind(newInputs.GroupVersionKind())
if newInputs.GroupVersionKind().Empty() {
gvk, err := k.gvkFromURN(urn)
if err != nil {
return nil, err
}
oldInputs.SetGroupVersionKind(gvk)
} else {
oldInputs.SetGroupVersionKind(newInputs.GroupVersionKind())
}
}

_, name := ParseFqName(req.GetId())
namespace, name := ParseFqName(req.GetId())
if name == "" {
return nil, fmt.Errorf("failed to parse resource name from request ID: %s", req.GetId())
}
if oldInputs.GetName() == "" {
oldInputs.SetName(name)
}
if oldInputs.GetNamespace() == "" {
oldInputs.SetNamespace(namespace)
}

config := await.ReadConfig{
ProviderConfig: await.ProviderConfig{
Expand Down Expand Up @@ -768,16 +784,27 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
// initialize.
}

// Attempt to parse the inputs for this object. If parsing was unsuccessful, retain the old inputs.
liveInputs := parseLiveInputs(liveObj, oldInputs)

// TODO(lblackstone): not sure why this is needed
id := FqObjName(liveObj)
if reqID := req.GetId(); len(reqID) > 0 {
id = reqID
}

// Return a new "checkpoint object".
inputsAndComputed, err := plugin.MarshalProperties(
checkpointObject(oldInputs, liveObj), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label), KeepUnknowns: true, SkipNulls: true,
state, err := plugin.MarshalProperties(
checkpointObject(liveInputs, liveObj), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.state", label), KeepUnknowns: true, SkipNulls: true,
})
if err != nil {
return nil, err
}

inputs, err := plugin.MarshalProperties(
resource.NewPropertyMapFromMap(liveInputs.Object), plugin.MarshalOptions{
Label: label + ".inputs", KeepUnknowns: true, SkipNulls: true,
})
if err != nil {
return nil, err
Expand All @@ -786,11 +813,11 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
if readErr != nil {
// Resource was created but failed to initialize. Return live version of object so it can be
// checkpointed.
glog.V(3).Infof("%v", partialError(id, readErr, inputsAndComputed))
return nil, partialError(id, readErr, inputsAndComputed)
glog.V(3).Infof("%v", partialError(id, readErr, state, inputs))
return nil, partialError(id, readErr, state, inputs)
}

return &pulumirpc.ReadResponse{Id: id, Properties: inputsAndComputed}, nil
return &pulumirpc.ReadResponse{Id: id, Properties: state, Inputs: inputs}, nil
}

// Update updates an existing resource with new values. Currently this client supports the
Expand Down Expand Up @@ -881,6 +908,11 @@ func (k *kubeProvider) Update(
}
newInputs := propMapToUnstructured(newResInputs)

annotatedInputs, err := withLastAppliedConfig(newInputs)
if err != nil {
return nil, err
}

config := await.UpdateConfig{
ProviderConfig: await.ProviderConfig{
Context: k.canceler.context,
Expand All @@ -890,7 +922,7 @@ func (k *kubeProvider) Update(
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
},
Previous: oldInputs,
Inputs: newInputs,
Inputs: annotatedInputs,
}
// Apply update.
initialized, awaitErr := await.Update(config)
Expand Down Expand Up @@ -926,7 +958,7 @@ func (k *kubeProvider) Update(
if awaitErr != nil {
// Resource was updated/created but failed to initialize. Return live version of object so it
// can be checkpointed.
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed)
return nil, partialError(FqObjName(initialized), awaitErr, inputsAndComputed, nil)
}

return &pulumirpc.UpdateResponse{Properties: inputsAndComputed}, nil
Expand Down Expand Up @@ -991,7 +1023,7 @@ func (k *kubeProvider) Delete(

// Resource delete was issued, but failed to complete. Return live version of object so it can be
// checkpointed.
return nil, partialError(FqObjName(lastKnownState), awaitErr, inputsAndComputed)
return nil, partialError(FqObjName(lastKnownState), awaitErr, inputsAndComputed, nil)
}

return &pbempty.Empty{}, nil
Expand Down Expand Up @@ -1025,27 +1057,24 @@ func (k *kubeProvider) label() string {
}

func (k *kubeProvider) gvkFromURN(urn resource.URN) (schema.GroupVersionKind, error) {
// Strip prefix.
s := string(urn.Type())
contract.Assertf(strings.HasPrefix(s, k.providerPrefix),
"Expected prefix: %q, Kubernetes GVK is: %q", k.providerPrefix, string(urn))
s = s[len(k.providerPrefix):]
contract.Assertf(string(urn.Type().Package()) == k.providerPackage, "Kubernetes GVK is: %q", string(urn))

// Emit GVK.
gvk := strings.Split(s, gvkDelimiter)
gv := strings.Split(gvk[0], "/")
if len(gvk) < 2 {
return schema.GroupVersionKind{},
fmt.Errorf("GVK must have both an apiVersion and a Kind: %q", s)
} else if len(gv) != 2 {
kind := string(urn.Type().Name())
gv := strings.Split(string(urn.Type().Module().Name()), "/")
if len(gv) != 2 {
return schema.GroupVersionKind{},
fmt.Errorf("apiVersion does not have both a group and a version: %q", s)
fmt.Errorf("apiVersion does not have both a group and a version: %q", urn.Type().Module().Name())
}
group, version := gv[0], gv[1]
if group == "core" {
group = ""
}

return schema.GroupVersionKind{
Group: gv[0],
Version: gv[1],
Kind: gvk[1],
Group: group,
Version: version,
Kind: kind,
}, nil
}

Expand All @@ -1064,6 +1093,26 @@ func propMapToUnstructured(pm resource.PropertyMap) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: pm.Mappable()}
}

func withLastAppliedConfig(config *unstructured.Unstructured) (*unstructured.Unstructured, error) {
// Serialize the inputs and add the last-applied-configuration annotation.
marshaled, err := config.MarshalJSON()
if err != nil {
return nil, err
}

// Deep copy the config before returning.
config = config.DeepCopy()

annotations := config.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}

annotations[lastAppliedConfigKey] = string(marshaled)
config.SetAnnotations(annotations)
return config, nil
}

func checkpointObject(inputs, live *unstructured.Unstructured) resource.PropertyMap {
object := resource.NewPropertyMapFromMap(live.Object)
object["__inputs"] = resource.NewObjectProperty(resource.NewPropertyMapFromMap(inputs.Object))
Expand Down Expand Up @@ -1101,15 +1150,16 @@ func parseCheckpointObject(obj resource.PropertyMap) (oldInputs, live *unstructu

// partialError creates an error for resources that did not complete an operation in progress.
// The last known state of the object is included in the error so that it can be checkpointed.
func partialError(id string, err error, inputsAndComputed *structpb.Struct) error {
func partialError(id string, err error, state *structpb.Struct, inputs *structpb.Struct) error {
reasons := []string{err.Error()}
if aggregate, isAggregate := err.(await.AggregatedError); isAggregate {
reasons = append(reasons, aggregate.SubErrors()...)
}
detail := pulumirpc.ErrorResourceInitFailed{
Id: id,
Properties: inputsAndComputed,
Properties: state,
Reasons: reasons,
Inputs: inputs,
}
return rpcerror.WithDetails(rpcerror.New(codes.Unknown, err.Error()), &detail)
}
Expand All @@ -1125,3 +1175,46 @@ func canonicalNamespace(ns string) string {

// deleteResponse causes the resource to be deleted from the state.
var deleteResponse = &pulumirpc.ReadResponse{Id: "", Properties: nil}

// parseLastAppliedConfig attempts to find and parse an annotation that records the last applied configuration for the
// given live object state.
func parseLastAppliedConfig(live *unstructured.Unstructured) *unstructured.Unstructured {
// If `kubectl.kubernetes.io/last-applied-configuration` metadata anotation is present, parse it into a real object
// and use it as the current set of live inputs. Otherwise, return nil.
annotations := live.GetAnnotations()
if annotations == nil {
return nil
}
lastAppliedConfig, ok := annotations[lastAppliedConfigKey]
if !ok {
return nil
}

liveInputs := &unstructured.Unstructured{}
if err := liveInputs.UnmarshalJSON([]byte(lastAppliedConfig)); err != nil {
return nil
}
return liveInputs
}

// parseLiveInputs attempts to parse the provider inputs that produced the given live object out of the object's state.
// This is used by Read.
func parseLiveInputs(live, oldInputs *unstructured.Unstructured) *unstructured.Unstructured {
// First try to find and parse a `kubectl.kubernetes.io/last-applied-configuration` metadata anotation. If that
// succeeds, we are done.
if inputs := parseLastAppliedConfig(live); inputs != nil {
return inputs
}

// If no such annotation was present--or if parsing failed--either retain the old inputs if they exist, or
// attempt to propagate the live object's GVK, any Pulumi-generated autoname and its annotation, and return
// the result.
if oldInputs != nil && len(oldInputs.Object) > 0 {
return oldInputs
}

inputs := &unstructured.Unstructured{Object: map[string]interface{}{}}
inputs.SetGroupVersionKind(live.GroupVersionKind())
metadata.AdoptOldAutonameIfUnnamed(inputs, live)
return inputs
}

0 comments on commit 158d54e

Please sign in to comment.