Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No more locking Oopses #134

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Akavache.Sqlite3/Akavache.Sqlite3_Net45.csproj
Expand Up @@ -72,6 +72,7 @@
<Reference Include="WindowsBase" />
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncReaderWriterLock.cs" />
<Compile Include="EncryptedBlobCache.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Registrations.cs" />
Expand All @@ -93,4 +94,4 @@
<Target Name="AfterBuild">
</Target>
-->
</Project>
</Project>
3 changes: 2 additions & 1 deletion Akavache.Sqlite3/Akavache.Sqlite3_WinRT.csproj
Expand Up @@ -41,6 +41,7 @@
<DocumentationFile>bin\Release\WinRT45\Akavache.Sqlite3.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<Compile Include="AsyncReaderWriterLock.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Registrations.cs" />
<Compile Include="SQLite.cs" />
Expand Down Expand Up @@ -94,4 +95,4 @@
<Target Name="AfterBuild">
</Target>
-->
</Project>
</Project>
83 changes: 83 additions & 0 deletions Akavache.Sqlite3/AsyncReaderWriterLock.cs
@@ -0,0 +1,83 @@
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace Akavache.Sqlite3
{
sealed class AsyncReaderWriterLock
{
bool isShutdown;
readonly TaskFactory readScheduler;
readonly TaskFactory writeScheduler;

public AsyncReaderWriterLock()
{
var pair = new ConcurrentExclusiveSchedulerPair();

readScheduler = new TaskFactory(
CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ConcurrentScheduler);
writeScheduler = new TaskFactory(
CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ExclusiveScheduler);
}

public IObservable<IDisposable> AcquireRead()
{
return AcquireOnScheduler(readScheduler);
}

public IObservable<IDisposable> AcquireWrite()
{
return AcquireOnScheduler(writeScheduler);
}

AsyncSubject<Unit> shutdownResult;
public IObservable<Unit> Shutdown()
{
if (shutdownResult != null) return shutdownResult;

shutdownResult = new AsyncSubject<Unit>();

// NB: Just grab the write lock to shut down
var writeFuture = AcquireWrite();
isShutdown = true;

var ret = writeFuture
.Select(x => { x.Dispose(); return Unit.Default; })
.Multicast(shutdownResult);
ret.Connect();

return ret;
}

IObservable<IDisposable> AcquireOnScheduler(TaskFactory sched)
{
if (isShutdown) return Observable.Throw<IDisposable>(new ObjectDisposedException("AsyncReaderWriterLock"));

var ret = new AsyncSubject<IDisposable>();
var gate = new AsyncSubject<Unit>();

sched.StartNew(() =>
{
// NB: At this point we know that we are currently executing on the
// scheduler (i.e. if this was the exclusive scheduler, we know that
// all the readers have been thrown out)
var disp = Disposable.Create(() => { gate.OnNext(Unit.Default); gate.OnCompleted(); });
ret.OnNext(disp);
ret.OnCompleted();

// Trashing the returned Disposable will unlock this gate
// NB: Why not await? Holding this task alive will ensure that
// we don't release the exclusive half of the pair. If we await,
// the task exits until the gate is signalled. That means that
// the scheduler is free when it shouldn't be.
gate.Wait();
});

return ret;
}
}
}
13 changes: 2 additions & 11 deletions Akavache.Sqlite3/SQLiteAsync.cs
Expand Up @@ -276,15 +276,7 @@ public IObservable<T> EnqueueConnectionOp<T>(Func<SQLiteConnection, T> operation
var makeRq = Observable.Defer(() =>
Observable.Start(() => operation(conn.Connection), BlobCache.TaskpoolScheduler));

return opQueue.EnqueueObservableOperation(idx.ToString(), () =>
makeRq.RetryWithBackoffStrategy(tableLockRetries, retryOnError: ex =>
{
var sqlex = ex as SQLiteException;
if (sqlex == null)
return false;

return (sqlex.Result == SQLite3.Result.Locked || sqlex.Result == SQLite3.Result.Busy);
}));
return opQueue.EnqueueObservableOperation(idx.ToString(), () => makeRq);
}

/// <summary>
Expand Down Expand Up @@ -406,5 +398,4 @@ public static class RetryWithBackoffMixin
: Observable.Throw<T>(t.Item3));
}
}
}

}
64 changes: 47 additions & 17 deletions Akavache.Sqlite3/SqlitePersistentBlobCache.cs
Expand Up @@ -8,6 +8,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Reflection;
using Newtonsoft.Json;
using Newtonsoft.Json.Bson;
Expand All @@ -28,6 +29,7 @@ public class SqlitePersistentBlobCache : IObjectBlobCache, IObjectBulkBlobCache,

readonly MemoizingMRUCache<string, IObservable<CacheElement>> _inflightCache;
readonly IObservable<Unit> _initializer;
readonly AsyncReaderWriterLock tableLock = new AsyncReaderWriterLock();
bool disposed = false;

