Skip to content

Commit

Permalink
[dbnode] implement deletion method in namespace kvadmin service (#2861)
Browse files Browse the repository at this point in the history
* [dbnode] implement deletion method in namespace kvadmin service

* Fix lint issues

* Increase test coverage

* Address comments
  • Loading branch information
MantasMiksys committed Nov 10, 2020
1 parent c6a256d commit 2d8dfe5
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 17 deletions.
62 changes: 56 additions & 6 deletions src/dbnode/namespace/kvadmin/ns_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import (
"errors"
"fmt"

uuid "github.com/satori/go.uuid"

"github.com/m3db/m3/src/cluster/kv"
nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/dbnode/namespace"
xerrors "github.com/m3db/m3/src/x/errors"

"github.com/satori/go.uuid"
)

var (
ErrNotImplemented = errors.New("api not implemented")
// ErrNamespaceNotFound is returned when namespace is not found in registry.
ErrNamespaceNotFound = errors.New("namespace is not found")
// ErrNamespaceAlreadyExist is returned for addition of a namespace which already exists.
ErrNamespaceAlreadyExist = errors.New("namespace already exists")
)

Expand Down Expand Up @@ -154,8 +155,58 @@ func (as *adminService) Set(name string, options *nsproto.NamespaceOptions) erro
}

func (as *adminService) Delete(name string) error {
// TODO [haijun] move logic from src/query/api/v1/handler/namespace here
return ErrNotImplemented
currentRegistry, currentVersion, err := as.currentRegistry()
if err != nil {
return xerrors.Wrapf(err, "failed to load current namespace metadatas for %s", as.key)
}

nsMap, err := namespace.FromProto(*currentRegistry)
if err != nil {
return xerrors.Wrap(err, "failed to unmarshal namespace registry")
}

metadatas := nsMap.Metadatas()
mdIdx := -1
for idx, md := range nsMap.Metadatas() {
if md.ID().String() == name {
mdIdx = idx

break
}
}

if mdIdx == -1 {
return ErrNamespaceNotFound
}

if len(metadatas) == 1 {
if _, err := as.store.Delete(as.key); err != nil {
return xerrors.Wrap(err, "failed to delete kv key")
}

return nil
}

// Replace the index where we found the metadata with the last element, then truncate
metadatas[mdIdx] = metadatas[len(metadatas)-1]
metadatas = metadatas[:len(metadatas)-1]

newMap, err := namespace.NewMap(metadatas)
if err != nil {
return xerrors.Wrap(err, "namespace map construction failed")
}

protoMap, err := namespace.ToProto(newMap)
if err != nil {
return xerrors.Wrap(err, "namespace registry proto conversion failed")
}

_, err = as.store.CheckAndSet(as.key, currentVersion, protoMap)
if err != nil {
return xerrors.Wrapf(err, "failed to delete namespace %v", name)
}

return nil
}

