refactor: make dispatcher locking more granular #4958
ultrotter merged 4 commits into prometheus:main from
Conversation
Spaceman1701
left a comment
It'd be helpful to include some benchmarks.
maintenanceInterval time.Duration
done chan struct{}
finished sync.WaitGroup
Why change this to a wait group? I think the done channel is a little more idiomatic.
Right, I thought I needed it to deal with the lock in Stop(), but it seems I don't. I'll add a commit to fix this and also make LoadingDone() more efficient.
Actually, I tried to change it back to a channel and it was problematic: before, Stop() took the dispatcher lock and all was safe, but now it has to be idempotent, and it also must not listen on the done channel if that channel will never be closed. While it's arguably possible to achieve this with the atomic state pointer and switch cases, it becomes a bit convoluted and relies on assumptions about how we use the dispatcher outside of the module itself. As such I would prefer a WaitGroup and not having to worry, especially as it's not used that often.
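To illustrate the idempotency concern above, here is a minimal sketch, not Alertmanager's actual code (names like `miniDispatcher` are hypothetical): closing a done channel twice panics, while a WaitGroup's Wait() and a context-style cancel() can both be invoked any number of times, so Stop() stays trivially safe to call repeatedly.

```go
package main

import (
	"fmt"
	"sync"
)

type miniDispatcher struct {
	finished sync.WaitGroup
	cancel   func()
}

func newMiniDispatcher() *miniDispatcher {
	d := &miniDispatcher{}
	stopped := make(chan struct{})
	// cancel is modelled on context.CancelFunc: safe to call repeatedly.
	var once sync.Once
	d.cancel = func() { once.Do(func() { close(stopped) }) }
	d.finished.Add(1)
	go func() {
		<-stopped // the run loop exits when cancelled
		d.finished.Done()
	}()
	return d
}

// Stop is idempotent: cancel() may run many times, and Wait() may be
// called many times, unlike receiving on a channel that may never close.
func (d *miniDispatcher) Stop() {
	d.cancel()
	d.finished.Wait()
}

func main() {
	d := newMiniDispatcher()
	d.Stop()
	d.Stop() // a second Stop neither panics nor blocks
	fmt.Println("stopped twice safely")
}
```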
I managed to change the loaded signal to a channel, but for now I left finished as a WaitGroup for the reasons above.
return nil, nil, ctx.Err()
case <-d.LoadingDone():
}
d.WaitForLoading()
I think we still need this
I don't think we need it: `case <-d.LoadingDone():` waits on the group and then closes the channel, and here we are waiting for that channel to be closed. Once it's closed, we are strictly past the point WaitForLoading passes, since the channel would not be closed otherwise.
I agree with Guido here 👍
Ah, I see, yeah you're right
I will work on some. I have some work-in-progress ones that show no penalty, but only a very ad-hoc one that shows the gain. Anyway, they are only meaningful on the whole patch series, not this change alone.
siavashs
left a comment
LGTM, based on your comment benchmarks will be added later.
Spaceman1701
left a comment
Left a few comments, but they're small stylistic things that don't need to block merging this change
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}
d.routeGroupsSlice[route.Idx].mtx.Lock()
Small style nitpick: any chance you could reduce the number of times we reference the group via d.routeGroupsSlice[route.Idx]? I think it'd be a little clearer if this code assigned the slice entry to a variable internally:

groups := d.routeGroupsSlice[route.Idx]
groups.mtx.Lock()
defer groups.mtx.Unlock()
...

The way it's written now makes it a little unclear that it's only legal to access the groups for route.Idx.
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
d.routeGroupsSlice[route.Idx].groups[fp] = ag
d.aggrGroupsNum.Add(1)
It's probably a good idea to add a comment somewhere explaining why it's OK for aggrGroupsNum to be an atomic rather than sharing a lock with group creation. I know we talked about this offline (and the conclusion was that the aggrGroup limit becomes the configured value plus the number of dispatcher threads), but it's not documented anywhere that I can see.
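A sketch of the trade-off being discussed, with illustrative names and a made-up limit (this is not the actual Alertmanager code): because the limit check and the atomic increment are two separate steps done without a shared lock, N concurrent ingestion workers could each pass the check before any of them increments, overshooting the configured limit by at most N.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const limit = 1000 // hypothetical configured aggregation-group limit

var aggrGroupsNum atomic.Int64

// tryCreateGroup checks the limit, then increments. The check and the
// Add are not one atomic step, which is exactly the bounded overshoot
// described in the review comment above.
func tryCreateGroup() bool {
	if aggrGroupsNum.Load() >= limit {
		return false
	}
	aggrGroupsNum.Add(1)
	return true
}

func main() {
	created := 0
	for i := 0; i < 2*limit; i++ {
		if tryCreateGroup() {
			created++
		}
	}
	// Single-threaded, the limit is exact; with N concurrent callers it
	// may be exceeded by up to N, which the patch deems acceptable.
	fmt.Println(created) // prints: 1000
}
```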
… the tree is always the top index

This allows us to have a count for each route, and to be able to move the route indexing from "pointer to the route object" to its integer index.

Signed-off-by: Guido Trotter <guido@hudson-trading.com>
…t is only used internally for the recursion)

Signed-off-by: Guido Trotter <guido@hudson-trading.com>
- Add a new stopped state
- Remove the mtx lock
- Change the aggrGroupsPerRoute map to a new preallocated slice
- Change aggrGroupsNum to an atomic int
- Change the done chan to a waitgroup
- Change state to atomic

Add a new type holding the lock at the map fingerprint-to-aggrGroup level (TBD if we can make this a sync.Map).

Preallocate the route slice and its content objects.

Remove the WaitForLoading call in Groups, which is redundant after LoadingDone. Remove copying the immutable slice in Groups; now only copy the inner maps, holding their individual locks. This also saves copying if there is a route filter, which can be checked without holding a lock.

Simplify Stop() to not need a lock by storing the state atomically and then calling cancel(). Stop is safe to call multiple times since cancel() is safe to call multiple times, and waiting on the finished WaitGroup can be done multiple times, unlike listening on the done chan.

In groupAlert, only hold the lock for the current route slice. Note that the limit check is done holding only this lock. This is safe now as only one alert can be ingested at a time, but if we enable parallel alert ingestion with N workers, we may overshoot the limit by N. We deem this to be OK, as N is small and the limit is a safety measure to avoid too many AGs, not something that will substantially break between e.g. 1000 and 1016. We will update the documentation about the limit when we make ingestion parallel. We could use Add() and then check, and then undershoot the limit, or CompareAndSwap and retry, but that would make performance worse.

Dispatch tests needed fixes to add the Idx in the manually created Route, and to not pass a nil Route. The way the maintenance test populates the aggrGroup also changes with the new system.

Signed-off-by: Guido Trotter <guido@hudson-trading.com>
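The per-route locking this commit message describes might look roughly like the following sketch (field and type names such as `routeGroups` and `routeGroupsSlice` follow the diff excerpts above where possible; the rest is assumed): each route index owns its own mutex and map, so dispatching for one route never contends with another.

```go
package main

import (
	"fmt"
	"sync"
)

type fingerprint uint64

type aggrGroup struct{ name string }

// routeGroups pairs one route's group map with its own mutex, replacing
// the single dispatcher-wide lock.
type routeGroups struct {
	mtx    sync.Mutex
	groups map[fingerprint]*aggrGroup
}

type dispatcher struct {
	// Preallocated in New, one entry per route index, never resized: the
	// slice itself is immutable and can be indexed without a global lock.
	routeGroupsSlice []routeGroups
}

func newDispatcher(numRoutes int) *dispatcher {
	d := &dispatcher{routeGroupsSlice: make([]routeGroups, numRoutes)}
	for i := range d.routeGroupsSlice {
		d.routeGroupsSlice[i].groups = map[fingerprint]*aggrGroup{}
	}
	return d
}

func (d *dispatcher) addGroup(routeIdx int, fp fingerprint, ag *aggrGroup) {
	rg := &d.routeGroupsSlice[routeIdx] // only this route's lock is taken
	rg.mtx.Lock()
	defer rg.mtx.Unlock()
	rg.groups[fp] = ag
}

func main() {
	d := newDispatcher(2)
	d.addGroup(1, 42, &aggrGroup{name: "g"})
	fmt.Println(len(d.routeGroupsSlice[1].groups)) // prints: 1
}
```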
Since the waitgroup had Add(1) in New and was then Done() during Run(), it changes nothing to instead create the channel in New and close it during Run(). This avoids creating a separate goroutine to listen for its closure during Groups.

Signed-off-by: Guido Trotter <guido@hudson-trading.com>
We first add a unique index to each route, and test that it's calculated correctly.
Then we pre-allocate the route to aggrgroup map, and actually use a slice instead.
Finally we can remove the lock, changing a few more things so that they are atomic.
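The first step above, assigning each route a unique integer index, could be sketched like this (the `route` type and `index` function are assumptions modelled on the description, not the actual Alertmanager code): a pre-order walk hands out consecutive indices, so the top of the tree is always index 0 and the returned count sizes the preallocated groups slice.

```go
package main

import "fmt"

type route struct {
	Idx    int
	Routes []*route // child routes
}

// index walks the tree pre-order, handing out consecutive indices; the
// return value is the next free index, i.e. the total number of routes.
func index(r *route, next int) int {
	r.Idx = next
	next++
	for _, child := range r.Routes {
		next = index(child, next)
	}
	return next
}

func main() {
	// root -> (child with one grandchild, child)
	root := &route{Routes: []*route{
		{Routes: []*route{{}}},
		{},
	}}
	n := index(root, 0)
	fmt.Println(n, root.Idx, root.Routes[0].Idx,
		root.Routes[0].Routes[0].Idx, root.Routes[1].Idx) // prints: 4 0 1 2 3
}
```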