Skip to content

Commit

Permalink
Merge fa0b60e into 9f6c3f1
Browse files Browse the repository at this point in the history
  • Loading branch information
nioshield committed Feb 1, 2019
2 parents 9f6c3f1 + fa0b60e commit 7e6a509
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 51 deletions.
6 changes: 1 addition & 5 deletions command/test_test.go
Expand Up @@ -8,13 +8,9 @@ import (
"github.com/meitu/titan/conf"
"github.com/meitu/titan/context"
"github.com/meitu/titan/db"
"github.com/meitu/titan/db/store"
)

var cfg = &conf.Tikv{
PdAddrs: store.MockAddr,
DB: conf.DB{},
}
var cfg = &conf.MockConf().Tikv
var mockdb *db.RedisStore

func init() {
Expand Down
19 changes: 19 additions & 0 deletions conf/config.go
Expand Up @@ -12,10 +12,12 @@ type Titan struct {
PIDFileName string `cfg:"pid-filename; titan.pid; ; the file name to record connd PID"`
}

//DB config is the config of titan data struct
type DB struct {
Hash Hash `cfg:"hash"`
}

//Hash config is the config of titan hash data struct
type Hash struct {
MetaSlot int64 `cfg:"meta-slot;0;numeric;hashes slot key count"`
}
Expand All @@ -31,17 +33,34 @@ type Server struct {
type Tikv struct {
PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"`
DB DB `cfg:"db"`
GC GC `cfg:"gc"`
Expire Expire `cfg:"expire"`
ZT ZT `cfg:"zt"`
TikvGC TikvGC `cfg:"tikv-gc"`
}

//TikvGC config is the config of implement tikv sdk gcwork
type TikvGC struct {
Interval time.Duration `cfg:"interval;20m;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;30m;;lease flush leader interval"`
SafePointLifeTime time.Duration `cfg:"safe-point-life-time;10m;;safe point life time "`
Concurrency int `cfg:"concurrency;2;;gc work concurrency"`
}

//GC config is the config of Titan GC work
type GC struct {
Interval time.Duration `cfg:"interval;1s;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

//Expire config is the config of Titan expire work
type Expire struct {
Interval time.Duration `cfg:"interval;1s;;expire work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

//ZT config is the config of zlist
type ZT struct {
Wrokers int `cfg:"workers;5;numeric;parallel workers count"`
Expand Down
34 changes: 34 additions & 0 deletions conf/mockconfig.go
@@ -0,0 +1,34 @@
package conf

import "time"

//MockConf init and return titan mock conf
func MockConf() *Titan {
return &Titan{
Tikv: Tikv{
PdAddrs: "mocktikv://",
GC: GC{
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
},
Expire: Expire{
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
},
ZT: ZT{
Wrokers: 5,
BatchCount: 10,
QueueDepth: 100,
Interval: 1000 * time.Millisecond,
},
TikvGC: TikvGC{
Interval: 20 * time.Minute,
LeaderLifeTime: 30 * time.Minute,
SafePointLifeTime: 10 * time.Minute,
Concurrency: 2,
},
},
}
}
38 changes: 38 additions & 0 deletions conf/titan.toml
Expand Up @@ -49,6 +49,44 @@ pd-addrs = ""
#meta-slot = 0


[tikv.gc]

#type: time.Duration
#description: gc work tick interval
#default: 1s
#interval = "1s"

#type: time.Duration
#description: lease flush leader interval
#default: 3m
#leader-life-time = "3m0s"

#type: int
#rules: numeric
#description: key count limitation per-transection
#default: 256
#batch-limit = 256


[tikv.expire]

#type: time.Duration
#description: expire work tick interval
#default: 1s
#interval = "1s"

#type: time.Duration
#description: lease flush leader interval
#default: 3m
#leader-life-time = "3m0s"

#type: int
#rules: numeric
#description: key count limitation per-transection
#default: 256
#batch-limit = 256


[tikv.zt]

#type: int
Expand Down
4 changes: 2 additions & 2 deletions db/db.go
Expand Up @@ -110,8 +110,8 @@ func Open(conf *conf.Tikv) (*RedisStore, error) {
}
rds := &RedisStore{Storage: s, conf: conf}
sysdb := rds.DB(sysNamespace, sysDatabaseID)
go StartGC(sysdb)
go StartExpire(sysdb)
go StartGC(sysdb, &conf.GC)
go StartExpire(sysdb, &conf.Expire)
go StartZT(sysdb, &conf.ZT)
go StartTikvGC(sysdb, &conf.TikvGC)
return rds, nil
Expand Down
6 changes: 3 additions & 3 deletions db/db_test.go
Expand Up @@ -17,12 +17,12 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
conf := &conf.Tikv{}
mockConf := conf.MockConf()
mockDB = &DB{
Namespace: "mockdb-ns",
ID: 1,
kv: &RedisStore{Storage: store, conf: conf},
conf: &conf.DB,
kv: &RedisStore{Storage: store, conf: &mockConf.Tikv},
conf: &mockConf.Tikv.DB,
}

os.Exit(m.Run())
Expand Down
25 changes: 10 additions & 15 deletions db/expire.go
Expand Up @@ -5,20 +5,15 @@ import (
"context"
"time"

"github.com/meitu/titan/conf"
"github.com/meitu/titan/db/store"
"github.com/meitu/titan/metrics"
"go.uber.org/zap"
)

const (
expireBatchLimit = 256
expireTick = time.Duration(time.Second)
)

var (
expireKeyPrefix = []byte("$sys:0:at:")
sysExpireLeader = []byte("$sys:0:EXL:EXLeader")
sysExpireLeaderFlushInterval = 10 * time.Second
expireKeyPrefix = []byte("$sys:0:at:")
sysExpireLeader = []byte("$sys:0:EXL:EXLeader")

// $sys:0:at:{ts}:{metaKey}
expireTimestampOffset = len(expireKeyPrefix)
Expand Down Expand Up @@ -84,12 +79,12 @@ func unExpireAt(txn store.Transaction, mkey []byte, expireAt int64) error {
}

// StartExpire get leader from db
func StartExpire(db *DB) error {
ticker := time.NewTicker(expireTick)
func StartExpire(db *DB, conf *conf.Expire) error {
ticker := time.NewTicker(conf.Interval)
defer ticker.Stop()
id := UUID()
for range ticker.C {
isLeader, err := isLeader(db, sysExpireLeader, id, sysExpireLeaderFlushInterval)
isLeader, err := isLeader(db, sysExpireLeader, id, conf.LeaderLifeTime)
if err != nil {
zap.L().Error("[Expire] check expire leader failed", zap.Error(err))
continue
Expand All @@ -98,7 +93,7 @@ func StartExpire(db *DB) error {
zap.L().Debug("[Expire] not expire leader")
continue
}
runExpire(db)
runExpire(db, conf.BatchLimit)
}
return nil
}
Expand All @@ -121,7 +116,7 @@ func toTikvDataKey(namespace []byte, id DBID, key []byte) []byte {
return b
}

func runExpire(db *DB) {
func runExpire(db *DB, batchLimit int) {
txn, err := db.Begin()
if err != nil {
zap.L().Error("[Expire] txn begin failed", zap.Error(err))
Expand All @@ -133,7 +128,7 @@ func runExpire(db *DB) {
txn.Rollback()
return
}
limit := expireBatchLimit
limit := batchLimit
now := time.Now().UnixNano()
for iter.Valid() && iter.Key().HasPrefix(expireKeyPrefix) && limit > 0 {
key := iter.Key()
Expand Down Expand Up @@ -190,5 +185,5 @@ func runExpire(db *DB) {
txn.Rollback()
zap.L().Error("[Expire] commit failed", zap.Error(err))
}
metrics.GetMetrics().ExpireKeysTotal.WithLabelValues("expired").Add(float64(expireBatchLimit - limit))
metrics.GetMetrics().ExpireKeysTotal.WithLabelValues("expired").Add(float64(batchLimit - limit))
}
28 changes: 12 additions & 16 deletions db/gc.go
Expand Up @@ -4,19 +4,14 @@ import (
"context"
"time"

"github.com/meitu/titan/conf"
"github.com/meitu/titan/db/store"
"github.com/meitu/titan/metrics"
"go.uber.org/zap"
)

var (
sysGCLeader = []byte("$sys:0:GCL:GCLeader")
gcInterval = time.Duration(1) //TODO 使用配置
)

const (
sysGCBurst = 256
sysGCLeaseFlushInterval = 10 * time.Second
)

func toTikvGCKey(key []byte) []byte {
Expand Down Expand Up @@ -57,8 +52,8 @@ func gcGetPrefix(txn store.Transaction) ([]byte, error) {
return key[len(gcPrefix):], nil
}

func gcDeleteRange(txn store.Transaction, prefix []byte, limit int64) (int64, error) {
count := int64(0)
func gcDeleteRange(txn store.Transaction, prefix []byte, limit int) (int, error) {
var count int
itr, err := txn.Iter(prefix, nil)
if err != nil {
return count, err
Expand Down Expand Up @@ -91,7 +86,7 @@ func gcComplete(txn store.Transaction, prefix []byte) error {
return txn.Delete(toTikvGCKey(prefix))
}

func doGC(db *DB, limit int64) error {
func doGC(db *DB, limit int) error {
left := limit
for left > 0 {
txn, err := db.Begin()
Expand All @@ -108,8 +103,8 @@ func doGC(db *DB, limit int64) error {
zap.L().Debug("[GC] no gc item")
return nil
}
count := int64(0)
zap.L().Debug("[GC] start to delete prefix", zap.String("prefix", string(prefix)), zap.Int64("limit", limit))
count := 0
zap.L().Debug("[GC] start to delete prefix", zap.String("prefix", string(prefix)), zap.Int("limit", limit))
if count, err = gcDeleteRange(txn.t, prefix, limit); err != nil {
return err
}
Expand Down Expand Up @@ -140,11 +135,12 @@ func doGC(db *DB, limit int64) error {
// StartGC start gc
//1.获取leader许可
//2.leader 执行清理任务
func StartGC(db *DB) {
ticker := time.Tick(gcInterval * time.Second)
func StartGC(db *DB, conf *conf.GC) {
ticker := time.NewTicker(conf.Interval)
defer ticker.Stop()
id := UUID()
for range ticker {
isLeader, err := isLeader(db, sysGCLeader, id, sysGCLeaseFlushInterval)
for range ticker.C {
isLeader, err := isLeader(db, sysGCLeader, id, conf.LeaderLifeTime)
if err != nil {
zap.L().Error("[GC] check GC leader failed", zap.Error(err))
continue
Expand All @@ -153,7 +149,7 @@ func StartGC(db *DB) {
zap.L().Debug("[GC] not GC leader")
continue
}
if err := doGC(db, sysGCBurst); err != nil {
if err := doGC(db, conf.BatchLimit); err != nil {
zap.L().Error("[GC] do GC failed", zap.Error(err))
continue
}
Expand Down
3 changes: 2 additions & 1 deletion db/test_test.go
Expand Up @@ -12,6 +12,7 @@ func MockDB() *DB {
if err != nil {
panic(err)
}
redis := &RedisStore{Storage: store, conf: &conf.Tikv{}}
mockConf := conf.MockConf()
redis := &RedisStore{Storage: store, conf: &mockConf.Tikv}
return &DB{Namespace: "ns", ID: DBID(1), kv: redis}
}
5 changes: 3 additions & 2 deletions db/tikvgc.go
Expand Up @@ -22,10 +22,11 @@ const (

// StartTikvGC start tikv gcwork
func StartTikvGC(db *DB, tikvCfg *conf.TikvGC) {
ticker := time.Tick(tikvCfg.Interval)
ticker := time.NewTicker(tikvCfg.Interval)
defer ticker.Stop()
uuid := UUID()
ctx := context.Background()
for range ticker {
for range ticker.C {
isLeader, err := isLeader(db, sysTikvGCLeader, uuid, tikvCfg.LeaderLifeTime)
if err != nil {
zap.L().Error("[TikvGC] check TikvGC leader failed", zap.Error(err))
Expand Down
7 changes: 4 additions & 3 deletions db/ztransfer.go
Expand Up @@ -194,9 +194,10 @@ func StartZT(db *DB, conf *conf.ZT) {

// check leader and fill the channel
prefix := toZTKey(nil)
tick := time.Tick(conf.Interval)
ticker := time.NewTicker(conf.Interval)
defer ticker.Stop()
id := UUID()
for range tick {
for range ticker.C {
isLeader, err := isLeader(db, sysZTLeader, id, sysZTLeaderFlushInterval)
if err != nil {
zap.L().Error("[ZT] check ZT leader failed",
Expand All @@ -209,7 +210,7 @@ func StartZT(db *DB, conf *conf.ZT) {
continue
}

if prefix, err = runZT(db, prefix, tick); err != nil {
if prefix, err = runZT(db, prefix, ticker.C); err != nil {
zap.L().Error("[ZT] error in run ZT",
zap.Int64("dbid", int64(db.ID)),
zap.ByteString("prefix", prefix),
Expand Down
5 changes: 1 addition & 4 deletions tools/integration/titan.go
Expand Up @@ -20,10 +20,7 @@ var (
MaxConnection: 10000,
Auth: "",
}
tikvConf = conf.Tikv{
PdAddrs: "mocktikv://",
}

tikvConf = conf.MockConf().Tikv
//ServerAddr default server addr
ServerAddr = "127.0.0.1:17369"
lis net.Listener
Expand Down

0 comments on commit 7e6a509

Please sign in to comment.