Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make coordinator node wait for schema version if changed on validate step #4794

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cluster/store/retry_schema_with_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ import (
"github.com/weaviate/weaviate/usecases/sharding"
)

func (rs retrySchema) WaitForUpdate(ctx context.Context, version uint64) error {
if version > 0 {
return rs.versionedSchema.WaitForUpdate(ctx, version)
}
return nil
}

func (rs retrySchema) ClassInfoWithVersion(ctx context.Context, class string, version uint64) (ci ClassInfo, err error) {
if version > 0 {
return rs.versionedSchema.ClassInfo(ctx, class, version)
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (m *Manager) addObjectToConnectorAndSchema(ctx context.Context, principal *
return nil, err
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
err = m.vectorRepo.PutObject(ctx, object, object.Vector, object.Vectors, repl, schemaVersion)
if err != nil {
return nil, fmt.Errorf("put object: %w", err)
Expand Down
14 changes: 9 additions & 5 deletions usecases/objects/auto_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (m *autoSchemaManager) determineNestedPropertiesOfArray(valArray []interfac

func (m *autoSchemaManager) autoTenants(ctx context.Context,
principal *models.Principal, objects []*models.Object,
) error {
) (uint64, error) {
classTenants := make(map[string]map[string]struct{})

// group by tenants by class
Expand All @@ -469,26 +469,30 @@ func (m *autoSchemaManager) autoTenants(ctx context.Context,
}

// skip invalid classes, non-MT classes, no auto tenant creation classes
var maxSchemaVersion uint64
for className, tenantNames := range classTenants {
class, _, err := m.schemaManager.GetCachedClass(ctx, principal, className)
class, schemaVersion, err := m.schemaManager.GetCachedClass(ctx, principal, className)
if err != nil || // invalid class
class == nil || // class is nil
!schema.MultiTenancyEnabled(class) || // non-MT class
!class.MultiTenancyConfig.AutoTenantCreation { // no auto tenant creation
continue
}

tenants := make([]*models.Tenant, len(tenantNames))
i := 0
for name := range tenantNames {
tenants[i] = &models.Tenant{Name: name}
i++
}
if err := m.addTenants(ctx, principal, className, tenants); err != nil {
return fmt.Errorf("add tenants to class %q: %w", className, err)
return 0, fmt.Errorf("add tenants to class %q: %w", className, err)
}

if schemaVersion > maxSchemaVersion {
maxSchemaVersion = schemaVersion
}
}
return nil
return maxSchemaVersion, nil
}

func (m *autoSchemaManager) addTenants(ctx context.Context, principal *models.Principal,
Expand Down
18 changes: 13 additions & 5 deletions usecases/objects/batch_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,28 @@ func (b *BatchManager) AddObjects(ctx context.Context, principal *models.Princip
return nil, errEmptyObjects
}

batchObjects, schemaVersion := b.validateAndGetVector(ctx, principal, objects, repl)

if err := b.autoSchemaManager.autoTenants(ctx, principal, objects); err != nil {
var maxSchemaVersion uint64
batchObjects, maxSchemaVersion := b.validateAndGetVector(ctx, principal, objects, repl)
schemaVersion, err := b.autoSchemaManager.autoTenants(ctx, principal, objects)
if err != nil {
return nil, fmt.Errorf("auto create tenants: %w", err)
}
if schemaVersion > maxSchemaVersion {
maxSchemaVersion = schemaVersion
}

b.metrics.BatchOp("total_preprocessing", beforePreProcessing.UnixNano())

var res BatchObjects

beforePersistence := time.Now()
defer b.metrics.BatchOp("total_persistence_level", beforePersistence.UnixNano())
if res, err = b.vectorRepo.BatchPutObjects(ctx, batchObjects, repl, schemaVersion); err != nil {

// Ensure that the local schema has caught up to the version we used to validate
if err := b.schemaManager.WaitForUpdate(ctx, maxSchemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", maxSchemaVersion, err)
}
if res, err = b.vectorRepo.BatchPutObjects(ctx, batchObjects, repl, maxSchemaVersion); err != nil {
return nil, NewErrInternal("batch objects: %#v", err)
}

Expand Down Expand Up @@ -157,7 +166,6 @@ func (b *BatchManager) validateAndGetVector(ctx context.Context, principal *mode
origIndex := originalIndexPerClass[className][i]
batchObjects[origIndex].Err = err
}

}

return batchObjects, maxSchemaVersion
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/batch_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (b *BatchManager) deleteObjects(ctx context.Context, principal *models.Prin
return nil, NewErrInvalidUserInput("validate: %v", err)
}

// Ensure that the local schema has caught up to the version we used to validate
if err := b.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
result, err := b.vectorRepo.BatchDeleteObjects(ctx, *params, repl, tenant, schemaVersion)
if err != nil {
return nil, fmt.Errorf("batch delete objects: %w", err)
Expand Down
6 changes: 4 additions & 2 deletions usecases/objects/batch_references_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func (b *BatchManager) AddReferences(ctx context.Context, principal *models.Prin
}
defer unlock()

ctx = classcache.ContextWithClassCache(ctx)
moogacs marked this conversation as resolved.
Show resolved Hide resolved

b.metrics.BatchRefInc()
defer b.metrics.BatchRefDec()

Expand Down Expand Up @@ -82,6 +80,10 @@ func (b *BatchManager) addReferences(ctx context.Context, principal *models.Prin
}
}

// Ensure that the local schema has caught up to the version we used to validate
if err := b.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return nil, fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
if res, err := b.vectorRepo.AddBatchReferences(ctx, batchReferences, repl, schemaVersion); err != nil {
return nil, NewErrInternal("could not add batch request to connector: %v", err)
} else {
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (m *Manager) DeleteObject(ctx context.Context,
return NewErrNotFound("object %v could not be found", path)
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return fmt.Errorf("error waiting for local schema to catch up to version %d: %w", schemaVersion, err)
}
err = m.vectorRepo.DeleteObject(ctx, class, id, repl, tenant, schemaVersion)
if err != nil {
return NewErrInternal("could not delete object from vector repo: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions usecases/objects/fakes_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (f *fakeSchemaManager) MultiTenancy(class string) models.MultiTenancyConfig
return models.MultiTenancyConfig{Enabled: f.tenantsEnabled}
}

func (f *fakeSchemaManager) WaitForUpdate(ctx context.Context, schemaVersion uint64) error {
return nil
}

type fakeLocks struct {
Err error
}
Expand Down
3 changes: 3 additions & 0 deletions usecases/objects/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type schemaManager interface {
GetCachedClass(ctx context.Context, principal *models.Principal, name string,
) (*models.Class, uint64, error)

// WaitForUpdate ensures that the local schema has caught up to schemaVersion
WaitForUpdate(ctx context.Context, schemaVersion uint64) error

// GetConsistentSchema retrieves a locally cached copy of the schema
GetConsistentSchema(principal *models.Principal, consistency bool) (schema.Schema, error)
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/objects/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ func (m *Manager) patchObject(ctx context.Context, principal *models.Principal,
mergeDoc.AdditionalProperties = objWithVec.Additional
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
if err := m.vectorRepo.Merge(ctx, mergeDoc, repl, tenant, schemaVersion); err != nil {
return &Error{"repo.merge", StatusInternalServerError, err}
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/objects/references_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func (m *Manager) AddObjectReference(ctx context.Context, principal *models.Prin
}
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
if err := m.vectorRepo.AddReference(ctx, source, target, repl, tenant, schemaVersion); err != nil {
return &Error{"add reference to repo", StatusInternalServerError, err}
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/objects/references_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (m *Manager) DeleteObjectReference(ctx context.Context, principal *models.P
return &Error{"repo.putobject", StatusInternalServerError, err}
}

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
if err := m.updateRefVector(ctx, principal, input.Class, input.ID, tenant, class, schemaVersion); err != nil {
return &Error{"update ref vector", StatusInternalServerError, err}
}
Expand Down
9 changes: 9 additions & 0 deletions usecases/objects/references_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func (m *Manager) UpdateObjectReferences(ctx context.Context, principal *models.
obj.Properties.(map[string]interface{})[input.Property] = input.Refs
}
obj.LastUpdateTimeUnix = m.timeSource.Now()

// Ensure that the local schema has caught up to the version we used to validate
if err := m.schemaManager.WaitForUpdate(ctx, schemaVersion); err != nil {
return &Error{
Msg: fmt.Sprintf("error waiting for local schema to catch up to version %d", schemaVersion),
Code: StatusInternalServerError,
Err: err,
}
}
err = m.vectorRepo.PutObject(ctx, obj, res.Vector, res.Vectors, repl, schemaVersion)
if err != nil {
return &Error{"repo.putobject", StatusInternalServerError, err}
Expand Down
4 changes: 4 additions & 0 deletions usecases/schema/fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (f *fakeMetaHandler) GetShardsStatus(class, tenant string) (models.ShardSta
return args.Get(0).(models.ShardStatusList), args.Error(1)
}

func (f *fakeMetaHandler) WaitForUpdate(ctx context.Context, schemaVersion uint64) error {
return nil
}

type fakeStore struct {
collections map[string]*models.Class
parser Parser
Expand Down
3 changes: 3 additions & 0 deletions usecases/schema/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type metaWriter interface {
}

type metaReader interface {
// WaitForUpdate ensures that the local schema has caught up to schemaVersion
WaitForUpdate(ctx context.Context, schemaVersion uint64) error

ClassEqual(name string) string
// MultiTenancy checks for multi-tenancy support
MultiTenancy(class string) models.MultiTenancyConfig
Expand Down
Loading