Skip to content

Commit

Permalink
Fixes logging for services; simplifies zipkin-query; defers c* i/o
Browse files Browse the repository at this point in the history
The log level flags for all processes were broken, defaulting to DEBUG
level. This resurrects them by using logback directly.

This also squashes some indirection in the query service in preparation
for removing ostrich completely.

Finally, this defers network i/o caused by c*'s Repository ctor.
  • Loading branch information
Adrian Cole committed Sep 29, 2015
1 parent 1b61d3b commit 984ffa1
Show file tree
Hide file tree
Showing 23 changed files with 131 additions and 230 deletions.
Expand Up @@ -85,6 +85,9 @@ private long toCacheInterval(long ms) {
}
};

/**
* Note: This constructor performs network I/O to the {@code cluster}.
*/
public Repository(String keyspace, Cluster cluster) {
metadata = Schema.ensureExists(keyspace, cluster);
session = cluster.connect(keyspace);
Expand Down
Expand Up @@ -43,9 +43,19 @@ trait CassandraSpanStoreFactory {self: App =>
val cassandraUsername = flag[String] ("zipkin.store.cassandra.username", "cassandra authentication user name")
val cassandraPassword = flag[String] ("zipkin.store.cassandra.password", "cassandra authentication password")

def newCassandraStore(stats: StatsReceiver = DefaultStatsReceiver.scope("CassandraSpanStore")): CassandraSpanStore = {
val repository = new Repository(keyspace(), createClusterBuilder().build())
new CassandraSpanStore(repository, stats.scope(keyspace()), cassandraSpanTtl(), cassandraIndexTtl(), cassandraMaxTraceCols())
// Repository's constructor eagerly performs network I/O against the cluster,
// so construction is deferred (lazy) until a store actually needs it, and the
// single instance is shared by the span and dependency stores.
private[this] lazy val lazyRepository = new Repository(keyspace(), createClusterBuilder().build())

/**
 * Builds a span store backed by the shared, lazily-created Cassandra repository.
 * Stats are scoped to the configured keyspace; TTL and column limits come from flags.
 */
def newCassandraStore(stats: StatsReceiver = DefaultStatsReceiver.scope("CassandraSpanStore")) = {
  val scopedStats = stats.scope(keyspace())
  val spanTtl = cassandraSpanTtl()
  val indexTtl = cassandraIndexTtl()
  val maxTraceCols = cassandraMaxTraceCols()
  new CassandraSpanStore(scopedStats, spanTtl, indexTtl, maxTraceCols) {
    // Defer Repository construction (and its network I/O) to the shared lazy val.
    override lazy val repository = lazyRepository
  }
}

/**
 * Builds a dependency store backed by the shared, lazily-created Cassandra repository.
 *
 * NOTE(review): the `stats` parameter is currently unused — CassandraDependencyStore
 * takes no stats receiver. Kept for signature symmetry with [[newCassandraStore]];
 * confirm whether it should be wired through or dropped.
 */
def newCassandraDependencies(stats: StatsReceiver = DefaultStatsReceiver.scope("CassandraDependencyStore")) = {
  new CassandraDependencyStore() {
    // Defer Repository construction (and its network I/O) to the shared lazy val.
    override lazy val repository = lazyRepository
  }
}

def createClusterBuilder(): Cluster.Builder = {
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -15,7 +15,10 @@ import scala.collection.JavaConverters._
 * only writes once a day. As such, calls to [[.storeDependencies]] that fall within
 * the same day will overwrite each other.
*/
class CassandraDependencyStore(repository: Repository) extends DependencyStore {
abstract class CassandraDependencyStore extends DependencyStore {

/** Deferred as repository eagerly creates network connections */
protected def repository: Repository

private[this] val pool = FuturePool.unboundedPool
private[this] val codec = new ScroogeThriftCodec[ThriftDependencies](ThriftDependencies)
Expand Down
Expand Up @@ -37,13 +37,16 @@ object CassandraSpanStoreDefaults {
val SpanCodec = new ScroogeThriftCodec[ThriftSpan](ThriftSpan)
}

class CassandraSpanStore(
repository: Repository,
abstract class CassandraSpanStore(
stats: StatsReceiver = DefaultStatsReceiver.scope("CassandraSpanStore"),
spanTtl: Duration = CassandraSpanStoreDefaults.SpanTtl,
indexTtl: Duration = CassandraSpanStoreDefaults.IndexTtl,
maxTraceCols: Int = CassandraSpanStoreDefaults.MaxTraceCols
) extends SpanStore {

/** Deferred as repository eagerly creates network connections */
protected def repository: Repository

private[this] val IndexDelimiter = ":"
private[this] val IndexDelimiterBytes = IndexDelimiter.getBytes
private[this] val spanCodec = CassandraSpanStoreDefaults.SpanCodec
Expand Down
@@ -1,8 +1,9 @@
package com.twitter.zipkin.storage.cassandra

import java.util.Collections

import com.datastax.driver.core.Cluster
import com.twitter.zipkin.storage.DependencyStoreSpec
import java.util.Collections
import org.cassandraunit.CQLDataLoader
import org.cassandraunit.dataset.CQLDataSet
import org.cassandraunit.utils.EmbeddedCassandraServerHelper.startEmbeddedCassandra
Expand Down Expand Up @@ -33,7 +34,9 @@ class CassandraDependencyStoreSpec extends DependencyStoreSpec {

import CassandraDependencyStoreSpec._

override lazy val store = new CassandraDependencyStore(new Repository(keyspace, cluster))
// Binds the spec's embedded-Cassandra repository into the abstract store under
// test; `repository` stays lazy so no connection is made before the test runs.
override val store = new CassandraDependencyStore {
  override lazy val repository = new Repository(keyspace, cluster)
}

override def clear = cluster.connect().execute("DROP KEYSPACE IF EXISTS " + keyspace)
}
Expand Up @@ -33,7 +33,9 @@ class CassandraSpanStoreSpec extends SpanStoreSpec {

import CassandraSpanStoreSpec._

override lazy val store = new CassandraSpanStore(new Repository(keyspace, cluster))
// Binds the spec's embedded-Cassandra repository into the abstract store under
// test; `repository` stays lazy so no connection is made before the test runs.
override val store = new CassandraSpanStore {
  override lazy val repository = new Repository(keyspace, cluster)
}

override def clear = cluster.connect().execute("DROP KEYSPACE IF EXISTS " + keyspace)
}
27 changes: 12 additions & 15 deletions zipkin-collector-service/config/collector-cassandra.scala
Expand Up @@ -15,12 +15,11 @@
*/

import com.twitter.app.App
import com.twitter.logging.{ConsoleHandler, Level, LoggerFactory}
import com.twitter.zipkin.cassandra
import com.twitter.zipkin.builder.Builder
import com.twitter.zipkin.cassandra.CassandraSpanStoreFactory
import com.twitter.zipkin.collector.builder.{ZipkinServerBuilder, Adjustable, CollectorServiceBuilder}
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.storage.{DependencyStore, SpanStore, Store}

val serverPort = sys.env.get("COLLECTOR_PORT").getOrElse("9410").toInt
val adminPort = sys.env.get("COLLECTOR_ADMIN_PORT").getOrElse("9900").toInt
Expand All @@ -34,29 +33,27 @@ Factory.cassandraDest.parse(sys.env.get("CASSANDRA_CONTACT_POINTS").getOrElse("l
val username = sys.env.get("CASSANDRA_USERNAME")
val password = sys.env.get("CASSANDRA_PASSWORD")


if (username.isDefined && password.isDefined) {
Factory.cassandraUsername.parse(username.get)
Factory.cassandraPassword.parse(password.get)
}

val cluster = Factory.createClusterBuilder().build()
val storeBuilder = Store.Builder(
new cassandra.SpanStoreBuilder(cluster),
new cassandra.DependencyStoreBuilder(cluster)
new Builder[SpanStore]() {
override def apply() = Factory.newCassandraStore()
},
new Builder[DependencyStore]() {
override def apply() = Factory.newCassandraDependencies()
}
)

val kafkaReceiver = sys.env.get("KAFKA_ZOOKEEPER").map(
KafkaSpanReceiverFactory.factory(_, sys.env.get("KAFKA_TOPIC").getOrElse("zipkin"))
)

val loggerFactory = new LoggerFactory(
node = "",
level = Level.parse(logLevel),
handlers = List(ConsoleHandler())
)

CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort).loggers(List(loggerFactory))
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
16 changes: 5 additions & 11 deletions zipkin-collector-service/config/collector-dev.scala
Expand Up @@ -14,12 +14,11 @@
* limitations under the License.
*/

import com.twitter.logging.{ConsoleHandler, Level, LoggerFactory}
import com.twitter.zipkin.anormdb.{SpanStoreBuilder, DependencyStoreBuilder}
import com.twitter.zipkin.anormdb.{DependencyStoreBuilder, SpanStoreBuilder}
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.anormdb.DB
import com.twitter.zipkin.collector.builder.{ZipkinServerBuilder, Adjustable, CollectorServiceBuilder}
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.storage.anormdb.DB

val serverPort = sys.env.get("COLLECTOR_PORT").getOrElse("9410").toInt
val adminPort = sys.env.get("COLLECTOR_ADMIN_PORT").getOrElse("9900").toInt
Expand All @@ -33,14 +32,9 @@ val kafkaReceiver = sys.env.get("KAFKA_ZOOKEEPER").map(
KafkaSpanReceiverFactory.factory(_, sys.env.get("KAFKA_TOPIC").getOrElse("zipkin"))
)

val loggerFactory = new LoggerFactory(
node = "",
level = Level.parse(logLevel),
handlers = List(ConsoleHandler())
)

CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort).loggers(List(loggerFactory))
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
12 changes: 3 additions & 9 deletions zipkin-collector-service/config/collector-mysql.scala
@@ -1,6 +1,5 @@
import com.twitter.logging.{ConsoleHandler, Level, LoggerFactory}
import com.twitter.zipkin.anormdb.{DependencyStoreBuilder, SpanStoreBuilder}
import com.twitter.zipkin.collector.builder.{ZipkinServerBuilder, Adjustable, CollectorServiceBuilder}
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.storage.anormdb.{DB, DBConfig, DBParams}
Expand All @@ -24,14 +23,9 @@ val kafkaReceiver = sys.env.get("KAFKA_ZOOKEEPER").map(
KafkaSpanReceiverFactory.factory(_, sys.env.get("KAFKA_TOPIC").getOrElse("zipkin"))
)

val loggerFactory = new LoggerFactory(
node = "",
level = Level.parse(logLevel),
handlers = List(ConsoleHandler())
)

CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort).loggers(List(loggerFactory))
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
14 changes: 4 additions & 10 deletions zipkin-collector-service/config/collector-redis.scala
Expand Up @@ -16,11 +16,10 @@

import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.redis.{Client, Redis}
import com.twitter.logging.{ConsoleHandler, Level, LoggerFactory}
import com.twitter.zipkin.collector.builder.{ZipkinServerBuilder, Adjustable, CollectorServiceBuilder}
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.redis
import com.twitter.zipkin.storage.Store

val serverPort = sys.env.get("COLLECTOR_PORT").getOrElse("9410").toInt
val adminPort = sys.env.get("COLLECTOR_ADMIN_PORT").getOrElse("9900").toInt
Expand All @@ -41,14 +40,9 @@ val kafkaReceiver = sys.env.get("KAFKA_ZOOKEEPER").map(
KafkaSpanReceiverFactory.factory(_, sys.env.get("KAFKA_TOPIC").getOrElse("zipkin"))
)

val loggerFactory = new LoggerFactory(
node = "",
level = Level.parse(logLevel),
handlers = List(ConsoleHandler())
)

CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort).loggers(List(loggerFactory))
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
Expand Up @@ -15,8 +15,8 @@
*/
package com.twitter.zipkin.collector.builder

import java.net.InetSocketAddress

import ch.qos.logback.classic
import ch.qos.logback.classic.Level
import com.twitter.finagle.ThriftMux
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.Logger
Expand All @@ -30,6 +30,8 @@ import com.twitter.zipkin.config.sampler.{AdaptiveSamplerConfig, AdjustableRateC
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.thriftscala._
import org.apache.thrift.protocol.TBinaryProtocol.Factory
import org.slf4j.LoggerFactory
import java.net.InetSocketAddress

/**
* Immutable builder for ZipkinCollector
Expand All @@ -50,9 +52,13 @@ case class CollectorServiceBuilder[T](
*/
sampleRateBuilder: Builder[AdjustableRateConfig] = Adjustable.local(1.0),
adaptiveSamplerBuilder: Option[Builder[AdaptiveSamplerConfig]] = None,
serverBuilder: ZipkinServerBuilder = ZipkinServerBuilder(9410, 9900)
serverBuilder: ZipkinServerBuilder = ZipkinServerBuilder(9410, 9900),
logLevel: String = "INFO"
) extends Builder[RuntimeEnvironment => ZipkinCollector] {

LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME)
.asInstanceOf[classic.Logger].setLevel(Level.toLevel(logLevel))

val log = Logger.get()

def sampleRate(c: Builder[AdjustableRateConfig]): CollectorServiceBuilder[T] = copy(sampleRateBuilder = c)
Expand Down
Expand Up @@ -31,7 +31,6 @@ case class ZipkinServerBuilder(
serverPort : Int,
adminPort : Int,
serverAddress : InetAddress = InetAddress.getByAddress(Array[Byte](0,0,0,0)),
loggers : List[LoggerFactory] = List(LoggerFactory(level = Some(Level.DEBUG), handlers = List(ConsoleHandler()))),
adminStatsNodes : List[StatsFactory] = List(StatsFactory(reporters = List(TimeSeriesCollectorFactory()))),
adminStatsFilters : List[Regex] = List.empty,
statsReceiver : StatsReceiver = new OstrichStatsReceiver
Expand All @@ -40,10 +39,8 @@ case class ZipkinServerBuilder(
def serverPort(p: Int) : ZipkinServerBuilder = copy(serverPort = p)
def adminPort(p: Int) : ZipkinServerBuilder = copy(adminPort = p)
def serverAddress(a: InetAddress) : ZipkinServerBuilder = copy(serverAddress = a)
def loggers(l: List[LoggerFactory]) : ZipkinServerBuilder = copy(loggers = l)
def statsReceiver(s: StatsReceiver) : ZipkinServerBuilder = copy(statsReceiver = s)

def addLogger(l: LoggerFactory) : ZipkinServerBuilder = copy(loggers = loggers :+ l)
def addAdminStatsNode(n: StatsFactory): ZipkinServerBuilder = copy(adminStatsNodes = adminStatsNodes :+ n)
def addAdminStatsFilter(f: Regex) : ZipkinServerBuilder = copy(adminStatsFilters = adminStatsFilters :+ f)

Expand All @@ -59,7 +56,6 @@ case class ZipkinServerBuilder(
var adminHttpService: Option[AdminHttpService] = None

def apply() = (runtime: RuntimeEnvironment) => {
Logger.configure(loggers)
adminHttpService = Some(adminServiceFactory(runtime))
}
}
26 changes: 9 additions & 17 deletions zipkin-query-service/config/query-cassandra.scala
Expand Up @@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import com.twitter.app.App
import com.twitter.logging.{ConsoleHandler, Level, LoggerFactory}
import com.twitter.zipkin.builder.{ZipkinServerBuilder, QueryServiceBuilder}
import com.twitter.zipkin.cassandra
import com.twitter.zipkin.builder.QueryServiceBuilder
import com.twitter.zipkin.cassandra.CassandraSpanStoreFactory
import com.twitter.zipkin.storage.Store

val serverPort = sys.env.get("QUERY_PORT").getOrElse("9411").toInt
val adminPort = sys.env.get("QUERY_ADMIN_PORT").getOrElse("9901").toInt
Expand All @@ -36,19 +34,13 @@ if (username.isDefined && password.isDefined) {
Factory.cassandraPassword.parse(password.get)
}

val cluster = Factory.createClusterBuilder().build()
val storeBuilder = Store.Builder(
new cassandra.SpanStoreBuilder(cluster),
new cassandra.DependencyStoreBuilder(cluster)
)

val loggerFactory = new LoggerFactory(
node = "",
level = Level.parse(logLevel),
handlers = List(ConsoleHandler())
)
val spanStore = Factory.newCassandraStore()
val dependencies = Factory.newCassandraDependencies()

QueryServiceBuilder(
storeBuilder,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort).loggers(List(loggerFactory))
"0.0.0.0:" + serverPort,
adminPort,
logLevel,
spanStore,
dependencies
)

0 comments on commit 984ffa1

Please sign in to comment.