public SqlitePersistentBlobCache(string databaseFile, IScheduler scheduler = null)
Expand All @@ -42,8 +44,13 @@ public SqlitePersistentBlobCache(string databaseFile, IScheduler scheduler = nul

_inflightCache = new MemoizingMRUCache<string, IObservable<CacheElement>>((key, ce) =>
{
// NB: We nest the SelectMany here to prevent us from taking
// the read lock, then proceeding in some scenarios to take
// the write lock, meaning we'd deadlock ourselves
return _initializer
.SelectMany(_ => Connection.QueryAsync<CacheElement>("SELECT * FROM CacheElement WHERE Key=? LIMIT 1;", key))
.SelectMany(_ => Observable.Return(Unit.Default)
.SelectManyWithRead(tableLock, __ =>
Connection.QueryAsync<CacheElement>("SELECT * FROM CacheElement WHERE Key=? LIMIT 1;", key)))
.SelectMany(x =>
{
return (x.Count == 1) ? Observable.Return(x[0]) : ObservableThrowKeyNotFoundException(key);
Expand All @@ -52,7 +59,9 @@ public SqlitePersistentBlobCache(string databaseFile, IScheduler scheduler = nul
{
if (x.Expiration < Scheduler.Now.UtcDateTime)
{
return Invalidate(key).SelectMany(_ => ObservableThrowKeyNotFoundException(key));
return Observable.Return(Unit.Default)
.SelectManyWithWrite(tableLock, _ => Invalidate(key))
.SelectMany(_ => ObservableThrowKeyNotFoundException(key));
}
else
{
Expand Down Expand Up @@ -80,7 +89,7 @@ public IObservable<Unit> Insert(string key, byte[] data, DateTimeOffset? absolut
var ret = _initializer
.SelectMany(_ => BeforeWriteToDiskFilter(data, Scheduler))
.Do(x => element.Value = x)
.SelectMany(x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default))
.SelectManyWithWrite(tableLock, x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default))
.Multicast(new AsyncSubject<Unit>());

ret.Connect();
Expand Down Expand Up @@ -111,7 +120,7 @@ public IObservable<Unit> Insert(IDictionary<string, byte[]> keyValuePairs, DateT

var ret = encryptAllTheData
.SelectMany(_ => _initializer)
.SelectMany(_ => Connection.InsertAllAsync(elements, "OR REPLACE").Select(__ => Unit.Default))
.SelectManyWithWrite(tableLock, _ => Connection.InsertAllAsync(elements, "OR REPLACE").Select(__ => Unit.Default))
.Multicast(new AsyncSubject<Unit>());

ret.Connect();
Expand All @@ -136,7 +145,9 @@ public IObservable<byte[]> GetAsync(string key)

string questionMarks = String.Join(",", keys.Select(_ => "?"));
return _initializer
.SelectMany(_ => Connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()))
.SelectMany(_ => Observable.Return(Unit.Default)
.SelectManyWithRead(tableLock, __ =>
Connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())))
.SelectMany(xs =>
{
var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList();
Expand All @@ -162,7 +173,7 @@ public IObservable<List<string>> GetAllKeys()

var now = BlobCache.TaskpoolScheduler.Now.UtcTicks;
return _initializer
.SelectMany(_ => Connection.QueryAsync<CacheElement>("SELECT Key FROM CacheElement WHERE Expiration >= ?;", now))
.SelectManyWithRead(tableLock, _ => Connection.QueryAsync<CacheElement>("SELECT Key FROM CacheElement WHERE Expiration >= ?;", now))
.Select(x => x.Select(y => y.Key).ToList());
}

Expand All @@ -185,7 +196,9 @@ public IObservable<List<string>> GetAllKeys()

var questionMarks = String.Join(",", keys.Select(_ => "?"));
return _initializer
.SelectMany(_ => Connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()))
.SelectMany(_ => Observable.Return(Unit.Default)
.SelectManyWithRead(tableLock, __ =>
Connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())))
.SelectMany(xs =>
{
var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList();
Expand All @@ -210,7 +223,7 @@ public IObservable<Unit> Invalidate(string key)
{
if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache");
lock(_inflightCache) _inflightCache.Invalidate(key);
return _initializer.SelectMany(__ =>
return _initializer.SelectManyWithWrite(tableLock, __ =>
Connection.ExecuteAsync("DELETE FROM CacheElement WHERE Key=?;", key).Select(_ => Unit.Default));
}

Expand All @@ -220,16 +233,16 @@ public IObservable<Unit> Invalidate(IEnumerable<string> keys)
lock (_inflightCache) foreach (var v in keys) { _inflightCache.Invalidate(v); }

var questionMarks = String.Join(",", keys.Select(_ => "?"));
return _initializer.SelectMany(__ =>
return _initializer.SelectManyWithWrite(tableLock, __ =>
Connection.ExecuteAsync(String.Format("DELETE FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()).Select(_ => Unit.Default));
}

public IObservable<Unit> InvalidateAll()
{
if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache");
lock(_inflightCache) _inflightCache.InvalidateAll();
return _initializer.SelectMany(__ =>
Connection.ExecuteAsync("DELETE FROM CacheElement;").Select(_ => Unit.Default));
return _initializer.SelectManyWithWrite(tableLock, _ =>
Connection.ExecuteAsync("DELETE FROM CacheElement;").Select(__ => Unit.Default));
}

public IObservable<Unit> InsertObject<T>(string key, T value, DateTimeOffset? absoluteExpiration = null)
Expand All @@ -251,7 +264,7 @@ public IObservable<Unit> InsertObject<T>(string key, T value, DateTimeOffset? ab
var ret = _initializer
.SelectMany(_ => BeforeWriteToDiskFilter(data, Scheduler))
.Do(x => element.Value = x)
.SelectMany(x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default))
.SelectManyWithWrite(tableLock, x => Connection.InsertAsync(element, "OR REPLACE", typeof(CacheElement)).Select(_ => Unit.Default))
.Multicast(new AsyncSubject<Unit>());

ret.Connect();
Expand Down Expand Up @@ -285,7 +298,7 @@ public IObservable<Unit> InsertObjects<T>(IDictionary<string, T> keyValuePairs,
.Select(y => { x.Value = y; return x; }))
.Merge(4)
.ToList()
.SelectMany(x => Connection.InsertAllAsync(x, "OR REPLACE").Select(_ => Unit.Default))
.SelectManyWithWrite(tableLock, x => Connection.InsertAllAsync(x, "OR REPLACE").Select(_ => Unit.Default))
.Multicast(new AsyncSubject<Unit>());

return _initializer.SelectMany(_ => ret.PermaRef());
Expand All @@ -311,7 +324,9 @@ public IObservable<T> GetObjectAsync<T>(string key, bool noTypePrefix = false)

string questionMarks = String.Join(",", keys.Select(_ => "?"));
return _initializer
.SelectMany(_ => Connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray()))
.SelectMany(_ => Observable.Return(Unit.Default)
.SelectManyWithRead(tableLock, __ =>
Connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())))
.SelectMany(xs =>
{
var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList();
Expand Down Expand Up @@ -342,7 +357,7 @@ public IObservable<IEnumerable<T>> GetAllObjects<T>()
if (disposed) return Observable.Throw<IEnumerable<T>>(new ObjectDisposedException("SqlitePersistentBlobCache"));

return _initializer
.SelectMany(_ => Connection.QueryAsync<CacheElement>("SELECT * FROM CacheElement WHERE TypeName=?;", typeof(T).FullName))
.SelectManyWithRead(tableLock, _ => Connection.QueryAsync<CacheElement>("SELECT * FROM CacheElement WHERE TypeName=?;", typeof(T).FullName))
.SelectMany(x => x.ToObservable())
.SelectMany(x => AfterReadFromDiskFilter(x.Value, Scheduler))
.SelectMany(DeserializeObject<T>)
Expand All @@ -365,7 +380,7 @@ public IObservable<Unit> InvalidateAllObjects<T>()
{
if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache");
return _initializer
.SelectMany(_ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE TypeName=?;", typeof(T).FullName))
.SelectManyWithWrite(tableLock, _ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE TypeName=?;", typeof(T).FullName))
.Select(_ => Unit.Default);
}

Expand All @@ -375,7 +390,7 @@ public IObservable<Unit> Vacuum()

var nowTime = BlobCache.TaskpoolScheduler.Now.UtcTicks;
return _initializer
.SelectMany(_ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE Expiration < ?;", nowTime))
.SelectManyWithWrite(tableLock, _ => Connection.ExecuteAsync("DELETE FROM CacheElement WHERE Expiration < ?;", nowTime))
.SelectMany(_ => Observable.Defer(() => Connection.ExecuteAsync("VACUUM;", nowTime).Retry(3)))
.Select(_ => Unit.Default);
}
Expand Down Expand Up @@ -527,6 +542,21 @@ static IObservable<CacheElement> ObservableThrowKeyNotFoundException(string key,
}
}

static class SelectManyLockExtensions
{
public static IObservable<TRet> SelectManyWithRead<T, TRet>(this IObservable<T> This, AsyncReaderWriterLock opLock, Func<T, IObservable<TRet>> selector)
{
return Observable.Using(ct => opLock.AcquireRead().ToTask(),
(_, __) => Task.FromResult(This.SelectMany(selector)));
}

public static IObservable<TRet> SelectManyWithWrite<T, TRet>(this IObservable<T> This, AsyncReaderWriterLock opLock, Func<T, IObservable<TRet>> selector)
{
return Observable.Using(ct => opLock.AcquireWrite().ToTask(),
(_, __) => Task.FromResult(This.SelectMany(selector)));
}
}

class CacheElement
{
[PrimaryKey]
Expand Down