From 41ea5885ccf75c20f8729f714f920fd6c82ff145 Mon Sep 17 00:00:00 2001 From: Joshua Garnett Date: Thu, 14 Mar 2013 10:51:36 -0400 Subject: [PATCH] Adding Scala Example --- examples/README.md | 1 + examples/StatsD.scala | 232 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 examples/StatsD.scala diff --git a/examples/README.md b/examples/README.md index 582a561c..3b7547c4 100644 --- a/examples/README.md +++ b/examples/README.md @@ -12,6 +12,7 @@ Here's a bunch of example code contributed by the community for interfacing with ruby_example.rb - Ruby statsd.erl - Erlang statsd-client.sh - Bash + StatsD.scala - Scala Third Party StatsD Libraries ============================ diff --git a/examples/StatsD.scala b/examples/StatsD.scala new file mode 100644 index 00000000..7830b92d --- /dev/null +++ b/examples/StatsD.scala @@ -0,0 +1,232 @@ +/* + +Scala implementation of Andrew Gwozdziewycz's StatsdClient.java + +Copyright (c) 2013 Joshua Garnett + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +package com.statsd + +import java.io.IOException +import java.net._ +import java.nio.ByteBuffer +import java.nio.channels.DatagramChannel +import java.util.Random +import org.slf4j.LoggerFactory +import akka.actor._ + +/** + * Client for sending stats to StatsD uses Akka to manage concurrency + * + * @param context The Akka ActorContext + * @param host The statsd host + * @param port The statsd port + * @param multiMetrics If true, multiple stats will be sent in a single UDP packet + * @param packetBufferSize If multiMetrics is true, this is the max buffer size before sending the UDP packet + */ +class StatsD(context: ActorContext, + host: String, + port: Int, + multiMetrics: Boolean = true, + packetBufferSize: Int = 1024) { + + private val rand = new Random() + + private val actorRef = context.actorOf(Props(new StatsDActor(host, port, multiMetrics, packetBufferSize))) + + /** + * Sends timing stats in milliseconds to StatsD + * + * @param key name of the stat + * @param value time in milliseconds + */ + def timing(key: String, value: Int, sampleRate: Double = 1.0) = { + send(key, value.toString, StatsDProtocol.TIMING_METRIC, sampleRate) + } + + /** + * Decrement StatsD counter + * + * @param key name of the stat + * @param magnitude how much to decrement + */ + def decrement(key: String, magnitude: Int = -1, sampleRate: Double = 1.0) = { + increment(key, magnitude, sampleRate) + } + + /** + * Increment StatsD counter + * + * @param key name of the stat + * @param magnitude how much to increment + */ + def increment(key: String, magnitude: Int = 1, sampleRate: Double = 1.0) = { + send(key, magnitude.toString, StatsDProtocol.COUNTER_METRIC, sampleRate) + } + + /** + * StatsD now also supports gauges, arbitrary values, which can be recorded. + * + * @param key name of the stat + * @param value Can be a fixed value or increase or decrease (Ex: "10" "-1" "+5") + */ + def gauge(key: String, value: String = "1", sampleRate: Double = 1.0) = { + send(key, value, StatsDProtocol.GAUGE_METRIC, sampleRate) + } + + /** + * StatsD supports counting unique occurrences of events between flushes, using a Set to store all occurring events. + * + * @param key name of the stat + * @param value value of the set + */ + def set(key: String, value: Int, sampleRate: Double = 1.0) = { + send(key, value.toString, StatsDProtocol.SET_METRIC, sampleRate) + } + + /** + * Checks the sample rate and sends the stat to the actor if it passes + */ + private def send(key: String, value: String, metric: String, sampleRate: Double): Boolean = { + if (sampleRate >= 1 || rand.nextDouble <= sampleRate) { + actorRef ! SendStat(StatsDProtocol.stat(key, value, metric, sampleRate)) + true + } + else { + false + } + } +} + +object StatsDProtocol { + val TIMING_METRIC = "ms" + val COUNTER_METRIC = "c" + val GAUGE_METRIC = "g" + val SET_METRIC = "s" + + /** + * @return Returns a string that conforms to the StatsD protocol: + * KEY:VALUE|METRIC or KEY:VALUE|METRIC|@SAMPLE_RATE + */ + def stat(key: String, value: String, metric: String, sampleRate: Double) = { + val sampleRateString = if (sampleRate < 1) "|@" + sampleRate else "" + key + ":" + value + "|" + metric + sampleRateString + } +} + +/** + * Message for the StatsDActor + */ +private case class SendStat(stat: String) + +/** + * @param host The statsd host + * @param port The statsd port + * @param multiMetrics If true, multiple stats will be sent in a single UDP packet + * @param packetBufferSize If multiMetrics is true, this is the max buffer size before sending the UDP packet + */ +private class StatsDActor(host: String, + port: Int, + multiMetrics: Boolean, + packetBufferSize: Int) extends Actor { + + private val log = LoggerFactory.getLogger(getClass()) + + private val sendBuffer = ByteBuffer.allocate(packetBufferSize) + + private val address = new InetSocketAddress(InetAddress.getByName(host), port) + private val channel = DatagramChannel.open() + + def receive = { + case msg: SendStat => doSend(msg.stat) + case _ => log.error("Unknown message") + } + + override def postStop() = { + //save any remaining data to StatsD + flush + + //Close the channel + if (channel.isOpen()) { + channel.close() + } + + sendBuffer.clear() + } + + private def doSend(stat: String) = { + try { + val data = stat.getBytes("utf-8") + + // If we're going to go past the threshold of the buffer then flush. + // the +1 is for the potential '\n' in multi_metrics below + if (sendBuffer.remaining() < (data.length + 1)) { + flush + } + + // multiple metrics are separated by '\n' + if (sendBuffer.position() > 0) { + sendBuffer.put('\n'.asInstanceOf[Byte]) + } + + // append the data + sendBuffer.put(data) + + if (!multiMetrics) { + flush + } + + } + catch { + case e: IOException => { + log.error("Could not send stat {} to host {}:{}", sendBuffer.toString, address.getHostName(), address.getPort().toString, e) + } + } + } + + private def flush(): Unit = { + try { + val sizeOfBuffer = sendBuffer.position() + + if (sizeOfBuffer <= 0) { + // empty buffer + return + } + + // send and reset the buffer + sendBuffer.flip() + val nbSentBytes = channel.send(sendBuffer, address) + sendBuffer.limit(sendBuffer.capacity()) + sendBuffer.rewind() + + if (sizeOfBuffer != nbSentBytes) { + log.error("Could not send entirely stat {} to host {}:{}. Only sent {} bytes out of {} bytes", sendBuffer.toString(), + address.getHostName(), address.getPort().toString, nbSentBytes.toString, sizeOfBuffer.toString) + } + + } + catch { + case e: IOException => { + log.error("Could not send stat {} to host {}:{}", sendBuffer.toString, address.getHostName(), address.getPort().toString, e) + } + } + } +} \ No newline at end of file