Skip to content

Commit

Permalink
table: add meta decode, refactor tests. (#827)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored and siddontang committed Nov 6, 2017
1 parent ac44202 commit 3e093ab
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 160 deletions.
47 changes: 17 additions & 30 deletions table/codec.go
Expand Up @@ -20,18 +20,10 @@ import (
"github.com/juju/errors"
)

// IDDecoder defines method to extract tableID from key
type IDDecoder interface {
DecodeTableID(key Key) int64
}

// DefaultIDDecoder is the default decoder.
// unit test will use other mocked decoder.
var DefaultIDDecoder defaultIDDecoder

type defaultIDDecoder struct{}

var tablePrefix = []byte{'t'}
var (
tablePrefix = []byte{'t'}
metaPrefix = []byte{'m'}
)

const (
signMask uint64 = 0x8000000000000000
Expand All @@ -44,19 +36,14 @@ const (
// Key represents high-level Key type.
type Key []byte

// HasPrefix tests whether the Key begins with prefix.
func (k Key) HasPrefix(prefix Key) bool {
return bytes.HasPrefix(k, prefix)
}

// DecodeTableID decodes the table ID of the key, if the key is not table key, returns 0.
func (decoder defaultIDDecoder) DecodeTableID(key Key) int64 {
_, key, err := decodeBytes(key)
// TableID returns the table ID of the key, if the key is not table key, returns 0.
func (k Key) TableID() int64 {
_, key, err := decodeBytes(k)
if err != nil {
// should never happen
return 0
}
if !key.HasPrefix(tablePrefix) {
if !bytes.HasPrefix(key, tablePrefix) {
return 0
}
key = key[len(tablePrefix):]
Expand All @@ -65,6 +52,15 @@ func (decoder defaultIDDecoder) DecodeTableID(key Key) int64 {
return tableID
}

// IsMeta returns if the key is a meta key.
func (k Key) IsMeta() bool {
_, key, err := decodeBytes(k)
if err != nil {
return false
}
return bytes.HasPrefix(key, metaPrefix)
}

// DecodeInt decodes value encoded by EncodeInt before.
// It returns the leftover un-decoded slice, decoded value if no error.
func DecodeInt(b []byte) ([]byte, int64, error) {
Expand All @@ -82,15 +78,6 @@ func decodeCmpUintToInt(u uint64) int64 {
return int64(u ^ signMask)
}

// IsPureTableID returns true if b is consist of tablePrefix and 8-byte tableID
func IsPureTableID(b []byte) bool {
_, key, err := decodeBytes(b)
if err != nil {
return false
}
return len(key) == len(tablePrefix)+8
}

func decodeBytes(b []byte) ([]byte, []byte, error) {
data := make([]byte, 0, len(b))
for {
Expand Down
12 changes: 5 additions & 7 deletions table/codec_test.go
Expand Up @@ -63,19 +63,17 @@ func (s *testCodecSuite) TestDecodeBytes(c *C) {

func (s *testCodecSuite) TestTableID(c *C) {
key := encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
c.Assert(DefaultIDDecoder.DecodeTableID(key), Equals, int64(0xff))
c.Assert(IsPureTableID(key), IsTrue)
c.Assert(Key(key).TableID(), Equals, int64(0xff))

key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
c.Assert(DefaultIDDecoder.DecodeTableID(key), Equals, int64(0xff))
c.Assert(IsPureTableID(key), IsFalse)
c.Assert(Key(key).TableID(), Equals, int64(0xff))

key = []byte("t\x80\x00\x00\x00\x00\x00\x00\xff")
c.Assert(DefaultIDDecoder.DecodeTableID(key), Equals, int64(0))
c.Assert(Key(key).TableID(), Equals, int64(0))

key = encodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
c.Assert(DefaultIDDecoder.DecodeTableID(key), Equals, int64(0))
c.Assert(Key(key).TableID(), Equals, int64(0))

key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
c.Assert(DefaultIDDecoder.DecodeTableID(key), Equals, int64(0))
c.Assert(Key(key).TableID(), Equals, int64(0))
}
16 changes: 7 additions & 9 deletions table/namespace_classifier.go
Expand Up @@ -89,10 +89,9 @@ func (ns *Namespace) AddStoreID(storeID uint64) {
// tableNamespaceClassifier implements Classifier interface
type tableNamespaceClassifier struct {
sync.RWMutex
nsInfo *namespacesInfo
tableIDDecoder IDDecoder
kv *core.KV
idAlloc core.IDAllocator
nsInfo *namespacesInfo
kv *core.KV
idAlloc core.IDAllocator
http.Handler
}

Expand All @@ -106,10 +105,9 @@ func NewTableNamespaceClassifier(kv *core.KV, idAlloc core.IDAllocator) (namespa
return nil, errors.Trace(err)
}
c := &tableNamespaceClassifier{
nsInfo: nsInfo,
tableIDDecoder: DefaultIDDecoder,
kv: kv,
idAlloc: idAlloc,
nsInfo: nsInfo,
kv: kv,
idAlloc: idAlloc,
}
c.Handler = newTableClassifierHandler(c)
return c, nil
Expand Down Expand Up @@ -144,7 +142,7 @@ func (c *tableNamespaceClassifier) GetRegionNamespace(regionInfo *core.RegionInf
c.RLock()
defer c.RUnlock()

tableID := c.tableIDDecoder.DecodeTableID(regionInfo.StartKey)
tableID := Key(regionInfo.StartKey).TableID()
if tableID == 0 {
return namespace.DefaultNamespace
}
Expand Down
174 changes: 60 additions & 114 deletions table/namespace_classifier_test.go
Expand Up @@ -14,7 +14,6 @@
package table

import (
"bytes"
"sort"

. "github.com/pingcap/check"
Expand All @@ -24,74 +23,42 @@ import (

var _ = Suite(&testTableNamespaceSuite{})

type testTableNamespaceSuite struct {
}

type mockTableIDDecoderForTarget struct{}
type mockTableIDDecoderForGlobal struct{}
type mockTableIDDecoderForEdge struct{}
type mockTableIDDecoderForCrossTable struct{}

const (
targetTableID = 12345
targetStoreID = 54321
globalTableID = 789
globalStoreID = 987
testTable1 = 1 + iota
testTable2
testNS1
testNS2
testStore1
testStore2
)

var tableStartKey = encodeBytes([]byte{'t', 0, 0, 0, '1', 0, 0, 0, 0})

func (d mockTableIDDecoderForTarget) DecodeTableID(key Key) int64 {
return targetTableID
}

func (d mockTableIDDecoderForGlobal) DecodeTableID(key Key) int64 {
return globalTableID
}

func (d mockTableIDDecoderForEdge) DecodeTableID(key Key) int64 {
if string(key) == "startKey" || string(key) == "endKey" {
return 0
}
return targetTableID
}

func (d mockTableIDDecoderForCrossTable) DecodeTableID(key Key) int64 {
if string(key) == "startKey" {
return targetTableID
} else if string(key) == "endKey" {
return targetTableID + 1
} else if bytes.Equal(key, tableStartKey) {
return targetTableID + 1
}
return targetTableID
type testTableNamespaceSuite struct {
}

func (s *testTableNamespaceSuite) newClassifier(c *C, decoder IDDecoder) *tableNamespaceClassifier {
func (s *testTableNamespaceSuite) newClassifier(c *C) *tableNamespaceClassifier {
kv := core.NewKV(core.NewMemoryKV())
classifier, err := NewTableNamespaceClassifier(kv, core.NewMockIDAllocator())
c.Assert(err, IsNil)
tableClassifier := classifier.(*tableNamespaceClassifier)
tableClassifier.tableIDDecoder = decoder
testNamespace1 := Namespace{
ID: 1,
Name: "test1",
ID: testNS1,
Name: "ns1",
TableIDs: map[int64]bool{
targetTableID: true,
testTable1: true,
},
StoreIDs: map[uint64]bool{
targetStoreID: true,
testStore1: true,
},
}

testNamespace2 := Namespace{
ID: 2,
Name: "test2",
ID: testNS2,
Name: "ns2",
TableIDs: map[int64]bool{
targetTableID + 1: true,
testTable2: true,
},
StoreIDs: map[uint64]bool{
targetStoreID + 1: true,
testStore2: true,
},
}

Expand All @@ -101,43 +68,71 @@ func (s *testTableNamespaceSuite) newClassifier(c *C, decoder IDDecoder) *tableN
}

func (s *testTableNamespaceSuite) TestTableNameSpaceGetAllNamespace(c *C) {
classifier := s.newClassifier(c, mockTableIDDecoderForTarget{})
classifier := s.newClassifier(c)
ns := classifier.GetAllNamespaces()
sort.Strings(ns)
c.Assert(ns, DeepEquals, []string{"global", "test1", "test2"})
c.Assert(ns, DeepEquals, []string{"global", "ns1", "ns2"})
}

func (s *testTableNamespaceSuite) TestTableNameSpaceGetStoreNamespace(c *C) {
classifier := s.newClassifier(c, mockTableIDDecoderForTarget{})
classifier := s.newClassifier(c)

// Test store namespace
meatapdStore := metapb.Store{Id: targetStoreID}
meatapdStore := metapb.Store{Id: testStore1}
storeInfo := core.NewStoreInfo(&meatapdStore)
c.Assert(classifier.GetStoreNamespace(storeInfo), Equals, "test1")
c.Assert(classifier.GetStoreNamespace(storeInfo), Equals, "ns1")

meatapdStore = metapb.Store{Id: globalStoreID}
meatapdStore = metapb.Store{Id: testStore2 + 1}
storeInfo = core.NewStoreInfo(&meatapdStore)
c.Assert(classifier.GetStoreNamespace(storeInfo), Equals, "global")
}

func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) {
// Test region namespace when tableIDDecoder returns the region's tableId
classifier := s.newClassifier(c, mockTableIDDecoderForTarget{})
regionInfo := core.NewRegionInfo(&metapb.Region{}, &metapb.Peer{})
c.Assert(classifier.GetRegionNamespace(regionInfo), Equals, "test1")

// Test region namespace when tableIDDecoder doesn't return the region's tableId
classifier = s.newClassifier(c, mockTableIDDecoderForGlobal{})
regionInfo = core.NewRegionInfo(&metapb.Region{}, &metapb.Peer{})
c.Assert(classifier.GetRegionNamespace(regionInfo), Equals, "global")
type Case struct {
endcoded bool
startKey, endKey string
tableID int64
isMeta bool
namespace string
}
testCases := []Case{
{false, "", "", 0, false, "global"},
{false, "", "t\x80\x00\x00\x00\x00\x00\x00\x01", 0, false, "global"},
{false, "", "t\x80\x00\x00\x00\x00\x00\x00\x01\x02\x03", 0, false, "global"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x01", "", testTable1, false, "ns1"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x01\x02\x03", "", testTable1, false, "ns1"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x01", "t\x80\x00\x00\x00\x00\x00\x00\x01\x02\x03", testTable1, false, "ns1"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x01", "t\x80\x00\x00\x00\x00\x00\x00\x02", testTable1, false, "ns1"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x02", "t\x80\x00\x00\x00\x00\x00\x00\x02\x01", testTable2, false, "ns2"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x02", "", testTable2, false, "ns2"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x03", "t\x80\x00\x00\x00\x00\x00\x00\x04", 3, false, "global"},
{false, "m\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, true, "global"},
{false, "", "m\x80\x00\x00\x00\x00\x00\x00\x01", 0, false, "global"},
{true, string(encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
{true, "t\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, false, "global"}, // decode error
}
classifier := s.newClassifier(c)
for _, t := range testCases {
startKey, endKey := Key(t.startKey), Key(t.endKey)
if !t.endcoded {
startKey, endKey = encodeBytes(startKey), encodeBytes(endKey)
}
c.Assert(Key(startKey).TableID(), Equals, t.tableID)
c.Assert(Key(startKey).IsMeta(), Equals, t.isMeta)

region := core.NewRegionInfo(&metapb.Region{
StartKey: startKey,
EndKey: endKey,
}, &metapb.Peer{})
c.Assert(classifier.GetRegionNamespace(region), Equals, t.namespace)
}
}

func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) {
kv := core.NewKV(core.NewMemoryKV())
classifier, err := NewTableNamespaceClassifier(kv, core.NewMockIDAllocator())
c.Assert(err, IsNil)
tableClassifier := classifier.(*tableNamespaceClassifier)
tableClassifier.tableIDDecoder = mockTableIDDecoderForGlobal{}
nsInfo := tableClassifier.nsInfo

err = tableClassifier.CreateNamespace("(invalid_name")
Expand Down Expand Up @@ -188,52 +183,3 @@ func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) {
err = tableClassifier.AddNamespaceTableID("test_not_exist", 2)
c.Assert(err, NotNil)
}

func (s *testTableNamespaceSuite) TestClassifierWithInfiniteEdge(c *C) {
// mock the start edge
classifier := s.newClassifier(c, mockTableIDDecoderForEdge{})
regionInfo := core.NewRegionInfo(&metapb.Region{
StartKey: []byte("startKey"),
}, &metapb.Peer{})
ns := classifier.GetRegionNamespace(regionInfo)
c.Assert(ns, Equals, "global")

// mock the end edge
classifier = s.newClassifier(c, mockTableIDDecoderForEdge{})
regionInfo = core.NewRegionInfo(&metapb.Region{
EndKey: []byte("endKey"),
}, &metapb.Peer{})
ns = classifier.GetRegionNamespace(regionInfo)
c.Assert(ns, Equals, "test1")

// mock the region ("", ""), should return global
classifier = s.newClassifier(c, mockTableIDDecoderForEdge{})
regionInfo = core.NewRegionInfo(&metapb.Region{
StartKey: []byte("startKey"),
EndKey: []byte("endKey"),
}, &metapb.Peer{})
ns = classifier.GetRegionNamespace(regionInfo)
c.Assert(ns, Equals, "global")
}

func (s *testTableNamespaceSuite) TestClassifierWithCrossTable(c *C) {
// mock the cross table
classifier := s.newClassifier(c, mockTableIDDecoderForCrossTable{})
regionInfo := core.NewRegionInfo(&metapb.Region{
StartKey: []byte("startKey"),
EndKey: []byte("endKey"),
}, &metapb.Peer{})
ns := classifier.GetRegionNamespace(regionInfo)
c.Assert(ns, Equals, "test1")
}

func (s *testTableNamespaceSuite) TestClassifierWithTableSplit(c *C) {
// mock the cross table
classifier := s.newClassifier(c, mockTableIDDecoderForCrossTable{})
regionInfo := core.NewRegionInfo(&metapb.Region{
StartKey: []byte("startKey"),
EndKey: tableStartKey,
}, &metapb.Peer{})
ns := classifier.GetRegionNamespace(regionInfo)
c.Assert(ns, Equals, "test1")
}

0 comments on commit 3e093ab

Please sign in to comment.