topdown: Lock per key cache access #5374
Conversation
Tested this with a Locust test run with two users: cc @asleire
Are you using my sample policy and locust script? Your output shows many cache inserts per second; I would expect at most 2 cache inserts per second.
Some comments on the code, but they're superficial -- I'm afraid I didn't get to it while still awake enough. 😬 Sorry!
topdown/cache/cache.go
Outdated
	value, _ := m.LoadOrStore(k, &sync.Mutex{})
	mtx := value.(*sync.Mutex)
	mtx.Lock()
	return func() { mtx.Unlock() }
[nit] would this do the trick, too?
- return func() { mtx.Unlock() }
+ return mtx.Unlock
Done.
@@ -165,3 +171,14 @@ func (c *cache) maxSizeBytes() int64 {
 	}
 	return *c.config.InterQueryBuiltinCache.MaxSizeBytes
 }
+
+type keyMutex struct {
+	sync.Map
It's not that I don't trust your judgement, it's just that if this design was informed by some specific piece of documentation, or anything, I'd appreciate if we recorded that 🤔 (To put it in another way: I'm not sure I would trust myself coming up with this 🙃)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess what I'm wondering about is that this feels like we're locking twice. We're storing Mutexes in a Mutex-protected Map.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

So, wondering which of these would be the case here... once we've added a Mutex for a specific k, it'll never be overwritten. So I guess we're in (1) territory.

But I'm not entirely sure, since we're also caching the results of http.send, and this is just the sync map guarding their re-fetches... we're basically forcing each http.send call with the same arguments (k) into a line: only one of them can ever be evaluated at the same time. 🤔
The idea was to lock per-key access and give the caller the ability to unlock when done. I started with a map + lock/unlock and then discovered sync.Map, which seemed to work. I'm still trying to figure out how to test this, but there isn't much in terms of documentation to point to.
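For readers following along, here is a minimal, self-contained sketch of the per-key locking pattern under discussion. It uses plain string keys so the example runs on its own; the PR itself keys on ast.Value.

package main

import (
	"fmt"
	"sync"
)

// keyMutex hands out one mutex per key: callers for the same key serialize,
// while callers for different keys proceed independently. This mirrors the
// pattern in the PR, with string keys instead of ast.Value to stay standalone.
type keyMutex struct {
	sync.Map
}

// Lock blocks until the per-key mutex is held and returns the matching
// unlock function for the caller to defer.
func (m *keyMutex) Lock(k string) func() {
	value, _ := m.LoadOrStore(k, &sync.Mutex{})
	mtx := value.(*sync.Mutex)
	mtx.Lock()
	return mtx.Unlock
}

func main() {
	var km keyMutex
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			unlock := km.Lock("foo") // all three goroutines queue up on the same key
			defer unlock()
			fmt.Println("goroutine", i, "holds the lock for key foo")
		}(i)
	}
	wg.Wait()
}

The LoadOrStore call is what makes this work: every caller for the same key gets the same *sync.Mutex, so same-key callers line up behind one another while other keys are unaffected.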
we're basically forcing each http.send call with the same arguments (k) into a line: only one of them can ever be evaluated at the same time.

This should affect concurrent requests for the same key. Are you thinking of any side effects this may have?
Isn't that what we're doing? I can only imagine this making things better, since the second one in line will get an immediate response when the first one is done. And every subsequent one, too, until the cache entry is stale... But I might be missing something.
Isn't that what we're doing?

We were protecting the cache implementation itself from concurrent access. But there could be concurrent calls to http.send wherein it's possible the first writes an entry to the cache for key x and the second does not see that, and adds an entry for x to the cache instead of just fetching it.
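To make that failure mode concrete, here is a small standalone sketch (not OPA code; naiveCache and fetch are made-up names for illustration) of how two concurrent callers can both miss the cache and both perform the fetch when only the cache map itself is lock-protected:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// naiveCache protects its map with a mutex, but nothing serializes the
// check-miss-fetch-insert sequence per key.
type naiveCache struct {
	mu    sync.Mutex
	items map[string]string
}

func (c *naiveCache) get(k string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.items[k]
	return v, ok
}

func (c *naiveCache) put(k, v string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[k] = v
}

func main() {
	var fetches int64
	c := &naiveCache{items: map[string]string{}}
	fetch := func(k string) string { // stand-in for the real HTTP round trip
		atomic.AddInt64(&fetches, 1)
		time.Sleep(10 * time.Millisecond)
		return "response for " + k
	}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := c.get("x"); !ok { // both goroutines can observe a miss...
				c.put("x", fetch("x")) // ...and both fetch and insert for the same key
			}
		}()
	}
	wg.Wait()
	fmt.Println("fetches:", fetches) // usually 2 here; per-key locking would keep it at 1
}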
topdown/http.go
Outdated
func (c *intraQueryCache) LockCacheKey() func() {
	return nil
}
- func (c *intraQueryCache) LockCacheKey() func() {
- 	return nil
- }
+ func (*intraQueryCache) LockCacheKey() func() {
+ 	return func() {}
+ }
this way, we could just call it, without ever wondering about != nil. But that's nitpicking.
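As a hypothetical illustration of why the no-op closure is nicer at the call site (cacheKeyLocker and noopCache below are made-up names for the sketch, not OPA's actual types), the caller can defer the result unconditionally:

package main

import "fmt"

// cacheKeyLocker stands in for the interface discussed above: LockCacheKey
// returns a function that releases the per-key lock.
type cacheKeyLocker interface {
	LockCacheKey() func()
}

// noopCache behaves like the suggested intraQueryCache: it hands back a
// no-op closure instead of nil, so callers never branch on nil.
type noopCache struct{}

func (noopCache) LockCacheKey() func() { return func() {} }

func main() {
	var c cacheKeyLocker = noopCache{}
	unlock := c.LockCacheKey() // no nil check needed
	defer unlock()
	fmt.Println("did work while 'holding' the key lock")
}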
Done.
topdown/cache/cache.go
Outdated
@@ -134,6 +139,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int)
 		dropKey := key.Value.(ast.Value)
 		c.unsafeDelete(dropKey)
 		c.l.Remove(key)
+		c.keyMtx.Delete(dropKey)
Can this remove a mutex that is currently locked by an in-progress request, allowing a new concurrent request with the same key to take place?
Good catch. I'll remove this.
Outright removing this line means keyMtx will increase in size indefinitely though?

What if you TryLock the key before dropping it? If you acquire the lock you can safely remove it; if you don't, then it's unsafe and you could try to remove the next element from c.l instead. If it's impossible to remove enough cache items to get below the limit, then that means every currently cached item is actively accessed. In this case I think it's better to simply not insert the current response into the cache.
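A rough standalone sketch of the TryLock idea (it assumes Go 1.18+ for sync.Mutex.TryLock, and uses a plain map of mutexes in place of the PR's keyMutex):

package main

import (
	"fmt"
	"sync"
)

func main() {
	keyMtx := map[string]*sync.Mutex{
		"foo": {},
		"bar": {},
	}
	keyMtx["foo"].Lock() // pretend an in-flight request currently holds "foo"

	// Walk the eviction candidates; only drop a key whose per-key mutex can
	// be acquired right now. (See the follow-up comment below: even this is
	// not fully safe, since another goroutine may already be blocked on the
	// mutex we are about to delete.)
	for _, k := range []string{"foo", "bar"} {
		if keyMtx[k].TryLock() {
			fmt.Println("evicting", k)
			keyMtx[k].Unlock()
			delete(keyMtx, k)
		} else {
			fmt.Println("skipping", k, "- actively in use")
		}
	}
}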
If you acquire the lock you can safely remove it

Actually that's not true. Another goroutine could be waiting for a lock on the mutex as it's removed. I think you'd have to lock the entire keyMtx dictionary as well, to prevent other goroutines from retrieving the mutex you're about to remove.
A simpler solution may be to keep this line in, but include my fix at #5361 to make sure the cache size stays correct in the case of a concurrent request.
	}

	wg.Wait()
Figuring out how to resolve https://github.com/open-policy-agent/opa/actions/runs/3432729846/jobs/5722316001
Updated test.
This change adds a lock on per key cache access to handle cases where concurrent queries could result in the same key being unnecessarily overwritten multiple times and fetched from the server even though a fresh entry existed in the cache. Fixes: open-policy-agent#5359 Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
}

func (m *keyMutex) Lock(k ast.Value) func() {
	value, _ := m.LoadOrStore(k, &sync.Mutex{})
A sync.Mutex probably isn't large, but this means that for each k ever encountered, we'll store one. And we'll never drop it. So if some long-running OPA instance uses, say,

allow if http.send({"url": "https://foo", "method": "POST", "headers": {"now": time.now_ns()}}).body.allowed

we would keep accumulating these. Is that a problem? 💭
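A quick standalone sketch of that growth concern: every distinct key ever locked leaves a *sync.Mutex behind in the sync.Map, and nothing removes it.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	for i := 0; i < 100000; i++ {
		// A unique key per call, as with the time.now_ns() header example above.
		k := fmt.Sprintf("https://foo?now=%d", i)
		mtx, _ := m.LoadOrStore(k, &sync.Mutex{})
		mtx.(*sync.Mutex).Lock()
		mtx.(*sync.Mutex).Unlock()
	}
	count := 0
	m.Range(func(_, _ any) bool { count++; return true })
	fmt.Println("mutexes retained:", count) // 100000, and they are never reclaimed
}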
It means that any OPA instance that does unique requests over time will eventually run out of memory. You also need to consider the size of the cache key itself.

In my company's case, we're using OAuth bearer tokens in our requests from OPA. These are just below 1 KB, and are renewed every now and then.
Given the cache key size, the impact of not dropping keys from the cache should be low I imagine, but there could certainly be a case where a long-running OPA could eventually OOM because of it.
The original change was:
for key := c.l.Front(); key != nil && (c.usage+size > limit); key = c.l.Front() {
	dropKey := key.Value.(ast.Value)
	c.unsafeDelete(dropKey)
	c.l.Remove(key)
	c.keyMtx.Delete(dropKey) // <-------- this is the change
	dropped++
}
@asleire raised the question: Can this remove a mutex that is currently locked by an in-progress request, allowing a new concurrent request with the same key to take place?
So let's say there are 2 concurrent requests for the key foo. Both routines grab the value (i.e. the sync.Mutex) from the sync.Map and one of them takes the lock. Then this routine, say while inserting a value for key foo in the cache, ends up removing an existing stale value for foo from the cache and also removes key foo from the sync.Map. But the other routine is already blocked on the mutex, so it should get unblocked once the 1st goroutine finishes its work. In this case deleting key foo from the sync.Map should be ok.

The edge case here is if the 1st routine deletes key foo from the sync.Map before the 2nd could read it, and if that happens we've got the original issue again.
One solution @asleire suggested is to reduce the cache size as in this change.
Another solution would be not to remove the current key from the sync.Map. Something like:
func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) {
	size := v.SizeInBytes()
	limit := c.maxSizeBytes()
	if limit > 0 {
		if size > limit {
			dropped++
			return dropped
		}
		for key := c.l.Front(); key != nil && (c.usage+size > limit); key = c.l.Front() {
			dropKey := key.Value.(ast.Value)
			c.unsafeDelete(dropKey)
			c.l.Remove(key)
			if dropKey.Compare(k) != 0 {
				c.keyMtx.Delete(dropKey)
			}
			dropped++
		}
	}
	c.items[k.String()] = v
	c.l.PushBack(k)
	c.usage += size
	return dropped
}
The edge case here would be a concurrent request for dropKey.

Any thoughts?
I'm thinking of the edge case where 3 requests occur near-simultaneously:
1 request with key foo
2 requests with key bar

In this scenario the cache is already full.
- the foo request is completed, and the request is about to insert into cache
- the first bar request is about to start, the request gets a lock on keyMtx(bar)
- the foo request has to delete entries from the cache, and it removes the mutex keyMtx(bar)
- the second bar request is about to start; since keyMtx doesn't contain a mutex for bar it creates a new mutex and locks it
- both bar requests are now running concurrently, and upon cache insertion they will both increase the usage counter; we have the same problem we're trying to fix
if dropKey.Compare(k) != 0 {
	c.keyMtx.Delete(dropKey)
}
This proposed fix doesn't fix the scenario I've described
The edge case here would be a concurrent request for dropKey.
Yes, that's the edge case I was talking about ⬆️.
@srenatus @ashutosh-narkar One concern I'll raise in this thread is the risk of double-caching HTTP requests when the NDBCache is enabled alongside the inter-query cache.

The inter-query cache is expected to burn a lot of memory over time, but we could OOM dramatically sooner when both caches end up holding copies of the same responses.

The way I'd suggest we work around this is by carefully ensuring that the same pointer-to-a-Term is used for the NDBCache and for the inter-query cache's stored values. Then the caches will transparently be sharing the underlying value in memory, and we pay the caching cost once.
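A toy sketch of that shared-pointer idea (plain maps stand in for the NDBCache and the inter-query cache; these are not OPA's actual cache types):

package main

import (
	"fmt"

	"github.com/open-policy-agent/opa/ast"
)

func main() {
	// One *ast.Term holding the (potentially large) response value.
	resp := ast.StringTerm("large http.send response body")

	// Both caches hold the same pointer, so the value is stored in memory once.
	ndbCache := map[string]*ast.Term{"http.send": resp}
	interQueryCache := map[string]*ast.Term{"key": resp}

	fmt.Println(ndbCache["http.send"] == interQueryCache["key"]) // true: shared underlying value
}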
@asleire I would appreciate it if you could add unit tests for your PR like this one and also add a comment in the code explaining the rationale behind the change. Thanks!
Fixes: #5359
Signed-off-by: Ashutosh Narkar anarkar4387@gmail.com