Skip to content
Browse files

Metric#flush_interest and SwapFile#last_flush

  • Loading branch information...
1 parent 9a50d77 commit e1ea606d83509d0cad48b7e9eccc09996fbff8c4 @paulasmuth committed Feb 25, 2013
Showing with 15 additions and 5 deletions.
  1. +12 −5 fnordmetric-enterprise/src/Metric.scala
  2. +3 −0 fnordmetric-enterprise/src/SwapFile.scala
View
17 fnordmetric-enterprise/src/Metric.scala
@@ -12,8 +12,11 @@ import scala.collection.mutable.ListBuffer
case class MetricKey(key: String, mode: String, flush_interval: Long)
class Metric(key: MetricKey) {
+ var flush_interest : Long = 0
+
val bucket = BucketFactory.new_bucket(key.mode)
val swap = new SwapFile(key)
+
var rbuf = new RingBuffer[(Long, Double)](10)
var rbuf_seek_pos = 0
@@ -28,6 +31,10 @@ class Metric(key: MetricKey) {
def flush_bucket : Unit = {
val nxt = bucket.flush_every(key.flush_interval)
+ // indicate to the background thread that this metric has pending data
+ // in the bucket and when it can be flushed
+ flush_interest = bucket.next_flush
+
// flush_every returns null if the current flush interval is not over
// yet (makes this method idempotent)
if (nxt == null)
@@ -55,23 +62,23 @@ class Metric(key: MetricKey) {
// now at least one slot in the ring buffer is free so we can just
// push our sample
rbuf.push(nxt)
-
- flush_rbuf // FIXPAUL: remove me
}
// tries to persist as much data from the in memory ring buffer to disk
// as possible but doesnt remove it from the buffer yet
def flush_rbuf = this.synchronized {
val flush_range = rbuf.size - rbuf_seek_pos
- // FIXPAUL: wrong order!
+ // copy the flushable items from the rbuf to the swapfile
for (sample <- rbuf.tail(flush_range))
swap.put(sample._1, sample._2)
- swap.flush
+ // mark the range as "read to be overwritten
rbuf_seek_pos += flush_range
- }
+ // mark this metric as "no pending flushes"
+ flush_interest = 0
+ }
// returns this metrics value at time0 if a value was recorded at that
// point in time
View
3 fnordmetric-enterprise/src/SwapFile.scala
@@ -14,6 +14,7 @@ import java.nio.ByteOrder
import scala.collection.mutable.ListBuffer
class SwapFile(metric_key: MetricKey) {
+ var last_flush : Long = 0
val buffer = ByteBuffer.allocate(512)
buffer.order(ByteOrder.BIG_ENDIAN)
@@ -46,6 +47,8 @@ class SwapFile(metric_key: MetricKey) {
// fluhes the queued writes from the buffer to disk. this method
// is not thread safe!
def flush : Unit = {
+ last_flush = FnordMetric.now
+
file.synchronized {
file.seek(write_pos)
file.write(buffer.array, 0, buffer.position)

0 comments on commit e1ea606

Please sign in to comment.
Something went wrong with that request. Please try again.