Skip to content

Commit

Permalink
Implement bulk object ops in SQLite3
Browse files Browse the repository at this point in the history
  • Loading branch information
anaisbetts committed Oct 12, 2013
1 parent 5811635 commit fb9ae3b
Showing 1 changed file with 72 additions and 5 deletions.
77 changes: 72 additions & 5 deletions Akavache.Sqlite3/SqlitePersistentBlobCache.cs
Expand Up @@ -82,7 +82,8 @@ public IObservable<Unit> Insert(string key, byte[] data, DateTimeOffset? absolut
public IObservable<Unit> Insert(IDictionary<string, byte[]> keyValuePairs, DateTimeOffset? absoluteExpiration = null)
{
if (disposed) return Observable.Throw<Unit>(new ObjectDisposedException("SqlitePersistentBlobCache"));
lock (_inflightCache) {
lock (_inflightCache)
{
foreach(var kvp in keyValuePairs) _inflightCache.Invalidate(kvp.Key);
}

Expand Down Expand Up @@ -110,7 +111,8 @@ public IObservable<Unit> Insert(IDictionary<string, byte[]> keyValuePairs, DateT
public IObservable<byte[]> GetAsync(string key)
{
if (disposed) return Observable.Throw<byte[]>(new ObjectDisposedException("SqlitePersistentBlobCache"));
lock (_inflightCache) {
lock (_inflightCache)
{
return _inflightCache.Get(key)
.Select(x => x.Value)
.SelectMany(x => AfterReadFromDiskFilter(x, Scheduler))
Expand Down Expand Up @@ -238,6 +240,36 @@ public IObservable<Unit> InsertObject<T>(string key, T value, DateTimeOffset? ab
return ret;
}

public IObservable<Unit> InsertObjects<T>(IDictionary<string, T> keyValuePairs, DateTimeOffset? absoluteExpiration = null)
{
if (disposed) return Observable.Throw<Unit>(new ObjectDisposedException("SqlitePersistentBlobCache"));

lock (_inflightCache)
{
foreach(var kvp in keyValuePairs) _inflightCache.Invalidate(kvp.Key);
}

var serializedElements = Observable.Start(() =>
{
return keyValuePairs.Select(x => new CacheElement()
{
Expiration = absoluteExpiration != null ? absoluteExpiration.Value.UtcDateTime : DateTime.MaxValue,
Key = x.Key,
TypeName = typeof(T).FullName,
Value = SerializeObject<T>(x.Value)
}).ToList();
}, Scheduler);

return serializedElements
.SelectMany(x => x.ToObservable())
.Select(x => Observable.Defer(() => BeforeWriteToDiskFilter(x.Value, Scheduler))
.Do(y => x.Value = y))
.Merge(4)
.ToList()
.SelectMany(x => _connection.InsertAllAsync(x, "OR REPLACE").Select(_ => Unit.Default))
.Multicast(new AsyncSubject<Unit>()).PermaRef();
}

public IObservable<T> GetObjectAsync<T>(string key, bool noTypePrefix = false)
{
if (disposed) return Observable.Throw<T>(new ObjectDisposedException("SqlitePersistentBlobCache"));
Expand All @@ -246,10 +278,41 @@ public IObservable<T> GetObjectAsync<T>(string key, bool noTypePrefix = false)
var ret = _inflightCache.Get(key);
return ret
.SelectMany(x => AfterReadFromDiskFilter(x.Value, Scheduler))
.SelectMany(DeserializeObject<T>);
.SelectMany(DeserializeObject<T>)
.Multicast(new AsyncSubject<T>())
.PermaRef();
}
}

public IObservable<IDictionary<string, T>> GetObjectsAsync<T>(IEnumerable<string> keys, bool noTypePrefix = false)
{
if (disposed) return Observable.Throw<IDictionary<string, T>>(new ObjectDisposedException("SqlitePersistentBlobCache"));

string questionMarks = String.Join(",", keys.Select(_ => "?"));
return _connection.QueryAsync<CacheElement>(String.Format("SELECT * FROM CacheElement WHERE Key IN ({0});", questionMarks), keys.ToArray())
.SelectMany(async xs =>
{
var invalidXs = xs.Where(x => x.Expiration < Scheduler.Now.UtcDateTime).ToList();
if (invalidXs.Count > 0)
{
await Invalidate(invalidXs.Select(x => x.Key));
}
var validXs = xs.Where(x => x.Expiration >= Scheduler.Now.UtcDateTime).ToList();
await validXs.ToObservable()
.Select(x => Observable.Defer(() => AfterReadFromDiskFilter(x.Value, Scheduler)
.Do(y => x.Value = y)))
.Merge(4);
return await validXs.ToObservable()
.SelectMany(x => DeserializeObject<T>(x.Value).Select(y => new KeyValuePair<string, T>(x.Key, y)))
.Aggregate(new Dictionary<string, T>(), (acc, x) => { acc.Add(x.Key, x.Value); return acc; });
})
.Multicast(new AsyncSubject<IDictionary<string, T>>())
.PermaRef();
}

public IObservable<IEnumerable<T>> GetAllObjects<T>()
{
if (disposed) return Observable.Throw<IEnumerable<T>>(new ObjectDisposedException("SqlitePersistentBlobCache"));
Expand All @@ -267,6 +330,12 @@ public IObservable<Unit> InvalidateObject<T>(string key)
return Invalidate(key);
}

public IObservable<Unit> InvalidateObjects<T>(IEnumerable<string> keys)
{
if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache");
return Invalidate(keys);
}

public IObservable<Unit> InvalidateAllObjects<T>()
{
if (disposed) throw new ObjectDisposedException("SqlitePersistentBlobCache");
Expand Down Expand Up @@ -362,8 +431,6 @@ static IObservable<CacheElement> ObservableThrowKeyNotFoundException(string key,
new KeyNotFoundException(String.Format(CultureInfo.InvariantCulture,
"The given key '{0}' was not present in the cache.", key), innerException));
}


}

class CacheElement
Expand Down

0 comments on commit fb9ae3b

Please sign in to comment.