This repository has been archived by the owner on Jul 27, 2021. It is now read-only.
/
engine.go
329 lines (280 loc) · 7.08 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package fished
import (
"errors"
"runtime"
"sync"
"time"
"github.com/hooqtv/fished/pool"
"github.com/knetic/govaluate"
"github.com/patrickmn/go-cache"
)
var (
// DefaultTarget is the fact name that RunDefault asks Run to return.
DefaultTarget = "result_end"
// DefaultWorker is the worker-count value that tells
// NewWithCustomWorkerSize to derive the size from runtime.NumCPU().
DefaultWorker = 0
// DefaultRuleLength is the buffer capacity given to each Runtime's JobCh
// and ResultCh when the Runtime is created.
DefaultRuleLength = 100
)
type (
// Engine is the rule engine: it holds the initial facts, the rules, the
// functions rules may call, and the pool of runtimes used by Run.
Engine struct {
// InitialFacts is copied into a fresh fact map at the start of every Run.
InitialFacts map[string]interface{}
// Rules is the ordered rule list that Run iterates over.
Rules []Rule
// RuleFunctions is passed to govaluate when a rule expression is parsed.
RuleFunctions map[string]govaluate.ExpressionFunction
// RuleCache stores parsed expressions keyed by their expression string.
RuleCache *cache.Cache
// RunLock is write-locked by the setters and read-locked by Run.
RunLock sync.RWMutex
// RuntimePool supplies the Runtime instances handed out by NewRuntime.
RuntimePool *pool.ReferenceCountedPool
}
// Rule describes one rule: the facts it needs, the fact it produces and the
// expression that computes it.
Rule struct {
// Input lists the fact names that must all exist before the rule is run.
Input []string `json:"input"`
// Output is the fact name under which a non-nil result is stored.
Output string `json:"output"`
// Expression is the govaluate expression text evaluated against the facts.
Expression string `json:"expression"`
}
// RuleFunction is the signature of a function that rule expressions can
// call; SetRuleFunctions converts it to govaluate.ExpressionFunction.
RuleFunction func(...interface{}) (interface{}, error)
// Runtime holds the state of a single Engine.Run call. Instances come from
// Engine.RuntimePool.
Runtime struct {
pool.ReferenceCounter
// Facts is the working fact map the expressions are evaluated against.
Facts map[string]interface{}
// JobCh carries jobs from Run to the worker goroutines.
JobCh chan *Job
// ResultCh carries evaluation results from the workers back to Run.
ResultCh chan *EvalResult
// UsedRule records the indexes of rules already dispatched in this run.
UsedRule map[int]struct{}
// FactsMutex guards Facts: read-locked in Evaluate, write-locked when Run
// stores a result.
FactsMutex sync.RWMutex
}
// Job is one unit of work for a worker: a parsed expression and the fact
// name its result belongs to.
Job struct {
Output string
ParsedExpression *govaluate.EvaluableExpression
}
// EvalResult is the outcome of evaluating one Job.
EvalResult struct {
// Key is the Output of the job that produced this result.
Key string
// Value is whatever the expression evaluated to.
Value interface{}
// Error is the evaluation error, or nil on success.
Error error
}
)
// New builds an Engine with an automatically chosen worker count by calling
// NewWithCustomWorkerSize with a worker argument of 0.
func New() *Engine {
	engine := NewWithCustomWorkerSize(0)
	return engine
}
// NewWithCustomWorkerSize builds an Engine whose pooled runtimes each start
// the given number of evaluation goroutines.
//
// When worker equals DefaultWorker the size is runtime.NumCPU()-1. Whatever
// the source, a size below one is raised to one.
func NewWithCustomWorkerSize(worker int) *Engine {
	size := worker
	if worker == DefaultWorker {
		size = runtime.NumCPU() - 1
	}
	if size < 1 {
		size = 1
	}

	// newRuntime is the pool factory: it allocates a Runtime with buffered
	// channels and starts the goroutines that drain its job channel.
	newRuntime := func(counter pool.ReferenceCounter) pool.ReferenceCountable {
		rt := &Runtime{
			JobCh:    make(chan *Job, DefaultRuleLength),
			ResultCh: make(chan *EvalResult, DefaultRuleLength),
			UsedRule: make(map[int]struct{}),
		}
		rt.ReferenceCounter = counter

		for remaining := size; remaining > 0; remaining-- {
			go func() {
				for job := range rt.JobCh {
					rt.Evaluate(job, rt.ResultCh)
				}
			}()
		}
		return rt
	}

	// resetRuntime is the pool's reset hook; it rejects anything that is not
	// a *Runtime.
	resetRuntime := func(i interface{}) error {
		rt, ok := i.(*Runtime)
		if !ok {
			return errors.New("Illegal object passed")
		}
		rt.Reset()
		return nil
	}

	return &Engine{
		RuleCache:   cache.New(24*time.Hour, 1*time.Hour),
		RuntimePool: pool.NewReferenceCountedPool(newRuntime, resetRuntime),
	}
}
// Set replaces the engine's facts, rules and rule functions in one call.
//
// The three setters run in that order and the first error is returned; steps
// that already succeeded are not rolled back.
func (e *Engine) Set(facts map[string]interface{}, rules []Rule, ruleFunction map[string]RuleFunction) error {
	if err := e.SetFacts(facts); err != nil {
		return err
	}
	if err := e.SetRules(rules); err != nil {
		return err
	}
	return e.SetRuleFunctions(ruleFunction)
}
// SetFacts replaces the engine's initial facts with a copy of facts, so later
// changes to the caller's map do not affect the engine. It always returns nil.
func (e *Engine) SetFacts(facts map[string]interface{}) error {
	e.RunLock.Lock()
	defer e.RunLock.Unlock()

	snapshot := make(map[string]interface{})
	for name, fact := range facts {
		snapshot[name] = fact
	}
	e.InitialFacts = snapshot
	return nil
}
// SetRules replaces the engine's rules with a copy of rules and flushes the
// parsed-expression cache. It always returns nil.
func (e *Engine) SetRules(rules []Rule) error {
	e.RunLock.Lock()
	defer e.RunLock.Unlock()

	owned := make([]Rule, 0, len(rules))
	owned = append(owned, rules...)
	e.Rules = owned

	e.RuleCache.Flush()
	return nil
}
// SetRuleFunctions replaces the functions that rule expressions may call with
// a converted copy of ruleFunctions. It always returns nil.
//
// Parsed expressions are cached by expression text but are built with the
// function set that was current at parse time, so the cache is flushed here;
// otherwise later runs would keep calling the old functions.
func (e *Engine) SetRuleFunctions(ruleFunctions map[string]RuleFunction) error {
	e.RunLock.Lock()
	defer e.RunLock.Unlock()

	functions := make(map[string]govaluate.ExpressionFunction, len(ruleFunctions))
	for name, fn := range ruleFunctions {
		functions[name] = govaluate.ExpressionFunction(fn)
	}
	e.RuleFunctions = functions

	// Guarded so an Engine built without a cache behaves as it did before.
	if e.RuleCache != nil {
		e.RuleCache.Flush()
	}
	return nil
}
// RunDefault executes Run with the package defaults: DefaultTarget as the
// requested fact and DefaultWorker as the worker argument.
func (e *Engine) RunDefault() (interface{}, []error) {
	result, errs := e.Run(DefaultTarget, DefaultWorker)
	return result, errs
}
// RunWithCustomTarget executes Run and returns the fact named by target
// instead of the default one. The worker argument passed to Run is 0.
func (e *Engine) RunWithCustomTarget(target string) (interface{}, []error) {
	result, errs := e.Run(target, 0)
	return result, errs
}
// Run evaluates the engine's rules against a private copy of InitialFacts and
// returns the fact stored under target together with every error collected.
//
// Rules are handled in passes. In each pass every rule that has not been
// dispatched yet and whose Input facts all exist is parsed (or fetched from
// RuleCache) and queued for the runtime's workers; non-nil results are then
// stored as facts under each rule's Output. Passes repeat until one dispatches
// nothing. Evaluation errors are recorded and do not stop the run. A rule that
// fails to parse stops the run: jobs already queued in that pass are allowed
// to finish, but their results are discarded.
//
// The worker parameter is ignored; the number of workers is fixed when the
// engine is built (see NewWithCustomWorkerSize). It remains only so existing
// callers keep compiling.
//
// The returned value is nil when target was never produced, and the error
// slice is nil when nothing went wrong.
func (e *Engine) Run(target string, worker int) (interface{}, []error) {
	var errs []error

	e.RunLock.RLock()
	defer e.RunLock.RUnlock()

	// Work on a copy so a run never mutates InitialFacts.
	facts := make(map[string]interface{}, len(e.InitialFacts))
	for key, value := range e.InitialFacts {
		facts[key] = value
	}

	r := e.NewRuntime(facts)
	defer r.DecrementReferenceCount()

	// record folds one worker result into errs or the runtime's facts.
	record := func(res *EvalResult) {
		if res.Error != nil {
			errs = append(errs, res.Error)
			return
		}
		if res.Value != nil {
			r.FactsMutex.Lock()
			r.Facts[res.Key] = res.Value
			r.FactsMutex.Unlock()
		}
	}

	for {
		var (
			dispatched  int
			parseFailed bool
			// early holds results that arrived while jobs were still being
			// queued. They are applied only after dispatch finishes, so the
			// readiness checks of one pass all see the same facts.
			early []*EvalResult
		)

		for i := range e.Rules {
			if _, used := r.UsedRule[i]; used {
				continue
			}
			rule := e.Rules[i]
			if !r.hasFacts(rule.Input) {
				continue
			}

			expr, err := e.compileRule(rule)
			if err != nil {
				errs = append(errs, err)
				parseFailed = true
				break
			}

			job := &Job{
				ParsedExpression: expr,
				Output:           rule.Output,
			}
			r.UsedRule[i] = struct{}{}

			// Keep receiving while the job queue is full. Sending without
			// receiving would deadlock once both channel buffers and all
			// workers are occupied.
			for queued := false; !queued; {
				select {
				case r.JobCh <- job:
					queued = true
				case res := <-r.ResultCh:
					early = append(early, res)
				}
			}
			dispatched++
		}

		outstanding := dispatched - len(early)

		if parseFailed {
			// Wait for the jobs already queued so no result is left behind in
			// the pooled Runtime for the next run to pick up.
			for ; outstanding > 0; outstanding-- {
				<-r.ResultCh
			}
			break
		}
		if dispatched == 0 {
			break
		}

		for _, res := range early {
			record(res)
		}
		for ; outstanding > 0; outstanding-- {
			record(<-r.ResultCh)
		}
	}

	return r.Facts[target], errs
}

