diff --git a/adapters/repos/db/nodes.go b/adapters/repos/db/nodes.go index 873e15dc98..726f7d3c0a 100644 --- a/adapters/repos/db/nodes.go +++ b/adapters/repos/db/nodes.go @@ -291,7 +291,7 @@ func (db *DB) localNodeStatistics() (*models.Statistics, error) { IsVoter: stats["is_voter"].(bool), Open: stats["open"].(bool), Bootstrapped: stats["bootstrapped"].(bool), - InitialLastAppliedIndex: stats["initial_last_applied_index"].(uint64), + InitialLastAppliedIndex: stats["last_store_log_applied_index"].(uint64), DbLoaded: stats["db_loaded"].(bool), Candidates: stats["candidates"], Raft: raft, diff --git a/cluster/store/db.go b/cluster/store/db.go index a198e38402..9163eaf1ab 100644 --- a/cluster/store/db.go +++ b/cluster/store/db.go @@ -40,7 +40,7 @@ func (db *localDB) SetIndexer(idx Indexer) { db.Schema.shardReader = idx } -func (db *localDB) AddClass(cmd *command.ApplyRequest, nodeID string, schemaOnly bool) error { +func (db *localDB) AddClass(cmd *command.ApplyRequest, nodeID string) error { req := command.AddClassRequest{} if err := json.Unmarshal(cmd.SubCommand, &req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -57,13 +57,12 @@ func (db *localDB) AddClass(cmd *command.ApplyRequest, nodeID string, schemaOnly op: cmd.GetType().String(), updateSchema: func() error { return db.Schema.addClass(req.Class, req.State, cmd.Version) }, updateStore: func() error { return db.store.AddClass(req) }, - schemaOnly: schemaOnly, triggerSchemaCallback: true, }, ) } -func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string, schemaOnly bool) error { +func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string) error { req := command.AddClassRequest{} if err := json.Unmarshal(cmd.SubCommand, &req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -87,7 +86,6 @@ func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string, schema op: cmd.GetType().String(), updateSchema: func() error { return db.Schema.addClass(req.Class, req.State, cmd.Version) }, updateStore: func() error { return db.store.AddClass(req) }, - schemaOnly: schemaOnly, triggerSchemaCallback: true, }, ) @@ -95,7 +93,7 @@ func (db *localDB) RestoreClass(cmd *command.ApplyRequest, nodeID string, schema // UpdateClass modifies the vectors and inverted indexes associated with a class // Other class properties are handled by separate functions -func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string, schemaOnly bool) error { +func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string) error { req := command.UpdateClassRequest{} if err := json.Unmarshal(cmd.SubCommand, &req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -130,25 +128,23 @@ func (db *localDB) UpdateClass(cmd *command.ApplyRequest, nodeID string, schemaO op: cmd.GetType().String(), updateSchema: func() error { return db.Schema.updateClass(req.Class.Class, update) }, updateStore: func() error { return db.store.UpdateClass(req) }, - schemaOnly: schemaOnly, triggerSchemaCallback: true, }, ) } -func (db *localDB) DeleteClass(cmd *command.ApplyRequest, schemaOnly bool) error { +func (db *localDB) DeleteClass(cmd *command.ApplyRequest) error { return db.apply( applyOp{ op: cmd.GetType().String(), updateSchema: func() error { db.Schema.deleteClass(cmd.Class); return nil }, updateStore: func() error { return db.store.DeleteClass(cmd.Class) }, - schemaOnly: schemaOnly, triggerSchemaCallback: true, }, ) } -func (db *localDB) AddProperty(cmd *command.ApplyRequest, schemaOnly bool) error { +func (db *localDB) AddProperty(cmd *command.ApplyRequest) error { req := command.AddPropertyRequest{} if err := json.Unmarshal(cmd.SubCommand, &req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -162,13 +158,12 @@ func (db *localDB) AddProperty(cmd *command.ApplyRequest, schemaOnly bool) error op: cmd.GetType().String(), updateSchema: func() error { return db.Schema.addProperty(cmd.Class, cmd.Version, req.Properties...) }, updateStore: func() error { return db.store.AddProperty(cmd.Class, req) }, - schemaOnly: schemaOnly, triggerSchemaCallback: true, }, ) } -func (db *localDB) UpdateShardStatus(cmd *command.ApplyRequest, schemaOnly bool) error { +func (db *localDB) UpdateShardStatus(cmd *command.ApplyRequest) error { req := command.UpdateShardStatusRequest{} if err := json.Unmarshal(cmd.SubCommand, &req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -179,12 +174,11 @@ func (db *localDB) UpdateShardStatus(cmd *command.ApplyRequest, schemaOnly bool) op: cmd.GetType().String(), updateSchema: func() error { return nil }, updateStore: func() error { return db.store.UpdateShardStatus(&req) }, - schemaOnly: schemaOnly, }, ) } -func (db *localDB) AddTenants(cmd *command.ApplyRequest, schemaOnly bool) error { +func (db *localDB) AddTenants(cmd *command.ApplyRequest) error { req := &command.AddTenantsRequest{} if err := gproto.Unmarshal(cmd.SubCommand, req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -195,12 +189,11 @@ func (db *localDB) AddTenants(cmd *command.ApplyRequest, schemaOnly bool) error op: cmd.GetType().String(), updateSchema: func() error { return db.Schema.addTenants(cmd.Class, cmd.Version, req) }, updateStore: func() error { return db.store.AddTenants(cmd.Class, req) }, - schemaOnly: schemaOnly, }, ) } -func (db *localDB) UpdateTenants(cmd *command.ApplyRequest, schemaOnly bool) (n int, err error) { +func (db *localDB) UpdateTenants(cmd *command.ApplyRequest) (n int, err error) { req := &command.UpdateTenantsRequest{} if err := gproto.Unmarshal(cmd.SubCommand, req); err != nil { return 0, fmt.Errorf("%w: %w", errBadRequest, err) @@ -211,12 +204,11 @@ func (db *localDB) UpdateTenants(cmd *command.ApplyRequest, schemaOnly bool) (n op: cmd.GetType().String(), updateSchema: func() error { n, err = db.Schema.updateTenants(cmd.Class, cmd.Version, req); return err }, updateStore: func() error { return db.store.UpdateTenants(cmd.Class, req) }, - schemaOnly: schemaOnly, }, ) } -func (db *localDB) DeleteTenants(cmd *command.ApplyRequest, schemaOnly bool) error { +func (db *localDB) DeleteTenants(cmd *command.ApplyRequest) error { req := &command.DeleteTenantsRequest{} if err := gproto.Unmarshal(cmd.SubCommand, req); err != nil { return fmt.Errorf("%w: %w", errBadRequest, err) @@ -227,7 +219,6 @@ func (db *localDB) DeleteTenants(cmd *command.ApplyRequest, schemaOnly bool) err op: cmd.GetType().String(), updateSchema: func() error { return db.Schema.deleteTenants(cmd.Class, cmd.Version, req) }, updateStore: func() error { return db.store.DeleteTenants(cmd.Class, req) }, - schemaOnly: schemaOnly, }, ) } @@ -247,7 +238,6 @@ type applyOp struct { op string updateSchema func() error updateStore func() error - schemaOnly bool triggerSchemaCallback bool } diff --git a/cluster/store/store.go b/cluster/store/store.go index 2e641b73ff..fb8b69a4da 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -173,10 +173,8 @@ type Store struct { mutex sync.Mutex candidates map[string]string - // initialLastAppliedIndex represents the index of the last applied command when the store is opened. - initialLastAppliedIndex uint64 - - // lastIndex atomic.Uint64 + // lastAppliedIndexOnStart represents the index of the last applied command when the store is opened. + lastAppliedIndexOnStart uint64 // lastAppliedIndex index of latest update to the store lastAppliedIndex atomic.Uint64 @@ -239,7 +237,7 @@ func (st *Store) Open(ctx context.Context) (err error) { } rLog := rLog{st.logStore} - st.initialLastAppliedIndex, err = rLog.LastAppliedCommand() + st.lastAppliedIndexOnStart, err = rLog.LastAppliedCommand() if err != nil { return fmt.Errorf("read log last command: %w", err) } @@ -252,7 +250,7 @@ func (st *Store) Open(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("raft.NewRaft %v %w", st.transport.LocalAddr(), err) } - if st.initialLastAppliedIndex <= st.raft.LastIndex() { + if st.lastAppliedIndexOnStart <= st.raft.LastIndex() { // this should include empty and non empty node st.openDatabase(ctx) } @@ -260,11 +258,11 @@ func (st *Store) Open(ctx context.Context) (err error) { st.lastAppliedIndex.Store(st.raft.AppliedIndex()) st.log.WithFields(logrus.Fields{ - "raft_applied_index": st.raft.AppliedIndex(), - "raft_last_index": st.raft.LastIndex(), - "initial_store_log_applied_index": st.initialLastAppliedIndex, - "last_store_applied_index": st.lastAppliedIndex.Load(), - "last_snapshot_index": snapshotIndex(st.snapshotStore), + "raft_applied_index": st.raft.AppliedIndex(), + "raft_last_index": st.raft.LastIndex(), + "last_store_log_applied_index": st.lastAppliedIndexOnStart, + "last_store_applied_index": st.lastAppliedIndex.Load(), + "last_snapshot_index": snapshotIndex(st.snapshotStore), }).Info("raft node constructed") // There's no hard limit on the migration, so it should take as long as necessary. @@ -516,8 +514,8 @@ func (f *Store) FindSimilarClass(name string) string { // The value of "candidates" is a map[string]string of the current candidates IDs/addresses, // see Store.candidates. // -// The value of "initial_last_applied_index" is the index of the last applied command found when -// the store was opened, see Store.initialLastAppliedIndex. +// The value of "last_store_log_applied_index" is the index of the last applied command found when +// the store was opened, see Store.lastAppliedIndexOnStart. // // The value of "last_applied_index" is the index of the latest update to the store, // see Store.lastAppliedIndex. @@ -540,7 +538,7 @@ func (st *Store) Stats() map[string]any { stats["open"] = st.open.Load() stats["bootstrapped"] = st.bootstrapped.Load() stats["candidates"] = st.candidates - stats["initial_last_applied_index"] = st.initialLastAppliedIndex + stats["last_store_log_applied_index"] = st.lastAppliedIndexOnStart stats["last_applied_index"] = st.lastAppliedIndex.Load() stats["db_loaded"] = st.dbLoaded.Load() @@ -639,7 +637,6 @@ func (st *Store) Apply(l *raft.Log) interface{} { panic("error proto un-marshalling log data") } - schemaOnly := l.Index <= st.initialLastAppliedIndex defer func() { st.lastAppliedIndex.Store(l.Index) if ret.Error != nil { @@ -654,31 +651,31 @@ func (st *Store) Apply(l *raft.Log) interface{} { switch cmd.Type { case api.ApplyRequest_TYPE_ADD_CLASS: - ret.Error = st.db.AddClass(&cmd, st.nodeID, schemaOnly) + ret.Error = st.db.AddClass(&cmd, st.nodeID) case api.ApplyRequest_TYPE_RESTORE_CLASS: - ret.Error = st.db.RestoreClass(&cmd, st.nodeID, schemaOnly) + ret.Error = st.db.RestoreClass(&cmd, st.nodeID) case api.ApplyRequest_TYPE_UPDATE_CLASS: - ret.Error = st.db.UpdateClass(&cmd, st.nodeID, schemaOnly) + ret.Error = st.db.UpdateClass(&cmd, st.nodeID) case api.ApplyRequest_TYPE_DELETE_CLASS: - ret.Error = st.db.DeleteClass(&cmd, schemaOnly) + ret.Error = st.db.DeleteClass(&cmd) case api.ApplyRequest_TYPE_ADD_PROPERTY: - ret.Error = st.db.AddProperty(&cmd, schemaOnly) + ret.Error = st.db.AddProperty(&cmd) case api.ApplyRequest_TYPE_UPDATE_SHARD_STATUS: - ret.Error = st.db.UpdateShardStatus(&cmd, schemaOnly) + ret.Error = st.db.UpdateShardStatus(&cmd) case api.ApplyRequest_TYPE_ADD_TENANT: - ret.Error = st.db.AddTenants(&cmd, schemaOnly) + ret.Error = st.db.AddTenants(&cmd) case api.ApplyRequest_TYPE_UPDATE_TENANT: - ret.Data, ret.Error = st.db.UpdateTenants(&cmd, schemaOnly) + ret.Data, ret.Error = st.db.UpdateTenants(&cmd) case api.ApplyRequest_TYPE_DELETE_TENANT: - ret.Error = st.db.DeleteTenants(&cmd, schemaOnly) + ret.Error = st.db.DeleteTenants(&cmd) case api.ApplyRequest_TYPE_STORE_SCHEMA_V1: ret.Error = st.StoreSchemaV1() @@ -874,11 +871,11 @@ func (st *Store) reloadDBFromSnapshot() bool { // the snapshot already includes the state from the raft log snapIndex := snapshotIndex(st.snapshotStore) st.log.WithFields(logrus.Fields{ - "last_applied_index": st.lastAppliedIndex.Load(), - "initial_last_applied_index": st.initialLastAppliedIndex, - "last_snapshot_index": snapIndex, + "last_applied_index": st.lastAppliedIndex.Load(), + "last_store_log_applied_index": st.lastAppliedIndexOnStart, + "last_snapshot_index": snapIndex, }).Info("load local db from snapshot") - if st.initialLastAppliedIndex <= snapIndex { + if st.lastAppliedIndexOnStart <= snapIndex { st.openDatabase(ctx) return true } @@ -895,7 +892,7 @@ func (st *Store) reloadDBFromSnapshot() bool { st.db.store.ReloadLocalDB(context.Background(), cs) st.dbLoaded.Store(true) - st.initialLastAppliedIndex = 0 + st.lastAppliedIndexOnStart = 0 return true }