-
Notifications
You must be signed in to change notification settings - Fork 1
/
actors.go
172 lines (143 loc) · 3.98 KB
/
actors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package hashring
import (
"bytes"
"fmt"
"sync"
"text/tabwriter"
"github.com/SimonRichardson/coherence/pkg/cluster/bloom"
"github.com/SimonRichardson/coherence/pkg/cluster/nodes"
"github.com/SimonRichardson/resilience/clock"
)
type NodeStrategy func() nodes.Node
// Actor represents a way to communicate to a node in the cluster.
// The actor also has some knowledge of a potential key living inside
// the underlying store.
type Actor struct {
node nodes.Node
bloom *bloom.Bloom
clock clock.Clock
}
// NewActor creates a Actor with the correct Transport for communicating
// to the various end point.
func NewActor(strategy NodeStrategy) *Actor {
return &Actor{
node: strategy(),
bloom: bloom.New(defaultBloomCapacity, 4),
clock: clock.NewLamportClock(),
}
}
// Contains checks to see if there is any potential data in the
// underlying store.
func (n *Actor) Contains(data string) bool {
ok, err := n.bloom.Contains(data)
if err != nil {
return false
}
return ok
}
// Hash returns the unique hash of the actor node
func (n *Actor) Hash() uint32 {
return n.node.Hash()
}
// Host returns the host of the actor node
func (n *Actor) Host() string {
return n.node.Host()
}
// Time returns the time of a last modification of the actor
func (n *Actor) Time() clock.Time {
return n.clock.Now()
}
// Add adds a known piece of data to the actor to improve the potential of
// finding the data with in the store. Consider this as a Hint to improve
// various consensus algorithms.
func (n *Actor) Add(data string) error {
err := n.bloom.Add(data)
if err != nil {
return err
}
n.clock.Increment()
return nil
}
// Update performs a union of the shared knowledge Hint. The update payload
// will more or less come from other nodes in the cluster and a union is an
// attempt to gather a much information as possible over time about what each
// store holds.
func (n *Actor) Update(payload []byte) error {
// Go throw and merge the blooms
bits := new(bloom.Bloom)
if _, err := bits.Read(bytes.NewReader(payload)); err != nil {
return err
}
if err := n.bloom.Union(bits); err != nil {
return err
}
// Update the internal clock of an actor
n.clock.Increment()
return nil
}
// Actors is a collection of the Actor Node allowing accessing the
// actor via the host or via a hash.
type Actors struct {
mutex sync.RWMutex
hashes map[uint32]*Actor
}
// NewActors creates a Actors with the correct dependencies
func NewActors() *Actors {
return &Actors{
hashes: make(map[uint32]*Actor),
}
}
// Get returns a the Actor according to the hash of the node
// Returns the ok if the node is found.
func (n *Actors) Get(hash uint32) (*Actor, bool) {
n.mutex.RLock()
v, ok := n.hashes[hash]
n.mutex.RUnlock()
return v, ok
}
// Set adds a Actor to the nodes according to the address hash
// If a hash already exists, then it will over write the existing hash value
func (n *Actors) Set(v *Actor) {
n.mutex.Lock()
n.hashes[v.Hash()] = v
n.mutex.Unlock()
}
// Remove a Actor via it's addr
func (n *Actors) Remove(hash uint32) {
n.mutex.Lock()
defer n.mutex.Unlock()
delete(n.hashes, hash)
}
// Hashes returns a slice of hashes in the nodeset
func (n *Actors) Hashes() []uint32 {
var (
c int
res = make([]uint32, len(n.hashes))
)
for k := range n.hashes {
res[c] = k
c++
}
return res
}
// Update the payload of a hash node
// Return error if the writing to the bloom fails
func (n *Actors) Update(hash uint32, payload []byte) error {
if actor, ok := n.hashes[hash]; ok {
if err := actor.Update(payload); err != nil {
return err
}
}
return nil
}
// String returns a table view of the internal actor nodes
func (n *Actors) String() string {
buf := new(bytes.Buffer)
writer := tabwriter.NewWriter(buf, 0, 0, 1, ' ', tabwriter.Debug)
fmt.Fprintln(writer, "host\t hash\t bits\t clock\t")
for _, v := range n.hashes {
fmt.Fprintf(writer, "%s\t %d\t %s\t %d\t\n", v.Host(), v.Hash(), v.bloom.String(), v.clock.Now().Value())
}
writer.Flush()
return fmt.Sprintf("\n%s", buf.String())
}