Skip to content

Commit

Permalink
Allow component providers to return outputs to the engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Frassle committed Feb 8, 2024
1 parent ff6404a commit 82609d3
Show file tree
Hide file tree
Showing 12 changed files with 822 additions and 530 deletions.
138 changes: 138 additions & 0 deletions pkg/engine/lifecycletest/pulumi_test.go
Expand Up @@ -5001,3 +5001,141 @@ func TestConstructCallReturnDependencies(t *testing.T) {
test(t, deploytest.WithoutGrpc)
})
}

// Test that the engine can receive OutputValues for Construct and Call
func TestConstructCallReturnOutputs(t *testing.T) {
t.Parallel()

test := func(t *testing.T, opt deploytest.PluginOption) {
loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{
CreateF: func(urn resource.URN, inputs resource.PropertyMap, timeout float64,
preview bool,
) (resource.ID, resource.PropertyMap, resource.Status, error) {
return "created-id", inputs, resource.StatusOK, nil
},
ReadF: func(urn resource.URN, id resource.ID,
inputs, state resource.PropertyMap,
) (plugin.ReadResult, resource.Status, error) {
return plugin.ReadResult{Inputs: inputs, Outputs: state}, resource.StatusOK, nil
},
ConstructF: func(monitor *deploytest.ResourceMonitor, typ, name string, parent resource.URN,
inputs resource.PropertyMap, info plugin.ConstructInfo, options plugin.ConstructOptions,
) (plugin.ConstructResult, error) {
urn, _, _, _, err := monitor.RegisterResource(tokens.Type(typ), name, false, deploytest.ResourceOptions{})
assert.NoError(t, err)

urnA, _, _, _, err := monitor.RegisterResource("pkgA:m:typA", name+"-a", true, deploytest.ResourceOptions{
Parent: urn,
})
assert.NoError(t, err)

// Return a secret and unknown output depending on some internal resource
deps := []resource.URN{urnA}
return plugin.ConstructResult{
URN: urn,
Outputs: resource.PropertyMap{
"foo": resource.NewOutputProperty(resource.Output{
Element: resource.NewStringProperty("foo"),
Known: true,
Secret: true,
Dependencies: deps,
}),
"bar": resource.NewOutputProperty(resource.Output{
Dependencies: deps,
}),
},
OutputDependencies: nil, // Left blank on purpose because AcceptsOutputs is true
}, nil
},
CallF: func(monitor *deploytest.ResourceMonitor,
tok tokens.ModuleMember, args resource.PropertyMap,
info plugin.CallInfo, options plugin.CallOptions,
) (plugin.CallResult, error) {
// Assume a single output arg that this call depends on
arg := args["arg"]
deps := arg.OutputValue().Dependencies

return plugin.CallResult{
Return: resource.PropertyMap{
"foo": resource.NewOutputProperty(resource.Output{
Element: resource.NewStringProperty("foo"),
Known: true,
Secret: true,
Dependencies: deps,
}),
"bar": resource.NewOutputProperty(resource.Output{
Dependencies: deps,
}),
},
ReturnDependencies: nil, // Left blank on purpose because AcceptsOutputs is true
}, nil
},
}, nil
}, opt),
}

programF := deploytest.NewLanguageRuntimeF(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, state, deps, err := monitor.RegisterResource("pkgA:m:typA", "resA", false, deploytest.ResourceOptions{
Remote: true,
})
assert.NoError(t, err)

// The urn of the internal resource the component created
urn := resource.URN("urn:pulumi:test::test::pkgA:m:typA$pkgA:m:typA::resA-a")

// Assert that the outputs are received as just plain values because SDKs don't yet support output
// values returned from RegisterResource.
assert.Equal(t, resource.PropertyMap{
"foo": resource.MakeSecret(resource.NewStringProperty("foo")),
"bar": resource.MakeComputed(resource.NewStringProperty("")),
}, state)
assert.Equal(t, map[resource.PropertyKey][]resource.URN{
"foo": {urn},
"bar": {urn},
}, deps)

result, deps, _, err := monitor.Call("pkgA:m:typA", resource.PropertyMap{
// Send this as an output value using the dependencies returned.
"arg": resource.NewOutputProperty(resource.Output{
Element: state["foo"].SecretValue().Element,
Known: true,
Secret: true,
Dependencies: []resource.URN{urn},
}),
}, "", "")
assert.NoError(t, err)

// Assert that the outputs are received as just plain values because SDKs don't yet support output
// values returned from Call.
assert.Equal(t, resource.PropertyMap{
"foo": resource.MakeSecret(resource.NewStringProperty("foo")),
"bar": resource.MakeComputed(resource.NewStringProperty("")),
}, result)
assert.Equal(t, map[resource.PropertyKey][]resource.URN{
"foo": {urn},
"bar": {urn},
}, deps)

return nil
})
hostF := deploytest.NewPluginHostF(nil, nil, programF, loaders...)

