Added capability to send stats.time data to remote server #33

Closed
wants to merge 9 commits into from

2 participants

@pcetsogtoo

As described in issue #31, I have a couple of machines running ostrich behind a load balancer. I needed a central ostrich server that collects timing data from the other servers: each client simply sends its data over a socket, and a central server aggregates all of it, so I can see stats for multiple machines in one place.

To achieve this I implemented a generic hook. Out of the box, ostrich does nothing with it; things start working once the user implements how data is sent to the remote server and hooks that implementation into the ostrich service. Sending and receiving data are independent of ostrich and are implemented by the user. It's a small modification, but it's really helpful for anyone with the same problem I had. I have a working example that uses Salvero (https://github.com/zcox/salvero) to send timing data to a remote server.
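The hook itself is just a one-method trait with a no-op default, so existing ostrich users are unaffected. A minimal sketch of the shape (the trait name `Remote` follows the client example below; the no-op class name is illustrative, it was later renamed `TimeListenerNoop` in the diff at the bottom):

```scala
// The hook contract: one callback per recorded timing.
trait Remote {
  def time(name: String, value: Int): Unit
}

// Default implementation wired into ostrich; does nothing,
// so behavior is unchanged until a user installs a real hook.
class RemoteNoop extends Remote {
  override def time(name: String, value: Int): Unit = ()
}
```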

Here is how data is sent to the remote server on the client side:

import com.twitter.ostrich.admin._
import com.twitter.ostrich.admin.config._
import org.salvero.core.{Push, Connect}

case class SalveroRemote(push: Option[Push]) extends Remote {
  override def time(name: String, value: Int) = push map (_ ! AddMetric(name, value)) // sends the data to remote server
}

val push = Some(new Push("tcp://localhost:5555") with Connect)
val salveroRemote = new SalveroRemote(push)

// Ostrich setup
val adminConfig = new AdminServiceConfig {
  httpPort = 9991
  statsNodes = new StatsConfig { 
    reporters = new TimeSeriesCollectorConfig 
  }
}
val runtime = RuntimeEnvironment(this, Array[String]())
val admin = adminConfig()(runtime)

// Hook our remote into ostrich http admin service
admin map (_.remote = salveroRemote)

Here is an example of how data would be received on the server side:

import com.twitter.ostrich.admin._
import com.twitter.ostrich.admin.config._
import com.twitter.ostrich.stats._
import org.salvero.core.{Send, CaseClass, Pull, Bind}

val handler = new Send {
  def ![A <: CaseClass: Manifest](msg: A) = msg match {
    case m: AddMetric => Stats.addMetric(m.name, m.value)
  }
}

val pull = new Pull("tcp://localhost:5555", handler) with Bind
new Thread(pull).start()

// Ostrich setup
val adminConfig = new AdminServiceConfig {
  httpPort = 9990
  statsNodes = new StatsConfig { 
    reporters = new TimeSeriesCollectorConfig 
  }
}
val runtime = RuntimeEnvironment(this, Array[String]())
val admin = adminConfig()(runtime)
@robey

it looks like this will send each new timing/metric to the remote server as it's collected. i'm worried that won't scale well with high-traffic servers (or having, say, 500 servers). what if the stats were forwarded upstream minutely as a collection?

@pcetsogtoo

You're right. Including the AddMetric case class in ostrich wasn't a good idea; the user should define it. By letting users define the transfer message, they can forward case classes that contain collections, or anything else. On the client side a thread runs and periodically sends the collection to the remote server. Scaling is up to the users.

For example, if we wanted to transfer a collection to the remote server, the client-side code would look like this:

case class AddMetric(name: String, value: Int)
case class MetricMessages(messages: List[AddMetric])

case class SalveroRemote(push: Option[Push]) extends Remote {
  var timingMessages = List[AddMetric]()
  override def time(name: String, value: Int) = {
    // just appends the timing to the timingMessages list; no I/O here
    timingMessages = timingMessages :+ AddMetric(name, value)
  }
}

...

val push = Some(new Push("tcp://127.0.0.1:5555") with Connect)
val salveroRemote = new SalveroRemote(push)

// A separate thread periodically sends the buffered messages to the remote server and clears the list.
// Example: salveroRemote.push map (_ ! MetricMessages(salveroRemote.timingMessages))

...

// hook into ostrich
admin map (_.remote = salveroRemote)
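The flushing thread mentioned in the comments above could look like the sketch below. This is self-contained for illustration: the `send` function stands in for the Salvero `push`, and the `flush` method and buffering class are illustrative names, not part of the PR. Buffering under `synchronized` keeps the per-call hook cheap; the only I/O happens at flush time.

```scala
case class AddMetric(name: String, value: Int)
case class MetricMessages(messages: List[AddMetric])

// Illustrative buffering hook: records each timing locally,
// then ships the whole batch when flush() is called.
class BufferingRemote(send: MetricMessages => Unit) {
  private var buffer = List[AddMetric]()

  def time(name: String, value: Int): Unit = synchronized {
    buffer = AddMetric(name, value) :: buffer // cheap: just buffer, no I/O
  }

  // Called periodically (e.g. once a minute by a daemon thread):
  // swap the buffer out under the lock, then send outside critical work.
  def flush(): Unit = {
    val batch = synchronized {
      val b = buffer
      buffer = List.empty
      b
    }
    if (batch.nonEmpty) send(MetricMessages(batch.reverse))
  }
}
```

A daemon thread driving `flush()` on a fixed period (e.g. `Thread.sleep(60 * 1000)` in a loop) completes the picture; the period is a tuning knob, not anything ostrich dictates.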

What do you think? I believe it will work. I can upload sample code to GitHub in a day or two.

@pcetsogtoo

I've just removed the AddMetric case class from ostrich. The implementation of the hook is now totally separate from ostrich; it has nothing to do with performance optimization, which is an implementation concern, not an ostrich issue. All I want to provide is one implementation of this hook using Salvero. The user can provide any implementation for this hook: sending right away, aggregating and sending later, etc. Please accept this hook into ostrich if you think it's OK, or let me know if there is anything you would like to ask.

@pcetsogtoo

Please respond

@pcetsogtoo pcetsogtoo closed this
@pcetsogtoo pcetsogtoo reopened this
@robey

sorry, long week.

the remote hook is still called for every metric, though. i think it would be better to have the hook called once a minute (or on some arbitrary period) with the collected stats, to minimize traffic... something like JsonStatsLogger, for example.

@pcetsogtoo

Actually, this hook doesn't have to send each metric's data to remote servers. It just lets us do something with each metric, and what exactly happens is entirely an implementation concern; in our case we're sending it to a remote server. So I've renamed the Remote trait to TimeListener, which I believe makes more sense. Please look at the diff.
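After the rename, the trait is still a single method (see the diff below), so any listener is a few lines. As a sketch of a transport-free implementation, here is a listener that only tallies call counts per metric name; the `CountingListener` name is illustrative:

```scala
import scala.collection.mutable

trait TimeListener {
  def time(name: String, value: Int): Unit
}

// Illustrative listener: no network at all, just tallies how many
// times each metric was recorded, showing the hook carries no
// transport policy of its own.
class CountingListener extends TimeListener {
  val counts = mutable.Map[String, Int]().withDefaultValue(0)
  override def time(name: String, value: Int): Unit =
    counts(name) += 1
}
```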

@pcetsogtoo

Hey Robey, it's been a while. Are you still not convinced?

@pcetsogtoo pcetsogtoo closed this
@pcetsogtoo pcetsogtoo reopened this
@robey

it still looks like this would get called anytime any timing is collected. that could be thousands of times a second, so it's important not to do anything intensive in there.

if you grabbed the histogram during a stats listener, you could still have the distribution of timings within 5%.

it'll be hard to convince me that any hook should be taking place during addMetric, but i'm curious if there's some simpler way to accomplish what you want.
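One way to reconcile the two positions would be to keep the per-call work down to a few arithmetic operations and forward only a periodic summary upstream, along the lines robey describes. A hedged sketch: the `Summary` type and `AggregatingListener` name are hypothetical, and real ostrich code would presumably snapshot its existing histograms instead of keeping this side state.

```scala
// Hypothetical summary record forwarded once per period.
case class Summary(name: String, count: Int, sum: Long, max: Int)

// Per-call work is constant-time arithmetic under a lock;
// network traffic happens only when summarize() is invoked
// (e.g. once a minute by a timer thread).
class AggregatingListener {
  private val state = scala.collection.mutable.Map[String, (Int, Long, Int)]()

  def time(name: String, value: Int): Unit = synchronized {
    val (c, s, m) = state.getOrElse(name, (0, 0L, Int.MinValue))
    state(name) = (c + 1, s + value, math.max(m, value))
  }

  // Drains the accumulated state into summaries and resets it.
  def summarize(): List[Summary] = synchronized {
    val out = state.map { case (n, (c, s, m)) => Summary(n, c, s, m) }.toList
    state.clear()
    out
  }
}
```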

@pcetsogtoo pcetsogtoo closed this
Commits on Jul 1, 2011
    Fixed import bug (pcetsogtoo)
Commits on Jul 2, 2011
Commits on Jul 12, 2011
    Merge fix on StatsProvider (pcetsogtoo)
Commits on Sep 5, 2013
    merge from upstream (pcetsogtoo)
    remove unused import (pcetsogtoo)
5 src/main/scala/com/twitter/ostrich/admin/Service.scala
@@ -46,4 +46,9 @@ trait Service {
   def reload() {
     // default is to do nothing.
   }
+
+  var timeListener: TimeListener = new TimeListenerNoop
+
+  def hookTime(name: String, value: Int) = timeListener.time(name, value)
+
 }
4 src/main/scala/com/twitter/ostrich/admin/ServiceTracker.scala
@@ -53,4 +53,8 @@ object ServiceTracker {
   def reload() {
     synchronized { services.toList }.foreach { _.reload() }
   }
+
+  def hookTime(name: String, value: Int) = {
+    synchronized { services.toList }.foreach { _.hookTime(name, value) }
+  }
 }
15 src/main/scala/com/twitter/ostrich/admin/TimeListener.scala
@@ -0,0 +1,15 @@
+package com.twitter.ostrich.admin
+
+trait TimeListener {
+
+  def time(name: String, value: Int)
+
+}
+
+class TimeListenerNoop extends TimeListener {
+
+  override def time(name: String, value: Int) = {
+    // does nothing
+  }
+
+}
7 src/main/scala/com/twitter/ostrich/stats/StatsProvider.scala
@@ -20,6 +20,7 @@ import scala.util.matching.Regex
 import scala.collection.{Map, mutable, immutable}
 import com.twitter.json.Json
 import com.twitter.util.{Duration, Future, Stopwatch}
+import com.twitter.ostrich.admin.ServiceTracker
 import com.twitter.logging.Logger

 /**
@@ -195,6 +196,7 @@ trait StatsProvider {
   def time[T](name: String)(f: => T): T = {
     val (rv, duration) = Duration.inMilliseconds(f)
     addMetric(name + "_msec", duration.inMilliseconds.toInt)
+    ServiceTracker.hookTime(name + "_msec", duration.inMilliseconds.toInt)
     rv
   }

@@ -206,6 +208,7 @@ trait StatsProvider {
     val elapsed = Stopwatch.start()
     f.respond { _ =>
       addMetric(name + "_usec", elapsed().inMicroseconds.toInt)
+      ServiceTracker.hookTime(name + "_usec", elapsed().inMicroseconds.toInt)
     }
     f
   }

@@ -227,6 +230,7 @@ trait StatsProvider {
     val elapsed = Stopwatch.start()
     f.ensure {
       addMetric(name + "_msec", elapsed().inMilliseconds.toInt)
+      ServiceTracker.hookTime(name + "_msec", elapsed().inMilliseconds.toInt)
     }
   }

@@ -238,6 +242,7 @@ trait StatsProvider {
     val elapsed = Stopwatch.start()
     f.respond { _ =>
       addMetric(name + "_nsec", elapsed().inNanoseconds.toInt)
+      ServiceTracker.hookTime(name + "_nsec", elapsed().inNanoseconds.toInt)
     }
     f
   }

@@ -248,6 +253,7 @@ trait StatsProvider {
   def timeMicros[T](name: String)(f: => T): T = {
     val (rv, duration) = Duration.inNanoseconds(f)
     addMetric(name + "_usec", duration.inMicroseconds.toInt)
+    ServiceTracker.hookTime(name + "_usec", duration.inMicroseconds.toInt)
     rv
   }

@@ -257,6 +263,7 @@ trait StatsProvider {
   def timeNanos[T](name: String)(f: => T): T = {
     val (rv, duration) = Duration.inNanoseconds(f)
     addMetric(name + "_nsec", duration.inNanoseconds.toInt)
+    ServiceTracker.hookTime(name + "_nsec", duration.inNanoseconds.toInt)
     rv
   }
 }