Skip to content

Commit

Permalink
feature: updated command interface and event interface to pass key ob…
Browse files Browse the repository at this point in the history
…jects to receivers/handlers
  • Loading branch information
akeemphilbert committed Apr 9, 2024
1 parent 2bd29c4 commit c4bd9eb
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 288 deletions.
8 changes: 6 additions & 2 deletions rest/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"go.uber.org/fx"
"golang.org/x/net/context"
"gorm.io/gorm"
"net/http"
"sync"
"time"
)
Expand Down Expand Up @@ -39,7 +41,7 @@ type DefaultCommandDispatcher struct {
dispatch sync.Mutex
}

func (e *DefaultCommandDispatcher) Dispatch(ctx context.Context, command *Command, logger Log, options *CommandOptions) (response CommandResponse, err error) {
func (e *DefaultCommandDispatcher) Dispatch(ctx context.Context, logger Log, command *Command, options *CommandOptions) (response CommandResponse, err error) {
var wg sync.WaitGroup
var allHandlers []CommandHandler
//first preference is handlers for specific command type and entity type
Expand Down Expand Up @@ -69,7 +71,7 @@ func (e *DefaultCommandDispatcher) Dispatch(ctx context.Context, command *Comman
}
wg.Done()
}()
response, err = handler(ctx, command, logger, options)
response, err = handler(ctx, logger, command, options)
}()
}

Expand Down Expand Up @@ -125,4 +127,6 @@ type CommandOptions struct {
ResourceRepository *ResourceRepository
DefaultProjection Projection
Projections map[string]Projection
HttpClient *http.Client
GORMDB *gorm.DB
}
6 changes: 4 additions & 2 deletions rest/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ControllerParams struct {
Operation map[string]*openapi3.Operation
Echo *echo.Echo
APIConfig *APIConfig
HttpClient *http.Client
}

