Skip to content

Commit

Permalink
Service: Select consensus on "init" (not on "daemon")
Browse files Browse the repository at this point in the history
Fixes #865.

This makes the necessary changes so that consensu is selected on "init" with a
flag set, by default, to "crdt". This generates only a "crdt" or a "raft"
section, not both.

If the configuration file has a "raft" section, "raft" will be used to start
the daemon. If it has a "crdt" section, "crdt" will be used. If it has none or
both sections, an error will happen.

This also affects "state *" commands, which will now autoselect how to work
from the existing configuration.
  • Loading branch information
hsanjuan committed Aug 9, 2019
1 parent 00e78a6 commit 063c5f1
Show file tree
Hide file tree
Showing 16 changed files with 132 additions and 100 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Expand Up @@ -57,4 +57,4 @@ VOLUME $IPFS_CLUSTER_PATH
ENTRYPOINT ["/sbin/tini", "--", "/usr/local/bin/entrypoint.sh"]

# Defaults for ipfs-cluster-service go here
CMD ["daemon", "--consensus raft"]
CMD ["daemon"]
2 changes: 1 addition & 1 deletion Dockerfile-test
Expand Up @@ -58,4 +58,4 @@ VOLUME $IPFS_CLUSTER_PATH
ENTRYPOINT ["/usr/local/bin/start-daemons.sh"]

