Skip to content

Commit

Permalink
namespace: support set namespace for meta (#834)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored and siddontang committed Nov 8, 2017
1 parent f931b1d commit a685b9b
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 4 deletions.
45 changes: 44 additions & 1 deletion pdctl/command/table_namespace_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
const (
namespacePrefix = "pd/api/v1/classifier/table/namespaces"
namespaceTablePrefix = "pd/api/v1/classifier/table/namespaces/table"
namespaceMetaPrefix = "pd/api/v1/classifier/table/namespaces/meta"
storeNsPrefix = "pd/api/v1/classifier/table/store_ns/%s"
)

// NewTableNamespaceCommand return a table namespace sub-command of rootCmd
func NewTableNamespaceCommand() *cobra.Command {
s := &cobra.Command{
Use: "table_ns [create|add|remove|set_store|rm_store]",
Use: "table_ns [create|add|remove|set_store|rm_store|set_meta|rm_meta]",
Short: "show the table namespace information",
Run: showNamespaceCommandFunc,
}
Expand All @@ -39,6 +40,8 @@ func NewTableNamespaceCommand() *cobra.Command {
s.AddCommand(NewRemoveTableIDCommand())
s.AddCommand(NewSetNamespaceStoreCommand())
s.AddCommand(NewRemoveNamespaceStoreCommand())
s.AddCommand(newSetMetaNamespaceCommand())
s.AddCommand(newRemoveMetaNamespaceCommand())
return s
}

Expand Down Expand Up @@ -187,3 +190,43 @@ func removeNamespaceStoreCommandFunc(cmd *cobra.Command, args []string) {
"action": "remove",
})
}

func newSetMetaNamespaceCommand() *cobra.Command {
return &cobra.Command{
Use: "set_meta <namespace>",
Short: "set meta to namespace",
Run: setMetaNamespaceCommandFunc,
}
}

func newRemoveMetaNamespaceCommand() *cobra.Command {
return &cobra.Command{
Use: "rm_meta <namespace>",
Short: "remove meta from namespace",
Run: removeMetaNamespaceCommandFunc,
}
}

func setMetaNamespaceCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println("Usage: set_meta <namespace>")
return
}
input := map[string]interface{}{
"namespace": args[0],
"action": "add",
}
postJSON(cmd, namespaceMetaPrefix, input)
}

func removeMetaNamespaceCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println("Usage: rm_meta <namespace>")
return
}
input := map[string]interface{}{
"namespace": args[0],
"action": "remove",
}
postJSON(cmd, namespaceMetaPrefix, input)
}
49 changes: 47 additions & 2 deletions table/namespace_classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Namespace struct {
Name string `json:"Name"`
TableIDs map[int64]bool `json:"table_ids,omitempty"`
StoreIDs map[uint64]bool `json:"store_ids,omitempty"`
Meta bool `json:"meta,omitempty"`
}

