Skip to content

Commit

Permalink
Merge pull request #1333 from tssurya/ocpbugs-1520-part2
Browse files Browse the repository at this point in the history
OCPBUGS-1520: Prioritize adding events to handlers for shared resources
  • Loading branch information
openshift-merge-robot committed Oct 25, 2022
2 parents f234d75 + e2cf55f commit 82f6079
Show file tree
Hide file tree
Showing 3 changed files with 367 additions and 62 deletions.
88 changes: 60 additions & 28 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ const (

// namespace, node, and pod handlers
defaultNumEventQueues uint32 = 15

// default priorities for various handlers (also the highest priority)
defaultHandlerPriority uint32 = 0
// lowest priority among various handlers (See GetHandlerPriority for more information)
minHandlerPriority uint32 = 4
)

// types for dynamic handlers created when adding a network policy
Expand Down Expand Up @@ -394,7 +399,40 @@ func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, e

type AddHandlerFuncType func(namespace string, sel labels.Selector, funcs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error)

// GetHandlerPriority returns the priority of each objType's handler
// Priority of the handler is what determine which handler would get an event first
// This is relevant only for handlers that are sharing the same resources:
// Pods: shared by PodType (0), EgressIPPodType (1), PeerPodSelectorType (2), PeerPodForNamespaceAndPodSelectorType (3), LocalPodSelectorType (4)
// Namespaces: shared by NamespaceType (0), EgressIPNamespaceType (1), PeerNamespaceSelectorType (3), PeerNamespaceAndPodSelectorType (4)
// Nodes: shared by NodeType (0), EgressNodeType (1)
// By default handlers get the defaultHandlerPriority which is 0 (highest priority). Higher the number, lower the priority to get an event.
// Example: EgressIPPodType will always get the pod event after PodType and PeerPodSelectorType will always get the event after PodType and EgressIPPodType
// NOTE: If you are touching this function to add a new object type that uses shared objects, please make sure to update `minHandlerPriority` if needed
func (wf *WatchFactory) GetHandlerPriority(objType reflect.Type) (priority uint32) {
switch objType {
case EgressIPPodType:
return 1
case PeerPodSelectorType:
return 2
case PeerPodForNamespaceAndPodSelectorType:
return 3
case LocalPodSelectorType:
return 4
case EgressIPNamespaceType:
return 1
case PeerNamespaceSelectorType:
return 2
case PeerNamespaceAndPodSelectorType:
return 3
case EgressNodeType:
return 1
default:
return defaultHandlerPriority
}
}

