Skip to content

Commit

Permalink
fix tablecodec (#773)
Browse files Browse the repository at this point in the history
* fix table id decoder
  • Loading branch information
dantin authored and nolouch committed Sep 27, 2017
1 parent d5f6a6f commit 3d95400
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
8 changes: 7 additions & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ func (c *RaftCluster) start() error {
return nil
}
c.cachedCluster = cluster
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, namespace.DefaultClassifier)
var classifier namespace.Classifier
if c.s.cfg.EnableNamespace {
classifier = newTableNamespaceClassifier(c.cachedCluster.namespacesInfo, core.DefaultTableIDDecoder)
} else {
classifier = namespace.DefaultClassifier
}
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, classifier)
c.quit = make(chan struct{})

c.wg.Add(2)
Expand Down
4 changes: 4 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Config struct {
// For all warnings during parsing.
WarningMsgs []string

// Enable namespace isolation.
EnableNamespace bool `toml:"enable-namespace" json:"enable-namespace"`

// Only test can change them.
nextRetryDelay time.Duration
disableStrictReconfigCheck bool
Expand Down Expand Up @@ -116,6 +119,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Log.Level, "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path")
fs.BoolVar(&cfg.Log.File.LogRotate, "log-rotate", true, "rotate log")
fs.BoolVar(&cfg.EnableNamespace, "enable-namespace", false, "enable namespace isolation (default 'false')")

return cfg
}
Expand Down
49 changes: 48 additions & 1 deletion server/core/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ type defaultTableIDDecoder struct{}

var tablePrefix = []byte{'t'}

const signMask uint64 = 0x8000000000000000
const (
signMask uint64 = 0x8000000000000000

encGroupSize = 8
encMarker = byte(0xFF)
encPad = byte(0x0)
)

// Key represents high-level Key type.
type Key []byte
Expand All @@ -45,10 +51,16 @@ func (k Key) HasPrefix(prefix Key) bool {

// DecodeTableID decodes the table ID of the key, if the key is not table key, returns 0.
func (decoder defaultTableIDDecoder) DecodeTableID(key Key) int64 {
_, key, err := decodeBytes(key)
if err != nil {
// should never happen
return 0
}
if !key.HasPrefix(tablePrefix) {
return 0
}
key = key[len(tablePrefix):]

_, tableID, _ := DecodeInt(key)
return tableID
}
Expand All @@ -74,3 +86,38 @@ func decodeCmpUintToInt(u uint64) int64 {
func IsPureTableID(b []byte) bool {
return len(b) == len(tablePrefix)+8
}

func decodeBytes(b []byte) ([]byte, []byte, error) {
data := make([]byte, 0, len(b))
for {
if len(b) < encGroupSize+1 {
return nil, nil, errors.New("insufficient bytes to decode value")
}

groupBytes := b[:encGroupSize+1]

group := groupBytes[:encGroupSize]
marker := groupBytes[encGroupSize]

padCount := encMarker - marker
if padCount > encGroupSize {
return nil, nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
}

realGroupSize := encGroupSize - padCount
data = append(data, group[:realGroupSize]...)
b = b[encGroupSize+1:]

if padCount != 0 {
var padByte = encPad
// Check validity of padding bytes.
for _, v := range group[realGroupSize:] {
if v != padByte {
return nil, nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
}
}
break
}
}
return b, data, nil
}

0 comments on commit 3d95400

Please sign in to comment.