// NewNamespace creates a new namespace
Expand Down Expand Up @@ -142,14 +143,15 @@ func (c *tableNamespaceClassifier) GetRegionNamespace(regionInfo *core.RegionInf
c.RLock()
defer c.RUnlock()

isMeta := Key(regionInfo.StartKey).IsMeta()
tableID := Key(regionInfo.StartKey).TableID()
if tableID == 0 {
if tableID == 0 && !isMeta {
return namespace.DefaultNamespace
}

for name, ns := range c.nsInfo.namespaces {
_, ok := ns.TableIDs[tableID]
if ok {
if ok || (isMeta && ns.Meta) {
return name
}
}
Expand Down Expand Up @@ -228,6 +230,39 @@ func (c *tableNamespaceClassifier) RemoveNamespaceTableID(name string, tableID i
return c.putNamespaceLocked(n)
}

// AddMetaToNamespace adds meta to a namespace.
func (c *tableNamespaceClassifier) AddMetaToNamespace(name string) error {
c.Lock()
defer c.Unlock()

n := c.nsInfo.getNamespaceByName(name)
if n == nil {
return errors.Errorf("invalid namespace Name %s, not found", name)
}
if c.nsInfo.IsMetaExist() {
return errors.New("meta is already set")
}

n.Meta = true
return c.putNamespaceLocked(n)
}

// RemoveMeta removes meta from a namespace.
func (c *tableNamespaceClassifier) RemoveMeta(name string) error {
c.Lock()
defer c.Unlock()

n := c.nsInfo.getNamespaceByName(name)
if n == nil {
return errors.Errorf("invalid namespace Name %s, not found", name)
}
if !n.Meta {
return errors.Errorf("meta is not belong to %s", name)
}
n.Meta = false
return c.putNamespaceLocked(n)
}

// AddNamespaceStoreID adds store ID to namespace.
func (c *tableNamespaceClassifier) AddNamespaceStoreID(name string, storeID uint64) error {
c.Lock()
Expand Down Expand Up @@ -330,6 +365,16 @@ func (namespaceInfo *namespacesInfo) IsStoreIDExist(storeID uint64) bool {
return false
}

// IsMetaExist returns true if meta is binded to a namespace.
func (namespaceInfo *namespacesInfo) IsMetaExist() bool {
for _, ns := range namespaceInfo.namespaces {
if ns.Meta {
return true
}
}
return false
}

func (namespaceInfo *namespacesInfo) namespacePath(nsID uint64) string {
return path.Join("namespace", fmt.Sprintf("%20d", nsID))
}
Expand Down
12 changes: 11 additions & 1 deletion table/namespace_classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (s *testTableNamespaceSuite) newClassifier(c *C) *tableNamespaceClassifier
StoreIDs: map[uint64]bool{
testStore2: true,
},
Meta: true,
}

tableClassifier.nsInfo.setNamespace(&testNamespace1)
Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) {
{false, "t\x80\x00\x00\x00\x00\x00\x00\x02", "t\x80\x00\x00\x00\x00\x00\x00\x02\x01", testTable2, false, "ns2"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x02", "", testTable2, false, "ns2"},
{false, "t\x80\x00\x00\x00\x00\x00\x00\x03", "t\x80\x00\x00\x00\x00\x00\x00\x04", 3, false, "global"},
{false, "m\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, true, "global"},
{false, "m\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, true, "ns2"},
{false, "", "m\x80\x00\x00\x00\x00\x00\x00\x01", 0, false, "global"},
{true, string(encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
{true, "t\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, false, "global"}, // decode error
Expand Down Expand Up @@ -182,4 +183,13 @@ func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) {
// Add tableID to a namespace that doesn't exist
err = tableClassifier.AddNamespaceTableID("test_not_exist", 2)
c.Assert(err, NotNil)

// Test set meta.
c.Assert(tableClassifier.AddMetaToNamespace("test1"), IsNil)
// Can't be set again.
c.Assert(tableClassifier.AddMetaToNamespace("test1"), NotNil)
c.Assert(tableClassifier.AddMetaToNamespace("test2"), NotNil)
// Remove from test1.
c.Assert(tableClassifier.RemoveMeta("test1"), IsNil)
c.Assert(tableClassifier.AddMetaToNamespace("test2"), IsNil)
}
26 changes: 26 additions & 0 deletions table/namespace_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func newTableClassifierHandler(classifier *tableNamespaceClassifier) http.Handle
router.HandleFunc("/table/namespaces", h.Get).Methods("GET")
router.HandleFunc("/table/namespaces", h.Post).Methods("POST")
router.HandleFunc("/table/namespaces/table", h.Update).Methods("POST")
router.HandleFunc("/table/namespaces/meta", h.SetMetaNamespace).Methods("POST")
router.HandleFunc("/table/store_ns/{id}", h.SetNamespace).Methods("POST")
return router
}
Expand Down Expand Up @@ -113,6 +114,31 @@ func (h *tableNamespaceHandler) Update(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *tableNamespaceHandler) SetMetaNamespace(w http.ResponseWriter, r *http.Request) {
var input map[string]string
if err := apiutil.ReadJSON(r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
ns := input["namespace"]
switch input["action"] {
case "add":
if err := h.classifier.AddMetaToNamespace(ns); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "remove":
if err := h.classifier.RemoveMeta(ns); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
default:
h.rd.JSON(w, http.StatusBadRequest, errors.New("unknown action"))
return
}
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *tableNamespaceHandler) SetNamespace(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
storeIDStr := vars["id"]
Expand Down

0 comments on commit a685b9b

Please sign in to comment.