Skip to content

Commit

Permalink
Rewrite storage to direct db write with WAL mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
zbsz committed Dec 22, 2015
1 parent 4723917 commit 0583315
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 103 deletions.
202 changes: 126 additions & 76 deletions src/main/scala/com/geteit/db/CachedStorage.scala
@@ -1,18 +1,19 @@
package com.geteit.db

import android.database.sqlite.SQLiteDatabase
import android.database.{Cursor, CursorWrapper}
import android.support.v4.util.LruCache
import com.geteit.concurrent.{LimitedExecutionContext, Threading}
import com.geteit.db.CachedStorageSignal.{Cmd, Del, Add}
import com.geteit.db.CachedStorageSignal.{Add, Cmd, Del}
import com.geteit.events._
import com.geteit.inject.{Injectable, Injector}
import com.geteit.util.Log._
import com.geteit.util.ThrottledProcessingQueue
import com.geteit.util.returning

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.breakOut
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try

abstract class CachedStorage[K, V](implicit val dao: Dao[K, V], inj: Injector) extends Injectable {
protected implicit val executionContext = new LimitedExecutionContext
Expand All @@ -21,63 +22,52 @@ abstract class CachedStorage[K, V](implicit val dao: Dao[K, V], inj: Injector) e

val storage: Storage = inject[Storage]

val onAdded = EventStream[V]()
val onRemoved = EventStream[K]()
val onUpdated = EventStream[(V, V)]() // (prev, current)
val onAdded = EventStream[Seq[V]]()
val onRemoved = EventStream[Seq[K]]()
val onUpdated = EventStream[Seq[(V, V)]]() // (prev, current)

private var loadFuture = Future.successful(Option.empty[V])

private val saveQueue = new ThrottledProcessingQueue[SaveCmd](500.millis, { evs =>
val removed = new mutable.HashSet[K]
val added = new mutable.HashMap[K, V]
evs foreach {
case Delete(id) =>
removed += id
added -= id
case Insert(item) => added(dao.getId(item)) = item
}
storage { implicit db =>
Dao.inTransaction {
dao.deleteAll(removed)
dao.insert(added.values)
}
}
}, "AudioStorageSaveQueue")

private def cachedOrElse(key: K, default: => Future[Option[V]]): Future[Option[V]] = Option(cache.get(key)).fold(default)(Future.successful)

private def loadFromDb(key: K): Future[Option[V]] = storage { dao.get(key)(_) } map { value =>
private def loadFromDb(key: K): Future[Option[V]] = storage.read { dao.get(key)(_) } map { value =>
Option(cache.get(key)).getOrElse {
cache.put(key, value)
value
}
}

def find(matcher: Matcher[V]): Future[Set[K]] = {
val dbSearch = storage { dao.find(matcher.whereSql)(_) }
val dbSearch = storage.read { dao.find(matcher.whereSql)(_) }
val cached = Future { cache.snapshot.asScala.collect { case (k, Some(v)) if matcher(v) => k }.toSet }
dbSearch.flatMap(d => cached.map(_ ++ d))
}

def add(item: V) = Future { addInternal(dao.getId(item), item) }
def query(matcher: Matcher[V]): Future[Cursor] = storage.read { dao.query(matcher.whereSql)(_) }

def remove(matcher: Matcher[V]): Future[Unit] = find(matcher) map { _ foreach remove }

def insert(item: V) = updateOrCreate(dao.getId(item), _ => item, item)

def add(item: V) = addInternal(dao.getId(item), item)

def get(key: K): Future[Option[V]] = cachedOrElse(key, Future {
loadFuture = loadFuture recover { case _ => () } flatMap { _ => cachedOrElse(key, loadFromDb(key)) }
loadFuture
}.flatMap(identity))

def getOrCreate(key: K, creator: => V): Future[V] = get(key) map { value =>
value.orElse(Option(cache.get(key)).flatten).getOrElse { addInternal(key, creator) }
def getOrCreate(key: K, creator: => V): Future[V] = get(key) flatMap { value =>
value.orElse(Option(cache.get(key)).flatten).fold(addInternal(key, creator))(Future.successful)
}

def getAll = storage { dao.find("1 = 1")(_) } flatMap { ids => Future.traverse(ids)(get) } map { _.flatten }
def getAll = storage.read { dao.find("1 = 1")(_) } flatMap { ids => Future.traverse(ids)(get) } map { _.flatten }

def getAll(keys: Seq[K]): Future[Seq[Option[V]]] = {
val cachedEntries = keys.flatMap { key => Option(cache.get(key)) map { value => (key, value) } }.toMap
val missingKeys = keys.toSet -- cachedEntries.keys
val loaderOfMissing = { (keys: Seq[K], db: SQLiteDatabase) => keys.flatMap(key => dao.get(key)(db).map { value => (key, value) })}

storage.withTransaction { db => loaderOfMissing(missingKeys.toSeq, db) } map { loadedEntries =>
storage.read { db => loaderOfMissing(missingKeys.toSeq, db) } map { loadedEntries =>
val loadedMap = loadedEntries .map { case (key, value) =>
Option(cache.get(key)).map((key, _)).getOrElse {
cache.put(key, Some(value))
Expand All @@ -91,72 +81,119 @@ abstract class CachedStorage[K, V](implicit val dao: Dao[K, V], inj: Injector) e
}
}

def update(key: K, updater: V => V): Future[Option[V]] = get(key) map { loaded =>
Option(cache.get(key)).getOrElse(loaded) map updateInternal(key, updater)
def update(key: K, updater: V => V): Future[Option[(V, V)]] = get(key) flatMap { loaded =>
val prev = Option(cache.get(key)).getOrElse(loaded)
prev.fold(Future successful Option.empty[(V, V)]) { updateInternal(key, updater)(_) }
}

def updateOrCreate(key: K, updater: V => V, creator: => V): Future[V] = get(key) map { loaded =>
val prev = loaded.orElse(Option(cache.get(key)).flatten)
prev .map { updateInternal(key, updater) } .getOrElse { addInternal(key, creator) }
def updateOrCreate(key: K, updater: V => V, creator: => V): Future[V] = get(key) flatMap { loaded =>
val prev = Option(cache.get(key)).getOrElse(loaded)
prev.fold { addInternal(key, creator) } { v => updateInternal(key, updater)(v).map(_.fold(v)(_._2)) }
}

def updateAll(updaters: scala.collection.Map[K, V => V]): Future[Seq[(V, V)]] = {
val keys = updaters.keys.toSeq
getAll(keys) flatMap { values =>
val updated = keys.zip(values) flatMap { case (k, v) =>
Option(cache.get(k)).flatten.orElse(v).flatMap { value =>
val updated = updaters(k)(value)
if (updated != value) {
cache.put(k, Some(updated))
Some(value -> updated)
} else None
}
}

if (updated.isEmpty) Future.successful(Seq.empty)
else
returning (storage { dao.insert(updated.map(_._2))(_) } .map { _ => updated }) { _ =>
onUpdated ! updated
}
}
}

def updateOrCreateAll(updaters: K Map (Option[V] => V)): Future[Set[V]] = {
val keys = updaters.keys.toSeq
getAll(keys) map { values =>
val loaded = keys.zip(values).toMap
updaters .map { case (key, updater) =>
loaded.get(key).flatten.orElse(Option(cache.get(key)).flatten) match {
case Some(current) => updateInternal(key, updater compose (Some(_)))(current)
case None => addInternal(key, updater(None))
getAll(keys) flatMap { values =>
val loaded: Map[K, Option[V]] = keys.zip(values).map { case (k, v) => k -> Option(cache.get(k)).flatten.orElse(v) } (breakOut)
val toSave = Seq.newBuilder[V]
val added = Seq.newBuilder[V]
val updated = Seq.newBuilder[(V, V)]

val result = updaters .map { case (key, updater) =>
val current = loaded.get(key).flatten
val next = updater(current)
current match {
case Some(c) if c != next =>
cache.put(key, Some(next))
toSave += next
updated += (c -> next)
case None =>
cache.put(key, Some(next))
toSave += next
added += next
case Some(_) => // unchanged, ignore
}
next
} .toSet

val addedResult = added.result
val updatedResult = updated.result

returning (storage { dao.insert(toSave.result)(_) } .map { _ => result }) { _ =>
if (addedResult.nonEmpty) onAdded ! addedResult
if (updatedResult.nonEmpty) onUpdated ! updatedResult
}
}
}

private def addInternal(key: K, value: V): V = {
private def addInternal(key: K, value: V): Future[V] = {
cache.put(key, Some(value))
saveQueue ! Insert(value)
onAdded ! value
value
returning(storage { dao.insert(Seq(value))(_) } .map { _ => value }) { _ =>
onAdded ! Seq(value)
}
}

private def updateInternal(key: K, updater: V => V)(current: V): V = {
private def updateInternal(key: K, updater: V => V)(current: V): Future[Option[(V, V)]] = {
val updated = updater(current)
if (updated != current) {
if (updated == current) Future.successful(None)
else {
cache.put(key, Some(updated))
saveQueue ! Insert(updated)
onUpdated ! ((current, updated))
returning(storage { dao.insert(Seq(updated))(_) } .map { _ => Some((current, updated)) }) { _ =>
onUpdated ! Seq((current, updated))
}
}
updated
}

def remove(key: K): Unit = {
def remove(key: K): Future[Int] = Future {
cache.put(key, None)
saveQueue ! Delete(key)
onRemoved ! key //FIXME: should we call this? It is possible that this key was already remove (or not present at all), should we report removal then?
}

returning(storage { dao.delete(key)(_) }) { _ => onRemoved ! Seq(key) }
} .flatMap(identity)

sealed trait SaveCmd
case class Insert(item: V) extends SaveCmd
case class Delete(key: K) extends SaveCmd
def removeAll(keys: Seq[K]): Future[Unit] = Future {
keys foreach { cache.put(_, None) }
returning(storage { dao.deleteAll(keys)(_) }) { _ => onRemoved ! keys }
} .flatMap(identity)
}


trait CachedStorageSignal[K, V] { self: CachedStorage[K, V] =>
import Threading.global

import collection.breakOut
private implicit val tag: LogTag = "CachedStorageSignal"

private lazy val onChanged = EventStream.union[Cmd[K, V]](onAdded.map(Add(_)), onUpdated.map(p => Add(p._2)), onRemoved.map(Del(_)))
private lazy val onChanged = EventStream.union[Seq[Cmd[K, V]]](onAdded.map(_.map(Add(_))), onUpdated.map(_.map(p => Add(p._2))), onRemoved.map(_.map(Del(_))))

private def loadAll: Future[Map[K, V]] = getAll map { _.map(v => dao.getId(v) -> v)(breakOut) }

def all(implicit ev: EventContext): Signal[Map[K, V]] = new AggregatingSignal[Cmd[K, V], Map[K, V]](onChanged, loadAll, { (values, cmd) =>
cmd match {
case Add(v) => values + (dao.getId(v) -> v)
case Del(k) => values - k
def all(implicit ev: EventContext): Signal[Map[K, V]] = new AggregatingSignal[Seq[Cmd[K, V]], Map[K, V]](onChanged, loadAll, { (values, cmds) =>
var res = values
cmds foreach {
case Add(v) => res += (dao.getId(v) -> v)
case Del(k) => res -= k
}
res
})

def signal(id: K)(implicit ev: EventContext): Signal[V] = new Signal[V]() {
Expand All @@ -167,24 +204,24 @@ trait CachedStorageSignal[K, V] { self: CachedStorage[K, V] =>

override protected def onWire(): Unit = {
observers = Seq(
onAdded.filter(v => dao.getId(v) == id) { publish }, // FIXME: possible race condition with reload result
onUpdated.filter(p => dao.getId(p._2) == id) { case (_, v) => publish(v) },
onRemoved.filter(_ == id) { _ => reload() }
onAdded.map(_.reverseIterator.find(v => dao.getId(v) == id)) { _.foreach(publish) }, // FIXME: possible race condition with reload result
onUpdated.map(_.reverseIterator.find(p => dao.getId(p._2) == id)) { _.foreach(p => publish(p._2)) },
onRemoved.filter(_.exists(_ == id)) { _ => reload() }
)
reload()
}

override protected def onUnwire(): Unit = {
observers foreach (_.destroy())
value = None
clear()
}
}

def findSignal(matcher: Matcher[V])(implicit ev: EventContext): Signal[Set[K]] = new Signal[Set[K]] {

def reload(): Unit = {
verbose(s"reload ${matcher.whereSql}")
storage { dao.find(matcher.whereSql)(_) } onSuccess {
storage.read { dao.find(matcher.whereSql)(_) } onSuccess {
case ids =>
verbose(s"found: $ids")
publish(value.getOrElse(Set.empty) ++ ids)
Expand All @@ -194,35 +231,48 @@ trait CachedStorageSignal[K, V] { self: CachedStorage[K, V] =>
private var observers = Seq.empty[Subscription]

override protected def onWire(): Unit = {
value = None
clear()
observers = Seq(
// FIXME: possible race conditions - on every upate
onAdded { v =>
if (matcher(v)) publish(value.getOrElse(Set.empty) + dao.getId(v))
onAdded { vs =>
vs.reverseIterator.find(matcher(_)) foreach { v => publish(value.getOrElse(Set.empty) + dao.getId(v)) }
},
onUpdated {
onUpdated { _ foreach {
case (prev, up) =>
(matcher(prev), matcher(up)) match {
case (true, false) => value foreach { s => this.publish(s - dao.getId(prev)) }
case (false, true) => publish(value.getOrElse(Set.empty) + dao.getId(up))
case _ =>
}
},
onRemoved { id => value foreach { s => publish(s - id) } }
} },
onRemoved { ids => value foreach { s => publish(s -- ids) } }
)
reload()
}

override protected def onUnwire(): Unit = {
verbose(s"onUnwire")
observers foreach (_.destroy())
value = None
clear()
}
}

def querySignal(matcher: Matcher[V]): Signal[Cursor] =
Signal.wrap(onChanged).orElse(Signal.const(null)) flatMap { _ =>
verbose(s"requery ${matcher.whereSql}")
Signal.future(query(matcher)) map (new AutoCloseCursor(_))
}
}

object CachedStorageSignal {
trait Cmd[+K, +V]
case class Add[A](v: A) extends Cmd[Nothing, A]
case class Del[A](v: A) extends Cmd[A, Nothing]
}

class AutoCloseCursor(c: Cursor) extends CursorWrapper(c) {
override def finalize(): Unit = {
Try(c.close())
super.finalize()
}
}

0 comments on commit 0583315

Please sign in to comment.