Of watchdog #61
Conversation
Signed-off-by: s8sg <swarvanusg@gmail.com>
@chennqqi Can you please test the PR?

OK
@chennqqi Check the following:

```go
dag.Node("start-node").Modify(func(data []byte) ([]byte, error) {
	return data, nil
})
```

got it
hi s8sg,

https://github.com/chennqqi/faas-flow-test/blob/master/stack.yml#L50

```yaml
environment:
  workflow_name: "sumofsqure"
```

Changed, but still failed.

Okay, I'll check. Are you testing with
Signed-off-by: s8sg <swarvanusg@gmail.com>
@chennqqi Dynamic foreach branching has an issue. This is not because of

Created an issue at: #62
Signed-off-by: s8sg <swarvanusg@gmail.com>
ok
Signed-off-by: s8sg <swarvanusg@gmail.com>
Add fix for dynamic branch issue
@chennqqi The latest changes on this branch should fix the issue.
@chennqqi I'm gonna merge it for now. I tested with different example dags, and all of them work fine. I won't delete the branch though; we will keep testing this branch, and for new issues we will fix/merge as necessary.
I tested your new code. The results are very strange.

Test output: after several calls, not all results are as expected. The Aggregator function does not run.
@chennqqi Thanks for sharing the output. What does the log say?
Logs are here, after running the branch.
@chennqqi The error here is strange; it seems the coordination failed:

```
2019/07/22 03:07:46 stdout: [Request `bkqih0hf68f3ssuq5ocg`] Executing Node 0_1_square-each
2019/07/22 03:07:46 stdout: [Request `bkqih0hf68f3ssuq5ocg`] Executing Node 0_1_square-each
2019/07/22 03:07:46 stdout: [Request `bkqih0hf68f3ssuq5ocg`] Intermidiate result from Branch to Dynamic Node 0_1_square-each for option 1 stored as 1--0_1_square-each--0_1_square-each
[Request `bkqih0hf68f3ssuq5ocg`] Executing end of dynamic node 0_1_square-each, delayed as completed
2019/07/22 03:07:46 stdout: indegree: 1/2
2019/07/22 03:07:46 POST / - 200 OK - ContentLength: 0
2019/07/22 03:07:46 POST / - 200 OK - ContentLength: 0
2019/07/22 03:07:46 stdout: [Request `bkqih0hf68f3ssuq5ocg`] Intermidiate result from Branch to Dynamic Node 0_1_square-each for option 0 stored as 0--0_1_square-each--0_1_square-each
[Request `bkqih0hf68f3ssuq5ocg`] Executing end of dynamic node 0_1_square-each, delayed as completed
2019/07/22 03:07:46 stdout: indegree: 1/2
```

Clearly, for the 2nd execution the in-degree count should have been 2 for the same request.
value is 1
I thought so.

```go
// IncrementCounter increments the counter by the given term; if it doesn't exist, init with incrementby
func (fhandler *flowHandler) IncrementCounter(counter string, incrementby int) (int, error) {
	var serr error
	count := 0
	for i := 0; i < counterUpdateRetryCount; i++ {
		encoded, err := fhandler.stateStore.Get(counter)
		if err != nil {
			// if it doesn't exist, try to create it
			err := fhandler.stateStore.Set(counter, fmt.Sprintf("%d", incrementby))
			if err != nil {
				return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
			}
			return incrementby, nil
		}

		current, err := strconv.Atoi(encoded)
		if err != nil {
			return 0, fmt.Errorf("failed to update counter %s, error %v", counter, err)
		}

		count = current + incrementby
		counterStr := fmt.Sprintf("%d", count)

		err = fhandler.stateStore.Update(counter, encoded, counterStr)
		if err == nil {
			return count, nil
		}

		serr = err
	}
	return 0, fmt.Errorf("failed to update counter after max retry for %s, error %v", counter, serr)
}
```

One possible hole here: when both of the requests fail to get the counter, both try to create it.
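A minimal sketch of that interleaving (the `naiveStore` here is a hypothetical stand-in for the statestore, not code from this repo): both branches read before either writes, so both take the create path and one increment is lost.

```go
package main

import "fmt"

// naiveStore mimics a statestore where Get and Set are individually
// consistent, but a Get-then-Set sequence is not atomic.
type naiveStore struct{ data map[string]string }

func (s *naiveStore) Get(k string) (string, error) {
	v, ok := s.data[k]
	if !ok {
		return "", fmt.Errorf("not found")
	}
	return v, nil
}

func (s *naiveStore) Set(k, v string) { s.data[k] = v }

func main() {
	store := &naiveStore{data: map[string]string{}}

	// Branch A and branch B complete "at the same time":
	// both Gets happen before either Set.
	_, errA := store.Get("branch-completion") // A: key missing
	_, errB := store.Get("branch-completion") // B: key missing too

	if errA != nil {
		store.Set("branch-completion", "1") // A initializes the counter to 1
	}
	if errB != nil {
		store.Set("branch-completion", "1") // B also "initializes" it; A's completion is lost
	}

	v, _ := store.Get("branch-completion")
	fmt.Println("branch-completion =", v) // prints 1, although two branches completed
}
```

With that interleaving, both executions would observe `indegree: 1/2`, which matches the log above.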
@chennqqi

```go
// Set the no of branch completion for the current dynamic node
key := pipeline.GetNodeExecutionUniqueId(currentNode) + "-branch-completion"
_, err := fhandler.IncrementCounter(key, 0)
if err != nil {
	return nil, fmt.Errorf("[Request `%s`] Failed to initiate dynamic indegree count for %s, err %v",
		fhandler.id, key, err)
}

fmt.Printf("[Request `%s`] Dynamic indegree count initiated as %s\n",
	fhandler.id, key)

// Set all the dynamic options for the current dynamic node
key = pipeline.GetNodeExecutionUniqueId(currentNode) + "-dynamic-branch-options"
err = fhandler.SetDynamicBranchOptions(key, options)
if err != nil {
	return nil, fmt.Errorf("[Request `%s`] Dynamic Node %s, failed to store dynamic options",
		fhandler.id, currentNodeUniqueId)
}

fmt.Printf("[Request `%s`] Dynamic options initiated as %s\n",
	fhandler.id, key)
```

Which logged:

```
2019/07/22 03:07:46 stdout: [Request `bkqih0hf68f3ssuq5ocg`] Dynamic indegree count initiated as 0_1_square-each-branch-completion
2019/07/22 03:07:46 stdout: [Request `bkqih0hf68f3ssuq5ocg`] Dynamic options initiated as 0_1_square-each-dynamic-branch-options
```

The issue is in the
All branches run.
Is that an atomic counter problem? To solve this, use a Consul global lock.
1st: I'm not sure what the problem is with two nodes running with the same node id; in the current dag it doesn't cause any issue. If you have a nested foreach then it is a problem, and to avoid that we generate a unique execution id to make the counter unique: `pipeline.GetNodeExecutionUniqueId(currentNode)`. You can think of the logic here as each branch being an execution thread, but only one of them should continue forward and every other one should be stopped. The logic is simple.

2nd: We could use a distributed lock, but then the lock should be based on the key; creating a lock by flow-name would be a big bottleneck, and it is not necessary in this case. I'm trying to avoid locks, so we do compare-and-update in the statestore:

```go
// compare the old value, and only update on a match
err = fhandler.stateStore.Update(counter, encoded, counterStr)
```

Each compare-and-update call is implemented as atomic in the KV store. If one compare-and-update succeeds, the next one should fail, shouldn't it?
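To make that concrete, here is a rough sketch of the "only one thread continues" idea (the `completeBranch` name and wiring are hypothetical, built on the `IncrementCounter` shown earlier):

```go
// completeBranch is called by every finishing branch of a dynamic node.
// Each call atomically bumps the shared completion counter; only the
// branch whose increment reaches the total in-degree continues forward,
// every other branch stops here.
func completeBranch(fhandler *flowHandler, nodeExecId string, totalBranches int) (bool, error) {
	count, err := fhandler.IncrementCounter(nodeExecId+"-branch-completion", 1)
	if err != nil {
		return false, err
	}
	return count == totalBranches, nil
}
```

This only works if `IncrementCounter` is truly atomic, which is exactly what the race above calls into question.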
This is the code from the consul statestore implementation:

```go
// Update compares and updates a value
func (consulStore *ConsulStateStore) Update(key string, oldValue string, newValue string) error {
	key = consulStore.consulKeyPath + "/" + key

	pair, _, err := consulStore.kv.Get(key, nil)
	if err != nil {
		return fmt.Errorf("failed to get key %s, error %v", key, err)
	}
	if pair == nil {
		return fmt.Errorf("failed to get key %s", key)
	}

	if string(pair.Value) != oldValue {
		return fmt.Errorf("Old value doesn't match for key %s", key)
	}

	modifyIndex := pair.ModifyIndex
	p := &consul.KVPair{Key: key, Value: []byte(newValue), ModifyIndex: modifyIndex}
	_, err = consulStore.kv.Put(p, nil)
	if err != nil {
		return fmt.Errorf("failed to update key %s, error %v", key, err)
	}
	return nil
}
```

If one is already updated, the `consulStore.kv.Put(p, nil)` should fail.
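One caveat worth checking (an observation, not from the thread): in the Consul Go client, a plain `kv.Put` does not perform a check-and-set even when `ModifyIndex` is set on the pair; the check-and-set operation is `kv.CAS`, which returns `false` when the index no longer matches. A sketch of `Update` on top of it (the `updateCAS` name is hypothetical):

```go
// Sketch: compare-and-update via Consul's native check-and-set.
// kv.CAS only writes if the key's ModifyIndex still equals the one in
// the pair, so a concurrent update makes it return false instead of
// silently overwriting.
func (consulStore *ConsulStateStore) updateCAS(key string, oldValue string, newValue string) error {
	key = consulStore.consulKeyPath + "/" + key

	pair, _, err := consulStore.kv.Get(key, nil)
	if err != nil || pair == nil {
		return fmt.Errorf("failed to get key %s, error %v", key, err)
	}
	if string(pair.Value) != oldValue {
		return fmt.Errorf("old value doesn't match for key %s", key)
	}

	p := &consul.KVPair{Key: key, Value: []byte(newValue), ModifyIndex: pair.ModifyIndex}
	ok, _, err := consulStore.kv.CAS(p, nil)
	if err != nil {
		return fmt.Errorf("failed to update key %s, error %v", key, err)
	}
	if !ok {
		return fmt.Errorf("concurrent update detected for key %s", key)
	}
	return nil
}
```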
An adaptation of `of-watchdog`. As per openfaas, in the future all of the templates are going to use the `of-watchdog`. This PR migrates from the `go` template, based on the legacy watchdog, to `go-http-template`, based on `of-watchdog`. Along with that comes the below change:

- `X-Faas-Flow-Reqid`

Fixes: Set faas-flow unique request id on the header of the initial response (#56)