Skip to content

Commit

Permalink
Merge pull request #4856 from weaviate/support-raft-rf-scale
Browse files Browse the repository at this point in the history
RAFT: support back RF scale increase
  • Loading branch information
moogacs committed May 14, 2024
2 parents 4d46bbc + d9f1fd6 commit e902c0f
Show file tree
Hide file tree
Showing 21 changed files with 196 additions and 179 deletions.
2 changes: 1 addition & 1 deletion adapters/handlers/rest/clusterapi/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ func (i *indices) postShard() http.Handler {
func (i *indices) putShardReinit() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
args := i.regexpShardReinit.FindStringSubmatch(r.URL.Path)
fmt.Println(args)

if len(args) != 3 {
http.Error(w, "invalid URI", http.StatusBadRequest)
return
Expand Down
1 change: 1 addition & 0 deletions adapters/repos/db/helper_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func testShardWithSettings(t *testing.T, ctx context.Context, class *models.Clas
RootPath: tmpDir,
ClassName: schema.ClassName(class.Class),
QueryMaximumResults: maxResults,
ReplicationFactor: NewAtomicInt64(1),
},
invertedIndexConfig: iic,
vectorIndexUserConfig: vic,
Expand Down
4 changes: 2 additions & 2 deletions adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ type IndexConfig struct {
MemtablesMaxActiveSeconds int
MaxSegmentSize int64
HNSWMaxLogSize int64
ReplicationFactor int64
ReplicationFactor *atomic.Int64
AvoidMMap bool
DisableLazyLoadShards bool

Expand Down Expand Up @@ -688,7 +688,7 @@ func (i *Index) IncomingPutObject(ctx context.Context, shardName string,
}

func (i *Index) replicationEnabled() bool {
return i.Config.ReplicationFactor > 1
return i.Config.ReplicationFactor.Load() > 1
}

// parseDateFieldsInProps checks the schema for the current class for which
Expand Down
16 changes: 10 additions & 6 deletions adapters/repos/db/index_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func TestIndex_DropWithDataAndRecreateWithDataIndex(t *testing.T) {
// create index with data
shardState := singleShardState()
index, err := NewIndex(testCtx(), IndexConfig{
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
schema: fakeSchema, shardState: shardState,
Expand Down Expand Up @@ -163,8 +164,9 @@ func TestIndex_DropWithDataAndRecreateWithDataIndex(t *testing.T) {

// recreate the index
index, err = NewIndex(testCtx(), IndexConfig{
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
schema: fakeSchema,
Expand Down Expand Up @@ -273,8 +275,9 @@ func TestIndex_DropReadOnlyIndexWithData(t *testing.T) {

shardState := singleShardState()
index, err := NewIndex(ctx, IndexConfig{
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
schema: fakeSchema, shardState: shardState,
Expand Down Expand Up @@ -332,6 +335,7 @@ func emptyIdx(t *testing.T, rootDir string, class *models.Class) *Index {
RootPath: rootDir,
ClassName: schema.ClassName(class.Class),
DisableLazyLoadShards: true,
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(invertedConfig()),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
shardState: shardState,
Expand Down
9 changes: 8 additions & 1 deletion adapters/repos/db/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"os"
"path"
"sync/atomic"
"time"

enterrors "github.com/weaviate/weaviate/entities/errors"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (db *DB) init(ctx context.Context) error {
TrackVectorDimensions: db.config.TrackVectorDimensions,
AvoidMMap: db.config.AvoidMMap,
DisableLazyLoadShards: db.config.DisableLazyLoadShards,
ReplicationFactor: class.ReplicationConfig.Factor,
ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor),
}, db.schemaGetter.CopyShardingState(class.Class),
inverted.ConfigFromModel(invertedConfig),
convertToVectorIndexConfig(class.VectorIndexConfig),
Expand Down Expand Up @@ -168,3 +169,9 @@ func fileExists(file string) (bool, error) {
}
return true, nil
}

func NewAtomicInt64(val int64) *atomic.Int64 {
aval := &atomic.Int64{}
aval.Store(val)
return aval
}
12 changes: 11 additions & 1 deletion adapters/repos/db/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
TrackVectorDimensions: m.db.config.TrackVectorDimensions,
AvoidMMap: m.db.config.AvoidMMap,
DisableLazyLoadShards: m.db.config.DisableLazyLoadShards,
ReplicationFactor: class.ReplicationConfig.Factor,
ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor),
},
shardState,
// no backward-compatibility check required, since newly added classes will
Expand Down Expand Up @@ -456,6 +456,16 @@ func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className stri
return idx.updateInvertedIndexConfig(ctx, conf)
}

func (m *Migrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error {
idx := m.db.GetIndex(schema.ClassName(className))
if idx == nil {
return errors.Errorf("cannot update replication factor of non-existing index for %s", className)
}

idx.Config.ReplicationFactor.Store(factor)
return nil
}

func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error {
count := 0
m.logger.
Expand Down
6 changes: 4 additions & 2 deletions adapters/repos/db/node_wide_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ func TestShardActivity(t *testing.T) {
indices: map[string]*Index{
"Col1": {
Config: IndexConfig{
ClassName: "Col1",
ClassName: "Col1",
ReplicationFactor: NewAtomicInt64(1),
},
partitioningEnabled: true,
shards: shardMap{},
},
"NonMT": {
Config: IndexConfig{
ClassName: "NonMT",
ClassName: "NonMT",
ReplicationFactor: NewAtomicInt64(1),
},
partitioningEnabled: false,
shards: shardMap{},
Expand Down
56 changes: 27 additions & 29 deletions adapters/repos/db/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path/filepath"

"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/multi"
"github.com/weaviate/weaviate/entities/schema"
Expand Down Expand Up @@ -253,12 +254,34 @@ func (i *Index) IncomingCreateShard(ctx context.Context, className string, shard
func (i *Index) IncomingReinitShard(ctx context.Context,
shardName string,
) error {
shard, err := i.getOrInitLocalShard(ctx, shardName)
if err != nil {
return fmt.Errorf("shard %q does not exist locally", shardName)
shard := func() ShardLike {
i.shardInUseLocks.Lock(shardName)
defer i.shardInUseLocks.Unlock(shardName)

return i.shards.Load(shardName)
}()

if shard != nil {
err := func() error {
i.shardCreateLocks.Lock(shardName)
defer i.shardCreateLocks.Unlock(shardName)

i.shards.LoadAndDelete(shardName)

if err := shard.Shutdown(ctx); err != nil {
if !errors.Is(err, errAlreadyShutdown) {
return err
}
}
return nil
}()
if err != nil {
return err
}
}

return shard.reinit(ctx)
_, err := i.getOrInitLocalShard(ctx, shardName)
return err
}

func (s *Shard) filePutter(ctx context.Context,
Expand All @@ -280,31 +303,6 @@ func (s *Shard) filePutter(ctx context.Context,
return f, nil
}

func (s *Shard) reinit(ctx context.Context) error {
if err := s.Shutdown(ctx); err != nil {
return fmt.Errorf("shutdown shard: %w", err)
}

if err := s.initNonVector(ctx, nil); err != nil {
return fmt.Errorf("reinit non-vector: %w", err)
}

if s.hasTargetVectors() {
if err := s.initTargetVectors(ctx); err != nil {
return fmt.Errorf("reinit vector: %w", err)
}
} else {
if err := s.initLegacyVector(ctx); err != nil {
return fmt.Errorf("reinit vector: %w", err)
}
}

s.initCycleCallbacks()
s.initDimensionTracking()

return nil
}

// OverwriteObjects if their state didn't change in the meantime
// It returns nil if all object have been successfully overwritten
// and otherwise a list of failed operations.
Expand Down
1 change: 0 additions & 1 deletion adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ type ShardLike interface {

commitReplication(context.Context, string, *backupMutex) interface{}
abortReplication(context.Context, string) replica.SimpleResponse
reinit(context.Context) error
filePutter(context.Context, string) (io.WriteCloser, error)

// TODO tests only
Expand Down
7 changes: 0 additions & 7 deletions adapters/repos/db/shard_lazyloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,6 @@ func (l *LazyLoadShard) abortReplication(ctx context.Context, shardID string) re
return l.shard.abortReplication(ctx, shardID)
}

func (l *LazyLoadShard) reinit(ctx context.Context) error {
if err := l.Load(ctx); err != nil {
return err
}
return l.shard.reinit(ctx)
}

func (l *LazyLoadShard) filePutter(ctx context.Context, shardID string) (io.WriteCloser, error) {
if err := l.Load(ctx); err != nil {
return nil, err
Expand Down
12 changes: 4 additions & 8 deletions cluster/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,12 @@ func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string, schemaO
meta.Class.VectorIndexConfig = u.VectorIndexConfig
meta.Class.InvertedIndexConfig = u.InvertedIndexConfig
meta.Class.VectorConfig = u.VectorConfig
// TODO: fix PushShard issues before enabling scale out
// https://github.com/weaviate/weaviate/issues/4840
// meta.Class.ReplicationConfig = u.ReplicationConfig
meta.Class.ReplicationConfig = u.ReplicationConfig
meta.Class.MultiTenancyConfig = u.MultiTenancyConfig
meta.ClassVersion = cmd.Version
// TODO: fix PushShard issues before enabling scale out
// https://github.com/weaviate/weaviate/issues/4840
// if req.State != nil {
// meta.Sharding = *req.State
// }
if req.State != nil {
meta.Sharding = *req.State
}
return nil
}

Expand Down
Loading

0 comments on commit e902c0f

Please sign in to comment.