Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Collection and Index creation to respect specified limits #1277

Merged
merged 2 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ local_rt_run: server
fdbcli -C ./test/config/fdb.cluster --exec "configure new single memory" || true
TIGRIS_SERVER_SERVER_TYPE=realtime ./server/service -c config/server.dev.yaml

local_debug:
$(DOCKER_COMPOSE) up --no-build --detach tigris_search tigris_db2

# Runs tigris server and foundationdb, plus additional tools for it like:
# - prometheus and grafana for monitoring
run_full: coverdir
Expand Down
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from 5cc3ca to 7e29de
15 changes: 15 additions & 0 deletions api/server/v1/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,21 @@ func (x *ChargeTier) MarshalJSON() ([]byte, error) {
return jsoniter.Marshal(resp)
}

func (x *ProjectInfo) MarshalJSON() ([]byte, error) {
resp := struct {
Project string `json:"project"`
Limits *ProjectLimits `json:"limits"`
}{
Project: x.GetProject(),
Limits: x.GetLimits(),
}
if resp.Limits == nil {
resp.Limits = &ProjectLimits{}
}

return jsoniter.Marshal(resp)
}

func unmarshalAdditionalFunction(data []byte) (*AdditionalFunction, error) {
var mp map[string]jsoniter.RawMessage

Expand Down
3 changes: 2 additions & 1 deletion server/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type AuthConfig struct {
}

type Invitation struct {
Enabled bool `json:"enabled" mapstructure:"enabled" yaml:"enabled"`
Enabled bool `json:"enabled" mapstructure:"enabled" yaml:"enabled"`
ExpireAfterSec int64 `json:"expire_after_sec" mapstructure:"expire_after_sec" yaml:"expire_after_sec"`
}

Expand Down Expand Up @@ -388,6 +388,7 @@ var DefaultConfig = Config{
Compression: false,
IgnoreExtraFields: false,
LogFilter: false,
AuthKey: "ts_test_key",
},
KV: KVConfig{
Chunking: false,
Expand Down
18 changes: 18 additions & 0 deletions server/metadata/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,9 +1224,27 @@ func (tenant *Tenant) ListProjects(_ context.Context) []string {

// GetProjectMetadata retrieves the metadata associated with the project.
func (tenant *Tenant) GetProjectMetadata(ctx context.Context, tx transaction.Tx, projName string) (*ProjectMetadata, error) {
tenant.RLock()
defer tenant.RUnlock()

_, ok := tenant.projects[projName]
if !ok {
return nil, NewProjectNotFoundErr(projName)
}
return tenant.namespaceStore.GetProjectMetadata(ctx, tx, tenant.namespace.Id(), projName)
}

func (tenant *Tenant) UpdateProjectMetadata(ctx context.Context, tx transaction.Tx, projName string, update *ProjectMetadata) error {
tenant.Lock()
defer tenant.Unlock()

_, ok := tenant.projects[projName]
if !ok {
return NewProjectNotFoundErr(projName)
}
return tenant.namespaceStore.UpdateProjectMetadata(ctx, tx, tenant.namespace.Id(), projName, update)
}

// DeleteTenant is used to delete tenant and all the content within it.
// This needs to be followed up with a "restart" of server to clear memory state.
// Be careful calling this.
Expand Down
18 changes: 15 additions & 3 deletions server/services/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *apiService) RegisterHTTP(router chi.Router, inproc *inprocgrpc.Channel)
router.HandleFunc(apiPathPrefix+projectsPath, func(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
})
for _, projectPath := range []string{"/create", "/delete"} {
for _, projectPath := range []string{"/create", "/delete", "/update"} {
// explicit add project related path
router.HandleFunc(apiPathPrefix+fullProjectPath+projectPath, func(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
Expand Down Expand Up @@ -522,8 +522,20 @@ func (s *apiService) CreateProject(ctx context.Context, r *api.CreateProjectRequ
}, nil
}

func (*apiService) UpdateProject(_ context.Context, _ *api.UpdateProjectRequest) (*api.UpdateProjectResponse, error) {
return nil, errors.Unimplemented("Update project is not available")
func (s *apiService) UpdateProject(ctx context.Context, r *api.UpdateProjectRequest) (*api.UpdateProjectResponse, error) {
accessToken, _ := request.GetAccessToken(ctx)
runner := s.runnerFactory.GetProjectQueryRunner(accessToken)
runner.SetUpdateProjectReq(r)

resp, err := s.sessions.Execute(ctx, runner, database.ReqOptions{
MetadataChange: true,
InstantVerTracking: true,
})
if err != nil {
return nil, err
}

return resp.Response.(*api.UpdateProjectResponse), nil
}

func (s *apiService) DeleteProject(ctx context.Context, r *api.DeleteProjectRequest) (*api.DeleteProjectResponse, error) {
Expand Down
14 changes: 14 additions & 0 deletions server/services/v1/database/collection_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ func (runner *CollectionQueryRunner) createOrUpdate(ctx context.Context, tx tran
return Response{}, ctx, errors.AlreadyExists("collection already exist")
}

// collectionExists == true && check project limits
if collectionExists {
projMeta, err := tenant.GetProjectMetadata(ctx, tx, req.GetProject())
if err != nil {
return Response{}, ctx, err
}
if projMeta != nil && projMeta.Limits != nil && projMeta.Limits.MaxCollections != nil {
maxCollections := int(*(projMeta.Limits.MaxCollections))
if len(db.ListCollection()) >= maxCollections {
return Response{}, ctx, errors.InvalidArgument("collections limit reached for project: %d", maxCollections)
}
}
}

schFactory, err := schema.NewFactoryBuilder(true).Build(req.GetCollection(), req.GetSchema())
if err != nil {
return Response{}, ctx, err
Expand Down
41 changes: 41 additions & 0 deletions server/services/v1/database/database_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ProjectQueryRunner struct {
createReq *api.CreateProjectRequest
listReq *api.ListProjectsRequest
describeReq *api.DescribeDatabaseRequest
updateReq *api.UpdateProjectRequest
}

func (runner *ProjectQueryRunner) SetCreateProjectReq(create *api.CreateProjectRequest) {
Expand All @@ -55,6 +56,10 @@ func (runner *ProjectQueryRunner) SetDescribeDatabaseReq(describe *api.DescribeD
runner.describeReq = describe
}

func (runner *ProjectQueryRunner) SetUpdateProjectReq(update *api.UpdateProjectRequest) {
runner.updateReq = update
}

func (runner *ProjectQueryRunner) create(ctx context.Context, tx transaction.Tx, tenant *metadata.Tenant) (Response, context.Context, error) {
projMetadata, err := createProjectMetadata(ctx, runner.createReq.Options)
if err != nil {
Expand Down Expand Up @@ -115,6 +120,40 @@ func (*ProjectQueryRunner) list(ctx context.Context, tx transaction.Tx, tenant *
}, ctx, nil
}

func (runner *ProjectQueryRunner) update(ctx context.Context, tx transaction.Tx, tenant *metadata.Tenant) (Response, context.Context, error) {
meta, err := tenant.GetProjectMetadata(ctx, tx, runner.updateReq.GetProject())
if err != nil {
return Response{}, ctx, err
}

if meta == nil {
meta, err = createProjectMetadata(ctx, runner.updateReq.GetOptions())
if err != nil {
return Response{}, ctx, err
}
} else if runner.updateReq.GetOptions() != nil {
limits := runner.updateReq.GetOptions().GetLimits()
if limits == nil {
meta.Limits = nil
} else {
meta.Limits = &metadata.ProjectLimits{
MaxCollections: limits.MaxCollections,
MaxSearchIndexes: limits.MaxSearchIndexes,
}
}
}
err = tenant.UpdateProjectMetadata(ctx, tx, runner.updateReq.GetProject(), meta)
if err != nil {
return Response{}, ctx, err
}

return Response{
Response: &api.UpdateProjectResponse{
Status: UpdatedStatus,
},
}, ctx, nil
}

func (runner *ProjectQueryRunner) describe(ctx context.Context, tx transaction.Tx, tenant *metadata.Tenant) (Response, context.Context, error) {
db, err := runner.getDatabase(ctx, tx, tenant, runner.describeReq.GetProject(), runner.describeReq.GetBranch())
if err != nil {
Expand Down Expand Up @@ -182,6 +221,8 @@ func (runner *ProjectQueryRunner) Run(ctx context.Context, tx transaction.Tx, te
return runner.list(ctx, tx, tenant)
case runner.describeReq != nil:
return runner.describe(ctx, tx, tenant)
case runner.updateReq != nil:
return runner.update(ctx, tx, tenant)
}

return Response{}, ctx, errors.Unknown("unknown request path")
Expand Down
12 changes: 12 additions & 0 deletions server/services/v1/search/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,18 @@ func (runner *IndexRunner) Run(ctx context.Context, tx transaction.Tx, tenant *m
if err != nil {
return Response{}, createApiError(err)
}
projMeta, err := tenant.GetProjectMetadata(ctx, tx, runner.create.GetProject())
if err != nil {
return Response{}, createApiError(err)
}
if projMeta != nil && projMeta.Limits != nil && projMeta.Limits.MaxSearchIndexes != nil {
maxIndexes := int(*projMeta.Limits.MaxSearchIndexes)
indexes := project.GetSearch().GetIndexes()
if len(indexes) >= maxIndexes {
return Response{}, errors.InvalidArgument("search indexes limit reached for project: %d", maxIndexes)
}
}

factory.Sub = currentSub
if err = tenant.CreateSearchIndex(ctx, tx, project, factory); err != nil {
return Response{}, createApiError(err)
Expand Down