-
Notifications
You must be signed in to change notification settings - Fork 0
/
command.go
132 lines (114 loc) · 3.26 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package rest
import (
"encoding/json"
"fmt"
"go.uber.org/fx"
"golang.org/x/net/context"
"sync"
"time"
)
// Built-in command type names.
// NOTE(review): presumably these are the values carried in Command.Type for
// the standard operations — confirm against callers.
const (
	// CREATE_COMMAND is the command type name "create".
	CREATE_COMMAND = "create"
	// UPDATE_COMMAND is the command type name "update".
	UPDATE_COMMAND = "update"
	// DELETE_COMMAND is the command type name "delete".
	DELETE_COMMAND = "delete"
)
// CommandDispatcherParams is the argument struct of NewCommandDispatcher.
// It embeds fx.In, which marks it as an fx parameter struct whose fields are
// supplied by the fx container.
type CommandDispatcherParams struct {
	fx.In

	// CommandConfigs are the handler registrations added to the new dispatcher.
	CommandConfigs []CommandConfig
	// Logger is requested from the container; NewCommandDispatcher does not read it.
	Logger Log
}
// CommandDispatcherResult is the return struct of NewCommandDispatcher.
// It embeds fx.Out, which marks it as an fx result struct whose fields are
// offered to the fx container.
type CommandDispatcherResult struct {
	fx.Out

	// Dispatcher is the dispatcher built by NewCommandDispatcher.
	Dispatcher CommandDispatcher
}
// NewCommandDispatcher builds a DefaultCommandDispatcher with an empty handler
// registry, registers every entry of p.CommandConfigs on it in slice order, and
// returns it wrapped in a CommandDispatcherResult.
func NewCommandDispatcher(p CommandDispatcherParams) CommandDispatcherResult {
	d := &DefaultCommandDispatcher{handlers: map[string][]CommandHandler{}}

	for i := range p.CommandConfigs {
		d.AddSubscriber(p.CommandConfigs[i])
	}

	return CommandDispatcherResult{Dispatcher: d}
}
// DefaultCommandDispatcher keeps a registry of command handlers and runs the
// matching ones when a command is dispatched.
type DefaultCommandDispatcher struct {
	// handlers maps a registration key (CommandConfig.Type concatenated with
	// CommandConfig.Resource) to the handlers registered under it, in
	// registration order.
	handlers map[string][]CommandHandler
	// handlerPanicked is not referenced by the methods in this part of the file.
	handlerPanicked bool
	// dispatch is not referenced by the methods in this part of the file.
	// Because the struct holds a mutex it must not be copied; all methods use
	// pointer receivers.
	dispatch sync.Mutex
}
// Dispatch runs every handler that matches command concurrently and waits for
// all of them to finish before returning.
//
// Handler selection:
//   - handlers registered under command.Type+command.Metadata.EntityType come first;
//   - only if there are none, handlers registered under command.Type alone are used;
//   - handlers registered under "*" are always appended.
//
// Each handler runs in its own goroutine and records its outcome in a slot that
// no other goroutine touches, so handlers never race on the return values. A
// panic inside a handler is recovered, logged through logger, and reported as
// that handler's error.
//
// After all handlers have returned, the outcomes are examined in handler order.
// The returned response and error always come from the same handler: the first
// handler that failed (returned a non-nil error or panicked), or, when every
// handler succeeded, the first handler. A failure is therefore never hidden by
// another handler's success.
//
// When no handler matches, the zero CommandResponse and a nil error are
// returned. A nil command returns an error without running any handler.
func (e *DefaultCommandDispatcher) Dispatch(ctx context.Context, command *Command, logger Log, options *CommandOptions) (response CommandResponse, err error) {
	if command == nil {
		return response, fmt.Errorf("command is nil")
	}

	var allHandlers []CommandHandler
	// First preference: handlers for this specific command type and entity type.
	allHandlers = append(allHandlers, e.handlers[command.Type+command.Metadata.EntityType]...)
	// Without a specific handler, fall back to handlers for the command type alone.
	if len(allHandlers) == 0 {
		allHandlers = append(allHandlers, e.handlers[command.Type]...)
	}
	// Global handlers run for every command.
	allHandlers = append(allHandlers, e.handlers["*"]...)

	if len(allHandlers) == 0 {
		return response, nil
	}

	// One slot per handler: each goroutine writes only to its own slot, and the
	// slots are read only after wg.Wait, so no further locking is needed.
	type outcome struct {
		response CommandResponse
		err      error
	}
	outcomes := make([]outcome, len(allHandlers))

	var wg sync.WaitGroup
	for i, handler := range allHandlers {
		wg.Add(1)
		go func(slot *outcome, handler CommandHandler) {
			defer wg.Done()
			defer func() {
				// Turn a handler panic into that handler's error instead of
				// crashing the process.
				if r := recover(); r != nil {
					slot.err = fmt.Errorf("handler error '%v'", r)
					logger.Errorf("handler error '%v'", r)
				}
			}()
			slot.response, slot.err = handler(ctx, command, logger, options)
		}(&outcomes[i], handler)
	}
	wg.Wait()

	// The first failure in handler order wins, paired with its own response.
	for i := range outcomes {
		if outcomes[i].err != nil {
			return outcomes[i].response, outcomes[i].err
		}
	}
	return outcomes[0].response, nil
}
// AddSubscriber appends command.Handler to the handlers registered under the
// key command.Type+command.Resource, creating the registry first if it is nil.
// It returns the dispatcher's own registry map, not a copy.
func (e *DefaultCommandDispatcher) AddSubscriber(command CommandConfig) map[string][]CommandHandler {
	if e.handlers == nil {
		e.handlers = make(map[string][]CommandHandler)
	}

	key := command.Type + command.Resource
	registered := e.handlers[key]
	e.handlers[key] = append(registered, command.Handler)

	return e.handlers
}
// GetSubscribers returns the dispatcher's handler registry, keyed by
// registration key. The returned map is the dispatcher's own map, not a copy,
// and is nil if nothing has initialised it yet.
func (e *DefaultCommandDispatcher) GetSubscribers() map[string][]CommandHandler {
	return e.handlers
}
// CommandConfig describes one handler registration: the handler plus the
// command type and resource it is registered for.
type CommandConfig struct {
	// Type is the command type; it forms the first part of the registration key.
	Type string
	// Resource is appended to Type to form the registration key. Dispatch
	// compares that key with a command's Type+Metadata.EntityType, so an empty
	// Resource registers the handler for the command type alone.
	Resource string
	// Handler is the function stored under the registration key.
	Handler CommandHandler
}
// Command is the unit of work passed to a dispatcher. Its JSON form uses the
// keys "type", "payload" and "metadata".
type Command struct {
	// Type is the command type; Dispatch uses it to look up handlers.
	Type string `json:"type"`
	// Payload holds the command body as raw JSON, left undecoded here.
	Payload json.RawMessage `json:"payload"`
	// Metadata carries the data about the command; Dispatch reads its
	// EntityType when looking up handlers.
	Metadata CommandMetadata `json:"metadata"`
}
// CommandMetadata is the metadata attached to a Command. The fields carry no
// struct tags, so encoding/json uses the Go field names as keys.
type CommandMetadata struct {
	// EntityID identifies the entity the command refers to.
	EntityID string
	// SequenceNo is a sequence number. NOTE(review): its exact meaning is not
	// visible in this file — confirm against callers.
	SequenceNo int
	// EntityType is combined with Command.Type by Dispatch to find the
	// handlers registered for a specific command type and entity type.
	EntityType string
	// Version is a version number. NOTE(review): its exact meaning is not
	// visible in this file — confirm against callers.
	Version int64
	// ExecutionDate is optional and may be nil.
	ExecutionDate *time.Time
	// UserID identifies a user. NOTE(review): presumably the issuer — confirm.
	UserID string
	// AccountID identifies an account. NOTE(review): presumably the issuer's
	// account — confirm.
	AccountID string
}
// CommandResponse is the value a command handler returns alongside its error.
type CommandResponse struct {
	// Success reports whether the handler considers the command successful.
	Success bool
	// Message is a free-form text accompanying the result.
	Message string
	// Code is a numeric result code. NOTE(review): presumably an HTTP status
	// code — confirm against handlers.
	Code int
	// Body is an arbitrary payload; nil when the handler sets none.
	Body interface{}
}
// CommandOptions bundles the extra dependencies that Dispatch passes, unchanged,
// to every handler it runs.
type CommandOptions struct {
	// ResourceRepository is the repository made available to handlers.
	ResourceRepository *ResourceRepository
	// DefaultProjection is a single projection made available to handlers.
	DefaultProjection Projection
	// Projections holds further projections by string key.
	// NOTE(review): what the key denotes is not visible in this file — confirm.
	Projections map[string]Projection
}