/
provision.go
244 lines (211 loc) · 8.95 KB
/
provision.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package broker
import (
"context"
"fmt"
"sync"
"time"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/access"
"github.com/kyma-project/kyma/components/remote-environment-broker/pkg/apis/remoteenvironment/v1alpha1"
v1client "github.com/kyma-project/kyma/components/remote-environment-broker/pkg/client/clientset/versioned/typed/remoteenvironment/v1alpha1"
"github.com/pkg/errors"
osb "github.com/pmorie/go-open-service-broker-client/v2"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1"
)
const serviceCatalogAPIVersion = "servicecatalog.k8s.io/v1beta1"
// NewProvisioner creates provisioner
func NewProvisioner(instanceInserter instanceInserter, instanceStateGetter instanceStateGetter, operationInserter operationInserter, operationUpdater operationUpdater, accessChecker access.ProvisionChecker, reSvcFinder reSvcFinder, serviceInstanceGetter serviceInstanceGetter, reClient v1client.RemoteenvironmentV1alpha1Interface, iStateUpdater instanceStateUpdater,
operationIDProvider func() (internal.OperationID, error), log logrus.FieldLogger) *ProvisionService {
return &ProvisionService{
instanceInserter: instanceInserter,
instanceStateGetter: instanceStateGetter,
instanceStateUpdater: iStateUpdater,
operationInserter: operationInserter,
operationUpdater: operationUpdater,
operationIDProvider: operationIDProvider,
accessChecker: accessChecker,
reSvcFinder: reSvcFinder,
reClient: reClient,
serviceInstanceGetter: serviceInstanceGetter,
maxWaitTime: time.Minute,
log: log.WithField("service", "provisioner"),
}
}
// ProvisionService performs provisioning action
type ProvisionService struct {
instanceInserter instanceInserter
instanceStateUpdater instanceStateUpdater
operationInserter operationInserter
operationUpdater operationUpdater
instanceStateGetter instanceStateGetter
operationIDProvider func() (internal.OperationID, error)
reSvcFinder reSvcFinder
reClient v1client.RemoteenvironmentV1alpha1Interface
accessChecker access.ProvisionChecker
serviceInstanceGetter serviceInstanceGetter
mu sync.Mutex
maxWaitTime time.Duration
log logrus.FieldLogger
asyncHook func()
}
// Provision action
func (svc *ProvisionService) Provision(ctx context.Context, osbCtx osbContext, req *osb.ProvisionRequest) (*osb.ProvisionResponse, error) {
if !req.AcceptsIncomplete {
return nil, errors.New("asynchronous operation mode required")
}
svc.mu.Lock()
defer svc.mu.Unlock()
iID := internal.InstanceID(req.InstanceID)
switch state, err := svc.instanceStateGetter.IsProvisioned(iID); true {
case err != nil:
return nil, errors.Wrap(err, "while checking if instance is already provisioned")
case state:
return &osb.ProvisionResponse{Async: false}, nil
}
switch opIDInProgress, inProgress, err := svc.instanceStateGetter.IsProvisioningInProgress(iID); true {
case err != nil:
return nil, errors.Wrap(err, "while checking if instance is being provisioned")
case inProgress:
opKeyInProgress := osb.OperationKey(opIDInProgress)
return &osb.ProvisionResponse{Async: true, OperationKey: &opKeyInProgress}, nil
}
id, err := svc.operationIDProvider()
if err != nil {
return nil, errors.Wrap(err, "while generating ID for operation")
}
opID := internal.OperationID(id)
paramHash := "TODO"
op := internal.InstanceOperation{
InstanceID: iID,
OperationID: opID,
Type: internal.OperationTypeCreate,
State: internal.OperationStateInProgress,
ParamsHash: paramHash,
}
if err := svc.operationInserter.Insert(&op); err != nil {
return nil, errors.Wrap(err, "while inserting instance operation to storage")
}
svcID := internal.ServiceID(req.ServiceID)
svcPlanID := internal.ServicePlanID(req.PlanID)
re, err := svc.reSvcFinder.FindOneByServiceID(internal.RemoteServiceID(req.ServiceID))
if err != nil {
return nil, errors.Wrapf(err, "while getting remote environment with id: %s to storage", req.ServiceID)
}
namespace, err := getNamespaceFromContext(req.Context)
if err != nil {
return nil, errors.Wrap(err, "while getting namespace from context")
}
service, err := getSvcByID(re.Services, internal.RemoteServiceID(req.ServiceID))
if err != nil {
return nil, errors.Wrapf(err, "while getting service [%s] from RemoteEnvironment [%s]", req.ServiceID, re.Name)
}
i := internal.Instance{
ID: iID,
Namespace: namespace,
ServiceID: svcID,
ServicePlanID: svcPlanID,
State: internal.InstanceStatePending,
ParamsHash: paramHash,
}
if err = svc.instanceInserter.Insert(&i); err != nil {
return nil, errors.Wrap(err, "while inserting instance to storage")
}
opKey := osb.OperationKey(op.OperationID)
resp := &osb.ProvisionResponse{
OperationKey: &opKey,
Async: true,
}
svc.doAsync(iID, opID, getRemoteServiceID(req), namespace, re.Source, service.EventProvider, service.DisplayName)
return resp, nil
}
func getRemoteServiceID(req *osb.ProvisionRequest) internal.RemoteServiceID {
return internal.RemoteServiceID(req.ServiceID)
}
func (svc *ProvisionService) doAsync(iID internal.InstanceID, opID internal.OperationID, remEnvID internal.RemoteServiceID, ns internal.Namespace, source internal.Source, eventProvider bool, displayName string) {
go svc.do(iID, opID, remEnvID, ns, source, eventProvider, displayName)
}
func (svc *ProvisionService) do(iID internal.InstanceID, opID internal.OperationID, remEnvID internal.RemoteServiceID, ns internal.Namespace, source internal.Source, eventProvider bool, displayName string) {
if svc.asyncHook != nil {
defer svc.asyncHook()
}
canProvisionOutput, err := svc.accessChecker.CanProvision(iID, remEnvID, ns, svc.maxWaitTime)
svc.log.Infof("Access checker: canProvisionInstance(remEnvID=[%s], ns=[%s]) returned: canProvisionOutput=[%+v], error=[%v]", remEnvID, ns, canProvisionOutput, err)
var instanceState internal.InstanceState
var opState internal.OperationState
var opDesc string
if err != nil {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed on error: %s", err.Error())
} else if !canProvisionOutput.Allowed {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("Forbidden provisioning instance [%s] for remote environment [id: %s] in namespace: [%s]. Reason: [%s]", iID, remEnvID, ns, canProvisionOutput.Reason)
} else {
instanceState = internal.InstanceStateSucceeded
opState = internal.OperationStateSucceeded
opDesc = "provisioning succeeded"
if eventProvider {
err := svc.createEaOnSuccessProvision(string(remEnvID), string(ns), source, displayName, iID)
if err != nil {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed while creating EventActivation on error: %s", err.Error())
}
}
}
if err := svc.instanceStateUpdater.UpdateState(iID, instanceState); err != nil {
svc.log.Errorf("Cannot update state of the stored instance [%s]: [%v]\n", iID, err)
}
if err := svc.operationUpdater.UpdateStateDesc(iID, opID, opState, &opDesc); err != nil {
svc.log.Errorf("Cannot update state for ServiceInstance [%s]: [%v]\n", iID, err)
return
}
}
func (svc *ProvisionService) createEaOnSuccessProvision(reID, ns string, source internal.Source, displayName string, iID internal.InstanceID) error {
// instance ID is the serviceInstance.Spec.ExternalID
si, err := svc.serviceInstanceGetter.GetByNamespaceAndExternalID(ns, string(iID))
if err != nil {
return errors.Wrapf(err, "while getting service instance with external id: %q in namespace: %q", iID, ns)
}
_, err = svc.reClient.EventActivations(ns).Create(
&v1alpha1.EventActivation{
ObjectMeta: v1.ObjectMeta{
Name: reID,
Namespace: ns,
OwnerReferences: []v1.OwnerReference{
{
APIVersion: serviceCatalogAPIVersion,
Kind: "ServiceInstance",
Name: si.Name,
UID: si.UID,
},
},
},
Spec: v1alpha1.EventActivationSpec{
DisplayName: displayName,
Source: v1alpha1.Source{
Namespace: source.Namespace,
Type: source.Type,
Environment: source.Environment,
},
},
})
if err != nil {
return errors.Wrapf(err, "while creating EventActivation with name: %q in namespace: %q", reID, ns)
}
svc.log.Infof("Created EventActivation: [%s], in namespace: [%s]", reID, ns)
return nil
}
func getNamespaceFromContext(contextProfile map[string]interface{}) (internal.Namespace, error) {
return internal.Namespace(contextProfile["namespace"].(string)), nil
}
func getSvcByID(services []internal.Service, id internal.RemoteServiceID) (internal.Service, error) {
for _, svc := range services {
if svc.ID == id {
return svc, nil
}
}
return internal.Service{}, errors.Errorf("cannot find service with ID [%s]", id)
}