Skip to content

Loading…

Chain remove/add aggregates data futures #143

Closed
wants to merge 1 commit into from

2 participants

@franklinhu

Fixes #129

@franklinhu franklinhu pushed a commit that closed this pull request
Franklin Hu Chain remove/add aggregates data futures
Fixes #129

Author: @franklinhu
Fixes #143
URL: #143
318a97a
@franklinhu franklinhu closed this in 318a97a
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 6, 2012
  1. Chain aggregates remove/add calls

    Franklin Hu committed
This page is out of date. Refresh to see the latest.
View
36 zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala
@@ -15,7 +15,7 @@
*/
package com.twitter.zipkin.storage.cassandra
-import com.twitter.util.Future
+import com.twitter.util.{Throw, Return, Future}
import com.twitter.zipkin.storage.Aggregates
import scala.collection.JavaConverters._
import com.twitter.cassie.{Column, ColumnFamily}
@@ -80,25 +80,27 @@ trait CassandraAggregates extends Aggregates with Cassandra {
}
/** Synchronize these so we don't do concurrent writes from the same box */
- def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = synchronized {
- val remove = dependencies.removeRow(serviceName)
- val batch = dependencies.batch()
- endpoints.zipWithIndex.foreach { case (endpoint: String, index: Int) =>
- batch.insert(serviceName, new Column[Long, String](index, endpoint))
- }
- remove()
- Future.join(Seq(batch.execute()))
- }
+ def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] =
+ store(dependencies, serviceName, endpoints)
/** Synchronize these so we don't do concurrent writes from the same box */
- private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] = synchronized {
- val remove = topAnnotations.removeRow(key)
- val batch = topAnnotations.batch()
- annotations.zipWithIndex.foreach { case (annotation: String, index: Int) =>
- batch.insert(key, new Column[Long, String](index, annotation))
+ private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] =
+ store(topAnnotations, key, annotations)
+
+ private[cassandra] def store(cf: ColumnFamily[String, Long, String], key: String, values: Seq[String]): Future[Unit] = synchronized {
+ val remove = cf.removeRow(key)
+ val batch = cf.batch()
+ values.zipWithIndex.foreach { case (value: String, index: Int) =>
+ batch.insert(key, new Column[Long, String](index, value))
+ }
+ remove transform {
+ case Return(r) => {
+ Future.join(Seq(batch.execute()))
+ }
+ case Throw(e) => {
+ Future.exception(e)
+ }
}
- remove()
- Future.join(Seq(batch.execute()))
}
Something went wrong with that request. Please try again.