Skip to content

Commit

Permalink
feat: Added initial version of admin API
Browse files Browse the repository at this point in the history
  • Loading branch information
JigarJoshi committed Jul 1, 2022
1 parent 3e9c839 commit 311de7d
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -30,7 +30,7 @@ ${PROTO_DIR}/%_openapi.yaml ${GEN_DIR}/%.pb.go ${GEN_DIR}/%.pb.gw.go: ${PROTO_DI
${DATA_PROTO_DIR}/%.pb.go: ${DATA_PROTO_DIR}/%.proto
protoc -I${DATA_PROTO_DIR} --go_out=${DATA_PROTO_DIR} --go_opt=paths=source_relative $<

generate: ${GEN_DIR}/api.pb.go ${GEN_DIR}/api.pb.gw.go ${GEN_DIR}/health.pb.go ${GEN_DIR}/health.pb.gw.go ${DATA_PROTO_DIR}/data.pb.go
generate: ${GEN_DIR}/api.pb.go ${GEN_DIR}/api.pb.gw.go ${GEN_DIR}/health.pb.go ${GEN_DIR}/health.pb.gw.go ${GEN_DIR}/admin.pb.go ${GEN_DIR}/admin.pb.gw.go ${DATA_PROTO_DIR}/data.pb.go

server: server/service
server/service: $(GO_SRC) generate
Expand Down
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from d5caff to 4adadf
6 changes: 5 additions & 1 deletion server/main.go
Expand Up @@ -18,8 +18,10 @@ import (
"github.com/rs/zerolog/log"
"github.com/spf13/pflag"
"github.com/tigrisdata/tigris/server/config"
"github.com/tigrisdata/tigris/server/metadata"
"github.com/tigrisdata/tigris/server/metrics"
"github.com/tigrisdata/tigris/server/muxer"
"github.com/tigrisdata/tigris/server/transaction"
"github.com/tigrisdata/tigris/store/kv"
"github.com/tigrisdata/tigris/store/search"
"github.com/tigrisdata/tigris/util"
Expand Down Expand Up @@ -80,8 +82,10 @@ func main() {
log.Fatal().Err(err).Msg("error initializing search store")
}

tenantMgr := metadata.NewTenantManager()
txMgr := transaction.NewManager(kvStore)
mx := muxer.NewMuxer(&config.DefaultConfig)
mx.RegisterServices(kvStore, searchStore)
mx.RegisterServices(kvStore, searchStore, tenantMgr, txMgr)
if err := mx.Start(config.DefaultConfig.Server.Host, config.DefaultConfig.Server.Port); err != nil {
log.Fatal().Err(err).Msgf("error starting server")
}
Expand Down
28 changes: 28 additions & 0 deletions server/metadata/tenant.go
Expand Up @@ -166,6 +166,34 @@ func (m *TenantManager) CreateOrGetTenant(ctx context.Context, txMgr *transactio
return m.createOrGetTenantInternal(ctx, tx, namespace)
}

// CreateTenant is a thread safe implementation of creating a new tenant. It returns the error if it already exists.
func (m *TenantManager) CreateTenant(ctx context.Context, tx transaction.Tx, namespace Namespace) error {
m.Lock()
defer m.Unlock()
namespaces, err := m.encoder.GetNamespaces(ctx, tx)
if err != nil {
return err
}

if id, found := namespaces[namespace.Name()]; found {
return api.Errorf(api.Code_CONFLICT, "namespace with same name already exists with id '%s'", fmt.Sprint(id))
}
for name, id := range namespaces {
if id == namespace.Id() {
return api.Errorf(api.Code_CONFLICT, "namespace with same id already exists with name '%s'", name)
}
}
if err := m.versionH.Increment(ctx, tx); ulog.E(err) {
return err
}

if err := m.encoder.ReserveNamespace(ctx, tx, namespace.Name(), namespace.Id()); ulog.E(err) {
return err
}

return nil
}

func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transaction.Tx, namespace Namespace) (*Tenant, error) {
namespaces, err := m.encoder.GetNamespaces(ctx, tx)
if err != nil {
Expand Down
101 changes: 100 additions & 1 deletion server/metadata/tenant_test.go
Expand Up @@ -30,7 +30,7 @@ import (
ulog "github.com/tigrisdata/tigris/util/log"
)

func TestTenantManager_CreateTenant(t *testing.T) {
func TestTenantManager_CreateOrGetTenant(t *testing.T) {
fdbCfg, err := config.GetTestFDBConfig("../..")
require.NoError(t, err)

Expand Down Expand Up @@ -60,6 +60,7 @@ func TestTenantManager_CreateTenant(t *testing.T) {
require.Empty(t, tenant.databases)
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
})

t.Run("create_multiple_tenants", func(t *testing.T) {
m := newTenantManager(&encoding.TestMDNameRegistry{
ReserveSB: "test_tenant_reserve",
Expand Down Expand Up @@ -135,6 +136,104 @@ func TestTenantManager_CreateTenant(t *testing.T) {
})
}

func TestTenantManager_CreateTenant(t *testing.T) {
fdbCfg, err := config.GetTestFDBConfig("../..")
require.NoError(t, err)

kvStore, err := kv.NewKeyValueStore(fdbCfg)
require.NoError(t, err)

tm := transaction.NewManager(kvStore)
t.Run("create_tenant", func(t *testing.T) {
m := newTenantManager(&encoding.TestMDNameRegistry{
ReserveSB: "test_tenant_reserve",
EncodingSB: "test_tenant_encoding",
SchemaSB: "test_tenant_schema",
})

ctx := context.TODO()
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
_ = kvStore.DropTable(ctx, m.mdNameRegistry.EncodingSubspaceName())
_ = kvStore.DropTable(ctx, m.mdNameRegistry.SchemaSubspaceName())
tx, e := tm.StartTx(ctx)
require.NoError(t, e)
err = m.CreateTenant(ctx, tx, &TenantNamespace{"ns-test1", 2})
require.NoError(t, err)
namespaces, err := m.encoder.GetNamespaces(ctx, tx)
require.NoError(t, err)
id := namespaces["ns-test1"]
require.Equal(t, uint32(2), id)
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
})
t.Run("create_multiple_tenants", func(t *testing.T) {
m := newTenantManager(&encoding.TestMDNameRegistry{
ReserveSB: "test_tenant_reserve",
EncodingSB: "test_tenant_encoding",
SchemaSB: "test_tenant_schema",
})

ctx := context.TODO()
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())

tx, e := tm.StartTx(ctx)
require.NoError(t, e)

err = m.CreateTenant(ctx, tx, &TenantNamespace{"ns-test1", 2})
require.NoError(t, err)

err = m.CreateTenant(ctx, tx, &TenantNamespace{"ns-test2", 3})
require.NoError(t, err)
namespaces, err := m.encoder.GetNamespaces(ctx, tx)
require.NoError(t, err)

id, _ := namespaces["ns-test1"]
require.Equal(t, uint32(2), id)

id, _ = namespaces["ns-test2"]
require.Equal(t, uint32(3), id)
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
})
t.Run("create_duplicate_tenant_error", func(t *testing.T) {
m := newTenantManager(&encoding.TestMDNameRegistry{
ReserveSB: "test_tenant_reserve",
EncodingSB: "test_tenant_encoding",
SchemaSB: "test_tenant_schema",
})

ctx := context.TODO()
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
tx, e := tm.StartTx(ctx)
require.NoError(t, e)
err = m.CreateTenant(ctx, tx, &TenantNamespace{"ns-test1", 2})
require.NoError(t, err)

// should fail now
err = m.CreateTenant(context.TODO(), tx, &TenantNamespace{"ns-test1", 3})
require.Equal(t, "namespace with same name already exists with id '2'", err.(*api.TigrisError).Error())

_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
})
t.Run("create_duplicate_tenant_id_error", func(t *testing.T) {
m := newTenantManager(&encoding.TestMDNameRegistry{
ReserveSB: "test_tenant_reserve",
EncodingSB: "test_tenant_encoding",
SchemaSB: "test_tenant_schema",
})

ctx := context.TODO()
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
tx, e := tm.StartTx(ctx)
require.NoError(t, e)
err = m.CreateTenant(ctx, tx, &TenantNamespace{"ns-test1", 2})
require.NoError(t, err)

// should fail now
err = m.CreateTenant(ctx, tx, &TenantNamespace{"ns-test2", 2})
require.Equal(t, "namespace with same id already exists with name 'ns-test1'", err.(*api.TigrisError).Error())
_ = kvStore.DropTable(ctx, m.mdNameRegistry.ReservedSubspaceName())
})
}

func TestTenantManager_CreateDatabases(t *testing.T) {
fdbCfg, err := config.GetTestFDBConfig("../..")
require.NoError(t, err)
Expand Down
6 changes: 4 additions & 2 deletions server/muxer/muxer.go
Expand Up @@ -21,8 +21,10 @@ import (
"github.com/rs/zerolog/log"
"github.com/soheilhy/cmux"
"github.com/tigrisdata/tigris/server/config"
"github.com/tigrisdata/tigris/server/metadata"
"github.com/tigrisdata/tigris/server/metrics"
v1 "github.com/tigrisdata/tigris/server/services/v1"
"github.com/tigrisdata/tigris/server/transaction"
"github.com/tigrisdata/tigris/store/kv"
"github.com/tigrisdata/tigris/store/search"
)
Expand All @@ -46,8 +48,8 @@ func NewMuxer(cfg *config.Config) *Muxer {
return m
}

func (m *Muxer) RegisterServices(kvStore kv.KeyValueStore, searchStore search.Store) {
services := v1.GetRegisteredServices(kvStore, searchStore)
func (m *Muxer) RegisterServices(kvStore kv.KeyValueStore, searchStore search.Store, tenantMgr *metadata.TenantManager, txMgr *transaction.Manager) {
services := v1.GetRegisteredServices(kvStore, searchStore, tenantMgr, txMgr)
for _, r := range services {
for _, v := range m.servers {
if s, ok := v.(*GRPCServer); ok {
Expand Down
88 changes: 88 additions & 0 deletions server/services/v1/admin.go
@@ -0,0 +1,88 @@
// Copyright 2022 Tigris Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1

import (
"context"
"fmt"
"net/http"

"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/go-chi/chi/v5"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/server/metadata"
"github.com/tigrisdata/tigris/server/transaction"
"google.golang.org/grpc"
)

const (
adminPath = "/admin/" + version
)
const (
namespacePath = "/namespaces"
namespacePathPattern = namespacePath + "/*"
)

type adminService struct {
api.UnimplementedAdminServer
tenantMgr *metadata.TenantManager
txMgr *transaction.Manager
}

func newAdminService(tenantMgr *metadata.TenantManager, txMgr *transaction.Manager) *adminService {
return &adminService{
tenantMgr: tenantMgr,
txMgr: txMgr,
}
}

func (a *adminService) CreateNamespace(ctx context.Context, req *api.CreateNamespaceRequest) (*api.CreateNamespaceResponse, error) {
namespace := metadata.NewTenantNamespace(req.Name, uint32(req.Id))
tx, err := a.txMgr.StartTx(ctx)
if err != nil {
return nil, api.Errorf(api.Code_INTERNAL, "Failed to create namespace")
}
err = a.tenantMgr.CreateTenant(ctx, tx, namespace)
if err != nil {
_ = tx.Rollback(ctx)
return nil, err
} else {
if err = tx.Commit(ctx); err == nil {
return &api.CreateNamespaceResponse{
Status: "CREATED",
Message: "Namespace created, with id=" + fmt.Sprint(req.Id) + ", and name=" + req.Name,
}, nil
} else {
return nil, err
}
}
}

func (h *adminService) RegisterHTTP(router chi.Router, _ *inprocgrpc.Channel) error {
mux := runtime.NewServeMux(runtime.WithMarshalerOption(string(JSON), &runtime.JSONBuiltin{}))
if err := api.RegisterAdminHandlerServer(context.TODO(), mux, h); err != nil {
return err
}
router.HandleFunc(adminPath+namespacePathPattern, func(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
})
return nil
}

func (h *adminService) RegisterGRPC(grpc *grpc.Server) error {
api.RegisterAdminServer(grpc, h)
return nil
}
7 changes: 3 additions & 4 deletions server/services/v1/api.go
Expand Up @@ -63,10 +63,10 @@ type apiService struct {
searchStore search.Store
}

func newApiService(kv kv.KeyValueStore, searchStore search.Store) *apiService {
func newApiService(kv kv.KeyValueStore, searchStore search.Store, tenantMgr *metadata.TenantManager, txMgr *transaction.Manager) *apiService {
u := &apiService{
kvStore: kv,
txMgr: transaction.NewManager(kv),
txMgr: txMgr,
versionH: &metadata.VersionHandler{},
searchStore: searchStore,
}
Expand All @@ -77,15 +77,14 @@ func newApiService(kv kv.KeyValueStore, searchStore search.Store) *apiService {
log.Fatal().Err(err).Msgf("error starting server: starting transaction failed")
}

tenantMgr := metadata.NewTenantManager()
if err := tenantMgr.Reload(ctx, tx); ulog.E(err) {
// ToDo: no need to panic, probably handle through async thread.
log.Err(err).Msgf("error starting server: reloading tenants failed")
}
_ = tx.Commit(ctx)

u.tenantMgr = tenantMgr
u.encoder = metadata.NewEncoder(tenantMgr)
u.encoder = metadata.NewEncoder(u.tenantMgr)
u.cdcMgr = cdc.NewManager()
u.sessions = NewSessionManager(u.txMgr, u.tenantMgr, u.versionH, u.cdcMgr, u.searchStore, u.encoder)
u.runnerFactory = NewQueryRunnerFactory(u.txMgr, u.encoder, u.cdcMgr, u.searchStore)
Expand Down
2 changes: 1 addition & 1 deletion server/services/v1/query_runner.go
Expand Up @@ -601,7 +601,7 @@ func (runner *SearchQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten
break
}

resp.Meta.Page.Total = totalInPage
resp.Meta.Page.Size = totalInPage
if err := runner.streaming.Send(resp); err != nil {
return nil, ctx, err
}
Expand Down
7 changes: 5 additions & 2 deletions server/services/v1/service.go
Expand Up @@ -17,6 +17,8 @@ package v1
import (
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/go-chi/chi/v5"
"github.com/tigrisdata/tigris/server/metadata"
"github.com/tigrisdata/tigris/server/transaction"
"github.com/tigrisdata/tigris/store/kv"
"github.com/tigrisdata/tigris/store/search"
"google.golang.org/grpc"
Expand All @@ -33,10 +35,11 @@ type Service interface {
RegisterGRPC(grpc *grpc.Server) error
}

func GetRegisteredServices(kvStore kv.KeyValueStore, searchStore search.Store) []Service {
func GetRegisteredServices(kvStore kv.KeyValueStore, searchStore search.Store, tenantMgr *metadata.TenantManager, txMgr *transaction.Manager) []Service {
var v1Services []Service

v1Services = append(v1Services, newApiService(kvStore, searchStore))
v1Services = append(v1Services, newApiService(kvStore, searchStore, tenantMgr, txMgr))
v1Services = append(v1Services, newHealthService())
v1Services = append(v1Services, newAdminService(tenantMgr, txMgr))
return v1Services
}

0 comments on commit 311de7d

Please sign in to comment.