Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
10687: add envvar to omit snapshot writes on each state mutation r=dixler a=dixler

<!--- 
Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation.
-->

# Description

<!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. -->
Fixes #10668 

Provides an environment variable to override the internal behavior of the SnapshotManager to speed up large deployments by only writing the final state of the snapshot rather than the current behavior which saves every mutation to the snapshot to the state backend.

## Checklist

<!--- Please provide details if the checkbox below is to be left unchecked. -->
- [x] I have added tests that prove my fix is effective or that my feature works
<!--- 
User-facing changes require a CHANGELOG entry.
-->
- [x] I have updated the [CHANGELOG-PENDING](https://github.com/pulumi/pulumi/blob/master/CHANGELOG_PENDING.md) file with my change
<!--
If the change(s) in this PR is a modification of an existing call to the Pulumi Service,
then the service should honor older versions of the CLI where this change would not exist.
You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Service API version
  <!-- `@Pulumi` employees: If yes, you must submit corresponding changes in the service repo. -->


10729: ci: Fix package parallelism assignment r=AaronFriel a=AaronFriel



10740: Add missing `ProgramTestOptions` overrides in `With` r=justinvp a=justinvp

These options were previously added without also adding the override handling in `With`.

Co-authored-by: Kyle Dixler <kyle@pulumi.com>
Co-authored-by: Aaron Friel <mayreply@aaronfriel.com>
Co-authored-by: Justin Van Patten <jvp@justinvp.com>
  • Loading branch information
4 people committed Sep 15, 2022
4 parents bc704af + 9065d7c + 22f2989 + 9f5ec4a commit 0f3e536
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 31 deletions.
87 changes: 57 additions & 30 deletions pkg/backend/snapshot.go
Expand Up @@ -17,6 +17,7 @@ package backend
import (
"errors"
"fmt"
"os"
"reflect"
"sort"
"time"
Expand Down Expand Up @@ -638,6 +639,56 @@ func (sm *SnapshotManager) saveSnapshot() error {
return nil
}

// defaultServiceLoop saves a Snapshot whenever a mutation occurs
func (sm *SnapshotManager) defaultServiceLoop(mutationRequests chan mutationRequest, done chan error) {
// True if we have elided writes since the last actual write.
hasElidedWrites := false

// Service each mutation request in turn.
serviceLoop:
for {
select {
case request := <-mutationRequests:
var err error
if request.mutator() {
err = sm.saveSnapshot()
hasElidedWrites = false
} else {
hasElidedWrites = true
}
request.result <- err
case <-sm.cancel:
break serviceLoop
}
}

// If we still have elided writes once the channel has closed, flush the snapshot.
var err error
if hasElidedWrites {
logging.V(9).Infof("SnapshotManager: flushing elided writes...")
err = sm.saveSnapshot()
}
done <- err
}

// unsafeServiceLoop doesn't save Snapshots when mutations occur and instead saves Snapshots when
// SnapshotManager.Close() is invoked. It trades reliability for speed as every mutation does not
// cause a Snapshot to be serialized to the user's state backend.
func (sm *SnapshotManager) unsafeServiceLoop(mutationRequests chan mutationRequest, done chan error) {
for {
select {
case request := <-mutationRequests:
request.mutator()
request.result <- nil
case <-sm.cancel:
done <- sm.saveSnapshot()
return
}
}
}

const experimentalSnapshotManagerFlag = "PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER"

// NewSnapshotManager creates a new SnapshotManager for the given stack name, using the given persister
// and base snapshot.
//
Expand All @@ -658,36 +709,12 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot)
done: done,
}

go func() {
// True if we have elided writes since the last actual write.
hasElidedWrites := false

// Service each mutation request in turn.
serviceLoop:
for {
select {
case request := <-mutationRequests:
var err error
if request.mutator() {
err = manager.saveSnapshot()
hasElidedWrites = false
} else {
hasElidedWrites = true
}
request.result <- err
case <-cancel:
break serviceLoop
}
}

// If we still have elided writes once the channel has closed, flush the snapshot.
var err error
if hasElidedWrites {
logging.V(9).Infof("SnapshotManager: flushing elided writes...")
err = manager.saveSnapshot()
}
done <- err
}()
serviceLoop := manager.defaultServiceLoop
unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != ""
if unsafeEnabled {
serviceLoop = manager.unsafeServiceLoop
}
go serviceLoop(mutationRequests, done)

return manager
}
61 changes: 61 additions & 0 deletions pkg/backend/snapshot_test.go
Expand Up @@ -250,6 +250,67 @@ func TestSamesWithDependencyChanges(t *testing.T) {
assert.Equal(t, resourceB.URN, secondSnap.Resources[1].Dependencies[0])
}

