forked from kyma-project/kyma
/
provision.go
275 lines (224 loc) · 8.92 KB
/
provision.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package broker
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/pkg/errors"
osb "github.com/pmorie/go-open-service-broker-client/v2"
"github.com/sirupsen/logrus"
rls "k8s.io/helm/pkg/proto/hapi/services"
"github.com/kyma-project/kyma/components/helm-broker/internal"
)
type provisionService struct {
bundleIDGetter bundleIDGetter
chartGetter chartGetter
instanceInserter instanceInserter
instanceStateGetter instanceStateProvisionGetter
operationInserter operationInserter
operationUpdater operationUpdater
instanceBindDataInserter instanceBindDataInserter
operationIDProvider func() (internal.OperationID, error)
helmInstaller helmInstaller
bindTemplateRenderer bindTemplateRenderer
bindTemplateResolver bindTemplateResolver
mu sync.Mutex
log *logrus.Entry
testHookAsyncCalled func(internal.OperationID)
}
func (svc *provisionService) Provision(ctx context.Context, osbCtx osbContext, req *osb.ProvisionRequest) (*osb.ProvisionResponse, error) {
if !req.AcceptsIncomplete {
return nil, errors.New("asynchronous operation mode required")
}
// Single provisioning is supported concurrently.
// TODO: switch to lock per instanceID
svc.mu.Lock()
defer svc.mu.Unlock()
iID := internal.InstanceID(req.InstanceID)
switch state, err := svc.instanceStateGetter.IsProvisioned(iID); true {
case err != nil:
return nil, errors.Wrap(err, "while checking if instance is already provisioned")
case state:
return &osb.ProvisionResponse{Async: false}, nil
}
switch opIDInProgress, inProgress, err := svc.instanceStateGetter.IsProvisioningInProgress(iID); true {
case err != nil:
return nil, errors.Wrap(err, "while checking if instance is being provisioned")
case inProgress:
opKeyInProgress := osb.OperationKey(opIDInProgress)
return &osb.ProvisionResponse{Async: true, OperationKey: &opKeyInProgress}, nil
}
id, err := svc.operationIDProvider()
if err != nil {
return nil, errors.Wrap(err, "while generating ID for operation")
}
opID := internal.OperationID(id)
namespace, err := getNamespaceFromContext(req.Context)
if err != nil {
return nil, errors.Wrap(err, "while getting namespace from context")
}
svcID := internal.ServiceID(req.ServiceID)
svcPlanID := internal.ServicePlanID(req.PlanID)
// bundleID/planID is in 1:1 match with serviceID/servicePlanID (from service catalog)
bundleID := internal.BundleID(svcID)
bundlePlanID := internal.BundlePlanID(svcPlanID)
bundle, err := svc.bundleIDGetter.GetByID(bundleID)
if err != nil {
return nil, errors.Wrap(err, "while getting bundle")
}
bundlePlan, found := bundle.Plans[bundlePlanID]
if !found {
return nil, errors.Errorf("bundle does not contain requested plan (planID: %s)", bundlePlanID)
}
releaseName := createReleaseName(bundle.Name, bundlePlan.Name, iID)
// TODO: add support for calculating ParamHash
paramHash := "TODO"
op := internal.InstanceOperation{
InstanceID: iID,
OperationID: opID,
Type: internal.OperationTypeCreate,
State: internal.OperationStateInProgress,
ParamsHash: paramHash,
}
if err := svc.operationInserter.Insert(&op); err != nil {
return nil, errors.Wrap(err, "while inserting instance operation to storage")
}
i := internal.Instance{
ID: iID,
Namespace: namespace,
ServiceID: svcID,
ServicePlanID: svcPlanID,
ReleaseName: releaseName,
ParamsHash: paramHash,
}
if err = svc.instanceInserter.Insert(&i); err != nil {
return nil, errors.Wrap(err, "while inserting instance to storage")
}
cvOver := internal.ChartValues(req.Parameters)
svc.doAsync(ctx, iID, opID, namespace, releaseName, bundlePlan, bundle.Bindable, cvOver)
opKey := osb.OperationKey(op.OperationID)
resp := &osb.ProvisionResponse{
OperationKey: &opKey,
Async: true,
}
return resp, nil
}
func (svc *provisionService) doAsync(ctx context.Context, iID internal.InstanceID, opID internal.OperationID, namespace internal.Namespace, releaseName internal.ReleaseName, bundlePlan internal.BundlePlan, isBundleBindable bool, cvOver internal.ChartValues) {
if svc.testHookAsyncCalled != nil {
svc.testHookAsyncCalled(opID)
}
go svc.do(ctx, iID, opID, namespace, releaseName, bundlePlan, isBundleBindable, cvOver)
}
// do is called asynchronously
func (svc *provisionService) do(ctx context.Context, iID internal.InstanceID, opID internal.OperationID, namespace internal.Namespace, releaseName internal.ReleaseName, bundlePlan internal.BundlePlan, isBundleBindable bool, cvOver internal.ChartValues) {
fDo := func() (*rls.InstallReleaseResponse, error) {
c, err := svc.chartGetter.Get(bundlePlan.ChartRef.Name, bundlePlan.ChartRef.Version)
if err != nil {
return nil, errors.Wrap(err, "while getting chart from storage")
}
out, err := deepCopy(map[string]interface{}(bundlePlan.ChartValues))
if err != nil {
return nil, errors.Wrap(err, "while coping plan values")
}
out = mergeValues(out, map[string]interface{}(cvOver))
svc.log.Infof("Merging values for operation [%s], releaseName [%s], namespace [%s], bundlePlan [%s]. Plan values are: [%v], overrides: [%v], merged: [%v] ",
opID, releaseName, namespace, bundlePlan.Name, bundlePlan.ChartValues, cvOver, out)
resp, err := svc.helmInstaller.Install(c, internal.ChartValues(out), releaseName, namespace)
if err != nil {
return nil, errors.Wrap(err, "while installing helm release")
}
return resp, nil
}
opState := internal.OperationStateSucceeded
opDesc := "provisioning succeeded"
resp, err := fDo()
if err != nil {
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed on error: %s", err.Error())
}
if err == nil && svc.isBindable(bundlePlan, isBundleBindable) {
if resolveErr := svc.resolveAndSaveBindData(iID, namespace, bundlePlan, resp); resolveErr != nil {
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("resolving bind data failed with error: %s", resolveErr.Error())
}
}
if err := svc.operationUpdater.UpdateStateDesc(iID, opID, opState, &opDesc); err != nil {
}
}
func (*provisionService) isBindable(plan internal.BundlePlan, isBundleBindable bool) bool {
return (plan.Bindable != nil && *plan.Bindable) || // if bindable field is set on plan it's override bindalbe field on bundle
(plan.Bindable == nil && isBundleBindable) // if bindable field is NOT set on plan thet bindalbe field on bundle is important
}
func (svc *provisionService) resolveAndSaveBindData(iID internal.InstanceID, namespace internal.Namespace, bundlePlan internal.BundlePlan, resp *rls.InstallReleaseResponse) error {
rendered, err := svc.bindTemplateRenderer.Render(bundlePlan.BindTemplate, resp)
if err != nil {
return errors.Wrap(err, "while rendering bind yaml template")
}
out, err := svc.bindTemplateResolver.Resolve(rendered, namespace)
if err != nil {
return errors.Wrap(err, "while resolving bind yaml values")
}
in := internal.InstanceBindData{
InstanceID: iID,
Credentials: out.Credentials,
}
if err := svc.instanceBindDataInserter.Insert(&in); err != nil {
return errors.Wrap(err, "while inserting instance bind data into storage")
}
return nil
}
func getNamespaceFromContext(contextProfile map[string]interface{}) (internal.Namespace, error) {
return internal.Namespace(contextProfile["namespace"].(string)), nil
}
func createReleaseName(name internal.BundleName, planName internal.BundlePlanName, iID internal.InstanceID) internal.ReleaseName {
maxLen := 53
relName := fmt.Sprintf("hb-%s-%s-%s", name, planName, iID)
if len(relName) <= maxLen {
return internal.ReleaseName(relName)
}
return internal.ReleaseName(relName[:maxLen])
}
// to work correctly, https://github.com/ghodss/yaml has to be used
func mergeValues(dest map[string]interface{}, src map[string]interface{}) map[string]interface{} {
for k, v := range src {
// If the key doesn't exist already, then just set the key to that value
if _, exists := dest[k]; !exists {
dest[k] = v
continue
}
nextMap, ok := v.(map[string]interface{})
// If it isn't another map, overwrite the value
if !ok {
dest[k] = v
continue
}
// If the key doesn't exist already, then just set the key to that value
if _, exists := dest[k]; !exists {
dest[k] = nextMap
continue
}
// Edge case: If the key exists in the destination, but isn't a map
destMap, isMap := dest[k].(map[string]interface{})
// If the source map has a map for this key, prefer it
if !isMap {
dest[k] = v
continue
}
// If we got to this point, it is a map in both, so merge them
dest[k] = mergeValues(destMap, nextMap)
}
return dest
}
func deepCopy(in map[string]interface{}) (map[string]interface{}, error) {
out := map[string]interface{}{}
if in != nil {
b, err := json.Marshal(in)
if err != nil {
return nil, errors.Wrap(err, "while performing deep copy (marshal)")
}
if err = json.Unmarshal(b, &out); err != nil {
return nil, errors.Wrap(err, "while performing deep copy (unmarshal)")
}
}
return out, nil
}