Permalink
Browse files

Incorporate latest storehaus release.

  • Loading branch information...
1 parent 6445f05 commit 5ebd94c26cd26d989389ad2d491ebd5d7ba14ae7 @pankajroark pankajroark committed Jun 14, 2016
View
@@ -30,7 +30,7 @@ val scalaCheckVersion = "1.12.2"
val scalatestVersion = "2.2.4"
val scaldingVersion = "0.16.0-RC3"
val slf4jVersion = "1.6.6"
-val storehausVersion = "0.13.0"
+val storehausVersion = "0.15.0-RC1"
val stormDep = "storm" % "storm" % "0.9.0-wip15" //This project also compiles with the latest storm, which is in fact required to run the example
val tormentaVersion = "0.11.1"
val utilVersion = "6.26.0"
@@ -45,7 +45,7 @@ class ClientMergeable[K, V: Semigroup](
batcher: Batcher,
batchesToKeep: Int,
onlineKeyFilter: K => Boolean,
- collector: FutureCollector[(K, Iterable[BatchID])]) extends Mergeable[(K, BatchID), V] {
+ collector: FutureCollector) extends Mergeable[(K, BatchID), V] {
def readable: ClientStore[K, V] =
new ClientStore(offlineStore, onlineStore, batcher, batchesToKeep, onlineKeyFilter, collector)
@@ -61,14 +61,15 @@ object ClientStore {
onlineStore: ReadableStore[(K, BatchID), V],
batchesToKeep: Int,
onlineKeyFilter: K => Boolean,
- collector: FutureCollector[(K, Iterable[BatchID])])(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
+ collector: FutureCollector)(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
new ClientStore[K, V](offlineStore, onlineStore, batcher, batchesToKeep, onlineKeyFilter, collector)
- /** You can't read the batch counts before what offline has counted up to
+ /**
+ * You can't read the batch counts before what offline has counted up to
*/
def offlineLTEQBatch[K, V](k: K, b: BatchID, v: Future[Option[(BatchID, V)]]): Future[Option[(BatchID, V)]] =
v.flatMap {
- case s@Some((bOld, v)) if (bOld.id <= b.id) => Future.value(s)
+ case s @ Some((bOld, v)) if (bOld.id <= b.id) => Future.value(s)
case Some((bOld, v)) => Future.exception(OfflinePassedBatch(k, bOld, b))
case None => Future.None
}
@@ -125,7 +126,7 @@ class ClientStore[K, V: Semigroup](
batcher: Batcher,
batchesToKeep: Int,
onlineKeyFilter: K => Boolean,
- collector: FutureCollector[(K, Iterable[BatchID])]) extends ReadableStore[K, V] {
+ collector: FutureCollector) extends ReadableStore[K, V] {
import MergeOperations._
override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, FOpt[V]] =
@@ -158,7 +159,7 @@ class ClientStore[K, V: Semigroup](
val fOnlineKeys: Future[Set[(K1, BatchID)]] =
generateOnlineKeys(possibleOnlineKeys.toSeq, batch, batchesToKeep)(
- keyToBatch)(collector.asInstanceOf[FutureCollector[(K1, Iterable[BatchID])]])
+ keyToBatch)(collector.asInstanceOf[FutureCollector])
val m: Future[Map[K1, FOpt[V]]] = fOnlineKeys.map { onlineKeys =>
val onlineResult: Map[(K1, BatchID), FOpt[V]] = onlineStore.multiGet(onlineKeys)
@@ -172,5 +173,4 @@ class ClientStore[K, V: Semigroup](
}
}
-case class OfflinePassedBatch(key: Any, offlineBatch: BatchID, requested: BatchID) extends
- Exception(s"key: $key offline is at batch $offlineBatch, can't query for $requested")
+case class OfflinePassedBatch(key: Any, offlineBatch: BatchID, requested: BatchID) extends Exception(s"key: $key offline is at batch $offlineBatch, can't query for $requested")
@@ -39,7 +39,7 @@ object MergeOperations {
*/
def pivot[K] = Pivot.of[(K, BatchID), K, BatchID]
- def collect[T, U](seq: Seq[(T, Future[U])])(implicit collect: FutureCollector[(T, U)]): Future[Seq[(T, U)]] =
+ def collect[T, U](seq: Seq[(T, Future[U])])(implicit collect: FutureCollector): Future[Seq[(T, U)]] =
collect {
seq.map { case (t, futureU) => futureU.map(t -> _) }
}
@@ -85,7 +85,7 @@ object MergeOperations {
BatchID.range(initBatch, nowBatch)
}
- def generateOnlineKeys[K](ks: Seq[K], nowBatch: BatchID, batchesToKeep: Int)(lookup: K => FOpt[BatchID])(implicit collect: FutureCollector[(K, Iterable[BatchID])]): Future[Set[(K, BatchID)]] =
+ def generateOnlineKeys[K](ks: Seq[K], nowBatch: BatchID, batchesToKeep: Int)(lookup: K => FOpt[BatchID])(implicit collect: FutureCollector): Future[Set[(K, BatchID)]] =
for {
collected <- collect(
ks.map { k => lookup(k).map { k -> expand(_, nowBatch, batchesToKeep) } }
@@ -21,8 +21,8 @@ import com.twitter.bijection.{ Base64String, Bijection, Codec, Injection }
import com.twitter.bijection.netty.Implicits._
import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
-import com.twitter.finagle.memcachedx.KetamaClientBuilder
-import com.twitter.finagle.memcachedx.protocol.text.Memcached
+import com.twitter.finagle.memcached.KetamaClientBuilder
+import com.twitter.finagle.memcached.protocol.text.Memcached
import com.twitter.storehaus.Store
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.storehaus.memcache.{ HashEncoder, MemcacheStore }

0 comments on commit 5ebd94c

Please sign in to comment.