Skip to content
This repository has been archived by the owner on Oct 31, 2022. It is now read-only.

Commit

Permalink
Introduce scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
samstarling committed Oct 31, 2017
1 parent bfea1bb commit ea70f34
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 69 deletions.
2 changes: 2 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,2 @@
importSelectors = singleLine
rewrite.rules = [SortImports]
Expand Up @@ -6,7 +6,8 @@ import com.twitter.finagle.loadbalancer.LoadBalancerFactory
import com.twitter.finagle.stats.{DefaultStatsReceiver, StatsReceiver}
import com.twitter.util.Future

class EmojiService(statsReceiver: StatsReceiver) extends Service[Request, Response] {
class EmojiService(statsReceiver: StatsReceiver)
extends Service[Request, Response] {

private val client = Http.client
.withTls("api.github.com")
Expand Down
Expand Up @@ -22,12 +22,13 @@ object TestServer extends App {
val emojiService = new EmojiService(statsReceiver)
val metricsService = new MetricsService(registry)

val router: Service[Request, Response] = RoutingService.byMethodAndPathObject {
case (Method.Get, Root / "emoji") => emojiService
case (Method.Get, Root / "metrics") => metricsService
case (Method.Get, Root / "echo") => new EchoService
case _ => new NotFoundService
}
val router: Service[Request, Response] =
RoutingService.byMethodAndPathObject {
case (Method.Get, Root / "emoji") => emojiService
case (Method.Get, Root / "metrics") => metricsService
case (Method.Get, Root / "echo") => new EchoService
case _ => new NotFoundService
}

ServerBuilder()
.stack(Http.server)
Expand Down
Expand Up @@ -2,34 +2,36 @@ package com.samstarling.prometheusfinagle

class MetricLabeller {
def labelNamesFor(name: Seq[String]): Seq[String] = name match {
case Seq(_, "requests") => Seq("serviceName")
case Seq(_, "success") => Seq("serviceName")
case Seq(_, "requests") => Seq("serviceName")
case Seq(_, "success") => Seq("serviceName")
case Seq(_, "request_latency_ms") => Seq("serviceName")
case Seq(_, "pending") => Seq("serviceName")
case Seq(_, "http", "time", _) => Seq("serviceName", "statusCode")
case Seq(_, "http", "status", _) => Seq("serviceName", "statusCode")
case default => Seq.empty
case Seq(_, "pending") => Seq("serviceName")
case Seq(_, "http", "time", _) => Seq("serviceName", "statusCode")
case Seq(_, "http", "status", _) => Seq("serviceName", "statusCode")
case default => Seq.empty
}

def labelsFor(name: Seq[String]): Seq[String] = name match {
case Seq(serviceName, "requests") => Seq(serviceName)
case Seq(serviceName, "success") => Seq(serviceName)
case Seq(serviceName, "requests") => Seq(serviceName)
case Seq(serviceName, "success") => Seq(serviceName)
case Seq(serviceName, "request_latency_ms") => Seq(serviceName)
case Seq(serviceName, "pending") => Seq(serviceName)
case Seq(serviceName, "http", "time", statusCode) => Seq(serviceName, statusCode)
case Seq(serviceName, "http", "status", statusCode) => Seq(serviceName, statusCode)
case Seq(serviceName, "pending") => Seq(serviceName)
case Seq(serviceName, "http", "time", statusCode) =>
Seq(serviceName, statusCode)
case Seq(serviceName, "http", "status", statusCode) =>
Seq(serviceName, statusCode)
case default => Seq.empty
}

def sanitizeName(name: Seq[String]): String = {
name match {
case Seq(_, "requests") => "requests"
case Seq(_, "success") => "success"
case Seq(_, "requests") => "requests"
case Seq(_, "success") => "success"
case Seq(_, "request_latency_ms") => "request_latency_ms"
case Seq(_, "pending") => "pending"
case Seq(_, "http", "time", _) => "http_time"
case Seq(_, "http", "status", _) => "http_status"
case default => name.map(_.replaceAll("[^\\w]", "_")).mkString("__")
case Seq(_, "pending") => "pending"
case Seq(_, "http", "time", _) => "http_time"
case Seq(_, "http", "status", _) => "http_status"
case default => name.map(_.replaceAll("[^\\w]", "_")).mkString("__")
}
}
}
Expand Up @@ -5,22 +5,28 @@ import io.prometheus.client.{CollectorRegistry, Summary, Counter => PCounter, Ga
import scala.collection.concurrent.TrieMap

class PrometheusStatsReceiver(registry: CollectorRegistry,
namespace: String = "finagle") extends StatsReceiver {
namespace: String = "finagle")
extends StatsReceiver {

private val counters = TrieMap.empty[String, PCounter]
private val summaries = TrieMap.empty[String, Summary]
private val gauges = TrieMap.empty[String, PGauge]

// TODO: Map name (Seq[String]) to a meaningful help string
private val helpMessage = "Refer to https://twitter.github.io/finagle/guide/Metrics.html"
private val helpMessage =
"Refer to https://twitter.github.io/finagle/guide/Metrics.html"

override def repr: AnyRef = this

override def counter(verbosity: Verbosity, name: String*): Counter = {
val (metricName, labels) = extractLabels(name)
new Counter {
override def incr(delta: Long): Unit = {
counters.getOrElseUpdate(metricName, newCounter(metricName, labels.keys.toSeq)).labels(labels.values.toSeq: _*).inc(delta)
counters
.getOrElseUpdate(metricName,
newCounter(metricName, labels.keys.toSeq))
.labels(labels.values.toSeq: _*)
.inc(delta)
}
}
}
Expand All @@ -29,30 +35,42 @@ class PrometheusStatsReceiver(registry: CollectorRegistry,
val (metricName, labels) = extractLabels(name)
new Stat {
override def add(value: Float): Unit = {
summaries.getOrElseUpdate(metricName, newSummary(metricName, labels.keys.toSeq)).labels(labels.values.toSeq: _*).observe(value)
summaries
.getOrElseUpdate(metricName,
newSummary(metricName, labels.keys.toSeq))
.labels(labels.values.toSeq: _*)
.observe(value)
}
}
}

override def addGauge(verbosity: Verbosity, name: String*)(f: => Float): Gauge = {
override def addGauge(verbosity: Verbosity, name: String*)(
f: => Float): Gauge = {
val (metricName, labels) = extractLabels(name)
gauges.getOrElseUpdate(metricName, newGauge(metricName, labels.keys.toSeq)).labels(labels.values.toSeq: _*).set(f)
gauges
.getOrElseUpdate(metricName, newGauge(metricName, labels.keys.toSeq))
.labels(labels.values.toSeq: _*)
.set(f)
new Gauge {
override def remove(): Unit = gauges.remove(metricName)
}
}

private def newCounter(metricName: String, labelNames: Seq[String]): PCounter = {
PCounter.build()
private def newCounter(metricName: String,
labelNames: Seq[String]): PCounter = {
PCounter
.build()
.namespace(namespace)
.name(metricName)
.labelNames(labelNames: _*)
.help(helpMessage)
.register(registry)
}

private def newSummary(metricName: String, labelNames: Seq[String]): Summary = {
Summary.build()
private def newSummary(metricName: String,
labelNames: Seq[String]): Summary = {
Summary
.build()
.namespace(namespace)
.name(metricName)
.labelNames(labelNames: _*)
Expand All @@ -68,7 +86,8 @@ class PrometheusStatsReceiver(registry: CollectorRegistry,
}

private def newGauge(metricName: String, labelNames: Seq[String]): PGauge = {
PGauge.build()
PGauge
.build()
.namespace(namespace)
.name(metricName)
.labelNames(labelNames: _*)
Expand All @@ -78,8 +97,11 @@ class PrometheusStatsReceiver(registry: CollectorRegistry,

def metricPattern: DefaultMetricPatterns.Pattern = DefaultMetricPatterns.All

protected def extractLabels(name: Seq[String]): (String, Map[String, String]) = {
metricPattern.applyOrElse(name, (x: Seq[String]) => DefaultMetricPatterns.sanitizeName(x) -> Map.empty)
protected def extractLabels(
name: Seq[String]): (String, Map[String, String]) = {
metricPattern.applyOrElse(
name,
(x: Seq[String]) => DefaultMetricPatterns.sanitizeName(x) -> Map.empty)
}
}

Expand All @@ -99,32 +121,42 @@ object DefaultMetricPatterns {

val PerHost: Pattern = {
case Seq("host", label, host, "failures", failure) =>
(s"perHost_failures", Map(prometheusLabelForLabel -> label, "host" -> host, "class" -> failure))
(s"perHost_failures",
Map(prometheusLabelForLabel -> label,
"host" -> host,
"class" -> failure))
case "host" +: label +: host +: metrics =>
(s"perHost_${sanitizeName(metrics)}", Map(prometheusLabelForLabel -> label, "host" -> host))
(s"perHost_${sanitizeName(metrics)}",
Map(prometheusLabelForLabel -> label, "host" -> host))
}

val Core: Pattern = {
case Seq(label, "failures", exceptionName) =>
("failures_perException", Map(prometheusLabelForLabel -> label, "class" -> exceptionName))
("failures_perException",
Map(prometheusLabelForLabel -> label, "class" -> exceptionName))
case Seq(label, "sourcedfailures", sourceService, exceptionName) =>
("failures_perException", Map(prometheusLabelForLabel -> label, "sourceService" -> sourceService, "class" -> exceptionName))
("failures_perException",
Map(prometheusLabelForLabel -> label,
"sourceService" -> sourceService,
"class" -> exceptionName))
case Seq(label, metric) =>
(metric, Map(prometheusLabelForLabel -> label))
}

val LoadBalancer: Pattern = {
case Seq(label, "loadbalancer", "algorithm", algorithm) =>
(s"loadbalancer_algorithm", Map(prometheusLabelForLabel -> label, "algorithm" -> algorithm))
(s"loadbalancer_algorithm",
Map(prometheusLabelForLabel -> label, "algorithm" -> algorithm))
}


val Http: Pattern = {
case Seq(label, "http", "time", resultCode) =>
("http_request_duration", Map(prometheusLabelForLabel -> label, "resultCode" -> resultCode))
case Seq(label, "http", "time", resultCode) =>
("http_request_duration",
Map(prometheusLabelForLabel -> label, "resultCode" -> resultCode))
case Seq(label, "http", "status", resultCode) =>
("http_request_classification", Map(prometheusLabelForLabel -> label, "resultCode" -> resultCode))
case Seq(label, "http", "response_size") =>
("http_request_classification",
Map(prometheusLabelForLabel -> label, "resultCode" -> resultCode))
case Seq(label, "http", "response_size") =>
("http_response_size", Map(prometheusLabelForLabel -> label))
}

Expand Down
Expand Up @@ -7,8 +7,9 @@ import com.twitter.util.{Future, Stopwatch}

class HttpLatencyMonitoringFilter(telemetry: Telemetry,
buckets: Seq[Double],
labeller: HttpServiceLabeller = new HttpServiceLabeller)
extends SimpleFilter[Request, Response] {
labeller: HttpServiceLabeller =
new HttpServiceLabeller)
extends SimpleFilter[Request, Response] {

private val histogram = telemetry.histogram(
name = "incoming_http_request_latency_seconds",
Expand All @@ -17,10 +18,12 @@ class HttpLatencyMonitoringFilter(telemetry: Telemetry,
buckets = buckets
)

override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
override def apply(request: Request,
service: Service[Request, Response]): Future[Response] = {
val stopwatch = Stopwatch.start()
service(request) onSuccess { response =>
histogram.labels(labeller.labelsFor(request, response): _*)
histogram
.labels(labeller.labelsFor(request, response): _*)
.observe(stopwatch().inMilliseconds / 1000.0)
}
}
Expand Down
Expand Up @@ -6,16 +6,18 @@ import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.util.Future

class HttpMonitoringFilter(telemetry: Telemetry,
labeller: HttpServiceLabeller = new HttpServiceLabeller)
extends SimpleFilter[Request, Response] {
labeller: HttpServiceLabeller =
new HttpServiceLabeller)
extends SimpleFilter[Request, Response] {

private val counter = telemetry.counter(
name = "incoming_http_requests_total",
help = "The number of incoming HTTP requests",
labelNames = labeller.keys
)

override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
override def apply(request: Request,
service: Service[Request, Response]): Future[Response] = {
service(request) onSuccess { response =>
counter.labels(labeller.labelsFor(request, response): _*).inc()
}
Expand Down
Expand Up @@ -8,7 +8,8 @@ import com.twitter.util.Future
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.common.TextFormat

class MetricsService(registry: CollectorRegistry) extends Service[Request, Response] {
class MetricsService(registry: CollectorRegistry)
extends Service[Request, Response] {

override def apply(request: Request): Future[Response] = {
val writer = new StringWriter
Expand Down
Expand Up @@ -18,10 +18,19 @@ class DefaultMetricPatternsTest extends UnitTest {
Seq("api.github.com:/emojis:GET", "loadbalancer", "removes"),
Seq("api.github.com:/emojis:GET", "loadbalancer", "rebuilds"),
Seq("api.github.com:/emojis:GET", "loadbalancer", "updates"),
Seq("api.github.com:/emojis:GET", "loadbalancer", "max_effort_exhausted"),
Seq("api.github.com:/emojis:GET", "loadbalancer", "algorithm", "p2c_least_loaded"),
Seq("api.github.com:/emojis:GET", "service_creation", "service_acquisition_latency_ms"),
Seq("api.github.com:/emojis:GET", "nack_admission_control", "dropped_requests"),
Seq("api.github.com:/emojis:GET",
"loadbalancer",
"max_effort_exhausted"),
Seq("api.github.com:/emojis:GET",
"loadbalancer",
"algorithm",
"p2c_least_loaded"),
Seq("api.github.com:/emojis:GET",
"service_creation",
"service_acquisition_latency_ms"),
Seq("api.github.com:/emojis:GET",
"nack_admission_control",
"dropped_requests"),
Seq("api.github.com:/emojis:GET", "retries", "requeues"),
Seq("api.github.com:/emojis:GET", "retries", "requeues"),
Seq("api.github.com:/emojis:GET", "retries", "budget_exhausted"),
Expand Down Expand Up @@ -72,9 +81,12 @@ class DefaultMetricPatternsTest extends UnitTest {
Seq("api.github.com:/emojis:GET", "http", "time", "2XX")
)

(DefaultMetricPatterns.All.isDefinedAt(_:Seq[String]) must beTrue)
(DefaultMetricPatterns.All.isDefinedAt(_: Seq[String]) must beTrue)
.foreach(finagleMetrics)
.setMessage(finagleMetrics.filterNot(DefaultMetricPatterns.All.isDefinedAt).mkString)
.setMessage(
finagleMetrics
.filterNot(DefaultMetricPatterns.All.isDefinedAt)
.mkString)
}

}
Expand Down
Expand Up @@ -9,6 +9,7 @@ trait UnitTest extends Specification with Mockito {

class TestLabeller extends HttpServiceLabeller {
override val keys: List[String] = List("foo")
override def labelsFor(request: Request, response: Response): List[String] = List("bar")
override def labelsFor(request: Request, response: Response): List[String] =
List("bar")
}
}
@@ -1,14 +1,13 @@
package com.samstarling.prometheusfinagle.helper

import io.prometheus.client.Collector.MetricFamilySamples.{
Sample => PrometheusSample
}
import io.prometheus.client.Collector.MetricFamilySamples.{Sample => PrometheusSample}
import io.prometheus.client.{Collector, Counter, Gauge, Histogram}

import scala.collection.JavaConverters._

object CollectorHelper {
def firstMetricFor(counter: Counter): Option[Collector.MetricFamilySamples] = {
def firstMetricFor(
counter: Counter): Option[Collector.MetricFamilySamples] = {
counter.collect().asScala.toList.headOption
}

Expand Down
@@ -1,8 +1,6 @@
package com.samstarling.prometheusfinagle.helper

import io.prometheus.client.Collector.MetricFamilySamples.{
Sample => PrometheusSample
}
import io.prometheus.client.Collector.MetricFamilySamples.{Sample => PrometheusSample}
import io.prometheus.client.{Collector, CollectorRegistry}

import scala.collection.JavaConverters._
Expand Down

0 comments on commit ea70f34

Please sign in to comment.