Skip to content
Browse files

Merge branch 'enterprise' of falbala.23loc.com:fnordmetric

  • Loading branch information...
2 parents 96c7ec2 + 429b945 commit 58dab03efab3820073414b0a4ca414f5458082da @paulasmuth committed Feb 26, 2013
Showing with 1,632 additions and 96 deletions.
  1. +27 −69 README.md
  2. +23 −8 fnordmetric-doc/config.yml
  3. +0 −1 fnordmetric-doc/src/classic_event_handlers.md
  4. +11 −0 fnordmetric-doc/src/enterprise_deployment.md
  5. +4 −0 fnordmetric-doc/src/enterprise_examples.md
  6. +85 −0 fnordmetric-doc/src/enterprise_http_api.md
  7. +51 −0 fnordmetric-doc/src/enterprise_index.md
  8. +92 −0 fnordmetric-doc/src/enterprise_tcp_udp_api.md
  9. +4 −0 fnordmetric-doc/src/enterprise_usage.md
  10. +21 −16 fnordmetric-doc/src/index.md
  11. +4 −0 fnordmetric-doc/web/assets/documentation.css
  12. +3 −2 fnordmetric-doc/web/documentation.erb
  13. +1 −0 fnordmetric-enterprise/.gitignore
  14. +4 −0 fnordmetric-enterprise/TODO.md
  15. +34 −0 fnordmetric-enterprise/build.sbt
  16. +3 −0 fnordmetric-enterprise/project/plugins.sbt
  17. +36 −0 fnordmetric-enterprise/src/AbstractBucket.scala
  18. +14 −0 fnordmetric-enterprise/src/AbstractInstruction.scala
  19. +137 −0 fnordmetric-enterprise/src/Benchmark.scala
  20. +23 −0 fnordmetric-enterprise/src/BucketFactory.scala
  21. +14 −0 fnordmetric-enterprise/src/ErrorInstruction.scala
  22. +188 −0 fnordmetric-enterprise/src/FnordMetric.scala
  23. +39 −0 fnordmetric-enterprise/src/HTTPServer.scala
  24. +39 −0 fnordmetric-enterprise/src/InstructionFactory.scala
  25. +31 −0 fnordmetric-enterprise/src/MeanBucket.scala
  26. +211 −0 fnordmetric-enterprise/src/Metric.scala
  27. +33 −0 fnordmetric-enterprise/src/MetricFactory.scala
  28. +75 −0 fnordmetric-enterprise/src/RingBuffer.scala
  29. +17 −0 fnordmetric-enterprise/src/SampleInstruction.scala
  30. +36 −0 fnordmetric-enterprise/src/Scheduler.scala
  31. +15 −0 fnordmetric-enterprise/src/StorageAdapter.scala
  32. +22 −0 fnordmetric-enterprise/src/SumBucket.scala
  33. +106 −0 fnordmetric-enterprise/src/SwapFile.scala
  34. +27 −0 fnordmetric-enterprise/src/TCPHandler.scala
  35. +51 −0 fnordmetric-enterprise/src/TCPServer.scala
  36. +49 −0 fnordmetric-enterprise/src/UDPServer.scala
  37. +22 −0 fnordmetric-enterprise/src/ValueAtInstruction.scala
  38. +31 −0 fnordmetric-enterprise/src/ValuesInInstruction.scala
  39. +31 −0 fnordmetric-enterprise/src/WebSocket.scala
  40. +18 −0 fnordmetric-enterprise/src/WebSocketHandler.scala
