From 23d69e93132c6e85f3efea361a1787bac5829773 Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Fri, 23 Mar 2018 11:08:00 +1000 Subject: [PATCH] cmd/svc: force quit ipfs-cluster-service on 2nd ctrl-c Refactor daemon() to reduce code complexity. Refactor configuration in ipfs-cluster-service. License: MIT Signed-off-by: Adrian Lanzafame --- ipfs-cluster-service/main.go | 123 ++++++++++++++++++++-------------- ipfs-cluster-service/state.go | 22 +++--- 2 files changed, 85 insertions(+), 60 deletions(-) diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 57b31f608..4f9886563 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -222,20 +222,20 @@ configuration. }, Action: func(c *cli.Context) error { userSecret, userSecretDefined := userProvidedSecret(c.Bool("custom-secret")) - cfg, clustercfg, _, _, _, _, _, _, _ := makeConfigs() - defer cfg.Shutdown() // wait for saves + cfgMgr, cfgs := makeConfigs() + defer cfgMgr.Shutdown() // wait for saves // Generate defaults for all registered components - err := cfg.Default() + err := cfgMgr.Default() checkErr("generating default configuration", err) // Set user secret if userSecretDefined { - clustercfg.Secret = userSecret + cfgs.clusterCfg.Secret = userSecret } // Save - saveConfig(cfg, c.GlobalBool("force")) + saveConfig(cfgMgr, c.GlobalBool("force")) return nil }, }, @@ -378,11 +378,11 @@ the mth data folder (m currently defaults to 5) } } - cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs() - err = cfg.LoadJSONFromFile(configPath) + cfgMgr, cfgs := makeConfigs() + err = cfgMgr.LoadJSONFromFile(configPath) checkErr("initializing configs", err) - dataFolder := filepath.Join(consensusCfg.BaseDir, raft.DefaultDataSubFolder) + dataFolder := filepath.Join(cfgs.consensusCfg.BaseDir, raft.DefaultDataSubFolder) err = raft.CleanupRaft(dataFolder) checkErr("Cleaning up consensus data", err) logger.Warningf("the %s folder has been rotated. 
Next start will use an empty state", dataFolder) @@ -442,7 +442,7 @@ func daemon(c *cli.Context) error { logger.Info("Initializing. For verbose output run with \"-l debug\". Please wait...") // Load all the configurations - cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg := makeConfigs() + cfgMgr, cfgs := makeConfigs() // Run any migrations if c.Bool("upgrade") { @@ -457,69 +457,53 @@ func daemon(c *cli.Context) error { // Load all the configurations // always wait for configuration to be saved - defer cfg.Shutdown() + defer cfgMgr.Shutdown() - err = cfg.LoadJSONFromFile(configPath) + err = cfgMgr.LoadJSONFromFile(configPath) checkErr("loading configuration", err) if a := c.String("bootstrap"); a != "" { - if len(clusterCfg.Peers) > 0 && !c.Bool("force") { + if len(cfgs.clusterCfg.Peers) > 0 && !c.Bool("force") { return errors.New("the configuration provides cluster.Peers. Use -f to ignore and proceed bootstrapping") } joinAddr, err := ma.NewMultiaddr(a) if err != nil { return fmt.Errorf("error parsing multiaddress: %s", err) } - clusterCfg.Bootstrap = []ma.Multiaddr{joinAddr} - clusterCfg.Peers = []ma.Multiaddr{} + cfgs.clusterCfg.Bootstrap = []ma.Multiaddr{joinAddr} + cfgs.clusterCfg.Peers = []ma.Multiaddr{} } if c.Bool("leave") { - clusterCfg.LeaveOnShutdown = true + cfgs.clusterCfg.LeaveOnShutdown = true } - api, err := rest.NewAPI(apiCfg) - checkErr("creating REST API component", err) - - proxy, err := ipfshttp.NewConnector(ipfshttpCfg) - checkErr("creating IPFS Connector component", err) - - state := mapstate.NewMapState() - - err = validateVersion(clusterCfg, consensusCfg) - checkErr("validating version", err) - - tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID) - mon, err := basic.NewMonitor(monCfg) - checkErr("creating Monitor component", err) - informer, alloc := setupAllocation(c.GlobalString("alloc"), diskInfCfg, numpinInfCfg) - - cluster, err := ipfscluster.NewCluster( - clusterCfg, - 
consensusCfg, - api, - proxy, - state, - tracker, - mon, - alloc, - informer) + cluster, err := initializeCluster(c, cfgs) checkErr("starting cluster", err) signalChan := make(chan os.Signal, 20) - signal.Notify(signalChan, + signal.Notify( + signalChan, syscall.SIGINT, syscall.SIGTERM, - syscall.SIGHUP) + syscall.SIGHUP, + ) + + var alreadyExiting bool for { select { case <-signalChan: - err = cluster.Shutdown() - checkErr("shutting down cluster", err) + if alreadyExiting { + logger.Error("exiting cluster NOW") + return nil + } + alreadyExiting = true + go func() { + err := cluster.Shutdown() + checkErr("shutting down cluster", err) + }() case <-cluster.Done(): return nil - - //case <-cluster.Ready(): } } } @@ -609,7 +593,7 @@ func yesNoPrompt(prompt string) bool { return false } -func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config, *numpin.Config) { +func makeConfigs() (*config.Manager, *cfgs) { cfg := config.NewManager() clusterCfg := &ipfscluster.Config{} apiCfg := &rest.Config{} @@ -627,5 +611,46 @@ func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshtt cfg.RegisterComponent(config.Monitor, monCfg) cfg.RegisterComponent(config.Informer, diskInfCfg) cfg.RegisterComponent(config.Informer, numpinInfCfg) - return cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg + return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg} +} + +type cfgs struct { + clusterCfg *ipfscluster.Config + apiCfg *rest.Config + ipfshttpCfg *ipfshttp.Config + consensusCfg *raft.Config + trackerCfg *maptracker.Config + monCfg *basic.Config + diskInfCfg *disk.Config + numpinInfCfg *numpin.Config +} + +func initializeCluster(c *cli.Context, cfgs *cfgs) (*ipfscluster.Cluster, error) { + api, err := rest.NewAPI(cfgs.apiCfg) + checkErr("creating REST API component", err) + + proxy, 
err := ipfshttp.NewConnector(cfgs.ipfshttpCfg) + checkErr("creating IPFS Connector component", err) + + state := mapstate.NewMapState() + + err = validateVersion(cfgs.clusterCfg, cfgs.consensusCfg) + checkErr("validating version", err) + + tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, cfgs.clusterCfg.ID) + mon, err := basic.NewMonitor(cfgs.monCfg) + checkErr("creating Monitor component", err) + informer, alloc := setupAllocation(c.GlobalString("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg) + + return ipfscluster.NewCluster( + cfgs.clusterCfg, + cfgs.consensusCfg, + api, + proxy, + state, + tracker, + mon, + alloc, + informer, + ) } diff --git a/ipfs-cluster-service/state.go b/ipfs-cluster-service/state.go index 331a0f6b4..3af6df75f 100644 --- a/ipfs-cluster-service/state.go +++ b/ipfs-cluster-service/state.go @@ -26,15 +26,15 @@ func upgrade() error { return nil } - cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs() + cfgMgr, cfgs := makeConfigs() - err = cfg.LoadJSONFromFile(configPath) + err = cfgMgr.LoadJSONFromFile(configPath) if err != nil { return err } - raftPeers := append(ipfscluster.PeersFromMultiaddrs(clusterCfg.Peers), clusterCfg.ID) - return raft.SnapshotSave(consensusCfg, newState, raftPeers) + raftPeers := append(ipfscluster.PeersFromMultiaddrs(cfgs.clusterCfg.Peers), cfgs.clusterCfg.ID) + return raft.SnapshotSave(cfgs.consensusCfg, newState, raftPeers) } func export(w io.Writer) error { @@ -50,14 +50,14 @@ func export(w io.Writer) error { // snapshot, a flag set to true when the state format has the // current version and an error func restoreStateFromDisk() (*mapstate.MapState, bool, error) { - cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs() + cfgMgr, cfgs := makeConfigs() - err := cfg.LoadJSONFromFile(configPath) + err := cfgMgr.LoadJSONFromFile(configPath) if err != nil { return nil, false, err } - r, snapExists, err := raft.LastStateRaw(consensusCfg) + r, snapExists, err := raft.LastStateRaw(cfgs.consensusCfg) if 
!snapExists { err = errNoSnapshot } @@ -90,9 +90,9 @@ func restoreStateFromDisk() (*mapstate.MapState, bool, error) { } func stateImport(r io.Reader) error { - cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs() + cfgMgr, cfgs := makeConfigs() - err := cfg.LoadJSONFromFile(configPath) + err := cfgMgr.LoadJSONFromFile(configPath) if err != nil { return err } @@ -111,8 +111,8 @@ func stateImport(r io.Reader) error { return err } } - raftPeers := append(ipfscluster.PeersFromMultiaddrs(clusterCfg.Peers), clusterCfg.ID) - return raft.SnapshotSave(consensusCfg, stateToImport, raftPeers) + raftPeers := append(ipfscluster.PeersFromMultiaddrs(cfgs.clusterCfg.Peers), cfgs.clusterCfg.ID) + return raft.SnapshotSave(cfgs.consensusCfg, stateToImport, raftPeers) } func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {