Skip to content

Commit

Permalink
make coordinator node wait for schema version if changed on validate …
Browse files Browse the repository at this point in the history
…step

Signed-off-by: Loic Reyreaud <loic@weaviate.io>
  • Loading branch information
reyreaud-l committed Apr 29, 2024
1 parent 0f34714 commit a4fdf47
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 12 deletions.
7 changes: 7 additions & 0 deletions cluster/store/retry_schema_with_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ import (
"github.com/weaviate/weaviate/usecases/sharding"
)

func (rs retrySchema) WaitForUpdate(ctx context.Context, version uint64) error {
if version > 0 {
return rs.versionedSchema.WaitForUpdate(ctx, version)
}
return nil
}

func (rs retrySchema) ClassInfoWithVersion(ctx context.Context, class string, version uint64) (ci ClassInfo, err error) {
if version > 0 {
return rs.versionedSchema.ClassInfo(ctx, class, version)
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (m *Manager) addObjectToConnectorAndSchema(ctx context.Context, principal *
return nil, err
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
err = m.vectorRepo.PutObject(ctx, object, object.Vector, object.Vectors, repl, schemaVersion)
if err != nil {
return nil, fmt.Errorf("put object: %w", err)
Expand Down
14 changes: 9 additions & 5 deletions usecases/objects/auto_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (m *autoSchemaManager) determineNestedPropertiesOfArray(valArray []interfac

func (m *autoSchemaManager) autoTenants(ctx context.Context,
principal *models.Principal, objects []*models.Object,
) error {
) (uint64, error) {
classTenants := make(map[string]map[string]struct{})

// group by tenants by class
Expand All @@ -469,26 +469,30 @@ func (m *autoSchemaManager) autoTenants(ctx context.Context,
}

// skip invalid classes, non-MT classes, no auto tenant creation classes
var maxSchemaVersion uint64
for className, tenantNames := range classTenants {
class, _, err := m.schemaManager.GetCachedClass(ctx, principal, className)
class, schemaVersion, err := m.schemaManager.GetCachedClass(ctx, principal, className)
if err != nil || // invalid class
class == nil || // class is nil
!schema.MultiTenancyEnabled(class) || // non-MT class
!class.MultiTenancyConfig.AutoTenantCreation { // no auto tenant creation
continue
}

tenants := make([]*models.Tenant, len(tenantNames))
i := 0
for name := range tenantNames {
tenants[i] = &models.Tenant{Name: name}
i++
}
if err := m.addTenants(ctx, principal, className, tenants); err != nil {
return fmt.Errorf("add tenants to class %q: %w", className, err)
return 0, fmt.Errorf("add tenants to class %q: %w", className, err)
}

if schemaVersion > maxSchemaVersion {
maxSchemaVersion = schemaVersion
}
}
return nil
return maxSchemaVersion, nil
}

func (m *autoSchemaManager) addTenants(ctx context.Context, principal *models.Principal,
Expand Down
18 changes: 13 additions & 5 deletions usecases/objects/batch_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,28 @@ func (b *BatchManager) AddObjects(ctx context.Context, principal *models.Princip
return nil, errEmptyObjects
}

batchObjects, schemaVersion := b.validateAndGetVector(ctx, principal, objects, repl)

if err := b.autoSchemaManager.autoTenants(ctx, principal, objects); err != nil {
var maxSchemaVersion uint64
batchObjects, maxSchemaVersion := b.validateAndGetVector(ctx, principal, objects, repl)
schemaVersion, err := b.autoSchemaManager.autoTenants(ctx, principal, objects)
if err != nil {
return nil, fmt.Errorf("auto create tenants: %w", err)
}
if schemaVersion > maxSchemaVersion {
maxSchemaVersion = schemaVersion
}

b.metrics.BatchOp("total_preprocessing", beforePreProcessing.UnixNano())

var res BatchObjects

beforePersistence := time.Now()
defer b.metrics.BatchOp("total_persistence_level", beforePersistence.UnixNano())
if res, err = b.vectorRepo.BatchPutObjects(ctx, batchObjects, repl, schemaVersion); err != nil {

// Ensure that the local schema has caught up to the version we used to validate
if err := b.schemaManager.WaitForUpdate(ctx, maxSchemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", maxSchemaVersion, err)
}
if res, err = b.vectorRepo.BatchPutObjects(ctx, batchObjects, repl, maxSchemaVersion); err != nil {
return nil, NewErrInternal("batch objects: %#v", err)
}

Expand Down Expand Up @@ -157,7 +166,6 @@ func (b *BatchManager) validateAndGetVector(ctx context.Context, principal *mode
origIndex := originalIndexPerClass[className][i]
batchObjects[origIndex].Err = err
}

}

return batchObjects, maxSchemaVersion
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/batch_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (b *BatchManager) deleteObjects(ctx context.Context, principal *models.Prin
return nil, NewErrInvalidUserInput("validate: %v", err)
}

// Ensure that the local schema has caught up to the version we used to validate
if err := b.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
result, err := b.vectorRepo.BatchDeleteObjects(ctx, *params, repl, tenant, schemaVersion)
if err != nil {
return nil, fmt.Errorf("batch delete objects: %w", err)
Expand Down
6 changes: 4 additions & 2 deletions usecases/objects/batch_references_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func (b *BatchManager) AddReferences(ctx context.Context, principal *models.Prin
}
defer unlock()

ctx = classcache.ContextWithClassCache(ctx)

b.metrics.BatchRefInc()
defer b.metrics.BatchRefDec()

Expand Down Expand Up @@ -82,6 +80,10 @@ func (b *BatchManager) addReferences(ctx context.Context, principal *models.Prin
}
}

// Ensure that the local schema has caught up to the version we used to validate
if err := b.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
if res, err := b.vectorRepo.AddBatchReferences(ctx, batchReferences, repl, schemaVersion); err != nil {
return nil, NewErrInternal("could not add batch request to connector: %v", err)
} else {
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (m *Manager) DeleteObject(ctx context.Context,
return NewErrNotFound("object %v could not be found", path)
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
err = m.vectorRepo.DeleteObject(ctx, class, id, repl, tenant, schemaVersion)
if err != nil {
return NewErrInternal("could not delete object from vector repo: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/fakes_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (f *fakeSchemaManager) MultiTenancy(class string) models.MultiTenancyConfig
return models.MultiTenancyConfig{Enabled: f.tenantsEnabled}
}

func (f *fakeSchemaManager) WaitForUpdate(ctx context.Context, schemaVersion uint64) error {
return nil
}

type fakeLocks struct {
Err error
}
Expand Down
3 changes: 3 additions & 0 deletions usecases/objects/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type schemaManager interface {
GetCachedClass(ctx context.Context, principal *models.Principal, name string,
) (*models.Class, uint64, error)

// WaitForUpdate ensures that the local schema has caught up to schemaVersion
WaitForUpdate(ctx context.Context, schemaVersion uint64) error

// GetConsistentSchema retrieves a locally cached copy of the schema
GetConsistentSchema(principal *models.Principal, consistency bool) (schema.Schema, error)
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/objects/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ func (m *Manager) patchObject(ctx context.Context, principal *models.Principal,
mergeDoc.AdditionalProperties = objWithVec.Additional
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
if err := m.vectorRepo.Merge(ctx, mergeDoc, repl, tenant, schemaVersion); err != nil {
return &Error{"repo.merge", StatusInternalServerError, err}
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/objects/references_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func (m *Manager) AddObjectReference(ctx context.Context, principal *models.Prin
}
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
if err := m.vectorRepo.AddReference(ctx, source, target, repl, tenant, schemaVersion); err != nil {
return &Error{"add reference to repo", StatusInternalServerError, err}
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/objects/references_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (m *Manager) DeleteObjectReference(ctx context.Context, principal *models.P
return &Error{"repo.putobject", StatusInternalServerError, err}
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
if err := m.updateRefVector(ctx, principal, input.Class, input.ID, tenant, class, schemaVersion); err != nil {
return &Error{"update ref vector", StatusInternalServerError, err}
}
Expand Down
9 changes: 9 additions & 0 deletions usecases/objects/references_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func (m *Manager) UpdateObjectReferences(ctx context.Context, principal *models.
obj.Properties.(map[string]interface{})[input.Property] = input.Refs
}
obj.LastUpdateTimeUnix = m.timeSource.Now()

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
err = m.vectorRepo.PutObject(ctx, obj, res.Vector, res.Vectors, repl, schemaVersion)
if err != nil {
return &Error{"repo.putobject", StatusInternalServerError, err}
Expand Down
4 changes: 4 additions & 0 deletions usecases/schema/fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (f *fakeMetaHandler) GetShardsStatus(class, tenant string) (models.ShardSta
return args.Get(0).(models.ShardStatusList), args.Error(1)
}

func (f *fakeMetaHandler) WaitForUpdate(ctx context.Context, schemaVersion uint64) error {
return nil
}

type fakeStore struct {
collections map[string]*models.Class
parser Parser
Expand Down
3 changes: 3 additions & 0 deletions usecases/schema/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type metaWriter interface {
}

type metaReader interface {
// WaitForUpdate ensures that the local schema has caught up to schemaVersion
WaitForUpdate(ctx context.Context, schemaVersion uint64) error

ClassEqual(name string) string
// MultiTenancy checks for multi-tenancy support
MultiTenancy(class string) models.MultiTenancyConfig
Expand Down

0 comments on commit a4fdf47

Please sign in to comment.