Skip to content

Commit

Permalink
Specialize voter nodes for metadata storage
Browse files Browse the repository at this point in the history
 Introduce cluster-wide flag to exclude shards/tenants on voters
  • Loading branch information
redouan-rhazouani committed Apr 21, 2024
1 parent 3d373b6 commit 49b4594
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 63 deletions.
17 changes: 16 additions & 1 deletion adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
SnapshotInterval: appState.ServerConfig.Config.Raft.SnapshotInterval,
SnapshotThreshold: appState.ServerConfig.Config.Raft.SnapshotThreshold,
UpdateWaitTimeout: time.Second * 10, // TODO-RAFT read from the flag
MetadataOnlyVoters: appState.ServerConfig.Config.Raft.MetadataOnlyVoters,
DB: nil,
Parser: schema.NewParser(appState.Cluster, vectorIndex.ParseAndValidateConfig, migrator),
AddrResolver: appState.Cluster,
Expand Down Expand Up @@ -441,6 +442,16 @@ func parseNode2Port(appState *state.State) (m map[string]int, err error) {
return m, nil
}

// parseVotersNames parses names of all voters.
// If we reach this point, we assume that the configuration is valid
func parseVotersNames(cfg config.Raft) (m map[string]struct{}) {
m = make(map[string]struct{}, cfg.BootstrapExpect)
for _, raftNamePort := range cfg.Join[:cfg.BootstrapExpect] {
m[strings.Split(raftNamePort, ":")[0]] = struct{}{}
}
return m
}

func configureAPI(api *operations.WeaviateAPI) http.Handler {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 60*time.Minute)
Expand Down Expand Up @@ -599,7 +610,11 @@ func startupRoutine(ctx context.Context, options *swag.CommandLineOptionsGroup)
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)).
Debug("initialized schema")

