
Commit

RAFT: support back RF scale+/-
moogacs committed May 6, 2024
1 parent 2a02d51 commit 301554f
Showing 8 changed files with 92 additions and 65 deletions.
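
The user-visible effect of this commit is that a class's replication factor (RF) can be scaled up or down again through a schema update; the guard that rejected factor changes is removed. A minimal sketch of how the acceptance tests below drive that path, using the helper and models packages that appear in this diff (the import paths and an already-configured test client are assumptions):

package replication_test // hypothetical test package for the sketch

import (
	"testing"

	"github.com/weaviate/weaviate/entities/models"
	"github.com/weaviate/weaviate/test/helper"
)

// scaleArticleReplicas raises the RF of the "Article" class to 3. Before this
// commit the schema parser rejected any change to ReplicationConfig.Factor.
func scaleArticleReplicas(t *testing.T) {
	// Fetch the current class definition from the schema.
	ac := helper.GetClass(t, "Article")

	// Request three replicas per shard.
	ac.ReplicationConfig = &models.ReplicationConfig{
		Factor: 3,
	}

	// Applying the update triggers the scale path re-enabled in
	// usecases/schema/class.go.
	helper.UpdateClass(t, ac)
}
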
12 changes: 7 additions & 5 deletions adapters/repos/db/index.go
@@ -113,7 +113,9 @@ func (m *shardMap) Load(name string) ShardLike {
if !ok {
return nil
}
return v.(ShardLike)
s := v.(ShardLike)
s.load()
return s
}

// Store sets a shard giving its name and value
@@ -1724,11 +1726,11 @@ func (i *Index) getOrInitLocalShardNoShutdown(ctx context.Context, shardName str
return nil, func() {}, err
}

release, err := shard.preventShutdown()
if err != nil {
return nil, func() {}, err
if shard != nil {
shard.load()
}
return shard, release, nil

return shard, func() {}, nil
}

// Intended to run on "receiver" nodes, where local shard
45 changes: 20 additions & 25 deletions adapters/repos/db/shard.go
@@ -110,6 +110,7 @@ type ShardLike interface {
Queues() map[string]*IndexQueue
Shutdown(context.Context) error // Shutdown the shard
preventShutdown() (release func(), err error)
load() error

// TODO tests only
ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor,
@@ -215,11 +216,9 @@ type Shard struct {
activityTracker atomic.Int32

// indicates whether shard is shut down or dropped (or ongoing)
shut bool
shut atomic.Bool
// indicates whether shard is being used at the moment (e.g. write request)
inUseCounter atomic.Int64
// allows concurrent shut read/write
shutdownLock *sync.RWMutex
}

func NewShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
@@ -238,11 +237,10 @@ func NewShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
replicationMap: pendingReplicaTasks{Tasks: make(map[string]replicaTask, 32)},
centralJobQueue: jobQueueCh,
indexCheckpoints: indexCheckpoints,

shut: false,
shutdownLock: new(sync.RWMutex),
}

s.shut.Store(false)

s.activityTracker.Store(1) // initial state
s.initCycleCallbacks()

@@ -1042,11 +1040,15 @@ func (s *Shard) Shutdown(ctx context.Context) (err error) {
return nil
}

func (s *Shard) preventShutdown() (release func(), err error) {
s.shutdownLock.RLock()
defer s.shutdownLock.RUnlock()
// load makes sure the shut flag is false
func (s *Shard) load() error {
s.shut.Store(false)
return nil
}

if s.shut {
func (s *Shard) preventShutdown() (release func(), err error) {
if s.shut.Load() {
s.index.logger.WithField("shard", s.Name()).Debug("was already shut or dropped")
return func() {}, errAlreadyShutdown
}

@@ -1061,9 +1063,7 @@ func (s *Shard) waitForShutdown(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if eligible, err := s.checkEligibleForShutdown(); err != nil {
return err
} else if !eligible {
if !s.eligibleForShutdown() {
ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

@@ -1072,9 +1072,7 @@ func (s *Shard) waitForShutdown(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("Shard::proceedWithShutdown: %w", ctx.Err())
case <-ticker.C:
if eligible, err := s.checkEligibleForShutdown(); err != nil {
return err
} else if eligible {
if s.eligibleForShutdown() {
return nil
}
}
@@ -1085,20 +1083,17 @@ func (s *Shard) waitForShutdown(ctx context.Context) error {

// checks whether shutdown can be executed
// (shard is not in use at the moment)
func (s *Shard) checkEligibleForShutdown() (eligible bool, err error) {
s.shutdownLock.Lock()
defer s.shutdownLock.Unlock()

if s.shut {
return false, errAlreadyShutdown
func (s *Shard) eligibleForShutdown() bool {
if s.shut.Load() {
return false
}

if s.inUseCounter.Load() == 0 {
s.shut = true
return true, nil
s.shut.Store(true)
return true
}

return false, nil
return false
}

func (s *Shard) NotifyReady() {
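
In shard.go the mutex-guarded flag (shut bool plus shutdownLock *sync.RWMutex) is replaced by a single atomic.Bool next to the existing in-use counter. A condensed sketch of the resulting pattern, reduced to the two fields and the methods touched by the diff; this is an illustration of the idea, not the actual Shard implementation, and the inUseCounter bookkeeping inside preventShutdown is an assumption since that part of the hunk is collapsed:

package db // hypothetical package for the sketch

import (
	"errors"
	"sync/atomic"
)

var errAlreadyShutdown = errors.New("already shut or dropped")

// shardState holds the two fields the diff keeps on Shard.
type shardState struct {
	shut         atomic.Bool  // shard has been shut down or dropped
	inUseCounter atomic.Int64 // number of in-flight users (e.g. writes)
}

// load re-arms the shard after a previous shutdown, mirroring Shard.load.
func (s *shardState) load() {
	s.shut.Store(false)
}

// preventShutdown fails fast once the shard is shut; otherwise it registers
// a user and hands back a release func (counter bookkeeping assumed here).
func (s *shardState) preventShutdown() (release func(), err error) {
	if s.shut.Load() {
		return func() {}, errAlreadyShutdown
	}
	s.inUseCounter.Add(1)
	return func() { s.inUseCounter.Add(-1) }, nil
}

// eligibleForShutdown flips the shut flag as soon as nothing uses the shard,
// mirroring the logic in the diff.
func (s *shardState) eligibleForShutdown() bool {
	if s.shut.Load() {
		return false
	}
	if s.inUseCounter.Load() == 0 {
		s.shut.Store(true)
		return true
	}
	return false
}
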
8 changes: 8 additions & 0 deletions adapters/repos/db/shard_lazyloader.go
@@ -418,6 +418,14 @@ func (l *LazyLoadShard) Shutdown(ctx context.Context) error {
return l.shard.Shutdown(ctx)
}

func (l *LazyLoadShard) load() error {
if err := l.Load(context.Background()); err != nil {
return fmt.Errorf("LazyLoadShard::load: %w", err)
}

return l.shard.load()
}

func (l *LazyLoadShard) preventShutdown() (release func(), err error) {
if err := l.Load(context.Background()); err != nil {
return nil, fmt.Errorf("LazyLoadShard::preventShutdown: %w", err)
4 changes: 1 addition & 3 deletions cluster/store/db.go
@@ -112,9 +112,7 @@ func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string, schemaO
meta.Class.VectorIndexConfig = u.VectorIndexConfig
meta.Class.InvertedIndexConfig = u.InvertedIndexConfig
meta.Class.VectorConfig = u.VectorConfig
// TODO: fix PushShard issues before enabling scale out
// https://github.com/weaviate/weaviate/issues/4840
// meta.Class.ReplicationConfig = u.ReplicationConfig
meta.Class.ReplicationConfig = u.ReplicationConfig
meta.Class.MultiTenancyConfig = u.MultiTenancyConfig
meta.ClassVersion = cmd.Version
if req.State != nil {
49 changes: 41 additions & 8 deletions test/acceptance/replication/crud_test.go
@@ -292,7 +292,6 @@ func immediateReplicaCRUD(t *testing.T) {
}

func eventualReplicaCRUD(t *testing.T) {
t.Skip("Skip until https://github.com/weaviate/weaviate/issues/4840 is resolved")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

@@ -354,19 +353,44 @@ func eventualReplicaCRUD(t *testing.T) {
helper.UpdateClass(t, pc)
})

t.Run("StopNode-2", func(t *testing.T) {
stopNodeAt(ctx, t, compose, 2)
t.Run("StopNode-3", func(t *testing.T) {
stopNodeAt(ctx, t, compose, 3)
})

t.Run("assert all previous data replicated to node 2", func(t *testing.T) {
resp := gqlGet(t, compose.GetWeaviateNode2().URI(), "Article", replica.Quorum)
assert.Len(t, resp, len(articleIDs))
resp = gqlGet(t, compose.GetWeaviateNode2().URI(), "Paragraph", replica.Quorum)
assert.Len(t, resp, len(paragraphIDs))
})

t.Run("RestartNode-3", func(t *testing.T) {
startNodeAt(ctx, t, compose, 3)
})

t.Run("configure classes to replicate to node 3", func(t *testing.T) {
ac := helper.GetClass(t, "Article")
ac.ReplicationConfig = &models.ReplicationConfig{
Factor: 3,
}
helper.UpdateClass(t, ac)

pc := helper.GetClass(t, "Paragraph")
pc.ReplicationConfig = &models.ReplicationConfig{
Factor: 3,
}
helper.UpdateClass(t, pc)
})

t.Run("assert all previous data replicated to node 3", func(t *testing.T) {
resp := gqlGet(t, compose.GetWeaviateNode3().URI(), "Article", replica.One)
resp := gqlGet(t, compose.GetWeaviateNode3().URI(), "Article", replica.All)
assert.Len(t, resp, len(articleIDs))
resp = gqlGet(t, compose.GetWeaviateNode3().URI(), "Paragraph", replica.One)
resp = gqlGet(t, compose.GetWeaviateNode3().URI(), "Paragraph", replica.All)
assert.Len(t, resp, len(paragraphIDs))
})

t.Run("RestartNode-1", func(t *testing.T) {
restartNode1(ctx, t, compose)
t.Run("RestartCluster", func(t *testing.T) {
restartCluster(ctx, t, compose)
})

t.Run("assert any future writes are replicated", func(t *testing.T) {
Expand Down Expand Up @@ -400,6 +424,15 @@ func eventualReplicaCRUD(t *testing.T) {
t.Run("RestartNode-2", func(t *testing.T) {
startNodeAt(ctx, t, compose, 2)
})

t.Run("PatchedOnNode-3", func(t *testing.T) {
after, err := getObjectFromNode(t, compose.GetWeaviateNode3().URI(), "Article", articleIDs[0], "node3")
require.Nil(t, err)

newVal, ok := after.Properties.(map[string]interface{})["title"]
require.True(t, ok)
assert.Equal(t, newTitle, newVal)
})
})

t.Run("DeleteObject", func(t *testing.T) {
@@ -443,7 +476,7 @@ func eventualReplicaCRUD(t *testing.T) {
})
}

func restartNode1(ctx context.Context, t *testing.T, compose *docker.DockerCompose) {
func restartCluster(ctx context.Context, t *testing.T, compose *docker.DockerCompose) {
// since node1 is the gossip "leader", node 2 and 3 must be stopped and restarted
// after node1 to re-facilitate internode communication
eg := errgroup.Group{}
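
restartNode1 becomes restartCluster: per the comment in the hunk, node1 is the gossip "leader", so it is bounced first and nodes 2 and 3 are stopped and restarted afterwards so internode communication comes back up. A rough sketch of that ordering, reusing stopNodeAt/startNodeAt as they are called elsewhere in this test file; the real helper may stage or parallelize the restarts differently:

// restartClusterSketch illustrates the ordering described in the comment:
// bounce the gossip leader (node 1) first, then bounce the followers so
// they re-establish communication with it. Assumes the same imports and
// helpers as crud_test.go.
func restartClusterSketch(ctx context.Context, t *testing.T, compose *docker.DockerCompose) {
	// Restart the gossip leader first.
	stopNodeAt(ctx, t, compose, 1)
	startNodeAt(ctx, t, compose, 1)

	// Then restart the followers, one after the other.
	for _, node := range []int{2, 3} {
		stopNodeAt(ctx, t, compose, node)
		startNodeAt(ctx, t, compose, node)
	}
}
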
1 change: 0 additions & 1 deletion test/acceptance/replication/scale_test.go
@@ -29,7 +29,6 @@ import (
)

func multiShardScaleOut(t *testing.T) {
t.Skip("Skip until https://github.com/weaviate/weaviate/issues/4840 is resolved")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

32 changes: 15 additions & 17 deletions usecases/schema/class.go
@@ -222,23 +222,21 @@ func (h *Handler) UpdateClass(ctx context.Context, principal *models.Principal,
return err
}

// TODO: fix PushShard issues before enabling scale out
// https://github.com/weaviate/weaviate/issues/4840
//initialRF := initial.ReplicationConfig.Factor
//updatedRF := updated.ReplicationConfig.Factor
//
//if initialRF != updatedRF {
// ss, _, err := h.metaWriter.QueryShardingState(className)
// if err != nil {
// return fmt.Errorf("query sharding state for %q: %w", className, err)
// }
// shardingState, err = h.scaleOut.Scale(ctx, className, ss.Config, initialRF, updatedRF)
// if err != nil {
// return fmt.Errorf(
// "scale %q from %d replicas to %d: %w",
// className, initialRF, updatedRF, err)
// }
//}
initialRF := initial.ReplicationConfig.Factor
updatedRF := updated.ReplicationConfig.Factor

if initialRF != updatedRF {
ss, _, err := h.metaWriter.QueryShardingState(className)
if err != nil {
return fmt.Errorf("query sharding state for %q: %w", className, err)
}
shardingState, err = h.scaleOut.Scale(ctx, className, ss.Config, initialRF, updatedRF)
if err != nil {
return fmt.Errorf(
"scale %q from %d replicas to %d: %w",
className, initialRF, updatedRF, err)
}
}

if err := validateImmutableFields(initial, updated); err != nil {
return err
6 changes: 0 additions & 6 deletions usecases/schema/parser.go
@@ -126,12 +126,6 @@ func (p *Parser) ParseClassUpdate(class, update *models.Class) (*models.Class, e
return nil, err
}

// TODO: fix PushShard issues before enabling scale out
// https://github.com/weaviate/weaviate/issues/4840
if class.ReplicationConfig.Factor != update.ReplicationConfig.Factor {
return nil, fmt.Errorf("updating replication factor is not supported yet")
}

if err := validateImmutableFields(class, update); err != nil {
return nil, err
}
