Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce internal channels for scheduler #84336

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/controller/.import-restrictions
Expand Up @@ -262,6 +262,8 @@
"k8s.io/kubernetes/pkg/capabilities",
"k8s.io/kubernetes/pkg/master/ports",
"k8s.io/kubernetes/pkg/scheduler/api",
"k8s.io/kubernetes/pkg/scheduler/internal",
"k8s.io/kubernetes/pkg/scheduler/internal/errchannel",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want controllers depending on internal scheduler packages, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually we dont. I plan to remove this coupling in the coming weeks, the code can be refactored to remove this dependency. This is added for now for the scheduler cleanup. Can be marked as a TODO.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is intended to be a cleanup PR, it would be preferable not to introduce new debt in it. can that dependency be dropped before we mark this internal?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hprateek43, how do you plan to remove the dependency?

Other than that, I don't understand why import-restrictions apply to indirect imports. I feel like it shouldn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea is to first eliminate the utils package and then refactor predicates to remove dependencies on internal packages. But @alculquicondor is also right in his place that transient dependencies should not be considered.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a reminder, .import-restrictions is a home-grown k8s utility that requires us to whitelist all transitive dependencies. The go compiler prevents pkg/controller from directly importing anything in pkg/scheduler/internal/...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liggitt This point could be considered. Although now I am planning to remove the error channel altogether

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liggitt This point could be considered

that's a good point, and would remove my objection

Although now I am planning to remove the error channel altogether

that sounds even better :)

"k8s.io/kubernetes/pkg/scheduler/util",
"k8s.io/kubernetes/pkg/scheduler/listers",
"k8s.io/kubernetes/pkg/security/apparmor",
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/BUILD
Expand Up @@ -129,6 +129,7 @@ filegroup(
"//pkg/scheduler/core:all-srcs",
"//pkg/scheduler/framework:all-srcs",
"//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/errchannel:all-srcs",
"//pkg/scheduler/internal/heap:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs",
"//pkg/scheduler/listers:all-srcs",
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/algorithm/predicates/BUILD
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/internal/errchannel:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/algorithm/predicates/metadata.go
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/internal/errchannel"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
Expand Down Expand Up @@ -731,7 +732,7 @@ func podMatchesAnyAffinityTermProperties(pod *v1.Pod, properties []*affinityTerm
// (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) {
errCh := schedutil.NewErrorChannel()
errCh := errchannel.New()
var lock sync.Mutex
topologyMaps := newTopologyPairsMaps()

Expand Down Expand Up @@ -780,7 +781,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*sched
return newTopologyPairsMaps(), newTopologyPairsMaps(), nil
}

errCh := schedutil.NewErrorChannel()
errCh := errchannel.New()
var lock sync.Mutex
topologyPairsAffinityPodsMaps = newTopologyPairsMaps()
topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps()
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/algorithm/priorities/BUILD
Expand Up @@ -36,9 +36,9 @@ go_library(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/errchannel:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/algorithm/priorities/even_pods_spread.go
Expand Up @@ -26,12 +26,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

"k8s.io/klog"
)

type topologyPair struct {
Expand Down
7 changes: 3 additions & 4 deletions pkg/scheduler/algorithm/priorities/interpod_affinity.go
Expand Up @@ -24,13 +24,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/errchannel"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"

"k8s.io/klog"
)

type topologyPairToScore map[string]map[string]int64
Expand Down Expand Up @@ -240,7 +239,7 @@ func buildTopologyPairToScore(
return nil
}

errCh := schedutil.NewErrorChannel()
errCh := errchannel.New()
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) {
nodeInfo := allNodes[i]
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/core/BUILD
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/errchannel:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/core/generic_scheduler.go
Expand Up @@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/errchannel"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
Expand Down Expand Up @@ -484,7 +485,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)
errCh := util.NewErrorChannel()
errCh := errchannel.New()
var (
predicateResultLock sync.Mutex
filteredLen int32
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/v1alpha1/BUILD
Expand Up @@ -14,10 +14,10 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/internal/errchannel:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/v1alpha1/framework.go
Expand Up @@ -31,10 +31,10 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/errchannel"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

const (
Expand Down Expand Up @@ -482,7 +482,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
}
ctx, cancel := context.WithCancel(ctx)
errCh := schedutil.NewErrorChannel()
errCh := errchannel.New()

// Run Score method for each node in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/scheduler/internal/errchannel/BUILD
@@ -0,0 +1,32 @@
package(default_visibility = ["//visibility:public"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)

go_test(
name = "go_default_test",
srcs = ["error_channel_test.go"],
embed = [":go_default_library"],
)

go_library(
name = "go_default_library",
srcs = ["error_channel.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/errchannel",
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package errchannel

import "context"

Expand Down Expand Up @@ -51,8 +51,8 @@ func (e *ErrorChannel) ReceiveError() error {
}
}

// NewErrorChannel returns a new ErrorChannel.
func NewErrorChannel() *ErrorChannel {
// New returns a new ErrorChannel.
func New() *ErrorChannel {
return &ErrorChannel{
errCh: make(chan error, 1),
}
Expand Down
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package errchannel

import (
"context"
Expand All @@ -23,7 +23,7 @@ import (
)

func TestErrorChannel(t *testing.T) {
errCh := NewErrorChannel()
errCh := New()

if actualErr := errCh.ReceiveError(); actualErr != nil {
t.Errorf("expect nil from err channel, but got %v", actualErr)
Expand Down
6 changes: 1 addition & 5 deletions pkg/scheduler/util/BUILD
Expand Up @@ -8,10 +8,7 @@ load(

go_test(
name = "go_default_test",
srcs = [
"error_channel_test.go",
"utils_test.go",
],
srcs = ["utils_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/apis/extender/v1:go_default_library",
Expand All @@ -25,7 +22,6 @@ go_library(
name = "go_default_library",
srcs = [
"clock.go",
"error_channel.go",
"utils.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/util",
Expand Down