clusterState, err := cluster.Init(serverConfig.Config.Cluster, dataPath, logger)
var nonStorageNodes map[string]struct{}
if cfg := serverConfig.Config.Raft; cfg.MetadataOnlyVoters {
nonStorageNodes = parseVotersNames(cfg)
}
clusterState, err := cluster.Init(serverConfig.Config.Cluster, dataPath, nonStorageNodes, logger)
if err != nil {
logger.WithField("action", "startup").WithError(err).
Error("could not init cluster state")
Expand Down
80 changes: 43 additions & 37 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type Config struct {
UpdateWaitTimeout time.Duration
SnapshotThreshold uint64

// MetadataOnlyVoters configures the voters to store metadata exclusively, without storing any other data
MetadataOnlyVoters bool

DB Indexer
Parser Parser
AddrResolver addressResolver
Expand All @@ -129,16 +132,16 @@ type Config struct {
}

type Store struct {
raft *raft.Raft
open atomic.Bool
raftDir string
raftPort int
voter bool
bootstrapExpect int
recoveryTimeout time.Duration
heartbeatTimeout time.Duration
electionTimeout time.Duration
snapshotInterval time.Duration
raft *raft.Raft
open atomic.Bool
raftDir string
raftPort int
voter bool
bootstrapExpect int
recoveryTimeout time.Duration
heartbeatTimeout time.Duration
electionTimeout time.Duration
metadataOnlyVoters bool

// applyTimeout timeout limit the amount of time raft waits for a command to be started
applyTimeout time.Duration
Expand All @@ -149,19 +152,21 @@ type Store struct {
// UpdateWaitTimeout Timeout duration for waiting for the update to be propagated to this follower node.
updateWaitTimeout time.Duration

snapshotThreshold uint64

nodeID string
host string
db *localDB
log *slog.Logger
logLevel string

bootstrapped atomic.Bool
logStore *raftbolt.BoltStore
addResolver *addrResolver
transport *raft.NetworkTransport
snapshotStore *raft.FileSnapshotStore
// snapshot
snapshotStore *raft.FileSnapshotStore
snapshotInterval time.Duration
snapshotThreshold uint64

bootstrapped atomic.Bool
logStore *raftbolt.BoltStore
addResolver *addrResolver
transport *raft.NetworkTransport

mutex sync.Mutex
candidates map[string]string
Expand All @@ -184,25 +189,26 @@ type Store struct {

func New(cfg Config) Store {
return Store{
raftDir: cfg.WorkDir,
raftPort: cfg.RaftPort,
voter: cfg.Voter,
bootstrapExpect: cfg.BootstrapExpect,
candidates: make(map[string]string, cfg.BootstrapExpect),
recoveryTimeout: cfg.RecoveryTimeout,
heartbeatTimeout: cfg.HeartbeatTimeout,
electionTimeout: cfg.ElectionTimeout,
snapshotInterval: cfg.SnapshotInterval,
snapshotThreshold: cfg.SnapshotThreshold,
migrationTimeout: cfg.BootstrapTimeout,
updateWaitTimeout: cfg.UpdateWaitTimeout,
applyTimeout: time.Second * 20,
nodeID: cfg.NodeID,
host: cfg.Host,
addResolver: newAddrResolver(&cfg),
db: &localDB{NewSchema(cfg.NodeID, cfg.DB), cfg.DB, cfg.Parser, cfg.Logger},
log: cfg.Logger,
logLevel: cfg.LogLevel,
raftDir: cfg.WorkDir,
raftPort: cfg.RaftPort,
voter: cfg.Voter,
bootstrapExpect: cfg.BootstrapExpect,
candidates: make(map[string]string, cfg.BootstrapExpect),
recoveryTimeout: cfg.RecoveryTimeout,
heartbeatTimeout: cfg.HeartbeatTimeout,
electionTimeout: cfg.ElectionTimeout,
metadataOnlyVoters: cfg.MetadataOnlyVoters,
snapshotInterval: cfg.SnapshotInterval,
snapshotThreshold: cfg.SnapshotThreshold,
migrationTimeout: cfg.BootstrapTimeout,
updateWaitTimeout: cfg.UpdateWaitTimeout,
applyTimeout: time.Second * 20,
nodeID: cfg.NodeID,
host: cfg.Host,
addResolver: newAddrResolver(&cfg),
db: &localDB{NewSchema(cfg.NodeID, cfg.DB), cfg.DB, cfg.Parser, cfg.Logger},
log: cfg.Logger,
logLevel: cfg.LogLevel,

// loadLegacySchema is responsible for loading old schema from boltDB
loadLegacySchema: cfg.LoadLegacySchema,
Expand Down Expand Up @@ -236,7 +242,7 @@ func (st *Store) Open(ctx context.Context) (err error) {
st.loadDatabase(ctx)
}

st.log.Info("construct a new raft node")
st.log.Info("construct a new raft node", "metadata_only_voter", st.metadataOnlyVoters)
st.raft, err = raft.NewRaft(st.raftConfig(), st, logCache, st.logStore, st.snapshotStore, st.transport)
if err != nil {
return fmt.Errorf("raft.NewRaft %v %w", st.transport.LocalAddr(), err)
Expand Down
33 changes: 27 additions & 6 deletions usecases/cluster/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
type State struct {
config Config
// that lock to serialize access to memberlist
listLock sync.RWMutex
list *memberlist.Memberlist
delegate delegate
listLock sync.RWMutex
list *memberlist.Memberlist
nonStorageNodes map[string]struct{}
delegate delegate
}

type Config struct {
Expand Down Expand Up @@ -57,14 +58,15 @@ func (ba BasicAuth) Enabled() bool {
return ba.Username != "" || ba.Password != ""
}

func Init(userConfig Config, dataPath string, logger logrus.FieldLogger) (_ *State, err error) {
func Init(userConfig Config, dataPath string, nonStorageNodes map[string]struct{}, logger logrus.FieldLogger) (_ *State, err error) {
cfg := memberlist.DefaultLANConfig()
cfg.LogOutput = newLogParser(logger)
if userConfig.Hostname != "" {
cfg.Name = userConfig.Hostname
}
state := State{
config: userConfig,
config: userConfig,
nonStorageNodes: nonStorageNodes,
delegate: delegate{
Name: cfg.Name,
dataPath: dataPath,
Expand Down Expand Up @@ -173,10 +175,29 @@ func (s *State) AllNames() []string {
return out
}

// StorageNodes returns all nodes except non storage nodes
func (s *State) StorageNodes() []string {
if len(s.nonStorageNodes) == 0 {
return s.AllNames()
}
members := s.list.Members()
out := make([]string, len(members))
n := 0
for _, m := range members {
name := m.Name
if _, ok := s.nonStorageNodes[name]; !ok {
out[n] = m.Name
n++
}
}

return out[:n]
}

// Candidates returns list of nodes (names) sorted by the
// free amount of disk space in descending order
func (s *State) Candidates() []string {
return s.delegate.sortCandidates(s.AllNames())
return s.delegate.sortCandidates(s.StorageNodes())
}

// All node names (not their hostnames!) for live members, including self.
Expand Down
34 changes: 19 additions & 15 deletions usecases/config/config_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@ const (
type Flags struct {
ConfigFile string `long:"config-file" description:"path to config file (default: ./weaviate.conf.json)"`

RaftPort int `long:"raft-port" description:"the port used by Raft for inter-node communication"`
RaftInternalRPCPort int `long:"raft-internal-rpc-port" description:"the port used for internal RPCs within the cluster"`
RaftJoin []string `long:"raft-join" description:"a comma-separated list of server addresses to join on startup. Each element needs to be in the form NODE_NAME[:NODE_PORT]. If NODE_PORT is not present, raft-internal-rpc-port default value will be used instead"`
RaftBootstrapTimeout int `long:"raft-bootstrap-timeout" description:"the duration for which the raft bootstrap procedure will wait for each node in raft-join to be reachable"`
RaftBootstrapExpect int `long:"raft-bootstrap-expect" description:"specifies the number of server nodes to wait for before bootstrapping the cluster"`
RaftHeartbeatTimeout int `long:"raft-heartbeat-timeout" description:"raft heartbeat timeout"`
RaftElectionTimeout int `long:"raft-election-timeout" description:"raft election timeout"`
RaftSnapshotThreshold int `long:"raft-snap-threshold" description:"number of outstanding log entries before performing a snapshot"`
RaftSnapshotInterval int `long:"raft-snap-interval" description:"controls how often raft checks if it should perform a snapshot"`
RaftPort int `long:"raft-port" description:"the port used by Raft for inter-node communication"`
RaftInternalRPCPort int `long:"raft-internal-rpc-port" description:"the port used for internal RPCs within the cluster"`
RaftJoin []string `long:"raft-join" description:"a comma-separated list of server addresses to join on startup. Each element needs to be in the form NODE_NAME[:NODE_PORT]. If NODE_PORT is not present, raft-internal-rpc-port default value will be used instead"`
RaftBootstrapTimeout int `long:"raft-bootstrap-timeout" description:"the duration for which the raft bootstrap procedure will wait for each node in raft-join to be reachable"`
RaftBootstrapExpect int `long:"raft-bootstrap-expect" description:"specifies the number of server nodes to wait for before bootstrapping the cluster"`
RaftHeartbeatTimeout int `long:"raft-heartbeat-timeout" description:"raft heartbeat timeout"`
RaftElectionTimeout int `long:"raft-election-timeout" description:"raft election timeout"`
RaftSnapshotThreshold int `long:"raft-snap-threshold" description:"number of outstanding log entries before performing a snapshot"`
RaftSnapshotInterval int `long:"raft-snap-interval" description:"controls how often raft checks if it should perform a snapshot"`
RaftMetadataOnlyVoters bool `long:"raft-metadata-only-voters" description:"configures the voters to store metadata exclusively, without storing any other data"`
}

// Config outline of the config file
Expand Down Expand Up @@ -298,8 +299,9 @@ type Raft struct {
ElectionTimeout time.Duration
SnapshotInterval time.Duration

BootstrapTimeout time.Duration
BootstrapExpect int
BootstrapTimeout time.Duration
BootstrapExpect int
MetadataOnlyVoters bool
}

func (r *Raft) Validate() error {
Expand Down Expand Up @@ -343,10 +345,9 @@ func (r *Raft) Validate() error {
if r.BootstrapExpect == 0 {
return fmt.Errorf("raft.bootstrap_expect must be greater than 0")
}
// TODO-RAFT: Currently to simplify bootstrapping we expect to bootstrap all the nodes in the join list together. We keep the expect
// paramater so that in the future we can change the behaviour if we want (bootstrap once X nodes among Y are online).
if r.BootstrapExpect != len(r.Join) {
return fmt.Errorf("raft.bootstrap.expect must be equal to the length of raft.join")

if r.BootstrapExpect > len(r.Join) {
return fmt.Errorf("raft.bootstrap.expect must be less than or equal to the length of raft.join")
}
return nil
}
Expand Down Expand Up @@ -488,6 +489,9 @@ func (f *WeaviateConfig) fromFlags(flags *Flags) {
if flags.RaftSnapshotThreshold > 0 {
f.Config.Raft.SnapshotThreshold = uint64(flags.RaftSnapshotThreshold)
}
if flags.RaftMetadataOnlyVoters {
f.Config.Raft.MetadataOnlyVoters = true
}
}

func configErr(err error) error {
Expand Down
5 changes: 4 additions & 1 deletion usecases/config/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,10 @@ func FromEnv(config *Config) error {

func parseRAFTConfig(hostname string) (Raft, error) {
// flag.IntVar()
cfg := Raft{}
cfg := Raft{
MetadataOnlyVoters: configbase.Enabled(os.Getenv("RAFT_METADATA_ONLY_VOTERS")),
}

if err := parsePositiveInt(
"RAFT_PORT",
func(val int) { cfg.Port = val },
Expand Down
6 changes: 3 additions & 3 deletions usecases/sharding/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (p *Physical) AdjustReplicas(count int, nodes nodes) error {

names := nodes.Candidates()
if count > len(names) {
return fmt.Errorf("not enough replicas: found %d want %d", len(names), count)
return fmt.Errorf("not enough storage replicas: found %d want %d", len(names), count)
}

// make sure included nodes are unique
Expand Down Expand Up @@ -141,7 +141,7 @@ func InitState(id string, config config.Config, nodes nodes, replFactor int64, p

names := nodes.Candidates()
if f, n := replFactor, len(names); f > int64(n) {
return nil, fmt.Errorf("not enough replicas: found %d want %d", n, f)
return nil, fmt.Errorf("not enough storage replicas: found %d want %d", n, f)
}

if err := out.initPhysical(names, replFactor); err != nil {
Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *State) initPhysical(nodes []string, replFactor int64) error {
func (s *State) GetPartitions(lookUp nodes, shards []string, replFactor int64) (map[string][]string, error) {
nodes := lookUp.Candidates()
if len(nodes) == 0 {
return nil, fmt.Errorf("list of node candidates is empty")
return nil, fmt.Errorf("list of storage nodes is empty")
}
if f, n := replFactor, len(nodes); f > int64(n) {
return nil, fmt.Errorf("not enough replicas: found %d want %d", n, f)
Expand Down

0 comments on commit 49b4594

Please sign in to comment.