Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autoSchema: Optimizing Property Merging #4535

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion adapters/handlers/rest/handlers_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *schemaHandlers) deleteClass(params schema.SchemaObjectsDeleteParams, pr
func (s *schemaHandlers) addClassProperty(params schema.SchemaObjectsPropertiesAddParams,
principal *models.Principal,
) middleware.Responder {
err := s.manager.AddClassProperty(params.HTTPRequest.Context(), principal, params.ClassName, params.Body)
err := s.manager.AddClassProperty(params.HTTPRequest.Context(), principal, s.manager.ReadOnlyClass(params.ClassName), false, params.Body)
if err != nil {
s.metricRequestsTotal.logError(params.ClassName, err)
switch err.(type) {
Expand Down
10 changes: 5 additions & 5 deletions adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,17 +409,17 @@ func (i *Index) IterateShards(ctx context.Context, cb func(index *Index, shard S
})
}

func (i *Index) addProperty(ctx context.Context, prop *models.Property) error {
func (i *Index) addProperty(ctx context.Context, props ...*models.Property) error {
eg := enterrors.NewErrorGroupWrapper(i.logger)
eg.SetLimit(_NUMCPU)

i.ForEachShard(func(key string, shard ShardLike) error {
shard.createPropertyIndex(ctx, prop, eg)
shard.createPropertyIndex(ctx, eg, props...)
if err := eg.Wait(); err != nil {
return errors.Wrapf(err, "extend idx '%s' with properties '%v", i.ID(), props)
}
return nil
})
if err := eg.Wait(); err != nil {
return errors.Wrapf(err, "extend idx '%s' with property '%s", i.ID(), prop.Name)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions adapters/repos/db/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ func (m *Migrator) updateIndexAddMissingProperties(ctx context.Context, idx *Ind
return nil
}

func (m *Migrator) AddProperty(ctx context.Context, className string, prop *models.Property) error {
func (m *Migrator) AddProperty(ctx context.Context, className string, prop ...*models.Property) error {
idx := m.db.GetIndex(schema.ClassName(className))
if idx == nil {
return errors.Errorf("cannot add property to a non-existing index for %s", className)
}

return idx.addProperty(ctx, prop)
return idx.addProperty(ctx, prop...)
}

// DropProperty is ignored, API compliant change
Expand Down
61 changes: 34 additions & 27 deletions adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type ShardLike interface {
addIDProperty(ctx context.Context) error
addDimensionsProperty(ctx context.Context) error
addTimestampProperties(ctx context.Context) error
createPropertyIndex(ctx context.Context, prop *models.Property, eg *enterrors.ErrorGroupWrapper)
createPropertyIndex(ctx context.Context, eg *enterrors.ErrorGroupWrapper, props ...*models.Property) error
BeginBackup(ctx context.Context) error
ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error
resumeMaintenanceCycles(ctx context.Context) error
Expand Down Expand Up @@ -713,36 +713,43 @@ func (s *Shard) dynamicMemtableSizing() lsmkv.BucketOption {
)
}

func (s *Shard) createPropertyIndex(ctx context.Context, prop *models.Property, eg *enterrors.ErrorGroupWrapper) {
if !inverted.HasInvertedIndex(prop) {
return
}

eg.Go(func() error {
if err := s.createPropertyValueIndex(ctx, prop); err != nil {
return errors.Wrapf(err, "create property '%s' value index on shard '%s'", prop.Name, s.ID())
func (s *Shard) createPropertyIndex(ctx context.Context, eg *enterrors.ErrorGroupWrapper, props ...*models.Property) error {
for _, prop := range props {
if !inverted.HasInvertedIndex(prop) {
continue
}

if s.index.invertedIndexConfig.IndexNullState {
eg.Go(func() error {
if err := s.createPropertyNullIndex(ctx, prop); err != nil {
return errors.Wrapf(err, "create property '%s' null index on shard '%s'", prop.Name, s.ID())
}
return nil
})
}
eg.Go(func() error {
if err := s.createPropertyValueIndex(ctx, prop); err != nil {
return errors.Wrapf(err, "create property '%s' value index on shard '%s'", prop.Name, s.ID())
}

if s.index.invertedIndexConfig.IndexPropertyLength {
eg.Go(func() error {
if err := s.createPropertyLengthIndex(ctx, prop); err != nil {
return errors.Wrapf(err, "create property '%s' length index on shard '%s'", prop.Name, s.ID())
}
return nil
})
}
if s.index.invertedIndexConfig.IndexNullState {
eg.Go(func() error {
if err := s.createPropertyNullIndex(ctx, prop); err != nil {
return errors.Wrapf(err, "create property '%s' null index on shard '%s'", prop.Name, s.ID())
}
return nil
})
}

return nil
})
if s.index.invertedIndexConfig.IndexPropertyLength {
eg.Go(func() error {
if err := s.createPropertyLengthIndex(ctx, prop); err != nil {
return errors.Wrapf(err, "create property '%s' length index on shard '%s'", prop.Name, s.ID())
}
return nil
})
}

return nil
})

if err := eg.Wait(); err != nil {
return err
}
}
return nil
}

func (s *Shard) createPropertyValueIndex(ctx context.Context, prop *models.Property) error {
Expand Down
4 changes: 1 addition & 3 deletions adapters/repos/db/shard_init_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ func (s *Shard) initProperties(class *models.Class) error {
}

eg := enterrors.NewErrorGroupWrapper(s.index.logger)
for _, prop := range class.Properties {
s.createPropertyIndex(context.TODO(), prop, eg)
}
s.createPropertyIndex(context.Background(), eg, class.Properties...)

eg.Go(func() error {
if err := s.addIDProperty(context.TODO()); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions adapters/repos/db/shard_lazyloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,9 @@ func (l *LazyLoadShard) addTimestampProperties(ctx context.Context) error {
return l.shard.addTimestampProperties(ctx)
}

func (l *LazyLoadShard) createPropertyIndex(ctx context.Context, prop *models.Property, eg *enterrors.ErrorGroupWrapper) {
func (l *LazyLoadShard) createPropertyIndex(ctx context.Context, eg *enterrors.ErrorGroupWrapper, props ...*models.Property) error {
l.mustLoad()
l.shard.createPropertyIndex(ctx, prop, eg)
return l.shard.createPropertyIndex(ctx, eg, props...)
}

func (l *LazyLoadShard) BeginBackup(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion cluster/proto/cluster/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type UpdateClassRequest struct {
}

type AddPropertyRequest struct {
moogacs marked this conversation as resolved.
Show resolved Hide resolved
*models.Property
Properties []*models.Property
}

type DeleteClassRequest struct {
Expand Down
6 changes: 3 additions & 3 deletions cluster/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ func (db *localDB) AddProperty(cmd *command.ApplyRequest, schemaOnly bool) error
if err := json.Unmarshal(cmd.SubCommand, &req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
}
if req.Property == nil {
return fmt.Errorf("%w: nil property", errBadRequest)
if len(req.Properties) == 0 {
return fmt.Errorf("%w: empty property", errBadRequest)
}

return db.apply(
cmd.GetType().String(),
func() error { return db.Schema.addProperty(cmd.Class, *req.Property) },
func() error { return db.Schema.addProperty(cmd.Class, req.Properties...) },
func() error { return db.store.AddProperty(cmd.Class, req) },
schemaOnly)
}
Expand Down
30 changes: 25 additions & 5 deletions cluster/store/meta_class.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package store

import (
"fmt"
"strings"
"sync"

command "github.com/weaviate/weaviate/cluster/proto/cluster"
Expand Down Expand Up @@ -121,19 +122,38 @@ func (m *metaClass) CopyShardingState() *sharding.State {
return &st
}

func (m *metaClass) AddProperty(p models.Property) error {
func (m *metaClass) AddProperty(props ...*models.Property) error {
m.Lock()
defer m.Unlock()

// update all at once to prevent race condition with concurrent readers
src := m.Class.Properties
dest := make([]*models.Property, len(src)+1)
copy(dest, src)
dest[len(src)] = &p
src := filterOutDuplicates(m.Class.Properties, props)
dest := make([]*models.Property, len(src)+len(props))
copy(dest, append(src, props...))
m.Class.Properties = dest
return nil
}

// filterOutDuplicates removes from the old any existing property could cause duplication
func filterOutDuplicates(old, new []*models.Property) []*models.Property {
// create memory to avoid duplication
var newUnique []*models.Property
mem := make(map[string]int, len(new))
antas-marcin marked this conversation as resolved.
Show resolved Hide resolved
for idx := range new {
mem[strings.ToLower(new[idx].Name)] = idx
}

// pick only what is not in the new proprieties
for idx := range old {
if _, exists := mem[strings.ToLower(old[idx].Name)]; exists {
continue
}
newUnique = append(newUnique, old[idx])
}

return newUnique
}

func (m *metaClass) AddTenants(nodeID string, req *command.AddTenantsRequest) error {
req.Tenants = removeNilTenants(req.Tenants)
m.Lock()
Expand Down
4 changes: 2 additions & 2 deletions cluster/store/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ func (s *schema) deleteClass(name string) {
delete(s.Classes, name)
}

func (s *schema) addProperty(class string, p models.Property) error {
func (s *schema) addProperty(class string, props ...*models.Property) error {
s.Lock()
defer s.Unlock()

meta := s.Classes[class]
if meta == nil {
return errClassNotFound
}
return meta.AddProperty(p)
return meta.AddProperty(props...)
}

func (s *schema) addTenants(class string, req *command.AddTenantsRequest) error {
Expand Down
10 changes: 6 additions & 4 deletions cluster/store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@ func (s *Service) RestoreClass(cls *models.Class, ss *sharding.State) error {
return s.Execute(command)
}

func (s *Service) AddProperty(class string, p *models.Property) error {
if p == nil || p.Name == "" || class == "" {
return fmt.Errorf("empty property or empty class name : %w", errBadRequest)
func (s *Service) AddProperty(class string, props ...*models.Property) error {
for _, p := range props {
if p == nil || p.Name == "" || class == "" {
return fmt.Errorf("empty property or empty class name : %w", errBadRequest)
}
}
req := cmd.AddPropertyRequest{Property: p}
req := cmd.AddPropertyRequest{Properties: props}
subCommand, err := json.Marshal(&req)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions cluster/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,15 @@ func TestStoreApply(t *testing.T) {
{
name: "AddProperty/ClassNotFound",
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
cmd.AddPropertyRequest{Property: &models.Property{Name: "P1"}}, nil)},
cmd.AddPropertyRequest{Properties: []*models.Property{{Name: "P1"}}}, nil)},
resp: Response{Error: errSchema},
doBefore: doFirst,
},
{
name: "AddProperty/Nil",
req: raft.Log{
Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
cmd.AddPropertyRequest{Property: nil}, nil),
cmd.AddPropertyRequest{Properties: nil}, nil),
},
resp: Response{Error: errBadRequest},
doBefore: func(m *MockStore) {
Expand All @@ -464,7 +464,7 @@ func TestStoreApply(t *testing.T) {
name: "AddProperty/Success",
req: raft.Log{
Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
cmd.AddPropertyRequest{Property: &models.Property{Name: "P1"}}, nil),
cmd.AddPropertyRequest{Properties: []*models.Property{{Name: "P1"}}}, nil),
},
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
Expand Down
20 changes: 11 additions & 9 deletions usecases/modules/module_config_init_and_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,19 @@ func (p *Provider) setPerClassConfigDefaults(cfg *ClassBasedModuleConfig,
// SetSinglePropertyDefaults can be used when a property is added later, e.g.
// as part of merging in a ref prop after a class has already been created
func (p *Provider) SetSinglePropertyDefaults(class *models.Class,
prop *models.Property,
props ...*models.Property,
) {
if !hasTargetVectors(class) {
p.setSinglePropertyDefaults(prop, class.Vectorizer)
return
}
for _, prop := range props {
if !hasTargetVectors(class) {
p.setSinglePropertyDefaults(prop, class.Vectorizer)
continue
}

for _, vectorConfig := range class.VectorConfig {
if moduleConfig, ok := vectorConfig.Vectorizer.(map[string]interface{}); ok && len(moduleConfig) == 1 {
for vectorizer := range moduleConfig {
p.setSinglePropertyDefaults(prop, vectorizer)
for _, vectorConfig := range class.VectorConfig {
if moduleConfig, ok := vectorConfig.Vectorizer.(map[string]interface{}); ok && len(moduleConfig) == 1 {
for vectorizer := range moduleConfig {
p.setSinglePropertyDefaults(prop, vectorizer)
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions usecases/objects/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ type schemaManager interface {
) (*models.Class, error)
// ReadOnlyClass return class model.
ReadOnlyClass(name string) *models.Class
// AddClassProperty it is upsert operation. it adds properties to a class and updates
// existing properties if the merge bool passed true.
AddClassProperty(ctx context.Context, principal *models.Principal,
class string, property *models.Property) error
MergeClassObjectProperty(ctx context.Context, principal *models.Principal,
class string, property *models.Property) error
class *models.Class, merge bool, prop ...*models.Property) error
MultiTenancy(class string) models.MultiTenancyConfig
}

Expand Down
Loading
Loading