Permalink
Browse files

implement disk syncing via scheduler

  • Loading branch information...
1 parent 8474f55 commit 1b4b854741cca2a4bcdbc5a5e6d51bd1654e69c8 @paulasmuth committed Mar 4, 2013
Showing with 23 additions and 6 deletions.
  1. +8 −3 fnordmetric-enterprise/src/Metric.scala
  2. +15 −3 fnordmetric-enterprise/src/Scheduler.scala
@@ -13,6 +13,7 @@ case class MetricKey(key: String, mode: String, flush_interval: Long)
class Metric(key: MetricKey) {
var flush_interest : Long = 0
+ var next_disk_sync : Long = 0
val bucket = BucketFactory.new_bucket(key.mode)
val swap = new SwapFile(key)
@@ -47,7 +48,7 @@ class Metric(key: MetricKey) {
// use, we need to flush some. this flushes as much data to disk as
// possible and marks it as "ready for removal"
if (rbuf_seek_pos < 1)
- flush_rbuf
+ flush_rbuf()
// exit if we couldn't free up any slots (this should never happen)
if (rbuf_seek_pos < 1)
@@ -69,15 +70,19 @@ class Metric(key: MetricKey) {
// tries to persist as much data from the in memory ring buffer to disk
// as possible but doesnt remove it from the buffer yet
- def flush_rbuf = this.synchronized {
+ def flush_rbuf(force_sync: Boolean = false) = this.synchronized {
val flush_range = rbuf.size - rbuf_seek_pos
// copy the flushable items from the rbuf to the swapfile
for (sample <- rbuf.tail(flush_range))
swap.put(sample._1, sample._2)
- // mark the range as "read to be overwritten
+ // mark the range as "ready to be overwritten"
rbuf_seek_pos += flush_range
+
+ // force syncing the data to disk if requested
+ if (force_sync)
+ swap.flush
}
// returns this metrics value at time0 if a value was recorded at that
@@ -11,6 +11,9 @@ class Scheduler extends Thread {
val TICK = 10
+ // sync metric data to disk every N ms
+ val SYNC_EVERY = 10000
+
// this runs in the background and tries to invoke some flushes. it
// is not neccessary for the liveliness of the application that each
// iteration of the loop finishes in a fixed time interval. even if
@@ -23,12 +26,21 @@ class Scheduler extends Thread {
private def next : Unit = {
val now = FnordMetric.now
- // search for metrics with a pending flush interest and flush them
- // if the now > flush_interest
- for (metric <- MetricFactory.metric_map)
+ for (metric <- MetricFactory.metric_map) {
+
+ // search for metrics with a pending flush interest and flush them
+ // if now > flush_interest
if (metric._2.flush_interest > 0 && metric._2.flush_interest <= now)
metric._2.flush_bucket
+ // search for metrics with a pending disk sync and flush them
+ if (metric._2.next_disk_sync <= now) {
+ metric._2.next_disk_sync = now + SYNC_EVERY
+ metric._2.flush_rbuf(true)
+ }
+
+ }
+
// to avoid burning CPU we sleep for a few ms
Thread.sleep(TICK)
}

0 comments on commit 1b4b854

Please sign in to comment.