/
temp_aggregate.go
60 lines (51 loc) · 2.43 KB
/
temp_aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package aggregate
import (
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/common/aggregate"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/contract/event"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/eventstore"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/tracing"
contractpb "github.com/openline-ai/openline-customer-os/packages/server/events-processing-proto/gen/proto/go/api/grpc/v1/contract"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
type ContractTempAggregate struct {
*aggregate.CommonTenantIdTempAggregate
}
func NewContractTempAggregateWithTenantAndID(tenant, id string) *ContractTempAggregate {
contractTempAggregate := ContractTempAggregate{}
contractTempAggregate.CommonTenantIdTempAggregate = aggregate.NewCommonTempAggregateWithTenantAndId(ContractAggregateType, tenant, id)
contractTempAggregate.Tenant = tenant
return &contractTempAggregate
}
func (a *ContractTempAggregate) HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "ContractTempAggregate.HandleRequest")
defer span.Finish()
switch r := request.(type) {
case *contractpb.RefreshContractStatusGrpcRequest:
return nil, a.refreshContractStatus(ctx, r)
default:
tracing.TraceErr(span, eventstore.ErrInvalidRequestType)
return nil, eventstore.ErrInvalidRequestType
}
}
func (a *ContractTempAggregate) refreshContractStatus(ctx context.Context, request *contractpb.RefreshContractStatusGrpcRequest) error {
span, _ := opentracing.StartSpanFromContext(ctx, "ContractAggregate.refreshContractStatus")
defer span.Finish()
span.SetTag(tracing.SpanTagTenant, a.Tenant)
span.SetTag(tracing.SpanTagAggregateId, a.GetID())
span.LogFields(log.Int64("aggregateVersion", a.GetVersion()))
tracing.LogObjectAsJson(span, "request", request)
updateEvent, err := event.NewContractRefreshStatusEvent(a)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "NewContractRefreshStatusEvent")
}
aggregate.EnrichEventWithMetadataExtended(&updateEvent, span, aggregate.EventMetadata{
Tenant: a.Tenant,
UserId: request.LoggedInUserId,
App: request.GetAppSource(),
})
return a.Apply(updateEvent)
}