/
aggregate.go
120 lines (102 loc) · 4.08 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package offering
import (
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/constants"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/domain/common/aggregate"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/eventstore"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/tracing"
offeringpb "github.com/openline-ai/openline-customer-os/packages/server/events-processing-proto/gen/proto/go/api/grpc/v1/offering"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"golang.org/x/net/context"
"strings"
)
const OfferingAggregateType = "offering"
type OfferingAggregate struct {
*aggregate.CommonTenantIdAggregate
Offering *Offering
}
func GetOfferingObjectID(aggregateID string, tenant string) string {
return aggregate.GetAggregateObjectID(aggregateID, tenant, OfferingAggregateType)
}
func LoadOfferingAggregate(ctx context.Context, eventStore eventstore.AggregateStore, tenant, objectID string, opts eventstore.LoadAggregateOptions) (*OfferingAggregate, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "LoadOfferingAggregate")
defer span.Finish()
span.SetTag(tracing.SpanTagTenant, tenant)
span.LogFields(log.String("ObjectID", objectID))
OfferingAggregate := NewOfferingAggregateWithTenantAndID(tenant, objectID)
err := aggregate.LoadAggregate(ctx, eventStore, OfferingAggregate, opts)
if err != nil {
tracing.TraceErr(span, err)
return nil, err
}
return OfferingAggregate, nil
}
func (a *OfferingAggregate) HandleRequest(ctx context.Context, request any, params map[string]any) (any, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "OfferingAggregate.HandleRequest")
defer span.Finish()
switch r := request.(type) {
case *offeringpb.CreateOfferingGrpcRequest:
return nil, a.CreateOffering(ctx, r)
case *offeringpb.UpdateOfferingGrpcRequest:
return nil, nil //TODO
default:
tracing.TraceErr(span, eventstore.ErrInvalidRequestType)
return nil, eventstore.ErrInvalidRequestType
}
}
func NewOfferingAggregateWithTenantAndID(tenant, id string) *OfferingAggregate {
OfferingAggregate := OfferingAggregate{}
OfferingAggregate.CommonTenantIdAggregate = aggregate.NewCommonAggregateWithTenantAndId(OfferingAggregateType, tenant, id)
OfferingAggregate.SetWhen(OfferingAggregate.When)
OfferingAggregate.Offering = &Offering{}
OfferingAggregate.Tenant = tenant
return &OfferingAggregate
}
func (a *OfferingAggregate) When(event eventstore.Event) error {
switch event.GetEventType() {
case OfferingCreateV1:
return a.whenOfferingCreate(event)
default:
if strings.HasPrefix(event.GetEventType(), constants.EsInternalStreamPrefix) {
return nil
}
err := eventstore.ErrInvalidEventType
err.EventType = event.GetEventType()
return err
}
}
func (a *OfferingAggregate) whenOfferingCreate(evt eventstore.Event) error {
return nil
}
func (a *OfferingAggregate) CreateOffering(ctx context.Context, request *offeringpb.CreateOfferingGrpcRequest) error {
span, _ := opentracing.StartSpanFromContext(ctx, "OfferingAggregate.CreateOffering")
defer span.Finish()
span.SetTag(tracing.SpanTagTenant, a.GetTenant())
span.SetTag(tracing.SpanTagAggregateId, a.GetID())
span.LogFields(log.Int64("AggregateVersion", a.GetVersion()))
//createdAtNotNil := utils.IfNotNilTimeWithDefault(utils.TimestampProtoToTime(request.CreatedAt), utils.Now())
//sourceFields := commonmodel.Source{}
//sourceFields.FromGrpc(request.SourceFields)
//
//createEvent, err := NewOfferingCreateEvent(
// a,
// request.Content,
// request.LoggedInUserId,
// request.OrganizationId,
// request.Dismissed,
// createdAtNotNil,
// dueDateNotNil,
// sourceFields,
//)
//if err != nil {
// tracing.TraceErr(span, err)
// return errors.Wrap(err, "NewOfferingCreateEvent")
//}
//aggregate.EnrichEventWithMetadataExtended(&createEvent, span, aggregate.EventMetadata{
// Tenant: request.Tenant,
// UserId: request.LoggedInUserId,
// App: request.SourceFields.AppSource,
//})
//return a.Apply(createEvent)
return nil
}