-
Notifications
You must be signed in to change notification settings - Fork 106
/
restorer.go
117 lines (92 loc) · 2.43 KB
/
restorer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package checkpoint
import (
"context"
"errors"
"io"
"sync"
db "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
)
// restorer is a checkpoint restorer.
//
// The embedded mutex guards all fields below; every method acquires it
// before touching restore state. Restores are processed one checkpoint at
// a time: StartRestore initializes the state, RestoreChunk consumes chunks,
// and AbortRestore (or the final chunk) clears it.
type restorer struct {
sync.Mutex
// ndb is the node database that restored chunks are written into.
ndb db.NodeDB
// currentCheckpoint contains the metadata of the checkpoint that is currently being restored.
// If it is nil then no restore is in progress.
currentCheckpoint *Metadata
// pendingChunks is a set of pending chunks.
// Keys are chunk indices (0..len(currentCheckpoint.Chunks)-1); an entry is
// removed once the corresponding chunk has been successfully restored.
pendingChunks map[uint64]bool
}
// Implements Restorer.
//
// StartRestore begins restoring the given checkpoint. It fails with
// ErrRestoreAlreadyInProgress if another restore has been started and not
// yet finished or aborted.
func (rs *restorer) StartRestore(ctx context.Context, checkpoint *Metadata) error {
rs.Lock()
defer rs.Unlock()

// Only a single restore may be in progress at any given time.
if rs.currentCheckpoint != nil {
return ErrRestoreAlreadyInProgress
}

// Mark every chunk of the checkpoint as not yet restored.
pending := make(map[uint64]bool, len(checkpoint.Chunks))
for i := 0; i < len(checkpoint.Chunks); i++ {
pending[uint64(i)] = true
}

rs.currentCheckpoint = checkpoint
rs.pendingChunks = pending
return nil
}
// AbortRestore aborts any restore that is currently in progress, dropping
// all associated state. It is a no-op (and still returns nil) when no
// restore is in progress.
func (rs *restorer) AbortRestore(ctx context.Context) error {
rs.Lock()
defer rs.Unlock()

// Clear all restore state.
rs.currentCheckpoint = nil
rs.pendingChunks = nil
return nil
}
// GetCurrentCheckpoint returns the metadata of the checkpoint currently
// being restored, or nil when no restore is in progress.
func (rs *restorer) GetCurrentCheckpoint() *Metadata {
rs.Lock()
defer rs.Unlock()

cur := rs.currentCheckpoint
if cur == nil {
return nil
}

// Hand out a copy so the caller cannot mutate internal state.
// NOTE(review): this is a shallow copy -- any slice/map fields inside
// Metadata remain shared with the internal value; confirm callers treat
// the result as read-only.
cpCopy := *cur
return &cpCopy
}
// Implements Restorer.
//
// RestoreChunk restores a single chunk of the current checkpoint from r.
// It returns true when this was the last pending chunk and the restore is
// now complete. Errors: ErrNoRestoreInProgress when no restore is active,
// ErrChunkAlreadyRestored when the chunk has already been processed, and
// any error from fetching chunk metadata or restoring the chunk itself.
// A chunk failing proof verification aborts the whole restore.
func (rs *restorer) RestoreChunk(ctx context.Context, idx uint64, r io.Reader) (bool, error) {
// Fetch the chunk metadata under the lock, then release it for the
// (potentially slow) restore below so other operations are not blocked.
chunk, err := func() (*ChunkMetadata, error) {
rs.Lock()
defer rs.Unlock()

if rs.currentCheckpoint == nil {
return nil, ErrNoRestoreInProgress
}

// Check if the given chunk is still pending.
if !rs.pendingChunks[idx] {
return nil, ErrChunkAlreadyRestored
}

return rs.currentCheckpoint.GetChunkMetadata(idx)
}()
if err != nil {
return false, err
}

err = restoreChunk(ctx, rs.ndb, chunk, r)
switch {
case err == nil:
case errors.Is(err, ErrChunkProofVerificationFailed):
// Chunk was as specified in the manifest but did not match the reported root. In this case
// we need to abort processing the given checkpoint.
_ = rs.AbortRestore(ctx)
return false, err
default:
return false, err
}

rs.Lock()
defer rs.Unlock()

// The restore may have been aborted concurrently (e.g. by another chunk
// failing proof verification) while the lock was released above. Without
// this check a late chunk would observe an empty (nil) pending set below
// and incorrectly report the aborted restore as successfully completed.
if rs.currentCheckpoint == nil {
return false, ErrNoRestoreInProgress
}

// Mark the given chunk as restored.
delete(rs.pendingChunks, idx)

// If there are no more pending chunks, restore is done.
if len(rs.pendingChunks) == 0 {
rs.pendingChunks = nil
rs.currentCheckpoint = nil
return true, nil
}

return false, nil
}
// NewRestorer creates a new checkpoint restorer that writes restored
// chunks into the given node database.
func NewRestorer(ndb db.NodeDB) (Restorer, error) {
rs := &restorer{
ndb: ndb,
}
return rs, nil
}