forked from ExploratoryEngineering/clusterfunk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
acknowledge.go
116 lines (100 loc) · 3.02 KB
/
acknowledge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package funk
//
//Copyright 2019 Telenor Digital AS
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
//
import (
"sync/atomic"
"time"
"github.com/lab5e/clusterfunk/pkg/toolbox"
)
// ackCollection handles acknowledgement and timeouts on acknowledgements
// from a set of nodes. The collection can be used for one round of acks
// only: Done closes the channels returned by MissingAck and Completed.
type ackCollection interface {
// StartAck starts the acking. nodes are the nodes that are expected to
// acknowledge, shardIndex identifies the round that is acknowledged and
// timeout is how long to wait before the nodes that have not acknowledged
// are reported on the MissingAck channel.
StartAck(nodes []string, shardIndex uint64, timeout time.Duration)
// Ack adds another node to the acknowledged list. Returns true if that node
// is in the list of nodes that should ack. Acks for another shard index
// than the one passed to StartAck are rejected (false is returned).
Ack(nodeID string, shardIndex uint64) bool
// ShardIndex returns the shard index that is acked.
// NOTE(review): the original comment said this returns 0 "when the ack is
// completed"; the ackColl implementation in this file returns 0 before
// StartAck and after Done, not as soon as the last node has acknowledged.
// Confirm which of the two callers expect.
ShardIndex() uint64
// MissingAck returns a channel that sends a list of nodes that haven't acknowledged within the timeout.
// If something is sent on the MissingAck channel the Completed channel won't trigger.
MissingAck() <-chan []string
// Completed returns a channel that is triggered when all nodes have
// acknowledged
Completed() <-chan struct{}
// Done clears up the resources and closes all open channels. Receivers on
// the MissingAck and Completed channels will observe the close.
Done()
}
// newAckCollection builds an idle ackCollection backed by ackColl. The node
// set is empty, both notification channels are unbuffered and the shard index
// is zero until StartAck is called.
func newAckCollection() ackCollection {
	coll := new(ackColl)
	coll.shardIndex = new(uint64)
	coll.missingChan = make(chan []string)
	coll.completedChan = make(chan struct{})
	coll.nodes = toolbox.NewStringSet()
	return coll
}
// ackColl is the ackCollection implementation returned by newAckCollection.
type ackColl struct {
// nodes holds the nodes that still have to acknowledge. StartAck syncs it
// with the expected nodes and Ack removes nodes from it.
nodes toolbox.StringSet
// completedChan gets a value when the last pending node is removed by Ack.
// It is closed by Done.
completedChan chan struct{}
// missingChan gets the list of pending nodes when the timeout given to
// StartAck expires with nodes still pending. It is closed by Done.
missingChan chan []string
// shardIndex is the shard index of the current round. It is only accessed
// through sync/atomic; 0 means that no round is active (StartAck has not
// been called or Done has been called).
shardIndex *uint64
}
// StartAck begins an acknowledgement round for shardIndex. The shard index is
// stored, nodes is handed to the pending set through Sync and a background
// goroutine is started that wakes up after timeout. If the round has not been
// ended with Done and nodes are still pending at that point, the list of
// pending nodes is sent on the MissingAck channel.
//
// The goroutine always sleeps for the full timeout, also when Done is called
// earlier. A shardIndex of 0 cannot be told apart from a finished round, so
// no timeout is ever reported for it.
func (a *ackColl) StartAck(nodes []string, shardIndex uint64, timeout time.Duration) {
	atomic.StoreUint64(a.shardIndex, shardIndex)
	a.nodes.Sync(nodes...)
	go func() {
		time.Sleep(timeout)
		if atomic.LoadUint64(a.shardIndex) == 0 {
			// Done() has been called on this so just terminate
			return
		}
		if a.nodes.Size() == 0 {
			// Everyone acknowledged in time; nothing to report.
			return
		}
		missing := a.nodes.List()
		// Done may close missingChan after the check above or while this
		// goroutine is blocked in the send. Sending on a closed channel
		// panics and would bring the whole process down, so the panic from
		// this one statement is swallowed and the goroutine just exits.
		defer func() { _ = recover() }()
		a.missingChan <- missing
	}()
}
// Ack registers an acknowledgement from nodeID for the round identified by
// shardIndex. It returns true when the node was pending and has now been
// removed from the pending set. It returns false when no round is active
// (shard index 0), when shardIndex is not the index of the current round, or
// when nodeID is not pending (unknown node or duplicate ack).
//
// When the last pending node acknowledges, a value is sent on the Completed
// channel from a separate goroutine so that Ack itself never blocks.
func (a *ackColl) Ack(nodeID string, shardIndex uint64) bool {
	// Load once so both checks are made against the same value.
	current := atomic.LoadUint64(a.shardIndex)
	if current == 0 || current != shardIndex {
		return false
	}
	if !a.nodes.Remove(nodeID) {
		return false
	}
	if a.nodes.Size() == 0 {
		go func() {
			// Done may close completedChan before the value is picked up.
			// Sending on a closed channel panics and would bring the whole
			// process down, so that panic is swallowed here.
			defer func() { _ = recover() }()
			a.completedChan <- struct{}{}
		}()
	}
	return true
}
// MissingAck hands out the receive side of the channel on which the list of
// still pending nodes is delivered when the StartAck timeout expires.
func (a *ackColl) MissingAck() <-chan []string {
	var recvOnly <-chan []string = a.missingChan
	return recvOnly
}
// Completed hands out the receive side of the channel that gets a value once
// the last pending node has acknowledged.
func (a *ackColl) Completed() <-chan struct{} {
	var recvOnly <-chan struct{} = a.completedChan
	return recvOnly
}
// Done ends the current round: the shard index is reset to 0 and the
// MissingAck and Completed channels are closed. Calling Done on a collection
// with no active round (never started, or Done already called) is a no-op.
//
// The reset is done with a single atomic swap so that only one of several
// concurrent callers sees the non-zero index and closes the channels; closing
// an already closed channel would panic.
func (a *ackColl) Done() {
	if atomic.SwapUint64(a.shardIndex, 0) == 0 {
		return
	}
	close(a.missingChan)
	close(a.completedChan)
}
// ShardIndex reports the shard index of the current round, read atomically.
// The value is 0 until StartAck stores an index and again after Done.
func (a *ackColl) ShardIndex() uint64 {
	current := atomic.LoadUint64(a.shardIndex)
	return current
}