cmd/svc: force quit ipfs-cluster-service on 2nd ctrl-c
Refactor daemon() to reduce code complexity.

Refactor configuration in ipfs-cluster-service.

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
lanzafame committed Mar 29, 2018
1 parent f8acd4f commit 31bce16
Showing 2 changed files with 111 additions and 68 deletions.
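The behavior this commit wires in: the first Ctrl-c starts a graceful shutdown, the second warns that the shutdown is hanging, and the third kills the process. A minimal standalone sketch of that escalation pattern (illustrative only — the committed handler is `handleCtrlC` in the diff below; the 10-second sleep stands in for a slow cluster shutdown):

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	signalChan := make(chan os.Signal, 20)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

	done := make(chan struct{})
	var ctrlcCount int
	for {
		select {
		case <-signalChan:
			ctrlcCount++
			switch ctrlcCount {
			case 1:
				// Start shutdown asynchronously so the loop keeps receiving signals.
				go func() {
					time.Sleep(10 * time.Second) // stand-in for a slow graceful shutdown
					close(done)
				}()
			case 2:
				fmt.Fprintln(os.Stderr, "shutdown is taking too long; press Ctrl-c again to force quit")
			case 3:
				os.Exit(-1)
			}
		case <-done:
			return
		}
	}
}
```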
157 changes: 100 additions & 57 deletions ipfs-cluster-service/main.go
@@ -138,8 +138,11 @@ func out(m string, a ...interface{}) {
fmt.Fprintf(os.Stderr, m, a...)
}

-func checkErr(doing string, err error) {
+func checkErr(doing string, err error, args ...interface{}) {
if err != nil {
+if len(args) > 0 {
+doing = fmt.Sprintf(doing, args...)
+}
out("error %s: %s\n", doing, err)
err = locker.tryUnlock()
if err != nil {
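With the variadic parameters, call sites can interpolate context into the error label before it is printed. A standalone sketch of the new semantics (illustrative; the real checkErr also releases the daemon lock via locker.tryUnlock() before exiting, and the path below is made up):

```go
package main

import (
	"fmt"
	"os"
)

// Simplified stand-in for ipfs-cluster-service's checkErr.
func checkErr(doing string, err error, args ...interface{}) {
	if err != nil {
		if len(args) > 0 {
			// args must be spread with ... so each value fills a format verb.
			doing = fmt.Sprintf(doing, args...)
		}
		fmt.Fprintf(os.Stderr, "error %s: %s\n", doing, err)
		os.Exit(1)
	}
}

func main() {
	path := "/nonexistent/service.json" // hypothetical path
	_, err := os.Open(path)
	checkErr("loading configuration from %s", err, path)
	// prints: error loading configuration from /nonexistent/service.json: open ...
}
```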
@@ -223,20 +226,20 @@ configuration.
},
Action: func(c *cli.Context) error {
userSecret, userSecretDefined := userProvidedSecret(c.Bool("custom-secret"))
-cfg, clustercfg, _, _, _, _, _, _, _ := makeConfigs()
-defer cfg.Shutdown() // wait for saves
+cfgMgr, cfgs := makeConfigs()
+defer cfgMgr.Shutdown() // wait for saves

// Generate defaults for all registered components
-err := cfg.Default()
+err := cfgMgr.Default()
checkErr("generating default configuration", err)

// Set user secret
if userSecretDefined {
-clustercfg.Secret = userSecret
+cfgs.clusterCfg.Secret = userSecret
}

// Save
-saveConfig(cfg, c.GlobalBool("force"))
+saveConfig(cfgMgr, c.GlobalBool("force"))
return nil
},
},
@@ -379,11 +382,11 @@ the mth data folder (m currently defaults to 5)
}
}

-cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs()
-err = cfg.LoadJSONFromFile(configPath)
+cfgMgr, cfgs := makeConfigs()
+err = cfgMgr.LoadJSONFromFile(configPath)
checkErr("initializing configs", err)

-dataFolder := filepath.Join(consensusCfg.BaseDir, raft.DefaultDataSubFolder)
+dataFolder := filepath.Join(cfgs.consensusCfg.BaseDir, raft.DefaultDataSubFolder)
err = raft.CleanupRaft(dataFolder)
checkErr("Cleaning up consensus data", err)
logger.Warningf("the %s folder has been rotated. Next start will use an empty state", dataFolder)
@@ -443,7 +446,7 @@ func daemon(c *cli.Context) error {
logger.Info("Initializing. For verbose output run with \"-l debug\". Please wait...")

// Load all the configurations
-cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg := makeConfigs()
+cfgMgr, cfgs := makeConfigs()

// Run any migrations
if c.Bool("upgrade") {
@@ -458,76 +461,47 @@ func daemon(c *cli.Context) error {

// Load all the configurations
// always wait for configuration to be saved
-defer cfg.Shutdown()
+defer cfgMgr.Shutdown()

-err = cfg.LoadJSONFromFile(configPath)
+err = cfgMgr.LoadJSONFromFile(configPath)
checkErr("loading configuration", err)

if a := c.String("bootstrap"); a != "" {
-if len(clusterCfg.Peers) > 0 && !c.Bool("force") {
+if len(cfgs.clusterCfg.Peers) > 0 && !c.Bool("force") {
return errors.New("the configuration provides cluster.Peers. Use -f to ignore and proceed bootstrapping")
}
joinAddr, err := ma.NewMultiaddr(a)
-if err != nil {
-return fmt.Errorf("error parsing multiaddress: %s", err)
-}
-clusterCfg.Bootstrap = []ma.Multiaddr{joinAddr}
-clusterCfg.Peers = []ma.Multiaddr{}
+checkErr("parsing multiaddress", err)
+cfgs.clusterCfg.Bootstrap = []ma.Multiaddr{joinAddr}
+cfgs.clusterCfg.Peers = []ma.Multiaddr{}
}

if c.Bool("leave") {
-clusterCfg.LeaveOnShutdown = true
+cfgs.clusterCfg.LeaveOnShutdown = true
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

-host, err := ipfscluster.NewClusterHost(ctx, clusterCfg)
-checkErr("creating libP2P Host", err)
-
-api, err := rest.NewAPIWithHost(apiCfg, host)
-checkErr("creating REST API component", err)
-
-proxy, err := ipfshttp.NewConnector(ipfshttpCfg)
-checkErr("creating IPFS Connector component", err)
-
-state := mapstate.NewMapState()
-
-err = validateVersion(clusterCfg, consensusCfg)
-checkErr("validating version", err)
-
-tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID)
-mon, err := basic.NewMonitor(monCfg)
-checkErr("creating Monitor component", err)
-informer, alloc := setupAllocation(c.GlobalString("alloc"), diskInfCfg, numpinInfCfg)
-
-cluster, err := ipfscluster.NewCluster(
-host,
-clusterCfg,
-consensusCfg,
-api,
-proxy,
-state,
-tracker,
-mon,
-alloc,
-informer)
+cluster, err := initializeCluster(ctx, c, cfgs)
checkErr("starting cluster", err)

signalChan := make(chan os.Signal, 20)
-signal.Notify(signalChan,
+signal.Notify(
+signalChan,
syscall.SIGINT,
syscall.SIGTERM,
-syscall.SIGHUP)
+syscall.SIGHUP,
+)
+
+var ctrlcCount int
for {
select {
case <-signalChan:
-err = cluster.Shutdown()
-checkErr("shutting down cluster", err)
+ctrlcCount++
+handleCtrlC(cluster, ctrlcCount)
case <-cluster.Done():
return nil
-
-//case <-cluster.Ready():
}
}
}
@@ -617,7 +591,7 @@ func yesNoPrompt(prompt string) bool {
return false
}

-func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config, *numpin.Config) {
+func makeConfigs() (*config.Manager, *cfgs) {
cfg := config.NewManager()
clusterCfg := &ipfscluster.Config{}
apiCfg := &rest.Config{}
@@ -635,5 +609,74 @@ func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config, *numpin.Config) {
cfg.RegisterComponent(config.Monitor, monCfg)
cfg.RegisterComponent(config.Informer, diskInfCfg)
cfg.RegisterComponent(config.Informer, numpinInfCfg)
-return cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg
+return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg}
}
+
+type cfgs struct {
+clusterCfg *ipfscluster.Config
+apiCfg *rest.Config
+ipfshttpCfg *ipfshttp.Config
+consensusCfg *raft.Config
+trackerCfg *maptracker.Config
+monCfg *basic.Config
+diskInfCfg *disk.Config
+numpinInfCfg *numpin.Config
+}
+
+func initializeCluster(ctx context.Context, c *cli.Context, cfgs *cfgs) (*ipfscluster.Cluster, error) {
+host, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg)
+checkErr("creating libP2P Host", err)
+
+api, err := rest.NewAPIWithHost(cfgs.apiCfg, host)
+checkErr("creating REST API component", err)
+
+proxy, err := ipfshttp.NewConnector(cfgs.ipfshttpCfg)
+checkErr("creating IPFS Connector component", err)
+
+state := mapstate.NewMapState()
+
+err = validateVersion(cfgs.clusterCfg, cfgs.consensusCfg)
+checkErr("validating version", err)
+
+tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, cfgs.clusterCfg.ID)
+mon, err := basic.NewMonitor(cfgs.monCfg)
+checkErr("creating Monitor component", err)
+informer, alloc := setupAllocation(c.GlobalString("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg)
+
+return ipfscluster.NewCluster(
+host,
+cfgs.clusterCfg,
+cfgs.consensusCfg,
+api,
+proxy,
+state,
+tracker,
+mon,
+alloc,
+informer,
+)
+}
+
+func handleCtrlC(cluster *ipfscluster.Cluster, ctrlcCount int) {
+switch ctrlcCount {
+case 1:
+go func() {
+err := cluster.Shutdown()
+checkErr("shutting down cluster", err)
+}()
+case 2:
+out(`
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+Shutdown is taking too long! Press Ctrl-c again to manually kill cluster.
+Note that this may corrupt the local cluster state.
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+`)
+case 3:
+out("exiting cluster NOW")
+os.Exit(-1)
+}
+}
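Note the design choice in handleCtrlC: cluster.Shutdown() now runs in a goroutine on the first Ctrl-c, whereas the old select case called it synchronously. A hung shutdown used to block the loop from ever seeing another signal; running it off the main loop is what makes the second warning and the third forced exit reachable.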
22 changes: 11 additions & 11 deletions ipfs-cluster-service/state.go
@@ -26,15 +26,15 @@ func upgrade() error {
return nil
}

-cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
+cfgMgr, cfgs := makeConfigs()

-err = cfg.LoadJSONFromFile(configPath)
+err = cfgMgr.LoadJSONFromFile(configPath)
if err != nil {
return err
}

-raftPeers := append(ipfscluster.PeersFromMultiaddrs(clusterCfg.Peers), clusterCfg.ID)
-return raft.SnapshotSave(consensusCfg, newState, raftPeers)
+raftPeers := append(ipfscluster.PeersFromMultiaddrs(cfgs.clusterCfg.Peers), cfgs.clusterCfg.ID)
+return raft.SnapshotSave(cfgs.consensusCfg, newState, raftPeers)
}

func export(w io.Writer) error {
@@ -50,14 +50,14 @@ func export(w io.Writer) error {
// snapshot, a flag set to true when the state format has the
// current version and an error
func restoreStateFromDisk() (*mapstate.MapState, bool, error) {
-cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs()
+cfgMgr, cfgs := makeConfigs()

-err := cfg.LoadJSONFromFile(configPath)
+err := cfgMgr.LoadJSONFromFile(configPath)
if err != nil {
return nil, false, err
}

-r, snapExists, err := raft.LastStateRaw(consensusCfg)
+r, snapExists, err := raft.LastStateRaw(cfgs.consensusCfg)
if !snapExists {
err = errNoSnapshot
}
@@ -90,9 +90,9 @@ }
}

func stateImport(r io.Reader) error {
-cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
+cfgMgr, cfgs := makeConfigs()

-err := cfg.LoadJSONFromFile(configPath)
+err := cfgMgr.LoadJSONFromFile(configPath)
if err != nil {
return err
}
@@ -111,8 +111,8 @@ func stateImport(r io.Reader) error {
return err
}
}
-raftPeers := append(ipfscluster.PeersFromMultiaddrs(clusterCfg.Peers), clusterCfg.ID)
-return raft.SnapshotSave(consensusCfg, stateToImport, raftPeers)
+raftPeers := append(ipfscluster.PeersFromMultiaddrs(cfgs.clusterCfg.Peers), cfgs.clusterCfg.ID)
+return raft.SnapshotSave(cfgs.consensusCfg, stateToImport, raftPeers)
}

func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {

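Taken together, the two files show the shape of the configuration refactor at every call site. The fragment below is assembled from the diff above (identifiers exactly as committed; not independently runnable):

```go
// Before: nine positional results, mostly discarded at each call site.
// cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()

// After: the manager plus one struct bundling every component config.
cfgMgr, cfgs := makeConfigs()
defer cfgMgr.Shutdown() // wait for saves

err := cfgMgr.LoadJSONFromFile(configPath)
checkErr("loading configuration", err)

dataFolder := filepath.Join(cfgs.consensusCfg.BaseDir, raft.DefaultDataSubFolder)
```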