// compileRule returns the parsed form of rule.Expression, using RuleCache
// when it holds one and parsing (then caching) otherwise.
func (e *Engine) compileRule(rule Rule) (*govaluate.EvaluableExpression, error) {
	if cached, ok := e.RuleCache.Get(rule.Expression); ok {
		if expr, ok := cached.(*govaluate.EvaluableExpression); ok {
			return expr, nil
		}
	}

	expr, err := govaluate.NewEvaluableExpressionWithFunctions(rule.Expression, e.RuleFunctions)
	if err != nil {
		return nil, err
	}

	// Set, not Add: concurrent runs may parse the same expression, and Add
	// reports an error when the key is already present.
	e.RuleCache.Set(rule.Expression, expr, cache.DefaultExpiration)
	return expr, nil
}

// hasFacts reports whether every name in inputs is present in r.Facts.
func (r *Runtime) hasFacts(inputs []string) bool {
	for _, name := range inputs {
		if _, ok := r.Facts[name]; !ok {
			return false
		}
	}
	return true
}
// NewRuntime takes a Runtime from RuntimePool and points it at facts.
//
// The map is used directly, not copied. The caller releases the Runtime with
// DecrementReferenceCount when done, as Run does.
func (e *Engine) NewRuntime(facts map[string]interface{}) *Runtime {
	r := e.RuntimePool.Get().(*Runtime)

	// Worker goroutines read Facts under FactsMutex (see Evaluate), so the
	// new map is published under the write lock.
	r.FactsMutex.Lock()
	r.Facts = facts
	r.FactsMutex.Unlock()

	return r
}
// Evaluate runs job's parsed expression against the runtime's facts and sends
// exactly one EvalResult, keyed by job.Output, on result.
func (r *Runtime) Evaluate(job *Job, result chan<- *EvalResult) {
	// Hold the read lock only for the evaluation itself; it is released
	// before the send so a blocked send never holds the lock.
	r.FactsMutex.RLock()
	value, evalErr := job.ParsedExpression.Evaluate(r.Facts)
	r.FactsMutex.RUnlock()

	result <- &EvalResult{
		Key:   job.Output,
		Value: value,
		Error: evalErr,
	}
}
// Reset forgets which rules have been dispatched by emptying UsedRule in
// place. It always returns nil.
func (r *Runtime) Reset() error {
	for ruleIndex := range r.UsedRule {
		delete(r.UsedRule, ruleIndex)
	}
	return nil
}