Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VersionedStore proposal #47

Open
sritchie opened this issue Mar 9, 2013 · 2 comments
Open

VersionedStore proposal #47

sritchie opened this issue Mar 9, 2013 · 2 comments

Comments

@sritchie
Copy link
Collaborator

sritchie commented Mar 9, 2013

The notion of a VersionedStore would make it easy to create a distributed read-only database backed by storehaus stores that loaded themselves off of disk.

import com.twitter.util.{ Duration, Future, Timer }

/**
  * ReadableStore with some notion of versioning.
  */
trait VersionedStore[-K, +V] extends ReadableStore[K, (Long, V)] {
  def currentVersion: Option[Long]
}

/**
  * Fired when an updating versioned store is not yet loaded.
  */
class StoreNotLoadedException extends RuntimeException

/**
  * The typical not-yet-loaded versioned store. This store returns
  * exceptions for every get and multiGet.
  */
object ExceptionalStore extends VersionedStore[Any, Nothing] {
  override def get(k: Any) = Future.exception(new StoreNotLoadedException)
  override val currentVersion = None
}

class ConstantVersionedStore[-K, +V](store: ReadableStore[K, V], version: Long) extends VersionedStore[K, V] {
  override val currentVersion = Some(version)

  protected def prependVersion(f: Future[Option[V]], version: Long) =
    f.map { _.map { (version, _) } }

  override def get(k: K) = prependVersion(store.get(k), version)
  override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[(Long, V)]]] =
    store.multiGet(ks).mapValues { prependVersion(_, version) }
}

/**
  * Versioned store implementation capable of updating itself to some
  * new version, potentially asynchronously. One use case here would a
  * store that loads its backing dataset across the network from
  * HDFS. Every time a new version appeared on HDFS the watcher
  * process would switch the backing store behind the scenes.
  */
class UpdatingVersionedStore[-K, +V](updateInterval: Duration)(watcherProcess: Option[Long] => Future[Option[VersionedStore[K, V]]])
  (implicit timer: Timer)
    extends VersionedStore[K, V] {
  protected var innerStore: Future[VersionedStore[K, V]] = Future.value(ExceptionalStore)

  timer.schedule(updateInterval) {
    innerStore = innerStore
      .join(watcherProcess(currentVersion))
      .map { case (oldStore, optNewStore) => optNewStore.getOrElse(oldStore) }
  }
  override def get(k: K) = innerStore.flatMap { _.get(k) }
  override def multiGet(ks: Set[K]) =
    FutureOps.liftFutureValues(ks, innerStore.map { _.multiGet(ks) })
}
@johnynek
Copy link
Collaborator

johnynek commented Mar 9, 2013

Is currentVersion well defined?

It seems like different keys might have different versions.

@sritchie
Copy link
Collaborator Author

sritchie commented Mar 9, 2013

Yes, it's well defined for a given versionedstore (which is a readablestore[K, (Long, V)]. The usual ReadableStore combinators would allow you combine multiple versioned stores.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants