Skip to content

Commit

Permalink
feat: ✨ add trace and metrics pipelines for cqrs operations
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdihadeli committed Oct 6, 2023
1 parent 26bf420 commit 1bbf375
Show file tree
Hide file tree
Showing 100 changed files with 1,791 additions and 976 deletions.
2 changes: 0 additions & 2 deletions internal/pkg/core/core_fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package core
import (
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/serializer"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/serializer/json"
defaultLogger "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/logger/default_logger"

"go.uber.org/fx"
)
Expand All @@ -17,5 +16,4 @@ var Module = fx.Module(
serializer.NewDefaultEventSerializer,
serializer.NewDefaultMetadataSerializer,
),
fx.Invoke(defaultLogger.SetupDefaultLogger),
)
121 changes: 91 additions & 30 deletions internal/pkg/core/serializer/json/json_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,70 @@ func NewDefaultSerializer() serializer.Serializer {
// https://www.sohamkamani.com/golang/json/#decoding-json-to-maps---unstructured-data
// https://developpaper.com/mapstructure-of-go/
// https://github.com/goccy/go-json

func (s *jsonSerializer) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
return Marshal(v)
}

// Unmarshal is a wrapper around json.Unmarshal.
// To unmarshal JSON into an interface value, Unmarshal stores in a map[string]interface{}
func (s *jsonSerializer) Unmarshal(data []byte, v interface{}) error {
// https://pkg.go.dev/encoding/json#Unmarshal
err := json.Unmarshal(data, v)
if err != nil {
return err
}
log.Printf("deserialize structure object")

return nil
return Unmarshal(data, v)
}

// UnmarshalFromJson is a wrapper around json.Unmarshal.
func (s *jsonSerializer) UnmarshalFromJson(data string, v interface{}) error {
err := s.Unmarshal([]byte(data), v)
if err != nil {
return err
}

return nil
return UnmarshalFromJSON(data, v)
}