// This test checks that we only write the Checkpoint once whether or not there
// are important changes when the `PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER` envvar
// is provided
//
//nolint:paralleltest // mutates environment variables
func TestWriteCheckpointOnceUnsafe(t *testing.T) {
t.Setenv(experimentalSnapshotManagerFlag, "1")

provider := NewResource("urn:pulumi:foo::bar::pulumi:providers:pkgUnsafe::provider")
provider.Custom, provider.Type, provider.ID = true, "pulumi:providers:pkgUnsafe", "id"

resourceP := NewResource("a-unique-urn-resource-p")
resourceA := NewResource("a-unique-urn-resource-a")

snap := NewSnapshot([]*resource.State{
provider,
resourceP,
resourceA,
})

manager, sp := MockSetup(t, snap)

// Generate a same for the provider.
provUpdated := NewResource(string(provider.URN))
provUpdated.Custom, provUpdated.Type = true, provider.Type
provSame := deploy.NewSameStep(nil, nil, provider, provUpdated)
mutation, err := manager.BeginMutation(provSame)
assert.NoError(t, err)
_, _, err = provSame.Apply(false)
assert.NoError(t, err)
err = mutation.End(provSame, true)
assert.NoError(t, err)

// The engine generates a meaningful change, the DEFAULT behavior is that a snapshot is written:
pUpdated := NewResource(string(resourceP.URN))
pUpdated.Protect = !resourceP.Protect
pSame := deploy.NewSameStep(nil, nil, resourceP, pUpdated)
mutation, err = manager.BeginMutation(pSame)
assert.NoError(t, err)
err = mutation.End(pSame, true)
assert.NoError(t, err)

// The engine generates a meaningful change, the DEFAULT behavior is that a snapshot is written:
aUpdated := NewResource(string(resourceA.URN))
aUpdated.Protect = !resourceA.Protect
aSame := deploy.NewSameStep(nil, nil, resourceA, aUpdated)
mutation, err = manager.BeginMutation(aSame)
assert.NoError(t, err)
err = mutation.End(aSame, true)
assert.NoError(t, err)

// a `Close()` call is required to write back the snapshots.
// It is called in all of the references to SnapshotManager.
err = manager.Close()
assert.NoError(t, err)

// DEFAULT behavior would cause more than 1 snapshot to be written,
// but the provided flag should only create 1 Snapshot
assert.Len(t, sp.SavedSnapshots, 1)
}

// This test exercises same steps with meaningful changes to properties _other_ than `Dependencies` in order to ensure
// that the snapshot is written.
func TestSamesWithOtherMeaningfulChanges(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/testing/integration/program.go
Expand Up @@ -536,15 +536,33 @@ func (opts ProgramTestOptions) With(overrides ProgramTestOptions) ProgramTestOpt
if overrides.PipenvBin != "" {
opts.PipenvBin = overrides.PipenvBin
}
if overrides.DotNetBin != "" {
opts.DotNetBin = overrides.DotNetBin
}
if overrides.Env != nil {
opts.Env = append(opts.Env, overrides.Env...)
}
if overrides.UseAutomaticVirtualEnv {
opts.UseAutomaticVirtualEnv = overrides.UseAutomaticVirtualEnv
}
if overrides.UsePipenv {
opts.UsePipenv = overrides.UsePipenv
}
if overrides.PreviewCompletedHook != nil {
opts.PreviewCompletedHook = overrides.PreviewCompletedHook
}
if overrides.JSONOutput {
opts.JSONOutput = overrides.JSONOutput
}
if overrides.ExportStateValidator != nil {
opts.ExportStateValidator = overrides.ExportStateValidator
}
if overrides.PrepareProject != nil {
opts.PrepareProject = overrides.PrepareProject
}
if overrides.LocalDependencies != nil {
opts.LocalDependencies = append(opts.LocalDependencies, overrides.LocalDependencies...)
}
return opts
}

