Skip to content

Commit

Permalink
Merge pull request #4882 from weaviate/refactor-db-open-4
Browse files Browse the repository at this point in the history
raft: remove schemaOnly and rename lastAppliedIndexOnStart
  • Loading branch information
reyreaud-l committed May 10, 2024
2 parents 9a018c7 + 348da5b commit 4ca4c0f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 49 deletions.
2 changes: 1 addition & 1 deletion adapters/repos/db/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (db *DB) localNodeStatistics() (*models.Statistics, error) {
IsVoter: stats["is_voter"].(bool),
Open: stats["open"].(bool),
Bootstrapped: stats["bootstrapped"].(bool),
InitialLastAppliedIndex: stats["initial_last_applied_index"].(uint64),
InitialLastAppliedIndex: stats["last_store_log_applied_index"].(uint64),
DbLoaded: stats["db_loaded"].(bool),
Candidates: stats["candidates"],
Raft: raft,
Expand Down
28 changes: 9 additions & 19 deletions cluster/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (db *localDB) SetIndexer(idx Indexer) {
db.Schema.shardReader = idx
}

func (db *localDB) AddClass(cmd *command.ApplyRequest, nodeID string, schemaOnly bool) error {
func (db *localDB) AddClass(cmd *command.ApplyRequest, nodeID string) error {
req := command.AddClassRequest{}
if err := json.Unmarshal(cmd.SubCommand, &req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -57,13 +57,12 @@ func (db *localDB) AddClass(cmd *command.ApplyRequest, nodeID string, schemaOnly
op: cmd.GetType().String(),
updateSchema: func() error { return db.Schema.addClass(req.Class, req.State, cmd.Version) },
updateStore: func() error { return db.store.AddClass(req) },
schemaOnly: schemaOnly,
triggerSchemaCallback: true,
},
)
}

func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string, schemaOnly bool) error {
func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string) error {
req := command.AddClassRequest{}
if err := json.Unmarshal(cmd.SubCommand, &req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -87,15 +86,14 @@ func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string, schema
op: cmd.GetType().String(),
updateSchema: func() error { return db.Schema.addClass(req.Class, req.State, cmd.Version) },
updateStore: func() error { return db.store.AddClass(req) },
schemaOnly: schemaOnly,
triggerSchemaCallback: true,
},
)
}

// UpdateClass modifies the vectors and inverted indexes associated with a class
// Other class properties are handled by separate functions
func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string, schemaOnly bool) error {
func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string) error {
req := command.UpdateClassRequest{}
if err := json.Unmarshal(cmd.SubCommand, &req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand Down Expand Up @@ -130,25 +128,23 @@ func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string, schemaO
op: cmd.GetType().String(),
updateSchema: func() error { return db.Schema.updateClass(req.Class.Class, update) },
updateStore: func() error { return db.store.UpdateClass(req) },
schemaOnly: schemaOnly,
triggerSchemaCallback: true,
},
)
}

