From cb1303af9e6280ab1380a99226dffe98a94292ca Mon Sep 17 00:00:00 2001 From: Franklin Hu Date: Wed, 5 Sep 2012 19:21:08 -0700 Subject: [PATCH] Chain aggregates remove/add calls --- .../cassandra/CassandraAggregates.scala | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala index afba7107d13..0cd62c11421 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala @@ -15,7 +15,7 @@ */ package com.twitter.zipkin.storage.cassandra -import com.twitter.util.Future +import com.twitter.util.{Throw, Return, Future} import com.twitter.zipkin.storage.Aggregates import scala.collection.JavaConverters._ import com.twitter.cassie.{Column, ColumnFamily} @@ -80,25 +80,27 @@ trait CassandraAggregates extends Aggregates with Cassandra { } /** Synchronize these so we don't do concurrent writes from the same box */ - def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = synchronized { - val remove = dependencies.removeRow(serviceName) - val batch = dependencies.batch() - endpoints.zipWithIndex.foreach { case (endpoint: String, index: Int) => - batch.insert(serviceName, new Column[Long, String](index, endpoint)) - } - remove() - Future.join(Seq(batch.execute())) - } + def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = + store(dependencies, serviceName, endpoints) /** Synchronize these so we don't do concurrent writes from the same box */ - private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] = synchronized { - val remove = topAnnotations.removeRow(key) - val batch = topAnnotations.batch() - annotations.zipWithIndex.foreach { case (annotation: String, index: Int) => - batch.insert(key, new Column[Long, String](index, annotation)) + private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] = + store(topAnnotations, key, annotations) + + private[cassandra] def store(cf: ColumnFamily[String, Long, String], key: String, values: Seq[String]): Future[Unit] = synchronized { + val remove = cf.removeRow(key) + val batch = cf.batch() + values.zipWithIndex.foreach { case (value: String, index: Int) => + batch.insert(key, new Column[Long, String](index, value)) + } + remove transform { + case Return(r) => { + Future.join(Seq(batch.execute())) + } + case Throw(e) => { + Future.exception(e) + } } - remove() - Future.join(Seq(batch.execute())) }