This repository has been archived by the owner on Jul 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 102
/
restore_raw.go
157 lines (131 loc) · 4.47 KB
/
restore_raw.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package task
import (
"context"
"github.com/pingcap/br/pkg/metautil"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/summary"
)
// RestoreRawConfig is the configuration specific for raw kv restore tasks.
//
// It is composed purely of two embedded configurations; its ParseFromFlags
// method delegates to the ParseFromFlags method of each of them.
type RestoreRawConfig struct {
// RawKvConfig is embedded; its fields and methods are promoted.
RawKvConfig
// RestoreCommonConfig is embedded; its fields and methods are promoted.
RestoreCommonConfig
}
// DefineRawRestoreFlags registers the flags of the raw kv restore command.
//
// The key format, column family, start key and end key flags are added to the
// command's own flag set, while the common restore flags are registered on the
// command's persistent flag set via DefineRestoreCommonFlags.
func DefineRawRestoreFlags(command *cobra.Command) {
	localFlags := command.Flags()

	// Key range selection and how the range boundaries are encoded.
	localFlags.StringP(flagKeyFormat, "", "hex", "start/end key format, support raw|escaped|hex")
	localFlags.StringP(flagTiKVColumnFamily, "", "default", "restore specify cf, correspond to tikv cf")
	localFlags.StringP(flagStartKey, "", "", "restore raw kv start key, key is inclusive")
	localFlags.StringP(flagEndKey, "", "", "restore raw kv end key, key is exclusive")

	DefineRestoreCommonFlags(command.PersistentFlags())
}
// ParseFromFlags populates the raw restore configuration from the flag set.
//
// It reads the online flag first, then lets the embedded RestoreCommonConfig
// and RawKvConfig parse their own flags, in that order. Errors from the first
// two steps are wrapped with errors.Trace; the result of the RawKvConfig step
// is returned as is.
func (cfg *RestoreRawConfig) ParseFromFlags(flags *pflag.FlagSet) error {
	online, err := flags.GetBool(flagOnline)
	// The field is written before the error check so it is assigned on every path.
	cfg.Online = online
	if err != nil {
		return errors.Trace(err)
	}

	if err = cfg.RestoreCommonConfig.ParseFromFlags(flags); err != nil {
		return errors.Trace(err)
	}

	return cfg.RawKvConfig.ParseFromFlags(flags)
}
// adjust runs the adjust step of the promoted Config and of the embedded
// RestoreCommonConfig, then falls back to defaultRestoreConcurrency when the
// concurrency is still zero afterwards.
func (cfg *RestoreRawConfig) adjust() {
	cfg.Config.adjust()
	cfg.RestoreCommonConfig.adjust()

	// A non-zero concurrency is kept untouched.
	if cfg.Concurrency != 0 {
		return
	}
	cfg.Concurrency = defaultRestoreConcurrency
}
// RunRestoreRaw starts a raw kv restore task inside the current goroutine.
//
// The task proceeds through the following steps:
//  1. adjust cfg and create a Mgr (without a domain) and a restore client;
//  2. read the backup meta and initialise the client with it;
//  3. reject the backup unless the client reports raw kv mode;
//  4. select the files in [cfg.StartKey, cfg.EndKey) of column family cfg.CF;
//  5. merge the file ranges and split ranges accordingly;
//  6. run restorePreWork, restore the files, and run restorePostWork on return.
//
// Every step runs under a context derived from c that is cancelled when the
// function returns. On failure the error is returned wrapped with
// errors.Trace; on success the task summary is marked as successful and nil is
// returned. When no file matches the requested range the function logs that
// and returns nil without restoring anything.
func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreRawConfig) (err error) {
	cfg.adjust()

	defer summary.Summary(cmdName)
	ctx, cancel := context.WithCancel(c)
	defer cancel()

	// Restore raw does not need domain.
	needDomain := false
	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain)
	if err != nil {
		return errors.Trace(err)
	}
	defer mgr.Close()

	keepaliveCfg := GetKeepalive(&cfg.Config)
	// sometimes we have pooled the connections.
	// sending heartbeats in idle times is useful.
	keepaliveCfg.PermitWithoutStream = true
	client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg)
	if err != nil {
		return errors.Trace(err)
	}
	defer client.Close()

	// Apply the user-facing tuning knobs to the client.
	client.SetRateLimit(cfg.RateLimit)
	client.SetConcurrency(uint(cfg.Concurrency))
	if cfg.Online {
		client.EnableOnline()
	}
	client.SetSwitchModeInterval(cfg.SwitchModeInterval)

	u, s, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}
	reader := metautil.NewMetaReader(backupMeta, s)
	// Use the derived ctx (not the parent c) so this call is covered by the
	// deferred cancel like every other call in this function.
	if err = client.InitBackupMeta(ctx, backupMeta, u, s, reader); err != nil {
		return errors.Trace(err)
	}

	if !client.IsRawKvMode() {
		return errors.Annotate(berrors.ErrRestoreModeMismatch, "cannot do raw restore from transactional data")
	}

	files, err := client.GetFilesInRawRange(cfg.StartKey, cfg.EndKey, cfg.CF)
	if err != nil {
		return errors.Trace(err)
	}
	archiveSize := reader.ArchiveSize(ctx, files)
	g.Record(summary.RestoreDataSize, archiveSize)

	if len(files) == 0 {
		log.Info("all files are filtered out from the backup archive, nothing to restore")
		return nil
	}
	summary.CollectInt("restore files", len(files))

	// NOTE(review): MergeSmallRegionKeyCount is passed for both threshold
	// arguments. If MergeFileRanges expects a size-in-bytes threshold followed
	// by a key-count threshold, the first argument is probably meant to be a
	// size-bytes setting - confirm against MergeFileRanges and the config type.
	ranges, _, err := restore.MergeFileRanges(
		files, cfg.MergeSmallRegionKeyCount, cfg.MergeSmallRegionKeyCount)
	if err != nil {
		return errors.Trace(err)
	}

	// Redirect to log if there is no log file to avoid unreadable output.
	// TODO: How to show progress?
	updateCh := g.StartProgress(
		ctx,
		"Raw Restore",
		// Split/Scatter + Download/Ingest
		int64(len(ranges)+len(files)),
		!cfg.LogProgress)

	// RawKV restore does not need to rewrite keys.
	rewrite := &restore.RewriteRules{}
	err = restore.SplitRanges(ctx, client, ranges, rewrite, updateCh)
	if err != nil {
		return errors.Trace(err)
	}

	restoreSchedulers, err := restorePreWork(ctx, client, mgr)
	if err != nil {
		return errors.Trace(err)
	}
	// Undo the pre-work on every exit path from here on.
	defer restorePostWork(ctx, client, restoreSchedulers)

	err = client.RestoreRaw(ctx, cfg.StartKey, cfg.EndKey, files, updateCh)
	if err != nil {
		return errors.Trace(err)
	}

	// Restore has finished.
	updateCh.Close()

	// Set task summary to success status.
	summary.SetSuccessStatus(true)
	return nil
}