Permalink
Browse files

Prevent unsynchronized access to a MemoizedMRUCache

  • Loading branch information...
1 parent b48180d commit 9096d351c48fba5620c65bcb3683ebfadb00a2e2 @paulcbetts paulcbetts committed Aug 28, 2012
Showing with 28 additions and 26 deletions.
  1. +28 −26 ReactiveUI/ObservableAsyncMRUCache.cs
@@ -92,37 +92,39 @@ public sealed class ObservableAsyncMRUCache<TParam, TVal> : IEnableLogger
public IObservable<TVal> AsyncGet(TParam key)
{
IObservable<TVal> result;
- if (_innerCache.TryGet(key, out result)) {
- this.Log().Debug("Cache hit: '{0}'", key);
- return result;
- }
-
- int myCall = Interlocked.Increment(ref currentCall);
-
+ int myCall;
var rs = new ReplaySubject<TVal>();
- _callQueue.Where(x => x == myCall).Subscribe(_ => {
- this.Log().Debug("Dispatching '{0}'", key);
- IObservable<TVal> fetched = null;
- try {
- fetched = _fetcher(key);
- } catch (Exception ex) {
- _callQueue.Release();
- rs.OnError(ex);
- return;
+
+ lock (_innerCache) {
+ if (_innerCache.TryGet(key, out result)) {
+ this.Log().Debug("Cache hit: '{0}'", key);
+ return result;
}
- fetched.Subscribe(x => {
- rs.OnNext(x);
- }, ex => {
- _callQueue.Release();
- rs.OnError(ex);
- }, () => {
- _callQueue.Release();
- rs.OnCompleted();
+ myCall = Interlocked.Increment(ref currentCall);
+
+ _callQueue.Where(x => x == myCall).Subscribe(_ => {
+ this.Log().Debug("Dispatching '{0}'", key);
+ IObservable<TVal> fetched = null;
+ try {
+ fetched = _fetcher(key);
+ } catch (Exception ex) {
+ _callQueue.Release();
+ rs.OnError(ex);
+ return;
+ }
+
+ fetched.Subscribe(x => {
+ rs.OnNext(x);
+ }, ex => {
+ _callQueue.Release();
+ rs.OnError(ex);
+ }, () => {
+ _callQueue.Release();
+ rs.OnCompleted();
+ });
});
- });
- lock(_innerCache) {
_innerCache.Get(key, rs);
}

0 comments on commit 9096d35

Please sign in to comment.