Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add framework for managing startup dependencies and shutdown #153

Merged
merged 2 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 0 additions & 59 deletions pkg/cmd/controller.go

This file was deleted.

12 changes: 2 additions & 10 deletions pkg/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func initCerts(cfg *config.MicroshiftConfig) error {
}

// based on https://github.com/openshift/cluster-etcd-operator/blob/master/bindata/bootkube/bootstrap-manifests/etcd-member-pod.yaml#L19
if err := util.GenCerts("etcd-server", cfg.DataDir+"/certs/secrets/etcd-all-serving",
if err := util.GenCerts("etcd-server", cfg.DataDir+"/certs/etcd",
"etcd-serving.crt", "etcd-serving.key",
[]string{"localhost", cfg.HostIP, "127.0.0.1", cfg.HostName}); err != nil {
return err
}

if err := util.GenCerts("etcd-peer", cfg.DataDir+"/certs/secrets/etcd-all-peer",
if err := util.GenCerts("etcd-peer", cfg.DataDir+"/certs/etcd",
"etcd-peer.crt", "etcd-peer.key",
[]string{"localhost", cfg.HostIP, "127.0.0.1", cfg.HostName}); err != nil {
return err
Expand Down Expand Up @@ -122,14 +122,6 @@ func initCerts(cfg *config.MicroshiftConfig) error {
}

func initServerConfig(cfg *config.MicroshiftConfig) error {
if err := config.KubeAPIServerConfig(cfg); err != nil {
return err
}
/*
if err := config.KubeControllerManagerConfig(cfg); err != nil {
return err
}
*/
if err := config.OpenShiftAPIServerConfig(cfg); err != nil {
return err
}
Expand Down
123 changes: 114 additions & 9 deletions pkg/cmd/run.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
package cmd

import (
"context"
"errors"
"os"
"os/signal"
"syscall"
"time"

"github.com/coreos/go-systemd/daemon"
"github.com/openshift/microshift/pkg/components"
"github.com/openshift/microshift/pkg/config"
"github.com/openshift/microshift/pkg/controllers"
"github.com/openshift/microshift/pkg/node"
"github.com/openshift/microshift/pkg/servicemanager"
"github.com/openshift/microshift/pkg/util"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

const (
gracefulShutdownTimeout = 60
)

func NewRunMicroshiftCommand() *cobra.Command {
cfg := config.NewMicroshiftConfig()

Expand Down Expand Up @@ -52,20 +65,112 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error {
// if log dir is missing, create it
os.MkdirAll(cfg.LogDir, 0700)

m := servicemanager.NewServiceManager()
if config.StringInList("controlplane", cfg.Roles) {
if err := startControllerOnly(cfg); err != nil {
return err
}
util.Must(m.AddService(controllers.NewEtcd(cfg)))
util.Must(m.AddService(controllers.NewKubeAPIServer(cfg)))
// util.Must(m.AddService(controllers.NewKubeScheduler()))
// util.Must(m.AddService(controllers.NewKubeControllerManager()))
// util.Must(m.AddService(controllers.NewOpenShiftPrepJob()))
// util.Must(m.AddService(controllers.NewOpenShiftAPIServer()))
// util.Must(m.AddService(controllers.NewOpenShiftControllerManager()))
// util.Must(m.AddService(controllers.NewOpenShiftAPIComponents()))
// util.Must(m.AddService(controllers.NewInfrastructureServices()))

util.Must(m.AddService(servicemanager.NewGenericService(
"other-controlplane",
[]string{"kube-apiserver"},
func(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
defer close(stopped)
defer close(ready)

startControllerOnly(cfg)

return nil
},
)))
}

if config.StringInList("node", cfg.Roles) {
if err := node.StartKubelet(cfg); err != nil {
return err
}
if err := node.StartKubeProxy(cfg); err != nil {
return err
util.Must(m.AddService(servicemanager.NewGenericService(
"other-node",
[]string{"kube-apiserver"},
func(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
defer close(stopped)
defer close(ready)

if err := node.StartKubelet(cfg); err != nil {
return err
}
if err := node.StartKubeProxy(cfg); err != nil {
return err
}
return nil
},
)))
}

logrus.Info("Starting Microshift")

ctx, cancel := context.WithCancel(context.Background())
ready, stopped := make(chan struct{}), make(chan struct{})
go func() {
logrus.Infof("Starting %s", m.Name())
if err := m.Run(ctx, ready, stopped); err != nil {
logrus.Infof("%s stopped: %s", m.Name(), err)
} else {
logrus.Infof("%s completed", m.Name())
}
}()

sigTerm := make(chan os.Signal, 1)
signal.Notify(sigTerm, os.Interrupt, syscall.SIGTERM)

select {
case <-ready:
logrus.Info("MicroShift is ready.")
daemon.SdNotify(false, daemon.SdNotifyReady)

<-sigTerm
case <-sigTerm:
}
logrus.Info("Interrupt received. Stopping services.")
cancel()

select {
case <-stopped:
case <-sigTerm:
logrus.Info("Another interrupt received. Force terminating services.")
case <-time.After(time.Duration(gracefulShutdownTimeout) * time.Second):
logrus.Info("Timed out waiting for services to stop.")
}
logrus.Info("MicroShift stopped.")
return nil
}

select {}
func startControllerOnly(cfg *config.MicroshiftConfig) error {
logrus.Infof("starting kube-controller-manager")
controllers.KubeControllerManager(cfg)

logrus.Infof("starting kube-scheduler")
controllers.KubeScheduler(cfg)

if err := controllers.PrepareOCP(cfg); err != nil {
return err
}

logrus.Infof("starting openshift-apiserver")
controllers.OCPAPIServer(cfg)

//TODO: cloud provider
controllers.OCPControllerManager(cfg)

if err := controllers.StartOCPAPIComponents(cfg); err != nil {
return err
}

if err := components.StartComponents(cfg); err != nil {
return err
}
return nil
}
108 changes: 65 additions & 43 deletions pkg/controllers/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ limitations under the License.
package controllers

import (
"context"
"fmt"
"net/url"
"time"
"path/filepath"

"github.com/openshift/microshift/pkg/config"
"github.com/sirupsen/logrus"
etcd "go.etcd.io/etcd/embed"

"github.com/openshift/microshift/pkg/config"
)

var (
Expand All @@ -38,53 +38,75 @@ var (
)

const (
etcdStartupTimeout = 10
etcdStartupTimeout = 60
)

func StartEtcd(c *config.MicroshiftConfig, ready chan bool) error {
type EtcdService struct {
etcdCfg *etcd.Config
}

func NewEtcd(cfg *config.MicroshiftConfig) *EtcdService {
s := &EtcdService{}
s.configure(cfg)
return s
}

func (s *EtcdService) Name() string { return "etcd" }
func (s *EtcdService) Dependencies() []string { return []string{} }

func (s *EtcdService) configure(cfg *config.MicroshiftConfig) {
caCertFile := filepath.Join(cfg.DataDir, "certs", "ca-bundle", "ca-bundle.crt")
certDir := filepath.Join(cfg.DataDir, "certs", s.Name())
dataDir := filepath.Join(cfg.DataDir, s.Name())

// based on https://github.com/openshift/cluster-etcd-operator/blob/master/bindata/bootkube/bootstrap-manifests/etcd-member-pod.yaml#L19
cfg := etcd.NewConfig()
cfg.ClusterState = "new"
//cfg.ForceNewCluster = true //TODO
cfg.Logger = "zap"
cfg.Dir = c.DataDir + "/etcd/"
cfg.APUrls = setURL([]string{c.HostIP}, ":2380")
cfg.LPUrls = setURL([]string{c.HostIP}, ":2380")
cfg.ACUrls = setURL([]string{c.HostIP}, ":2379")
cfg.LCUrls = setURL([]string{"127.0.0.1", c.HostIP}, ":2379")
cfg.ListenMetricsUrls = setURL([]string{"127.0.0.1"}, ":2381")

cfg.Name = c.HostName
cfg.InitialCluster = c.HostName + "=" + "https://" + c.HostIP + ":2380"

cfg.CipherSuites = tlsCipherSuites
cfg.ClientTLSInfo.CertFile = c.DataDir + "/certs/secrets/etcd-all-serving/etcd-serving.crt"
cfg.ClientTLSInfo.KeyFile = c.DataDir + "/certs/secrets/etcd-all-serving/etcd-serving.key"
cfg.ClientTLSInfo.TrustedCAFile = c.DataDir + "/certs/ca-bundle/ca-bundle.crt"
cfg.ClientTLSInfo.ClientCertAuth = false
cfg.ClientTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert

cfg.PeerTLSInfo.CertFile = c.DataDir + "/certs/secrets/etcd-all-peer/etcd-peer.crt"
cfg.PeerTLSInfo.KeyFile = c.DataDir + "/certs/secrets/etcd-all-peer/etcd-peer.key"
cfg.PeerTLSInfo.TrustedCAFile = c.DataDir + "/certs/ca-bundle/ca-bundle.crt"
cfg.PeerTLSInfo.ClientCertAuth = false
cfg.PeerTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert

e, err := etcd.StartEtcd(cfg)
s.etcdCfg = etcd.NewConfig()
s.etcdCfg.ClusterState = "new"
//s.etcdCfg.ForceNewCluster = true //TODO
s.etcdCfg.Logger = "zap"
s.etcdCfg.Dir = dataDir
s.etcdCfg.APUrls = setURL([]string{cfg.HostIP}, ":2380")
s.etcdCfg.LPUrls = setURL([]string{cfg.HostIP}, ":2380")
s.etcdCfg.ACUrls = setURL([]string{cfg.HostIP}, ":2379")
s.etcdCfg.LCUrls = setURL([]string{"127.0.0.1", cfg.HostIP}, ":2379")
s.etcdCfg.ListenMetricsUrls = setURL([]string{"127.0.0.1"}, ":2381")

s.etcdCfg.Name = cfg.HostName
s.etcdCfg.InitialCluster = fmt.Sprintf("%s=https://%s:2380", cfg.HostName, cfg.HostIP)

s.etcdCfg.CipherSuites = tlsCipherSuites
s.etcdCfg.ClientTLSInfo.CertFile = filepath.Join(certDir, "etcd-serving.crt")
s.etcdCfg.ClientTLSInfo.KeyFile = filepath.Join(certDir, "etcd-serving.key")
s.etcdCfg.ClientTLSInfo.TrustedCAFile = caCertFile
s.etcdCfg.ClientTLSInfo.ClientCertAuth = false
s.etcdCfg.ClientTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert

s.etcdCfg.PeerTLSInfo.CertFile = filepath.Join(certDir, "etcd-peer.crt")
s.etcdCfg.PeerTLSInfo.KeyFile = filepath.Join(certDir, "etcd-peer.key")
s.etcdCfg.PeerTLSInfo.TrustedCAFile = caCertFile
s.etcdCfg.PeerTLSInfo.ClientCertAuth = false
s.etcdCfg.PeerTLSInfo.InsecureSkipVerify = true //TODO after fix GenCert to generate client cert
}

func (s *EtcdService) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
defer close(stopped)

e, err := etcd.StartEtcd(s.etcdCfg)
if err != nil {
return fmt.Errorf("etcd failed to start: %v", err)
return fmt.Errorf("%s failed to start: %v", s.Name(), err)
}

// run readiness check
go func() {
select {
case <-e.Server.ReadyNotify():
logrus.Info("Server is ready!")
ready <- true
case <-time.After(etcdStartupTimeout * time.Second):
e.Server.Stop()
logrus.Fatalf("etcd failed to start in %d seconds", etcdStartupTimeout)
}
<-e.Server.ReadyNotify()
logrus.Infof("%s is ready", s.Name())
close(ready)
}()
return nil

<-ctx.Done()
e.Server.Stop()
<-e.Server.StopNotify()
return ctx.Err()
}

func setURL(hostnames []string, port string) []url.URL {
Expand Down
Loading