# Defaults would go here
CMD ["daemon", "--upgrade"]
CMD ["daemon"]
36 changes: 14 additions & 22 deletions cmd/ipfs-cluster-service/daemon.go
Expand Up @@ -46,10 +46,6 @@ func parseBootstraps(flagVal []string) (bootstraps []ma.Multiaddr) {

// Runs the cluster peer
func daemon(c *cli.Context) error {
if c.String("consensus") == "" {
checkErr("starting daemon", errors.New("--consensus flag must be set to \"raft\" or \"crdt\""))
}

logger.Info("Initializing. For verbose output run with \"-l debug\". Please wait...")

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -78,7 +74,7 @@ func daemon(c *cli.Context) error {

// Cleanup state if bootstrapping
raftStaging := false
if len(bootstraps) > 0 && c.String("consensus") == "raft" {
if len(bootstraps) > 0 && cfgHelper.GetConsensus() == cfgs.Raft.ConfigKey() {
raft.CleanupRaft(cfgs.Raft)
raftStaging = true
}
Expand Down Expand Up @@ -159,14 +155,13 @@ func createCluster(
tracer, err := observations.SetupTracing(cfgs.Tracing)
checkErr("setting up Tracing", err)

store := setupDatastore(c.String("consensus"), cfgHelper.Identity(), cfgs)
store := setupDatastore(cfgHelper)

cons, err := setupConsensus(
c.String("consensus"),
cfgHelper,
host,
dht,
pubsub,
cfgs,
store,
raftStaging,
)
Expand All @@ -176,7 +171,7 @@ func createCluster(
}

var peersF func(context.Context) ([]peer.ID, error)
if c.String("consensus") == "raft" {
if cfgHelper.GetConsensus() == cfgs.Raft.ConfigKey() {
peersF = cons.Peers
}

Expand Down Expand Up @@ -293,45 +288,42 @@ func setupPinTracker(
}
}

func setupDatastore(
consensus string,
ident *config.Identity,
cfgs *cmdutils.Configs,
) ds.Datastore {
stmgr, err := cmdutils.NewStateManager(consensus, ident, cfgs)
func setupDatastore(cfgHelper *cmdutils.ConfigHelper) ds.Datastore {
stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgHelper.Configs())
checkErr("creating state manager", err)
store, err := stmgr.GetStore()
checkErr("creating datastore", err)
return store
}

func setupConsensus(
name string,
cfgHelper *cmdutils.ConfigHelper,
h host.Host,
dht *dht.IpfsDHT,
pubsub *pubsub.PubSub,
cfgs *cmdutils.Configs,
store ds.Datastore,
raftStaging bool,
) (ipfscluster.Consensus, error) {
switch name {
case "raft":

cfgs := cfgHelper.Configs()
switch cfgHelper.GetConsensus() {
case cfgs.Raft.ConfigKey():
rft, err := raft.NewConsensus(
h,
cfgs.Raft,
cfgHelper.Configs().Raft,
store,
raftStaging,
)
if err != nil {
return nil, errors.Wrap(err, "creating Raft component")
}
return rft, nil
case "crdt":
case cfgs.Crdt.ConfigKey():
convrdt, err := crdt.New(
h,
dht,
pubsub,
cfgs.Crdt,
cfgHelper.Configs().Crdt,
store,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs-cluster-service/lock.go
Expand Up @@ -29,7 +29,7 @@ func (l *lock) lock() {
}

// we should have a config folder whenever we try to lock
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath)
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "")
cfgHelper.MakeConfigFolder()

// set the lock file within this function
Expand Down
66 changes: 33 additions & 33 deletions cmd/ipfs-cluster-service/main.go
Expand Up @@ -4,6 +4,7 @@ package main
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -30,6 +31,7 @@ const programName = `ipfs-cluster-service`
const (
defaultPinTracker = "map"
defaultLogLevel = "info"
defaultConsensus = "crdt"
)

const (
Expand Down Expand Up @@ -221,26 +223,30 @@ func main() {
This command will initialize a new %s configuration file and, if it
does already exist, generate a new %s for %s.
If the optional [source-url] is given, the generated configuration file
If the optional [source-url] is given, the generated configuration file
will refer to it. The source configuration will be fetched from its source
URL during the launch of the daemon. If not, a default standard configuration
file will be created.
In the latter case, a cluster secret will be generated as required by %s.
Alternatively, this secret can be manually provided with --custom-secret (in
which case it will be prompted), or by setting the CLUSTER_SECRET environment
variable.
In the latter case, a cluster secret will be generated as required
by %s. Alternatively, this secret can be manually
provided with --custom-secret (in which case it will be prompted), or
by setting the CLUSTER_SECRET environment variable.
The --consensus flag allows to select an alternative consensus components for
in the newly-generated configuration.
Note that the --force flag allows to overwrite an existing
configuration with default values. To generate a new identity, please
remove the %s file first and clean any Raft state.
By default, an empty peerstore file will be created too. Initial contents can
be provided with the -peers flag. In this case, the "trusted_peers" list in
the "crdt" configuration section and the "init_peerset" list in the "raft"
configuration section will be prefilled to the peer IDs in the given
multiaddresses.
be provided with the --peers flag. Depending on the chosen consensus, the
"trusted_peers" list in the "crdt" configuration section and the
"init_peerset" list in the "raft" configuration section will be prefilled to
the peer IDs in the given multiaddresses.
`,

DefaultConfigFile,
DefaultIdentityFile,
programName,
Expand All @@ -249,6 +255,11 @@ multiaddresses.
),
ArgsUsage: "[http-source-url]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "consensus",
Usage: "select consensus component: 'crdt' or 'raft'",
Value: defaultConsensus,
},
cli.BoolFlag{
Name: "custom-secret, s",
Usage: "prompt for the cluster secret (when no source specified)",
Expand All @@ -263,7 +274,12 @@ multiaddresses.
},
},
Action: func(c *cli.Context) error {
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath)
consensus := c.String("consensus")
if consensus != "raft" && consensus != "crdt" {
checkErr("choosing consensus", errors.New("flag value must be set to 'raft' or 'crdt'"))
}

cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, consensus)
defer cfgHelper.Manager().Shutdown() // wait for saves

configExists := false
Expand Down Expand Up @@ -375,10 +391,6 @@ multiaddresses.
Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"",
Hidden: true,
},
cli.StringFlag{
Name: "consensus",
Usage: "shared state management provider [raft,crdt]",
},
cli.StringFlag{
Name: "pintracker",
Value: defaultPinTracker,
Expand Down Expand Up @@ -418,16 +430,12 @@ By default, the state will be printed to stdout.
Value: "",
Usage: "writes to an output file",
},
cli.StringFlag{
Name: "consensus",
Usage: "consensus component to export data from [raft, crdt]",
},
},
Action: func(c *cli.Context) error {
locker.lock()
defer locker.tryUnlock()

mgr := getStateManager(c.String("consensus"))
mgr := getStateManager()

var w io.WriteCloser
var err error
Expand Down Expand Up @@ -463,10 +471,6 @@ to import. If no argument is provided, stdin will be used.
Name: "force, f",
Usage: "skips confirmation prompt",
},
cli.StringFlag{
Name: "consensus",
Usage: "consensus component to export data from [raft, crdt]",
},
},
Action: func(c *cli.Context) error {
locker.lock()
Expand All @@ -478,7 +482,7 @@ to import. If no argument is provided, stdin will be used.
return nil
}

mgr := getStateManager(c.String("consensus"))
mgr := getStateManager()

// Get the importing file path
importFile := c.Args().First()
Expand Down Expand Up @@ -511,10 +515,6 @@ to all effects. Peers may need to bootstrap and sync from scratch after this.
Name: "force, f",
Usage: "skip confirmation prompt",
},
cli.StringFlag{
Name: "consensus",
Usage: "consensus component to export data from [raft, crdt]",
},
},
Action: func(c *cli.Context) error {
locker.lock()
Expand All @@ -528,7 +528,7 @@ to all effects. Peers may need to bootstrap and sync from scratch after this.
return nil
}

mgr := getStateManager(c.String("consensus"))
mgr := getStateManager()
checkErr("cleaning state", mgr.Clean())
logger.Info("data correctly cleaned up")
return nil
Expand Down Expand Up @@ -608,21 +608,21 @@ func yesNoPrompt(prompt string) bool {

func loadConfigHelper() *cmdutils.ConfigHelper {
// Load all the configurations and identity
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath)
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "")
err := cfgHelper.LoadFromDisk()
checkErr("loading identity or configurations", err)
return cfgHelper
}

func getStateManager(consensus string) cmdutils.StateManager {
func getStateManager() cmdutils.StateManager {
cfgHelper := loadConfigHelper()
// since we won't save configs we can shutdown
cfgHelper.Manager().Shutdown()
mgr, err := cmdutils.NewStateManager(
consensus,
cfgHelper.GetConsensus(),
cfgHelper.Identity(),
cfgHelper.Configs(),
)
checkErr("creating state manager,", err)
checkErr("creating state manager", err)
return mgr
}
39 changes: 36 additions & 3 deletions cmdutils/configs.go
Expand Up @@ -50,14 +50,16 @@ type ConfigHelper struct {

configPath string
identityPath string
consensus string
}

// NewConfigHelper creates a config helper given the paths to the
// configuration and identity files.
func NewConfigHelper(configPath, identityPath string) *ConfigHelper {
func NewConfigHelper(configPath, identityPath, consensus string) *ConfigHelper {
ch := &ConfigHelper{
configPath: configPath,
identityPath: identityPath,
consensus: consensus,
}
ch.init()
return ch
Expand Down Expand Up @@ -138,6 +140,29 @@ func (ch *ConfigHelper) Configs() *Configs {
return ch.configs
}

// GetConsensus attempts to return the configured consensus.
// If the ConfigHelper was initialized with a consensus string
// then it returns that.
//
// Otherwise it checks whether one of the consensus configurations
// has been loaded. If both or non have been loaded, it returns
// an empty string.
func (ch *ConfigHelper) GetConsensus() string {
if ch.consensus != "" {
return ch.consensus
}
crdtLoaded := ch.manager.IsLoadedFromJSON(config.Consensus, ch.configs.Crdt.ConfigKey())
raftLoaded := ch.manager.IsLoadedFromJSON(config.Consensus, ch.configs.Raft.ConfigKey())
if crdtLoaded == raftLoaded { //both loaded or none
return ""
}

if crdtLoaded {
return ch.configs.Crdt.ConfigKey()
}
return ch.configs.Raft.ConfigKey()
}

// register all current cluster components
func (ch *ConfigHelper) init() {
man := config.NewManager()
Expand All @@ -160,8 +185,6 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.API, cfgs.Restapi)
man.RegisterComponent(config.API, cfgs.Ipfsproxy)
man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp)
man.RegisterComponent(config.Consensus, cfgs.Raft)
man.RegisterComponent(config.Consensus, cfgs.Crdt)
man.RegisterComponent(config.PinTracker, cfgs.Maptracker)
man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker)
man.RegisterComponent(config.Monitor, cfgs.Pubsubmon)
Expand All @@ -170,6 +193,16 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.Observations, cfgs.Tracing)
man.RegisterComponent(config.Datastore, cfgs.Badger)

switch ch.consensus {
case cfgs.Raft.ConfigKey():
man.RegisterComponent(config.Consensus, cfgs.Raft)
case cfgs.Crdt.ConfigKey():
man.RegisterComponent(config.Consensus, cfgs.Crdt)
default:
man.RegisterComponent(config.Consensus, cfgs.Raft)
man.RegisterComponent(config.Consensus, cfgs.Crdt)
}

ch.identity = &config.Identity{}
ch.manager = man
ch.configs = cfgs
Expand Down
6 changes: 3 additions & 3 deletions cmdutils/state.go
Expand Up @@ -33,12 +33,12 @@ type StateManager interface {
// consensus ("raft" or "crdt"). It will need initialized configs.
func NewStateManager(consensus string, ident *config.Identity, cfgs *Configs) (StateManager, error) {
switch consensus {
case "raft":
case cfgs.Raft.ConfigKey():
return &raftStateManager{ident, cfgs}, nil
case "crdt":
case cfgs.Crdt.ConfigKey():
return &crdtStateManager{ident, cfgs}, nil
case "":
return nil, errors.New("unspecified consensus component")
return nil, errors.New("could not determine the consensus component")
default:
return nil, fmt.Errorf("unknown consensus component '%s'", consensus)
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Expand Up @@ -423,7 +423,7 @@ func (cfg *Manager) LoadJSON(bs []byte) error {
logger.Debugf("%s component configuration loaded", name)
} else {
cfg.undefinedComps[t][name] = true
logger.Warningf("%s component is empty, generating default", name)
logger.Debugf("%s component is empty, generating default", name)
component.Default()
}

Expand Down

0 comments on commit 063c5f1

Please sign in to comment.