func (db *localDB) DeleteClass(cmd *command.ApplyRequest, schemaOnly bool) error {
func (db *localDB) DeleteClass(cmd *command.ApplyRequest) error {
return db.apply(
applyOp{
op: cmd.GetType().String(),
updateSchema: func() error { db.Schema.deleteClass(cmd.Class); return nil },
updateStore: func() error { return db.store.DeleteClass(cmd.Class) },
schemaOnly: schemaOnly,
triggerSchemaCallback: true,
},
)
}

func (db *localDB) AddProperty(cmd *command.ApplyRequest, schemaOnly bool) error {
func (db *localDB) AddProperty(cmd *command.ApplyRequest) error {
req := command.AddPropertyRequest{}
if err := json.Unmarshal(cmd.SubCommand, &req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -162,13 +158,12 @@ func (db *localDB) AddProperty(cmd *command.ApplyRequest, schemaOnly bool) error
op: cmd.GetType().String(),
updateSchema: func() error { return db.Schema.addProperty(cmd.Class, cmd.Version, req.Properties...) },
updateStore: func() error { return db.store.AddProperty(cmd.Class, req) },
schemaOnly: schemaOnly,
triggerSchemaCallback: true,
},
)
}

func (db *localDB) UpdateShardStatus(cmd *command.ApplyRequest, schemaOnly bool) error {
func (db *localDB) UpdateShardStatus(cmd *command.ApplyRequest) error {
req := command.UpdateShardStatusRequest{}
if err := json.Unmarshal(cmd.SubCommand, &req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -179,12 +174,11 @@ func (db *localDB) UpdateShardStatus(cmd *command.ApplyRequest, schemaOnly bool)
op: cmd.GetType().String(),
updateSchema: func() error { return nil },
updateStore: func() error { return db.store.UpdateShardStatus(&req) },
schemaOnly: schemaOnly,
},
)
}

func (db *localDB) AddTenants(cmd *command.ApplyRequest, schemaOnly bool) error {
func (db *localDB) AddTenants(cmd *command.ApplyRequest) error {
req := &command.AddTenantsRequest{}
if err := gproto.Unmarshal(cmd.SubCommand, req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -195,12 +189,11 @@ func (db *localDB) AddTenants(cmd *command.ApplyRequest, schemaOnly bool) error
op: cmd.GetType().String(),
updateSchema: func() error { return db.Schema.addTenants(cmd.Class, cmd.Version, req) },
updateStore: func() error { return db.store.AddTenants(cmd.Class, req) },
schemaOnly: schemaOnly,
},
)
}

func (db *localDB) UpdateTenants(cmd *command.ApplyRequest, schemaOnly bool) (n int, err error) {
func (db *localDB) UpdateTenants(cmd *command.ApplyRequest) (n int, err error) {
req := &command.UpdateTenantsRequest{}
if err := gproto.Unmarshal(cmd.SubCommand, req); err != nil {
return 0, fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -211,12 +204,11 @@ func (db *localDB) UpdateTenants(cmd *command.ApplyRequest, schemaOnly bool) (n
op: cmd.GetType().String(),
updateSchema: func() error { n, err = db.Schema.updateTenants(cmd.Class, cmd.Version, req); return err },
updateStore: func() error { return db.store.UpdateTenants(cmd.Class, req) },
schemaOnly: schemaOnly,
},
)
}

func (db *localDB) DeleteTenants(cmd *command.ApplyRequest, schemaOnly bool) error {
func (db *localDB) DeleteTenants(cmd *command.ApplyRequest) error {
req := &command.DeleteTenantsRequest{}
if err := gproto.Unmarshal(cmd.SubCommand, req); err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
Expand All @@ -227,7 +219,6 @@ func (db *localDB) DeleteTenants(cmd *command.ApplyRequest, schemaOnly bool) err
op: cmd.GetType().String(),
updateSchema: func() error { return db.Schema.deleteTenants(cmd.Class, cmd.Version, req) },
updateStore: func() error { return db.store.DeleteTenants(cmd.Class, req) },
schemaOnly: schemaOnly,
},
)
}
Expand All @@ -247,7 +238,6 @@ type applyOp struct {
op string
updateSchema func() error
updateStore func() error
schemaOnly bool
triggerSchemaCallback bool
}

Expand Down
55 changes: 26 additions & 29 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,8 @@ type Store struct {
mutex sync.Mutex
candidates map[string]string

// initialLastAppliedIndex represents the index of the last applied command when the store is opened.
initialLastAppliedIndex uint64

// lastIndex atomic.Uint64
// lastAppliedIndexOnStart represents the index of the last applied command when the store is opened.
lastAppliedIndexOnStart uint64

// lastAppliedIndex index of latest update to the store
lastAppliedIndex atomic.Uint64
Expand Down Expand Up @@ -239,7 +237,7 @@ func (st *Store) Open(ctx context.Context) (err error) {
}

rLog := rLog{st.logStore}
st.initialLastAppliedIndex, err = rLog.LastAppliedCommand()
st.lastAppliedIndexOnStart, err = rLog.LastAppliedCommand()
if err != nil {
return fmt.Errorf("read log last command: %w", err)
}
Expand All @@ -252,19 +250,19 @@ func (st *Store) Open(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("raft.NewRaft %v %w", st.transport.LocalAddr(), err)
}
if st.initialLastAppliedIndex <= st.raft.LastIndex() {
if st.lastAppliedIndexOnStart <= st.raft.LastIndex() {
// this should include empty and non empty node
st.openDatabase(ctx)
}

st.lastAppliedIndex.Store(st.raft.AppliedIndex())

st.log.WithFields(logrus.Fields{
"raft_applied_index": st.raft.AppliedIndex(),
"raft_last_index": st.raft.LastIndex(),
"initial_store_log_applied_index": st.initialLastAppliedIndex,
"last_store_applied_index": st.lastAppliedIndex.Load(),
"last_snapshot_index": snapshotIndex(st.snapshotStore),
"raft_applied_index": st.raft.AppliedIndex(),
"raft_last_index": st.raft.LastIndex(),
"last_store_log_applied_index": st.lastAppliedIndexOnStart,
"last_store_applied_index": st.lastAppliedIndex.Load(),
"last_snapshot_index": snapshotIndex(st.snapshotStore),
}).Info("raft node constructed")

// There's no hard limit on the migration, so it should take as long as necessary.
Expand Down Expand Up @@ -516,8 +514,8 @@ func (f *Store) FindSimilarClass(name string) string {
// The value of "candidates" is a map[string]string of the current candidates IDs/addresses,
// see Store.candidates.
//
// The value of "initial_last_applied_index" is the index of the last applied command found when
// the store was opened, see Store.initialLastAppliedIndex.
// The value of "last_store_log_applied_index" is the index of the last applied command found when
// the store was opened, see Store.lastAppliedIndexOnStart.
//
// The value of "last_applied_index" is the index of the latest update to the store,
// see Store.lastAppliedIndex.
Expand All @@ -540,7 +538,7 @@ func (st *Store) Stats() map[string]any {
stats["open"] = st.open.Load()
stats["bootstrapped"] = st.bootstrapped.Load()
stats["candidates"] = st.candidates
stats["initial_last_applied_index"] = st.initialLastAppliedIndex
stats["last_store_log_applied_index"] = st.lastAppliedIndexOnStart
stats["last_applied_index"] = st.lastAppliedIndex.Load()
stats["db_loaded"] = st.dbLoaded.Load()

Expand Down Expand Up @@ -639,7 +637,6 @@ func (st *Store) Apply(l *raft.Log) interface{} {
panic("error proto un-marshalling log data")
}

schemaOnly := l.Index <= st.initialLastAppliedIndex
defer func() {
st.lastAppliedIndex.Store(l.Index)
if ret.Error != nil {
Expand All @@ -654,31 +651,31 @@ func (st *Store) Apply(l *raft.Log) interface{} {
switch cmd.Type {

case api.ApplyRequest_TYPE_ADD_CLASS:
ret.Error = st.db.AddClass(&cmd, st.nodeID, schemaOnly)
ret.Error = st.db.AddClass(&cmd, st.nodeID)

case api.ApplyRequest_TYPE_RESTORE_CLASS:
ret.Error = st.db.RestoreClass(&cmd, st.nodeID, schemaOnly)
ret.Error = st.db.RestoreClass(&cmd, st.nodeID)

case api.ApplyRequest_TYPE_UPDATE_CLASS:
ret.Error = st.db.UpdateClass(&cmd, st.nodeID, schemaOnly)
ret.Error = st.db.UpdateClass(&cmd, st.nodeID)

case api.ApplyRequest_TYPE_DELETE_CLASS:
ret.Error = st.db.DeleteClass(&cmd, schemaOnly)
ret.Error = st.db.DeleteClass(&cmd)

case api.ApplyRequest_TYPE_ADD_PROPERTY:
ret.Error = st.db.AddProperty(&cmd, schemaOnly)
ret.Error = st.db.AddProperty(&cmd)

case api.ApplyRequest_TYPE_UPDATE_SHARD_STATUS:
ret.Error = st.db.UpdateShardStatus(&cmd, schemaOnly)
ret.Error = st.db.UpdateShardStatus(&cmd)

case api.ApplyRequest_TYPE_ADD_TENANT:
ret.Error = st.db.AddTenants(&cmd, schemaOnly)
ret.Error = st.db.AddTenants(&cmd)

case api.ApplyRequest_TYPE_UPDATE_TENANT:
ret.Data, ret.Error = st.db.UpdateTenants(&cmd, schemaOnly)
ret.Data, ret.Error = st.db.UpdateTenants(&cmd)

case api.ApplyRequest_TYPE_DELETE_TENANT:
ret.Error = st.db.DeleteTenants(&cmd, schemaOnly)
ret.Error = st.db.DeleteTenants(&cmd)

case api.ApplyRequest_TYPE_STORE_SCHEMA_V1:
ret.Error = st.StoreSchemaV1()
Expand Down Expand Up @@ -874,11 +871,11 @@ func (st *Store) reloadDBFromSnapshot() bool {
// the snapshot already includes the state from the raft log
snapIndex := snapshotIndex(st.snapshotStore)
st.log.WithFields(logrus.Fields{
"last_applied_index": st.lastAppliedIndex.Load(),
"initial_last_applied_index": st.initialLastAppliedIndex,
"last_snapshot_index": snapIndex,
"last_applied_index": st.lastAppliedIndex.Load(),
"last_store_log_applied_index": st.lastAppliedIndexOnStart,
"last_snapshot_index": snapIndex,
}).Info("load local db from snapshot")
if st.initialLastAppliedIndex <= snapIndex {
if st.lastAppliedIndexOnStart <= snapIndex {
st.openDatabase(ctx)
return true
}
Expand All @@ -895,7 +892,7 @@ func (st *Store) reloadDBFromSnapshot() bool {
st.db.store.ReloadLocalDB(context.Background(), cs)

st.dbLoaded.Store(true)
st.initialLastAppliedIndex = 0
st.lastAppliedIndexOnStart = 0
return true
}

Expand Down

0 comments on commit 4ca4c0f

Please sign in to comment.