Permalink
Browse files

basic background/scheduled flush

  • Loading branch information...
1 parent e1ea606 commit 7bf18cbf878963cba1017f89dc017e14f0e8b923 @paulasmuth committed Feb 25, 2013
View
7 fnordmetric-enterprise/TODO.md
@@ -1,7 +1,4 @@
-
-
-+ first sampled value is not flushed (only after first flush interval was reached and new data is submitted)
++ background rbuf_flush -> swapfile every N seconds
++ rbuf size in time
+ warm up ring buffers on start
+ accept millisecond timestamps
-+ background rbuf_flush thread every N seconds
-+ rbuf size in time
View
36 fnordmetric-enterprise/src/BackgroundThread.scala
@@ -0,0 +1,36 @@
+// FnordMetric Enterprise
+// (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
+//
+// Licensed under the MIT License (the "License"); you may not use this
+// file except in compliance with the License. You may obtain a copy of
+// the License at: http://opensource.org/licenses/MIT
+
+package com.fnordmetric.enterprise
+
+class BackgroundThread extends Thread {
+
+ val TICK = 10
+
+ // this runs in the background and tries to invoke some flushes in
+ // the background. it is not neccessary for the liveliness of the
+ // application that each iteration of the loop finishes in a fixed
+ // time interval. even if this thread dies completely all data will
+ // be correctly recorded, but the last sample on each metric won't
+ // be flushed until new data arrives
+ override def run : Unit =
+ while (true) next
+
+ private def next : Unit = {
+ val now = FnordMetric.now
+
+ // search for metrics with a pending flush interest and fush them
+ // if the now > flush_interest
+ for (metric <- MetricFactory.metric_map)
+ if (metric._2.flush_interest > 0 && metric._2.flush_interest <= now)
+ metric._2.flush_bucket
+
+ // to avoid burning CPU we sleep for a few ms
+ Thread.sleep(TICK)
+ }
+
+}
View
2 fnordmetric-enterprise/src/FnordMetric.scala
@@ -102,6 +102,8 @@ object FnordMetric {
if (flock == null)
error("cannot aquire server.lck", true)
+ val bg_thread = new BackgroundThread
+ bg_thread.start
if (CONFIG contains 'http)
error("FIXPAUL: not yet implemented: http-server", true)
View
8 fnordmetric-enterprise/src/Metric.scala
@@ -28,7 +28,7 @@ class Metric(key: MetricKey) {
// adds an aggregated value to the in memory ring buffer after it has
// been flushed from the bucket
- def flush_bucket : Unit = {
+ def flush_bucket : Unit = this.synchronized {
val nxt = bucket.flush_every(key.flush_interval)
// indicate to the background thread that this metric has pending data
@@ -62,6 +62,9 @@ class Metric(key: MetricKey) {
// now at least one slot in the ring buffer is free so we can just
// push our sample
rbuf.push(nxt)
+
+ // mark this metric as "no pending flushes"
+ flush_interest = 0
}
// tries to persist as much data from the in memory ring buffer to disk
@@ -75,9 +78,6 @@ class Metric(key: MetricKey) {
// mark the range as "read to be overwritten
rbuf_seek_pos += flush_range
-
- // mark this metric as "no pending flushes"
- flush_interest = 0
}
// returns this metrics value at time0 if a value was recorded at that

0 comments on commit 7bf18cb

Please sign in to comment.