diff --git a/Akavache.Sqlite3/Akavache.Sqlite3_Net45.csproj b/Akavache.Sqlite3/Akavache.Sqlite3_Net45.csproj index e11f7387..195b5be8 100644 --- a/Akavache.Sqlite3/Akavache.Sqlite3_Net45.csproj +++ b/Akavache.Sqlite3/Akavache.Sqlite3_Net45.csproj @@ -72,6 +72,7 @@ + @@ -93,4 +94,4 @@ --> - + \ No newline at end of file diff --git a/Akavache.Sqlite3/Akavache.Sqlite3_WinRT.csproj b/Akavache.Sqlite3/Akavache.Sqlite3_WinRT.csproj index eeb51897..1a225193 100644 --- a/Akavache.Sqlite3/Akavache.Sqlite3_WinRT.csproj +++ b/Akavache.Sqlite3/Akavache.Sqlite3_WinRT.csproj @@ -41,6 +41,7 @@ bin\Release\WinRT45\Akavache.Sqlite3.xml + @@ -94,4 +95,4 @@ --> - + \ No newline at end of file diff --git a/Akavache.Sqlite3/AsyncReaderWriterLock.cs b/Akavache.Sqlite3/AsyncReaderWriterLock.cs new file mode 100644 index 00000000..4cc82cdc --- /dev/null +++ b/Akavache.Sqlite3/AsyncReaderWriterLock.cs @@ -0,0 +1,83 @@ +using System; +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +namespace Akavache.Sqlite3 +{ + sealed class AsyncReaderWriterLock + { + bool isShutdown; + readonly TaskFactory readScheduler; + readonly TaskFactory writeScheduler; + + public AsyncReaderWriterLock() + { + var pair = new ConcurrentExclusiveSchedulerPair(); + + readScheduler = new TaskFactory( + CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ConcurrentScheduler); + writeScheduler = new TaskFactory( + CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ExclusiveScheduler); + } + + public IObservable AcquireRead() + { + return AcquireOnScheduler(readScheduler); + } + + public IObservable AcquireWrite() + { + return AcquireOnScheduler(writeScheduler); + } + + AsyncSubject shutdownResult; + public IObservable Shutdown() + { + if (shutdownResult != null) return shutdownResult; + + shutdownResult = new AsyncSubject(); + + // NB: Just grab the write lock to shut down + var writeFuture = AcquireWrite(); + isShutdown = true; + + var ret = writeFuture + .Select(x => { x.Dispose(); return Unit.Default; }) + .Multicast(shutdownResult); + ret.Connect(); + + return ret; + } + + IObservable AcquireOnScheduler(TaskFactory sched) + { + if (isShutdown) return Observable.Throw(new ObjectDisposedException("AsyncReaderWriterLock")); + + var ret = new AsyncSubject(); + var gate = new AsyncSubject(); + + sched.StartNew(() => + { + // NB: At this point we know that we are currently executing on the + // scheduler (i.e. if this was the exclusive scheduler, we know that + // all the readers have been thrown out) + var disp = Disposable.Create(() => { gate.OnNext(Unit.Default); gate.OnCompleted(); }); + ret.OnNext(disp); + ret.OnCompleted(); + + // Trashing the returned Disposable will unlock this gate + // NB: Why not await? Holding this task alive will ensure that + // we don't release the exclusive half of the pair. If we await, + // the task exits until the gate is signalled. That means that + // the scheduler is free when it shouldn't be. + gate.Wait(); + }); + + return ret; + } + } +} diff --git a/Akavache.Sqlite3/SQLiteAsync.cs b/Akavache.Sqlite3/SQLiteAsync.cs index b4af0a6a..d21a5193 100644 --- a/Akavache.Sqlite3/SQLiteAsync.cs +++ b/Akavache.Sqlite3/SQLiteAsync.cs @@ -276,15 +276,7 @@ public IObservable EnqueueConnectionOp(Func operation var makeRq = Observable.Defer(() => Observable.Start(() => operation(conn.Connection), BlobCache.TaskpoolScheduler)); - return opQueue.EnqueueObservableOperation(idx.ToString(), () => - makeRq.RetryWithBackoffStrategy(tableLockRetries, retryOnError: ex => - { - var sqlex = ex as SQLiteException; - if (sqlex == null) - return false; - - return (sqlex.Result == SQLite3.Result.Locked || sqlex.Result == SQLite3.Result.Busy); - })); + return opQueue.EnqueueObservableOperation(idx.ToString(), () => makeRq); } /// @@ -406,5 +398,4 @@ public static class RetryWithBackoffMixin : Observable.Throw(t.Item3)); } } -} - +} \ No newline at end of file diff --git a/Akavache.Sqlite3/SqlitePersistentBlobCache.cs b/Akavache.Sqlite3/SqlitePersistentBlobCache.cs index ba71ae31..ca310032 100644 --- a/Akavache.Sqlite3/SqlitePersistentBlobCache.cs +++ b/Akavache.Sqlite3/SqlitePersistentBlobCache.cs @@ -8,6 +8,7 @@ using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Reactive.Subjects; +using System.Reactive.Threading.Tasks; using System.Reflection; using Newtonsoft.Json; using Newtonsoft.Json.Bson; @@ -28,6 +29,7 @@ public class SqlitePersistentBlobCache : IObjectBlobCache, IObjectBulkBlobCache, readonly MemoizingMRUCache> _inflightCache; readonly IObservable _initializer; + readonly AsyncReaderWriterLock tableLock = new AsyncReaderWriterLock(); bool disposed = false; public SqlitePersistentBlobCache(string databaseFile, IScheduler scheduler = null) @@ -42,8 +44,13 @@ public SqlitePersistentBlobCache(string databaseFile, IScheduler scheduler = nul _inflightCache = new MemoizingMRUCache>((key, ce) => { + // NB: We nest the SelectMany here to prevent us from taking + // the read lock, then proceeding in some scenarios to take + // the write lock, meaning we'd deadlock ourselves return _initializer - .SelectMany(_ => Connection.QueryAsync("SELECT * FROM CacheElement WHERE Key=? LIMIT 1;", key)) + .SelectMany(_ => Observable.Return(Unit.Default) + .SelectManyWithRead(tableLock, __ => + Connection.QueryAsync("SELECT * FROM CacheElement WHERE Key=? LIMIT 1;", key))) .SelectMany(x => { return (x.Count == 1) ? Observable.Return(x[0]) : ObservableThrowKeyNotFoundException(key); @@ -52,7 +59,9 @@ public SqlitePersistentBlobCache(string databaseFile, IScheduler scheduler = nul { if (x.Expiration < Scheduler.Now.UtcDateTime) { - return Invalidate(key).SelectMany(_ => ObservableThrowKeyNotFoundException(key)); + return Observable.Return(Unit.Default) + .SelectManyWithWrite(tableLock, _ => Invalidate(key)) + .SelectMany(_ => ObservableThrowKeyNotFoundException(key)); } else { @@ -80,7 +89,7 @@ public IObservable Insert(string key, byte[] data, DateTimeOffset? absolut var ret = _initializer .SelectMany(_ => BeforeWriteToDiskFilter(data, Scheduler)) .Do(x => element.Value = x) - .SelectMany(x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default)) + .SelectManyWithWrite(tableLock, x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default)) .Multicast(new AsyncSubject()); ret.Connect(); @@ -111,7 +120,7 @@ public IObservable Insert(IDictionary keyValuePairs, DateT var ret = encryptAllTheData .SelectMany(_ => _initializer) - .SelectMany(_ => Connection.InsertAllAsync(elements, "OR REPLACE").Select(__ => Unit.Default)) + .SelectManyWithWrite(tableLock, _ => Connection.InsertAllAsync(elements, "OR REPLACE").Select(__ => Unit.Default)) .Multicast(new AsyncSubject()); ret.Connect(); @@ -136,7 +145,9 @@ public IObservable GetAsync(string key) string questionMarks = String.Join(",", keys.Select(_ => "?")); return _initializer - .SelectMany(_ => Connection.QueryAsync(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())) + .SelectMany(_ => Observable.Return(Unit.Default) + .SelectManyWithRead(tableLock, __ => + Connection.QueryAsync(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()))) .SelectMany(xs => { var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList(); @@ -162,7 +173,7 @@ public IObservable> GetAllKeys() var now = BlobCache.TaskpoolScheduler.Now.UtcTicks; return _initializer - .SelectMany(_ => Connection.QueryAsync("SELECT Key FROM CacheElement WHERE Expiration >= ?;", now)) + .SelectManyWithRead(tableLock, _ => Connection.QueryAsync("SELECT Key FROM CacheElement WHERE Expiration >= ?;", now)) .Select(x => x.Select(y => y.Key).ToList()); } @@ -185,7 +196,9 @@ public IObservable> GetAllKeys() var questionMarks = String.Join(",", keys.Select(_ => "?")); return _initializer - .SelectMany(_ => Connection.QueryAsync(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())) + .SelectMany(_ => Observable.Return(Unit.Default) + .SelectManyWithRead(tableLock, __ => + Connection.QueryAsync(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()))) .SelectMany(xs => { var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList(); @@ -210,7 +223,7 @@ public IObservable Invalidate(string key) { if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache"); lock(_inflightCache) _inflightCache.Invalidate(key); - return _initializer.SelectMany(__ => + return _initializer.SelectManyWithWrite(tableLock, __ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE Key=?;", key).Select(_ => Unit.Default)); } @@ -220,7 +233,7 @@ public IObservable Invalidate(IEnumerable keys) lock (_inflightCache) foreach (var v in keys) { _inflightCache.Invalidate(v); } var questionMarks = String.Join(",", keys.Select(_ => "?")); - return _initializer.SelectMany(__ => + return _initializer.SelectManyWithWrite(tableLock, __ => Connection.ExecuteAsync(String.Format("DELETE FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()).Select(_ => Unit.Default)); } @@ -228,8 +241,8 @@ public IObservable InvalidateAll() { if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache"); lock(_inflightCache) _inflightCache.InvalidateAll(); - return _initializer.SelectMany(__ => - Connection.ExecuteAsync("DELETE FROM CacheElement;").Select(_ => Unit.Default)); + return _initializer.SelectManyWithWrite(tableLock, _ => + Connection.ExecuteAsync("DELETE FROM CacheElement;").Select(__ => Unit.Default)); } public IObservable InsertObject(string key, T value, DateTimeOffset? absoluteExpiration = null) @@ -251,7 +264,7 @@ public IObservable InsertObject(string key, T value, DateTimeOffset? ab var ret = _initializer .SelectMany(_ => BeforeWriteToDiskFilter(data, Scheduler)) .Do(x => element.Value = x) - .SelectMany(x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default)) + .SelectManyWithWrite(tableLock, x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default)) .Multicast(new AsyncSubject()); ret.Connect(); @@ -285,7 +298,7 @@ public IObservable InsertObjects(IDictionary keyValuePairs, .Select(y => { x.Value = y; return x; })) .Merge(4) .ToList() - .SelectMany(x => Connection.InsertAllAsync(x, "OR REPLACE").Select(_ => Unit.Default)) + .SelectManyWithWrite(tableLock, x => Connection.InsertAllAsync(x, "OR REPLACE").Select(_ => Unit.Default)) .Multicast(new AsyncSubject()); return _initializer.SelectMany(_ => ret.PermaRef()); @@ -311,7 +324,9 @@ public IObservable GetObjectAsync(string key, bool noTypePrefix = false) string questionMarks = String.Join(",", keys.Select(_ => "?")); return _initializer - .SelectMany(_ => Connection.QueryAsync(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())) + .SelectMany(_ => Observable.Return(Unit.Default) + .SelectManyWithRead(tableLock, __ => + Connection.QueryAsync(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()))) .SelectMany(xs => { var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList(); @@ -342,7 +357,7 @@ public IObservable> GetAllObjects() if (disposed) return Observable.Throw>(new ObjectDisposedException("SqlitePersistentBlobCache")); return _initializer - .SelectMany(_ => Connection.QueryAsync("SELECT * FROM CacheElement WHERE TypeName=?;", typeof(T).FullName)) + .SelectManyWithRead(tableLock, _ => Connection.QueryAsync("SELECT * FROM CacheElement WHERE TypeName=?;", typeof(T).FullName)) .SelectMany(x => x.ToObservable()) .SelectMany(x => AfterReadFromDiskFilter(x.Value, Scheduler)) .SelectMany(DeserializeObject) @@ -365,7 +380,7 @@ public IObservable InvalidateAllObjects() { if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache"); return _initializer - .SelectMany(_ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE TypeName=?;", typeof(T).FullName)) + .SelectManyWithWrite(tableLock, _ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE TypeName=?;", typeof(T).FullName)) .Select(_ => Unit.Default); } @@ -375,7 +390,7 @@ public IObservable Vacuum() var nowTime = BlobCache.TaskpoolScheduler.Now.UtcTicks; return _initializer - .SelectMany(_ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE Expiration < ?;", nowTime)) + .SelectManyWithWrite(tableLock, _ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE Expiration < ?;", nowTime)) .SelectMany(_ => Observable.Defer(() => Connection.ExecuteAsync("VACUUM;", nowTime).Retry(3))) .Select(_ => Unit.Default); } @@ -527,6 +542,21 @@ static IObservable ObservableThrowKeyNotFoundException(string key, } } + static class SelectManyLockExtensions + { + public static IObservable SelectManyWithRead(this IObservable This, AsyncReaderWriterLock opLock, Func> selector) + { + return Observable.Using(ct => opLock.AcquireRead().ToTask(), + (_, __) => Task.FromResult(This.SelectMany(selector))); + } + + public static IObservable SelectManyWithWrite(this IObservable This, AsyncReaderWriterLock opLock, Func> selector) + { + return Observable.Using(ct => opLock.AcquireWrite().ToTask(), + (_, __) => Task.FromResult(This.SelectMany(selector))); + } + } + class CacheElement { [PrimaryKey]