Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP/MESOS: handle slave-lost and REASON_SLAVE_REMOVED events #21366

Closed
426 changes: 426 additions & 0 deletions contrib/mesos/pkg/controller/node/node.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
cmoptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kubeletoptions "k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
)
Expand Down Expand Up @@ -51,7 +52,9 @@ func Test_nodeWithUpdatedStatus(t *testing.T) {
assert.True(t, kubecfg.NodeStatusUpdateFrequency.Duration*3 < cm.NodeMonitorGracePeriod.Duration) // sanity check for defaults

n := testNode(0, api.ConditionTrue, "KubeletReady")
su := NewStatusUpdater(nil, cm.NodeMonitorPeriod.Duration, func() time.Time { return now })
controller := NewController(nil, cm.NodeMonitorPeriod.Duration, NowFunc(func() time.Time { return now }))
su := controller.newSlaveStatusController()

_, updated, err := su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.False(t, updated, "no update expected b/c kubelet updated heartbeat just now")
Expand All @@ -60,8 +63,8 @@ func Test_nodeWithUpdatedStatus(t *testing.T) {
n2, updated, err := su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.True(t, updated, "update expected b/c kubelet's update is older than DefaultNodeMonitorGracePeriod")
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, node.SlaveReadyReason)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, node.SlaveReadyMessage)

n = testNode(-kubecfg.NodeStatusUpdateFrequency.Duration, api.ConditionTrue, "KubeletReady")
n2, updated, err = su.nodeWithUpdatedStatus(n)
Expand All @@ -72,6 +75,6 @@ func Test_nodeWithUpdatedStatus(t *testing.T) {
n2, updated, err = su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.True(t, updated, "update expected b/c kubelet's update is older than 3*DefaultNodeStatusUpdateFrequency")
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, node.SlaveReadyReason)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, node.SlaveReadyMessage)
}
12 changes: 7 additions & 5 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

kubecontrollermanager "k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/controller/node"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
Expand Down Expand Up @@ -158,10 +158,12 @@ func (s *CMServer) Run(_ []string) error {
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod.Duration, time.Now)
if err := nodeStatusUpdaterController.Run(wait.NeverStop); err != nil {
glog.Fatalf("Failed to start node status update controller: %v", err)
}
// k8sm node controller
go node.NewController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "k8sm-node-controller")),
s.NodeMonitorPeriod.Duration,
node.SlaveStatusController(),
).Run(wait.NeverStop)

serviceController := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
if err := serviceController.Run(s.ServiceSyncPeriod.Duration, s.NodeSyncPeriod.Duration); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/executor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ import (
const (
DefaultInfoID = "k8sm-executor"
DefaultInfoSource = "kubernetes"
DefaultSuicideTimeout = 20 * time.Minute
DefaultSuicideTimeout = 5 * time.Minute
DefaultLaunchGracePeriod = 5 * time.Minute
)
36 changes: 25 additions & 11 deletions contrib/mesos/pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ type Executor struct {
dockerClient dockertools.DockerInterface
suicideWatch suicideWatcher
suicideTimeout time.Duration
shutdownAlert func() // invoked just prior to executor shutdown
kubeletFinished <-chan struct{} // signals that kubelet Run() died
shutdownAlert func() // invoked just prior to executor shutdown
exitFunc func(int)
staticPodsConfigPath string
staticPodsFilters podutil.Filters
podTaskFilters podutil.Filters // applied at binding time
launchGracePeriod time.Duration
nodeInfos chan<- NodeInfo
initCompleted chan struct{} // closes upon completion of Init()
Expand Down Expand Up @@ -142,7 +142,6 @@ func New(config Config) *Executor {
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
Expand Down Expand Up @@ -175,6 +174,13 @@ func StaticPods(configPath string, f podutil.Filters) Option {
}
}

// PodTaskFilters returns an Option that installs the given pod filters
// on an Executor; the executor applies them when binding a pod task.
func PodTaskFilters(f podutil.Filters) Option {
	return func(e *Executor) {
		e.podTaskFilters = f
	}
}

// Done returns a chan that closes when the executor is shutting down
func (k *Executor) Done() <-chan struct{} {
return k.terminate
Expand Down Expand Up @@ -203,8 +209,6 @@ func (k *Executor) Init(driver bindings.ExecutorDriver) {
}
return true
})

//TODO(jdef) monitor kubeletFinished and shutdown if it happens
}

