forked from filecoin-project/lotus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
read.go
142 lines (115 loc) · 3.36 KB
/
read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package backupds
import (
"bytes"
"context"
"crypto/sha256"
"io"
"os"
"github.com/ipfs/go-datastore"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
// ReadBackup decodes a backup stream from r and passes every key/value pair
// it contains to cb.
//
// The stream is read in two parts:
//
//  1. A CBOR array(2) holding an indefinite-length array of
//     array(2)[key bytes, value bytes] tuples, followed by a byte string with
//     the expected SHA-256 checksum. The checksum covers the bytes of the
//     indefinite-length array, from its 0x9f header through its 0xff
//     terminator. Each tuple is passed to cb with log=false.
//  2. A sequence of CBOR-encoded Entry values (the log) that runs until the
//     end of the stream. Each entry is passed to cb with log=true.
//
// Note that cb is invoked for the tuples of part 1 as they are decoded, i.e.
// before the checksum has been verified.
//
// The returned bool is true only when the log was consumed up to a clean end
// of stream at an entry boundary. If a log entry is cut short and the
// environment variable LOTUS_ALLOW_TRUNCATED_LOG is set to "1", the
// truncation is logged and (false, nil) is returned; without that variable a
// truncated entry is an error. Any error returned by cb aborts the read and
// is returned unchanged.
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool) error) (bool, error) {
	// Single-byte buffer for CBOR header bytes. Headers are read with
	// io.ReadFull rather than a bare Read: an io.Reader may return (0, nil),
	// or deliver the byte together with io.EOF, and ReadFull handles both.
	var hdr [1]byte

	// read array[2](
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return false, xerrors.Errorf("reading array header: %w", err)
	}
	if hdr[0] != 0x82 {
		return false, xerrors.Errorf("expected array(2) header byte 0x82, got %x", hdr[0])
	}

	// Everything read through hr from here on is fed into the checksum.
	hasher := sha256.New()
	hr := io.TeeReader(r, hasher)

	// read array[*](
	if _, err := io.ReadFull(hr, hdr[:]); err != nil {
		return false, xerrors.Errorf("reading array header: %w", err)
	}
	if hdr[0] != 0x9f {
		return false, xerrors.Errorf("expected indefinite length array header byte 0x9f, got %x", hdr[0])
	}

	for {
		if _, err := io.ReadFull(hr, hdr[:]); err != nil {
			return false, xerrors.Errorf("reading tuple header: %w", err)
		}

		// close array[*]
		if hdr[0] == 0xff {
			break
		}

		// read array[2](key:[]byte, value:[]byte)
		if hdr[0] != 0x82 {
			return false, xerrors.Errorf("expected array(2) header 0x82, got %x", hdr[0])
		}

		keyb, err := cbg.ReadByteArray(hr, 1<<40)
		if err != nil {
			return false, xerrors.Errorf("reading key: %w", err)
		}
		key := datastore.NewKey(string(keyb))

		value, err := cbg.ReadByteArray(hr, 1<<40)
		if err != nil {
			return false, xerrors.Errorf("reading value: %w", err)
		}

		if err := cb(key, value, false); err != nil {
			return false, err
		}
	}

	sum := hasher.Sum(nil)

	// read the [32]byte checksum; it is read from r directly so that it is
	// not itself hashed
	expSum, err := cbg.ReadByteArray(r, 32)
	if err != nil {
		return false, xerrors.Errorf("reading expected checksum: %w", err)
	}
	if !bytes.Equal(sum, expSum) {
		return false, xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum)
	}

	// read the log, set of Entry-ies
	var ent Entry
	bp := cbg.GetPeeker(r)
	for {
		// Peek one byte to tell "no more entries" apart from "entry cut
		// short": end of stream here, at an entry boundary, is a clean end.
		_, err := bp.ReadByte()
		switch {
		case err == nil:
		case xerrors.Is(err, io.EOF), xerrors.Is(err, io.ErrUnexpectedEOF):
			return true, nil
		default:
			return false, xerrors.Errorf("peek log: %w", err)
		}
		if err := bp.UnreadByte(); err != nil {
			return false, xerrors.Errorf("unread log byte: %w", err)
		}

		if err := ent.UnmarshalCBOR(bp); err != nil {
			// xerrors.Is also recognises an end-of-stream error that the
			// decoder wrapped with extra context.
			if xerrors.Is(err, io.EOF) || xerrors.Is(err, io.ErrUnexpectedEOF) {
				if os.Getenv("LOTUS_ALLOW_TRUNCATED_LOG") == "1" {
					log.Errorw("log entry potentially truncated")
					return false, nil
				}
				return false, xerrors.Errorf("log entry potentially truncated, set LOTUS_ALLOW_TRUNCATED_LOG=1 to proceed: %w", err)
			}
			return false, xerrors.Errorf("unmarshaling log entry: %w", err)
		}

		key := datastore.NewKey(string(ent.Key))
		if err := cb(key, ent.Value, true); err != nil {
			return false, err
		}
	}
}
// RestoreInto reads a backup stream from r with ReadBackup and writes every
// key/value pair it yields into a single batch obtained from dest. Entries
// from the backup body and from the trailing log are stored the same way.
//
// The batch is committed only after the whole stream has been read without
// error; on any failure the function returns before Commit is called.
func RestoreInto(r io.Reader, dest datastore.Batching) error {
	ctx := context.TODO()

	batch, err := dest.Batch(ctx)
	if err != nil {
		return xerrors.Errorf("creating batch: %w", err)
	}

	// store queues one pair in the batch; the log flag is irrelevant here.
	store := func(key datastore.Key, value []byte, _ bool) error {
		if putErr := batch.Put(ctx, key, value); putErr != nil {
			return xerrors.Errorf("put key: %w", putErr)
		}
		return nil
	}

	if _, readErr := ReadBackup(r, store); readErr != nil {
		return xerrors.Errorf("reading backup: %w", readErr)
	}

	if commitErr := batch.Commit(ctx); commitErr != nil {
		return xerrors.Errorf("committing batch: %w", commitErr)
	}

	return nil
}