Skip to content

Commit

Permalink
feat: ✨ supporting Inbox and Outbox Pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdihadeli committed Dec 17, 2023
1 parent e0ec597 commit e6a5a51
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 41 deletions.
20 changes: 10 additions & 10 deletions internal/pkg/core/messaging/types/message_envelope.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package types

type MessageEnvelope struct {
Message interface{}
Message IMessage
Headers map[string]interface{}
}

func NewMessageEnvelope(
message interface{},
message IMessage,
headers map[string]interface{},
) *MessageEnvelope {
if headers == nil {
Expand All @@ -19,19 +19,19 @@ func NewMessageEnvelope(
}
}

type MessageEnvelopeTMessage struct {
type MessageEnvelopeT[T IMessage] struct {
*MessageEnvelope
MessageTMessage interface{}
Message T
}

func NewMessageEnvelopeTMessage(
messageTMessage interface{},
func NewMessageEnvelopeT[T IMessage](
message T,
headers map[string]interface{},
) *MessageEnvelopeTMessage {
messageEnvelope := NewMessageEnvelope(messageTMessage, headers)
) *MessageEnvelopeT[T] {
messageEnvelope := NewMessageEnvelope(message, headers)

return &MessageEnvelopeTMessage{
return &MessageEnvelopeT[T]{
MessageEnvelope: messageEnvelope,
MessageTMessage: messageTMessage,
Message: message,
}
}
2 changes: 2 additions & 0 deletions internal/pkg/core/serializer/contratcs/message_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

type MessageSerializer interface {
Serialize(message types.IMessage) (*EventSerializationResult, error)
SerializeObject(message interface{}) (*EventSerializationResult, error)
SerializeEnvelop(messageEnvelop types.MessageEnvelope) (*EventSerializationResult, error)
Deserialize(data []byte, messageType string, contentType string) (types.IMessage, error)
DeserializeObject(data []byte, messageType string, contentType string) (interface{}, error)
DeserializeType(data []byte, messageType reflect.Type, contentType string) (types.IMessage, error)
Expand Down
26 changes: 5 additions & 21 deletions internal/pkg/core/serializer/json/event_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ func NewDefaultEventJsonSerializer(serializer contratcs.Serializer) contratcs.Ev
return &DefaultEventJsonSerializer{serializer: serializer}
}

func (s *DefaultEventJsonSerializer) Serializer() contratcs.Serializer {
return s.serializer
}

func (s *DefaultEventJsonSerializer) Serialize(event domain.IDomainEvent) (*contratcs.EventSerializationResult, error) {
return s.SerializeObject(event)
}
Expand Down Expand Up @@ -110,25 +106,13 @@ func (s *DefaultEventJsonSerializer) DeserializeType(
// we use event short type name instead of full type name because this event in other receiver packages could have different package name
eventTypeName := typeMapper.GetTypeName(eventType)

targetEventPointer := typeMapper.EmptyInstanceByTypeNameAndImplementedInterface[domain.IDomainEvent](
eventTypeName,
)

if targetEventPointer == nil {
return nil, errors.Errorf("event type `%s` is not impelemted IDomainEvent or can't be instansiated", eventType)
}

if contentType != s.ContentType() {
return nil, errors.Errorf("contentType: %s is not supported", contentType)
}

if err := s.serializer.Unmarshal(data, targetEventPointer); err != nil {
return nil, errors.WrapIff(err, "error in Unmarshaling: `%s`", eventType)
}

return targetEventPointer.(domain.IDomainEvent), nil
return s.Deserialize(data, eventTypeName, contentType)
}

func (s *DefaultEventJsonSerializer) ContentType() string {
return "application/json"
}

func (s *DefaultEventJsonSerializer) Serializer() contratcs.Serializer {
return s.serializer
}
126 changes: 126 additions & 0 deletions internal/pkg/core/serializer/json/message_serializer.go
Original file line number Diff line number Diff line change
@@ -1 +1,127 @@
package json

import (
"reflect"

"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/messaging/types"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/serializer/contratcs"
typeMapper "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/reflection/typemapper"

"emperror.dev/errors"
)

type DefaultMessageJsonSerializer struct {
serializer contratcs.Serializer
}

func NewDefaultMessageJsonSerializer(serializer contratcs.Serializer) contratcs.MessageSerializer {
return &DefaultMessageJsonSerializer{serializer: serializer}
}

func (m *DefaultMessageJsonSerializer) Serialize(message types.IMessage) (*contratcs.EventSerializationResult, error) {
return m.SerializeObject(message)
}

func (m *DefaultMessageJsonSerializer) SerializeObject(
message interface{},
) (*contratcs.EventSerializationResult, error) {
if message == nil {
return &contratcs.EventSerializationResult{Data: nil, ContentType: m.ContentType()}, nil
}

// we use message short type name instead of full type name because this message in other receiver packages could have different package name
eventType := typeMapper.GetTypeName(message)

data, err := m.serializer.Marshal(message)
if err != nil {
return nil, errors.WrapIff(err, "error in Marshaling: `%s`", eventType)
}

result := &contratcs.EventSerializationResult{Data: data, ContentType: m.ContentType()}

return result, nil
}

func (m *DefaultMessageJsonSerializer) SerializeEnvelop(
messageEnvelop types.MessageEnvelope,
) (*contratcs.EventSerializationResult, error) {
// TODO implement me
panic("implement me")
}

func (m *DefaultMessageJsonSerializer) Deserialize(
data []byte,
messageType string,
contentType string,
) (types.IMessage, error) {
if data == nil {
return nil, nil
}

targetMessagePointer := typeMapper.EmptyInstanceByTypeNameAndImplementedInterface[types.IMessage](
messageType,
)

if targetMessagePointer == nil {
return nil, errors.Errorf("message type `%s` is not impelemted IMessage or can't be instansiated", messageType)
}

if contentType != m.ContentType() {
return nil, errors.Errorf("contentType: %s is not supported", contentType)
}

if err := m.serializer.Unmarshal(data, targetMessagePointer); err != nil {
return nil, errors.WrapIff(err, "error in Unmarshaling: `%s`", messageType)
}

return targetMessagePointer.(types.IMessage), nil
}

func (m *DefaultMessageJsonSerializer) DeserializeObject(
data []byte,
messageType string,
contentType string,
) (interface{}, error) {
if data == nil {
return nil, nil
}

targetMessagePointer := typeMapper.InstanceByTypeName(messageType)

if targetMessagePointer == nil {
return nil, errors.Errorf("message type `%s` can't be instansiated", messageType)
}

if contentType != m.ContentType() {
return nil, errors.Errorf("contentType: %s is not supported", contentType)
}

if err := m.serializer.Unmarshal(data, targetMessagePointer); err != nil {
return nil, errors.WrapIff(err, "error in Unmarshaling: `%s`", messageType)
}

return targetMessagePointer, nil
}

func (m *DefaultMessageJsonSerializer) DeserializeType(
data []byte,
messageType reflect.Type,
contentType string,
) (types.IMessage, error) {
if data == nil {
return nil, nil
}

// we use message short type name instead of full type name because this message in other receiver packages could have different package name
messageTypeName := typeMapper.GetTypeName(messageType)

return m.Deserialize(data, messageTypeName, contentType)
}

func (m *DefaultMessageJsonSerializer) ContentType() string {
return "application/json"
}

func (m *DefaultMessageJsonSerializer) Serializer() contratcs.Serializer {
return m.serializer
}
2 changes: 1 addition & 1 deletion internal/pkg/eventstroredb/esdb_serilizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (e *EsdbSerializer) SerializeObject(
data interface{},
meta metadata.Metadata,
) (*esdb.EventData, error) {
serializedData, err := e.eventSerializer.Serialize(data)
serializedData, err := e.eventSerializer.SerializeObject(data)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (e *esdbSubscriptionCheckpointRepository) Store(
Event: events.NewEvent(typeMapper.GetTypeName(&CheckpointStored{})),
}
streamName := getCheckpointStreamName(subscriptionId)
eventData, err := e.esdbSerilizer.Serialize(checkpoint, nil)
eventData, err := e.esdbSerilizer.SerializeObject(checkpoint, nil)
if err != nil {
return errors.WrapIf(err, "esdbSerilizer.Serialize")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"reflect"

"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/messaging/persistmessage"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/messaging/types"
Expand Down Expand Up @@ -44,7 +43,7 @@ func (m *postgresMessageService) AddReceivedMessage(messageEnvelope types.Messag
panic("implement me")
}

func AddMessageCore(
func (m *postgresMessageService) AddMessageCore(
ctx context.Context,
messageEnvelope types.MessageEnvelope,
deliveryType persistmessage.MessageDeliveryType,
Expand All @@ -63,15 +62,23 @@ func AddMessageCore(
id = uuid.NewV4().String()
}

storeMessage := persistmessage.NewStoreMessage()
data, err := m.messageSerializer.SerializeEnvelop(messageEnvelope)
if err != nil {
return err
}

persistmessage.StoreMessage{
ID: id,
TypeName: reflect.TypeOf(messageEnvelope.Message).Name(),
Serialized: _messageSerializer.Serialize(messageEnvelope),
DeliveryType: deliveryType,
uuidId, err := uuid.FromString(id)
if err != nil {
return err
}

storeMessage := persistmessage.NewStoreMessage(
uuidId,
messageEnvelope.Message.GetMessageFullTypeName(),
string(data.Data),
deliveryType,
)

err := _messagePersistenceRepository.AddAsync(storeMessage, cancellationToken)
if err != nil {
return err
Expand Down

0 comments on commit e6a5a51

Please sign in to comment.