/
update_org_plan_handler.go
139 lines (122 loc) · 5.53 KB
/
update_org_plan_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package event_handler
import (
"context"
"time"
"github.com/openline-ai/openline-customer-os/packages/server/customer-os-common-module/utils"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/config"
commonAggregate "github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/common/aggregate"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/organization/aggregate"
event "github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/organization_plan/events"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/organization_plan/model"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/eventstore"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/logger"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/tracing"
orgplanpb "github.com/openline-ai/openline-customer-os/packages/server/events-processing-proto/gen/proto/go/api/grpc/v1/org_plan"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
)
type UpdateOrganizationPlanHandler interface {
Handle(ctx context.Context, baseRequest eventstore.BaseRequest, request *orgplanpb.UpdateOrganizationPlanGrpcRequest) error
}
type updateOrganizationPlanHandler struct {
log logger.Logger
es eventstore.AggregateStore
cfg config.Utils
}
func NewUpdateOrganizationPlanHandler(log logger.Logger, es eventstore.AggregateStore, cfg config.Utils) UpdateOrganizationPlanHandler {
return &updateOrganizationPlanHandler{log: log, es: es, cfg: cfg}
}
// Handle processes the UpdateOrganizationPlanCommand to update a new master plan.
func (h *updateOrganizationPlanHandler) Handle(ctx context.Context, baseRequest eventstore.BaseRequest, request *orgplanpb.UpdateOrganizationPlanGrpcRequest) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "UpdateOrganizationPlanHandler.Handle")
defer span.Finish()
tracing.SetCommandHandlerSpanTags(ctx, span, baseRequest.Tenant, baseRequest.LoggedInUserId)
tracing.LogObjectAsJson(span, "common", baseRequest)
tracing.LogObjectAsJson(span, "request", request)
for attempt := 0; attempt == 0 || attempt < h.cfg.RetriesOnOptimisticLockException; attempt++ {
// Load or initialize the org aggregate
orgAggregate, err := aggregate.LoadOrganizationAggregate(ctx, h.es, baseRequest.Tenant, request.OrgId, eventstore.LoadAggregateOptions{})
if err != nil {
tracing.TraceErr(span, err)
return err
}
if eventstore.IsAggregateNotFound(orgAggregate) {
tracing.TraceErr(span, eventstore.ErrAggregateNotFound)
return eventstore.ErrAggregateNotFound
}
updatedAtNotNil := utils.IfNotNilTimeWithDefault(utils.TimestampProtoToTimePtr(request.UpdatedAt), utils.Now())
statusDetails := model.OrganizationPlanDetails{}
if request.StatusDetails != nil {
statusDetails = model.OrganizationPlanDetails{
Status: request.StatusDetails.Status,
UpdatedAt: updatedAtNotNil,
Comments: request.StatusDetails.Comments,
}
}
evt, err := event.NewOrganizationPlanUpdateEvent(orgAggregate, request.OrganizationPlanId, request.Name, request.Retired, updatedAtNotNil, extractOrganizationPlanFieldsMask(request.FieldsMask), statusDetails)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "NewOrganizationPlanUpdateEvent")
}
commonAggregate.EnrichEventWithMetadataExtended(&evt, span, commonAggregate.EventMetadata{
Tenant: request.Tenant,
UserId: request.LoggedInUserId,
App: baseRequest.SourceFields.AppSource,
})
err = orgAggregate.Apply(evt)
if err != nil {
tracing.TraceErr(span, err)
return err
}
// Persist the changes to the event store
err = h.es.Save(ctx, orgAggregate)
if err == nil {
return nil // Save successful
}
if eventstore.IsEventStoreErrorCodeWrongExpectedVersion(err) {
// Handle concurrency error
if attempt == h.cfg.RetriesOnOptimisticLockException-1 {
// If we have reached the maximum number of retries, return an error
tracing.TraceErr(span, err)
return err
}
span.LogFields(log.Int("retryAttempt", attempt+1))
time.Sleep(utils.BackOffExponentialDelay(attempt)) // backoffDelay is a function that increases the delay with each attempt
continue // Retry
} else {
// Some other error occurred
tracing.TraceErr(span, err)
return err
}
}
return nil
}
func extractOrganizationPlanFieldsMask(fields []orgplanpb.OrganizationPlanFieldMask) []string {
fieldsMask := make([]string, 0)
if len(fields) == 0 {
return fieldsMask
}
if containsOrganizationPlanMaskFieldAll(fields) {
return fieldsMask
}
for _, field := range fields {
switch field {
case orgplanpb.OrganizationPlanFieldMask_ORGANIZATION_PLAN_PROPERTY_NAME:
fieldsMask = append(fieldsMask, event.FieldMaskName)
case orgplanpb.OrganizationPlanFieldMask_ORGANIZATION_PLAN_PROPERTY_RETIRED:
fieldsMask = append(fieldsMask, event.FieldMaskRetired)
case orgplanpb.OrganizationPlanFieldMask_ORGANIZATION_PLAN_PROPERTY_STATUS_DETAILS:
fieldsMask = append(fieldsMask, event.FieldMaskStatusDetails)
}
}
return utils.RemoveDuplicates(fieldsMask)
}
func containsOrganizationPlanMaskFieldAll(fields []orgplanpb.OrganizationPlanFieldMask) bool {
for _, field := range fields {
if field == orgplanpb.OrganizationPlanFieldMask_ORGANIZATION_PLAN_PROPERTY_ALL {
return true
}
}
return false
}