-
Notifications
You must be signed in to change notification settings - Fork 22
/
linkstore.go
347 lines (269 loc) · 8.75 KB
/
linkstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package linkstore
import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/hyperledger/aries-framework-go/spi/storage"
"github.com/trustbloc/logutil-go/pkg/log"
logfields "github.com/trustbloc/orb/internal/pkg/log"
orberrors "github.com/trustbloc/orb/pkg/errors"
"github.com/trustbloc/orb/pkg/hashlink"
"github.com/trustbloc/orb/pkg/store"
"github.com/trustbloc/orb/pkg/store/expiry"
)
const (
// storeName is the name under which the underlying store is opened and
// registered with the expiry service.
storeName = "anchor-ref"
// hashTag is the tag name holding the anchor hash of a stored link.
hashTag = "anchorHash"
// statusTag is the tag name holding the link status. It is only set on
// records stored in PENDING state.
statusTag = "status"
// expiryTimeTag is the tag name holding the expiry time (Unix seconds) of a
// PENDING record.
expiryTimeTag = "expiryTime"
// defaultPendingRecordLifespan is the lifespan of a PENDING record used when
// no WithPendingRecordLifespan option is supplied.
defaultPendingRecordLifespan = 24 * time.Hour
)
// logger is the package-level logger, created with the name "anchor-ref-store".
var logger = log.New("anchor-ref-store")
// dataExpiryService is the dependency through which New registers the link
// store for expiry handling. Register receives the store, the name of the tag
// that carries the expiry time, the store name and optional expiry options.
type dataExpiryService interface {
Register(store storage.Store, expiryTagName, storeName string, opts ...expiry.Option)
}
// options holds the configurable settings of the link store.
type options struct {
// pendingRecordLifespan is added to the current time to compute the expiry
// time of a record stored in PENDING state.
pendingRecordLifespan time.Duration
}
// Opt is a link store option. Options are applied, in order, to the store's
// settings when New is called.
type Opt func(opts *options)
// WithPendingRecordLifespan returns an option that overrides how long an anchor
// reference may remain in PENDING state. The value is added to the current time
// to compute the record's expiry time when a pending link is stored.
func WithPendingRecordLifespan(value time.Duration) Opt {
	apply := func(o *options) {
		o.pendingRecordLifespan = value
	}

	return apply
}
// New opens the anchor link store on the given storage provider and registers
// it with the expiry service.
//
// The settings start from defaultPendingRecordLifespan and are then adjusted by
// opts in the order given. The returned Store is registered as the expiry
// handler for records carrying the expiryTimeTag tag.
//
// An error is returned if the underlying store cannot be opened.
func New(provider storage.Provider, expiryService dataExpiryService, opts ...Opt) (*Store, error) {
	cfg := &options{pendingRecordLifespan: defaultPendingRecordLifespan}

	for _, apply := range opts {
		apply(cfg)
	}

	underlying, err := store.Open(provider, storeName,
		store.NewTagGroup(hashTag, statusTag),
		store.NewTagGroup(expiryTimeTag),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to open anchor ref store: %w", err)
	}

	linkStore := &Store{
		options:   cfg,
		store:     underlying,
		marshal:   json.Marshal,
		unmarshal: json.Unmarshal,
	}

	// The store itself decides which expired keys are actually removed
	// (see HandleExpiredKeys).
	expiryService.Register(underlying, expiryTimeTag, storeName, expiry.WithExpiryHandler(linkStore))

	return linkStore, nil
}
// Store implements an anchor link store.
type Store struct {
// options holds the store settings (embedded, so its fields are accessed
// directly on the Store).
*options
// store is the underlying storage in which link references are kept.
store storage.Store
// marshal and unmarshal serialize link references; New sets them to
// json.Marshal and json.Unmarshal.
marshal func(interface{}) ([]byte, error)
unmarshal func(data []byte, v interface{}) error
}
// anchorStatus is the processing status of a stored anchor link. It is an
// alias of string.
type anchorStatus = string
const (
// statusProcessed is the status of a processed link; it is the empty string,
// so it is omitted from the serialized reference.
statusProcessed anchorStatus = ""
// statusPending is the status of a link that has been stored via
// PutPendingLinks.
statusPending anchorStatus = "PENDING"
)
// anchorLinkRef is the JSON document stored for each anchor link.
type anchorLinkRef struct {
// AnchorHash is the resource hash extracted from the hash link.
AnchorHash string `json:"anchorHash"`
// URL is the string form of the hash link.
URL string `json:"url"`
// Status is the link status; empty for processed links.
Status string `json:"status,omitempty"`
// ExpiryTime is the expiry time in Unix seconds; only set for PENDING links.
ExpiryTime int64 `json:"expiryTime,omitempty"`
}
// PutLinks persists the given hash links as processed references, i.e. with an
// empty status and without an expiry time.
func (s *Store) PutLinks(links []*url.URL) error {
	if err := s.putLinks(links, statusProcessed); err != nil {
		return err
	}

	return nil
}
// PutPendingLinks persists the given hash links with the status PENDING, marking
// them as not yet processed. Pending records are stored with an expiry time
// derived from the configured pending record lifespan.
func (s *Store) PutPendingLinks(links []*url.URL) error {
	if err := s.putLinks(links, statusPending); err != nil {
		return err
	}

	return nil
}
// putLinks stores one record per link in a single batch.
//
// Each record is keyed by getID(link), holds the marshalled anchorLinkRef and is
// tagged with the anchor hash. When status is statusPending the record is
// additionally tagged with its expiry time (now + pendingRecordLifespan, in Unix
// seconds) and its status.
//
// Errors extracting the hash or marshalling are returned as-is; a failure of the
// batch write is returned as a transient error.
func (s *Store) putLinks(links []*url.URL, status anchorStatus) error {
	batch := make([]storage.Operation, 0, len(links))

	for _, link := range links {
		anchorHash, err := hashlink.GetResourceHashFromHashLink(link.String())
		if err != nil {
			return fmt.Errorf("get hash from hashlink [%s]: %w", link, err)
		}

		ref := &anchorLinkRef{
			AnchorHash: anchorHash,
			URL:        link.String(),
			Status:     status,
		}

		tags := []storage.Tag{{Name: hashTag, Value: anchorHash}}

		// Only pending records expire, so only they carry the expiry and status tags.
		if status == statusPending {
			ref.ExpiryTime = time.Now().Add(s.pendingRecordLifespan).Unix()

			tags = append(tags,
				storage.Tag{Name: expiryTimeTag, Value: fmt.Sprintf("%d", ref.ExpiryTime)},
				storage.Tag{Name: statusTag, Value: status},
			)
		}

		refBytes, err := s.marshal(ref)
		if err != nil {
			return fmt.Errorf("marshal anchor ref [%s]: %w", link, err)
		}

		logger.Debug("Storing anchor link reference", logfields.WithAnchorHash(anchorHash), logfields.WithAnchorURI(link),
			logfields.WithStatus(status))

		batch = append(batch, storage.Operation{
			Key:   getID(link),
			Value: refBytes,
			Tags:  tags,
		})
	}

	if err := s.store.Batch(batch); err != nil {
		return orberrors.NewTransient(fmt.Errorf("store anchor refs: %w", err))
	}

	return nil
}
// DeleteLinks removes the records of the given hash links in a single batch,
// regardless of their status. A failure of the batch is returned as a transient
// error.
func (s *Store) DeleteLinks(links []*url.URL) error {
	deletions := make([]storage.Operation, 0, len(links))

	for _, link := range links {
		logger.Debug("Deleting anchor link reference", logfields.WithURI(link))

		// An operation with only a key set carries no value.
		deletions = append(deletions, storage.Operation{Key: getID(link)})
	}

	if err := s.store.Batch(deletions); err != nil {
		return orberrors.NewTransient(fmt.Errorf("delete anchor refs: %w", err))
	}

	return nil
}
// DeletePendingLinks deletes the given hash links if they are in PENDING status.
//
// For each link, the store is queried for PENDING records with the link's anchor
// hash; the link is scheduled for deletion only if one of those records has the
// same URL. Links that are not pending are skipped. All scheduled deletions are
// executed in a single batch.
//
// If none of the links is pending, nothing is written and nil is returned.
// Errors extracting the hash or querying are returned wrapped; a failure of the
// batch is returned as a transient error.
func (s *Store) DeletePendingLinks(links []*url.URL) error {
	// Collect only the links that were actually found in PENDING state. The slice
	// must not be pre-filled: an operation left at its zero value would reach
	// Batch with an empty key and fail the whole delete.
	operations := make([]storage.Operation, 0, len(links))

	for _, link := range links {
		anchorHash, err := hashlink.GetResourceHashFromHashLink(link.String())
		if err != nil {
			return fmt.Errorf("get hash from hashlink [%s]: %w", link, err)
		}

		pendingLinks, err := s.getLinks(anchorHash,
			fmt.Sprintf("%s:%s&&%s:%s", hashTag, anchorHash, statusTag, statusPending))
		if err != nil {
			return fmt.Errorf("get pending links [%s]: %w", link, err)
		}

		// Several links may share the same anchor hash, so match on the full URL.
		for _, pendingLink := range pendingLinks {
			if pendingLink.String() != link.String() {
				continue
			}

			logger.Debug("Deleting pending anchor link reference", logfields.WithURI(link))

			operations = append(operations, storage.Operation{Key: getID(link)})

			break
		}
	}

	if len(operations) == 0 {
		// Nothing is pending, so there is nothing to delete.
		return nil
	}

	if err := s.store.Batch(operations); err != nil {
		return orberrors.NewTransient(fmt.Errorf("delete pending anchor link refs: %w", err))
	}

	return nil
}
// GetLinks returns the links stored for the given anchor hash, excluding
// records that carry a status tag (putLinks only sets that tag on PENDING
// records).
func (s *Store) GetLinks(anchorHash string) ([]*url.URL, error) {
	logger.Debug("Retrieving processed anchor link references for anchor hash",
		logfields.WithAnchorHash(anchorHash))

	processedOnly := fmt.Sprintf("%s:%s&&!%s", hashTag, anchorHash, statusTag)

	return s.getLinks(anchorHash, processedOnly)
}
// GetProcessedAndPendingLinks returns every link stored for the given anchor
// hash. The query filters on the anchor hash only, so pending links are included.
func (s *Store) GetProcessedAndPendingLinks(anchorHash string) ([]*url.URL, error) {
	logger.Debug("Retrieving processed and pending anchor link references for anchor hash",
		logfields.WithAnchorHash(anchorHash))

	anyStatus := fmt.Sprintf("%s:%s", hashTag, anchorHash)

	return s.getLinks(anchorHash, anyStatus)
}
// getLinks runs the given query against the underlying store and returns the
// URL of every matching record. anchorHash is used only for log and error
// messages.
//
// Query and iterator failures are returned as transient errors; failures to
// unmarshal a record or to parse its URL are returned as plain errors. The
// result is nil when the query matches nothing.
func (s *Store) getLinks(anchorHash, query string) ([]*url.URL, error) {
	iter, err := s.store.Query(query)
	if err != nil {
		return nil, orberrors.NewTransient(fmt.Errorf("failed to get refs for anchor [%s] query[%s]: %w",
			anchorHash, query, err))
	}

	defer store.CloseIterator(iter)

	var links []*url.URL

	for {
		hasNext, err := iter.Next()
		if err != nil {
			return nil, orberrors.NewTransient(fmt.Errorf("iterator error for anchor [%s]: %w", anchorHash, err))
		}

		if !hasNext {
			break
		}

		raw, err := iter.Value()
		if err != nil {
			return nil, orberrors.NewTransient(fmt.Errorf("failed to get iterator value for anchor [%s]: %w",
				anchorHash, err))
		}

		var ref anchorLinkRef

		if err = s.unmarshal(raw, &ref); err != nil {
			return nil, fmt.Errorf("unmarshal link [%s] for anchor [%s]: %w", raw, anchorHash, err)
		}

		parsed, err := url.Parse(ref.URL)
		if err != nil {
			return nil, fmt.Errorf("parse link [%s] for anchor [%s]: %w", ref.URL, anchorHash, err)
		}

		links = append(links, parsed)
	}

	logger.Debug("Returning anchor references for anchor hash", logfields.WithAnchorHash(anchorHash), logfields.WithURIs(links...))

	return links, nil
}
// HandleExpiredKeys is invoked by the data expiration handler with the keys of
// expired records. It loads each record and returns the keys of those whose
// status is not processed; records in processed state are kept.
//
// If any record cannot be loaded, the error is logged and returned and no keys
// are reported.
func (s *Store) HandleExpiredKeys(keys ...string) ([]string, error) {
	var expired []string

	for _, key := range keys {
		ref, err := s.getLink(key)
		if err != nil {
			logger.Warn("Error getting anchor ref", logfields.WithAnchorHash(key), log.WithError(err))

			return nil, err
		}

		if ref.Status == statusProcessed {
			logger.Info("Anchor reference will not be deleted since it's not in PENDING state", logfields.WithAnchorHash(key))

			continue
		}

		logger.Info("Anchor reference in PENDING state will be deleted", logfields.WithAnchorHash(key))

		expired = append(expired, key)
	}

	return expired, nil
}
// getLink loads the record stored under key and unmarshals it into an
// anchorLinkRef. Load and unmarshal failures are returned wrapped.
func (s *Store) getLink(key string) (*anchorLinkRef, error) {
	raw, err := s.store.Get(key)
	if err != nil {
		return nil, fmt.Errorf("load anchorLinkRef: %w", err)
	}

	ref := &anchorLinkRef{}

	if err := s.unmarshal(raw, ref); err != nil {
		return nil, fmt.Errorf("unmarshal anchor link: %w", err)
	}

	return ref, nil
}
// getID derives the storage key for a link: the unpadded standard base64
// encoding of the link's string form.
func getID(link *url.URL) string {
	raw := []byte(link.String())

	return base64.RawStdEncoding.EncodeToString(raw)
}