Skip to content

Commit

Permalink
WIP - almost working
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone committed Jun 21, 2022
1 parent cf8922f commit 43c0379
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 19 deletions.
7 changes: 4 additions & 3 deletions provider/pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ProviderConfig struct {
Host *pulumiprovider.HostClient
URN resource.URN
InitialAPIVersion string
FieldManager string
ClusterVersion *cluster.ServerVersion
ServerSideApply bool

Expand Down Expand Up @@ -183,7 +184,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {

if c.ServerSideApply {
options := metav1.PatchOptions{
FieldManager: "pulumi-resource-kubernetes",
FieldManager: c.FieldManager,
}
objYAML, err := yaml.Marshal(c.Inputs.Object)
if err != nil {
Expand Down Expand Up @@ -407,7 +408,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
if c.DryRun {
options.DryRun = []string{metav1.DryRunAll}
}
options.FieldManager = "pulumi-resource-kubernetes"
options.FieldManager = c.FieldManager
//force := true
//options.Force = &force

Expand Down Expand Up @@ -536,7 +537,7 @@ func Deletion(c DeleteConfig) error {
}

_, err = client.Patch(context.TODO(), c.Name, types.ApplyPatchType, yamlObj, metav1.PatchOptions{
FieldManager: "pulumi-resource-kubernetes",
FieldManager: c.FieldManager,
//FieldValidation: metav1.FieldValidationIgnore,
})
return err
Expand Down
3 changes: 3 additions & 0 deletions provider/pkg/metadata/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
AnnotationInitialAPIVersion = AnnotationPrefix + "initialApiVersion"
AnnotationReplaceUnready = AnnotationPrefix + "replaceUnready"

AnnotationPatchForce = AnnotationPrefix + "patchForce"
AnnotationPatchManager = AnnotationPrefix + "patchManager"

AnnotationHelmHook = "helm.sh/hook"
)

Expand Down
73 changes: 57 additions & 16 deletions provider/pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/golang/protobuf/ptypes/empty"
Expand Down Expand Up @@ -92,6 +93,7 @@ const (
invokeKustomize = "kubernetes:kustomize:directory"
lastAppliedConfigKey = "kubectl.kubernetes.io/last-applied-configuration"
initialAPIVersionKey = "__initialApiVersion"
fieldManagerKey = "__fieldManager"
)

type cancellationContext struct {
Expand Down Expand Up @@ -384,6 +386,7 @@ func (k *kubeProvider) DiffConfig(ctx context.Context, req *pulumirpc.DiffReques
func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequest) (*pulumirpc.ConfigureResponse, error) {
const trueStr = "true"

time.Sleep(5 * time.Second)
vars := req.GetVariables()

//
Expand Down Expand Up @@ -1535,8 +1538,10 @@ func (k *kubeProvider) Diff(ctx context.Context, req *pulumirpc.DiffRequest) (*p
newInputs.GetNamespace(), newInputs.GetName())
}

fieldManager := fieldManagerName(oldState)

// Try to compute a server-side patch.
ssPatch, ssPatchBase, ssPatchOk := k.tryServerSidePatch(oldInputs, newInputs, gvk)
ssPatch, ssPatchBase, ssPatchOk := k.tryServerSidePatch(oldInputs, newInputs, gvk, fieldManager)

// If the server-side patch succeeded, then merge that patch into the client-side patch and override any conflicts
// with the server-side values.
Expand Down Expand Up @@ -1699,6 +1704,7 @@ func (k *kubeProvider) Create(
}

initialAPIVersion := newInputs.GetAPIVersion()
fieldManager := fieldManagerName(newResInputs)

if k.yamlRenderMode {
if newResInputs.ContainsSecrets() {
Expand All @@ -1711,7 +1717,7 @@ func (k *kubeProvider) Create(
return nil, err
}

obj := checkpointObject(newInputs, annotatedInputs, newResInputs, initialAPIVersion)
obj := checkpointObject(newInputs, annotatedInputs, newResInputs, initialAPIVersion, fieldManager)
inputsAndComputed, err := plugin.MarshalProperties(
obj, plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label),
Expand Down Expand Up @@ -1741,6 +1747,7 @@ func (k *kubeProvider) Create(
Host: k.host,
URN: urn,
InitialAPIVersion: initialAPIVersion,
FieldManager: fieldManager,
ClusterVersion: &k.k8sVersion,
ClientSet: k.clientSet,
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
Expand Down Expand Up @@ -1791,7 +1798,7 @@ func (k *kubeProvider) Create(
initialized = partialErr.Object()
}

obj := checkpointObject(newInputs, initialized, newResInputs, initialAPIVersion)
obj := checkpointObject(newInputs, initialized, newResInputs, initialAPIVersion, fieldManager)
inputsAndComputed, err := plugin.MarshalProperties(
obj, plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label),
Expand Down Expand Up @@ -1921,11 +1928,12 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
if err != nil {
return nil, err
}
fieldManager := fieldManagerName(oldState)

if k.yamlRenderMode {
// Return a new "checkpoint object".
state, err := plugin.MarshalProperties(
checkpointObject(oldInputs, oldLive, oldState, initialAPIVersion), plugin.MarshalOptions{
checkpointObject(oldInputs, oldLive, oldState, initialAPIVersion, fieldManager), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.state", label),
KeepUnknowns: true,
SkipNulls: true,
Expand Down Expand Up @@ -1955,6 +1963,7 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
Host: k.host,
URN: urn,
InitialAPIVersion: initialAPIVersion,
FieldManager: fieldManager,
ClientSet: k.clientSet,
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
Resources: resources,
Expand Down Expand Up @@ -2030,7 +2039,7 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p

// Return a new "checkpoint object".
state, err := plugin.MarshalProperties(
checkpointObject(liveInputs, liveObj, oldInputsPM, initialAPIVersion), plugin.MarshalOptions{
checkpointObject(liveInputs, liveObj, oldInputsPM, initialAPIVersion, fieldManager), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.state", label),
KeepUnknowns: true,
SkipNulls: true,
Expand Down Expand Up @@ -2182,6 +2191,7 @@ func (k *kubeProvider) Update(
if err != nil {
return nil, err
}
fieldManager := fieldManagerName(oldState)

if k.yamlRenderMode {
if newResInputs.ContainsSecrets() {
Expand All @@ -2194,7 +2204,7 @@ func (k *kubeProvider) Update(
return nil, err
}

obj := checkpointObject(newInputs, annotatedInputs, newResInputs, initialAPIVersion)
obj := checkpointObject(newInputs, annotatedInputs, newResInputs, initialAPIVersion, fieldManager)
inputsAndComputed, err := plugin.MarshalProperties(
obj, plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label),
Expand Down Expand Up @@ -2222,6 +2232,7 @@ func (k *kubeProvider) Update(
Host: k.host,
URN: urn,
InitialAPIVersion: initialAPIVersion,
FieldManager: fieldManager,
ClientSet: k.clientSet,
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
Resources: resources,
Expand Down Expand Up @@ -2262,7 +2273,7 @@ func (k *kubeProvider) Update(
// initialize.
}
// Return a new "checkpoint object".
obj := checkpointObject(newInputs, initialized, newResInputs, initialAPIVersion)
obj := checkpointObject(newInputs, initialized, newResInputs, initialAPIVersion, fieldManager)
inputsAndComputed, err := plugin.MarshalProperties(
obj, plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label),
Expand Down Expand Up @@ -2341,6 +2352,7 @@ func (k *kubeProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest)
if err != nil {
return nil, err
}
fieldManager := fieldManagerName(oldState)
resources, err := k.getResources()
if err != nil {
return nil, pkgerrors.Wrapf(err, "Failed to fetch OpenAPI schema from the API server")
Expand All @@ -2352,6 +2364,7 @@ func (k *kubeProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest)
Host: k.host,
URN: urn,
InitialAPIVersion: initialAPIVersion,
FieldManager: fieldManager,
ClientSet: k.clientSet,
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
Resources: resources,
Expand Down Expand Up @@ -2379,7 +2392,7 @@ func (k *kubeProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest)
lastKnownState := partialErr.Object()

inputsAndComputed, err := plugin.MarshalProperties(
checkpointObject(current, lastKnownState, oldState, initialAPIVersion), plugin.MarshalOptions{
checkpointObject(current, lastKnownState, oldState, initialAPIVersion, fieldManager), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.inputsAndComputed", label),
KeepUnknowns: true,
SkipNulls: true,
Expand Down Expand Up @@ -2460,7 +2473,7 @@ func (k *kubeProvider) readLiveObject(obj *unstructured.Unstructured) (*unstruct
return rc.Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
}

func (k *kubeProvider) serverSidePatch(oldInputs, newInputs *unstructured.Unstructured,
func (k *kubeProvider) serverSidePatch(oldInputs, newInputs *unstructured.Unstructured, fieldManager string,
) ([]byte, map[string]interface{}, error) {

client, err := k.clientSet.ResourceClient(oldInputs.GroupVersionKind(), oldInputs.GetNamespace())
Expand Down Expand Up @@ -2499,10 +2512,18 @@ func (k *kubeProvider) serverSidePatch(oldInputs, newInputs *unstructured.Unstru
return nil, nil, err
}

newObject, err = client.Patch(context.TODO(), newInputs.GetName(), types.ApplyPatchType, objYAML, metav1.PatchOptions{
DryRun: []string{metav1.DryRunAll},
FieldManager: "pulumi-resource-kubernetes",
})
force := metadata.IsAnnotationTrue(newInputs, metadata.AnnotationPatchForce)
// TODO: this may not be working
if v := metadata.GetAnnotationValue(newInputs, metadata.AnnotationPatchManager); len(v) > 0 {
fieldManager = v
}

newObject, err = client.Patch(
context.TODO(), newInputs.GetName(), types.ApplyPatchType, objYAML, metav1.PatchOptions{
DryRun: []string{metav1.DryRunAll},
FieldManager: fieldManager,
Force: &force,
})
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -2595,7 +2616,7 @@ func (k *kubeProvider) isDryRunDisabledError(err error) bool {

// tryServerSidePatch attempts to compute a server-side patch. Returns true iff the operation succeeded.
func (k *kubeProvider) tryServerSidePatch(oldInputs, newInputs *unstructured.Unstructured, gvk schema.GroupVersionKind,
) ([]byte, map[string]interface{}, bool) {
fieldManager string) ([]byte, map[string]interface{}, bool) {
// If the resource's GVK changed, so compute patch using inputs.
if oldInputs.GroupVersionKind().String() != gvk.String() {
return nil, nil, false
Expand All @@ -2611,7 +2632,7 @@ func (k *kubeProvider) tryServerSidePatch(oldInputs, newInputs *unstructured.Uns
return nil, nil, false
}

ssPatch, ssPatchBase, err := k.serverSidePatch(oldInputs, newInputs)
ssPatch, ssPatchBase, err := k.serverSidePatch(oldInputs, newInputs, fieldManager)
if k.isDryRunDisabledError(err) {
return nil, nil, false
}
Expand All @@ -2631,6 +2652,10 @@ func (k *kubeProvider) tryServerSidePatch(oldInputs, newInputs *unstructured.Uns
}

func (k *kubeProvider) withLastAppliedConfig(config *unstructured.Unstructured) (*unstructured.Unstructured, error) {
if k.serverSideApplyMode {
return config, nil
}

if k.supportsDryRun(config.GroupVersionKind()) {
// Skip last-applied-config annotation if the resource supports server-side apply.
return config, nil
Expand Down Expand Up @@ -2720,7 +2745,22 @@ func initialAPIVersion(state resource.PropertyMap, oldConfig *unstructured.Unstr
return oldConfig.GetAPIVersion(), nil
}

func checkpointObject(inputs, live *unstructured.Unstructured, fromInputs resource.PropertyMap, initialAPIVersion string) resource.PropertyMap {
// fieldManagerName retrieves the fieldManagerName property from the checkpoint file, or assigns one if it does not
// exist.
func fieldManagerName(state resource.PropertyMap) string {
if v, ok := state[fieldManagerKey]; ok {
return v.StringValue()
}

prefix := "pulumi-kubernetes-"
fieldManager, err := resource.NewUniqueHex(prefix, 0, 0)
contract.AssertNoError(err)

return fieldManager
}

func checkpointObject(inputs, live *unstructured.Unstructured, fromInputs resource.PropertyMap,
initialAPIVersion, fieldManager string) resource.PropertyMap {
object := resource.NewPropertyMapFromMap(live.Object)
inputsPM := resource.NewPropertyMapFromMap(inputs.Object)

Expand Down Expand Up @@ -2762,6 +2802,7 @@ func checkpointObject(inputs, live *unstructured.Unstructured, fromInputs resour

object["__inputs"] = resource.NewObjectProperty(inputsPM)
object[initialAPIVersionKey] = resource.NewStringProperty(initialAPIVersion)
object[fieldManagerKey] = resource.NewStringProperty(fieldManager)

return object
}
Expand Down

0 comments on commit 43c0379

Please sign in to comment.