-
Notifications
You must be signed in to change notification settings - Fork 84
/
asynchronous_flow_control_operator.go
139 lines (125 loc) · 7.33 KB
/
asynchronous_flow_control_operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package operators
import (
"context"
"encoding/json"
"runtime/debug"
"github.com/kyma-incubator/compass/components/director/internal/model"
"github.com/kyma-incubator/compass/components/director/pkg/consumer"
"github.com/kyma-incubator/compass/components/director/pkg/formationconstraint"
"github.com/kyma-incubator/compass/components/director/pkg/log"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// AsynchronousFlowControlOperator is the name under which the asynchronous flow control operator is identified.
const AsynchronousFlowControlOperator = "AsynchronousFlowControl"
// AsynchronousFlowControlOperatorInput is the input constructor for AsynchronousFlowControlOperator.
// It returns a pointer to a zero-valued formationconstraint.AsynchronousFlowControlOperatorInput as an OperatorInput.
func AsynchronousFlowControlOperatorInput() OperatorInput {
	return new(formationconstraint.AsynchronousFlowControlOperatorInput)
}
// AsynchronousFlowControlOperator is an operator that based on different conditions behaves like the redirect operator, it redirects the formation assignment notification.
// In other cases it mutates the state, in order to control the flow of the engine, so that the assignment doesn't get deleted too early,
// and it resends the notification to the redirection endpoint, so that it can finish the cleanup.
// It introduces new deleting states.
//
// The operator only acts on PreOperation constraint locations; for any other location it returns (true, nil):
//   - SendNotification during assign: delegates to RedirectNotification.
//   - SendNotification during unassign: redirects only when the assignment is already in one of the instance creator deleting states.
//   - NotificationStatusReturned during assign: a READY report carrying inbound communication details is downgraded to CONFIG_PENDING.
//   - NotificationStatusReturned during unassign: a READY report for a DELETING assignment moves the assignment to
//     INSTANCE_CREATOR_DELETING, persists it and triggers a cleanup notification; a delete error reported while in
//     INSTANCE_CREATOR_DELETING is mapped to INSTANCE_CREATOR_DELETE_ERROR.
//
// It returns (false, err) when the input has an incompatible type or when any retrieval, unmarshalling, update or
// notification step fails; otherwise it returns (true, nil) or the result of RedirectNotification.
func (e *ConstraintEngine) AsynchronousFlowControlOperator(ctx context.Context, input OperatorInput) (bool, error) {
	log.C(ctx).Infof("Starting executing operator: %s", AsynchronousFlowControlOperator)

	defer func() {
		if err := recover(); err != nil {
			// Dump the stack first: the Panic-level log call below panics again after writing the entry
			// (logrus semantics), so any statement placed after it would never run.
			debug.PrintStack()
			log.C(ctx).WithField(logrus.ErrorKey, err).Panic("recovered panic")
		}
	}()

	ri, ok := input.(*formationconstraint.AsynchronousFlowControlOperatorInput)
	if !ok {
		return false, errors.Errorf("Incompatible input for operator: %s", AsynchronousFlowControlOperator)
	}

	log.C(ctx).Infof("Enforcing constraint on resource of type: %q and subtype: %q for location with constraint type: %q and operation name: %q during %q operation", ri.ResourceType, ri.ResourceSubtype, ri.Location.ConstraintType, ri.Location.OperationName, ri.Operation)

	// Every branch below applies to pre-operation constraints only.
	if ri.Location.ConstraintType != model.PreOperation {
		return true, nil
	}

	if ri.Operation == model.AssignFormation && ri.Location.OperationName == model.SendNotificationOperation {
		return e.RedirectNotification(ctx, &ri.RedirectNotificationInput)
	}

	if ri.Operation == model.UnassignFormation && ri.Location.OperationName == model.SendNotificationOperation {
		formationAssignment, err := RetrieveFormationAssignmentPointer(ctx, ri.FAMemoryAddress)
		if err != nil {
			return false, err
		}

		if formationAssignment.State == string(model.InstanceCreatorDeletingAssignmentState) ||
			formationAssignment.State == string(model.InstanceCreatorDeleteErrorAssignmentState) {
			log.C(ctx).Infof("Tenant mapping participant processing unassign notification has alredy finished, redirecting notification for assignment %q with state %q to instance creator", formationAssignment.ID, formationAssignment.State)
			ri.ShouldRedirect = true
			return e.RedirectNotification(ctx, &ri.RedirectNotificationInput)
		}
		return true, nil
	}

	if ri.Location.OperationName != model.NotificationStatusReturned {
		return true, nil
	}

	formationAssignment, err := RetrieveFormationAssignmentPointer(ctx, ri.FAMemoryAddress)
	if err != nil {
		return false, err
	}

	statusReport, err := RetrieveNotificationStatusReportPointer(ctx, ri.NotificationStatusReportMemoryAddress)
	if err != nil {
		return false, err
	}

	if ri.Operation == model.AssignFormation {
		if statusReport.State == string(model.ReadyAssignmentState) && !isNotificationStatusReportConfigEmpty(statusReport) {
			var assignmentConfig Configuration
			if err = json.Unmarshal(statusReport.Configuration, &assignmentConfig); err != nil {
				return false, errors.Wrapf(err, "while unmarshalling tenant mapping response configuration for assignment with ID: %q", formationAssignment.ID)
			}
			// A READY report that still carries inbound communication details is reported as CONFIG_PENDING instead.
			if assignmentConfig.Credentials.InboundCommunicationDetails != nil {
				statusReport.State = string(model.ConfigPendingAssignmentState)
			}
		}
		return true, nil
	}

	if ri.Operation != model.UnassignFormation {
		return true, nil
	}

	if formationAssignment.State == string(model.DeletingAssignmentState) && statusReport.State == string(model.ReadyAssignmentState) {
		consumerInfo, err := consumer.LoadFromContext(ctx)
		if err != nil {
			return false, errors.Wrap(err, "while fetching consumer info from context")
		}

		// This handles the case when there is the following race condition:
		// 1. First unassign sends a notification to the participant, the participant responds with READY and the assignment is in INSTANCE_CREATOR_DELETING
		// 2. The second is started after the first and unassign sees the assignment as READY initially, but upon update it sees INSTANCE_CREATOR_DELETING and reverts it back to DELETING.
		// Theoretically the instance creator won't respond unless it has been notified.
		if consumerInfo.Type == consumer.InstanceCreator {
			log.C(ctx).Infof("Instance creator reported %q, proceeding with deletion of formation assignment with ID %q", statusReport.State, formationAssignment.ID)
			return true, nil
		}

		// A failure to retrieve the reverse assignment is deliberately non-fatal: it is only logged, and the pair
		// below is generated with whatever value the retrieval returned.
		reverseAssignment, err := RetrieveFormationAssignmentPointer(ctx, ri.ReverseFAMemoryAddress)
		if err != nil {
			// Warn (not Warnf): the message is dynamic and must not be interpreted as a format string.
			log.C(ctx).Warn(errors.Wrap(err, "Reverse assignment not found").Error())
		}

		log.C(ctx).Infof("Tenant mapping participant finished processing unassign notification successfully for assignment with ID %q, changing state to %q", formationAssignment.ID, model.InstanceCreatorDeletingAssignmentState)
		formationAssignment.State = string(model.InstanceCreatorDeletingAssignmentState)
		statusReport.State = string(model.InstanceCreatorDeletingAssignmentState)
		if err = e.formationAssignmentRepo.Update(ctx, formationAssignment); err != nil {
			return false, errors.Wrapf(err, "while updating formation assignment with ID %q", formationAssignment.ID)
		}

		log.C(ctx).Infof("Generating formation assignment notification for assignent with ID %q", formationAssignment.ID)
		assignmentPair, err := e.formationAssignmentNotificationSvc.GenerateFormationAssignmentPair(ctx, formationAssignment, reverseAssignment, model.UnassignFormation)
		if err != nil {
			return false, errors.Wrap(err, "while generating formation assignment notification")
		}

		log.C(ctx).Infof("Sending notification to instance creator")
		if _, err = e.formationAssignmentService.CleanupFormationAssignment(ctx, assignmentPair); err != nil {
			return false, err
		}
		return true, nil
	}

	if formationAssignment.State == string(model.DeletingAssignmentState) && statusReport.State == string(model.DeleteErrorAssignmentState) {
		return true, nil
	}

	if formationAssignment.State == string(model.InstanceCreatorDeletingAssignmentState) && statusReport.State == string(model.ReadyAssignmentState) {
		log.C(ctx).Infof("Instance creator reported %q, proceeding with deletion of formation assignment with ID %q", statusReport.State, formationAssignment.ID)
		return true, nil
	}

	// The status report carries an assignment state, so it is compared against the assignment-level delete error
	// constant, consistent with the DELETING check above.
	// NOTE(review): assumes DeleteErrorAssignmentState and DeleteErrorFormationState share the same string value — confirm in the model package.
	if formationAssignment.State == string(model.InstanceCreatorDeletingAssignmentState) && statusReport.State == string(model.DeleteErrorAssignmentState) {
		statusReport.State = string(model.InstanceCreatorDeleteErrorAssignmentState)
		return true, nil
	}

	return true, nil
}