Skip to content
Browse files

Chain remove/add aggregates data futures

Fixes #129

Author: @franklinhu
Fixes #143
URL: #143
  • Loading branch information...
1 parent c260368 commit 318a97a7a005174eb791ecffc7e61a9927a51aaf Franklin Hu committed
View
36 zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala
@@ -15,7 +15,7 @@
*/
package com.twitter.zipkin.storage.cassandra
-import com.twitter.util.Future
+import com.twitter.util.{Throw, Return, Future}
import com.twitter.zipkin.storage.Aggregates
import scala.collection.JavaConverters._
import com.twitter.cassie.{Column, ColumnFamily}
@@ -80,25 +80,27 @@ trait CassandraAggregates extends Aggregates with Cassandra {
}
/** Synchronize these so we don't do concurrent writes from the same box */
- def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = synchronized {
- val remove = dependencies.removeRow(serviceName)
- val batch = dependencies.batch()
- endpoints.zipWithIndex.foreach { case (endpoint: String, index: Int) =>
- batch.insert(serviceName, new Column[Long, String](index, endpoint))
- }
- remove()
- Future.join(Seq(batch.execute()))
- }
+ def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] =
+ store(dependencies, serviceName, endpoints)
/** Synchronize these so we don't do concurrent writes from the same box */
- private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] = synchronized {
- val remove = topAnnotations.removeRow(key)
- val batch = topAnnotations.batch()
- annotations.zipWithIndex.foreach { case (annotation: String, index: Int) =>
- batch.insert(key, new Column[Long, String](index, annotation))
+ private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] =
+ store(topAnnotations, key, annotations)
+
+ private[cassandra] def store(cf: ColumnFamily[String, Long, String], key: String, values: Seq[String]): Future[Unit] = synchronized {
+ val remove = cf.removeRow(key)
+ val batch = cf.batch()
+ values.zipWithIndex.foreach { case (value: String, index: Int) =>
+ batch.insert(key, new Column[Long, String](index, value))
+ }
+ remove transform {
+ case Return(r) => {
+ Future.join(Seq(batch.execute()))
+ }
+ case Throw(e) => {
+ Future.exception(e)
+ }
}
- remove()
- Future.join(Seq(batch.execute()))
}

0 comments on commit 318a97a

Please sign in to comment.
Something went wrong with that request. Please try again.