// DecodeWithMapStructure is a wrapper around mapstructure.Decode.
// Decode takes an input structure or map[string]interface{} and uses reflection to translate it to the output structure. output must be a pointer to a map or struct.
// https://pkg.go.dev/github.com/mitchellh/mapstructure#section-readme
func (s *jsonSerializer) DecodeWithMapStructure(input interface{}, output interface{}) error {
// https://developpaper.com/mapstructure-of-go/
return mapstructure.Decode(input, output)
func (s *jsonSerializer) DecodeWithMapStructure(
input interface{},
output interface{},
) error {
return DecodeWithMapStructure(input, output)
}

func (s *jsonSerializer) UnmarshalToMap(data []byte, v *map[string]interface{}) error {
// https://developpaper.com/mapstructure-of-go/
err := json.Unmarshal(data, v)
if err != nil {
return err
}
return nil
func (s *jsonSerializer) UnmarshalToMap(
data []byte,
v *map[string]interface{},
) error {
return UnmarshalToMap(data, v)
}

func (s *jsonSerializer) UnmarshalToMapFromJson(data string, v *map[string]interface{}) error {
return s.UnmarshalToMap([]byte(data), v)
func (s *jsonSerializer) UnmarshalToMapFromJson(
data string,
v *map[string]interface{},
) error {
return UnmarshalToMapFromJson(data, v)
}

// PrettyPrint print input object as a formatted json string
func (s *jsonSerializer) PrettyPrint(data interface{}) string {
return PrettyPrint(data)
}

// ColoredPrettyPrint print input object as a formatted json string with color
func (s *jsonSerializer) ColoredPrettyPrint(data interface{}) string {
return ColoredPrettyPrint(data)
}

func PrettyPrint(data interface{}) string {
// https://gosamples.dev/pretty-print-json/
val, err := json.MarshalIndent(data, "", " ")
if err != nil {
return ""
}

return string(val)
}

// ColoredPrettyPrint print input object as a formatted json string with color
func (s *jsonSerializer) ColoredPrettyPrint(data interface{}) string {
func ColoredPrettyPrint(data interface{}) string {
// https://github.com/TylerBrock/colorjson
var obj map[string]interface{}
err := json.Unmarshal([]byte(s.PrettyPrint(data)), &obj)
err := json.Unmarshal([]byte(PrettyPrint(data)), &obj)
if err != nil {
return ""
}
Expand All @@ -92,5 +93,65 @@ func (s *jsonSerializer) ColoredPrettyPrint(data interface{}) string {
if err != nil {
return ""
}

return string(val)
}

func Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}

// Unmarshal is a wrapper around json.Unmarshal.
// To unmarshal JSON into an interface value, Unmarshal stores in a map[string]interface{}
func Unmarshal(data []byte, v interface{}) error {
// https://pkg.go.dev/encoding/json#Unmarshal
err := json.Unmarshal(data, v)
if err != nil {
return err
}

log.Printf("deserialize structure object")

return nil
}

// UnmarshalFromJSON is a wrapper around json.Unmarshal.
func UnmarshalFromJSON(data string, v interface{}) error {
err := Unmarshal([]byte(data), v)
if err != nil {
return err
}

return nil
}

// DecodeWithMapStructure is a wrapper around mapstructure.Decode.
// Decode takes an input structure or map[string]interface{} and uses reflection to translate it to the output structure. output must be a pointer to a map or struct.
// https://pkg.go.dev/github.com/mitchellh/mapstructure#section-readme
func DecodeWithMapStructure(
input interface{},
output interface{},
) error {
// https://developpaper.com/mapstructure-of-go/
return mapstructure.Decode(input, output)
}

func UnmarshalToMap(
data []byte,
v *map[string]interface{},
) error {
// https://developpaper.com/mapstructure-of-go/
err := json.Unmarshal(data, v)
if err != nil {
return err
}

return nil
}

func UnmarshalToMapFromJson(
data string,
v *map[string]interface{},
) error {
return UnmarshalToMap([]byte(data), v)
}
57 changes: 40 additions & 17 deletions internal/pkg/eventstroredb/aggregate_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
expectedStreamVersion "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/es/models/stream_version"
esErrors "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/eventstroredb/errors"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/logger"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/otel/tracing"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/otel/tracing/attribute"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/otel/tracing/utils"
typeMapper "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/reflection/type_mappper"

"emperror.dev/errors"
Expand Down Expand Up @@ -55,7 +55,9 @@ func (a *esdbAggregateStore[T]) StoreWithVersion(
ctx context.Context,
) (*appendResult.AppendEventsResult, error) {
ctx, span := a.tracer.Start(ctx, "esdbAggregateStore.StoreWithVersion")
span.SetAttributes(attribute2.String("AggregateID", aggregate.Id().String()))
span.SetAttributes(
attribute2.String("AggregateID", aggregate.Id().String()),
)
defer span.End()

if len(aggregate.UncommittedEvents()) == 0 {
Expand Down Expand Up @@ -97,7 +99,7 @@ func (a *esdbAggregateStore[T]) StoreWithVersion(
ctx,
)
if err != nil {
return nil, tracing.TraceErrFromSpan(
return nil, utils.TraceErrStatusFromSpan(
span,
errors.WrapIff(
err,
Expand Down Expand Up @@ -130,11 +132,18 @@ func (a *esdbAggregateStore[T]) Store(
ctx, span := a.tracer.Start(ctx, "esdbAggregateStore.Store")
defer span.End()

expectedVersion := expectedStreamVersion.FromInt64(aggregate.OriginalVersion())
expectedVersion := expectedStreamVersion.FromInt64(
aggregate.OriginalVersion(),
)

streamAppendResult, err := a.StoreWithVersion(aggregate, metadata, expectedVersion, ctx)
streamAppendResult, err := a.StoreWithVersion(
aggregate,
metadata,
expectedVersion,
ctx,
)
if err != nil {
return nil, tracing.TraceErrFromSpan(
return nil, utils.TraceErrStatusFromSpan(
span,
errors.WrapIff(
err,
Expand All @@ -147,7 +156,10 @@ func (a *esdbAggregateStore[T]) Store(
return streamAppendResult, nil
}

func (a *esdbAggregateStore[T]) Load(ctx context.Context, aggregateId uuid.UUID) (T, error) {
func (a *esdbAggregateStore[T]) Load(
ctx context.Context,
aggregateId uuid.UUID,
) (T, error) {
ctx, span := a.tracer.Start(ctx, "esdbAggregateStore.Load")
defer span.End()

Expand Down Expand Up @@ -181,7 +193,7 @@ func (a *esdbAggregateStore[T]) LoadWithReadPosition(

method := reflect.ValueOf(aggregate).MethodByName("NewEmptyAggregate")
if !method.IsValid() {
return *new(T), tracing.TraceErrFromSpan(
return *new(T), utils.TraceErrStatusFromSpan(
span,
errors.New(
"[esdbAggregateStore_LoadWithReadPosition:MethodByName] aggregate does not have a `NewEmptyAggregate` method",
Expand All @@ -196,16 +208,17 @@ func (a *esdbAggregateStore[T]) LoadWithReadPosition(

streamEvents, err := a.getStreamEvents(streamId, position, ctx)
if errors.Is(err, esdb.ErrStreamNotFound) || len(streamEvents) == 0 {
return *new(T), tracing.TraceErrFromSpan(
return *new(T), utils.TraceErrStatusFromSpan(
span,
errors.WithMessage(
esErrors.NewAggregateNotFoundError(err, aggregateId),
"[esdbAggregateStore.LoadWithReadPosition] error in loading aggregate",
),
)
}

if err != nil {
return *new(T), tracing.TraceErrFromSpan(
return *new(T), utils.TraceErrStatusFromSpan(
span,
errors.WrapIff(
err,
Expand All @@ -228,20 +241,25 @@ func (a *esdbAggregateStore[T]) LoadWithReadPosition(

err = aggregate.LoadFromHistory(domainEvents, meta)
if err != nil {
return *new(T), tracing.TraceErrFromSpan(span, err)
return *new(T), utils.TraceStatusFromSpan(span, err)
}

a.log.Infow(fmt.Sprintf("Loaded aggregate with streamId {%s} and aggregateId {%s}",
streamId.String(),
aggregateId.String()),
logger.Fields{"Aggregate": aggregate, "StreamId": streamId.String()})
a.log.Infow(
fmt.Sprintf("Loaded aggregate with streamId {%s} and aggregateId {%s}",
streamId.String(),
aggregateId.String()),
logger.Fields{"Aggregate": aggregate, "StreamId": streamId.String()},
)

span.SetAttributes(attribute.Object("Aggregate", aggregate))

return aggregate, nil
}

func (a *esdbAggregateStore[T]) Exists(ctx context.Context, aggregateId uuid.UUID) (bool, error) {
func (a *esdbAggregateStore[T]) Exists(
ctx context.Context,
aggregateId uuid.UUID,
) (bool, error) {
ctx, span := a.tracer.Start(ctx, "esdbAggregateStore.Exists")
span.SetAttributes(attribute2.String("AggregateID", aggregateId.String()))
defer span.End()
Expand All @@ -261,7 +279,12 @@ func (a *esdbAggregateStore[T]) getStreamEvents(
var streamEvents []*models.StreamEvent

for true {
events, err := a.eventStore.ReadEvents(streamId, position, uint64(pageSize), ctx)
events, err := a.eventStore.ReadEvents(
streamId,
position,
uint64(pageSize),
ctx,
)
if err != nil {
return nil, errors.WrapIff(
err,
Expand Down
Loading

0 comments on commit 1bbf375

Please sign in to comment.