View
96 README.md
@@ -1,9 +1,8 @@
FnordMetric
===========
-FnordMetric is a highly configurable (and pretty fast) realtime app/event
-tracking thing based on [ruby eventmachine](http://rubyeventmachine.com/) and [redis](http://redis.io/). You define your own
-plotting and counting functions as ruby blocks! [See it in action! (RailsCasts)](http://railscasts.com/episodes/378-fnordmetric)
+FnordMetric is a framework for collecting and visualizing timeseries data. It enables
+you to build beautiful real-time analytics dashboards within minutes.
Documentation: [fnordmetric.io](http://fnordmetric.io/)
@@ -12,85 +11,43 @@ Documentation: [fnordmetric.io](http://fnordmetric.io/)
[ ![Screenshot](https://github.com/paulasmuth/fnordmetric/raw/v1.0-alpha/doc/preview3.png) ](http://github.com/paulasmuth/fnordmetric)
+### FnordMetric Enterprise
-Getting Started
----------------
+FnordMetric Enterprise is a JVM-based timeseries database. It's a key-value store
+(much like redis or memcached) where each key holds a "measurement". There are
+different measurement methods like sum, mean, min/max, 90th percentile, etc. You
+continuously add data to these keys/measurements which is aggregated and periodically
+persisted.
-FnordMetric is based on ruby eventmachine and needs to run in a seperate ruby process.
-The preferred way to start it is to create a ruby file (where you put your DSL statements)
-and execute it (more about that in [the documentation](http://fnordmetric.io/documentation))
+FnordMetric Enterprise features disk persistence, a HTTP, TCP and UDP API, native
+WebSockets support, CSV/JSON Export and a turnkey-ready HTML5 visualization solution
+(FnordMetric UI). FnordMetric Enterprise can be used as a drop-in replacement for
+StatsD+Graphite (it is a lot faster, see Benchmarks).
-Save this to `my_fnordmetric.rb`
+[Getting Started with FnordMetric Enterprise](http://fnordmetric.io/documentation/enterprise_index)
-```ruby
-require "fnordmetric"
-FnordMetric.namespace :myapp do
+### FnordMetric UI
- # render a timeseries graph
- widget 'Sales',
- :title => "Sales per Minute",
- :gauges => [:sales_per_minute],
- :type => :timeline,
- :width => 100,
- :autoupdate => 1
+FnordMetric UI is a HTML5 API that lets you plug realtime data and charts into any webpage
+without writing a single line of code. It gives you maximum flexiblitiy as you have full
+control over layout and style with HTML and CSS.
-end
+FnordMetric UI uses WebSockets to communicate with a backend server. There are two backend
+implementations: FnordMetric Classic (ruby + redis) and FnordMetric Enterprise (JVM). You can use
+FnordMetric UI as a white label solution to power your custom realtime analytics apps.
-FnordMetric.standalone
-```
+[Getting Started with FnordMetric UI](http://fnordmetric.io/documentation/ui_index)
-In this case we created one timeseries chart on the dashboard "Sales" that will display
-the number of sales_per_minute and auto-refresh every second.
-You should now be able to start the dashboard on http://localhost:4242/ (default) by running:
+### FnordMetric Classic
- $ ruby my_fnordmetric.rb
+FnordMetric Classic is powered by ruby and redis. It offers a ruby DSL for processing data
+streams and building beautiful web dashboards from a collection of turnkey-ready UI widgets.
+You define your own plotting and aggregation methods as ruby blocks. [See it in action! (RailsCasts)](http://railscasts.com/episodes/378-fnordmetric)
+[Getting Started with FnordMetric Classic](http://fnordmetric.io/documentation/classic_index)
-Now we can start sending data to FnordMetric. The canonical way to submit data is the HTTP API.
-This will report a single sale:
-
- curl -X POST -d '{ "_type": "_incr", "value": 1, "gauge": "sales_per_minute" }' http://localhost:4242/events
-
-There are various other ways to submit events to FnordMetric (more information in [the documentation](http://fnordmetric.io/documentation)).
-
-
-### HTML5 API
-
-FnordMetric offers a HTML5 / JavaScript API (called _FnordMetric UI_) that allows
-you to plug real-time data and charts into any website without having to write code.
-This is achieved by including a JavaScript library and using data-* attributes on
-html elements to declare the widgets.
-
-```html
-<link href='http://localhost:4242/fnordmetric-ui.css' type='text/css' rel='stylesheet' />
-<script src='http://localhost:4242/fnordmetric-ui.js' type='text/javascript'></script>
-
-<span
- data-fnordmetric="counter"
- data-gauge="sales_per_minute"
- data-autoupdate="1"
- data-unit=""
- >0</span>
-
-<script>
- FnordMetric.setup({
- "address": "localhost:4242",
- "namespace": "myapp"
- });
-</script>
-```
-
-
-Installation
-------------
-
- gem install fnordmetric
-
-or in your Gemfile:
-
- gem 'fnordmetric', '>= 1.0.0'
Documentation
@@ -99,6 +56,7 @@ Documentation
You can find the full FnordMetric Documentation at http://fnordmetric.io/
+
Contributors
------------
View
31 fnordmetric-doc/config.yml
@@ -5,7 +5,7 @@ sitemap:
fnordmetric_ui:
-
- title: "Introduction"
+ title: "Getting Started"
url: "/ui_index"
-
title: "API Reference"
@@ -41,12 +41,27 @@ sitemap:
url: "/classic_hacking"
- #fnordmetric_enterprise:
- # -
- # title: "Getting Started"
- # url: "/ui_index"
- # -
- # title: "HTTP API"
- # url: "/ui_index"
+ fnordmetric_enterprise:
+ -
+ title: "Introduction"
+ url: "/enterprise_index"
+ -
+ title: "Deployment"
+ url: "/enterprise_deployment"
+ -
+ title: "HTTP API"
+ url: "/enterprise_http_api"
+ -
+ title: "TCP/UDP API"
+ url: "/enterprise_tcp_udp_api"
+ -
+ title: "Configuration"
+ url: "/enterprise_usage"
+ -
+ title: "Clients"
+ url: "/enterprise_usage"
+ -
+ title: "Examples"
+ url: "/enterprise_examples"
View
1 fnordmetric-doc/src/classic_event_handlers.md
@@ -325,7 +325,6 @@ _Example:_
=> 1360623178
-<br />
#### Manipulating gauges
These methods allow manipulation of gauges. The time / bucket to modify
View
11 fnordmetric-doc/src/enterprise_deployment.md
@@ -0,0 +1,11 @@
+Deployment
+----------
+
+### Deploy on Ubuntu
+
+upstart script etc
+
+
+### Deploy on EC2
+
+ready made ami
View
4 fnordmetric-doc/src/enterprise_examples.md
@@ -0,0 +1,4 @@
+Examples
+--------
+
+here be dragons
View
85 fnordmetric-doc/src/enterprise_http_api.md
@@ -0,0 +1,85 @@
+HTTP API
+--------
+
+FIXPAUL: blah blah
+
+### GET /metric/:key
+
+Retrieves a metric's value(s) at a specified point in time or in a specified time interval
+
+Parameters:
+
+<table>
+ <tr>
+ <th>key <i>(mandatory)</i></th>
+ <td>
+ key of this metric (e.g. my-counter-sum-15)
+ </td>
+ </tr>
+ <tr>
+ <th>since / until</th>
+ <td>
+ when the since and until parameters are set, all values in the supplied time
+ interval are returned. values can be a timestamp or a timespec like... since/until
+ are mutually exclusive with at
+ </td>
+ </tr>
+ <tr>
+ <th>at</th>
+ <td>
+ if set, returns a single value at this point in time. content can be a timestamp
+ or a timespec like -3m... mutually exclusive with since/until
+ </td>
+ </tr>
+ <tr>
+ <th>format</th>
+ <td>
+ response format csv, json or xml (also use HTTP-Accept)
+ </td>
+ </tr>
+</table>
+<br />
+
+Examples:
+
+ >> GET /metric/total_sales_in_euro-sum-30?at=-5m&format=json
+ << HTTP/1.1 200 OK
+ << ...
+ << { "value": 23426 }
+
+ >> GET /metric/total_sales_in_euro-sum-30?since=-2min&until=now&format=json
+ << HTTP/1.1 200 OK
+ << ...
+ << { "values": { 1360804571: 4233, 1360804581: 4636, 1360804591: 3621, ... } }
+
+
+### POST /metric/:key
+
+Sample/add a value to a metric.
+
+Parameters:
+
+<table>
+ <tr>
+ <th>key <i>(mandatory)</i></th>
+ <td>
+ key of this metric (e.g. my-counter-sum-15)
+ </td>
+ </tr>
+ <tr>
+ <th>value <i>(mandatory)</i></th>
+ <td>
+ the value to add/sample to this metric
+ </td>
+ </tr>
+</table>
+<br />
+
+Examples:
+
+ >> POST /metric/total_sales_in_euro-sum-30?value=351
+ << HTTP/1.1 201 CREATED
+
+
+
+
View
51 fnordmetric-doc/src/enterprise_index.md
@@ -0,0 +1,51 @@
+FnordMetric Enterprise
+----------------------
+
+FnordMetric Enterprise is a JVM-based timeseries database. It's a key-value store
+(much like redis or memcached) where each key holds a "measurement". There are
+different measurement methods like sum, mean, min/max, 90th percentile, etc. You
+continuously add data to these keys/measurements which is aggregated and periodically
+persisted.
+
+FnordMetric Enterprise features disk persistence, a HTTP, TCP and UDP API, native
+WebSockets support, CSV/JSON Export and a turnkey-ready HTML5 visualization solution
+(FnordMetric UI). FnordMetric Enterprise can be used as a drop-in replacement for
+StatsD+Graphite (it is a lot faster, see Benchmarks).
+
+
+### Semantics
+
+There are three basic operations: `add_sample`, `value_at` and `values_in` that
+add a sample to an ongoing measurement, retrieve the measurement value at a
+specified time, or retrieve all aggregated measurement values in a specified time
+interval.
+
+The measurement method and flush_interval are implicitly specified by the key;
+all keys have to be postfixed with "$method-$flush_timeout". For example if
+you want a key "response_time" to operate in average/mean mode and flush every 60
+seconds, use the key `response_time-mean-60`, for a key "total_clicks" that
+aggregates/sums a value and flushes every hour, you could use "total_clicks.sum-3600"
+
+
+### In-memory vs. disk storage
+
+FnordMetric Enterprise stores the values as 64bit double precision floats.
+
+With an example flush timeout of 10 seconds, one metric uses 0.065 MB of
+memory per day or 0.4 MB per week. The default ring buffer size is x,.
+
+That means with only 4GB of ram, you could access the last month of data of
+2500 counters/measurements with 10 second granularity all from the in-memory
+ringbuffers (without ever hitting a HDD).
+
+Requests that can not be served from memory require one sequential disk read.
+
+
+### FnordMetric Enterprise vs. StatsD
+
++ allows for flush intervals as low as one second
++ rendered in the browser, interactive
++ much much more scalable
++ highly customizable with css
++ requires only a single deployment
++ i18n (proper timezones in graphs due to in browser rendering etc)
View
92 fnordmetric-doc/src/enterprise_tcp_udp_api.md
@@ -0,0 +1,92 @@
+TCP, UDP, WebSockets API
+------------------------
+
+protocol is line based, neither request nor response are allowed to
+contain \n characters, but both request and response have to end with
+a newline (\n) character.
+
+#### TCP / WebSockets:
+
+there is no handshake, you can just open the connection and start sending
+commands. the protocol does not support multiplexing/pipelining: after every
+newline-terminated command you send you have to read one line from the socket.
+
+websocket connections also speak the tcp protocol after the connection upgrade
+
+the generic error response is
+
+ ERROR something went wrong\n
+
+
+#### UDP:
+
+you can put multiple lines in one udp packet.
+example udp packet that increments four counters:
+
+ SAMPLE my_counter1.sum-3600 123\n
+ SAMPLE my_counter2.sum-3600 456\n
+ SAMPLE my_counter3.sum-3600 789\n
+
+
+
+### Command: SAMPLE
+
+samples/adds a value to a metric
+
+*Format:*
+
+ SAMPLE [metric] [value]
+
+*Response:*
+
+ "OK"
+
+*Example:*
+
+ >> SAMPLE my_application.response_time.avg-30 23
+ << OK
+
+
+### Command: VALUE_AT
+
+retrieves the value of a metric at one point in time
+
+*Format:*
+
+ VALUE_AT [metric] [at]
+
+*Response:*
+
+ Float or "null"
+
+*Example:*
+
+ >> VALUE_AT my_application.response_time.avg-30 1382341536
+ << 17.42
+
+ >> VALUE_AT my_application.response_time.avg-30 1382341536
+ << null
+
+
+### Command: VALUES_IN
+
+Retrieves all values of a metric in a time interval
+
+*Format:*
+
+ VALUE_AT [metric] [from] [until]
+
+*Response:*
+
+ white space seperated Timestamp:Float tuples or "null"
+
+*Example:*
+
+ >> VALUE_AT my_application.response_time.avg-30 13823534644 13823414323
+ << 1360804571:4233.52 1360804581:4312.36 1360804591:6323.12
+
+ >> VALUE_AT my_application.response_time.avg-30 13823534644 13823414323
+ << null
+
+
+
View
4 fnordmetric-doc/src/enterprise_usage.md
@@ -0,0 +1,4 @@
+Usage
+-----
+
+here be dragons
View
37 fnordmetric-doc/src/index.md
@@ -1,20 +1,9 @@
FnordMetric v1.1.2 Documentation
--------------------------------
-FnordMetric is a framework for processing and visualizing stream data on
+FnordMetric is a framework for stream data processing and visualization on
real-time analytics dashboards. It is split up into three components:
-### FnordMetric Classic
-
-FnordMetric Classic is powered by ruby and redis. It offers a ruby DSL for processing data
-streams and building beautiful web dashboards from a collection of pre-made UI widgets.
-
-You can also use the FnordMetric UI HTML5 API to display the data collected with FnordMetric
-Classic on another website or build highly customized / white label views.
-
-<a href="/documentation/classic_index">Getting Started with FnordMetric Classic &rarr;</a>
-
-
### FnordMetric UI
FnordMetric UI is a HTML5 / JavaScript API that lets you plug realtime data and charts into any
@@ -27,14 +16,30 @@ implementations: FnordMetric Classic (ruby + redis) and FnordMetric Enterprise (
You can use FnordMetric UI as a white label solution to power your custom realtime analytics apps.
<a href="/documentation/ui_index">Getting Started with FnordMetric UI &rarr;</a>
+<br /><br />
### FnordMetric Enterprise
FnordMetric Enterprise is a JVM based timeseries database which serves as a backend for FnordMetric
-UI. It can handle thousands of gauges and years worth of historical data. It does not depend on a
-backend store like redis; it can be run in standalone mode or persist to an external database.
+UI. It can handle thousands of gauges and years worth of historical data. FnordMetric Enterprise runs
+stand-alone, i.e. it does not depend on a backend store like redis.
+
+FnordMetric Enterprise features disk persistence, a HTTP, TCP and UDP API, native WebSockets support
+and CSV/JSON Export. FnordMetric Enterprise can be used as a drop-in replacement for StatsD+Graphite
+(it is a lot faster, see Benchmarks).
-FnordMetric Enterprise includes some advanced features like Hadoop Integration and CSV Export.
+<a href="/documentation/enterprise_index">Getting Started with FnordMetric Enterprise &rarr;</a>
+<br /><br />
+
+
+### FnordMetric Classic
+
+FnordMetric Classic is powered by ruby and redis. It offers a ruby DSL for processing data
+streams and building beautiful web dashboards from a collection of pre-made UI widgets.
+
+You can also use the FnordMetric UI HTML5 API to display the data collected with FnordMetric
+Classic on another website or build highly customized / white label views.
+
+<a href="/documentation/classic_index">Getting Started with FnordMetric Classic &rarr;</a>
-<i style="color:#999;">FnordMetric Enterprise is currently in private beta</i>
View
4 fnordmetric-doc/web/assets/documentation.css
@@ -315,3 +315,7 @@ pre.dark code {
.code_snippets a.link:hover {
text-decoration: underline;
}
+
+#documentation pre + h3 {
+ margin-top:55px;
+}
View
5 fnordmetric-doc/web/documentation.erb
@@ -17,8 +17,9 @@
</ul>
<% [
- ["FnordMetric Classic", "fnordmetric_classic"],
- ["FnordMetric UI", "fnordmetric_ui"]
+ ["FnordMetric UI", "fnordmetric_ui"],
+ ["FnordMetric Enterprise", "fnordmetric_enterprise"],
+ ["FnordMetric Classic", "fnordmetric_classic"]
].each do |(title, key)| %>
<a class="nav_title"><%= title %></a>
<ul>
View
1 fnordmetric-enterprise/.gitignore
@@ -0,0 +1 @@
+target/
View
4 fnordmetric-enterprise/TODO.md
@@ -0,0 +1,4 @@
++ background rbuf_flush -> swapfile every N seconds
++ rbuf size in time
++ load all metrics on startup
++ accept millisecond timestamps
View
34 fnordmetric-enterprise/build.sbt
@@ -0,0 +1,34 @@
+import AssemblyKeys._
+
+name := "FnordMetric Enterprise"
+
+organization := "com.paulasmuth"
+
+version := "0.0.2"
+
+mainClass in (Compile, run) := Some("com.fnordmetric.enterprise.FnordMetric")
+
+scalaSource in Compile <<= baseDirectory(_ / "src")
+
+scalaVersion := "2.9.1"
+
+resolvers += "Couchbase Maven2 Repo" at "http://files.couchbase.com/maven2"
+
+libraryDependencies += "org.eclipse.jetty" % "jetty-websocket" % "8.1.8.v20121106"
+
+libraryDependencies += "org.eclipse.jetty" % "jetty-server" % "8.1.8.v20121106"
+
+libraryDependencies += "io.netty" % "netty" % "3.6.0.Final"
+
+assemblySettings
+
+jarName in assembly <<= (version) { v => "FnordMetric-Enterprise-v" + v + ".jar" }
+
+mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
+ {
+ case "about.html" => MergeStrategy.discard
+ case x => old(x)
+ }
+}
+
+fork in run := true
View
3 fnordmetric-enterprise/project/plugins.sbt
@@ -0,0 +1,3 @@
+resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
+
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
View
36 fnordmetric-enterprise/src/AbstractBucket.scala
@@ -0,0 +1,36 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+trait AbstractBucket {
+
+ var next_flush : Long = 0
+
+ def sample(value: Double) : Unit
+ def flush() : Double
+
+ def flush_every(interval: Long) : (Long, Double) = {
+ val now = FnordMetric.now
+ var triggered = (next_flush == 0)
+ var ret : (Long, Double) = null
+
+ if (triggered)
+ next_flush = now
+
+ while (next_flush <= now) {
+ if (!triggered)
+ ret = ((next_flush, flush))
+
+ next_flush += interval
+ triggered = true
+ }
+
+ ret
+ }
+
+}
View
14 fnordmetric-enterprise/src/AbstractInstruction.scala
@@ -0,0 +1,14 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+trait AbstractInstruction {
+
+ def execute : String
+
+}
View
137 fnordmetric-enterprise/src/Benchmark.scala
@@ -0,0 +1,137 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+object Benchmark {
+
+ def run : Unit = {
+ FnordMetric.DEFAULTS.foreach(d =>
+ FnordMetric.CONFIG += d)
+
+ print_title("MetricFactory#get_metric")
+ for (t <- List(1, 16, 64))
+ bm_metric_factory(1000, t)
+
+ print_title("Metric#values_in")
+ for (n <- List(10L, 60L, 180L, 3600L, 84600L))
+ for (t <- List(1, 4, 16, 64))
+ bm_metric_values_in(n, t)
+
+ print_title("Metric#value_at")
+ for (n <- List(10L, 60L, 180L, 3600L, 84600L))
+ for (t <- List(1, 4, 16, 64))
+ bm_metric_value_at(n, t)
+
+
+ print_title("Metric#sample")
+ for (n <- List(200000, 1000000, 5000000))
+ for (t <- List(1, 4, 16))
+ bm_metric(n, t)
+
+ }
+
+ private def bm_metric(samples: Int, threads: Int) : Unit =
+ print_res(samples + " values, " + threads + " thread(s)",
+ mean_with_preheat(50, 10, (() => {
+
+ val metric = new Metric(
+ MetricKey("fnord", "sum", 1.toLong))
+
+ measure(() => {
+ in_parallel(threads, (() => {
+
+ for (n <- (1 to (samples / threads)))
+ metric.sample(23)
+
+ }))
+ })
+ })))
+
+
+ private def bm_metric_values_in(range: Long, threads: Int) : Unit =
+ print_res(range + " seconds, " + threads + " thread(s)",
+ mean_with_preheat(50, 10, (() => {
+
+ val metric = new Metric(
+ MetricKey("fnord", "sum", 1.toLong))
+
+ measure(() => {
+ in_parallel(threads, (() => {
+
+ for (n <- (1 to 100))
+ metric.values_in(metric.bucket.next_flush, metric.bucket.next_flush - range)
+
+ }))
+ }) / 100
+ })))
+
+
+ private def bm_metric_value_at(ago: Long, threads: Int) : Unit =
+ print_res(ago + " seconds, " + threads + " thread(s)",
+ mean_with_preheat(50, 10, (() => {
+
+ val metric = new Metric(
+ MetricKey("fnord", "sum", 1.toLong))
+
+ measure(() => {
+ in_parallel(threads, (() => {
+
+ for (n <- (1 to 100))
+ metric.value_at(metric.bucket.next_flush - ago)
+
+ }))
+ }) / 100
+ })))
+
+
+ private def bm_metric_factory(metrics: Int, threads: Int) : Unit =
+ print_res(metrics + " metrics, " + threads + " thread(s)",
+ mean_with_preheat(30, 10, (() => {
+ measure(() => {
+ in_parallel(threads, (() => {
+ for (x <- (0 to 100))
+ MetricFactory.get_metric(MetricKey(
+ "benchmark-metric" + (scala.math.random * metrics).toInt,
+ "sum", 1.toLong))
+ }))
+ }) / 100
+ })))
+
+
+ private def measure(proc: => Function0[Unit]) : Long = {
+ val tstart = System.nanoTime
+ proc()
+ System.nanoTime - tstart
+ }
+
+ private def mean_with_preheat(tests: Int, preheat: Int, proc: Function0[Long]) : Long =
+ ((0.toLong /: (1 to tests + preheat)) ((s, n) => {
+ val v = proc()
+ if (n < preheat) 0 else s + v
+ }) / tests)
+
+
+ private def in_parallel(threads: Int, proc: Function0[Unit]) : Unit =
+ ((1 to threads) map (n => {
+ val t = new Thread { override def run = { proc() }}
+ t.start; t
+ })) map { t => t.join }
+
+
+ // HACK !!! ;)
+ private def print_res(title: String, tdiff: Long) =
+ println(" * " + title +
+ (("" /: (1 to (30 - title.length)))((m,c) => m + " ")) + " => " +
+ (tdiff/1000000.0) + "ms")
+
+ // HACK !!! ;)
+ private def print_title(title: String) =
+ println("\n\n" + title + "\n" +
+ (("" /: (1 to (title.length)))((m,c) => m + "=")) + "\n")
+
+}
View
23 fnordmetric-enterprise/src/BucketFactory.scala
@@ -0,0 +1,23 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+object BucketFactory {
+
+ def new_bucket(mode: String) : AbstractBucket =
+ mode match {
+
+ case "sum" =>
+ return new SumBucket()
+
+ case "mean" =>
+ return new MeanBucket()
+
+ }
+
+}
View
14 fnordmetric-enterprise/src/ErrorInstruction.scala
@@ -0,0 +1,14 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class ErrorInstruction(err: String) extends AbstractInstruction {
+
+ def execute : String = "ERROR " + err
+
+}
View
188 fnordmetric-enterprise/src/FnordMetric.scala
@@ -0,0 +1,188 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import java.util.Locale
+import java.util.Date
+import java.text.DateFormat
+import java.io.File
+import java.io.RandomAccessFile
+import java.nio.channels.FileChannel
+import java.nio.channels.FileLock
+import java.text.DecimalFormat
+import scala.collection.mutable.HashMap
+
+object FnordMetric {
+
+ val VERSION = "v0.0.2"
+
+ val CONFIG = HashMap[Symbol,String]()
+
+ var DEFAULTS = HashMap[Symbol, String](
+ 'http_threads -> "4",
+ 'websocket_threads -> "4",
+ 'tcp_threads -> "4",
+ 'udp_threads -> "4",
+ 'swap_prefix -> "/tmp/fnordmetric"
+ )
+
+ val number_format = new DecimalFormat("0.#####")
+
+ var debug = false
+
+ var flock : FileLock = null
+
+ def main(args: Array[String]) : Unit = {
+ var n = 0
+
+ while (n < args.length) {
+
+ if (args(n) == "--http")
+ { CONFIG += (('http_port, args(n+1))); n += 2 }
+
+ else if (args(n) == "--http-threads")
+ { CONFIG += (('http_threads, args(n+1))); n += 2 }
+
+ else if (args(n) == "--websocket")
+ { CONFIG += (('websocket_port, args(n+1))); n += 2 }
+
+ else if (args(n) == "--websocket-threads")
+ { CONFIG += (('websocket_threads, args(n+1))); n += 2 }
+
+ else if (args(n) == "--tcp")
+ { CONFIG += (('tcp_port, args(n+1))); n += 2 }
+
+ else if (args(n) == "--tcp-threads")
+ { CONFIG += (('tcp_threads, args(n+1))); n += 2 }
+
+ else if (args(n) == "--udp")
+ { CONFIG += (('udp_port, args(n+1))); n += 2 }
+
+ else if (args(n) == "--udp-threads")
+ { CONFIG += (('udp_threads, args(n+1))); n += 2 }
+
+ else if ((args(n) == "-d") || (args(n) == "--debug"))
+ { debug = true; n += 1 }
+
+ else if ((args(n) == "-h") || (args(n) == "--help"))
+ return usage(true)
+
+ else if (args(n) == "--benchmark")
+ return Benchmark.run
+
+ else {
+ println("error: invalid option: " + args(n) + "\n")
+ return usage(false)
+ }
+
+ }
+
+ DEFAULTS.foreach(d =>
+ if (CONFIG contains d._1 unary_!) CONFIG += d )
+
+ boot
+ }
+
+ def boot = try {
+ FnordMetric.log("Booting...")
+
+ flock = new RandomAccessFile(
+ new File(FnordMetric.CONFIG('swap_prefix), "server.lck"),
+ "rw").getChannel.tryLock
+
+ if (flock == null)
+ error("cannot aquire server.lck", true)
+
+ val sched = new Scheduler
+ sched.start
+
+ if (CONFIG contains 'http)
+ error("FIXPAUL: not yet implemented: http-server", true)
+
+ val ws_server = if (CONFIG contains 'websocket_port)
+ new HTTPServer(
+ CONFIG('websocket_port).toInt,
+ CONFIG('websocket_threads).toInt,
+ new WebSocketHandler)
+
+ val tcp_server = if (CONFIG contains 'tcp_port)
+ new TCPServer(
+ CONFIG('tcp_port).toInt,
+ CONFIG('tcp_threads).toInt)
+
+ val udp_server = if (CONFIG contains 'udp_port)
+ new UDPServer(
+ CONFIG('udp_port).toInt,
+ CONFIG('udp_threads).toInt)
+
+ } catch {
+ case e: Exception => exception(e, true)
+ }
+
+
+ def banner() = {
+ println("FnordMetric Enterprise v" + VERSION)
+ println(" (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>\n\n")
+ }
+
+ def usage(show_banner: Boolean = true) = {
+ if (show_banner) banner()
+
+ println("usage: fnordmetric-server [options] ")
+ println(" --http <port> start http server on this port ")
+ println(" --http-threads <num> number of http worker-threads (default: 4) ")
+ println(" --websocket <port> start websocket server on this port ")
+ println(" --websocket-threads <num> number of websocket worker-threads (default: 4)")
+ println(" --admin <port> start http admin web interface on this port ")
+ println(" -h, --help you're reading it... ")
+ println(" -d, --debug debug mode ")
+ }
+
+
+ def now : Long =
+ System.nanoTime / 1000000
+
+
+ def log(msg: String) = {
+ val now = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.LONG, Locale.FRANCE)
+ println("[" + now.format(new Date()) + "] " + msg)
+ }
+
+
+ def error(msg: String, fatal: Boolean) = {
+ log("[ERROR] " + msg)
+
+ if (fatal)
+ exit(1)
+ }
+
+
+ def log_debug(msg: String) =
+ if (debug)
+ log("[DEBUG] " + msg)
+
+
+ def exception(ex: Throwable, fatal: Boolean) = {
+ error(ex.toString, false)
+
+ for (line <- ex.getStackTrace)
+ log_debug(line.toString)
+
+ if (fatal)
+ exit(1)
+ }
+
+
+ def exit(code: Int) = {
+ if (flock != null)
+ flock.release
+
+ System.exit(code)
+ }
+
+}
View
39 fnordmetric-enterprise/src/HTTPServer.scala
@@ -0,0 +1,39 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.handler.AbstractHandler
+import org.eclipse.jetty.util.thread.QueuedThreadPool
+
+import java.io.PrintStream
+import java.io.OutputStream
+
+class HTTPServer(port: Int, num_threads: Int, handler: AbstractHandler) {
+ val server = new Server(port)
+ /*server.setThreadPool(pool)
+ server.setGracefulShutdown(1000)*/
+
+ server.setHandler(handler)
+ without_stderr(_ => server.start())
+
+ FnordMetric.log("Listening on ws://0.0.0.0:" + port)
+
+ def without_stderr(lambda: Unit => Unit) : Unit = {
+ val stderr = System.err
+
+ val dummy = new PrintStream(new OutputStream(){
+ def write(b: Int) : Unit = ()
+ })
+
+ System.setErr(dummy)
+ lambda()
+ System.setErr(stderr)
+ }
+
+}
View
39 fnordmetric-enterprise/src/InstructionFactory.scala
@@ -0,0 +1,39 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+object InstructionFactory {
+
+ val X_METRICKEY = """(.*)(mean|sum)-([0-9]+)"""
+ val X_SAMPLE = ("""^SAMPLE """ + X_METRICKEY + """ ([0-9]+\.?[0-9]*)$""").r
+ val X_VALUESIN = ("""^VALUES_?IN """ + X_METRICKEY + """ ([0-9]+) ([0-9]+)$""").r
+ val X_VALUEAT = ("""^VALUE_?AT """ + X_METRICKEY + """ ([0-9]+)$""").r
+
+ def parse(str: String) : AbstractInstruction = str match {
+
+ case X_SAMPLE(key, mode, flush_interval, value) =>
+ new SampleInstruction(MetricKey(key, mode,
+ java.lang.Double.parseDouble(flush_interval).longValue * 1000),
+ java.lang.Double.parseDouble(value))
+
+ case X_VALUESIN(key, mode, flush_interval, time0, time1) =>
+ new ValuesInInstruction(MetricKey(key, mode,
+ java.lang.Double.parseDouble(flush_interval).longValue * 1000),
+ java.lang.Long.parseLong(time0), java.lang.Long.parseLong(time1))
+
+ case X_VALUEAT(key, mode, flush_interval, time) =>
+ new ValueAtInstruction(MetricKey(key, mode,
+ java.lang.Double.parseDouble(flush_interval).longValue * 1000),
+ java.lang.Long.parseLong(time))
+
+ case _ =>
+ new ErrorInstruction("invalid command")
+
+ }
+
+}
View
31 fnordmetric-enterprise/src/MeanBucket.scala
@@ -0,0 +1,31 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class MeanBucket extends AbstractBucket {
+ var tmp_sum : Double = 0
+ var tmp_cnt : Int = 0
+
+ def sample(value: Double) : Unit = {
+ tmp_sum += value
+ tmp_cnt += 1
+ }
+
+ def flush : Double = {
+ val res = if (tmp_cnt == 0)
+ 0
+ else
+ tmp_sum / tmp_cnt
+
+ tmp_sum = 0.toDouble
+ tmp_cnt = 0
+
+ res
+ }
+
+}
View
211 fnordmetric-enterprise/src/Metric.scala
@@ -0,0 +1,211 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import scala.collection.mutable.ListBuffer
+
+case class MetricKey(key: String, mode: String, flush_interval: Long)
+
+class Metric(key: MetricKey) {
+ var flush_interest : Long = 0
+
+ val bucket = BucketFactory.new_bucket(key.mode)
+ val swap = new SwapFile(key)
+
+ var rbuf = new RingBuffer[(Long, Double)](10)
+ var rbuf_seek_pos = 0
+
+ // adds a value to the metric's bucket and tries to flush the bucket
+ def sample(value: Double) = this.synchronized {
+ bucket.sample(value)
+ flush_bucket
+ }
+
+ // adds an aggregated value to the in memory ring buffer after it has
+ // been flushed from the bucket
+ def flush_bucket : Unit = this.synchronized {
+ val nxt = bucket.flush_every(key.flush_interval)
+
+ // indicate to the background thread that this metric has pending data
+ // in the bucket and when it can be flushed
+ flush_interest = bucket.next_flush
+
+ // flush_every returns null if the current flush interval is not over
+ // yet (makes this method idempotent)
+ if (nxt == null)
+ return
+
+ // if the ring buffer is already full we need to clear up a slot
+ if (rbuf.remaining == 0) {
+
+ // if there is no slot that is already flushed to disk which we can
+ // use, we need to flush some. this flushes as much data to disk as
+ // possible and marks it as "ready for removal"
+ if (rbuf_seek_pos < 1)
+ flush_rbuf
+
+ // exit if we couldn't free up any slots (this should never happen)
+ if (rbuf_seek_pos < 1)
+ throw new Exception("flush_rbuf failed")
+
+ // mark the next value in the rbuf as ready to be overwritten. The
+ // order of these statements is significant!
+ rbuf_seek_pos -= 1
+ rbuf.seek(1)
+ }
+
+ // now at least one slot in the ring buffer is free so we can just
+ // push our sample
+ rbuf.push(nxt)
+
+ // mark this metric as "no pending flushes"
+ flush_interest = 0
+ }
+
+ // tries to persist as much data from the in memory ring buffer to disk
+ // as possible but doesnt remove it from the buffer yet
+ def flush_rbuf = this.synchronized {
+ val flush_range = rbuf.size - rbuf_seek_pos
+
+ // copy the flushable items from the rbuf to the swapfile
+ for (sample <- rbuf.tail(flush_range))
+ swap.put(sample._1, sample._2)
+
+ // mark the range as "read to be overwritten
+ rbuf_seek_pos += flush_range
+ }
+
+ // returns this metrics value at time0 if a value was recorded at that
+ // point in time
+ def value_at(time0: Long) : Option[Double] = {
+ val values = values_in(time0, 0)
+
+ if (values.size > 0)
+ Some(values.first._2)
+ else
+ None
+ }
+
+ // returns all aggregated values for this metric in the specified time
+ // range. if time1 is 0 then only the first value at time0 is returned.
+ // note that time0 > time1! this method is threadsafe. reads within the
+ // in memory ring buffer are lock-free. reads that hit the on disk swap
+ // file use a striped lock and may block
+ def values_in(time0: Long, time1: Long) : List[(Long, Double)] = {
+ val lst = ListBuffer[(Long, Double)]()
+
+ var rbuf_last : Long = java.lang.Long.MAX_VALUE
+ var rbuf_pos = 0
+
+ // take a "snapshot" of the ring buffers current state. this may race
+ // (len may be smaller than the real value) but this only means that
+ // we may have to load one more value from the swapfile instead from
+ // the in memory ring buffer
+ val rbuf_snap_len = rbuf.size
+ val rbuf_snap_pos = rbuf.cursor
+
+ // search the ring buffer backwards without synchronization. the basic
+ // assumption here is that the system time will only progress forward.
+ // if the system time should jump backwards this would race
+ while (rbuf_pos >= 0 && rbuf_pos < rbuf_snap_len) {
+ val cur = rbuf.at(rbuf_snap_pos, rbuf_pos)
+
+ // since this is not synchronized, we need to check if we hit the
+ // rbuf wrapping point and exit if so. this code would race if the
+ // ring buffer did one full revolution in the time between taking
+ // the initial snapshot (rbuf_snap_pos) and the first assignment to
+ // rbuf_last. we assume that this thread isn't preempted for longer
+ // than 60 seconds (the min. flush_interval) and ignore this...
+ if (cur._1 < rbuf_last)
+ rbuf_last = cur._1
+ else
+ rbuf_pos = -1
+
+ // if we are already beyond time1 we can exit
+ if (time1 != 0 && (cur._1 < time1))
+ return lst.toList
+
+ // if we are only looking for a single value and already beyond time0
+ // plus flush_interval and didnt find a value yet, we can exit
+ if (time1 == 0 && (cur._1 < (time0 - key.flush_interval)))
+ return lst.toList
+
+ // continues only if we didn't hit the buffer wrap
+ if (rbuf_pos >= 0) {
+
+ // check if we found the start of the range yet
+ if (cur._1 <= time0 && ((cur._1 >= time1) || time1 == 0)) {
+
+ // collect all matching items
+ lst += cur
+
+ // if we are looking only for a single value we can exit now
+ if (time1 == 0)
+ return lst.toList
+
+ }
+ }
+
+ if (rbuf_pos >= 0)
+ rbuf_pos += 1
+ }
+
+ // exit if we have already seen the whole time range and don't need to
+ // search the swapfile anymore
+ if (rbuf_last <= time1)
+ return lst.toList
+
+ // start searching the swapfile backwards from the last write position
+ var swap_chunk = ListBuffer[(Long, Double)]()
+ var swap_pos = swap.write_pos
+
+ // we skip at least as many values as we've already seen in the rbuf. but
+ // since this is not synchronized we might still load a few samples that
+ // we have already seen
+ swap_pos -= (rbuf_seek_pos * swap.BLOCK_SIZE)
+
+ while (swap_pos > 0) {
+
+ // load the next chunk of samples from the swapfile
+ swap_pos = swap.load_chunk(swap_pos, swap_chunk)
+
+ for (cur <- swap_chunk) {
+
+ // skip if we already saw this sample in the rbuf search
+ if (cur._1 < rbuf_last) {
+
+ // if we are already beyond time1 we can exit
+ if (time1 != 0 && (cur._1 < time1))
+ return lst.toList
+
+ // if we are only looking for a single value and already beyond time0
+ // plus flush_interval and didnt find a value yet, we can exit
+ if (time1 == 0 && (cur._1 < (time0 - key.flush_interval)))
+ return lst.toList
+
+ // check if we found the start of the range yet
+ if (cur._1 <= time0 && ((cur._1 >= time1) || time1 == 0)) {
+
+ // collect all matching items
+ lst += cur
+
+ // if we are looking only for a single value we can exit now
+ if (time1 == 0)
+ return lst.toList
+
+ }
+ }
+ }
+
+ swap_chunk.clear
+ }
+
+ lst.toList
+ }
+
+}
View
33 fnordmetric-enterprise/src/MetricFactory.scala
@@ -0,0 +1,33 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import scala.collection.mutable.HashMap
+
+object MetricFactory {
+
+ val metric_map = new HashMap[MetricKey, Metric]()
+
+ def get_metric(key: MetricKey) : Metric = {
+ var metric : Metric = metric_map.getOrElse(key, null)
+
+ if (metric == null) {
+ this.synchronized {
+ metric = metric_map.getOrElse(key, null)
+
+ if (metric == null) {
+ metric = new Metric(key)
+ metric_map += ((key, metric))
+ }
+ }
+ }
+
+ metric
+ }
+
+}
View
75 fnordmetric-enterprise/src/RingBuffer.scala
@@ -0,0 +1,75 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class RingBuffer[T: Manifest](capacity: Int) {
+ private val backend = new Array[T](capacity)
+
+ private var end : Int = -1
+ private var start : Int = 0
+
+ // the numer of elements that this ring buffer currently contains
+ var size : Int = 0
+
+ // appends a new item. the remaining number of free slots must be checked
+ // before appending
+ def push(item: T) : Unit = {
+ if (size == capacity)
+ throw new Exception("ring buffer is full")
+
+ size += 1
+
+ end = (end + 1) % capacity
+ backend(end) = item
+ }
+
+ // retrieves the first max items from the ring buffer by walking the ring
+ // buffer in chronological order (from oldest to most recent)
+ def head(max: Int) : List[T] = {
+ val lst = new Array[T](scala.math.min(size, max))
+
+ for (ind <- (0 until lst.size))
+ lst(ind) = backend((start + ind) % capacity)
+
+ lst.toList
+ }
+
+ // retrieves the last max items from the ring buffer by walking the ring
+ // buffer in reverse chronological order (from most recent to oldest)
+ def tail(max: Int) : List[T] = {
+ val lst = new Array[T](scala.math.min(size, max))
+
+ for (ind <- (0 until lst.size))
+ lst(ind) = backend((
+ ((end - ind) % capacity) + capacity) % capacity)
+
+ lst.toList
+ }
+
+ // retrieves one item from the ring buffer at offset by walking the ring
+ // buffer starting at position in reverse chronological order (from most
+ // recent to oldest)
+ def at(position: Int, offset: Int) : T =
+ backend((((position - offset) % capacity) + capacity) % capacity)
+
+ // removes the first num items from the start of the ring buffer (oldest
+ // items get removed first)
+ def seek(num: Int) = {
+ start = (start + num) % capacity
+ size -= num
+ }
+
+ // returns the remaning number of free slots in the ring buffer
+ def remaining : Int =
+ capacity - size
+
+ // returns a curser to the current end position of the ring buffer
+ def cursor : Int =
+ end
+
+}
View
17 fnordmetric-enterprise/src/SampleInstruction.scala
@@ -0,0 +1,17 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class SampleInstruction(key: MetricKey, value: Double) extends AbstractInstruction {
+
+ def execute : String = {
+ MetricFactory.get_metric(key).sample(value)
+ "OK"
+ }
+
+}
View
36 fnordmetric-enterprise/src/Scheduler.scala
@@ -0,0 +1,36 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class Scheduler extends Thread {
+
+ val TICK = 10
+
+ // this runs in the background and tries to invoke some flushes. it
+ // is not neccessary for the liveliness of the application that each
+ // iteration of the loop finishes in a fixed time interval. even if
+ // this thread dies completely all data will be correctly recorded,
+ // only the last sample on each metric won't be flushed until new data
+ // arrives
+ override def run : Unit =
+ while (true) next
+
+ private def next : Unit = {
+ val now = FnordMetric.now
+
+ // search for metrics with a pending flush interest and flush them
+ // if the now > flush_interest
+ for (metric <- MetricFactory.metric_map)
+ if (metric._2.flush_interest > 0 && metric._2.flush_interest <= now)
+ metric._2.flush_bucket
+
+ // to avoid burning CPU we sleep for a few ms
+ Thread.sleep(TICK)
+ }
+
+}
View
15 fnordmetric-enterprise/src/StorageAdapter.scala
@@ -0,0 +1,15 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+object StorageAdapter {
+
+ //def store(key: BucketKey, time: Long, value: Double) =
+ // println("flush", key, time, value)
+
+}
View
22 fnordmetric-enterprise/src/SumBucket.scala
@@ -0,0 +1,22 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class SumBucket extends AbstractBucket {
+ var tmp : Double = 0
+
+ def sample(value: Double) : Unit =
+ tmp += value
+
+ def flush : Double = {
+ val res = tmp
+ tmp = 0.toDouble
+ res
+ }
+
+}
View
106 fnordmetric-enterprise/src/SwapFile.scala
@@ -0,0 +1,106 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import java.io.RandomAccessFile
+import java.io.File
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import scala.collection.mutable.ListBuffer
+
+class SwapFile(metric_key: MetricKey) {
+ var last_flush : Long = 0
+
+ val buffer = ByteBuffer.allocate(512)
+ buffer.order(ByteOrder.BIG_ENDIAN)
+
+ // each sample is 18 bytes big (2 bytes header, 8 bytes time and
+ // 8 bytes value as double precision ieee 754 float)
+ val BLOCK_SIZE = 18
+
+ val file_name = "metric-" + metric_key.key +
+ metric_key.mode + "-" + metric_key.flush_interval
+
+ val file = new RandomAccessFile(new File(
+ FnordMetric.CONFIG('swap_prefix), file_name), "rwd")
+
+ var write_pos = file.length.toInt
+
+ // adds a new (time, value) tuple to be written to the swap file
+ // but does not write it yet. this method is not thread safe!
+ def put(time: Long, value: Double) : Unit = {
+ val bvalue = java.lang.Double.doubleToLongBits(value)
+
+ if (buffer.remaining < BLOCK_SIZE)
+ flush
+
+ buffer.putShort(0x1717)
+ buffer.putLong(time)
+ buffer.putLong(bvalue)
+ }
+
+ // fluhes the queued writes from the buffer to disk. this method
+ // is not thread safe!
+ def flush : Unit = {
+ last_flush = FnordMetric.now
+
+ file.synchronized {
+ file.seek(write_pos)
+ file.write(buffer.array, 0, buffer.position)
+ }
+
+ write_pos += BLOCK_SIZE
+ buffer.rewind
+ }
+
+ // reads a chunk of of values from the swapfile at position into the
+ // specified destionation list buffer. this method is thread safe
+ def load_chunk(position: Int, dst: ListBuffer[(Long, Double)]) : Int = {
+ var read_pos = 0
+
+ // we read the data back in 540 byte blocks (30 samples per block)
+ var chunk_size = BLOCK_SIZE * 30
+ val chunk = ByteBuffer.allocate(chunk_size)
+
+ if (position < chunk_size)
+ chunk_size = position
+
+ // read the next chunk into memory
+ while (read_pos < chunk_size - 1) {
+
+ // we need to seek before every read as calls to load_chunk don't
+ // have to be synchronized with writes
+ file.synchronized {
+ file.seek(position - chunk_size)
+
+ read_pos += file.read(chunk.array, read_pos,
+ chunk_size - read_pos - 1)
+ }
+ }
+
+ read_pos = chunk_size - BLOCK_SIZE
+
+ while (read_pos >= 0) {
+ chunk.position(read_pos)
+
+ if (chunk.getShort != 0x1717) {
+ FnordMetric.error("file corrupted: " + file_name, false)
+ return position - chunk_size
+ }
+
+ dst += ((chunk.getLong,
+ java.lang.Double.longBitsToDouble(chunk.getLong)))
+
+ read_pos -= BLOCK_SIZE
+ }
+
+ position - chunk_size
+ }
+
+}
+
View
27 fnordmetric-enterprise/src/TCPHandler.scala
@@ -0,0 +1,27 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import java.net.InetSocketAddress
+import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.buffer.ChannelBuffers
+import org.jboss.netty.channel._
+
+class TCPHandler extends SimpleChannelUpstreamHandler {
+
+ override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ val ins = InstructionFactory.parse(e.getMessage.toString)
+ e.getChannel.write(ins.execute + "\n")
+ }
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ FnordMetric.error("[TCP] Exception: " + e.getCause, false)
+ e.getChannel.close()
+ }
+
+}
View
51 fnordmetric-enterprise/src/TCPServer.scala
@@ -0,0 +1,51 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import java.net.InetSocketAddress
+import java.util.concurrent._
+import org.jboss.netty.bootstrap.ServerBootstrap
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder
+import org.jboss.netty.handler.codec.frame.Delimiters
+import org.jboss.netty.handler.codec.string.StringDecoder
+import org.jboss.netty.handler.codec.string.StringEncoder
+
+class TCPServer(port: Int, threads: Int) {
+
+ val MAX_FRAME_LENGTH = 8192
+
+ val bootstrap = new ServerBootstrap(
+ new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
+
+ bootstrap.setPipelineFactory(new TCPServerPipelineFactory)
+ bootstrap.bind(new InetSocketAddress(port))
+
+ FnordMetric.log("Listening on tcp://0.0.0.0:" + port)
+
+ class TCPServerPipelineFactory extends ChannelPipelineFactory {
+
+ override def getPipeline : ChannelPipeline = {
+ val pipeline = Channels.pipeline
+
+ pipeline.addLast("framer", new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH,
+ (Delimiters.lineDelimiter): _*))
+
+ pipeline.addLast("decoder", new StringDecoder)
+ pipeline.addLast("eccoder", new StringEncoder)
+ pipeline.addLast("handler", new TCPHandler)
+
+ pipeline
+ }
+
+ }
+
+}
View
49 fnordmetric-enterprise/src/UDPServer.scala
@@ -0,0 +1,49 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import java.net.DatagramPacket
+import java.net.DatagramSocket
+import java.util.concurrent._
+
+class UDPServer(port: Int, threads: Int){
+
+ val pool = Executors.newFixedThreadPool(threads)
+ val sock = new DatagramSocket(port)
+
+ val buffer_size = 65535
+ val buffer = new Array[Byte](buffer_size)
+ val packet = new DatagramPacket(buffer, buffer_size)
+
+ FnordMetric.log("Listening on tcp://0.0.0.0:" + port)
+
+ while (true) {
+ sock.receive(packet)
+
+ dispatch(
+ new String(packet.getData, 0, packet.getLength))
+ }
+
+ private def dispatch(body: String) =
+ pool.execute(new Runnable { def run : Unit = {
+ for (msg <- body.split("\n")) execute(msg)
+ }})
+
+ private def execute(msg: String) =
+ InstructionFactory.parse(msg) match {
+ case e: ErrorInstruction =>
+ FnordMetric.error("[UDP] " + e.execute, false)
+
+ case e: SampleInstruction =>
+ e.execute
+
+ case e: AbstractInstruction =>
+ FnordMetric.error("[UDP] received non-sample instruction", false)
+ }
+
+}
View
22 fnordmetric-enterprise/src/ValueAtInstruction.scala
@@ -0,0 +1,22 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class ValueAtInstruction(key: MetricKey, time: Long) extends AbstractInstruction {
+
+ def execute : String = {
+ val metric = MetricFactory.get_metric(key)
+ val value = metric.value_at(time * 1000).getOrElse(null)
+
+ if (value == null)
+ "null"
+ else
+ FnordMetric.number_format.format(value)
+ }
+
+}
View
31 fnordmetric-enterprise/src/ValuesInInstruction.scala
@@ -0,0 +1,31 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class ValuesInInstruction(key: MetricKey, time0: Long, time1: Long) extends AbstractInstruction {
+
+ def execute : String = {
+ val metric = MetricFactory.get_metric(key)
+ val values = metric.values_in(time1 * 1000, time0 * 1000)
+ val resp = new StringBuffer
+
+ for (ind <- (0 until values.size)) {
+ resp.append(values(ind)._1)
+ resp.append(":")
+
+ resp.append(FnordMetric.number_format.format((
+ values(ind)._2)))
+
+ if (ind < values.size - 1)
+ resp.append(" ")
+ }
+
+ resp.toString
+ }
+
+}
View
31 fnordmetric-enterprise/src/WebSocket.scala
@@ -0,0 +1,31 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import org.eclipse.jetty.websocket.WebSocket.Connection
+
+class WebSocket extends org.eclipse.jetty.websocket.WebSocket.OnTextMessage {
+
+ var endpoint : Connection = null
+
+ def onOpen(conn: Connection) = {
+ endpoint = conn
+ FnordMetric.log_debug("[WebSocket] connection opened")
+ }
+
+ def onClose(code: Int, message: String) =
+ FnordMetric.log_debug("[WebSocket] connection closed")
+
+ def onMessage(message: String) = try {
+ val ins = InstructionFactory.parse(message)
+ endpoint.sendMessage(ins.execute);
+ } catch {
+ case e: Exception => FnordMetric.exception(e, false)
+ }
+
+}
View
18 fnordmetric-enterprise/src/WebSocketHandler.scala
@@ -0,0 +1,18 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+import javax.servlet.http.HttpServletRequest
+
+class WebSocketHandler extends org.eclipse.jetty.websocket.WebSocketHandler {
+
+ def doWebSocketConnect(request: HttpServletRequest, protocol: String) : WebSocket = {
+ new WebSocket()
+ }
+
+}

0 comments on commit 58dab03

Please sign in to comment.
Something went wrong with that request. Please try again.