Skip to content

Commit

Permalink
Chain aggregates remove/add calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Franklin Hu committed Sep 6, 2012
1 parent c260368 commit cb1303a
Showing 1 changed file with 19 additions and 17 deletions.
Expand Up @@ -15,7 +15,7 @@
*/ */
package com.twitter.zipkin.storage.cassandra package com.twitter.zipkin.storage.cassandra


import com.twitter.util.Future import com.twitter.util.{Throw, Return, Future}
import com.twitter.zipkin.storage.Aggregates import com.twitter.zipkin.storage.Aggregates
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import com.twitter.cassie.{Column, ColumnFamily} import com.twitter.cassie.{Column, ColumnFamily}
Expand Down Expand Up @@ -80,25 +80,27 @@ trait CassandraAggregates extends Aggregates with Cassandra {
} }


/** Synchronize these so we don't do concurrent writes from the same box */ /** Synchronize these so we don't do concurrent writes from the same box */
def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = synchronized { def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] =
val remove = dependencies.removeRow(serviceName) store(dependencies, serviceName, endpoints)
val batch = dependencies.batch()
endpoints.zipWithIndex.foreach { case (endpoint: String, index: Int) =>
batch.insert(serviceName, new Column[Long, String](index, endpoint))
}
remove()
Future.join(Seq(batch.execute()))
}


/** Synchronize these so we don't do concurrent writes from the same box */ /** Synchronize these so we don't do concurrent writes from the same box */
private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] = synchronized { private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] =
val remove = topAnnotations.removeRow(key) store(topAnnotations, key, annotations)
val batch = topAnnotations.batch()
annotations.zipWithIndex.foreach { case (annotation: String, index: Int) => private[cassandra] def store(cf: ColumnFamily[String, Long, String], key: String, values: Seq[String]): Future[Unit] = synchronized {
batch.insert(key, new Column[Long, String](index, annotation)) val remove = cf.removeRow(key)
val batch = cf.batch()
values.zipWithIndex.foreach { case (value: String, index: Int) =>
batch.insert(key, new Column[Long, String](index, value))
}
remove transform {
case Return(r) => {
Future.join(Seq(batch.execute()))
}
case Throw(e) => {
Future.exception(e)
}
} }
remove()
Future.join(Seq(batch.execute()))
} }




Expand Down

0 comments on commit cb1303a

Please sign in to comment.