Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retrofit the scheduler with the leader election client. #19347

Merged
merged 1 commit into from
Jan 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/admin/kube-scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ kube-scheduler
--kube-api-burst=100: Burst to use while talking with kubernetes apiserver
--kube-api-qps=50: QPS to use while talking with kubernetes apiserver
--kubeconfig="": Path to kubeconfig file with authorization and master location information.
--leader-elect[=false]: Start a leader election client and gain leadership before executing scheduler loop. Enable this when running replicated schedulers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any issue in O(1) case to not default to on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with that but I'd like to enable it by default only once we have an e2e test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe say "replicated scheduler for high availability" just to make sure people don't confuse this with running multiple schedulers for performance or partitioning diffrent scheduling algorithms.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidopp - Good point, defaulting on would be bad in this case.

--leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.
--leader-elect-renew-deadline=10s: The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.
--leader-elect-retry-period=2s: The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.
--log-flush-frequency=5s: Maximum number of seconds between log flushes
--master="": The address of the Kubernetes API server (overrides any value in kubeconfig)
--policy-config-file="": File with scheduler policy configuration
Expand All @@ -70,7 +74,7 @@ kube-scheduler
--scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'
```

###### Auto generated by spf13/cobra on 14-Dec-2015
###### Auto generated by spf13/cobra on 12-Jan-2016


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
4 changes: 4 additions & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,7 @@ www-prefix
clientset-name
clientset-only
clientset-path
leader-elect
leader-elect-lease-duration
leader-elect-renew-deadline
leader-elect-retry-period
58 changes: 56 additions & 2 deletions pkg/client/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,26 @@ import (
"reflect"
"time"

"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
"github.com/spf13/pflag"
)

const (
JitterFactor = 1.2

LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"

DefaultLeaseDuration = 15 * time.Second
DefaultRenewDeadline = 10 * time.Second
DefaultRetryPeriod = 2 * time.Second
)

// NewLeadereElector creates a LeaderElector from a LeaderElecitionConfig
Expand Down Expand Up @@ -173,6 +178,16 @@ func (le *LeaderElector) Run() {
close(stop)
}

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec)
if err != nil {
panic(err)
}
le.Run()
}

// GetLeader returns the identity of the last observed leader or returns the empty string if
// no leader has yet been observed.
func (le *LeaderElector) GetLeader() string {
Expand Down Expand Up @@ -315,3 +330,42 @@ func (l *LeaderElector) maybeReportTransition() {
go l.config.Callbacks.OnNewLeader(l.reportedLeader)
}
}

func DefaultLeaderElectionCLIConfig() LeaderElectionCLIConfig {
return LeaderElectionCLIConfig{
LeaderElect: false,
LeaseDuration: DefaultLeaseDuration,
RenewDeadline: DefaultRenewDeadline,
RetryPeriod: DefaultRetryPeriod,
}
}

// LeaderElectionCLIConfig is useful for embedding into component configuration objects
// to maintain consistent command line flags.
type LeaderElectionCLIConfig struct {
LeaderElect bool
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
}

// BindFlags binds the common LeaderElectionCLIConfig flags to a flagset
func (l *LeaderElectionCLIConfig) BindFlags(fs *pflag.FlagSet) {
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
"Start a leader election client and gain leadership before "+
"executing scheduler loop. Enable this when running replicated "+
"schedulers.")
fs.DurationVar(&l.LeaseDuration, "leader-elect-lease-duration", l.LeaseDuration, ""+
"The duration that non-leader candidates will wait after observing a leadership"+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&l.RenewDeadline, "leader-elect-renew-deadline", l.RenewDeadline, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&l.RetryPeriod, "leader-elect-retry-period", l.RetryPeriod, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
}
4 changes: 4 additions & 0 deletions plugin/cmd/kube-scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"

Expand All @@ -41,6 +42,7 @@ type SchedulerServer struct {
KubeAPIQPS float32
KubeAPIBurst int
SchedulerName string
LeaderElection leaderelection.LeaderElectionCLIConfig
}

// NewSchedulerServer creates a new SchedulerServer with default parameters
Expand All @@ -54,6 +56,7 @@ func NewSchedulerServer() *SchedulerServer {
KubeAPIQPS: 50.0,
KubeAPIBurst: 100,
SchedulerName: api.DefaultSchedulerName,
LeaderElection: leaderelection.DefaultLeaderElectionCLIConfig(),
}
return &s
}
Expand All @@ -72,4 +75,5 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
s.LeaderElection.BindFlags(fs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be doing this everywhere!

}
40 changes: 38 additions & 2 deletions plugin/cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strconv"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
Expand Down Expand Up @@ -110,9 +111,44 @@ func Run(s *options.SchedulerServer) error {
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))

sched := scheduler.New(config)
sched.Run()

select {}
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}

if !s.LeaderElection.LeaderElect {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment re: is there any reason to not have this always enabled?

run(nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must be missing something -- while can't you just make this block say

sched.Run()
select {}

instead of having a separate run() function? Is run() used from somewhere else that I missed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used again on line 43

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 43 is an import line AFAICT

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidopp sorry 143

glog.Fatal("this statement is unreachable")
panic("unreachable")
}

id, err := os.Hostname()
if err != nil {
return err
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-scheduler",
},
Client: kubeClient,
Identity: id,
EventRecorder: config.Recorder,
LeaseDuration: s.LeaderElection.LeaseDuration,
RenewDeadline: s.LeaderElection.RenewDeadline,
RetryPeriod: s.LeaderElection.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("lost master")
},
},
})

glog.Fatal("this statement is unreachable")
panic("unreachable")
}

func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) {
Expand Down