-
Notifications
You must be signed in to change notification settings - Fork 14
/
fsm_models.go
201 lines (175 loc) · 7.96 KB
/
fsm_models.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package fsm
import (
"bytes"
"encoding/json"
"strings"
"encoding/gob"
"fmt"
"reflect"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/swf"
)
// constants used as marker names or signal names
const (
StateMarker = "FSM.State"
CorrelatorMarker = "FSM.Correlator"
ErrorMarker = "FSM.Error"
RepiarStateSignal = "FSM.RepairState"
ContinueTimer = "FSM.ContinueWorkflow"
ContinueSignal = "FSM.ContinueWorkflow"
CompleteState = "complete"
CanceledState = "canceled"
FailedState = "failed"
ErrorState = "error"
//the FSM was not configured with a state named in an outcome.
FSMErrorMissingState = "ErrorMissingFsmState"
//the FSM encountered an erryor while serializaing stateData
FSMErrorStateSerialization = "ErrorStateSerialization"
//the FSM encountered an erryor while deserializaing stateData
FSMErrorStateDeserialization = "ErrorStateDeserialization"
//the FSM encountered an erryor while deserializaing stateData
FSMErrorCorrelationDeserialization = "ErrorCorrelationDeserialization"
//Signal sent when a Long Lived Worker Start()
ActivityStartedSignal = "FSM.ActivityStarted"
//Signal send when long Lived worker sends an update from Work()
ActivityUpdatedSignal = "FSM.ActivityUpdated"
)
// Decider decides an Outcome based on an event and the current data for an
// FSM. You can assert the interface{} parameter that is passed to the Decider
// as the type of the DataType field in the FSM. Alternatively, you can use
// TypedFuncs to create a typed decider to avoid having to do the assertion.
type Decider func(*FSMContext, *swf.HistoryEvent, interface{}) Outcome
//Outcome is the result of a Decider processing a HistoryEvent
type Outcome struct {
//State is the desired next state in the FSM. the empty string ("") is a signal that you wish decision processing to continue
//if the FSM machinery recieves the empty string as the state of a final outcome, it will substitute the current state.
State string
Data interface{}
Decisions []*swf.Decision
}
// FSMState defines the behavior of one state of an FSM
type FSMState struct {
// Name is the name of the state. When returning an Outcome, the NextState should match the Name of an FSMState in your FSM.
Name string
// Decider decides an Outcome given the current state, data, and an event.
Decider Decider
}
//DecisionErrorHandler is the error handling contract for panics that occur in Deciders.
//If your DecisionErrorHandler does not return a non nil Outcome, any further attempt to process the decisionTask is abandoned and the task will time out.
type DecisionErrorHandler func(ctx *FSMContext, event *swf.HistoryEvent, stateBeforeEvent interface{}, stateAfterError interface{}, err error) (*Outcome, error)
// TaskErrorHandler is the error handling contract for errors that occur
// outside of the Decider machinery when handling receiving incoming tasks,
// sending outgoing decisions for tasks, or replicating state.
// This handler is called when a decision task has been abandoned and the task
// will timeout without any further intervention.
type TaskErrorHandler func(decisionTask *swf.PollForDecisionTaskOutput, err error)
//FSMErrorHandler is the error handling contract for errors in the FSM machinery itself.
//These are generally a misconfiguration of your FSM or mismatch between struct and serialized form and cant be resolved without config/code changes
//the paramaters to each method provide all availabe info at the time of the error so you can diagnose issues.
//Note that this is a diagnostic interface that basically leaks implementation details, and as such may change from release to release.
type FSMErrorReporter interface {
ErrorFindingStateData(decisionTask *swf.PollForDecisionTaskOutput, err error)
ErrorFindingCorrelator(decisionTask *swf.PollForDecisionTaskOutput, err error)
ErrorMissingFSMState(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome)
ErrorDeserializingStateData(decisionTask *swf.PollForDecisionTaskOutput, serializedStateData string, err error)
ErrorSerializingStateData(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome, eventCorrelator EventCorrelator, err error)
}
// StateSerializer defines the interface for serializing state to and deserializing state from the workflow history.
type StateSerializer interface {
Serialize(state interface{}) (string, error)
Deserialize(serialized string, state interface{}) error
}
// JSONStateSerializer is a StateSerializer that uses go json serialization.
type JSONStateSerializer struct{}
// Serialize serializes the given struct to a json string.
func (j JSONStateSerializer) Serialize(state interface{}) (string, error) {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(state); err != nil {
return "", err
}
return b.String(), nil
}
// Deserialize unmarshalls the given (json) string into the given struct
func (j JSONStateSerializer) Deserialize(serialized string, state interface{}) error {
err := json.NewDecoder(strings.NewReader(serialized)).Decode(state)
return err
}
// Serialization is the contract for de/serializing state inside an FSM, typically implemented by the FSM itself
// but serves to break the circular dep between FSMContext and FSM.
type Serialization interface {
EventData(h *swf.HistoryEvent, data interface{})
Serialize(data interface{}) string
StateSerializer() StateSerializer
Deserialize(serialized string, data interface{})
InitialState() string
}
// FSM Data types that implement this interface will have the resulting tags used by
// FSMClient when starting workflows and by the FSMContext when calling ContinueWorkflow()
// it is []*string since thats what SWF api takes atm.
type Taggable interface {
Tags() []*string
}
func GetTagsIfTaggable(data interface{}) []*string {
var tags []*string
if t, ok := data.(Taggable); ok {
tags = t.Tags()
}
return tags
}
// SerializedState is a wrapper struct that allows serializing the current state and current data for the FSM in
// a MarkerRecorded event in the workflow history. We also maintain an epoch, which counts the number of times a workflow has
// been continued, and the StartedId of the DecisionTask that generated this state. The epoch + the id provide a total ordering
// of state over the lifetime of different runs of a workflow.
type SerializedState struct {
StateVersion uint64 `json:"stateVersion"`
StateName string `json:"stateName"`
StateData string `json:"stateData"`
WorkflowId string `json:"workflowId"`
}
//ErrorState is used as the input to a marker that signifies that the workflow is in an error state.
type SerializedErrorState struct {
Details string
EarliestUnprocessedEventId int64
LatestUnprocessedEventId int64
ErrorEvent *swf.HistoryEvent
}
//Payload of Signals ActivityStartedSignal and ActivityUpdatedSignal
type SerializedActivityState struct {
ActivityId string
Input *string
}
// StartFSMWorkflowInput should be used to construct the input for any StartWorkflowExecutionRequests.
// This panics on errors cause really this should never err.
func StartFSMWorkflowInput(serializer Serialization, data interface{}) *string {
ss := new(SerializedState)
stateData := serializer.Serialize(data)
ss.StateData = stateData
serialized := serializer.Serialize(ss)
return aws.String(serialized)
}
//Stasher is used to take snapshots of StateData between each event so that we can have shap
type Stasher struct {
dataType interface{}
}
func NewStasher(dataType interface{}) *Stasher {
gob.Register(dataType)
return &Stasher{
dataType: dataType,
}
}
func (s *Stasher) Stash(data interface{}) *bytes.Buffer {
buf := new(bytes.Buffer)
enc := gob.NewEncoder(buf)
err := enc.Encode(data)
if err != nil {
panic(fmt.Sprintf("at=stash type=%s error=%q", reflect.TypeOf(s.dataType), err))
}
return buf
}
func (s *Stasher) Unstash(stashed *bytes.Buffer, into interface{}) {
dec := gob.NewDecoder(stashed)
err := dec.Decode(into)
if err != nil {
panic(fmt.Sprintf("at=unstash type=%s error=%q", reflect.TypeOf(s.dataType), err))
}
}