Skip to content

Commit

Permalink
server/kv: rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
huachaohuang committed Nov 2, 2016
1 parent 0066461 commit 6e5886a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 48 deletions.
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (c *RaftCluster) putConfig(meta *metapb.Cluster) error {
return errors.Errorf("invalid cluster %v, mismatch cluster id %d", meta, c.clusterID)
}

if err := c.s.kv.saveCluster(meta); err != nil {
if err := c.s.kv.saveMeta(meta); err != nil {
return errors.Trace(err)
}

Expand Down
72 changes: 33 additions & 39 deletions server/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,42 +102,39 @@ func (kv *kv) bootstrapCluster(store *metapb.Store, region *metapb.Region) error

func (kv *kv) initCluster() (*clusterInfo, error) {
log.Info("load cluster")
cluster := &metapb.Cluster{}
ok, err := kv.loadCluster(cluster)

cache := newClusterInfo(kv.s.idAlloc)
cache.meta = &metapb.Cluster{}

ok, err := kv.loadMeta(cache.meta)
if err != nil {
return nil, errors.Trace(err)
}
if !ok {
return nil, nil
}

cache := newClusterInfo(kv.s.idAlloc)
cache.setMeta(cluster)

start := time.Now()
if err := kv.loadStores(cache, kvRangeLimit); err != nil {
if err := kv.loadStores(cache.stores, kvRangeLimit); err != nil {
return nil, errors.Trace(err)
}
log.Infof("load %v stores cost %v", cache.getStoreCount(), time.Since(start))

start = time.Now()
if err := kv.loadRegions(cache, kvRangeLimit); err != nil {
if err := kv.loadRegions(cache.regions, kvRangeLimit); err != nil {
return nil, errors.Trace(err)
}
log.Infof("load %v regions cost %v", cache.getRegionCount(), time.Since(start))

return cache, nil
}

func (kv *kv) loadStores(cache *clusterInfo, rangeLimit int64) error {
func (kv *kv) loadStores(stores *storesInfo, rangeLimit int64) error {
nextID := uint64(0)
endStore := kv.storePath(math.MaxUint64)
withRange := clientv3.WithRange(endStore)
withLimit := clientv3.WithLimit(rangeLimit)

cache.Lock()
defer cache.Unlock()

for {
key := kv.storePath(nextID)
resp, err := kvGet(kv.client(), key, withRange, withLimit)
Expand All @@ -155,20 +152,17 @@ func (kv *kv) loadStores(cache *clusterInfo, rangeLimit int64) error {
}

nextID = store.GetId() + 1
cache.stores.setStore(newStoreInfo(store))
stores.setStore(newStoreInfo(store))
}
}
}

func (kv *kv) loadRegions(cache *clusterInfo, rangeLimit int64) error {
func (kv *kv) loadRegions(regions *regionsInfo, rangeLimit int64) error {
nextID := uint64(0)
endRegion := kv.regionPath(math.MaxUint64)
withRange := clientv3.WithRange(endRegion)
withLimit := clientv3.WithLimit(rangeLimit)

cache.Lock()
defer cache.Unlock()

for {
key := kv.regionPath(nextID)
resp, err := kvGet(kv.client(), key, withRange, withLimit)
Expand All @@ -186,41 +180,33 @@ func (kv *kv) loadRegions(cache *clusterInfo, rangeLimit int64) error {
}

nextID = region.GetId() + 1
cache.regions.addRegion(newRegionInfo(region, nil))
regions.setRegion(newRegionInfo(region, nil))
}
}
}

func (kv *kv) saveCluster(cluster *metapb.Cluster) error {
return kv.saveProto(kv.clusterPath, cluster)
}

func (kv *kv) loadCluster(cluster *metapb.Cluster) (bool, error) {
return kv.loadProto(kv.clusterPath, cluster)
func (kv *kv) loadMeta(meta *metapb.Cluster) (bool, error) {
return kv.loadProto(kv.clusterPath, meta)
}

func (kv *kv) saveStore(store *metapb.Store) error {
return kv.saveProto(kv.storePath(store.GetId()), store)
func (kv *kv) saveMeta(meta *metapb.Cluster) error {
return kv.saveProto(kv.clusterPath, meta)
}

func (kv *kv) loadStore(storeID uint64, store *metapb.Store) (bool, error) {
return kv.loadProto(kv.storePath(storeID), store)
}

func (kv *kv) saveRegion(region *metapb.Region) error {
return kv.saveProto(kv.regionPath(region.GetId()), region)
func (kv *kv) saveStore(store *metapb.Store) error {
return kv.saveProto(kv.storePath(store.GetId()), store)
}

func (kv *kv) loadRegion(regionID uint64, region *metapb.Region) (bool, error) {
return kv.loadProto(kv.regionPath(regionID), region)
}

func (kv *kv) saveProto(key string, msg proto.Message) error {
value, err := proto.Marshal(msg)
if err != nil {
return errors.Trace(err)
}
return kv.save(key, string(value))
func (kv *kv) saveRegion(region *metapb.Region) error {
return kv.saveProto(kv.regionPath(region.GetId()), region)
}

func (kv *kv) loadProto(key string, msg proto.Message) (bool, error) {
Expand All @@ -234,15 +220,12 @@ func (kv *kv) loadProto(key string, msg proto.Message) (bool, error) {
return true, proto.Unmarshal(value, msg)
}

func (kv *kv) save(key, value string) error {
resp, err := kv.txn().Then(clientv3.OpPut(key, string(value))).Commit()
func (kv *kv) saveProto(key string, msg proto.Message) error {
value, err := proto.Marshal(msg)
if err != nil {
return errors.Trace(err)
}
if !resp.Succeeded {
return errors.Trace(errTxnFailed)
}
return nil
return kv.save(key, string(value))
}

func (kv *kv) load(key string) ([]byte, error) {
Expand All @@ -258,6 +241,17 @@ func (kv *kv) load(key string) ([]byte, error) {
return resp.Kvs[0].Value, nil
}

func (kv *kv) save(key, value string) error {
resp, err := kv.txn().Then(clientv3.OpPut(key, string(value))).Commit()
if err != nil {
return errors.Trace(err)
}
if !resp.Succeeded {
return errors.Trace(errTxnFailed)
}
return nil
}

func kvGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(c.Ctx(), requestTimeout)
defer cancel()
Expand Down
16 changes: 8 additions & 8 deletions server/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func (s *testKVSuite) TestBasic(c *C) {
c.Assert(kv.storePath(123), Equals, "/pd/0/raft/s/00000000000000000123")
c.Assert(kv.regionPath(123), Equals, "/pd/0/raft/r/00000000000000000123")

cluster := &metapb.Cluster{Id: 123}
ok, err := kv.loadCluster(cluster)
meta := &metapb.Cluster{Id: 123}
ok, err := kv.loadMeta(meta)
c.Assert(ok, IsFalse)
c.Assert(err, IsNil)
c.Assert(kv.saveCluster(cluster), IsNil)
newCluster := &metapb.Cluster{}
ok, err = kv.loadCluster(newCluster)
c.Assert(kv.saveMeta(meta), IsNil)
newMeta := &metapb.Cluster{}
ok, err = kv.loadMeta(newMeta)
c.Assert(ok, IsTrue)
c.Assert(err, IsNil)
c.Assert(newCluster, DeepEquals, cluster)
c.Assert(newMeta, DeepEquals, meta)

store := &metapb.Store{Id: 123}
ok, err = kv.loadStore(123, store)
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *testKVSuite) TestBootstrap(c *C) {

func (s *testKVSuite) TestLoadStores(c *C) {
kv := newKV(s.server)
cache := newClusterInfo(newMockIDAllocator())
cache := newStoresInfo()

n := uint64(10)
stores := make([]*metapb.Store, 0, n)
Expand All @@ -116,7 +116,7 @@ func (s *testKVSuite) TestLoadStores(c *C) {

func (s *testKVSuite) TestLoadRegions(c *C) {
kv := newKV(s.server)
cache := newClusterInfo(newMockIDAllocator())
cache := newRegionsInfo()

n := uint64(10)
regions := make([]*metapb.Region, 0, n)
Expand Down

0 comments on commit 6e5886a

Please sign in to comment.