diff --git a/pkg/cmd/controller.go b/pkg/cmd/controller.go deleted file mode 100644 index be5b0e54ed0..00000000000 --- a/pkg/cmd/controller.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright © 2021 Microshift Contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package cmd - -import ( - "github.com/openshift/microshift/pkg/components" - "github.com/openshift/microshift/pkg/config" - "github.com/openshift/microshift/pkg/controllers" - "github.com/sirupsen/logrus" -) - -func startControllerOnly(cfg *config.MicroshiftConfig) error { - etcdReadyCh := make(chan bool, 1) - if err := controllers.StartEtcd(cfg, etcdReadyCh); err != nil { - return err - } - <-etcdReadyCh - - logrus.Infof("starting kube-apiserver") - controllers.KubeAPIServer(cfg) - - logrus.Infof("starting kube-controller-manager") - controllers.KubeControllerManager(cfg) - - logrus.Infof("starting kube-scheduler") - controllers.KubeScheduler(cfg) - - if err := controllers.PrepareOCP(cfg); err != nil { - return err - } - - logrus.Infof("starting openshift-apiserver") - controllers.OCPAPIServer(cfg) - - //TODO: cloud provider - controllers.OCPControllerManager(cfg) - - if err := controllers.StartOCPAPIComponents(cfg); err != nil { - return err - } - - if err := components.StartComponents(cfg); err != nil { - return err - } - return nil -} diff --git a/pkg/cmd/init.go b/pkg/cmd/init.go index 887e9ec9ba2..bc759096d28 100644 --- a/pkg/cmd/init.go +++ b/pkg/cmd/init.go @@ -50,13 +50,13 @@ func initCerts(cfg *config.MicroshiftConfig) error { } // based on https://github.com/openshift/cluster-etcd-operator/blob/master/bindata/bootkube/bootstrap-manifests/etcd-member-pod.yaml#L19 - if err := util.GenCerts("etcd-server", cfg.DataDir+"/certs/secrets/etcd-all-serving", + if err := util.GenCerts("etcd-server", cfg.DataDir+"/certs/etcd", "etcd-serving.crt", "etcd-serving.key", []string{"localhost", cfg.HostIP, "127.0.0.1", cfg.HostName}); err != nil { return err } - if err := util.GenCerts("etcd-peer", cfg.DataDir+"/certs/secrets/etcd-all-peer", + if err := util.GenCerts("etcd-peer", cfg.DataDir+"/certs/etcd", "etcd-peer.crt", "etcd-peer.key", []string{"localhost", cfg.HostIP, "127.0.0.1", cfg.HostName}); err != nil { return err @@ -122,14 +122,6 @@ func initCerts(cfg *config.MicroshiftConfig) error { } func initServerConfig(cfg *config.MicroshiftConfig) error { - if err := config.KubeAPIServerConfig(cfg); err != nil { - return err - } - /* - if err := config.KubeControllerManagerConfig(cfg); err != nil { - return err - } - */ if err := config.OpenShiftAPIServerConfig(cfg); err != nil { return err } diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 4d18247a21a..32254b398d5 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -1,16 +1,29 @@ package cmd import ( + "context" "errors" "os" + "os/signal" + "syscall" + "time" + "github.com/coreos/go-systemd/daemon" + "github.com/openshift/microshift/pkg/components" "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/controllers" "github.com/openshift/microshift/pkg/node" + "github.com/openshift/microshift/pkg/servicemanager" + "github.com/openshift/microshift/pkg/util" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" ) +const ( + gracefulShutdownTimeout = 60 +) + func NewRunMicroshiftCommand() *cobra.Command { cfg := config.NewMicroshiftConfig() @@ -52,20 +65,112 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { // if log dir is missing, create it os.MkdirAll(cfg.LogDir, 0700) + m := servicemanager.NewServiceManager() if config.StringInList("controlplane", cfg.Roles) { - if err := startControllerOnly(cfg); err != nil { - return err - } + util.Must(m.AddService(controllers.NewEtcd(cfg))) + util.Must(m.AddService(controllers.NewKubeAPIServer(cfg))) + // util.Must(m.AddService(controllers.NewKubeScheduler())) + // util.Must(m.AddService(controllers.NewKubeControllerManager())) + // util.Must(m.AddService(controllers.NewOpenShiftPrepJob())) + // util.Must(m.AddService(controllers.NewOpenShiftAPIServer())) + // util.Must(m.AddService(controllers.NewOpenShiftControllerManager())) + // util.Must(m.AddService(controllers.NewOpenShiftAPIComponents())) + // util.Must(m.AddService(controllers.NewInfrastructureServices())) + + util.Must(m.AddService(servicemanager.NewGenericService( + "other-controlplane", + []string{"kube-apiserver"}, + func(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + defer close(ready) + + startControllerOnly(cfg) + + return nil + }, + ))) } if config.StringInList("node", cfg.Roles) { - if err := node.StartKubelet(cfg); err != nil { - return err - } - if err := node.StartKubeProxy(cfg); err != nil { - return err + util.Must(m.AddService(servicemanager.NewGenericService( + "other-node", + []string{"kube-apiserver"}, + func(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + defer close(ready) + + if err := node.StartKubelet(cfg); err != nil { + return err + } + if err := node.StartKubeProxy(cfg); err != nil { + return err + } + return nil + }, + ))) + } + + logrus.Info("Starting Microshift") + + ctx, cancel := context.WithCancel(context.Background()) + ready, stopped := make(chan struct{}), make(chan struct{}) + go func() { + logrus.Infof("Starting %s", m.Name()) + if err := m.Run(ctx, ready, stopped); err != nil { + logrus.Infof("%s stopped: %s", m.Name(), err) + } else { + logrus.Infof("%s completed", m.Name()) } + }() + + sigTerm := make(chan os.Signal, 1) + signal.Notify(sigTerm, os.Interrupt, syscall.SIGTERM) + + select { + case <-ready: + logrus.Info("MicroShift is ready.") + daemon.SdNotify(false, daemon.SdNotifyReady) + + <-sigTerm + case <-sigTerm: + } + logrus.Info("Interrupt received. Stopping services.") + cancel() + + select { + case <-stopped: + case <-sigTerm: + logrus.Info("Another interrupt received. Force terminating services.") + case <-time.After(time.Duration(gracefulShutdownTimeout) * time.Second): + logrus.Info("Timed out waiting for services to stop.") } + logrus.Info("MicroShift stopped.") + return nil +} - select {} +func startControllerOnly(cfg *config.MicroshiftConfig) error { + logrus.Infof("starting kube-controller-manager") + controllers.KubeControllerManager(cfg) + + logrus.Infof("starting kube-scheduler") + controllers.KubeScheduler(cfg) + + if err := controllers.PrepareOCP(cfg); err != nil { + return err + } + + logrus.Infof("starting openshift-apiserver") + controllers.OCPAPIServer(cfg) + + //TODO: cloud provider + controllers.OCPControllerManager(cfg) + + if err := controllers.StartOCPAPIComponents(cfg); err != nil { + return err + } + + if err := components.StartComponents(cfg); err != nil { + return err + } + return nil } diff --git a/pkg/controllers/etcd.go b/pkg/controllers/etcd.go index d17bd3c9072..2f210c943bd 100644 --- a/pkg/controllers/etcd.go +++ b/pkg/controllers/etcd.go @@ -16,14 +16,14 @@ limitations under the License. package controllers import ( + "context" "fmt" "net/url" - "time" + "path/filepath" + "github.com/openshift/microshift/pkg/config" "github.com/sirupsen/logrus" etcd "go.etcd.io/etcd/embed" - - "github.com/openshift/microshift/pkg/config" ) var ( @@ -38,53 +38,75 @@ var ( ) const ( - etcdStartupTimeout = 10 + etcdStartupTimeout = 60 ) -func StartEtcd(c *config.MicroshiftConfig, ready chan bool) error { +type EtcdService struct { + etcdCfg *etcd.Config +} + +func NewEtcd(cfg *config.MicroshiftConfig) *EtcdService { + s := &EtcdService{} + s.configure(cfg) + return s +} + +func (s *EtcdService) Name() string { return "etcd" } +func (s *EtcdService) Dependencies() []string { return []string{} } + +func (s *EtcdService) configure(cfg *config.MicroshiftConfig) { + caCertFile := filepath.Join(cfg.DataDir, "certs", "ca-bundle", "ca-bundle.crt") + certDir := filepath.Join(cfg.DataDir, "certs", s.Name()) + dataDir := filepath.Join(cfg.DataDir, s.Name()) + // based on https://github.com/openshift/cluster-etcd-operator/blob/master/bindata/bootkube/bootstrap-manifests/etcd-member-pod.yaml#L19 - cfg := etcd.NewConfig() - cfg.ClusterState = "new" - //cfg.ForceNewCluster = true //TODO - cfg.Logger = "zap" - cfg.Dir = c.DataDir + "/etcd/" - cfg.APUrls = setURL([]string{c.HostIP}, ":2380") - cfg.LPUrls = setURL([]string{c.HostIP}, ":2380") - cfg.ACUrls = setURL([]string{c.HostIP}, ":2379") - cfg.LCUrls = setURL([]string{"127.0.0.1", c.HostIP}, ":2379") - cfg.ListenMetricsUrls = setURL([]string{"127.0.0.1"}, ":2381") - - cfg.Name = c.HostName - cfg.InitialCluster = c.HostName + "=" + "https://" + c.HostIP + ":2380" - - cfg.CipherSuites = tlsCipherSuites - cfg.ClientTLSInfo.CertFile = c.DataDir + "/certs/secrets/etcd-all-serving/etcd-serving.crt" - cfg.ClientTLSInfo.KeyFile = c.DataDir + "/certs/secrets/etcd-all-serving/etcd-serving.key" - cfg.ClientTLSInfo.TrustedCAFile = c.DataDir + "/certs/ca-bundle/ca-bundle.crt" - cfg.ClientTLSInfo.ClientCertAuth = false - cfg.ClientTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert - - cfg.PeerTLSInfo.CertFile = c.DataDir + "/certs/secrets/etcd-all-peer/etcd-peer.crt" - cfg.PeerTLSInfo.KeyFile = c.DataDir + "/certs/secrets/etcd-all-peer/etcd-peer.key" - cfg.PeerTLSInfo.TrustedCAFile = c.DataDir + "/certs/ca-bundle/ca-bundle.crt" - cfg.PeerTLSInfo.ClientCertAuth = false - cfg.PeerTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert - - e, err := etcd.StartEtcd(cfg) + s.etcdCfg = etcd.NewConfig() + s.etcdCfg.ClusterState = "new" + //s.etcdCfg.ForceNewCluster = true //TODO + s.etcdCfg.Logger = "zap" + s.etcdCfg.Dir = dataDir + s.etcdCfg.APUrls = setURL([]string{cfg.HostIP}, ":2380") + s.etcdCfg.LPUrls = setURL([]string{cfg.HostIP}, ":2380") + s.etcdCfg.ACUrls = setURL([]string{cfg.HostIP}, ":2379") + s.etcdCfg.LCUrls = setURL([]string{"127.0.0.1", cfg.HostIP}, ":2379") + s.etcdCfg.ListenMetricsUrls = setURL([]string{"127.0.0.1"}, ":2381") + + s.etcdCfg.Name = cfg.HostName + s.etcdCfg.InitialCluster = fmt.Sprintf("%s=https://%s:2380", cfg.HostName, cfg.HostIP) + + s.etcdCfg.CipherSuites = tlsCipherSuites + s.etcdCfg.ClientTLSInfo.CertFile = filepath.Join(certDir, "etcd-serving.crt") + s.etcdCfg.ClientTLSInfo.KeyFile = filepath.Join(certDir, "etcd-serving.key") + s.etcdCfg.ClientTLSInfo.TrustedCAFile = caCertFile + s.etcdCfg.ClientTLSInfo.ClientCertAuth = false + s.etcdCfg.ClientTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert + + s.etcdCfg.PeerTLSInfo.CertFile = filepath.Join(certDir, "etcd-peer.crt") + s.etcdCfg.PeerTLSInfo.KeyFile = filepath.Join(certDir, "etcd-peer.key") + s.etcdCfg.PeerTLSInfo.TrustedCAFile = caCertFile + s.etcdCfg.PeerTLSInfo.ClientCertAuth = false + s.etcdCfg.PeerTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert +} + +func (s *EtcdService) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + + e, err := etcd.StartEtcd(s.etcdCfg) if err != nil { - return fmt.Errorf("etcd failed to start: %v", err) + return fmt.Errorf("%s failed to start: %v", s.Name(), err) } + + // run readiness check go func() { - select { - case <-e.Server.ReadyNotify(): - logrus.Info("Server is ready!") - ready <- true - case <-time.After(etcdStartupTimeout * time.Second): - e.Server.Stop() - logrus.Fatalf("etcd failed to start in %d seconds", etcdStartupTimeout) - } + <-e.Server.ReadyNotify() + logrus.Infof("%s is ready", s.Name()) + close(ready) }() - return nil + + <-ctx.Done() + e.Server.Stop() + <-e.Server.StopNotify() + return ctx.Err() } func setURL(hostnames []string, port string) []url.URL { diff --git a/pkg/controllers/kube-api.go b/pkg/controllers/kube-api.go index 9b637077444..efb5484d44b 100644 --- a/pkg/controllers/kube-api.go +++ b/pkg/controllers/kube-api.go @@ -16,23 +16,61 @@ limitations under the License. package controllers import ( + "context" + "fmt" "path/filepath" "strconv" "time" "github.com/sirupsen/logrus" + "github.com/spf13/cobra" "github.com/openshift/microshift/pkg/config" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + "k8s.io/component-base/cli/globalflag" genericcontrollermanager "k8s.io/controller-manager/app" kubeapiserver "k8s.io/kubernetes/cmd/kube-apiserver/app" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" ) -func KubeAPIServer(cfg *config.MicroshiftConfig) error { - command := kubeapiserver.NewAPIServerCommand() - apiArgs := []string{ +const ( + kubeAPIStartupTimeout = 60 +) + +type KubeAPIServer struct { + serverOptions *options.ServerRunOptions + + kubeconfig string +} + +func NewKubeAPIServer(cfg *config.MicroshiftConfig) *KubeAPIServer { + s := &KubeAPIServer{} + s.configure(cfg) + return s +} + +func (s *KubeAPIServer) Name() string { return "kube-apiserver" } +func (s *KubeAPIServer) Dependencies() []string { return []string{"etcd"} } + +func (s *KubeAPIServer) configure(cfg *config.MicroshiftConfig) { + caCertFile := filepath.Join(cfg.DataDir, "certs", "ca-bundle", "ca-bundle.crt") + // certDir := filepath.Join(cfg.DataDir, "certs", s.Name()) + // dataDir := filepath.Join(cfg.DataDir, s.Name()) + + // configure audit policy and oauth + // TODO: consolidate this + if err := config.KubeAPIServerConfig(cfg); err != nil { + return + } + + // configure the kube-apiserver instance + // TODO: configure serverOptions directly rather than via cobra + s.serverOptions = options.NewServerRunOptions() + + args := []string{ //"--openshift-config=" + cfg.DataDir + "/resources/kube-apiserver/config/config.yaml", //TOOD //"--advertise-address=" + ip, "--allow-privileged=true", @@ -42,21 +80,21 @@ func KubeAPIServer(cfg *config.MicroshiftConfig) error { "--authorization-mode=Node,RBAC", "--bind-address=0.0.0.0", "--secure-port=6443", - "--client-ca-file=" + cfg.DataDir + "/certs/ca-bundle/ca-bundle.crt", + "--client-ca-file=" + caCertFile, "--enable-admission-plugins=NodeRestriction", "--enable-aggregator-routing=true", - "--etcd-cafile=" + cfg.DataDir + "/certs/ca-bundle/ca-bundle.crt", + "--etcd-cafile=" + caCertFile, "--etcd-certfile=" + cfg.DataDir + "/resources/kube-apiserver/secrets/etcd-client/tls.crt", "--etcd-keyfile=" + cfg.DataDir + "/resources/kube-apiserver/secrets/etcd-client/tls.key", "--etcd-servers=https://127.0.0.1:2379", - "--kubelet-certificate-authority=" + cfg.DataDir + "/certs/ca-bundle/ca-bundle.crt", + "--kubelet-certificate-authority=" + caCertFile, "--kubelet-client-certificate=" + cfg.DataDir + "/resources/kube-apiserver/secrets/kubelet-client/tls.crt", "--kubelet-client-key=" + cfg.DataDir + "/resources/kube-apiserver/secrets/kubelet-client/tls.key", "--profiling=false", "--proxy-client-cert-file=" + cfg.DataDir + "/certs/kube-apiserver/secrets/aggregator-client/tls.crt", "--proxy-client-key-file=" + cfg.DataDir + "/certs/kube-apiserver/secrets/aggregator-client/tls.key", "--requestheader-allowed-names=aggregator,system:aggregator,openshift-apiserver,system:openshift-apiserver,kube-apiserver-proxy,system:kube-apiserver-proxy,openshift-aggregator,system:openshift-aggregator", - "--requestheader-client-ca-file=" + cfg.DataDir + "/certs/ca-bundle/ca-bundle.crt", + "--requestheader-client-ca-file=" + caCertFile, "--requestheader-extra-headers-prefix=X-Remote-Extra-", "--requestheader-group-headers=X-Remote-Group", "--requestheader-username-headers=X-Remote-User", @@ -74,37 +112,69 @@ func KubeAPIServer(cfg *config.MicroshiftConfig) error { "--vmodule=" + cfg.LogVModule, } if cfg.LogDir != "" { - apiArgs = append(apiArgs, + args = append(args, "--log-file="+filepath.Join(cfg.LogDir, "kube-apiserver.log"), "--audit-log-path="+filepath.Join(cfg.LogDir, "kube-apiserver-audit.log")) } - if err := command.ParseFlags(apiArgs); err != nil { - logrus.Fatalf("failed to parse flags:%v", err) + + // fake the kube-apiserver cobra command to parse args into serverOptions + cmd := &cobra.Command{ + Use: "kube-apiserver", + Long: `kube-apiserver`, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { return nil }, + } + namedFlagSets := s.serverOptions.Flags() + globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name()) + options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic")) + for _, f := range namedFlagSets.FlagSets { + cmd.Flags().AddFlagSet(f) + } + if err := cmd.ParseFlags(args); err != nil { + logrus.Fatalf("%s failed to parse flags: %v", s.Name(), err) } - logrus.Infof("starting kube-apiserver %s, args: %v", cfg.HostIP, apiArgs) + s.kubeconfig = filepath.Join(cfg.DataDir, "resources", "kubeadmin", "kubeconfig") +} + +func (s *KubeAPIServer) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + + // run readiness check go func() { - logrus.Fatalf("kube-apiserver exited: %v", command.RunE(command, nil)) - }() + restConfig, err := clientcmd.BuildConfigFromFlags("", s.kubeconfig) + if err != nil { + logrus.Warningf("%s readiness check: %v", s.Name(), err) + return + } - logrus.Info("waiting for kube-apiserver") + versionedClient, err := kubernetes.NewForConfig(restConfig) + if err != nil { + logrus.Warningf("%s readiness check: %v", s.Name(), err) + return + } - restConfig, err := clientcmd.BuildConfigFromFlags("", cfg.DataDir+"/resources/kubeadmin/kubeconfig") - if err != nil { - return err - } + if genericcontrollermanager.WaitForAPIServer(versionedClient, kubeAPIStartupTimeout*time.Second) != nil { + logrus.Warningf("%s readiness check timed out: %v", s.Name(), err) + return + } - versionedClient, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return err - } + logrus.Infof("%s is ready", s.Name()) + close(ready) + }() - err = genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second) + // Work around that that the NewAPIServerCommand hardcodes the stop channel to SIGTERM signals, + // so we cannot use the cobra RunE command directly. + completedOptions, err := kubeapiserver.Complete(s.serverOptions) if err != nil { - logrus.Warningf("Failed to wait for apiserver being healthy: %v", err) - return nil + return fmt.Errorf("%s configuration error: %v", s.Name(), err) + } + if errs := completedOptions.Validate(); len(errs) != 0 { + return fmt.Errorf("%s configuration error: %v", s.Name(), utilerrors.NewAggregate(errs)) } - logrus.Info("kube-apiserver is ready") - return nil + if err := kubeapiserver.Run(completedOptions, ctx.Done()); err != nil { + return err + } + return ctx.Err() } diff --git a/pkg/servicemanager/manager.go b/pkg/servicemanager/manager.go new file mode 100644 index 00000000000..961b2d132ff --- /dev/null +++ b/pkg/servicemanager/manager.go @@ -0,0 +1,177 @@ +package servicemanager + +import ( + "context" + "fmt" + + "github.com/openshift/microshift/pkg/util/sigchannel" + "github.com/sirupsen/logrus" +) + +type ServiceManager struct { + name string + deps []string + + services []Service + serviceMap map[string]Service +} + +func NewServiceManager() *ServiceManager { + return &ServiceManager{ + name: "service-manager", + deps: []string{}, + + services: []Service{}, + serviceMap: make(map[string]Service), + } +} +func (s *ServiceManager) Name() string { return s.name } +func (s *ServiceManager) Dependencies() []string { return s.deps } + +func (m *ServiceManager) AddService(s Service) error { + if _, exists := m.serviceMap[s.Name()]; exists { + return fmt.Errorf("service '%s' added more than once", s.Name()) + } + for _, dependency := range s.Dependencies() { + // Enforce that services can only be added after adding their dependencies, + // i.e. they'll always remain topology sorted. Should we want to relax this + // constraint later, we can add topo sorting in the Run() step. + if _, exists := m.serviceMap[dependency]; !exists { + return fmt.Errorf("dependecy '%s' of service '%s' not yet defined", dependency, s.Name()) + } + } + + m.services = append(m.services, s) + m.serviceMap[s.Name()] = s + return nil +} + +func (m *ServiceManager) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + + services := m.services + // No need for topological sorting here as long as we enforce order while adding services. + // services, err := m.topoSort(services) + // if err != nil { + // fmt.Error("error: %v", err) + // } + + readyMap := make(map[string]<-chan struct{}) + stoppedMap := make(map[string]<-chan struct{}) + + for _, service := range services { + // Compile a list of ready channels of the service's dependencies (if any). + depsReadyList := []<-chan struct{}{} + for _, dependency := range service.Dependencies() { + depsReadyList = append(depsReadyList, readyMap[dependency]) + } + + // Wait until all of the service's dependencies signalled readiness. + // If the context gets canceled before, return immediately. + select { + case <-sigchannel.And(depsReadyList): + case <-ctx.Done(): + return ctx.Err() + } + + // Start the service and store its ready and stopped channels + serviceReady, serviceStopped := m.asyncRun(ctx, service) + readyMap[service.Name()] = serviceReady + stoppedMap[service.Name()] = serviceStopped + } + + // If we receive readiness signals from all services, signal readiness of manager + go func() { + <-sigchannel.And(values(readyMap)) + close(ready) + }() + + // Stop manager when all services stopped + <-sigchannel.And(values(stoppedMap)) + return ctx.Err() +} + +func (m *ServiceManager) asyncRun(ctx context.Context, service Service) (<-chan struct{}, <-chan struct{}) { + ready, stopped := make(chan struct{}), make(chan struct{}) + go func() { + logrus.Infof("Starting %s", service.Name()) + if err := service.Run(ctx, ready, stopped); err != nil { + logrus.Infof("%s stopped: %s", service.Name(), err) + } else { + logrus.Infof("%s completed", service.Name()) + } + }() + return ready, stopped +} + +func values(m map[string]<-chan struct{}) []<-chan struct{} { + values := make([]<-chan struct{}, 0, len(m)) + for _, v := range m { + values = append(values, v) + } + return values +} + +//---- topological sorting of directed acyclic graphs via DFS traversal ----- + +// type markers map[Service]bool + +// // Find remaining unmarked nodes and visit them until all nodes are marked. +// func (m *ServiceManager) topoSort(services []Service) ([]Service, error) { +// sorted := []Service{} + +// permanent := make(markers) +// temporary := make(markers) + +// for foundUnmarked := true; foundUnmarked; { +// foundUnmarked = false +// for _, service := range services { +// if !marked(service, permanent) { +// if err := m.visit(&sorted, service, permanent, temporary); err != nil { +// return nil, err +// } +// foundUnmarked = true +// } +// } +// } + +// return sorted, nil +// } + +// func mark(service Service, m markers) { +// m[service] = true +// } + +// func unmark(service Service, m markers) { +// delete(m, service) +// } + +// func marked(service Service, m markers) bool { +// _, ok := m[service] +// return ok +// } + +// // Recursively visit all of a node's dependencies. +// func (m *ServiceManager) visit(sorted *[]Service, service Service, permanent markers, temporary markers) error { +// if marked(service, permanent) { +// return nil +// } +// if marked(service, temporary) { +// return fmt.Errorf("detected cyclic dependencies") +// } + +// mark(service, temporary) +// for _, name := range service.Dependencies() { +// dependency, exists := m.serviceMap[name] +// if !exists { +// return fmt.Errorf("unknown dependency '%s' for service '%s'", dependency, name) +// } +// m.visit(sorted, dependency, permanent, temporary) +// } +// unmark(service, temporary) + +// mark(service, permanent) +// *sorted = append(*sorted, service) + +// return nil +// } diff --git a/pkg/servicemanager/service.go b/pkg/servicemanager/service.go new file mode 100644 index 00000000000..6ae679496ac --- /dev/null +++ b/pkg/servicemanager/service.go @@ -0,0 +1,35 @@ +package servicemanager + +import ( + "context" + "fmt" +) + +type RunFunc func(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error + +type GenericService struct { + name string + deps []string + + run RunFunc +} + +func NewGenericService(name string, dependencies []string, run RunFunc) *GenericService { + return &GenericService{ + name: name, + deps: dependencies, + run: run, + } +} +func (s *GenericService) Name() string { return s.name } +func (s *GenericService) Dependencies() []string { return s.deps } + +func (s *GenericService) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + if s.run == nil { + defer close(stopped) + defer close(ready) + return fmt.Errorf("no run function defined for '%s'", s.Name()) + } + + return s.run(ctx, ready, stopped) +} diff --git a/pkg/servicemanager/type.go b/pkg/servicemanager/type.go new file mode 100644 index 00000000000..c8b759d00c9 --- /dev/null +++ b/pkg/servicemanager/type.go @@ -0,0 +1,15 @@ +package servicemanager + +import ( + "context" +) + +type Runner interface { + Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error +} + +type Service interface { + Name() string + Dependencies() []string + Runner +} diff --git a/pkg/util/sigchannel/sigchannel.go b/pkg/util/sigchannel/sigchannel.go new file mode 100644 index 00000000000..84c403afc81 --- /dev/null +++ b/pkg/util/sigchannel/sigchannel.go @@ -0,0 +1,42 @@ +package sigchannel + +// IsClosed tests whether a signalling channel has been closed. +// Note: Must only be used on broadcast signalling channels, i.e. channels +// that only ever get closed, not sent any values. +func IsClosed(channel <-chan struct{}) bool { + select { + case <-channel: + return true + default: + return false + } +} + +// AllClosed tests whether all signalling channels have been closed. +// Note: Must only be used on broadcast signalling channels, i.e. channels +// that only ever get closed, not sent any values. +func AllClosed(channels []<-chan struct{}) bool { + for _, channel := range channels { + if !IsClosed(channel) { + return false + } + } + return true +} + +// And returns a signalling channel that will be closed when all operand +// signalling channels have been closed. +func And(channels []<-chan struct{}) <-chan struct{} { + andChannel := make(chan struct{}) + + go func() { + defer close(andChannel) + for { + if AllClosed(channels) { + break + } + } + }() + + return andChannel +} diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 00000000000..e2ad95c78e8 --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,18 @@ +package util + +import ( + "fmt" +) + +func Must(err error) { + if err != nil { + panic(fmt.Errorf("internal error: %v", err)) + } +} + +func Default(s string, defaultS string) string { + if s == "" { + return defaultS + } + return s +}