From e70286b5cd30cd6b353bb6fbc0c4aee9fab089c3 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Fri, 9 Sep 2022 11:56:02 -0700 Subject: [PATCH 01/11] faste --- pkg/backend/snapshot.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/backend/snapshot.go b/pkg/backend/snapshot.go index 6bd4f2c2c9a2..fcb4a31bc9c3 100644 --- a/pkg/backend/snapshot.go +++ b/pkg/backend/snapshot.go @@ -621,8 +621,11 @@ func (sm *SnapshotManager) snap() *deploy.Snapshot { return deploy.NewSnapshot(manifest, sm.persister.SecretsManager(), resources, operations) } +var SaveSnapshotCount = 0 + // saveSnapshot persists the current snapshot and optionally verifies it afterwards. func (sm *SnapshotManager) saveSnapshot() error { + SaveSnapshotCount++ snap, err := sm.snap().NormalizeURNReferences() if err != nil { return fmt.Errorf("failed to normalize URN references: %w", err) @@ -660,7 +663,6 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) go func() { // True if we have elided writes since the last actual write. - hasElidedWrites := false // Service each mutation request in turn. serviceLoop: @@ -669,23 +671,15 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) case request := <-mutationRequests: var err error if request.mutator() { - err = manager.saveSnapshot() - hasElidedWrites = false - } else { - hasElidedWrites = true + //err = manager.saveSnapshot() } request.result <- err case <-cancel: break serviceLoop } } - - // If we still have elided writes once the channel has closed, flush the snapshot. - var err error - if hasElidedWrites { - logging.V(9).Infof("SnapshotManager: flushing elided writes...") - err = manager.saveSnapshot() - } + fmt.Println("saving snapshot") + err := manager.saveSnapshot() done <- err }() From 65e57c09337f62a2f4ffcabb47a3f48aca461e69 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Mon, 12 Sep 2022 14:53:50 -0700 Subject: [PATCH 02/11] wip --- pkg/backend/snapshot.go | 47 +- pkg/backend/unsafe_snapshot_test.go | 710 ++++++++++++++++++++++++++++ 2 files changed, 751 insertions(+), 6 deletions(-) create mode 100644 pkg/backend/unsafe_snapshot_test.go diff --git a/pkg/backend/snapshot.go b/pkg/backend/snapshot.go index fcb4a31bc9c3..eece781823b4 100644 --- a/pkg/backend/snapshot.go +++ b/pkg/backend/snapshot.go @@ -17,6 +17,7 @@ package backend import ( "errors" "fmt" + "os" "reflect" "sort" "time" @@ -621,11 +622,8 @@ func (sm *SnapshotManager) snap() *deploy.Snapshot { return deploy.NewSnapshot(manifest, sm.persister.SecretsManager(), resources, operations) } -var SaveSnapshotCount = 0 - // saveSnapshot persists the current snapshot and optionally verifies it afterwards. func (sm *SnapshotManager) saveSnapshot() error { - SaveSnapshotCount++ snap, err := sm.snap().NormalizeURNReferences() if err != nil { return fmt.Errorf("failed to normalize URN references: %w", err) @@ -641,6 +639,26 @@ func (sm *SnapshotManager) saveSnapshot() error { return nil } +func (sm *SnapshotManager) unsafeServiceLoop(mutationRequests chan mutationRequest, done chan error) { + hasIntegrity := true + for { + select { + case request := <-mutationRequests: + request.mutator() + request.result <- nil + case <-sm.cancel: + if !hasIntegrity { + fmt.Println("warning snapshot integrity compromised, run `pulumi refresh`") + return + } + done <- sm.saveSnapshot() + return + } + } +} + +const experimentalSnapshotManagerFlag = "PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER" + // NewSnapshotManager creates a new SnapshotManager for the given stack name, using the given persister // and base snapshot. // @@ -662,7 +680,16 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) } go func() { + unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != "" + if unsafeEnabled { + // this codepath skips writing back snapshots + // on all mutations. It uses internal state + manager.unsafeServiceLoop(mutationRequests, done) + return + } + // True if we have elided writes since the last actual write. + hasElidedWrites := false // Service each mutation request in turn. serviceLoop: @@ -671,15 +698,23 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) case request := <-mutationRequests: var err error if request.mutator() { - //err = manager.saveSnapshot() + err = manager.saveSnapshot() + hasElidedWrites = false + } else { + hasElidedWrites = true } request.result <- err case <-cancel: break serviceLoop } } - fmt.Println("saving snapshot") - err := manager.saveSnapshot() + + // If we still have elided writes once the channel has closed, flush the snapshot. + var err error + if hasElidedWrites { + logging.V(9).Infof("SnapshotManager: flushing elided writes...") + err = manager.saveSnapshot() + } done <- err }() diff --git a/pkg/backend/unsafe_snapshot_test.go b/pkg/backend/unsafe_snapshot_test.go new file mode 100644 index 000000000000..557d058949a3 --- /dev/null +++ b/pkg/backend/unsafe_snapshot_test.go @@ -0,0 +1,710 @@ +// Copyright 2016-2018, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// this file is copied from snapshot_test.go + +package backend + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/pulumi/pulumi/pkg/v3/resource/deploy" + "github.com/pulumi/pulumi/pkg/v3/resource/stack" + "github.com/pulumi/pulumi/sdk/v3/go/common/resource" + "github.com/pulumi/pulumi/sdk/v3/go/common/resource/config" + "github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin" +) + +func TestIdenticalSamesUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + sameState := NewResource("a-unique-urn") + snap := NewSnapshot([]*resource.State{ + sameState, + }) + + manager, sp := MockSetup(t, snap) + + // The engine generates a SameStep on sameState. + engineGeneratedSame := NewResource(string(sameState.URN)) + same := deploy.NewSameStep(nil, nil, sameState, engineGeneratedSame) + + mutation, err := manager.BeginMutation(same) + assert.NoError(t, err) + // No mutation was made + assert.Empty(t, sp.SavedSnapshots) + + err = mutation.End(same, true) + assert.NoError(t, err) + + // Identical sames do not cause a snapshot mutation as part of `End`. + assert.Empty(t, sp.SavedSnapshots) + + // Close must write the snapshot. + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestSamesWithEmptyDependenciesUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + res := NewResourceWithDeps("a-unique-urn-resource-a", nil) + snap := NewSnapshot([]*resource.State{ + res, + }) + manager, sp := MockSetup(t, snap) + resUpdated := NewResourceWithDeps(string(res.URN), []resource.URN{}) + same := deploy.NewSameStep(nil, nil, res, resUpdated) + mutation, err := manager.BeginMutation(same) + assert.NoError(t, err) + err = mutation.End(same, true) + assert.NoError(t, err) + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestSamesWithEmptyArraysInInputsUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + // Model reading from state file + state := map[string]interface{}{"defaults": []interface{}{}} + inputs, err := stack.DeserializeProperties(state, config.NopDecrypter, config.NopEncrypter) + assert.NoError(t, err) + + res := NewResourceWithInputs("a-unique-urn-resource-a", inputs) + snap := NewSnapshot([]*resource.State{ + res, + }) + manager, sp := MockSetup(t, snap) + + // Model passing into and back out of RPC layer (e.g. via `Check`) + marshalledInputs, err := plugin.MarshalProperties(inputs, plugin.MarshalOptions{}) + assert.NoError(t, err) + inputsUpdated, err := plugin.UnmarshalProperties(marshalledInputs, plugin.MarshalOptions{}) + assert.NoError(t, err) + + resUpdated := NewResourceWithInputs(string(res.URN), inputsUpdated) + same := deploy.NewSameStep(nil, nil, res, resUpdated) + mutation, err := manager.BeginMutation(same) + assert.NoError(t, err) + err = mutation.End(same, true) + assert.NoError(t, err) + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +// This test exercises same steps with meaningful changes to properties _other_ than `Dependencies` in order to ensure +// that the snapshot is written. +func TestSamesWithOtherMeaningfulChangesUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgA::provider") + provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgA", "id" + + resourceP := NewResource("a-unique-urn-resource-p") + resourceA := NewResource("a-unique-urn-resource-a") + + var changes []*resource.State + + // Change the "custom" bit. + changes = append(changes, NewResource(string(resourceA.URN))) + changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgA::provider::id" + + // Change the parent. + changes = append(changes, NewResource(string(resourceA.URN))) + changes[1].Parent = resourceP.URN + + // Change the "protect" bit. + changes = append(changes, NewResource(string(resourceA.URN))) + changes[2].Protect = !resourceA.Protect + + // Change the resource outputs. + changes = append(changes, NewResource(string(resourceA.URN))) + changes[3].Outputs = resource.PropertyMap{"foo": resource.NewStringProperty("bar")} + + snap := NewSnapshot([]*resource.State{ + provider, + resourceP, + resourceA, + }) + + for _, c := range changes { + manager, sp := MockSetup(t, snap) + + // Generate a same for the provider. + provUpdated := NewResource(string(provider.URN)) + provUpdated.Custom, provUpdated.Type = true, provider.Type + provSame := deploy.NewSameStep(nil, nil, provider, provUpdated) + mutation, err := manager.BeginMutation(provSame) + assert.NoError(t, err) + _, _, err = provSame.Apply(false) + assert.NoError(t, err) + err = mutation.End(provSame, true) + assert.NoError(t, err) + assert.Empty(t, sp.SavedSnapshots) + + // The engine generates a Same for p. This is not a meaningful change, so the snapshot is not written. + pUpdated := NewResource(string(resourceP.URN)) + pSame := deploy.NewSameStep(nil, nil, resourceP, pUpdated) + mutation, err = manager.BeginMutation(pSame) + assert.NoError(t, err) + err = mutation.End(pSame, true) + assert.NoError(t, err) + assert.Empty(t, sp.SavedSnapshots) + + // The engine generates a Same for a. Because this is a meaningful change, the snapshot is written: + aSame := deploy.NewSameStep(nil, nil, resourceA, c) + mutation, err = manager.BeginMutation(aSame) + assert.NoError(t, err) + err = mutation.End(aSame, true) + assert.NoError(t, err) + + err = manager.Close() + assert.NoError(t, err) + inSnapshot := sp.SavedSnapshots[0].Resources[2] + assert.Equal(t, c, inSnapshot) + + assert.Len(t, sp.SavedSnapshots, 1) + } + + // Set up a second provider and change the resource's provider reference. + provider2 := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgA::provider2") + provider2.Custom, provider2.Type, provider2.ID = true, "pulumi:providers:pkgA", "id2" + + resourceA.Custom, resourceA.ID, resourceA.Provider = + true, "id", "urn:pulumi:foo::bar::pulumi:providers:pkgA::provider::id" + + snap = NewSnapshot([]*resource.State{ + provider, + provider2, + resourceA, + }) + + changes = []*resource.State{NewResource(string(resourceA.URN))} + changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgA::provider2::id2" + + for _, c := range changes { + manager, sp := MockSetup(t, snap) + + // Generate sames for the providers. + provUpdated := NewResource(string(provider.URN)) + provUpdated.Custom, provUpdated.Type = true, provider.Type + provSame := deploy.NewSameStep(nil, nil, provider, provUpdated) + mutation, err := manager.BeginMutation(provSame) + assert.NoError(t, err) + _, _, err = provSame.Apply(false) + assert.NoError(t, err) + err = mutation.End(provSame, true) + assert.NoError(t, err) + assert.Empty(t, sp.SavedSnapshots) + + // The engine generates a Same for p. This is not a meaningful change, so the snapshot is not written. + prov2Updated := NewResource(string(provider2.URN)) + prov2Updated.Custom, prov2Updated.Type = true, provider.Type + prov2Same := deploy.NewSameStep(nil, nil, provider2, prov2Updated) + mutation, err = manager.BeginMutation(prov2Same) + assert.NoError(t, err) + _, _, err = prov2Same.Apply(false) + assert.NoError(t, err) + err = mutation.End(prov2Same, true) + assert.NoError(t, err) + assert.Empty(t, sp.SavedSnapshots) + + // The engine generates a Same for a. Because this is a meaningful change, the snapshot is written: + aSame := deploy.NewSameStep(nil, nil, resourceA, c) + mutation, err = manager.BeginMutation(aSame) + assert.NoError(t, err) + _, _, err = aSame.Apply(false) + assert.NoError(t, err) + err = mutation.End(aSame, true) + assert.NoError(t, err) + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) + } +} + +// This test exercises the merge operation with a particularly vexing deployment +// state that was useful in shaking out bugs. +func TestVexingDeploymentUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + // This is the dependency graph we are going for in the base snapshot: + // + // +-+ + // +--> |A| + // | +-+ + // | ^ + // | +-+ + // | |B| + // | +-+ + // | ^ + // | +-+ + // +--+ |C| <---+ + // +-+ | + // ^ | + // +-+ | + // |D| | + // +-+ | + // | + // +-+ | + // |E| +---+ + // +-+ + a := NewResource("a") + b := NewResource("b", a.URN) + c := NewResource("c", a.URN, b.URN) + d := NewResource("d", c.URN) + e := NewResource("e", c.URN) + snap := NewSnapshot([]*resource.State{ + a, + b, + c, + d, + e, + }) + + manager, sp := MockSetup(t, snap) + + // This is the sequence of events that come out of the engine: + // B - Same, depends on nothing + // C - CreateReplacement, depends on B + // C - Replace + // D - Update, depends on new C + + // This produces the following dependency graph in the new snapshot: + // +-+ + // +---> |B| + // | +++ + // | ^ + // | +++ + // | |C| <----+ + // | +-+ | + // | | + // | +-+ | + // +---+ |C| +-------------> A (not in graph!) + // +-+ | + // | + // +-+ | + // |D| +---+ + // +-+ + // + // Conceptually, this is a plan that deletes A. However, we have not yet observed the + // deletion of A, presumably because the engine can't know for sure that it's been deleted + // until the eval source completes. Of note in this snapshot is that the replaced C is still in the graph, + // because it has not yet been deleted, and its dependency A is not in the graph because it + // has not been seen. + // + // Since axiomatically we assume that steps come in in a valid topological order of the dependency graph, + // we can logically assume that A is going to be deleted. (If A were not being deleted, it must have been + // the target of a Step that came before C, which depends on it.) + applyStep := func(step deploy.Step) { + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true) + if !assert.NoError(t, err) { + t.FailNow() + } + } + + // b now depends on nothing + bPrime := NewResource(string(b.URN)) + applyStep(deploy.NewSameStep(nil, MockRegisterResourceEvent{}, b, bPrime)) + + // c now only depends on b + cPrime := NewResource(string(c.URN), bPrime.URN) + + // mocking out the behavior of a provider indicating that this resource needs to be deleted + createReplacement := deploy.NewCreateReplacementStep(nil, MockRegisterResourceEvent{}, c, cPrime, nil, nil, nil, true) + replace := deploy.NewReplaceStep(nil, c, cPrime, nil, nil, nil, true) + c.Delete = true + + applyStep(createReplacement) + applyStep(replace) + + // cPrime now exists, c is now pending deletion + // dPrime now depends on cPrime, which got replaced + dPrime := NewResource(string(d.URN), cPrime.URN) + applyStep(deploy.NewUpdateStep(nil, MockRegisterResourceEvent{}, d, dPrime, nil, nil, nil, nil)) + + err := manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestDeletionUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + + manager, sp := MockSetup(t, snap) + step := deploy.NewDeleteStep(nil, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestFailedDeleteUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + + manager, sp := MockSetup(t, snap) + step := deploy.NewDeleteStep(nil, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, false /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingCreateSuccessUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot(nil) + manager, sp := MockSetup(t, snap) + step := deploy.NewCreateStep(nil, &MockRegisterResourceEvent{}, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingCreateFailureUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot(nil) + manager, sp := MockSetup(t, snap) + step := deploy.NewCreateStep(nil, &MockRegisterResourceEvent{}, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, false /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingUpdateSuccessUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + resourceA.Inputs["key"] = resource.NewStringProperty("old") + resourceANew := NewResource("a") + resourceANew.Inputs["key"] = resource.NewStringProperty("new") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + + manager, sp := MockSetup(t, snap) + step := deploy.NewUpdateStep(nil, &MockRegisterResourceEvent{}, resourceA, resourceANew, nil, nil, nil, nil) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingUpdateFailureUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + resourceA.Inputs["key"] = resource.NewStringProperty("old") + resourceANew := NewResource("a") + resourceANew.Inputs["key"] = resource.NewStringProperty("new") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + + manager, sp := MockSetup(t, snap) + step := deploy.NewUpdateStep(nil, &MockRegisterResourceEvent{}, resourceA, resourceANew, nil, nil, nil, nil) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, false /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingDeleteSuccessUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + manager, sp := MockSetup(t, snap) + step := deploy.NewDeleteStep(nil, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingDeleteFailureUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + manager, sp := MockSetup(t, snap) + step := deploy.NewDeleteStep(nil, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, false /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingReadSuccessNoPreviousResourceUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("b") + resourceA.ID = "some-b" + resourceA.External = true + resourceA.Custom = true + snap := NewSnapshot(nil) + manager, sp := MockSetup(t, snap) + step := deploy.NewReadStep(nil, nil, nil, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingReadSuccessPreviousResourceUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("c") + resourceA.ID = "some-c" + resourceA.External = true + resourceA.Custom = true + resourceA.Inputs["key"] = resource.NewStringProperty("old") + resourceANew := NewResource("c") + resourceANew.ID = "some-other-c" + resourceANew.External = true + resourceANew.Custom = true + resourceANew.Inputs["key"] = resource.NewStringProperty("new") + + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + manager, sp := MockSetup(t, snap) + step := deploy.NewReadStep(nil, nil, resourceA, resourceANew) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, true /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingReadFailureNoPreviousResourceUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("d") + resourceA.ID = "some-d" + resourceA.External = true + resourceA.Custom = true + snap := NewSnapshot(nil) + manager, sp := MockSetup(t, snap) + step := deploy.NewReadStep(nil, nil, nil, resourceA) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, false /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRecordingReadFailurePreviousResourceUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("e") + resourceA.ID = "some-e" + resourceA.External = true + resourceA.Custom = true + resourceA.Inputs["key"] = resource.NewStringProperty("old") + resourceANew := NewResource("e") + resourceANew.ID = "some-new-e" + resourceANew.External = true + resourceANew.Custom = true + resourceANew.Inputs["key"] = resource.NewStringProperty("new") + + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + manager, sp := MockSetup(t, snap) + step := deploy.NewReadStep(nil, nil, resourceA, resourceANew) + mutation, err := manager.BeginMutation(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = mutation.End(step, false /* successful */) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} + +func TestRegisterOutputsUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + t.Parallel() + + resourceA := NewResource("a") + snap := NewSnapshot([]*resource.State{ + resourceA, + }) + manager, sp := MockSetup(t, snap) + + // There should be zero snaps performed at the start. + assert.Len(t, sp.SavedSnapshots, 0) + + // The step here is not important. + step := deploy.NewSameStep(nil, nil, resourceA, resourceA) + err := manager.RegisterResourceOutputs(step) + if !assert.NoError(t, err) { + t.FailNow() + } + + err = manager.Close() + assert.NoError(t, err) + assert.Len(t, sp.SavedSnapshots, 1) +} From 48bc13401e3a616a2f6162357a91a21f5c0be408 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Tue, 13 Sep 2022 07:38:49 -0700 Subject: [PATCH 03/11] lint --- pkg/backend/unsafe_snapshot_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/backend/unsafe_snapshot_test.go b/pkg/backend/unsafe_snapshot_test.go index 557d058949a3..ef82226855d4 100644 --- a/pkg/backend/unsafe_snapshot_test.go +++ b/pkg/backend/unsafe_snapshot_test.go @@ -118,8 +118,8 @@ func TestSamesWithOtherMeaningfulChangesUnsafe(t *testing.T) { t.Setenv(experimentalSnapshotManagerFlag, "1") t.Parallel() - provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgA::provider") - provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgA", "id" + provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgB::provider") + provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgB", "id" resourceP := NewResource("a-unique-urn-resource-p") resourceA := NewResource("a-unique-urn-resource-a") @@ -128,7 +128,7 @@ func TestSamesWithOtherMeaningfulChangesUnsafe(t *testing.T) { // Change the "custom" bit. changes = append(changes, NewResource(string(resourceA.URN))) - changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgA::provider::id" + changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgB::provider::id" // Change the parent. changes = append(changes, NewResource(string(resourceA.URN))) @@ -188,11 +188,11 @@ func TestSamesWithOtherMeaningfulChangesUnsafe(t *testing.T) { } // Set up a second provider and change the resource's provider reference. - provider2 := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgA::provider2") - provider2.Custom, provider2.Type, provider2.ID = true, "pulumi:providers:pkgA", "id2" + provider2 := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgB::provider2") + provider2.Custom, provider2.Type, provider2.ID = true, "pulumi:providers:pkgB", "id2" resourceA.Custom, resourceA.ID, resourceA.Provider = - true, "id", "urn:pulumi:foo::bar::pulumi:providers:pkgA::provider::id" + true, "id", "urn:pulumi:foo::bar::pulumi:providers:pkgB::provider::id" snap = NewSnapshot([]*resource.State{ provider, @@ -201,7 +201,7 @@ func TestSamesWithOtherMeaningfulChangesUnsafe(t *testing.T) { }) changes = []*resource.State{NewResource(string(resourceA.URN))} - changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgA::provider2::id2" + changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgB::provider2::id2" for _, c := range changes { manager, sp := MockSetup(t, snap) From 5cd306544f5dcee6bc9c8aac2370892391c3f6f3 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Tue, 13 Sep 2022 12:40:36 -0700 Subject: [PATCH 04/11] removed unnecessary tests --- pkg/backend/snapshot_test.go | 53 +++ pkg/backend/unsafe_snapshot_test.go | 710 ---------------------------- 2 files changed, 53 insertions(+), 710 deletions(-) delete mode 100644 pkg/backend/unsafe_snapshot_test.go diff --git a/pkg/backend/snapshot_test.go b/pkg/backend/snapshot_test.go index 5d36b78e6ac6..43a14181dc6c 100644 --- a/pkg/backend/snapshot_test.go +++ b/pkg/backend/snapshot_test.go @@ -250,6 +250,59 @@ func TestSamesWithDependencyChanges(t *testing.T) { assert.Equal(t, resourceB.URN, secondSnap.Resources[1].Dependencies[0]) } +// This test checks that we only write the Checkpoint once whether or not there +// are important changes +func TestWriteCheckpointOnceUnsafe(t *testing.T) { + t.Setenv(experimentalSnapshotManagerFlag, "1") + + provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgA::provider") + provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgA", "id" + + resourceP := NewResource("a-unique-urn-resource-p") + resourceA := NewResource("a-unique-urn-resource-a") + + snap := NewSnapshot([]*resource.State{ + provider, + resourceP, + resourceA, + }) + + manager, sp := MockSetup(t, snap) + + // Generate a same for the provider. + provUpdated := NewResource(string(provider.URN)) + provUpdated.Custom, provUpdated.Type = true, provider.Type + provSame := deploy.NewSameStep(nil, nil, provider, provUpdated) + mutation, err := manager.BeginMutation(provSame) + assert.NoError(t, err) + _, _, err = provSame.Apply(false) + assert.NoError(t, err) + err = mutation.End(provSame, true) + assert.NoError(t, err) + + pUpdated := NewResource(string(resourceP.URN)) + pUpdated.Protect = !resourceP.Protect + pSame := deploy.NewSameStep(nil, nil, resourceP, pUpdated) + mutation, err = manager.BeginMutation(pSame) + assert.NoError(t, err) + err = mutation.End(pSame, true) + assert.NoError(t, err) + + // The engine generates a Same for b. Because this is a meaningful change, the snapshot is written: + aUpdated := NewResource(string(resourceA.URN)) + aUpdated.Protect = !resourceA.Protect + aSame := deploy.NewSameStep(nil, nil, resourceA, aUpdated) + mutation, err = manager.BeginMutation(aSame) + assert.NoError(t, err) + err = mutation.End(aSame, true) + assert.NoError(t, err) + + err = manager.Close() + assert.NoError(t, err) + + assert.Len(t, sp.SavedSnapshots, 1) +} + // This test exercises same steps with meaningful changes to properties _other_ than `Dependencies` in order to ensure // that the snapshot is written. func TestSamesWithOtherMeaningfulChanges(t *testing.T) { diff --git a/pkg/backend/unsafe_snapshot_test.go b/pkg/backend/unsafe_snapshot_test.go deleted file mode 100644 index ef82226855d4..000000000000 --- a/pkg/backend/unsafe_snapshot_test.go +++ /dev/null @@ -1,710 +0,0 @@ -// Copyright 2016-2018, Pulumi Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// this file is copied from snapshot_test.go - -package backend - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/pulumi/pulumi/pkg/v3/resource/deploy" - "github.com/pulumi/pulumi/pkg/v3/resource/stack" - "github.com/pulumi/pulumi/sdk/v3/go/common/resource" - "github.com/pulumi/pulumi/sdk/v3/go/common/resource/config" - "github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin" -) - -func TestIdenticalSamesUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - sameState := NewResource("a-unique-urn") - snap := NewSnapshot([]*resource.State{ - sameState, - }) - - manager, sp := MockSetup(t, snap) - - // The engine generates a SameStep on sameState. - engineGeneratedSame := NewResource(string(sameState.URN)) - same := deploy.NewSameStep(nil, nil, sameState, engineGeneratedSame) - - mutation, err := manager.BeginMutation(same) - assert.NoError(t, err) - // No mutation was made - assert.Empty(t, sp.SavedSnapshots) - - err = mutation.End(same, true) - assert.NoError(t, err) - - // Identical sames do not cause a snapshot mutation as part of `End`. - assert.Empty(t, sp.SavedSnapshots) - - // Close must write the snapshot. - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestSamesWithEmptyDependenciesUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - res := NewResourceWithDeps("a-unique-urn-resource-a", nil) - snap := NewSnapshot([]*resource.State{ - res, - }) - manager, sp := MockSetup(t, snap) - resUpdated := NewResourceWithDeps(string(res.URN), []resource.URN{}) - same := deploy.NewSameStep(nil, nil, res, resUpdated) - mutation, err := manager.BeginMutation(same) - assert.NoError(t, err) - err = mutation.End(same, true) - assert.NoError(t, err) - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestSamesWithEmptyArraysInInputsUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - // Model reading from state file - state := map[string]interface{}{"defaults": []interface{}{}} - inputs, err := stack.DeserializeProperties(state, config.NopDecrypter, config.NopEncrypter) - assert.NoError(t, err) - - res := NewResourceWithInputs("a-unique-urn-resource-a", inputs) - snap := NewSnapshot([]*resource.State{ - res, - }) - manager, sp := MockSetup(t, snap) - - // Model passing into and back out of RPC layer (e.g. via `Check`) - marshalledInputs, err := plugin.MarshalProperties(inputs, plugin.MarshalOptions{}) - assert.NoError(t, err) - inputsUpdated, err := plugin.UnmarshalProperties(marshalledInputs, plugin.MarshalOptions{}) - assert.NoError(t, err) - - resUpdated := NewResourceWithInputs(string(res.URN), inputsUpdated) - same := deploy.NewSameStep(nil, nil, res, resUpdated) - mutation, err := manager.BeginMutation(same) - assert.NoError(t, err) - err = mutation.End(same, true) - assert.NoError(t, err) - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -// This test exercises same steps with meaningful changes to properties _other_ than `Dependencies` in order to ensure -// that the snapshot is written. -func TestSamesWithOtherMeaningfulChangesUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgB::provider") - provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgB", "id" - - resourceP := NewResource("a-unique-urn-resource-p") - resourceA := NewResource("a-unique-urn-resource-a") - - var changes []*resource.State - - // Change the "custom" bit. - changes = append(changes, NewResource(string(resourceA.URN))) - changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgB::provider::id" - - // Change the parent. - changes = append(changes, NewResource(string(resourceA.URN))) - changes[1].Parent = resourceP.URN - - // Change the "protect" bit. - changes = append(changes, NewResource(string(resourceA.URN))) - changes[2].Protect = !resourceA.Protect - - // Change the resource outputs. - changes = append(changes, NewResource(string(resourceA.URN))) - changes[3].Outputs = resource.PropertyMap{"foo": resource.NewStringProperty("bar")} - - snap := NewSnapshot([]*resource.State{ - provider, - resourceP, - resourceA, - }) - - for _, c := range changes { - manager, sp := MockSetup(t, snap) - - // Generate a same for the provider. - provUpdated := NewResource(string(provider.URN)) - provUpdated.Custom, provUpdated.Type = true, provider.Type - provSame := deploy.NewSameStep(nil, nil, provider, provUpdated) - mutation, err := manager.BeginMutation(provSame) - assert.NoError(t, err) - _, _, err = provSame.Apply(false) - assert.NoError(t, err) - err = mutation.End(provSame, true) - assert.NoError(t, err) - assert.Empty(t, sp.SavedSnapshots) - - // The engine generates a Same for p. This is not a meaningful change, so the snapshot is not written. - pUpdated := NewResource(string(resourceP.URN)) - pSame := deploy.NewSameStep(nil, nil, resourceP, pUpdated) - mutation, err = manager.BeginMutation(pSame) - assert.NoError(t, err) - err = mutation.End(pSame, true) - assert.NoError(t, err) - assert.Empty(t, sp.SavedSnapshots) - - // The engine generates a Same for a. Because this is a meaningful change, the snapshot is written: - aSame := deploy.NewSameStep(nil, nil, resourceA, c) - mutation, err = manager.BeginMutation(aSame) - assert.NoError(t, err) - err = mutation.End(aSame, true) - assert.NoError(t, err) - - err = manager.Close() - assert.NoError(t, err) - inSnapshot := sp.SavedSnapshots[0].Resources[2] - assert.Equal(t, c, inSnapshot) - - assert.Len(t, sp.SavedSnapshots, 1) - } - - // Set up a second provider and change the resource's provider reference. - provider2 := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgB::provider2") - provider2.Custom, provider2.Type, provider2.ID = true, "pulumi:providers:pkgB", "id2" - - resourceA.Custom, resourceA.ID, resourceA.Provider = - true, "id", "urn:pulumi:foo::bar::pulumi:providers:pkgB::provider::id" - - snap = NewSnapshot([]*resource.State{ - provider, - provider2, - resourceA, - }) - - changes = []*resource.State{NewResource(string(resourceA.URN))} - changes[0].Custom, changes[0].Provider = true, "urn:pulumi:foo::bar::pulumi:providers:pkgB::provider2::id2" - - for _, c := range changes { - manager, sp := MockSetup(t, snap) - - // Generate sames for the providers. - provUpdated := NewResource(string(provider.URN)) - provUpdated.Custom, provUpdated.Type = true, provider.Type - provSame := deploy.NewSameStep(nil, nil, provider, provUpdated) - mutation, err := manager.BeginMutation(provSame) - assert.NoError(t, err) - _, _, err = provSame.Apply(false) - assert.NoError(t, err) - err = mutation.End(provSame, true) - assert.NoError(t, err) - assert.Empty(t, sp.SavedSnapshots) - - // The engine generates a Same for p. This is not a meaningful change, so the snapshot is not written. - prov2Updated := NewResource(string(provider2.URN)) - prov2Updated.Custom, prov2Updated.Type = true, provider.Type - prov2Same := deploy.NewSameStep(nil, nil, provider2, prov2Updated) - mutation, err = manager.BeginMutation(prov2Same) - assert.NoError(t, err) - _, _, err = prov2Same.Apply(false) - assert.NoError(t, err) - err = mutation.End(prov2Same, true) - assert.NoError(t, err) - assert.Empty(t, sp.SavedSnapshots) - - // The engine generates a Same for a. Because this is a meaningful change, the snapshot is written: - aSame := deploy.NewSameStep(nil, nil, resourceA, c) - mutation, err = manager.BeginMutation(aSame) - assert.NoError(t, err) - _, _, err = aSame.Apply(false) - assert.NoError(t, err) - err = mutation.End(aSame, true) - assert.NoError(t, err) - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) - } -} - -// This test exercises the merge operation with a particularly vexing deployment -// state that was useful in shaking out bugs. -func TestVexingDeploymentUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - // This is the dependency graph we are going for in the base snapshot: - // - // +-+ - // +--> |A| - // | +-+ - // | ^ - // | +-+ - // | |B| - // | +-+ - // | ^ - // | +-+ - // +--+ |C| <---+ - // +-+ | - // ^ | - // +-+ | - // |D| | - // +-+ | - // | - // +-+ | - // |E| +---+ - // +-+ - a := NewResource("a") - b := NewResource("b", a.URN) - c := NewResource("c", a.URN, b.URN) - d := NewResource("d", c.URN) - e := NewResource("e", c.URN) - snap := NewSnapshot([]*resource.State{ - a, - b, - c, - d, - e, - }) - - manager, sp := MockSetup(t, snap) - - // This is the sequence of events that come out of the engine: - // B - Same, depends on nothing - // C - CreateReplacement, depends on B - // C - Replace - // D - Update, depends on new C - - // This produces the following dependency graph in the new snapshot: - // +-+ - // +---> |B| - // | +++ - // | ^ - // | +++ - // | |C| <----+ - // | +-+ | - // | | - // | +-+ | - // +---+ |C| +-------------> A (not in graph!) - // +-+ | - // | - // +-+ | - // |D| +---+ - // +-+ - // - // Conceptually, this is a plan that deletes A. However, we have not yet observed the - // deletion of A, presumably because the engine can't know for sure that it's been deleted - // until the eval source completes. Of note in this snapshot is that the replaced C is still in the graph, - // because it has not yet been deleted, and its dependency A is not in the graph because it - // has not been seen. - // - // Since axiomatically we assume that steps come in in a valid topological order of the dependency graph, - // we can logically assume that A is going to be deleted. (If A were not being deleted, it must have been - // the target of a Step that came before C, which depends on it.) - applyStep := func(step deploy.Step) { - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true) - if !assert.NoError(t, err) { - t.FailNow() - } - } - - // b now depends on nothing - bPrime := NewResource(string(b.URN)) - applyStep(deploy.NewSameStep(nil, MockRegisterResourceEvent{}, b, bPrime)) - - // c now only depends on b - cPrime := NewResource(string(c.URN), bPrime.URN) - - // mocking out the behavior of a provider indicating that this resource needs to be deleted - createReplacement := deploy.NewCreateReplacementStep(nil, MockRegisterResourceEvent{}, c, cPrime, nil, nil, nil, true) - replace := deploy.NewReplaceStep(nil, c, cPrime, nil, nil, nil, true) - c.Delete = true - - applyStep(createReplacement) - applyStep(replace) - - // cPrime now exists, c is now pending deletion - // dPrime now depends on cPrime, which got replaced - dPrime := NewResource(string(d.URN), cPrime.URN) - applyStep(deploy.NewUpdateStep(nil, MockRegisterResourceEvent{}, d, dPrime, nil, nil, nil, nil)) - - err := manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestDeletionUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - - manager, sp := MockSetup(t, snap) - step := deploy.NewDeleteStep(nil, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestFailedDeleteUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - - manager, sp := MockSetup(t, snap) - step := deploy.NewDeleteStep(nil, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, false /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingCreateSuccessUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot(nil) - manager, sp := MockSetup(t, snap) - step := deploy.NewCreateStep(nil, &MockRegisterResourceEvent{}, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingCreateFailureUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot(nil) - manager, sp := MockSetup(t, snap) - step := deploy.NewCreateStep(nil, &MockRegisterResourceEvent{}, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, false /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingUpdateSuccessUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - resourceA.Inputs["key"] = resource.NewStringProperty("old") - resourceANew := NewResource("a") - resourceANew.Inputs["key"] = resource.NewStringProperty("new") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - - manager, sp := MockSetup(t, snap) - step := deploy.NewUpdateStep(nil, &MockRegisterResourceEvent{}, resourceA, resourceANew, nil, nil, nil, nil) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingUpdateFailureUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - resourceA.Inputs["key"] = resource.NewStringProperty("old") - resourceANew := NewResource("a") - resourceANew.Inputs["key"] = resource.NewStringProperty("new") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - - manager, sp := MockSetup(t, snap) - step := deploy.NewUpdateStep(nil, &MockRegisterResourceEvent{}, resourceA, resourceANew, nil, nil, nil, nil) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, false /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingDeleteSuccessUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - manager, sp := MockSetup(t, snap) - step := deploy.NewDeleteStep(nil, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingDeleteFailureUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - manager, sp := MockSetup(t, snap) - step := deploy.NewDeleteStep(nil, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, false /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingReadSuccessNoPreviousResourceUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("b") - resourceA.ID = "some-b" - resourceA.External = true - resourceA.Custom = true - snap := NewSnapshot(nil) - manager, sp := MockSetup(t, snap) - step := deploy.NewReadStep(nil, nil, nil, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingReadSuccessPreviousResourceUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("c") - resourceA.ID = "some-c" - resourceA.External = true - resourceA.Custom = true - resourceA.Inputs["key"] = resource.NewStringProperty("old") - resourceANew := NewResource("c") - resourceANew.ID = "some-other-c" - resourceANew.External = true - resourceANew.Custom = true - resourceANew.Inputs["key"] = resource.NewStringProperty("new") - - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - manager, sp := MockSetup(t, snap) - step := deploy.NewReadStep(nil, nil, resourceA, resourceANew) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, true /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingReadFailureNoPreviousResourceUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("d") - resourceA.ID = "some-d" - resourceA.External = true - resourceA.Custom = true - snap := NewSnapshot(nil) - manager, sp := MockSetup(t, snap) - step := deploy.NewReadStep(nil, nil, nil, resourceA) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, false /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRecordingReadFailurePreviousResourceUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("e") - resourceA.ID = "some-e" - resourceA.External = true - resourceA.Custom = true - resourceA.Inputs["key"] = resource.NewStringProperty("old") - resourceANew := NewResource("e") - resourceANew.ID = "some-new-e" - resourceANew.External = true - resourceANew.Custom = true - resourceANew.Inputs["key"] = resource.NewStringProperty("new") - - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - manager, sp := MockSetup(t, snap) - step := deploy.NewReadStep(nil, nil, resourceA, resourceANew) - mutation, err := manager.BeginMutation(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = mutation.End(step, false /* successful */) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} - -func TestRegisterOutputsUnsafe(t *testing.T) { - t.Setenv(experimentalSnapshotManagerFlag, "1") - t.Parallel() - - resourceA := NewResource("a") - snap := NewSnapshot([]*resource.State{ - resourceA, - }) - manager, sp := MockSetup(t, snap) - - // There should be zero snaps performed at the start. - assert.Len(t, sp.SavedSnapshots, 0) - - // The step here is not important. - step := deploy.NewSameStep(nil, nil, resourceA, resourceA) - err := manager.RegisterResourceOutputs(step) - if !assert.NoError(t, err) { - t.FailNow() - } - - err = manager.Close() - assert.NoError(t, err) - assert.Len(t, sp.SavedSnapshots, 1) -} From 66d41894bd504cb895601c02038864b8f911cc08 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Tue, 13 Sep 2022 15:16:19 -0700 Subject: [PATCH 05/11] added tests --- pkg/backend/snapshot.go | 5 -- tests/integration/integration_nodejs_test.go | 15 +++++ .../bad_resource/Pulumi.yaml | 2 + .../bad_resource/index.ts | 17 +++++ .../bad_resource/package.json | 13 ++++ .../bad_resource/resource.ts | 67 +++++++++++++++++++ 6 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 tests/integration/unsafe_snapshot_tests/bad_resource/Pulumi.yaml create mode 100644 tests/integration/unsafe_snapshot_tests/bad_resource/index.ts create mode 100644 tests/integration/unsafe_snapshot_tests/bad_resource/package.json create mode 100644 tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts diff --git a/pkg/backend/snapshot.go b/pkg/backend/snapshot.go index eece781823b4..3112c00f73b9 100644 --- a/pkg/backend/snapshot.go +++ b/pkg/backend/snapshot.go @@ -640,17 +640,12 @@ func (sm *SnapshotManager) saveSnapshot() error { } func (sm *SnapshotManager) unsafeServiceLoop(mutationRequests chan mutationRequest, done chan error) { - hasIntegrity := true for { select { case request := <-mutationRequests: request.mutator() request.result <- nil case <-sm.cancel: - if !hasIntegrity { - fmt.Println("warning snapshot integrity compromised, run `pulumi refresh`") - return - } done <- sm.saveSnapshot() return } diff --git a/tests/integration/integration_nodejs_test.go b/tests/integration/integration_nodejs_test.go index 426364edfaaf..56932d9b7127 100644 --- a/tests/integration/integration_nodejs_test.go +++ b/tests/integration/integration_nodejs_test.go @@ -1313,3 +1313,18 @@ func TestTSConfigOption(t *testing.T) { e.RunCommand("pulumi", "stack", "select", "tsconfg", "--create") e.RunCommand("pulumi", "preview") } + +func TestUnsafeSnapshotManagerRetainsResourcesOnError(t *testing.T) { + integration.ProgramTest(t, &integration.ProgramTestOptions{ + Dir: filepath.Join("unsafe_snapshot_tests", "bad_resource"), + Dependencies: []string{"@pulumi/pulumi"}, + Env: []string{"PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER=1"}, + Quick: true, + ExpectFailure: true, + ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) { + // Ensure the checkpoint contains a single resource, a provider, and the Stack + assert.NotNil(t, stackInfo.Deployment) + assert.Equal(t, 1000+3, len(stackInfo.Deployment.Resources)) + }, + }) +} diff --git a/tests/integration/unsafe_snapshot_tests/bad_resource/Pulumi.yaml b/tests/integration/unsafe_snapshot_tests/bad_resource/Pulumi.yaml new file mode 100644 index 000000000000..d643f75df4b1 --- /dev/null +++ b/tests/integration/unsafe_snapshot_tests/bad_resource/Pulumi.yaml @@ -0,0 +1,2 @@ +name: bad_resource +runtime: nodejs diff --git a/tests/integration/unsafe_snapshot_tests/bad_resource/index.ts b/tests/integration/unsafe_snapshot_tests/bad_resource/index.ts new file mode 100644 index 000000000000..8aee5c40d69f --- /dev/null +++ b/tests/integration/unsafe_snapshot_tests/bad_resource/index.ts @@ -0,0 +1,17 @@ +// Copyright 2016-2022, Pulumi Corporation. All rights reserved. +import * as process from "process"; +import { Resource } from "./resource"; +// Base depends on nothing. +const a = new Resource("base", { uniqueKey: 1, state: 99 }); + +for(let i = 0; i < 1000; i++) { + new Resource(`base-${i}`, { uniqueKey: 100+i, state: 99 }); +} + +// Dependent depends on Base with state 99. +new Resource("dependent", { uniqueKey: a.state.apply(() => { + if (process.env["PULUMI_NODEJS_DRY_RUN"] != "true") { + throw Error("`base` should be created and `dependent` should not"); + } + return 1; +}), state: a.state }); diff --git a/tests/integration/unsafe_snapshot_tests/bad_resource/package.json b/tests/integration/unsafe_snapshot_tests/bad_resource/package.json new file mode 100644 index 000000000000..40511f5a28dd --- /dev/null +++ b/tests/integration/unsafe_snapshot_tests/bad_resource/package.json @@ -0,0 +1,13 @@ +{ + "name": "stack_project_name", + "license": "Apache-2.0", + "devDependencies": { + "typescript": "^3.0.0" + }, + "peerDependencies": { + "@pulumi/pulumi": "latest" + }, + "dependencies": { + "@types/node": "^18.7.17" + } +} diff --git a/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts b/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts new file mode 100644 index 000000000000..9aeecfd697ac --- /dev/null +++ b/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts @@ -0,0 +1,67 @@ +// Copyright 2016-2018, Pulumi Corporation. All rights reserved. + +import * as pulumi from "@pulumi/pulumi"; +import * as dynamic from "@pulumi/pulumi/dynamic"; + +export class Provider implements dynamic.ResourceProvider { + public static readonly instance = new Provider(); + + private id: number = 0; + + public async check(olds: any, news: any): Promise { + // When the engine re-creates a resource after it was deleted, it should + // not pass the old (deleted) inputs to Check when re-creating. + // + // This Check implementation fails the test if this happens. + if (olds.state === 99 && news.state === 22) { + return { + inputs: news, + failures: [ + { + property: "state", + reason: "engine did invalid comparison of old and new check inputs for recreated resource", + }, + ], + }; + } + + return { + inputs: news, + }; + } + + public async diff(id: pulumi.ID, olds: any, news: any): Promise { + if (olds.state !== news.state) { + return { + changes: true, + replaces: ["state"], + deleteBeforeReplace: true, + }; + } + + return { + changes: false, + }; + } + + public async create(inputs: any): Promise { + return { + id: (this.id++).toString(), + outs: inputs, + }; + } +} + +export class Resource extends pulumi.dynamic.Resource { + public uniqueKey?: pulumi.Output; + public state: pulumi.Output; + + constructor(name: string, props: ResourceProps, opts?: pulumi.ResourceOptions) { + super(Provider.instance, name, props, opts); + } +} + +export interface ResourceProps { + readonly uniqueKey?: pulumi.Input; + readonly state: pulumi.Input; +} From d13aeb17e93769b5d1208987a4229272635b5dd0 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Wed, 14 Sep 2022 10:01:42 -0700 Subject: [PATCH 06/11] iterating on comments --- pkg/backend/snapshot.go | 9 +++++++-- pkg/backend/snapshot_test.go | 10 ++++++++-- tests/integration/integration_nodejs_test.go | 19 +++++++++++++------ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/pkg/backend/snapshot.go b/pkg/backend/snapshot.go index 3112c00f73b9..b6fc072258bc 100644 --- a/pkg/backend/snapshot.go +++ b/pkg/backend/snapshot.go @@ -639,6 +639,9 @@ func (sm *SnapshotManager) saveSnapshot() error { return nil } +// unsafeServiceLoop doesn't save Snapshots when mutations occur and instead saves Snapshots when +// SnapshotManager.Close() is invoked. It trades reliability for speed as every mutation does not +// cause a Snapshot to be serialized to the user's state backend. func (sm *SnapshotManager) unsafeServiceLoop(mutationRequests chan mutationRequest, done chan error) { for { select { @@ -677,8 +680,10 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) go func() { unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != "" if unsafeEnabled { - // this codepath skips writing back snapshots - // on all mutations. It uses internal state + // This codepath skips writing back snapshots on all mutations. + // The final snapshot is written back when `SnapshotManager.Close()` is called. + // This trades reliability for speed and can cause state to be lost if Close() + // is not called or successful. manager.unsafeServiceLoop(mutationRequests, done) return } diff --git a/pkg/backend/snapshot_test.go b/pkg/backend/snapshot_test.go index 43a14181dc6c..97c8b0a690d3 100644 --- a/pkg/backend/snapshot_test.go +++ b/pkg/backend/snapshot_test.go @@ -251,7 +251,8 @@ func TestSamesWithDependencyChanges(t *testing.T) { } // This test checks that we only write the Checkpoint once whether or not there -// are important changes +// are important changes when the `PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER` envvar +// is provided func TestWriteCheckpointOnceUnsafe(t *testing.T) { t.Setenv(experimentalSnapshotManagerFlag, "1") @@ -280,6 +281,7 @@ func TestWriteCheckpointOnceUnsafe(t *testing.T) { err = mutation.End(provSame, true) assert.NoError(t, err) + // The engine generates a meaningful change, the DEFAULT behavior is that a snapshot is written: pUpdated := NewResource(string(resourceP.URN)) pUpdated.Protect = !resourceP.Protect pSame := deploy.NewSameStep(nil, nil, resourceP, pUpdated) @@ -288,7 +290,7 @@ func TestWriteCheckpointOnceUnsafe(t *testing.T) { err = mutation.End(pSame, true) assert.NoError(t, err) - // The engine generates a Same for b. Because this is a meaningful change, the snapshot is written: + // The engine generates a meaningful change, the DEFAULT behavior is that a snapshot is written: aUpdated := NewResource(string(resourceA.URN)) aUpdated.Protect = !resourceA.Protect aSame := deploy.NewSameStep(nil, nil, resourceA, aUpdated) @@ -297,9 +299,13 @@ func TestWriteCheckpointOnceUnsafe(t *testing.T) { err = mutation.End(aSame, true) assert.NoError(t, err) + // a `Close()` call is required to write back the snapshots. + // It is called in all of the references to SnapshotManager. err = manager.Close() assert.NoError(t, err) + // DEFAULT behavior would cause more than 1 snapshot to be written, + // but the provided flag should only create 1 Snapshot assert.Len(t, sp.SavedSnapshots, 1) } diff --git a/tests/integration/integration_nodejs_test.go b/tests/integration/integration_nodejs_test.go index 56932d9b7127..0649ed444272 100644 --- a/tests/integration/integration_nodejs_test.go +++ b/tests/integration/integration_nodejs_test.go @@ -1314,17 +1314,24 @@ func TestTSConfigOption(t *testing.T) { e.RunCommand("pulumi", "preview") } +// This tests that despite an exception, that the snapshot is still written. func TestUnsafeSnapshotManagerRetainsResourcesOnError(t *testing.T) { integration.ProgramTest(t, &integration.ProgramTestOptions{ - Dir: filepath.Join("unsafe_snapshot_tests", "bad_resource"), - Dependencies: []string{"@pulumi/pulumi"}, - Env: []string{"PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER=1"}, - Quick: true, + Dir: filepath.Join("unsafe_snapshot_tests", "bad_resource"), + Dependencies: []string{"@pulumi/pulumi"}, + Env: []string{"PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER=1"}, + Quick: true, + // The program throws an exception and 1 resource fails to be created. ExpectFailure: true, ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) { - // Ensure the checkpoint contains a single resource, a provider, and the Stack + // Ensure the checkpoint contains the 1003 other resources that were created + // - stack + // - provider + // - `base` resource + // - 1000 resources(via a for loop) + // - NOT a resource that failed to be created dependent on the `base` resource output assert.NotNil(t, stackInfo.Deployment) - assert.Equal(t, 1000+3, len(stackInfo.Deployment.Resources)) + assert.Equal(t, 3+1000, len(stackInfo.Deployment.Resources)) }, }) } From d3e82b75a4998215a615bb8fe292bc03792be1f4 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Wed, 14 Sep 2022 11:28:58 -0700 Subject: [PATCH 07/11] fixed lint --- pkg/backend/snapshot_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/backend/snapshot_test.go b/pkg/backend/snapshot_test.go index 97c8b0a690d3..539d95e77d1d 100644 --- a/pkg/backend/snapshot_test.go +++ b/pkg/backend/snapshot_test.go @@ -253,11 +253,13 @@ func TestSamesWithDependencyChanges(t *testing.T) { // This test checks that we only write the Checkpoint once whether or not there // are important changes when the `PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER` envvar // is provided +// +//nolint:paralleltest // mutates environment variables func TestWriteCheckpointOnceUnsafe(t *testing.T) { t.Setenv(experimentalSnapshotManagerFlag, "1") - provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgA::provider") - provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgA", "id" + provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgUnsafe::provider") + provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgUnsafe", "id" resourceP := NewResource("a-unique-urn-resource-p") resourceA := NewResource("a-unique-urn-resource-a") From 22f2989c196daf984d062e75ea18f45ba5e39192 Mon Sep 17 00:00:00 2001 From: Aaron Friel Date: Wed, 14 Sep 2022 14:49:16 -0700 Subject: [PATCH 08/11] ci: Fix package parallelism assignment --- scripts/retry | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/retry b/scripts/retry index 4c3088d14645..37cfefc4a69b 100755 --- a/scripts/retry +++ b/scripts/retry @@ -30,7 +30,7 @@ run_tests() { attempts=$((attempts + 1)) export GO_TEST_PARALLELISM=$((GO_TEST_PARALLELISM <= 2 ? 1 : GO_TEST_PARALLELISM / 2)) - export GO_TEST_PKG_PARALLELISM=$((GO_TEST_PARALLELISM <= 2 ? 1 : GO_TEST_PKG_PARALLELISM / 2)) + export GO_TEST_PKG_PARALLELISM=$((GO_TEST_PKG_PARALLELISM <= 2 ? 1 : GO_TEST_PKG_PARALLELISM / 2)) export GO_TEST_SHUFFLE="off" done From 9f5ec4a992c7b1c393cc8518db2a36039db7c6a6 Mon Sep 17 00:00:00 2001 From: Justin Van Patten Date: Thu, 15 Sep 2022 05:42:43 -0700 Subject: [PATCH 09/11] Add missing `ProgramTestOptions` overrides in `With` These options were previously added without also adding the override handling in `With`. --- pkg/testing/integration/program.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/testing/integration/program.go b/pkg/testing/integration/program.go index db924b0207d0..54beae9e0d15 100644 --- a/pkg/testing/integration/program.go +++ b/pkg/testing/integration/program.go @@ -536,15 +536,33 @@ func (opts ProgramTestOptions) With(overrides ProgramTestOptions) ProgramTestOpt if overrides.PipenvBin != "" { opts.PipenvBin = overrides.PipenvBin } + if overrides.DotNetBin != "" { + opts.DotNetBin = overrides.DotNetBin + } if overrides.Env != nil { opts.Env = append(opts.Env, overrides.Env...) } + if overrides.UseAutomaticVirtualEnv { + opts.UseAutomaticVirtualEnv = overrides.UseAutomaticVirtualEnv + } if overrides.UsePipenv { opts.UsePipenv = overrides.UsePipenv } + if overrides.PreviewCompletedHook != nil { + opts.PreviewCompletedHook = overrides.PreviewCompletedHook + } + if overrides.JSONOutput { + opts.JSONOutput = overrides.JSONOutput + } + if overrides.ExportStateValidator != nil { + opts.ExportStateValidator = overrides.ExportStateValidator + } if overrides.PrepareProject != nil { opts.PrepareProject = overrides.PrepareProject } + if overrides.LocalDependencies != nil { + opts.LocalDependencies = append(opts.LocalDependencies, overrides.LocalDependencies...) + } return opts } From 56fe72c4ed600ed3345f68af458ea78fad0cdf62 Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Thu, 15 Sep 2022 08:00:36 -0700 Subject: [PATCH 10/11] Update tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts --- .../integration/unsafe_snapshot_tests/bad_resource/resource.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts b/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts index 9aeecfd697ac..1530f273760f 100644 --- a/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts +++ b/tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts @@ -1,4 +1,4 @@ -// Copyright 2016-2018, Pulumi Corporation. All rights reserved. +// Copyright 2016-2022, Pulumi Corporation. All rights reserved. import * as pulumi from "@pulumi/pulumi"; import * as dynamic from "@pulumi/pulumi/dynamic"; From 9065d7caf06ecb970e98a7fba518745038069d9d Mon Sep 17 00:00:00 2001 From: Kyle Dixler Date: Thu, 15 Sep 2022 09:46:45 -0700 Subject: [PATCH 11/11] refactored defaultServiceLoop into its own method --- pkg/backend/snapshot.go | 78 ++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/pkg/backend/snapshot.go b/pkg/backend/snapshot.go index b6fc072258bc..1d2d2b8f9994 100644 --- a/pkg/backend/snapshot.go +++ b/pkg/backend/snapshot.go @@ -639,6 +639,38 @@ func (sm *SnapshotManager) saveSnapshot() error { return nil } +// defaultServiceLoop saves a Snapshot whenever a mutation occurs +func (sm *SnapshotManager) defaultServiceLoop(mutationRequests chan mutationRequest, done chan error) { + // True if we have elided writes since the last actual write. + hasElidedWrites := false + + // Service each mutation request in turn. +serviceLoop: + for { + select { + case request := <-mutationRequests: + var err error + if request.mutator() { + err = sm.saveSnapshot() + hasElidedWrites = false + } else { + hasElidedWrites = true + } + request.result <- err + case <-sm.cancel: + break serviceLoop + } + } + + // If we still have elided writes once the channel has closed, flush the snapshot. + var err error + if hasElidedWrites { + logging.V(9).Infof("SnapshotManager: flushing elided writes...") + err = sm.saveSnapshot() + } + done <- err +} + // unsafeServiceLoop doesn't save Snapshots when mutations occur and instead saves Snapshots when // SnapshotManager.Close() is invoked. It trades reliability for speed as every mutation does not // cause a Snapshot to be serialized to the user's state backend. @@ -677,46 +709,12 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) done: done, } - go func() { - unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != "" - if unsafeEnabled { - // This codepath skips writing back snapshots on all mutations. - // The final snapshot is written back when `SnapshotManager.Close()` is called. - // This trades reliability for speed and can cause state to be lost if Close() - // is not called or successful. - manager.unsafeServiceLoop(mutationRequests, done) - return - } - - // True if we have elided writes since the last actual write. - hasElidedWrites := false - - // Service each mutation request in turn. - serviceLoop: - for { - select { - case request := <-mutationRequests: - var err error - if request.mutator() { - err = manager.saveSnapshot() - hasElidedWrites = false - } else { - hasElidedWrites = true - } - request.result <- err - case <-cancel: - break serviceLoop - } - } - - // If we still have elided writes once the channel has closed, flush the snapshot. - var err error - if hasElidedWrites { - logging.V(9).Infof("SnapshotManager: flushing elided writes...") - err = manager.saveSnapshot() - } - done <- err - }() + serviceLoop := manager.defaultServiceLoop + unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != "" + if unsafeEnabled { + serviceLoop = manager.unsafeServiceLoop + } + go serviceLoop(mutationRequests, done) return manager }