-
Notifications
You must be signed in to change notification settings - Fork 9
/
resources.go
156 lines (127 loc) · 4.76 KB
/
resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright 2022 Namespace Labs Inc; All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package planning
import (
"context"
"sync"
anypb "google.golang.org/protobuf/types/known/anypb"
"namespacelabs.dev/foundation/internal/compute"
"namespacelabs.dev/foundation/internal/executor"
"namespacelabs.dev/foundation/internal/fnerrors"
"namespacelabs.dev/foundation/internal/parsing"
"namespacelabs.dev/foundation/internal/planning/invocation"
"namespacelabs.dev/foundation/internal/planning/tool"
"namespacelabs.dev/foundation/internal/planning/tool/protocol"
"namespacelabs.dev/foundation/schema"
"namespacelabs.dev/foundation/std/pkggraph"
"namespacelabs.dev/foundation/std/resources"
)
// resourcePlanner records the resources seen during planning, keyed by their
// joined resource ID, and schedules provider computations on eg.
type resourcePlanner struct {
// eg is the executor on which computeResource schedules the computation for
// every resource that has a provider.
eg *executor.Executor
// mu guards state, as well as the resources field of each entry stored in it.
mu sync.Mutex
// state holds one entry per resource key already handled by computeResource.
state map[string]*computedResource
}
// computedResource is the per-key state tracked by resourcePlanner.
type computedResource struct {
// resources holds the instances returned by compute. It is only assigned once
// the scheduled computation has succeeded, under the planner's mutex.
resources []pkggraph.ResourceInstance
}
// newResourcePlanner returns a planner with an empty state map that schedules
// its work on eg.
func newResourcePlanner(eg *executor.Executor) *resourcePlanner {
	planner := &resourcePlanner{eg: eg}
	planner.state = make(map[string]*computedResource)
	return planner
}
// Complete returns a fresh map from every registered resource key to the
// instances currently recorded for it.
//
// The map is built under the planner's mutex. It does not itself wait for
// scheduled computations; a key whose computation has not stored a result maps
// to a nil slice. The returned slices are shared with the planner's state, not
// copied.
func (rp *resourcePlanner) Complete() map[string][]pkggraph.ResourceInstance {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	byKey := make(map[string][]pkggraph.ResourceInstance, len(rp.state))
	for id, entry := range rp.state {
		byKey[id] = entry.resources
	}

	return byKey
}
// computeResource registers res under the key formed by joining parentID with
// res.ResourceID and, when res has a provider, schedules that provider's
// computation on the planner's executor.
//
// A key that is already registered is left untouched and nil is returned. For
// server resources, pkggraph.ValidateFoundation is checked against sealedctx
// and loadServer is called with the package named by the resource's intent.
// Note that loadServer runs while the planner's mutex is held.
//
// Resources produced by the provider are registered recursively (with the same
// parentID) from inside the scheduled function. Errors raised there are
// returned to the executor, not to the caller of computeResource.
func (rp *resourcePlanner) computeResource(sealedctx pkggraph.SealedContext, parentID string, res pkggraph.ResourceInstance, loadServer func(schema.PackageName)) error {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	key := resources.JoinID(parentID, res.ResourceID)
	if _, ok := rp.state[key]; ok {
		return nil
	}

	if parsing.IsServerResource(res.Spec.Class.Ref) {
		if err := pkggraph.ValidateFoundation("runtime resources", parsing.Version_LibraryIntentsChanged, pkggraph.ModuleFromModules(sealedctx)); err != nil {
			return err
		}

		serverIntent := &schema.PackageRef{}
		if err := res.Spec.Intent.UnmarshalTo(serverIntent); err != nil {
			// Preserve the underlying cause so a malformed intent can be diagnosed.
			return fnerrors.InternalError("failed to unwrap Server: %w", err)
		}

		loadServer(serverIntent.AsPackageName())
	}

	// Register the key before scheduling any work, so that later calls for the
	// same resource take the early return above.
	state := &computedResource{}
	rp.state[key] = state

	if res.Spec.Provider == nil {
		return nil
	}

	// The scheduled function re-acquires rp.mu (directly and via the recursive
	// calls), so it relies on not running on this goroutine while the lock is
	// still held here.
	rp.eg.Go(func(ctx context.Context) error {
		computed, err := state.compute(ctx, sealedctx, res.Spec.Intent, res.Spec.Provider)
		if err != nil {
			return err
		}

		for _, child := range computed {
			if err := rp.computeResource(sealedctx, parentID, child, loadServer); err != nil {
				return err
			}
		}

		// Publish the result only after every child has been registered.
		rp.mu.Lock()
		defer rp.mu.Unlock()
		state.resources = computed
		return nil
	})

	return nil
}
// compute runs the provider's ResourcesFrom invocation, if there is one, and
// returns the resource instances loaded from its response.
//
// It returns (nil, nil) when provider is nil or declares no ResourcesFrom.
// Otherwise the invocation is built and run with intent as its only provision
// input for the PROVISION lifecycle event, and the response is validated. A
// response carrying an output resource instance is rejected. Each computed
// resource input in the response is packed as a resource of the provider's
// package and loaded through parsing.LoadResources.
//
// Every failure is reported through fnerrors.InternalError, wrapping the
// underlying cause where there is one. The receiver is not read or modified.
func (st *computedResource) compute(ctx context.Context, sealedCtx pkggraph.SealedContext, intent *anypb.Any, provider *pkggraph.ResourceProvider) ([]pkggraph.ResourceInstance, error) {
	if provider == nil || provider.Spec.ResourcesFrom == nil {
		return nil, nil
	}

	inv, err := invocation.BuildAndPrepare(ctx, sealedCtx, sealedCtx, nil, provider.Spec.ResourcesFrom)
	if err != nil {
		return nil, fnerrors.InternalError("failed to compute invocation configuration: %w", err)
	}

	deferredResponse, err := tool.MakeInvocationNoInjections(ctx, sealedCtx, &tool.Definition{
		Source:     tool.Source{PackageName: schema.PackageName(provider.Spec.PackageName)},
		Invocation: inv,
	}, tool.InvokeProps{
		Event:          protocol.Lifecycle_PROVISION,
		ProvisionInput: []*anypb.Any{intent},
	})
	if err != nil {
		return nil, fnerrors.InternalError("resourcesFrom: failed to compute invocation: %w", err)
	}

	response, err := compute.GetValue(ctx, deferredResponse)
	if err != nil {
		return nil, fnerrors.InternalError("resourcesFrom: failed to invoke: %w", err)
	}

	if err := invocation.ValidateProviderReponse(response); err != nil {
		return nil, fnerrors.InternalError("resourcesFrom: %w", err)
	}

	r := response.ApplyResponse
	if r.OutputResourceInstance != nil {
		return nil, fnerrors.InternalError("resourcesFrom: response can't include resource instance")
	}

	// Re-express the computed inputs as resource instances of the provider's package.
	pack := &schema.ResourcePack{}
	for _, x := range r.ComputedResourceInput {
		pack.ResourceInstance = append(pack.ResourceInstance, &schema.ResourceInstance{
			PackageName:          provider.Spec.PackageName,
			Name:                 x.Name,
			Class:                x.Class,
			Provider:             x.Provider,
			SerializedIntentJson: x.SerializedIntentJson,
		})
	}

	providerPkg, err := sealedCtx.LoadByName(ctx, schema.PackageName(provider.Spec.PackageName))
	if err != nil {
		// Keep the load error: the package may be present but fail to load.
		return nil, fnerrors.InternalError("%s: missing provider package: %w", provider.Spec.PackageName, err)
	}

	additionalResources, err := parsing.LoadResources(ctx, sealedCtx, providerPkg, provider.ProviderID, pack)
	if err != nil {
		return nil, fnerrors.InternalError("%s: failed to load computed resources: %w", provider.Spec.PackageName, err)
	}

	return additionalResources, nil
}