Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove evil global state shared by all schedulers #5351

Merged
merged 1 commit into from
Mar 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 21 additions & 3 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,21 @@ func defaultPredicates() util.StringSet {
// Fit is defined based on the absence of port conflicts.
factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts),
// Fit is determined by resource availability.
factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)),
factory.RegisterFitPredicateFactory(
"PodFitsResources",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return algorithm.NewResourceFitPredicate(args.NodeInfo)
},
),
// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict),
// Fit is determined by node selector query.
factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)),
factory.RegisterFitPredicateFactory(
"MatchNodeSelector",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return algorithm.NewSelectorMatchPredicate(args.NodeInfo)
},
),
// Fit is determined by the presence of the Host parameter and a string match
factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost),
)
Expand All @@ -47,7 +57,15 @@ func defaultPriorities() util.StringSet {
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
// spreads pods by minimizing the number of pods (belonging to the same service) on the same minion.
factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1),
factory.RegisterPriorityConfigFactory(
"ServiceSpreadingPriority",
func(args factory.PluginFactoryArgs) algorithm.PriorityConfig {
return algorithm.PriorityConfig{
Function: algorithm.NewServiceSpreadPriority(args.ServiceLister),
Weight: 1,
}
},
),
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
)
Expand Down
29 changes: 14 additions & 15 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ import (
"github.com/golang/glog"
)

var (
PodLister = &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
MinionLister = &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
ServiceLister = &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
)

// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
Expand All @@ -50,7 +44,7 @@ type ConfigFactory struct {
// a means to list all scheduled pods
PodLister *cache.StoreToPodLister
// a means to list all minions
MinionLister *cache.StoreToNodeLister
NodeLister *cache.StoreToNodeLister
// a means to list all services
ServiceLister *cache.StoreToServiceLister
}
Expand All @@ -60,9 +54,9 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
return &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
PodLister: PodLister,
MinionLister: MinionLister,
ServiceLister: ServiceLister,
PodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
}
}

Expand Down Expand Up @@ -104,12 +98,17 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
predicateFuncs, err := getFitPredicateFunctions(predicateKeys)
pluginArgs := PluginFactoryArgs{
PodLister: f.PodLister,
ServiceLister: f.ServiceLister,
NodeLister: f.NodeLister,
}
predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
if err != nil {
return nil, err
}

priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys)
priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, pluginArgs)
if err != nil {
return nil, err
}
Expand All @@ -126,9 +125,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
if false {
// Disable this code until minions support watches. Note when this code is enabled,
// we need to make sure minion ListWatcher has proper FieldSelector.
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store, 0).Run()
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run()
} else {
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run()
}

// Watch and cache all service objects. Scheduler needs to find all pods
Expand All @@ -149,7 +148,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
}

return &scheduler.Config{
MinionLister: f.MinionLister,
MinionLister: f.NodeLister,
Algorithm: algo,
Binder: &binder{f.Client},
NextPod: func() *api.Pod {
Expand Down
118 changes: 80 additions & 38 deletions plugin/pkg/scheduler/factory/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,26 @@ import (
"github.com/golang/glog"
)

// PluginFactoryArgs are passed to all plugin factory functions.
type PluginFactoryArgs struct {
algorithm.PodLister
algorithm.ServiceLister
NodeLister algorithm.MinionLister
NodeInfo algorithm.NodeInfo
}

// A FitPredicateFactory produces a FitPredicate from the given args.
type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate

// A PriorityFunctionFactory produces a PriorityConfig from the given args.
type PriorityConfigFactory func(PluginFactoryArgs) algorithm.PriorityConfig

var (
schedulerFactoryMutex sync.Mutex

// maps that hold registered algorithm types
fitPredicateMap = make(map[string]algorithm.FitPredicate)
priorityFunctionMap = make(map[string]algorithm.PriorityConfig)
fitPredicateMap = make(map[string]FitPredicateFactory)
priorityFunctionMap = make(map[string]PriorityConfigFactory)
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
)

Expand All @@ -46,41 +60,60 @@ type AlgorithmProviderConfig struct {
PriorityFunctionKeys util.StringSet
}

// Registers a fit predicate with the algorithm registry. Returns the name,
// with which the predicate was registered.
// RegisterFitPredicate registers a fit predicate with the algorithm
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random question: Do we have a standard about whether you re-state the function name in the comment? Personally I don't like this style (I'd prefer "Registers a fit predicate..." as it was before) but I guess we don't have a standard practice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a godoc convention. From the blog, a godoc "comment is a complete sentence that begins with the name of the element it describes." govet issues a warning if you don't do this I believe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's godoc style to repeat the function name verbatim, for whatever reason. :/

// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
}

// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
fitPredicateMap[name] = predicate
fitPredicateMap[name] = predicateFactory
return name
}

// Registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered.
func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
var predicate algorithm.FitPredicate
var predicateFactory FitPredicateFactory
var ok bool

validatePredicateOrDie(policy)

