Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix tablecodec #773

Merged
merged 3 commits into from Sep 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion server/cluster.go
Expand Up @@ -110,7 +110,13 @@ func (c *RaftCluster) start() error {
return nil
}
c.cachedCluster = cluster
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, namespace.DefaultClassifier)
var classifier namespace.Classifier
if c.s.cfg.EnableNamespace {
classifier = newTableNamespaceClassifier(c.cachedCluster.namespacesInfo, core.DefaultTableIDDecoder)
} else {
classifier = namespace.DefaultClassifier
}
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, classifier)
c.quit = make(chan struct{})

c.wg.Add(2)
Expand Down
4 changes: 4 additions & 0 deletions server/config.go
Expand Up @@ -88,6 +88,9 @@ type Config struct {
// For all warnings during parsing.
WarningMsgs []string

// Enable namespace isolation.
EnableNamespace bool `toml:"enable-namespace" json:"enable-namespace"`

// Only test can change them.
nextRetryDelay time.Duration
disableStrictReconfigCheck bool
Expand Down Expand Up @@ -116,6 +119,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Log.Level, "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path")
fs.BoolVar(&cfg.Log.File.LogRotate, "log-rotate", true, "rotate log")
fs.BoolVar(&cfg.EnableNamespace, "enable-namespace", false, "enable namespace isolation (default 'false')")

return cfg
}
Expand Down
49 changes: 48 additions & 1 deletion server/core/tablecodec.go
Expand Up @@ -33,7 +33,13 @@ type defaultTableIDDecoder struct{}

var tablePrefix = []byte{'t'}

const signMask uint64 = 0x8000000000000000
const (
signMask uint64 = 0x8000000000000000

encGroupSize = 8
encMarker = byte(0xFF)
encPad = byte(0x0)
)

// Key represents high-level Key type.
type Key []byte
Expand All @@ -45,10 +51,16 @@ func (k Key) HasPrefix(prefix Key) bool {

// DecodeTableID decodes the table ID of the key, if the key is not table key, returns 0.
func (decoder defaultTableIDDecoder) DecodeTableID(key Key) int64 {
_, key, err := decodeBytes(key)
if err != nil {
// should never happen
return 0
}
if !key.HasPrefix(tablePrefix) {
return 0
}
key = key[len(tablePrefix):]

_, tableID, _ := DecodeInt(key)
return tableID
}
Expand All @@ -74,3 +86,38 @@ func decodeCmpUintToInt(u uint64) int64 {
func IsPureTableID(b []byte) bool {
return len(b) == len(tablePrefix)+8
}

func decodeBytes(b []byte) ([]byte, []byte, error) {
data := make([]byte, 0, len(b))
for {
if len(b) < encGroupSize+1 {
return nil, nil, errors.New("insufficient bytes to decode value")
}

groupBytes := b[:encGroupSize+1]

group := groupBytes[:encGroupSize]
marker := groupBytes[encGroupSize]

padCount := encMarker - marker
if padCount > encGroupSize {
return nil, nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
}

realGroupSize := encGroupSize - padCount
data = append(data, group[:realGroupSize]...)
b = b[encGroupSize+1:]

if padCount != 0 {
var padByte = encPad
// Check validity of padding bytes.
for _, v := range group[realGroupSize:] {
if v != padByte {
return nil, nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
}
}
break
}
}
return b, data, nil
}