func (k *Executor) isDone() bool {
Expand Down Expand Up @@ -273,8 +277,12 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
return
}
log.Infof("Reregistered with slave %v\n", slaveInfo)

if !(&k.state).transition(disconnectedState, connectedState) {
log.Errorf("failed to reregister/transition to a connected state")
// in reality, we may have been temporarily disconnected from the slave during a slave
// failover event. but because we never tried to send a message to the slave, we never
// found out that we were disconnected.
log.V(2).Info("not disconnected, but we received a re-registration callback from slave?")
}

if slaveInfo != nil {
Expand Down Expand Up @@ -469,6 +477,15 @@ func (k *Executor) bindAndWatchTask(driver bindings.ExecutorDriver, task *mesos.
return
}

// apply pod task filters
ok, err := k.podTaskFilters.Accept(pod)
if !ok || err != nil {
log.Errorf("failed to apply pod filters for task %v pod %v/%v: %v",
task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, fmt.Sprintf("filter failed: %v", err)))
return
}

err = k.registry.bind(task.TaskId.GetValue(), pod)
if err != nil {
log.Errorf("failed to bind task %v pod %v/%v: %v",
Expand Down Expand Up @@ -633,19 +650,16 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {
// if needed, so don't take extra time to do that here.
k.registry.shutdown()

select {
// the main Run() func may still be running... wait for it to finish: it will
// clear the pod configuration cleanly, telling k8s "there are no pods" and
// clean up resources (pods, volumes, etc).
case <-k.kubeletFinished:

//TODO(jdef) attempt to wait for events to propagate to API server?

// TODO(jdef) extract constant, should be smaller than whatever the
// slave graceful shutdown timeout period is.
case <-time.After(15 * time.Second):
log.Errorf("timed out waiting for kubelet Run() to die")
}
time.Sleep(15 * time.Second)

log.Infoln("exiting")
if k.exitFunc != nil {
k.exitFunc(0)
Expand Down
79 changes: 41 additions & 38 deletions contrib/mesos/pkg/executor/service/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package service

import (
log "github.com/golang/glog"
// log "github.com/golang/glog"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented package imports

"k8s.io/kubernetes/pkg/kubelet"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util/runtime"
// "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)

Expand All @@ -35,50 +35,53 @@ type executorKubelet struct {
// Run runs the main kubelet loop, closing the kubeletFinished chan when the
// loop exits. Like the upstream Run, it will never return.
func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
defer func() {
// When this Run function is called, we close it here.
// Otherwise, KubeletExecutorServer.runKubelet will.
close(kl.kubeletDone)
runtime.HandleCrash()
log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
// important: never return! this is in our contract
select {}
}()
wait.Until(func() { kl.Kubelet.Run(mergedUpdates) }, 0, kl.executorDone)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get rid of this entirely

/*
defer func() {
// When this Run function is called, we close it here.
// Otherwise, KubeletExecutorServer.runKubelet will.
close(kl.kubeletDone)
runtime.HandleCrash()
log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
// important: never return! this is in our contract
select {}
}()

// push merged updates into another, closable update channel which is closed
// when the executor shuts down.
closableUpdates := make(chan kubetypes.PodUpdate)
go func() {
// closing closableUpdates will cause our patched kubelet's syncLoop() to exit
defer close(closableUpdates)
pipeLoop:
for {
select {
case <-kl.executorDone:
break pipeLoop
default:
// push merged updates into another, closable update channel which is closed
// when the executor shuts down.
closableUpdates := make(chan kubetypes.PodUpdate)
go func() {
// closing closableUpdates will cause our patched kubelet's syncLoop() to exit
defer close(closableUpdates)
pipeLoop:
for {
select {
case u := <-mergedUpdates:
case <-kl.executorDone:
break pipeLoop
default:
select {
case closableUpdates <- u: // noop
case u := <-mergedUpdates:
select {
case closableUpdates <- u: // noop
case <-kl.executorDone:
break pipeLoop
}
case <-kl.executorDone:
break pipeLoop
}
case <-kl.executorDone:
break pipeLoop
}
}
}
}()
}()

// we expect that Run() will complete after closableUpdates is closed and the
// kubelet's syncLoop() has finished processing its backlog, which hopefully
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
wait.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)
// we expect that Run() will complete after closableUpdates is closed and the
// kubelet's syncLoop() has finished processing its backlog, which hopefully
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
wait.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)

//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
*/
}
Loading