-
Notifications
You must be signed in to change notification settings - Fork 38.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automated cherry pick of #104833 (1.22): Refine locking in API Priority and Fairness config controller #105049
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,14 @@ const timeFmt = "2006-01-02T15:04:05.999" | |
// undesired becomes completely unused, all the config objects are | ||
// read and processed as a whole. | ||
|
||
// The funcs in this package follow the naming convention that the suffix | ||
// "Locked" means the relevant mutex must be locked at the start of each | ||
// call and will be locked upon return. For a configController, the | ||
// suffix "ReadLocked" stipulates a read lock while just "Locked" | ||
// stipulates a full lock. Absence of either suffix means that either | ||
// (a) the lock must NOT be held at call time and will not be held | ||
// upon return or (b) locking is irrelevant. | ||
|
||
// StartFunction begins the process of handling a request. If the | ||
// request gets queued then this function uses the given hashValue as | ||
// the source of entropy as it shuffle-shards the request into a | ||
|
@@ -124,10 +132,25 @@ type configController struct { | |
// requestWaitLimit comes from server configuration. | ||
requestWaitLimit time.Duration | ||
|
||
// watchTracker implements the necessary WatchTracker interface. | ||
WatchTracker | ||
|
||
// the most recent update attempts, ordered by increasing age. | ||
// Consumer trims to keep only the last minute's worth of entries. | ||
// The controller uses this to limit itself to at most six updates | ||
// to a given FlowSchema in any minute. | ||
// This may only be accessed from the one and only worker goroutine. | ||
mostRecentUpdates []updateAttempt | ||
|
||
// This must be locked while accessing flowSchemas or | ||
// priorityLevelStates. It is the lock involved in | ||
// LockingWriteMultiple. | ||
lock sync.Mutex | ||
// priorityLevelStates. A lock for writing is needed | ||
// for writing to any of the following: | ||
// - the flowSchemas field | ||
// - the slice held in the flowSchemas field | ||
// - the priorityLevelStates field | ||
// - the map held in the priorityLevelStates field | ||
// - any field of a priorityLevelState held in that map | ||
lock sync.RWMutex | ||
|
||
// flowSchemas holds the flow schema objects, sorted by increasing | ||
// numerical (decreasing logical) matching precedence. Every | ||
|
@@ -138,16 +161,6 @@ type configController struct { | |
// name to the state for that level. Every name referenced from a | ||
// member of `flowSchemas` has an entry here. | ||
priorityLevelStates map[string]*priorityLevelState | ||
|
||
// the most recent update attempts, ordered by increasing age. | ||
// Consumer trims to keep only the last minute's worth of entries. | ||
// The controller uses this to limit itself to at most six updates | ||
// to a given FlowSchema in any minute. | ||
// This may only be accessed from the one and only worker goroutine. | ||
mostRecentUpdates []updateAttempt | ||
|
||
// watchTracker implements the necessary WatchTracker interface. | ||
WatchTracker | ||
} | ||
|
||
type updateAttempt struct { | ||
|
@@ -281,8 +294,8 @@ func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) { | |
} | ||
|
||
func (cfgCtlr *configController) updateObservations() { | ||
cfgCtlr.lock.Lock() | ||
defer cfgCtlr.lock.Unlock() | ||
cfgCtlr.lock.RLock() | ||
defer cfgCtlr.lock.RUnlock() | ||
for _, plc := range cfgCtlr.priorityLevelStates { | ||
if plc.queues != nil { | ||
plc.queues.UpdateObservations() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it correct that we're calling UpdateObservations() under a read lock? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. The writes into the observers are not what is covered by the write locking here. |
||
|
@@ -765,8 +778,8 @@ func (immediateRequest) Finish(execute func()) bool { | |
// waiting in its queue, or `Time{}` if this did not happen. | ||
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { | ||
klog.V(7).Infof("startRequest(%#+v)", rd) | ||
cfgCtlr.lock.Lock() | ||
defer cfgCtlr.lock.Unlock() | ||
cfgCtlr.lock.RLock() | ||
defer cfgCtlr.lock.RUnlock() | ||
var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema | ||
for _, fs := range cfgCtlr.flowSchemas { | ||
if matchesFlowSchema(rd, fs) { | ||
|
@@ -811,7 +824,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig | |
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) | ||
req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) | ||
if idle { | ||
cfgCtlr.maybeReapLocked(plName, plState) | ||
cfgCtlr.maybeReapReadLocked(plName, plState) | ||
} | ||
return selectedFlowSchema, plState.pl, false, req, startWaitingTime | ||
} | ||
|
@@ -820,8 +833,8 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig | |
// priority level if it has no more use. Call this after getting a | ||
// clue that the given priority level is undesired and idle. | ||
func (cfgCtlr *configController) maybeReap(plName string) { | ||
cfgCtlr.lock.Lock() | ||
defer cfgCtlr.lock.Unlock() | ||
cfgCtlr.lock.RLock() | ||
defer cfgCtlr.lock.RUnlock() | ||
plState := cfgCtlr.priorityLevelStates[plName] | ||
if plState == nil { | ||
klog.V(7).Infof("plName=%s, plState==nil", plName) | ||
|
@@ -843,7 +856,7 @@ func (cfgCtlr *configController) maybeReap(plName string) { | |
// it has no more use. Call this if both (1) plState.queues is | ||
// non-nil and reported being idle, and (2) cfgCtlr's lock has not | ||
// been released since then. | ||
func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) { | ||
func (cfgCtlr *configController) maybeReapReadLocked(plName string, plState *priorityLevelState) { | ||
if !(plState.quiescing && plState.numPending == 0) { | ||
return | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was wrongly added during the rebase; let me fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, please ignore the above comment. I was looking at the cherry-picks for the older versions, 1.21 and 1.20.