p := &TestPlan{
Options: TestUpdateOptions{HostF: hostF},
}

project := p.GetProject()
_, err := TestOp(Update).Run(project, p.GetTarget(t, nil), p.Options, true, p.BackendClient, nil)
assert.NoError(t, err)
}
t.Run("WithGrpc", func(t *testing.T) {
t.Parallel()
test(t, deploytest.WithGrpc)
})
t.Run("WithoutGrpc", func(t *testing.T) {
t.Parallel()
test(t, deploytest.WithoutGrpc)
})
}
66 changes: 58 additions & 8 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -865,15 +865,15 @@ func (rm *resmon) Call(ctx context.Context, req *pulumirpc.ResourceCallRequest)
if err != nil {
return nil, fmt.Errorf("call of %v returned an error: %w", tok, err)
}
mret, err := plugin.MarshalProperties(ret.Return, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)

if ret.ReturnDependencies == nil {
ret.ReturnDependencies = map[resource.PropertyKey][]resource.URN{}
}
for k, v := range ret.Return {
ret.ReturnDependencies[k] = addOutputDependencies(ret.ReturnDependencies[k], v)
}

ret.Return = downgradeOutputValues(ret.Return)

returnDependencies := map[string]*pulumirpc.CallResponse_ReturnDependencies{}
for name, deps := range ret.ReturnDependencies {
Expand All @@ -884,6 +884,16 @@ func (rm *resmon) Call(ctx context.Context, req *pulumirpc.ResourceCallRequest)
returnDependencies[string(name)] = &pulumirpc.CallResponse_ReturnDependencies{Urns: urns}
}

mret, err := plugin.MarshalProperties(ret.Return, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
}

chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(ret.Failures))
for _, failure := range ret.Failures {
chkfails = append(chkfails, &pulumirpc.CheckFailure{
Expand Down Expand Up @@ -1492,6 +1502,17 @@ func (rm *resmon) RegisterResource(ctx context.Context,

result = &RegisterResult{State: &resource.State{URN: constructResult.URN, Outputs: constructResult.Outputs}}

// The provider may have returned OutputValues in "Outputs", we need to downgrade them to Computed or
// Secret but also add them to the outputDeps map.
if constructResult.OutputDependencies == nil {
constructResult.OutputDependencies = map[resource.PropertyKey][]resource.URN{}
}
for k, v := range result.State.Outputs {
constructResult.OutputDependencies[k] = addOutputDependencies(constructResult.OutputDependencies[k], v)
}

result.State.Outputs = downgradeOutputValues(constructResult.Outputs)

outputDeps = map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies{}
for k, deps := range constructResult.OutputDependencies {
urns := make([]string, len(deps))
Expand All @@ -1500,6 +1521,7 @@ func (rm *resmon) RegisterResource(ctx context.Context,
}
outputDeps[string(k)] = &pulumirpc.RegisterResourceResponse_PropertyDependencies{Urns: urns}
}

} else {
additionalSecretKeys := slice.Prealloc[resource.PropertyKey](len(additionalSecretOutputs))
for _, name := range additionalSecretOutputs {
Expand Down Expand Up @@ -1884,3 +1906,31 @@ func downgradeOutputValues(v resource.PropertyMap) resource.PropertyMap {
}
return result
}

func addOutputDependencies(deps []resource.URN, v resource.PropertyValue) []resource.URN {
if v.IsOutput() {
output := v.OutputValue()
if output.Known {
deps = addOutputDependencies(deps, output.Element)
}
deps = append(deps, output.Dependencies...)
}
if v.IsResourceReference() {
ref := v.ResourceReferenceValue()
deps = addOutputDependencies(deps, ref.ID)
}
if v.IsObject() {
for _, elem := range v.ObjectValue() {
deps = addOutputDependencies(deps, elem)
}
}
if v.IsArray() {
for _, elem := range v.ArrayValue() {
deps = addOutputDependencies(deps, elem)
}
}
if v.IsSecret() {
deps = addOutputDependencies(deps, v.SecretValue().Element)
}
return deps
}
2 changes: 1 addition & 1 deletion proto/.checksum.txt
Expand Up @@ -17,7 +17,7 @@
3421371250 793 proto/pulumi/errors.proto
829031483 10232 proto/pulumi/language.proto
2893249402 1992 proto/pulumi/plugin.proto
2457704516 24303 proto/pulumi/provider.proto
545212422 24604 proto/pulumi/provider.proto
2903108406 13833 proto/pulumi/resource.proto
607478140 1008 proto/pulumi/source.proto
2565199107 2157 proto/pulumi/testing/language.proto
4 changes: 4 additions & 0 deletions proto/pulumi/provider.proto
Expand Up @@ -164,6 +164,8 @@ message CallRequest {
int32 parallel = 11; // the degree of parallelism for resource operations (<=1 for serial).
string monitorEndpoint = 12; // the address for communicating back to the resource monitor.
string organization = 14; // the organization of the stack being deployed into.

bool accepts_output_values = 17; // the engine can be passed output values back, returnDependencies can be left blank if returning output values.
}

message CallResponse {
Expand Down Expand Up @@ -378,6 +380,8 @@ message ConstructRequest {
repeated string ignoreChanges = 22; // properties that should be ignored during a diff
repeated string replaceOnChanges = 23; // properties that, when changed, trigger a replacement
bool retainOnDelete = 24; // whether to retain the resource in the cloud provider when it is deleted

bool accepts_output_values = 25; // the engine can be passed output values back, stateDependencies can be left blank if returning output values.
}

message ConstructResponse {
Expand Down
38 changes: 21 additions & 17 deletions sdk/go/common/resource/plugin/provider_plugin.go
Expand Up @@ -1435,6 +1435,7 @@ func (p *provider) Construct(info ConstructInfo, typ tokens.Type, name string, p
IgnoreChanges: options.IgnoreChanges,
ReplaceOnChanges: options.ReplaceOnChanges,
RetainOnDelete: options.RetainOnDelete,
AcceptsOutputValues: true,
}
if ct := options.CustomTimeouts; ct != nil {
req.CustomTimeouts = &pulumirpc.ConstructRequest_CustomTimeouts{
Expand All @@ -1450,10 +1451,11 @@ func (p *provider) Construct(info ConstructInfo, typ tokens.Type, name string, p
}

outputs, err := UnmarshalProperties(resp.GetState(), MarshalOptions{
Label: label + ".outputs",
KeepUnknowns: info.DryRun,
KeepSecrets: true,
KeepResources: true,
Label: label + ".outputs",
KeepUnknowns: info.DryRun,
KeepSecrets: true,
KeepResources: true,
KeepOutputValues: true,
})
if err != nil {
return ConstructResult{}, err
Expand Down Expand Up @@ -1669,15 +1671,16 @@ func (p *provider) Call(tok tokens.ModuleMember, args resource.PropertyMap, info
}

resp, err := client.Call(p.requestContext(), &pulumirpc.CallRequest{
Tok: string(tok),
Args: margs,
ArgDependencies: argDependencies,
Project: info.Project,
Stack: info.Stack,
Config: config,
DryRun: info.DryRun,
Parallel: int32(info.Parallel),
MonitorEndpoint: info.MonitorAddress,
Tok: string(tok),
Args: margs,
ArgDependencies: argDependencies,
Project: info.Project,
Stack: info.Stack,
Config: config,
DryRun: info.DryRun,
Parallel: int32(info.Parallel),
MonitorEndpoint: info.MonitorAddress,
AcceptsOutputValues: true,
})
if err != nil {
rpcError := rpcerror.Convert(err)
Expand All @@ -1687,10 +1690,11 @@ func (p *provider) Call(tok tokens.ModuleMember, args resource.PropertyMap, info

// Unmarshal any return values.
ret, err := UnmarshalProperties(resp.GetReturn(), MarshalOptions{
Label: label + ".returns",
KeepUnknowns: info.DryRun,
KeepSecrets: true,
KeepResources: true,
Label: label + ".returns",
KeepUnknowns: info.DryRun,
KeepSecrets: true,
KeepResources: true,
KeepOutputValues: true,
})
if err != nil {
return CallResult{}, err
Expand Down
1 change: 1 addition & 0 deletions sdk/go/common/resource/plugin/provider_plugin_test.go
Expand Up @@ -479,6 +479,7 @@ func TestProvider_ConstructOptions(t *testing.T) {
tt.want.Name = "name"
tt.want.Config = make(map[string]string)
tt.want.Inputs = &structpb.Struct{Fields: make(map[string]*structpb.Value)}
tt.want.AcceptsOutputValues = true

var got *pulumirpc.ConstructRequest
client := &stubClient{
Expand Down
13 changes: 6 additions & 7 deletions sdk/go/common/resource/plugin/provider_server.go
Expand Up @@ -509,7 +509,9 @@ func (p *providerServer) Construct(ctx context.Context,
return nil, err
}

outputs, err := MarshalProperties(result.Outputs, p.marshalOptions("outputs"))
opts := p.marshalOptions("outputs")
opts.KeepOutputValues = req.AcceptsOutputValues
outputs, err := MarshalProperties(result.Outputs, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -628,12 +630,9 @@ func (p *providerServer) Call(ctx context.Context, req *pulumirpc.CallRequest) (
return nil, err
}

rpcResult, err := MarshalProperties(result.Return, MarshalOptions{
Label: "result",
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
})
opts := p.marshalOptions("return")
opts.KeepOutputValues = req.AcceptsOutputValues
rpcResult, err := MarshalProperties(result.Return, opts)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 82609d3

Please sign in to comment.