func (as *adminService) ResetSchema(name string) error {
Expand Down Expand Up @@ -239,7 +290,6 @@ func (as *adminService) currentRegistry() (*nsproto.Registry, int, error) {
return &protoRegistry, value.Version(), nil
}


func LoadSchemaRegistryFromKVStore(schemaReg namespace.SchemaRegistry, kvStore kv.Store) error {
if kvStore == nil {
return errors.New("m3db metadata store is not configured properly")
Expand Down
195 changes: 184 additions & 11 deletions src/dbnode/namespace/kvadmin/ns_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@
package kvadmin

import (
"errors"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/kv/mem"
nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/x/ident"

"github.com/stretchr/testify/require"
"github.com/m3db/m3/src/cluster/kv/mem"
)

const (
Expand Down Expand Up @@ -61,15 +62,17 @@ message ImportedMessage {
bytes deliveryID = 4;
}
`

nsRegKey = "nsRegKey"
testNamespaceID = "test-namespace"
)

func TestAdminService_DeploySchema(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
var nsRegKey = "nsRegKey"
as := NewAdminService(storeMock, nsRegKey, func() string {return "first"})
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })
require.NotNil(t, as)

currentMeta, err := namespace.NewMetadata(ident.StringID("ns1"), namespace.NewOptions())
Expand All @@ -83,7 +86,8 @@ func TestAdminService_DeploySchema(t *testing.T) {
protoMsg := "mainpkg.TestMessage"
protoMap := map[string]string{protoFile: mainProtoStr, "mainpkg/imported.proto": importedProtoStr}

expectedSchemaOpt, err := namespace.AppendSchemaOptions(nil, protoFile, protoMsg, protoMap, "first")
expectedSchemaOpt, err := namespace.
AppendSchemaOptions(nil, protoFile, protoMsg, protoMap, testNamespaceID)
require.NoError(t, err)
expectedSh, err := namespace.LoadSchemaHistory(expectedSchemaOpt)
require.NoError(t, err)
Expand Down Expand Up @@ -115,14 +119,14 @@ func TestAdminService_ResetSchema(t *testing.T) {
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
var nsRegKey = "nsRegKey"
as := NewAdminService(storeMock, nsRegKey, func() string {return "first"})
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })
require.NotNil(t, as)

protoFile := "mainpkg/test.proto"
protoMsg := "mainpkg.TestMessage"
protoMap := map[string]string{protoFile: mainProtoStr, "mainpkg/imported.proto": importedProtoStr}
currentSchemaOpt, err := namespace.AppendSchemaOptions(nil, protoFile, protoMsg, protoMap, "first")
currentSchemaOpt, err := namespace.
AppendSchemaOptions(nil, protoFile, protoMsg, protoMap, testNamespaceID)
require.NoError(t, err)
currentSchemaHist, err := namespace.LoadSchemaHistory(currentSchemaOpt)
require.NoError(t, err)
Expand Down Expand Up @@ -163,8 +167,7 @@ func TestAdminService_Crud(t *testing.T) {
defer ctrl.Finish()

store := mem.NewStore()
var nsRegKey = "nsRegKey"
as := NewAdminService(store, nsRegKey, func() string {return "first"})
as := NewAdminService(store, nsRegKey, func() string { return testNamespaceID })
require.NotNil(t, as)

expectedOpt := namespace.NewOptions()
Expand All @@ -176,6 +179,7 @@ func TestAdminService_Crud(t *testing.T) {
require.Error(t, as.Add("ns1", optProto))
require.NoError(t, as.Set("ns1", optProto))
require.Error(t, as.Set("ns2", optProto))
require.NoError(t, as.Add("ns3", optProto))

nsOpt, err := as.Get("ns1")
require.NoError(t, err)
Expand All @@ -189,5 +193,174 @@ func TestAdminService_Crud(t *testing.T) {

nsReg, err := as.GetAll()
require.NoError(t, err)
require.Len(t, nsReg.Namespaces, 2)

err = as.Delete("ns1")
require.NoError(t, err)

nsReg, err = as.GetAll()
require.NoError(t, err)
require.Len(t, nsReg.Namespaces, 1)
}

func TestAdminService_DeleteOneNamespace(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })

currentMeta1, err := namespace.NewMetadata(ident.StringID("ns1"), namespace.NewOptions())
require.NoError(t, err)
currentMeta2, err := namespace.NewMetadata(ident.StringID("ns2"), namespace.NewOptions())
require.NoError(t, err)

currentMap, err := namespace.NewMap([]namespace.Metadata{currentMeta1, currentMeta2})
require.NoError(t, err)
currentReg, err := namespace.ToProto(currentMap)
require.NoError(t, err)

expectedMeta, err := namespace.NewMetadata(ident.StringID("ns2"), namespace.NewOptions())
require.NoError(t, err)
expectedMap, err := namespace.NewMap([]namespace.Metadata{expectedMeta})
require.NoError(t, err)

mValue := kv.NewMockValue(ctrl)
mValue.EXPECT().Unmarshal(gomock.Any()).Return(nil).Do(func(reg *nsproto.Registry) {
*reg = *currentReg
})
mValue.EXPECT().Version().Return(1)
storeMock.EXPECT().Get(nsRegKey).Return(mValue, nil)
storeMock.EXPECT().CheckAndSet(nsRegKey, 1, gomock.Any()).Return(2, nil).Do(
func(k string, version int, actualReg *nsproto.Registry) {
actualMap, err := namespace.FromProto(*actualReg)
require.NoError(t, err)
require.True(t, actualMap.Equal(expectedMap))
},
)

err = as.Delete("ns1")
require.NoError(t, err)
}

func TestAdminService_DeleteOneNamespaceFailedSetting(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })

currentMeta1, err := namespace.NewMetadata(ident.StringID("ns1"), namespace.NewOptions())
require.NoError(t, err)
currentMeta2, err := namespace.NewMetadata(ident.StringID("ns2"), namespace.NewOptions())
require.NoError(t, err)

currentMap, err := namespace.NewMap([]namespace.Metadata{currentMeta1, currentMeta2})
require.NoError(t, err)
currentReg, err := namespace.ToProto(currentMap)
require.NoError(t, err)

mValue := kv.NewMockValue(ctrl)
mValue.EXPECT().Unmarshal(gomock.Any()).Return(nil).Do(func(reg *nsproto.Registry) {
*reg = *currentReg
})
mValue.EXPECT().Version().Return(1)
storeMock.EXPECT().Get(nsRegKey).Return(mValue, nil)
storeMock.EXPECT().CheckAndSet(nsRegKey, 1, gomock.Any()).Return(-1, errors.New("some error"))

err = as.Delete("ns1")
require.Error(t, err)
}

func TestAdminService_DeleteLastNamespace(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })

currentMeta, err := namespace.NewMetadata(ident.StringID("ns1"), namespace.NewOptions())
require.NoError(t, err)

currentMap, err := namespace.NewMap([]namespace.Metadata{currentMeta})
require.NoError(t, err)
currentReg, err := namespace.ToProto(currentMap)
require.NoError(t, err)

mValue := kv.NewMockValue(ctrl)
mValue.EXPECT().Unmarshal(gomock.Any()).Return(nil).Do(func(reg *nsproto.Registry) {
*reg = *currentReg
})
mValue.EXPECT().Version().Return(1)
storeMock.EXPECT().Get(nsRegKey).Return(mValue, nil)
storeMock.EXPECT().Delete(nsRegKey).Return(nil, nil)

err = as.Delete("ns1")
require.NoError(t, err)
}

func TestAdminService_DeleteLastNamespaceFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })

currentMeta, err := namespace.NewMetadata(ident.StringID("ns1"), namespace.NewOptions())
require.NoError(t, err)

currentMap, err := namespace.NewMap([]namespace.Metadata{currentMeta})
require.NoError(t, err)
currentReg, err := namespace.ToProto(currentMap)
require.NoError(t, err)

mValue := kv.NewMockValue(ctrl)
mValue.EXPECT().Unmarshal(gomock.Any()).Return(nil).Do(func(reg *nsproto.Registry) {
*reg = *currentReg
})
mValue.EXPECT().Version().Return(1)
storeMock.EXPECT().Get(nsRegKey).Return(mValue, nil)
storeMock.EXPECT().Delete(nsRegKey).Return(nil, errors.New("some error"))

err = as.Delete("ns1")
require.Error(t, err)
}

func TestAdminService_DeleteMissingNamespace(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })

currentMeta, err := namespace.NewMetadata(ident.StringID("ns1"), namespace.NewOptions())
require.NoError(t, err)

currentMap, err := namespace.NewMap([]namespace.Metadata{currentMeta})
require.NoError(t, err)
currentReg, err := namespace.ToProto(currentMap)
require.NoError(t, err)

mValue := kv.NewMockValue(ctrl)
mValue.EXPECT().Unmarshal(gomock.Any()).Return(nil).Do(func(reg *nsproto.Registry) {
*reg = *currentReg
})
mValue.EXPECT().Version().Return(1)
storeMock.EXPECT().Get(nsRegKey).Return(mValue, nil)

err = as.Delete("missing-namespace")
require.EqualError(t, ErrNamespaceNotFound, err.Error())
}

func TestAdminService_DeleteNilRegistry(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

storeMock := kv.NewMockStore(ctrl)
as := NewAdminService(storeMock, nsRegKey, func() string { return testNamespaceID })

storeMock.EXPECT().Get(nsRegKey).Return(nil, errors.New("some error"))

err := as.Delete("missing-namespace")
require.Error(t, err)
}

0 comments on commit 2d8dfe5

Please sign in to comment.