func (wf *WatchFactory) GetResourceHandlerFunc(objType reflect.Type) (AddHandlerFuncType, error) {
priority := wf.GetHandlerPriority(objType)
switch objType {
case NamespaceType:
return func(namespace string, sel labels.Selector,
Expand All @@ -411,7 +449,7 @@ func (wf *WatchFactory) GetResourceHandlerFunc(objType reflect.Type) (AddHandler
case NodeType, EgressNodeType:
return func(namespace string, sel labels.Selector,
funcs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.AddNodeHandler(funcs, processExisting)
return wf.AddNodeHandler(funcs, processExisting, priority)
}, nil

case PeerServiceType:
Expand All @@ -420,22 +458,16 @@ func (wf *WatchFactory) GetResourceHandlerFunc(objType reflect.Type) (AddHandler
return wf.AddFilteredServiceHandler(namespace, funcs, processExisting)
}, nil

case PeerPodSelectorType, LocalPodSelectorType, PodType, EgressIPPodType:
case PeerPodSelectorType, LocalPodSelectorType, PodType, EgressIPPodType, PeerPodForNamespaceAndPodSelectorType:
return func(namespace string, sel labels.Selector,
funcs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.AddFilteredPodHandler(namespace, sel, funcs, processExisting)
return wf.AddFilteredPodHandler(namespace, sel, funcs, processExisting, priority)
}, nil

case PeerNamespaceAndPodSelectorType, PeerNamespaceSelectorType, EgressIPNamespaceType:
return func(namespace string, sel labels.Selector,
funcs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.AddFilteredNamespaceHandler(namespace, sel, funcs, processExisting)
}, nil

case PeerPodForNamespaceAndPodSelectorType:
return func(namespace string, sel labels.Selector,
funcs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.AddFilteredPodHandler(namespace, sel, funcs, processExisting)
return wf.AddFilteredNamespaceHandler(namespace, sel, funcs, processExisting, priority)
}, nil

case EgressFirewallType:
Expand All @@ -459,7 +491,7 @@ func (wf *WatchFactory) GetResourceHandlerFunc(objType reflect.Type) (AddHandler
return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType)
}

func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, sel labels.Selector, funcs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, sel labels.Selector, funcs cache.ResourceEventHandler, processExisting func([]interface{}) error, priority uint32) (*Handler, error) {
inf, ok := wf.informers[objType]
if !ok {
klog.Fatalf("Tried to add handler of unknown object type %v", objType)
Expand Down Expand Up @@ -510,7 +542,7 @@ func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, sel l
}

handlerID := atomic.AddUint64(&wf.handlerCounter, 1)
handler := inf.addHandler(handlerID, filterFunc, funcs, items)
handler := inf.addHandler(handlerID, priority, filterFunc, funcs, items)
klog.V(5).Infof("Added %v event handler %d", objType, handler.id)
return handler, nil
}
Expand All @@ -521,12 +553,12 @@ func (wf *WatchFactory) removeHandler(objType reflect.Type, handler *Handler) {

// AddPodHandler adds a handler function that will be executed on Pod object changes
func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(PodType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(PodType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change
func (wf *WatchFactory) AddFilteredPodHandler(namespace string, sel labels.Selector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(PodType, namespace, sel, handlerFuncs, processExisting)
func (wf *WatchFactory) AddFilteredPodHandler(namespace string, sel labels.Selector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error, priority uint32) (*Handler, error) {
return wf.addHandler(PodType, namespace, sel, handlerFuncs, processExisting, priority)
}

// RemovePodHandler removes a Pod object event handler function
Expand All @@ -536,12 +568,12 @@ func (wf *WatchFactory) RemovePodHandler(handler *Handler) {

// AddServiceHandler adds a handler function that will be executed on Service object changes
func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(ServiceType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(ServiceType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// AddFilteredServiceHandler adds a handler function that will be executed on all Service object changes for a specific namespace
func (wf *WatchFactory) AddFilteredServiceHandler(namespace string, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(ServiceType, namespace, nil, handlerFuncs, processExisting)
return wf.addHandler(ServiceType, namespace, nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemoveServiceHandler removes a Service object event handler function
Expand All @@ -551,7 +583,7 @@ func (wf *WatchFactory) RemoveServiceHandler(handler *Handler) {

// AddEndpointSliceHandler adds a handler function that will be executed on EndpointSlice object changes
func (wf *WatchFactory) AddEndpointSliceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(EndpointSliceType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(EndpointSliceType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemoveEndpointSliceHandler removes a EndpointSlice object event handler function
Expand All @@ -561,7 +593,7 @@ func (wf *WatchFactory) RemoveEndpointSliceHandler(handler *Handler) {

// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes
func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(PolicyType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(PolicyType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemovePolicyHandler removes a NetworkPolicy object event handler function
Expand All @@ -571,7 +603,7 @@ func (wf *WatchFactory) RemovePolicyHandler(handler *Handler) {

// AddEgressFirewallHandler adds a handler function that will be executed on EgressFirewall object changes
func (wf *WatchFactory) AddEgressFirewallHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(EgressFirewallType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(EgressFirewallType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemoveEgressFirewallHandler removes an EgressFirewall object event handler function
Expand All @@ -586,7 +618,7 @@ func (wf *WatchFactory) RemoveEgressQoSHandler(handler *Handler) {

// AddEgressIPHandler adds a handler function that will be executed on EgressIP object changes
func (wf *WatchFactory) AddEgressIPHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(EgressIPType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(EgressIPType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemoveEgressIPHandler removes an EgressIP object event handler function
Expand All @@ -596,7 +628,7 @@ func (wf *WatchFactory) RemoveEgressIPHandler(handler *Handler) {

// AddCloudPrivateIPConfigHandler adds a handler function that will be executed on CloudPrivateIPConfig object changes
func (wf *WatchFactory) AddCloudPrivateIPConfigHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(CloudPrivateIPConfigType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(CloudPrivateIPConfigType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemoveCloudPrivateIPConfigHandler removes an CloudPrivateIPConfig object event handler function
Expand All @@ -606,12 +638,12 @@ func (wf *WatchFactory) RemoveCloudPrivateIPConfigHandler(handler *Handler) {

// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(NamespaceType, "", nil, handlerFuncs, processExisting)
return wf.addHandler(NamespaceType, "", nil, handlerFuncs, processExisting, defaultHandlerPriority)
}

// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change
func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, sel labels.Selector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(NamespaceType, namespace, sel, handlerFuncs, processExisting)
func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, sel labels.Selector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error, priority uint32) (*Handler, error) {
return wf.addHandler(NamespaceType, namespace, sel, handlerFuncs, processExisting, priority)
}

// RemoveNamespaceHandler removes a Namespace object event handler function
Expand All @@ -620,13 +652,13 @@ func (wf *WatchFactory) RemoveNamespaceHandler(handler *Handler) {
}

// AddNodeHandler adds a handler function that will be executed on Node object changes
func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(NodeType, "", nil, handlerFuncs, processExisting)
func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error, priority uint32) (*Handler, error) {
return wf.addHandler(NodeType, "", nil, handlerFuncs, processExisting, priority)
}

// AddFilteredNodeHandler dds a handler function that will be executed when Node objects that match the given label selector
func (wf *WatchFactory) AddFilteredNodeHandler(sel labels.Selector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) {
return wf.addHandler(NodeType, "", sel, handlerFuncs, processExisting)
return wf.addHandler(NodeType, "", sel, handlerFuncs, processExisting, defaultHandlerPriority)
}

// RemoveNodeHandler removes a Node object event handler function
Expand Down

0 comments on commit 82f6079

Please sign in to comment.