// Forked from cockroachdb/cockroach: import.go (170 lines, 146 loc, 4.45 KB).
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Cockroach Community Licence (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/pkg/LICENSE
package storageccl
import (
"bytes"
"fmt"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)
// init registers evalImport with the storage package so that it is invoked
// to execute Import commands. The registration happens through a setter
// hook (storage.SetImportCmd) rather than a direct reference from storage
// to this package.
func init() {
storage.SetImportCmd(evalImport)
}
// evalImport bulk loads key/value entries.
//
// For each file named in the request it fetches the file, optionally
// verifies its SHA-512 checksum, iterates its sstable contents over
// args.DataSpan, rewrites each key through args.KeyRewrites, and streams
// the rewritten entries back into the cluster as WriteBatch requests of
// roughly batchSizeBytes each. WriteBatches are sent asynchronously; the
// first failure cancels the context and is returned from the final Wait.
func evalImport(ctx context.Context, cArgs storage.CommandArgs) error {
	args := cArgs.Args.(*roachpb.ImportRequest)
	db := cArgs.Repl.DB()
	kr := KeyRewriter(args.KeyRewrites)

	ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("import [%s,%s)", args.DataSpan.Key, args.DataSpan.EndKey))
	defer tracing.FinishSpan(span)

	// Cancellation lets an async WriteBatch failure abort the rest of the
	// import instead of continuing to send doomed batches.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := beginLimitedRequest(ctx); err != nil {
		return err
	}
	defer endLimitedRequest()

	// Arrived at by tuning and watching the effect on BenchmarkRestore.
	const batchSizeBytes = 1000000

	var wg syncutil.WaitGroupWithError

	// batchBuilder accumulates rewritten KVs along with the key range
	// ([batchStartKey, batchEndKey]) they cover, so the WriteBatch request
	// can declare the span it touches.
	type batchBuilder struct {
		batch         engine.RocksDBBatchBuilder
		batchStartKey []byte
		batchEndKey   []byte
	}
	b := batchBuilder{}

	// sendWriteBatch ships the current batch asynchronously and resets b.
	// On error it records the failure in wg and cancels ctx.
	sendWriteBatch := func() {
		batchStartKey := roachpb.Key(b.batchStartKey)
		// The end key of the WriteBatch request is exclusive, but batchEndKey
		// is currently the largest key in the batch. Increment it.
		batchEndKey := roachpb.Key(b.batchEndKey).Next()
		if log.V(1) {
			log.Infof(ctx, "writebatch [%s,%s)", batchStartKey, batchEndKey)
		}
		wg.Add(1)
		go func(start, end roachpb.Key, repr []byte) {
			if err := db.WriteBatch(ctx, start, end, repr); err != nil {
				log.Errorf(ctx, "writebatch [%s,%s): %+v", start, end, err)
				wg.Done(err)
				cancel()
				return
			}
			wg.Done(nil)
		}(batchStartKey, batchEndKey, b.batch.Finish())
		b = batchBuilder{}
	}

	startKeyMVCC, endKeyMVCC := engine.MVCCKey{Key: args.DataSpan.Key}, engine.MVCCKey{Key: args.DataSpan.EndKey}

	for i := range args.Files {
		file := args.Files[i]
		// Each file is processed inside its own closure so the deferred
		// Close calls on the sst reader and its iterator run as soon as the
		// file is finished. The previous code deferred inside the loop body,
		// which kept every file's reader and iterator open until evalImport
		// returned.
		if err := func() error {
			if log.V(1) {
				log.Infof(ctx, "import file [%s,%s) %s", args.DataSpan.Key, args.DataSpan.EndKey, file.Path)
			}
			dir, err := MakeExportStorage(ctx, file.Dir)
			if err != nil {
				return err
			}
			// NOTE(review): dir is never closed here — confirm whether
			// ExportStorage requires cleanup.
			localPath, err := dir.FetchFile(ctx, file.Path)
			if err != nil {
				return err
			}
			if len(file.Sha512) > 0 {
				checksum, err := sha512ChecksumFile(localPath)
				if err != nil {
					return err
				}
				if !bytes.Equal(checksum, file.Sha512) {
					return errors.Errorf("checksum mismatch for %s", file.Path)
				}
			}
			sst, err := engine.MakeRocksDBSstFileReader()
			if err != nil {
				return err
			}
			defer sst.Close()

			// Add each file in its own sst reader because AddFile requires the
			// affected keyrange be empty and the keys in these files might overlap.
			// This becomes less heavyweight when we figure out how to use RocksDB's
			// TableReader directly.
			if err := sst.AddFile(localPath); err != nil {
				return err
			}

			iter := sst.NewIterator(false)
			defer iter.Close()
			iter.Seek(startKeyMVCC)
			for ; iter.Valid(); iter.Next() {
				key := iter.Key()
				if endKeyMVCC.Less(key) {
					break
				}
				value := roachpb.Value{RawBytes: iter.Value()}

				var ok bool
				key.Key, ok = kr.RewriteKey(key.Key)
				if !ok {
					// If the key rewriter didn't match this key, it's not data for the
					// table(s) we're interested in.
					if log.V(3) {
						log.Infof(ctx, "skipping %s %s", key.Key, value.PrettyPrint())
					}
					continue
				}
				// Rewriting the key means the checksum needs to be updated.
				value.ClearChecksum()
				value.InitChecksum(key.Key)

				if log.V(3) {
					log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
				}
				b.batch.Put(key, value.RawBytes)

				// Update the range currently represented in this batch, as
				// necessary.
				if len(b.batchStartKey) == 0 {
					b.batchStartKey = append(b.batchStartKey, key.Key...)
				}
				b.batchEndKey = append(b.batchEndKey[:0], key.Key...)

				if b.batch.Len() > batchSizeBytes {
					sendWriteBatch()
				}
			}
			return iter.Error()
		}(); err != nil {
			return err
		}
	}

	// Flush out the last batch.
	if b.batch.Len() > 0 {
		sendWriteBatch()
	}

	return wg.Wait()
}