/
context.go
197 lines (170 loc) · 4.99 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package faasflow
import (
"encoding/json"
"fmt"
"net/url"
)
// Context holds the execution context and the execution state of a request.
// The unexported fields are read through GetRequestId and GetNode, and the
// dataStore backs the Set/Get/Del family of methods.
type Context struct {
requestId string // the request id
node string // the execution position
dataStore DataStore // underlying DataStore used by Set, Get and Del
Query url.Values // provides request Query
State string // state of the request (CreateContext starts it as StateOngoing)
Name string // name of the faas-flow
NodeInput map[string][]byte // stores inputs from each node (presumably keyed by node; confirm with callers)
}
// DataStore is the storage abstraction used for storing request data.
// Context.Set, Context.Get and Context.Del delegate to it.
type DataStore interface {
// Configure the DataStore with flow name and request ID
Configure(flowName string, requestId string)
// Init initializes the DataStore (called only once in a request span)
Init() error
// Set stores a value for key, on failure returns an error
Set(key string, value string) error
// Get retrieves a value by key, on failure returns an error
Get(key string) (string, error)
// Del deletes a value by key
Del(key string) error
// Cleanup releases all the resources in the DataStore
Cleanup() error
}
// StateStore is the storage abstraction used for saving execution state.
type StateStore interface {
// Configure the StateStore with flow name and request ID
Configure(flowName string, requestId string)
// Init initializes the StateStore (called only once in a request span)
Init() error
// Set a value (override existing, or create one)
Set(key string, value string) error
// Get a value
Get(key string) (string, error)
// Update compares and updates a value.
// NOTE(review): presumably newValue is written only when the current value
// equals oldValue; confirm against the implementations.
Update(key string, oldValue string, newValue string) error
// Cleanup releases all the resources in the StateStore (called only once in a request span)
Cleanup() error
}
// Request states stored in Context.State.
const (
// StateSuccess denotes success state
StateSuccess = "success"
// StateFailure denotes failure state
StateFailure = "failure"
// StateOngoing denotes ongoing state (the state CreateContext assigns)
StateOngoing = "ongoing"
)
// CreateContext builds the request context (used by template).
//
// id is the request id, node the execution position, name the name of the
// flow and dstore the DataStore that backs Set, Get and Del. The returned
// Context starts in StateOngoing with an empty, non-nil NodeInput map; Query
// is left at its zero value.
func CreateContext(id string, node string, name string,
	dstore DataStore) *Context {
	return &Context{
		requestId: id,
		node:      node,
		dataStore: dstore,
		State:     StateOngoing,
		Name:      name,
		NodeInput: make(map[string][]byte),
	}
}
// GetRequestId reports the id of the request this context was created for.
func (ctx *Context) GetRequestId() string {
	id := ctx.requestId
	return id
}
// GetNode reports the node (the execution position) held by this context.
func (ctx *Context) GetNode() string {
	position := ctx.node
	return position
}
// Set saves data under key through the context's DataStore.
//
// The value is not stored raw: it is wrapped together with its key in a JSON
// object of the form {"key": ..., "value": ...} and that JSON text is what is
// handed to the DataStore. An error is returned when data cannot be encoded
// as JSON, or when the DataStore's own Set fails (that error is passed
// through unchanged).
func (ctx *Context) Set(key string, data interface{}) error {
	envelope := struct {
		Key   string      `json:"key"`
		Value interface{} `json:"value"`
	}{
		Key:   key,
		Value: data,
	}

	encoded, err := json.Marshal(&envelope)
	if err != nil {
		return fmt.Errorf("Failed to marshal data, error %v", err)
	}

	return ctx.dataStore.Set(key, string(encoded))
}
// Get retrieves the value stored under key through the context's DataStore.
//
// The stored text is expected to be the JSON object written by Set; its
// "value" member is decoded into an interface{} and returned (so it carries
// encoding/json's generic types, e.g. float64 for numbers). A DataStore
// lookup error is returned unchanged; a decoding failure is reported as a
// new error. In both error cases the returned value is nil.
func (ctx *Context) Get(key string) (interface{}, error) {
	raw, err := ctx.dataStore.Get(key)
	if err != nil {
		return nil, err
	}

	var envelope struct {
		Key   string      `json:"key"`
		Value interface{} `json:"value"`
	}
	if err := json.Unmarshal([]byte(raw), &envelope); err != nil {
		return nil, fmt.Errorf("Failed to unmarshal data, error %v", err)
	}

	return envelope.Value, nil
}
// GetInt retrieves the integer stored under key through the context's
// DataStore.
//
// The stored text is decoded as the JSON object written by Set, with its
// "value" member read as an int. Because the signature has no error return,
// both a DataStore lookup failure and a decoding failure cause a panic whose
// value is a descriptive string.
func (ctx *Context) GetInt(key string) int {
	raw, err := ctx.dataStore.Get(key)
	if err != nil {
		panic(fmt.Sprintf("error %v", err))
	}

	var envelope struct {
		Key   string `json:"key"`
		Value int    `json:"value"`
	}
	if err := json.Unmarshal([]byte(raw), &envelope); err != nil {
		panic(fmt.Sprintf("Failed to unmarshal data, error %v", err))
	}

	return envelope.Value
}
// GetString retrieves the string stored under key through the context's
// DataStore.
//
// The stored text is decoded as the JSON object written by Set, with its
// "value" member read as a string. Because the signature has no error
// return, both a DataStore lookup failure and a decoding failure cause a
// panic whose value is a descriptive string.
func (ctx *Context) GetString(key string) string {
	raw, err := ctx.dataStore.Get(key)
	if err != nil {
		panic(fmt.Sprintf("error %v", err))
	}

	var envelope struct {
		Key   string `json:"key"`
		Value string `json:"value"`
	}
	if err := json.Unmarshal([]byte(raw), &envelope); err != nil {
		panic(fmt.Sprintf("Failed to unmarshal data, error %v", err))
	}

	return envelope.Value
}
// GetBytes retrieves the byte slice stored under key through the context's
// DataStore.
//
// The stored text is decoded as the JSON object written by Set, with its
// "value" member read as a []byte (encoding/json represents a []byte as a
// base64 string). Because the signature has no error return, both a
// DataStore lookup failure and a decoding failure cause a panic whose value
// is a descriptive string.
func (ctx *Context) GetBytes(key string) []byte {
	raw, err := ctx.dataStore.Get(key)
	if err != nil {
		panic(fmt.Sprintf("error %v", err))
	}

	var envelope struct {
		Key   string `json:"key"`
		Value []byte `json:"value"`
	}
	if err := json.Unmarshal([]byte(raw), &envelope); err != nil {
		panic(fmt.Sprintf("Failed to unmarshal data, error %v", err))
	}

	return envelope.Value
}
// GetBool retrieves the boolean stored under key through the context's
// DataStore.
//
// The stored text is decoded as the JSON object written by Set, with its
// "value" member read as a bool. Because the signature has no error return,
// both a DataStore lookup failure and a decoding failure cause a panic whose
// value is a descriptive string.
func (ctx *Context) GetBool(key string) bool {
	raw, err := ctx.dataStore.Get(key)
	if err != nil {
		panic(fmt.Sprintf("error %v", err))
	}

	var envelope struct {
		Key   string `json:"key"`
		Value bool   `json:"value"`
	}
	if err := json.Unmarshal([]byte(raw), &envelope); err != nil {
		panic(fmt.Sprintf("Failed to unmarshal data, error %v", err))
	}

	return envelope.Value
}
// Del removes the value stored under key by delegating to the context's
// DataStore; whatever error the DataStore reports is returned unchanged.
func (ctx *Context) Del(key string) error {
	err := ctx.dataStore.Del(key)
	return err
}