// generate the predicate function, if a custom type is requested
if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil {
predicate = algorithm.NewServiceAffinityPredicate(PodLister, ServiceLister, MinionLister, policy.Argument.ServiceAffinity.Labels)
predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
return algorithm.NewServiceAffinityPredicate(
args.PodLister,
args.ServiceLister,
args.NodeInfo,
policy.Argument.ServiceAffinity.Labels,
)
}
} else if policy.Argument.LabelsPresence != nil {
predicate = algorithm.NewNodeLabelPredicate(MinionLister, policy.Argument.LabelsPresence.Labels, policy.Argument.LabelsPresence.Presence)
predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
return algorithm.NewNodeLabelPredicate(
args.NodeInfo,
policy.Argument.LabelsPresence.Labels,
policy.Argument.LabelsPresence.Presence,
)
}
}
} else if predicate, ok = fitPredicateMap[policy.Name]; ok {
} else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok {
// checking to see if a pre-defined predicate is requested
glog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
}

if predicate == nil {
if predicateFactory == nil {
glog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
}

return RegisterFitPredicate(policy.Name, predicate)
return RegisterFitPredicateFactory(policy.Name, predicateFactory)
}

// This check is useful for testing providers.
Expand All @@ -94,37 +127,59 @@ func IsFitPredicateRegistered(name string) bool {
// Registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityFunction(name string, function algorithm.PriorityFunction, weight int) string {
return RegisterPriorityConfigFactory(name, func(PluginFactoryArgs) algorithm.PriorityConfig {
return algorithm.PriorityConfig{Function: function, Weight: weight}
})
}

func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
priorityFunctionMap[name] = algorithm.PriorityConfig{Function: function, Weight: weight}
priorityFunctionMap[name] = pcf
return name
}

// Registers a custom priority function with the algorithm registry.
// Returns the name, with which the priority function was registered.
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
var priority algorithm.PriorityFunction
var pcf PriorityConfigFactory

validatePriorityOrDie(policy)

// generate the priority function, if a custom priority is requested
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
priority = algorithm.NewServiceAntiAffinityPriority(ServiceLister, policy.Argument.ServiceAntiAffinity.Label)
pcf = func(args PluginFactoryArgs) algorithm.PriorityConfig {
return algorithm.PriorityConfig{
Function: algorithm.NewServiceAntiAffinityPriority(
args.ServiceLister,
policy.Argument.ServiceAntiAffinity.Label,
),
Weight: policy.Weight,
}
}
} else if policy.Argument.LabelPreference != nil {
priority = algorithm.NewNodeLabelPriority(policy.Argument.LabelPreference.Label, policy.Argument.LabelPreference.Presence)
pcf = func(args PluginFactoryArgs) algorithm.PriorityConfig {
return algorithm.PriorityConfig{
Function: algorithm.NewNodeLabelPriority(
policy.Argument.LabelPreference.Label,
policy.Argument.LabelPreference.Presence,
),
Weight: policy.Weight,
}
}
}
} else if priorityConfig, ok := priorityFunctionMap[policy.Name]; ok {
} else if _, ok := priorityFunctionMap[policy.Name]; ok {
glog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
priority = priorityConfig.Function
return policy.Name
}

if priority == nil {
if pcf == nil {
glog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
}

return RegisterPriorityFunction(policy.Name, priority, policy.Weight)
return RegisterPriorityConfigFactory(policy.Name, pcf)
}

// This check is useful for testing providers.
Expand All @@ -135,19 +190,6 @@ func IsPriorityFunctionRegistered(name string) bool {
return ok
}

// Sets the weight of an already registered priority function.
func SetPriorityFunctionWeight(name string, weight int) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
config, ok := priorityFunctionMap[name]
if !ok {
glog.Errorf("Invalid priority name %s specified - no corresponding function found", name)
return
}
config.Weight = weight
priorityFunctionMap[name] = config
}

// Registers a new algorithm provider with the algorithm registry. This should
// be called from the init function in a provider plugin.
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys util.StringSet) string {
Expand Down Expand Up @@ -175,32 +217,32 @@ func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
return &provider, nil
}

func getFitPredicateFunctions(names util.StringSet) (map[string]algorithm.FitPredicate, error) {
func getFitPredicateFunctions(names util.StringSet, args PluginFactoryArgs) (map[string]algorithm.FitPredicate, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()

predicates := map[string]algorithm.FitPredicate{}
for _, name := range names.List() {
function, ok := fitPredicateMap[name]
factory, ok := fitPredicateMap[name]
if !ok {
return nil, fmt.Errorf("Invalid predicate name %q specified - no corresponding function found", name)
}
predicates[name] = function
predicates[name] = factory(args)
}
return predicates, nil
}

func getPriorityFunctionConfigs(names util.StringSet) ([]algorithm.PriorityConfig, error) {
func getPriorityFunctionConfigs(names util.StringSet, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()

configs := []algorithm.PriorityConfig{}
for _, name := range names.List() {
config, ok := priorityFunctionMap[name]
factory, ok := priorityFunctionMap[name]
if !ok {
return nil, fmt.Errorf("Invalid priority name %s specified - no corresponding function found", name)
}
configs = append(configs, config)
configs = append(configs, factory(args))
}
return configs, nil
}
Expand Down