-
Notifications
You must be signed in to change notification settings - Fork 0
/
tp_set.go
69 lines (60 loc) · 2.19 KB
/
tp_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package crdt
// TwoPhaseSet is a CRDT set in which an element can be added and removed only
// once in the lifetime of this set.
// This is executed by having to GSets internally, one that holds elements
// that have been added, the other elements that have been removed.
// The types of elements (T) must be set on initialization.
type TwoPhaseSet[T comparable] struct {
name string
gset *GSet[T]
tombstone *GSet[T]
}
// NewTwoPhaseSet constructs a TwoPhaseSet with the associated name.
// It is assumed the name of this specific TwoPhaseSet uniquely identifies
// this set throughout the cluster.
func NewTwoPhaseSet[T comparable](name string) *TwoPhaseSet[T] {
twoPhaseSet := new(TwoPhaseSet[T])
twoPhaseSet.name = name
twoPhaseSet.gset = NewGSet[T](name)
twoPhaseSet.tombstone = NewGSet[T](name)
return twoPhaseSet
}
// Add adds the specified value to the set.
// Is a no-op if the value has ever previously been added to the set.
func (tpset *TwoPhaseSet[T]) Add(value T) {
tpset.gset.Add(value)
}
// Remove adds the specified value to tpset.tombstone if it exists within
// tpset.gset.
// This functionally removes it from this TwoPhaseSet forever.
func (tpset *TwoPhaseSet[T]) Remove(value T) {
if tpset.gset.Lookup(value) {
tpset.tombstone.Add(value)
}
}
// RemoveIf removes each value from this tpset if fn returns true when applied
// to each element currently in the set.
func (tpset *TwoPhaseSet[T]) RemoveIf(fn func(value T) bool) {
gset := tpset.gset.getSet()
for _, value := range gset {
if fn(value) {
tpset.Remove(value)
}
}
}
// Lookup reports whether the T value exists within the set.
func (tpset *TwoPhaseSet[T]) Lookup(value T) bool {
return tpset.gset.Lookup(value) && !tpset.tombstone.Lookup(value)
}
// Size returns the number of elements that currently exist within the set.
func (tpset *TwoPhaseSet[T]) Size() int {
return tpset.gset.Size() - tpset.tombstone.Size()
}
// Merge calls Merge on both GSets within this TwoPhaseSet.
// This is an idempotent operation and is a no-op if tpset.name != that.name.
func (tpset *TwoPhaseSet[T]) Merge(that *TwoPhaseSet[T]) {
if tpset.name == that.name {
tpset.gset.Merge(that.gset)
tpset.tombstone.Merge(that.tombstone)
}
}