// DefaultWriteController handles the write operations (create, update, delete)
Expand Down Expand Up @@ -113,11 +114,12 @@ func DefaultWriteController(p *ControllerParams) echo.HandlerFunc {
if projection, ok := p.Projections[resourceType]; ok {
defaultProjection = projection
}
response, err := p.CommandDispatcher.Dispatch(ctxt.Request().Context(), &Command{
response, err := p.CommandDispatcher.Dispatch(ctxt.Request().Context(), ctxt.Logger(), &Command{
Type: commandName,
}, ctxt.Logger(), &CommandOptions{
}, &CommandOptions{
ResourceRepository: p.ResourceRepository,
DefaultProjection: defaultProjection,
HttpClient: p.HttpClient,
})

if response.Code != 0 {
Expand Down
2 changes: 1 addition & 1 deletion rest/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestDefaultWriteController(t *testing.T) {
}
repository := result.Repository
commandDispatcher := &CommandDispatcherMock{
DispatchFunc: func(ctx context.Context, command *rest.Command, logger rest.Log, options *rest.CommandOptions) (rest.CommandResponse, error) {
DispatchFunc: func(ctx context.Context, logger rest.Log, command *rest.Command, options *rest.CommandOptions) (rest.CommandResponse, error) {
return rest.CommandResponse{
Code: 200,
}, nil
Expand Down
9 changes: 9 additions & 0 deletions rest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"github.com/getkin/kin-openapi/openapi3"
"gorm.io/gorm"
"net/http"
)

type Event struct {
Expand All @@ -26,6 +27,14 @@ type EventMeta struct {
Created string `json:"created"`
}

type EventOptions struct {
ResourceRepository *ResourceRepository
DefaultProjection Projection
Projections map[string]Projection
HttpClient *http.Client
GORMDB *gorm.DB
}

func (e *Event) NewChange(event *Event) {
//TODO implement me
panic("implement me")
Expand Down
10 changes: 5 additions & 5 deletions rest/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestDefaultEventDisptacher_AddSubscriber(t *testing.T) {
eventDispatcher := new(rest.GORMProjection)
err := eventDispatcher.AddSubscriber(rest.EventHandlerConfig{
Type: "create",
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event) error {
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event, options *rest.EventOptions) error {
return nil
},
})
Expand All @@ -35,7 +35,7 @@ func TestDefaultEventDisptacher_AddSubscriber(t *testing.T) {
err := eventDispatcher.AddSubscriber(rest.EventHandlerConfig{
ResourceType: "Article",
Type: "create",
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event) error {
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event, options *rest.EventOptions) error {
return nil
},
})
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestResourceRepository_Dispatch(t *testing.T) {
eventDispatcher := new(rest.GORMProjection)
err := eventDispatcher.AddSubscriber(rest.EventHandlerConfig{
Type: "create",
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event) error {
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event, options *rest.EventOptions) error {
createHandlerHit = true
return nil
},
Expand All @@ -97,7 +97,7 @@ func TestResourceRepository_Dispatch(t *testing.T) {
err = eventDispatcher.AddSubscriber(rest.EventHandlerConfig{
Type: "create",
ResourceType: "Article",
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event) error {
Handler: func(ctx context.Context, logger rest.Log, event *rest.Event, options *rest.EventOptions) error {
articleCreateHandlerHit = true
return nil
},
Expand All @@ -110,7 +110,7 @@ func TestResourceRepository_Dispatch(t *testing.T) {
Meta: rest.EventMeta{
ResourceType: "Article",
},
})
}, &rest.EventOptions{})
if len(errors) != 0 {
t.Errorf("expected no errors, got %d", len(errors))
}
Expand Down
16 changes: 9 additions & 7 deletions rest/gorm.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ type GORMProjection struct {
}

// Dispatch dispatches the event to the handlers
func (e *GORMProjection) Dispatch(ctx context.Context, logger Log, event *Event) []error {
func (e *GORMProjection) Dispatch(ctx context.Context, logger Log, event *Event, options *EventOptions) []error {
//mutex helps keep state between routines
var errors []error
var wg sync.WaitGroup
Expand Down Expand Up @@ -251,7 +251,7 @@ func (e *GORMProjection) Dispatch(ctx context.Context, logger Log, event *Event)
wg.Done()
}()

err := handler(ctx, logger, event)
err := handler(ctx, logger, event, options)
if err != nil {
errors = append(errors, err)
}
Expand Down Expand Up @@ -331,7 +331,9 @@ func (e *GORMProjection) Persist(ctxt context.Context, logger Log, resources []R
errs = append(errs, result.Error)
}
for _, event := range events {
e.Dispatch(ctxt, logger, event)
e.Dispatch(ctxt, logger, event, &EventOptions{
GORMDB: e.gormDB,
})
}
return errs
}
Expand Down Expand Up @@ -362,27 +364,27 @@ func (e *GORMProjection) GetEventHandlers() []EventHandlerConfig {
}

// ResourceUpdateHandler handles Create Update operations
func (e *GORMProjection) ResourceUpdateHandler(ctx context.Context, logger Log, event *Event) (err error) {
func (e *GORMProjection) ResourceUpdateHandler(ctx context.Context, logger Log, event *Event, options *EventOptions) (err error) {
basicResource := new(BasicResource)
err = json.Unmarshal(event.Payload, &basicResource)
if err != nil {
return err
}
result := e.gormDB.Save(basicResource)
result := options.GORMDB.Save(basicResource)
if result.Error != nil {
return result.Error
}
return err
}

// ResourceDeleteHandler handles Delete operations
func (e *GORMProjection) ResourceDeleteHandler(ctx context.Context, logger Log, event *Event) (err error) {
func (e *GORMProjection) ResourceDeleteHandler(ctx context.Context, logger Log, event *Event, options *EventOptions) (err error) {
basicResource := new(BasicResource)
err = json.Unmarshal(event.Payload, &basicResource)
if err != nil {
return err
}
result := e.gormDB.Delete(basicResource)
result := options.GORMDB.Delete(basicResource)
if result.Error != nil {
return result.Error
}
Expand Down
2 changes: 1 addition & 1 deletion rest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestIntegrations(t *testing.T) {
handlers := []rest.CommandConfig{
{
Type: "CreateBlog",
Handler: func(ctx context.Context, command *rest.Command, logger rest.Log, options *rest.CommandOptions) (response rest.CommandResponse, err error) {
Handler: func(ctx context.Context, logger rest.Log, command *rest.Command, options *rest.CommandOptions) (response rest.CommandResponse, err error) {
return rest.CommandResponse{
Code: 400, //this is set deliberately for testing
Body: map[string]interface{}{
Expand Down
8 changes: 4 additions & 4 deletions rest/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type (
GlobalInitializer func(context.Context, *openapi3.T) (context.Context, error)
OperationInitializer func(context.Context, string, string, *openapi3.T, *openapi3.PathItem, *openapi3.Operation) (context.Context, error)
PathInitializer func(context.Context, string, *openapi3.T, *openapi3.PathItem) (context.Context, error)
CommandHandler func(ctx context.Context, command *Command, logger Log, options *CommandOptions) (response CommandResponse, err error)
EventHandler func(ctx context.Context, logger Log, event *Event) error
CommandHandler func(ctx context.Context, logger Log, command *Command, options *CommandOptions) (response CommandResponse, err error)
EventHandler func(ctx context.Context, logger Log, event *Event, options *EventOptions) error
)

type Entity interface {
Expand All @@ -39,15 +39,15 @@ type Repository interface {
}

type CommandDispatcher interface {
Dispatch(ctx context.Context, command *Command, logger Log, options *CommandOptions) (response CommandResponse, err error)
Dispatch(ctx context.Context, logger Log, command *Command, options *CommandOptions) (response CommandResponse, err error)
AddSubscriber(command CommandConfig) map[string][]CommandHandler
GetSubscribers() map[string][]CommandHandler
}

type EventDispatcher interface {
AddSubscriber(handler EventHandlerConfig) error
GetSubscribers(resourceType string) map[string][]EventHandler
Dispatch(ctx context.Context, logger Log, event *Event) []error
Dispatch(ctx context.Context, logger Log, event *Event, options *EventOptions) []error
}

type EventStore interface {
Expand Down
Loading

0 comments on commit c4bd9eb

Please sign in to comment.