Skip to content

Commit

Permalink
1.source address supports cluster 2.target address supports several p…
Browse files Browse the repository at this point in the history
…roxies to write data in a round-robin way.
  • Loading branch information
vinllen committed May 8, 2019
1 parent 1e32724 commit aa04ed0
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 104 deletions.
5 changes: 5 additions & 0 deletions ChangeLog
@@ -1,3 +1,8 @@
2019-04-21 Alibaba Cloud.
* VERSION: 1.6.0
* FEATURE: source address supports cluster.
* FEATURE: target address supports several proxies to write data in
a round-robin way.
2019-04-24 Alibaba Cloud.
* VERSION: 1.4.2
* IMPROVE: improve rump to support fetching data from given keys in
Expand Down
51 changes: 26 additions & 25 deletions conf/redis-shake.conf
Expand Up @@ -23,19 +23,6 @@ ncpu = 0
# parallel routines number used in RDB file syncing. default is 64.
parallel = 32

# input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`.
# if the input is list split by semicolon(;), redis-shake will restore the list one by one.
# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
# redis-shake将会挨个进行恢复。
input_rdb = local

# output RDB file prefix.
# used in `decode` and `dump`.
# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是:
# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2
output_rdb = local_dump

# source redis configuration.
# used in `dump`, `sync` and `rump`.
# ip:port
Expand All @@ -48,11 +35,6 @@ source.address = 127.0.0.1:20441
source.password_raw = 123456
# auth type, don't modify it
source.auth_type = auth
# the concurrence of fetching data, default is len(source.address)
# used in `dump` and `sync`.
# 拉取的并发度,默认是source.address中db的个数。假如db节点有5个,但source.parallel=3,那么一次只会
# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。
source.parallel =

# target redis configuration. used in `restore` and `sync`.
# used in `restore`, `sync` and `rump`.
Expand All @@ -65,15 +47,34 @@ target.address = 127.0.0.1:20551
target.password_raw = 123456
# auth type, don't modify it
target.auth_type = auth
# the type of target redis can be `standalone`, `proxy` or `cluster`.
# `standalone`: standalone db mode.
# `proxy`: proxy layer ahead redis.
# `cluster`: open source cluster (not supported currently)
# If the target is proxy, data will be inserted in a round-robin way.
# standalone表示单个db节点进行写入(包括主从模式),proxy表示写入的是代理层,将会以轮回的方式进行写入,1个proxy对应一条
# 长连接通道(1个源端db的数据只会写入1个proxy),cluster表示开源的集群架构
target.type = standalone
# all the data will be written into this db. < 0 means disable.
target.db = -1
# the type of target redis: `cluster`, `opensource`. If the target is open source redis cluster,
# redis-shake uses redis-go-cluster driver to write data, otherwise, redigo driver is used to
# insert data in round robin way.
# 目的redis的类型:`opensource`和`proxy`. `opensource`表示是开源的cluster或者redis db节点;
# `proxy`表示是proxy类型,将会以round robin循环方式写入。对于开源的cluster用redis-go-cluster驱动写入,其余
# 的则用redigo写入
# target.type = opensource

# input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`.
# if the input is list split by semicolon(;), redis-shake will restore the list one by one.
# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
# redis-shake将会挨个进行恢复。
rdb.input = local
# output RDB file prefix.
# used in `decode` and `dump`.
# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是:
# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2
rdb.output = local_dump
# the concurrence of fetching data, default is len(source.address) or len(rdb.input).
# used in `dump`, `sync` and `restore`.
# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。
# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会
# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。
rdb.parallel =

# use for expire key, set the time gap when source and target timestamp are not the same.
# 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值
Expand Down
22 changes: 17 additions & 5 deletions src/redis-shake/common/common.go
Expand Up @@ -8,7 +8,7 @@ import (

"pkg/libs/bytesize"
"redis-shake/configure"

logRotate "gopkg.in/natefinch/lumberjack.v2"
)

Expand All @@ -26,9 +26,10 @@ const (
)

var (
Version = "$"
LogRotater *logRotate.Logger
StartTime string
Version = "$"
LogRotater *logRotate.Logger
StartTime string
TargetRoundRobin int
)

// read until hit the end of RESP: "\r\n"
Expand Down Expand Up @@ -70,5 +71,16 @@ func ParseInfo(content []byte) map[string]string {
}

func GetTotalLink() int {
return len(conf.Options.SourceAddress)
if len(conf.Options.SourceAddress) != 0 {
return len(conf.Options.SourceAddress)
} else {
return len(conf.Options.RdbInput)
}
}

func PickTargetRoundRobin(n int) int {
defer func() {
TargetRoundRobin = (TargetRoundRobin + 1) % n
}()
return TargetRoundRobin
}
8 changes: 5 additions & 3 deletions src/redis-shake/configure/configure.go
Expand Up @@ -11,20 +11,22 @@ type Configuration struct {
HttpProfile int `config:"http_profile"`
NCpu int `config:"ncpu"`
Parallel int `config:"parallel"`
InputRdb []string `config:"input_rdb"`
OutputRdb string `config:"output_rdb"`
SourceAddress []string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceVersion uint `config:"source.version"`
SourceAuthType string `config:"source.auth_type"`
SourceParallel uint `config:"source.parallel"`
TargetAddress string `config:"target.address"`
TargetAddress []string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetVersion uint `config:"target.version"`
TargetDB int `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
RdbInput []string `config:"rdb.input"`
RdbOutput string `config:"rdb.output"`
RdbParallel int `config:"rdb.parallel"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDB string `config:"filter.db"`
Expand Down
6 changes: 3 additions & 3 deletions src/redis-shake/decode.go
Expand Up @@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} {
}

