Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel context when RunPermitPlugins finishes #84337

Merged
merged 1 commit into from Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 16 additions & 12 deletions pkg/scheduler/scheduler.go
Expand Up @@ -614,7 +614,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
scheduleResult, err := sched.Algorithm.Schedule(ctx, state, pod)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
hex108 marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
if err != nil {
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
// Schedule() may have failed because the pod would not fit on any host, so we try to
Expand All @@ -627,7 +629,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(ctx, state, fwk, pod, fitError)
sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
Expand Down Expand Up @@ -667,7 +669,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
}

// Run "reserve" plugins.
if sts := fwk.RunReservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
Expand All @@ -684,11 +686,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()

Expand All @@ -699,13 +703,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
}

// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.IsUnschedulable() {
Expand All @@ -719,13 +723,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
return
}

// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
var reason string
metrics.PodScheduleErrors.Inc()
Expand All @@ -734,18 +738,18 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
return
}

err := sched.bind(ctx, assumedPod, scheduleResult.SuggestedHost, state)
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
Expand All @@ -758,7 +762,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
Expand Down
133 changes: 79 additions & 54 deletions test/integration/scheduler/framework_test.go
Expand Up @@ -101,6 +101,7 @@ type PermitPlugin struct {
waitAndRejectPermit bool
waitAndAllowPermit bool
allowPermit bool
cancelled bool
fh framework.FrameworkHandle
}

Expand Down Expand Up @@ -395,8 +396,15 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
}
if pp.timeoutPermit {
go func() {
select {
case <-ctx.Done():
pp.cancelled = true
}
}()
return framework.NewStatus(framework.Wait, ""), 3 * time.Second
}

if pp.allowPermit && pod.Name != "waiting-pod" {
return nil, 0
}
Expand Down Expand Up @@ -429,6 +437,11 @@ func (pp *PermitPlugin) allowAllPods() {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) })
}

// rejectAllPods rejects all waiting pods.
func (pp *PermitPlugin) rejectAllPods() {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject("rejectAllPods") })
}

// reset used to reset permit plugin.
func (pp *PermitPlugin) reset() {
pp.numPermitCalled = 0
Expand All @@ -438,6 +451,7 @@ func (pp *PermitPlugin) reset() {
pp.waitAndRejectPermit = false
pp.waitAndAllowPermit = false
pp.allowPermit = false
pp.cancelled = false
}

// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin.
Expand Down Expand Up @@ -1079,18 +1093,7 @@ func TestPostBindPlugin(t *testing.T) {
func TestPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
perPlugin := &PermitPlugin{name: permitPluginName}
registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)}

// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
registry, plugins := initRegistryAndConfig(perPlugin)

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
Expand Down Expand Up @@ -1178,24 +1181,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
// Create a plugin registry for testing.
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
registry := framework.Registry{
perPlugin1.Name(): newPermitPlugin(perPlugin1),
perPlugin2.Name(): newPermitPlugin(perPlugin2),
}

// Setup initial permit plugins for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: perPlugin1.Name(),
},
{
Name: perPlugin2.Name(),
},
},
},
}
registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
Expand Down Expand Up @@ -1245,22 +1231,53 @@ func TestMultiplePermitPlugins(t *testing.T) {
cleanupPods(context.clientSet, t, []*v1.Pod{pod})
}

// TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected.
func TestPermitPluginsCancelled(t *testing.T) {
// Create a plugin registry for testing.
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugins", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)

// Both permit plugins will return Wait for permitting
perPlugin1.timeoutPermit = true
perPlugin2.timeoutPermit = true

// Create a test pod.
podName := "test-pod"
pod, err := createPausePod(context.clientSet,
initPausePod(context.clientSet, &pausePodConfig{Name: podName, Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}

var waitingPod framework.WaitingPod
// Wait until the test pod is actually waiting.
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID)
return waitingPod != nil, nil
})

perPlugin1.rejectAllPods()
// Wait some time for the permit plugins to be cancelled
err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
return perPlugin1.cancelled && perPlugin2.cancelled, nil
})
if err != nil {
t.Errorf("Expected all permit plugins to be cancelled")
}
}

// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{name: permitPluginName}
registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}

// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
registry, plugins := initRegistryAndConfig(permitPlugin)

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
Expand Down Expand Up @@ -1432,18 +1449,7 @@ func TestPostFilterPlugin(t *testing.T) {
func TestPreemptWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{}
registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}

// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
registry, plugins := initRegistryAndConfig(permitPlugin)

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
Expand Down Expand Up @@ -1522,3 +1528,22 @@ func initTestSchedulerForFrameworkTest(t *testing.T, context *testContext, nodeC
}
return c
}

// initRegistryAndConfig returns registry and plugins config based on give plugins.
// TODO: refactor it to a more generic functions that accepts all kinds of Plugins as arguments
func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, plugins *schedulerconfig.Plugins) {
if len(pp) == 0 {
return
}

registry = framework.Registry{}
plugins = &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{},
}

for _, p := range pp {
registry.Register(p.Name(), newPermitPlugin(p))
plugins.Permit.Enabled = append(plugins.Permit.Enabled, schedulerconfig.Plugin{Name: p.Name()})
}
return
}