-
Notifications
You must be signed in to change notification settings - Fork 178
/
aggregated_signatures.go
83 lines (72 loc) · 2.84 KB
/
aggregated_signatures.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package approvals
import (
"fmt"
"sync"
"github.com/onflow/flow-go/model/flow"
)
// AggregatedSignatures is an utility struct that provides concurrency safe access
// to map of aggregated signatures indexed by chunk index
type AggregatedSignatures struct {
signatures map[uint64]flow.AggregatedSignature // aggregated signature for each chunk
lock sync.RWMutex // lock for modifying aggregatedSignatures
numberOfChunks uint64
}
// NewAggregatedSignatures instantiates a AggregatedSignatures. Requires that
// number of chunks is positive integer. Errors otherwise.
func NewAggregatedSignatures(chunks uint64) (*AggregatedSignatures, error) {
if chunks < 1 {
return nil, fmt.Errorf("number of chunks must be positive but got %d", chunks)
}
return &AggregatedSignatures{
signatures: make(map[uint64]flow.AggregatedSignature, chunks),
lock: sync.RWMutex{},
numberOfChunks: chunks,
}, nil
}
// PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`.
// The returned int is the resulting number of approved chunks.
// Errors if chunk index exceeds valid range.
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) (uint64, error) {
if chunkIndex >= as.numberOfChunks {
return uint64(len(as.signatures)), fmt.Errorf("chunk index must be in range [0, %d] but is %d", as.numberOfChunks-1, chunkIndex)
}
as.lock.Lock()
defer as.lock.Unlock()
if _, found := as.signatures[chunkIndex]; !found {
as.signatures[chunkIndex] = aggregatedSignature
}
return uint64(len(as.signatures)), nil
}
// HasSignature returns boolean depending if we have signature for particular chunk
func (as *AggregatedSignatures) HasSignature(chunkIndex uint64) bool {
as.lock.RLock()
defer as.lock.RUnlock()
_, found := as.signatures[chunkIndex]
return found
}
// Collect returns array with aggregated signature for each chunk
func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature {
aggregatedSigs := make([]flow.AggregatedSignature, as.numberOfChunks)
as.lock.RLock()
defer as.lock.RUnlock()
for chunkIndex, sig := range as.signatures {
aggregatedSigs[chunkIndex] = sig
}
return aggregatedSigs
}
// ChunksWithoutAggregatedSignature returns indexes of chunks that don't have an aggregated signature
func (as *AggregatedSignatures) ChunksWithoutAggregatedSignature() []uint64 {
// provide enough capacity to avoid allocations while we hold the lock
missingChunks := make([]uint64, 0, as.numberOfChunks)
as.lock.RLock()
defer as.lock.RUnlock()
for i := uint64(0); i < as.numberOfChunks; i++ {
chunkIndex := uint64(i)
if _, found := as.signatures[chunkIndex]; found {
// skip if we already have enough valid approvals for this chunk
continue
}
missingChunks = append(missingChunks, chunkIndex)
}
return missingChunks
}