func (cmd *CmdDecode) Main() {
log.Infof("decode from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.OutputRdb)
log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput)

for i, input := range conf.Options.InputRdb {
for i, input := range conf.Options.RdbInput {
// decode one by one
output := fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i)
output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i)
cmd.decode(input, output)
}

Expand Down
2 changes: 1 addition & 1 deletion src/redis-shake/dump.go
Expand Up @@ -36,7 +36,7 @@ func (cmd *CmdDump) Main() {
for i, source := range conf.Options.SourceAddress {
nd := node{
source: source,
output: fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i),
output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i),
}
cmd.dumpChan <- nd
}
Expand Down
80 changes: 57 additions & 23 deletions src/redis-shake/main/main.go
Expand Up @@ -193,20 +193,48 @@ func sanitizeOptions(tp string) error {
return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold)
}

if (tp == TypeRestore || tp == TypeSync) && conf.Options.TargetAddress == "" {
return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}")
}
if (tp == TypeDump || tp == TypeSync) && len(conf.Options.SourceAddress) == 0 {
return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}")
if tp == TypeRestore || tp == TypeSync || tp == TypeRump {
if len(conf.Options.TargetAddress) == 0 {
return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}")
}

switch conf.Options.TargetType {
case "standalone":
if len(conf.Options.TargetAddress) != 1 {
return fmt.Errorf("the source address[%v] != 1 when target.type is standalone",
len(conf.Options.TargetAddress))
}
case "proxy":
case "cluster":
if tp == TypeRump || tp == TypeRestore {
// TODO
return fmt.Errorf("{rump, restore} mode doesn't support cluster currently")
}
return fmt.Errorf("coming soon")
default:
return fmt.Errorf("illegal target.type[%v]", conf.Options.TargetType)
}
}
if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") {
return fmt.Errorf("source and target address shouldn't be empty when type in {rump}")
if (tp == TypeDump || tp == TypeSync || tp == TypeRump) && len(conf.Options.SourceAddress) == 0 {
return fmt.Errorf("source address shouldn't be empty when type in {dump, sync, rump}")
}
if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.InputRdb) == 0 {
if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.RdbInput) == 0 {
return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}")
}
if tp == TypeDump && conf.Options.OutputRdb == "" {
conf.Options.OutputRdb = "output-rdb-dump"
if tp == TypeDump && conf.Options.RdbOutput == "" {
conf.Options.RdbOutput = "output-rdb-dump"
}

if conf.Options.RdbParallel == 0 {
if tp == TypeDump || tp == TypeSync {
conf.Options.RdbParallel = len(conf.Options.SourceAddress)
} else if tp == TypeRestore {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
}
}

if tp == TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
}

if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
Expand Down Expand Up @@ -318,45 +346,51 @@ func sanitizeOptions(tp string) error {

if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 {
return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile)
} else if conf.Options.HttpProfile == 0 {
} else if conf.Options.HttpProfile == 0 {
// set to default when not set
conf.Options.HttpProfile = defaultHttpPort
}

if conf.Options.SystemProfile < 0 || conf.Options.SystemProfile > 65535 {
return fmt.Errorf("SystemProfile[%v] should in [0, 65535]", conf.Options.SystemProfile)
} else if conf.Options.SystemProfile == 0 {
} else if conf.Options.SystemProfile == 0 {
// set to default when not set
conf.Options.SystemProfile = defaultSystemPort
}

if conf.Options.SenderSize < 0 || conf.Options.SenderSize >= 1073741824 {
return fmt.Errorf("SenderSize[%v] should in [0, 1073741824]", conf.Options.SenderSize)
} else if conf.Options.SenderSize == 0 {
} else if conf.Options.SenderSize == 0 {
// set to default when not set
conf.Options.SenderSize = defaultSenderSize
}

if conf.Options.SenderCount < 0 || conf.Options.SenderCount >= 100000 {
return fmt.Errorf("SenderCount[%v] should in [0, 100000]", conf.Options.SenderCount)
} else if conf.Options.SenderCount == 0 {
} else if conf.Options.SenderCount == 0 {
// set to default when not set
conf.Options.SenderCount = defaultSenderCount
}

if tp == TypeRestore || tp == TypeSync {
// get target redis version and set TargetReplace.
if conf.Options.TargetRedisVersion, err = utils.GetRedisVersion(conf.Options.TargetAddress,
conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw); err != nil {
return fmt.Errorf("get target redis version failed[%v]", err)
} else {
if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") {
conf.Options.TargetReplace = true
for _, address := range conf.Options.TargetAddress {
if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw); err != nil {
return fmt.Errorf("get target redis version failed[%v]", err)
} else if conf.Options.TargetRedisVersion != "" && conf.Options.TargetRedisVersion != v {
return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetRedisVersion, v)
} else {
conf.Options.TargetReplace = false
conf.Options.TargetRedisVersion = v
}
}
if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "5.") {
conf.Options.TargetReplace = true
} else {
conf.Options.TargetReplace = false
}
}

if tp == TypeRump {
Expand All @@ -365,7 +399,7 @@ func sanitizeOptions(tp string) error {
}

if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanSpecialCloud != scanner.TencentCluster &&
conf.Options.ScanSpecialCloud != scanner.AliyunCluster {
conf.Options.ScanSpecialCloud != scanner.AliyunCluster {
return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud)
}

Expand Down

0 comments on commit aa04ed0

Please sign in to comment.