Expand Down
2 changes: 1 addition & 1 deletion scripts/retry
Expand Up @@ -30,7 +30,7 @@ run_tests() {
attempts=$((attempts + 1))

export GO_TEST_PARALLELISM=$((GO_TEST_PARALLELISM <= 2 ? 1 : GO_TEST_PARALLELISM / 2))
export GO_TEST_PKG_PARALLELISM=$((GO_TEST_PARALLELISM <= 2 ? 1 : GO_TEST_PKG_PARALLELISM / 2))
export GO_TEST_PKG_PARALLELISM=$((GO_TEST_PKG_PARALLELISM <= 2 ? 1 : GO_TEST_PKG_PARALLELISM / 2))
export GO_TEST_SHUFFLE="off"
done

Expand Down
22 changes: 22 additions & 0 deletions tests/integration/integration_nodejs_test.go
Expand Up @@ -1336,3 +1336,25 @@ func TestTSConfigOption(t *testing.T) {
e.RunCommand("pulumi", "stack", "select", "tsconfg", "--create")
e.RunCommand("pulumi", "preview")
}

// This tests that despite an exception, that the snapshot is still written.
func TestUnsafeSnapshotManagerRetainsResourcesOnError(t *testing.T) {
integration.ProgramTest(t, &integration.ProgramTestOptions{
Dir: filepath.Join("unsafe_snapshot_tests", "bad_resource"),
Dependencies: []string{"@pulumi/pulumi"},
Env: []string{"PULUMI_EXPERIMENTAL_SNAPSHOT_MANAGER=1"},
Quick: true,
// The program throws an exception and 1 resource fails to be created.
ExpectFailure: true,
ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) {
// Ensure the checkpoint contains the 1003 other resources that were created
// - stack
// - provider
// - `base` resource
// - 1000 resources(via a for loop)
// - NOT a resource that failed to be created dependent on the `base` resource output
assert.NotNil(t, stackInfo.Deployment)
assert.Equal(t, 3+1000, len(stackInfo.Deployment.Resources))
},
})
}
@@ -0,0 +1,2 @@
name: bad_resource
runtime: nodejs
17 changes: 17 additions & 0 deletions tests/integration/unsafe_snapshot_tests/bad_resource/index.ts
@@ -0,0 +1,17 @@
// Copyright 2016-2022, Pulumi Corporation. All rights reserved.
import * as process from "process";
import { Resource } from "./resource";
// Base depends on nothing.
const a = new Resource("base", { uniqueKey: 1, state: 99 });

for(let i = 0; i < 1000; i++) {
new Resource(`base-${i}`, { uniqueKey: 100+i, state: 99 });
}

// Dependent depends on Base with state 99.
new Resource("dependent", { uniqueKey: a.state.apply(() => {
if (process.env["PULUMI_NODEJS_DRY_RUN"] != "true") {
throw Error("`base` should be created and `dependent` should not");
}
return 1;
}), state: a.state });
13 changes: 13 additions & 0 deletions tests/integration/unsafe_snapshot_tests/bad_resource/package.json
@@ -0,0 +1,13 @@
{
"name": "stack_project_name",
"license": "Apache-2.0",
"devDependencies": {
"typescript": "^3.0.0"
},
"peerDependencies": {
"@pulumi/pulumi": "latest"
},
"dependencies": {
"@types/node": "^18.7.17"
}
}
67 changes: 67 additions & 0 deletions tests/integration/unsafe_snapshot_tests/bad_resource/resource.ts
@@ -0,0 +1,67 @@
// Copyright 2016-2022, Pulumi Corporation. All rights reserved.

import * as pulumi from "@pulumi/pulumi";
import * as dynamic from "@pulumi/pulumi/dynamic";

export class Provider implements dynamic.ResourceProvider {
public static readonly instance = new Provider();

private id: number = 0;

public async check(olds: any, news: any): Promise<dynamic.CheckResult> {
// When the engine re-creates a resource after it was deleted, it should
// not pass the old (deleted) inputs to Check when re-creating.
//
// This Check implementation fails the test if this happens.
if (olds.state === 99 && news.state === 22) {
return {
inputs: news,
failures: [
{
property: "state",
reason: "engine did invalid comparison of old and new check inputs for recreated resource",
},
],
};
}

return {
inputs: news,
};
}

public async diff(id: pulumi.ID, olds: any, news: any): Promise<dynamic.DiffResult> {
if (olds.state !== news.state) {
return {
changes: true,
replaces: ["state"],
deleteBeforeReplace: true,
};
}

return {
changes: false,
};
}

public async create(inputs: any): Promise<dynamic.CreateResult> {
return {
id: (this.id++).toString(),
outs: inputs,
};
}
}

export class Resource extends pulumi.dynamic.Resource {
public uniqueKey?: pulumi.Output<number>;
public state: pulumi.Output<number>;

constructor(name: string, props: ResourceProps, opts?: pulumi.ResourceOptions) {
super(Provider.instance, name, props, opts);
}
}

export interface ResourceProps {
readonly uniqueKey?: pulumi.Input<number>;
readonly state: pulumi.Input<number>;
}

0 comments on commit 0f3e536

Please sign in to comment.