-
Notifications
You must be signed in to change notification settings - Fork 93
/
trackstore.go
172 lines (155 loc) · 4.67 KB
/
trackstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package trackstore
import (
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/textileio/powergate/ffs"
)
// Store persist information about storage configs which
// should be repaired or renewed. It always contain the
// latest Storage Config value of a Cid to execute those actions.
// This store is used by the Scheduler background jobs that
// repair or renew storage configurations.
type Store struct {
lock sync.Mutex
ds datastore.Datastore
repairables map[cid.Cid]struct{}
renewables map[cid.Cid]struct{}
}
type trackedStorageConfig struct {
IID ffs.APIID
StorageConfig ffs.StorageConfig
}
// New retruns a new Store.
func New(ds datastore.Datastore) (*Store, error) {
s := &Store{
ds: ds,
repairables: map[cid.Cid]struct{}{},
renewables: map[cid.Cid]struct{}{},
}
if err := s.loadCaches(); err != nil {
return nil, fmt.Errorf("loading renewable/repairable caches: %s", err)
}
return s, nil
}
// Get returns the storage config of a repairable/renewable stored Cid.
func (s *Store) Get(c cid.Cid) (ffs.StorageConfig, ffs.APIID, error) {
v, err := s.ds.Get(datastore.NewKey(c.String()))
if err != nil {
return ffs.StorageConfig{}, "", fmt.Errorf("getting storage config: %s", err)
}
var tsc trackedStorageConfig
if err := json.Unmarshal(v, &tsc); err != nil {
return ffs.StorageConfig{}, "", fmt.Errorf("unmarshaling storage config: %s", err)
}
return tsc.StorageConfig, tsc.IID, nil
}
// Put updates the StorageConfig tracking state for a Cid.
// If the StorageConfig is repairable or renewable, it will be
// added (or updated if exist) for a Cid. If it isn't repairable
// or renewable, it will ensure it's removed from the store
// if exists. This last point happens when a StorageConfig
// which was repairable/renewable get that feature disabled.
func (s *Store) Put(iid ffs.APIID, c cid.Cid, sc ffs.StorageConfig) error {
s.lock.Lock()
defer s.lock.Unlock()
isRepairable := sc.Repairable
isRenewable := sc.Cold.Enabled && sc.Cold.Filecoin.Renew.Enabled
key := datastore.NewKey(c.String())
if isRepairable || isRenewable {
if isRepairable {
s.repairables[c] = struct{}{}
}
if isRenewable {
s.renewables[c] = struct{}{}
}
tsc := trackedStorageConfig{IID: iid, StorageConfig: sc}
buf, err := json.Marshal(tsc)
if err != nil {
return fmt.Errorf("marshaling storage config: %s", err)
}
if err := s.ds.Put(key, buf); err != nil {
return fmt.Errorf("putting renewable/repairable storageconfig: %s", err)
}
return nil
}
// The provided Storage Config isn't interesting to
// be tracked for renewal or repair.
// Check if we were tracking it before, and if that's the case
// remove it.
_, okRenewable := s.renewables[c]
_, okRepairable := s.repairables[c]
if okRenewable || okRepairable {
if err := s.ds.Delete(key); err != nil {
return fmt.Errorf("deleting disabled storage config: %s", err)
}
}
return nil
}
// Remove removes a Cid from the store, usually meaning
// the Cid storage config shouldn't be tracked for repair or
// renewal anymore.
func (s *Store) Remove(c cid.Cid) error {
if err := s.ds.Delete(datastore.NewKey(c.String())); err != nil {
return fmt.Errorf("deleting a tracked storage config: %s", err)
}
delete(s.renewables, c)
delete(s.repairables, c)
return nil
}
// GetRepairables returns the list of Cids which
// have a repairable Storage Config.
func (s *Store) GetRepairables() ([]cid.Cid, error) {
s.lock.Lock()
defer s.lock.Unlock()
res := make([]cid.Cid, len(s.repairables))
i := 0
for k := range s.repairables {
res[i] = k
i++
}
return res, nil
}
// GetRenewables returns the list of Cids which
// have a renewable Storage Config.
func (s *Store) GetRenewables() ([]cid.Cid, error) {
s.lock.Lock()
defer s.lock.Unlock()
res := make([]cid.Cid, len(s.renewables))
i := 0
for k := range s.renewables {
res[i] = k
i++
}
return res, nil
}
func (s *Store) loadCaches() error {
q := query.Query{}
r, err := s.ds.Query(q)
if err != nil {
return fmt.Errorf("querying persisted tracked storage configs: %s", err)
}
defer func() { _ = r.Close() }()
for v := range r.Next() {
var tsc trackedStorageConfig
if err := json.Unmarshal(v.Value, &tsc); err != nil {
return fmt.Errorf("unmarshaling storageconfig: %s", err)
}
cidStr := strings.TrimPrefix(v.Key, "/")
c, err := cid.Decode(cidStr)
if err != nil {
return fmt.Errorf("decoding cid: %s", err)
}
if tsc.StorageConfig.Repairable {
s.repairables[c] = struct{}{}
}
if tsc.StorageConfig.Cold.Enabled && tsc.StorageConfig.Cold.Filecoin.Renew.Enabled {
s.renewables[c] = struct{}{}
}
}
return nil
}