/
instance.go
408 lines (319 loc) · 10.2 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
package instance
import (
"fmt"
"strconv"
"github.com/TIBCOSoftware/flogo-contrib/action/flow/definition"
"github.com/TIBCOSoftware/flogo-contrib/action/flow/model"
"github.com/TIBCOSoftware/flogo-lib/core/action"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/TIBCOSoftware/flogo-lib/core/data"
"github.com/TIBCOSoftware/flogo-lib/logger"
"github.com/TIBCOSoftware/flogo-contrib/action/flow/event"
"time"
coreevent "github.com/TIBCOSoftware/flogo-lib/core/event"
)
// Instance holds the runtime state of a single flow: its definition, status,
// attributes, task/link instance data and the data it will return.
type Instance struct {
// subFlowId is passed to the master's ChangeTracker with every tracked
// change; when it is greater than zero ID() appends it to the master's id.
subFlowId int
master *IndependentInstance //needed for change tracker
// NOTE(review): host is not referenced in this part of the file; its role
// should be confirmed against the callers that set it.
host interface{}
isHandlingError bool
// status is the current flow status; it is written by SetStatus.
status model.FlowStatus
flowDef *definition.Definition
flowURI string //needed for serialization
// attrs holds instance-level attributes keyed by attribute name. GetAttr
// consults it before falling back to the flow definition.
attrs map[string]*data.Attribute
// taskInsts is keyed by task ID, linkInsts by link ID.
taskInsts map[string]*TaskInst
linkInsts map[int]*LinkInst
// forceCompletion is set to true by Return.
forceCompletion bool
// returnData and returnError are set by Return; returnData may also be
// built lazily from the output attributes by GetReturnData.
returnData map[string]*data.Attribute
returnError error
// resultHandler is installed by SetResultHandler and used by Reply and
// ReplyHandler; it may be nil.
resultHandler action.ResultHandler
}
// FlowURI returns the flow URI recorded on the instance.
func (inst *Instance) FlowURI() string {
	return inst.flowURI
}
// Name returns the name of the instance's flow definition.
func (inst *Instance) Name() string {
	def := inst.flowDef
	return def.Name()
}
// ID returns the identifier of the instance. When subFlowId is greater than
// zero the result is the master's id followed by "-" and the sub-flow id;
// otherwise it is the master's id unchanged.
func (inst *Instance) ID() string {
	masterID := inst.master.id
	if inst.subFlowId <= 0 {
		return masterID
	}
	return masterID + "-" + strconv.Itoa(inst.subFlowId)
}
// SetResultHandler installs the action.ResultHandler that Reply and
// ReplyHandler forward results to.
func (inst *Instance) SetResultHandler(handler action.ResultHandler) {
	inst.resultHandler = handler
}
// FindOrCreateTaskData returns the TaskInst registered for the given task.
// If none is registered yet, a new one is created, stored under the task's
// ID, reported to the master's ChangeTracker as an addition, and created is
// true.
func (inst *Instance) FindOrCreateTaskData(task *definition.Task) (taskInst *TaskInst, created bool) {
	taskID := task.ID()
	if existing, found := inst.taskInsts[taskID]; found {
		return existing, false
	}

	fresh := NewTaskInst(inst, task)
	inst.taskInsts[taskID] = fresh
	inst.master.ChangeTracker.trackTaskData(inst.subFlowId, &TaskInstChange{ChgType: CtAdd, ID: taskID, TaskInst: fresh})
	return fresh, true
}
// FindOrCreateLinkData returns the LinkInst registered for the given link.
// If none is registered yet, a new one is created, stored under the link's
// ID, reported to the master's ChangeTracker as an addition, and created is
// true.
func (inst *Instance) FindOrCreateLinkData(link *definition.Link) (linkInst *LinkInst, created bool) {
	linkID := link.ID()
	if existing, found := inst.linkInsts[linkID]; found {
		return existing, false
	}

	fresh := NewLinkInst(inst, link)
	inst.linkInsts[linkID] = fresh
	inst.master.ChangeTracker.trackLinkData(inst.subFlowId, &LinkInstChange{ChgType: CtAdd, ID: linkID, LinkInst: fresh})
	return fresh, true
}
// releaseTask drops the instance data held for the task and for every link
// returned by task.FromLinks(), reporting each removal to the master's
// ChangeTracker as a deletion.
func (inst *Instance) releaseTask(task *definition.Task) {
	taskID := task.ID()
	delete(inst.taskInsts, taskID)
	inst.master.ChangeTracker.trackTaskData(inst.subFlowId, &TaskInstChange{ChgType: CtDel, ID: taskID})

	for _, fromLink := range task.FromLinks() {
		linkID := fromLink.ID()
		delete(inst.linkInsts, linkID)
		inst.master.ChangeTracker.trackLinkData(inst.subFlowId, &LinkInstChange{ChgType: CtDel, ID: linkID})
	}
}
/////////////////////////////////////////
// Instance - activity.Host Implementation
// IOMetadata returns the input/output metadata declared by the instance's
// flow definition.
func (inst *Instance) IOMetadata() *data.IOMetadata {
	def := inst.flowDef
	return def.Metadata()
}
// Reply forwards replyData and err to the instance's result handler. It is a
// no-op when no result handler has been set.
func (inst *Instance) Reply(replyData map[string]*data.Attribute, err error) {
	handler := inst.resultHandler
	if handler == nil {
		return
	}
	handler.HandleResult(replyData, err)
}
// Return records returnData and err as the instance's return data and return
// error, and sets the forceCompletion flag.
func (inst *Instance) Return(returnData map[string]*data.Attribute, err error) {
	inst.returnData, inst.returnError = returnData, err
	inst.forceCompletion = true
}
// WorkingData returns the instance itself as the working data.Scope.
func (inst *Instance) WorkingData() data.Scope {
	return inst
}
// GetResolver returns the data resolver provided by the definition package.
func (inst *Instance) GetResolver() data.Resolver {
	resolver := definition.GetDataResolver()
	return resolver
}
// GetError returns the return error recorded on the instance, if any.
func (inst *Instance) GetError() error {
	return inst.returnError
}
// GetReturnData returns the instance's return data and return error.
//
// If no return data has been recorded yet and the flow definition declares
// output metadata, the return data is built from the instance attributes
// whose names match the declared outputs. The built map is stored on the
// instance, so later calls return that same map rather than rebuilding it.
func (inst *Instance) GetReturnData() (map[string]*data.Attribute, error) {
	if inst.returnData != nil {
		return inst.returnData, inst.returnError
	}

	md := inst.flowDef.Metadata()
	if md == nil || md.Output == nil {
		return inst.returnData, inst.returnError
	}

	// Collect only the declared outputs that currently exist on the instance.
	inst.returnData = make(map[string]*data.Attribute)
	for _, declared := range md.Output {
		if instAttr, ok := inst.attrs[declared.Name()]; ok {
			inst.returnData[instAttr.Name()] = instAttr
		}
	}

	return inst.returnData, inst.returnError
}
/////////////////////////////////////////
// Instance - FlowContext Implementation
// Status returns the current status of the flow instance.
func (inst *Instance) Status() model.FlowStatus {
	return inst.status
}
// SetStatus stores the new status on the instance, reports it to the master's
// ChangeTracker, and then posts a flow event for the instance.
func (inst *Instance) SetStatus(status model.FlowStatus) {
	inst.status = status
	inst.master.ChangeTracker.SetStatus(inst.subFlowId, status)

	// The event is posted last so that it observes the updated status.
	postFlowEvent(inst)
}
// FlowDefinition returns the flow definition this instance is running.
func (inst *Instance) FlowDefinition() *definition.Definition {
	return inst.flowDef
}
// TaskInstances returns the instance's task instances as a newly allocated
// slice. The order follows map iteration and is therefore unspecified.
func (inst *Instance) TaskInstances() []model.TaskInstance {
	out := make([]model.TaskInstance, len(inst.taskInsts))
	next := 0
	for _, ti := range inst.taskInsts {
		out[next] = ti
		next++
	}
	return out
}
/////////////////////////////////////////
// Instance - data.Scope Implementation
// GetAttr implements data.Scope.GetAttr. Instance-level attributes take
// precedence; when the instance has no attribute with that name the lookup
// falls through to the flow definition.
func (inst *Instance) GetAttr(attrName string) (value *data.Attribute, exists bool) {
	// Indexing a nil map is safe and simply reports a miss.
	if own, ok := inst.attrs[attrName]; ok {
		return own, true
	}
	return inst.flowDef.GetAttr(attrName)
}
// SetAttrValue implements data.Scope.SetAttrValue.
//
// The attribute must already exist, either on the instance or on the flow
// definition (see GetAttr). A new instance-level attribute is created with
// the existing attribute's type and the given value, stored under attrName,
// and reported to the master's ChangeTracker as an update.
//
// An error is returned when the attribute does not exist or when the new
// attribute cannot be created from value; in both cases the instance is left
// unchanged and nothing is tracked.
func (inst *Instance) SetAttrValue(attrName string, value interface{}) error {
	if inst.attrs == nil {
		inst.attrs = make(map[string]*data.Attribute)
	}

	logger.Debugf("SetAttr - name: %s, value:%v\n", attrName, value)

	existingAttr, exists := inst.GetAttr(attrName)
	if !exists {
		return fmt.Errorf("Attr [%s] does not exists", attrName)
	}

	//todo: optimize, use existing attr
	attr, err := data.NewAttribute(attrName, existingAttr.Type(), value)
	if err != nil {
		// Surface the failure instead of storing and tracking an attribute
		// that could not be built from the supplied value.
		return err
	}

	inst.attrs[attrName] = attr
	inst.master.ChangeTracker.AttrChange(inst.subFlowId, CtUpd, attr)
	return nil
}
// AddAttr adds an attribute to the instance and returns it.
//
//   - If the instance already holds an attribute named attrName, its value is
//     updated in place and that attribute is returned; attrType is ignored.
//   - If the name is only known to the flow definition, a new instance-level
//     attribute is created with the definition's type for that name. The
//     definition's own attribute is left untouched, because the instance only
//     holds a pointer to the definition.
//   - Otherwise a new attribute of attrType is created.
//
// Newly created attributes are stored on the instance and reported to the
// master's ChangeTracker as an addition.
func (inst *Instance) AddAttr(attrName string, attrType data.Type, value interface{}) *data.Attribute {
	if inst.attrs == nil {
		inst.attrs = make(map[string]*data.Attribute)
	}

	logger.Debugf("AddAttr - name: %s, type: %s, value:%v", attrName, attrType, value)

	// Attribute owned by this instance: mutate it in place.
	if own, found := inst.attrs[attrName]; found {
		own.SetValue(value)
		return own
	}

	// Attribute declared on the flow definition: keep its declared type but
	// never write through to the definition's attribute; create an
	// instance-level attribute instead, as SetAttrValue does.
	if declared, ok := inst.flowDef.GetAttr(attrName); ok {
		attrType = declared.Type()
	}

	// AddAttr has no error result, so a creation error cannot be reported to
	// the caller; the attribute returned by NewAttribute is stored as-is.
	//todo handle error
	attr, _ := data.NewAttribute(attrName, attrType, value)
	inst.attrs[attrName] = attr
	inst.master.ChangeTracker.AttrChange(inst.subFlowId, CtAdd, attr)

	return attr
}
////////////
// UpdateAttrs stores the given attributes on the instance, each under the
// name reported by its own Name() method. A nil map is ignored. The
// attributes are stored by pointer, and this method does not report anything
// to the ChangeTracker.
func (inst *Instance) UpdateAttrs(attrs map[string]*data.Attribute) {
	if attrs == nil {
		return
	}

	logger.Debugf("Updating flow attrs: %v", attrs)

	if inst.attrs == nil {
		inst.attrs = make(map[string]*data.Attribute, len(attrs))
	}

	for _, updated := range attrs {
		inst.attrs[updated.Name()] = updated
	}
}
/////////////
// FlowDetails
// ReplyHandler returns a new SimpleReplyHandler wrapping the instance's
// current result handler.
func (inst *Instance) ReplyHandler() activity.ReplyHandler {
	return &SimpleReplyHandler{resultHandler: inst.resultHandler}
}
// SimpleReplyHandler is a simple ReplyHandler that passes every reply
// straight through to the wrapped action.ResultHandler.
type SimpleReplyHandler struct {
// resultHandler receives the reply data built by Reply.
resultHandler action.ResultHandler
}
// Reply implements ReplyHandler.Reply. The reply is packaged as two
// attributes, "data" (type any, holding replyData) and "code" (integer,
// holding code), and handed to the wrapped result handler together with err.
//
// Reply is a no-op when no result handler is set, matching Instance.Reply;
// Instance.ReplyHandler wraps whatever handler the instance has, which may be
// nil.
func (rh *SimpleReplyHandler) Reply(code int, replyData interface{}, err error) {
	if rh.resultHandler == nil {
		return
	}

	// Creation errors are ignored here, as before.
	// NOTE(review): presumably an any-typed value and an int code cannot fail
	// conversion; confirm against data.NewAttribute.
	dataAttr, _ := data.NewAttribute("data", data.TypeAny, replyData)
	codeAttr, _ := data.NewAttribute("code", data.TypeInteger, code)

	resultData := map[string]*data.Attribute{
		"data": dataAttr,
		"code": codeAttr,
	}

	rh.resultHandler.HandleResult(resultData, err)
}
// flowEvent is a snapshot of a flow instance's execution details, built by
// postFlowEvent and exposed through the accessor methods below.
type flowEvent struct {
	// time is when the event was built.
	time time.Time
	// err is the instance's return error; only set for failed flows.
	err error
	// input holds attribute values classified as flow input.
	input map[string]interface{}
	// output holds attribute values classified as flow output.
	output map[string]interface{}
	// status is the flow status converted to an event status.
	status event.Status
	// name and id identify the flow instance the event is about.
	name string
	id string
	// parentName and parentId are taken from the instance's master.
	parentName string
	parentId string
}
// FlowName returns the name of the flow the event was raised for.
func (fe *flowEvent) FlowName() string {
	return fe.name
}
// FlowID returns the ID of the flow instance the event was raised for.
func (fe *flowEvent) FlowID() string {
	return fe.id
}
// ParentFlowName returns the parent flow's name. For a sub-flow this is the
// name of the flow that owns it.
func (fe *flowEvent) ParentFlowName() string {
	return fe.parentName
}
// ParentFlowID returns the parent flow's ID. For a sub-flow this is the ID of
// the flow that owns it.
func (fe *flowEvent) ParentFlowID() string {
	return fe.parentId
}
// Time returns the time at which the event was built.
func (fe *flowEvent) Time() time.Time {
	return fe.time
}
// FlowStatus returns the flow's status at the time the event was built.
func (fe *flowEvent) FlowStatus() event.Status {
	return fe.status
}
// FlowOutput returns the output data of the flow instance. The map is only
// populated for completed flows and is empty otherwise.
func (fe *flowEvent) FlowOutput() map[string]interface{} {
	return fe.output
}
// FlowInput returns the input data of the flow instance.
func (fe *flowEvent) FlowInput() map[string]interface{} {
	return fe.input
}
// FlowError returns the error of a failed flow instance; it is nil for flows
// that have not failed.
func (fe *flowEvent) FlowError() error {
	return fe.err
}
// postFlowEvent publishes a flow event describing the instance's current
// state. Nothing is built or posted unless a listener is registered for
// event.FLOW_EVENT_TYPE.
//
// For any status other than CREATED the instance attributes are split into
// the event's input and output maps: an attribute whose name appears in the
// instance's return data counts as output and is reported only once the flow
// has completed; every other attribute is reported as input. The return error
// is attached only for failed flows.
//
// Posting an event leaves the instance unchanged, so its state does not
// depend on whether a listener happens to be registered.
func postFlowEvent(inst *Instance) {
	if !coreevent.HasListener(event.FLOW_EVENT_TYPE) {
		return
	}

	fe := &flowEvent{}
	fe.time = time.Now()
	fe.name = inst.Name()
	fe.id = inst.ID()

	if inst.master != nil {
		fe.parentName = inst.master.Name()
		fe.parentId = inst.master.ID()
	}

	fe.status = convertFlowStatus(inst.Status())
	fe.input = make(map[string]interface{})
	fe.output = make(map[string]interface{})

	if fe.status != event.CREATED {
		// GetReturnData stores the map it builds on the instance. Doing that
		// from here would freeze the return data at the first event (e.g. when
		// the flow starts), so the previous value is restored afterwards.
		// The returned error is inst.returnError, which is reported below.
		previous := inst.returnData
		outData, _ := inst.GetReturnData()
		inst.returnData = previous

		// Ranging over or indexing a nil map is safe, so no nil checks are
		// needed for inst.attrs or outData.
		for name, attVal := range inst.attrs {
			if outData[name] != nil {
				// Since same attribute map is used for input and output, filter output attributes
				if fe.status == event.COMPLETED {
					fe.output[name] = attVal.Value()
				}
				continue
			}
			fe.input[name] = attVal.Value()
		}
	}

	if fe.status == event.FAILED {
		fe.err = inst.returnError
	}

	coreevent.PostEvent(event.FLOW_EVENT_TYPE, fe)
}
// convertFlowStatus maps a model.FlowStatus onto the corresponding
// event.Status. Any status without a mapping yields event.UNKNOWN.
func convertFlowStatus(code model.FlowStatus) event.Status {
	switch code {
	case model.FlowStatusNotStarted:
		return event.CREATED
	case model.FlowStatusActive:
		return event.STARTED
	case model.FlowStatusCancelled:
		return event.CANCELLED
	case model.FlowStatusCompleted:
		return event.COMPLETED
	case model.FlowStatusFailed:
		return event.FAILED
	default:
		return event.UNKNOWN
	}
}