-
Notifications
You must be signed in to change notification settings - Fork 24
/
util.go
141 lines (128 loc) · 4.46 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package restore
import (
"bytes"
"context"
"strings"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/client-go/v2/util/codec"
berrors "github.com/tikv/migration/br/pkg/errors"
"github.com/tikv/migration/br/pkg/glue"
"github.com/tikv/migration/br/pkg/logutil"
"github.com/tikv/migration/br/pkg/rtree"
"go.uber.org/zap"
)
// GetSSTMetaFromFile builds the SST meta describing how file is imported into
// region under regionRule.
//
// The column family is taken from the file name (left empty when the name
// contains neither defaultCFName nor writeCFName). The key range starts at
// regionRule.NewKeyPrefix and ends at regionRule.NewKeyPrefix followed by ten
// 0xff bytes, and is then clamped to the region's start/end keys. The end key
// of the returned meta is marked as exclusive.
//
// It calls log.Panic if the clamped start key is greater than the end key.
func GetSSTMetaFromFile(
	id []byte,
	file *backuppb.File,
	region *metapb.Region,
	regionRule *import_sstpb.RewriteRule,
) import_sstpb.SSTMeta {
	// The column family is encoded in the file name.
	var cfName string
	fileName := file.GetName()
	switch {
	case strings.Contains(fileName, defaultCFName):
		cfName = defaultCFName
	case strings.Contains(fileName, writeCFName):
		cfName = writeCFName
	}

	// Intersect the rewritten key space of the file with the region.
	newPrefix := regionRule.GetNewKeyPrefix()

	// lower = max(newPrefix, region.StartKey)
	lower := newPrefix
	if regionStart := region.GetStartKey(); bytes.Compare(lower, regionStart) < 0 {
		lower = regionStart
	}

	// Pad the prefix with 10 * 0xff so the upper bound covers every key of the
	// file. Using NewKeyPrefix + 1 instead may trigger WrongPrefix in TiKV:
	// https://github.com/tikv/tikv/blob/970a9bf2a9ea782a455ae579ad237aaf6cb1daec/
	// components/sst_importer/src/sst_importer.rs#L221
	const padLen = 10
	upper := make([]byte, 0, len(newPrefix)+padLen)
	upper = append(upper, newPrefix...)
	upper = append(upper, bytes.Repeat([]byte{0xff}, padLen)...)

	// upper = min(upper, region.EndKey); an empty region end key means unbounded.
	if regionEnd := region.GetEndKey(); len(regionEnd) > 0 && bytes.Compare(upper, regionEnd) > 0 {
		upper = regionEnd
	}

	if bytes.Compare(lower, upper) > 0 {
		log.Panic("range start exceed range end",
			logutil.File(file),
			logutil.Key("startKey", lower),
			logutil.Key("endKey", upper))
	}

	log.Debug("get sstMeta",
		logutil.File(file),
		logutil.Key("startKey", lower),
		logutil.Key("endKey", upper))

	keyRange := &import_sstpb.Range{
		Start: lower,
		End:   upper,
	}
	return import_sstpb.SSTMeta{
		Uuid:            id,
		CfName:          cfName,
		Range:           keyRange,
		EndKeyExclusive: true,
		Length:          file.GetSize_(),
		RegionId:        region.GetId(),
		RegionEpoch:     region.GetRegionEpoch(),
		CipherIv:        file.GetCipherIv(),
	}
}
// rewriteRawKey rewrites a raw key according to rewriteRules and returns the
// key encoded with codec.EncodeBytes, together with the rule that was looked
// up for it.
//
//   - With nil rewriteRules the key is encoded as-is and the rule is nil.
//   - With non-nil rewriteRules and an empty key, both results are nil.
//   - Otherwise the first occurrence of the matched rule's old prefix is
//     replaced by its new prefix before encoding; the returned rule is nil
//     when matchOldPrefix finds no matching rule.
func rewriteRawKey(key []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) {
	switch {
	case rewriteRules == nil:
		// Nothing to rewrite: only encode.
		return codec.EncodeBytes([]byte{}, key), nil
	case len(key) == 0:
		return nil, nil
	}

	matched := matchOldPrefix(key, rewriteRules)
	rewritten := bytes.Replace(key, matched.GetOldKeyPrefix(), matched.GetNewKeyPrefix(), 1)
	return codec.EncodeBytes([]byte{}, rewritten), matched
}
// matchOldPrefix returns the first rule in rewriteRules.Data whose old key
// prefix is a prefix of key, or nil if there is none.
func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule {
	for _, candidate := range rewriteRules.Data {
		if !bytes.HasPrefix(key, candidate.GetOldKeyPrefix()) {
			continue
		}
		return candidate
	}
	return nil
}
// SplitRanges splits regions by
//  1. the data ranges after rewrite.
//  2. the rewrite rules.
//
// It builds a region splitter on top of the client's PD client and TLS
// config and delegates to its Split method. The callback handed to Split
// increments updateCh each time it is invoked.
func SplitRanges(
	ctx context.Context,
	client *Client,
	ranges []rtree.Range,
	rewriteRules *RewriteRules,
	updateCh glue.Progress,
	isRawKv bool,
	needEncodeKey bool,
) error {
	splitClient := NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv)
	regionSplitter := NewRegionSplitter(splitClient)

	// The split keys themselves are not needed here; only progress is reported.
	reportProgress := func(_ [][]byte) {
		updateCh.Inc()
	}
	return regionSplitter.Split(ctx, ranges, rewriteRules, needEncodeKey, reportProgress)
}
// rewriteFileKeys rewrites and encodes the start and end keys of file using
// rewriteRawKey.
//
// When rewriteRules is non-nil, each key must yield a rewrite rule; otherwise
// an error annotating berrors.ErrRestoreInvalidRewrite is returned. Because
// rewriteRawKey yields a nil rule for an empty key, an empty start or end key
// combined with non-nil rules is also reported as an error.
func rewriteFileKeys(file *backuppb.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
	// A missing rule is only a failure when rules were supplied at all.
	ruleRequired := rewriteRules != nil

	var startRule, endRule *import_sstpb.RewriteRule

	startKey, startRule = rewriteRawKey(file.GetStartKey(), rewriteRules)
	if ruleRequired && startRule == nil {
		log.Error("cannot find rewrite rule",
			logutil.Key("startKey", file.GetStartKey()),
			zap.Reflect("rewrite data", rewriteRules.Data))
		return startKey, endKey, errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for start key")
	}

	endKey, endRule = rewriteRawKey(file.GetEndKey(), rewriteRules)
	if ruleRequired && endRule == nil {
		return startKey, endKey, errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for end key")
	}

	return startKey, endKey, nil
}