/
spent_addresses_storage.go
131 lines (101 loc) · 3.03 KB
/
spent_addresses_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package tangle
import (
"encoding/binary"
"io"
"sync"
"time"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/objectstorage"
"github.com/massyu/hornet/pkg/model/hornet"
"github.com/massyu/hornet/pkg/profile"
)
var (
spentAddressesStorage *objectstorage.ObjectStorage
spentAddressesLock sync.RWMutex
)
func ReadLockSpentAddresses() {
spentAddressesLock.RLock()
}
func ReadUnlockSpentAddresses() {
spentAddressesLock.RUnlock()
}
func WriteLockSpentAddresses() {
spentAddressesLock.Lock()
}
func WriteUnlockSpentAddresses() {
spentAddressesLock.Unlock()
}
type CachedSpentAddress struct {
objectstorage.CachedObject
}
func (c *CachedSpentAddress) GetSpentAddress() *hornet.SpentAddress {
return c.Get().(*hornet.SpentAddress)
}
func spentAddressFactory(key []byte) (objectstorage.StorableObject, int, error) {
sa := hornet.NewSpentAddress(key[:49])
return sa, 49, nil
}
func GetSpentAddressesStorageSize() int {
return spentAddressesStorage.GetSize()
}
func configureSpentAddressesStorage(store kvstore.KVStore, opts profile.CacheOpts) {
spentAddressesStorage = objectstorage.New(
store.WithRealm([]byte{StorePrefixSpentAddresses}),
spentAddressFactory,
objectstorage.CacheTime(time.Duration(opts.CacheTimeMs)*time.Millisecond),
objectstorage.PersistenceEnabled(true),
objectstorage.KeysOnly(true),
objectstorage.StoreOnCreation(true),
objectstorage.LeakDetectionEnabled(opts.LeakDetectionOptions.Enabled,
objectstorage.LeakDetectionOptions{
MaxConsumersPerObject: opts.LeakDetectionOptions.MaxConsumersPerObject,
MaxConsumerHoldTime: time.Duration(opts.LeakDetectionOptions.MaxConsumerHoldTimeSec) * time.Second,
}),
)
}
// spentAddress +-0
func WasAddressSpentFrom(address hornet.Hash) bool {
return spentAddressesStorage.Contains(address)
}
// spentAddress +-0
func MarkAddressAsSpent(address hornet.Hash) bool {
spentAddressesLock.Lock()
defer spentAddressesLock.Unlock()
return MarkAddressAsSpentWithoutLocking(address)
}
// spentAddress +-0
func MarkAddressAsSpentWithoutLocking(address hornet.Hash) bool {
spentAddress, _, _ := spentAddressFactory(address)
cachedSpentAddress, newlyAdded := spentAddressesStorage.StoreIfAbsent(spentAddress)
if cachedSpentAddress != nil {
cachedSpentAddress.Release(true)
}
return newlyAdded
}
// StreamSpentAddressesToWriter streams all spent addresses directly to an io.Writer.
func StreamSpentAddressesToWriter(buf io.Writer, abortSignal <-chan struct{}) (int32, error) {
ReadLockSpentAddresses()
defer ReadUnlockSpentAddresses()
var addressesWritten int32
wasAborted := false
spentAddressesStorage.ForEachKeyOnly(func(key []byte) bool {
select {
case <-abortSignal:
wasAborted = true
return false
default:
}
addressesWritten++
return binary.Write(buf, binary.LittleEndian, key) == nil
}, false)
if wasAborted {
return 0, ErrOperationAborted
}
return addressesWritten, nil
}
func ShutdownSpentAddressesStorage() {
spentAddressesStorage.Shutdown()
}
func FlushSpentAddressesStorage() {
spentAddressesStorage.Flush()
}