Permalink
Browse files

documentation, cleaning up

  • Loading branch information...
1 parent 0fda05e commit 2cc45d36abf2a029c03a97731eff7fe14a5e078c @paulasmuth committed Feb 25, 2013
@@ -1,3 +1,6 @@
+ first sampled value is not flushed (only after first flush interval was reached and new data is submitted)
++ warm up ring buffers on start
++ accept millisecond timestamps
+
@@ -20,6 +20,8 @@ class Metric(key: MetricKey) {
// adds a value to the metric's bucket and tries to flush the bucket
def sample(value: Double) = this.synchronized {
+ // call flush_bucket with the returned aggregated value for every
+ // flush_interval since the last call to flush_every
bucket.flush_every(key.flush_interval, (
(time, value) => flush_bucket(time, value) ))
@@ -29,18 +31,28 @@ class Metric(key: MetricKey) {
// adds an aggregated value to the in memory ring buffer after it has
// been flushed from the bucket
private def flush_bucket(time: Long, value: Double) = {
+
+ // if the ring buffer is already full we need to clear up a slot
if (rbuf.remaining == 0) {
+ // if there is no slot that is already flushed to disk which we can
+ // use, we need to flush some. this flushes as much data to disk as
+ // possible and marks it as "ready for removal"
if (rbuf_seek_pos < 1)
flush_rbuf
+ // exit if we couldn't free up any slots (this should never happen)
if (rbuf_seek_pos < 1)
throw new Exception("flush_rbuf failed")
- rbuf.seek(1)
+ // Mark the next value in the rbuf as ready to be overwritten. The
+ // order of these statements is significant!
rbuf_seek_pos -= 1
+ rbuf.seek(1)
}
+ // now at least one slot in the ring buffer is free so we can just
+ // push our sample
rbuf.push(((time, value)))
flush_rbuf // FIXPAUL: remove me
@@ -72,28 +84,35 @@ class Metric(key: MetricKey) {
}
// returns all aggregated values for this metric in the specified time
- // range. if time1 is 0 then only the first value at time0 is returned
- // note that time0 > time1! this method is threadsafe
+ // range. if time1 is 0 then only the first value at time0 is returned.
+ // note that time0 > time1! this method is threadsafe. reads within the
+ // in memory ring buffer are lock-free. reads that hit the on disk swap
+ // file use a striped lock and may block
def values_in(time0: Long, time1: Long) : List[(Long, Double)] = {
val lst = ListBuffer[(Long, Double)]()
- // FIXPAUL: skip n flush_intervals if time0 is older than now, maybe
- // skip rbuf_search completely
-
- // FIXPAUL: do this without snapshotting the ringbuffer
- val rbuf_snap = rbuf.tail(rbuf.size)
-
var rbuf_last : Long = java.lang.Long.MAX_VALUE
var rbuf_pos = 0
+ // take a "snapshot" of the ring buffers current state. this may race
+ // (len may be smaller than the real value) but this only means that
+ // we may have to load one more value from the swapfile instead from
+ // the in memory ring buffer
+ val rbuf_snap_len = rbuf.size
+ val rbuf_snap_pos = rbuf.position
+
// search the ring buffer backwards without synchronization. the basic
// assumption here is that the system time will only progress forward.
// if the system time should jump backwards this would race
while (rbuf_pos >= 0 && rbuf_pos < rbuf_snap.size) {
val cur = rbuf_snap(rbuf_pos)
// since this is not synchronized, we need to check if we hit the
- // rbuf wrapping point and exit if so.
+ // rbuf wrapping point and exit if so. this code would race if the
+ // ring buffer did one full revolution in the time between taking
+ // the initial snapshot (rbuf_snap_pos) and the first assignment to
+ // rbuf_last. we assume that this thread isn't preempted for longer
+ // than 60 seconds (the min. flush_interval) and ignore this...
if (cur._1 < rbuf_last)
rbuf_last = cur._1
else
@@ -16,8 +16,8 @@ class RingBuffer[T: Manifest](capacity: Int) {
// the numer of elements that this ring buffer currently contains
var size : Int = 0
- // appends a new item. is_full must be called before appending to check if
- // the ringbuffer is already full
+ // appends a new item. the remaining number of free slots must be checked
+ // before appending
def push(item: T) : Unit = {
if (size == capacity)
throw new Exception("ring buffer is full")
@@ -51,14 +51,14 @@ class RingBuffer[T: Manifest](capacity: Int) {
lst.toList
}
- // Removes the first num items from the start of the ring buffer (oldest
+ // removes the first num items from the start of the ring buffer (oldest
// items get removed first)
def seek(num: Int) = {
start = (start + num) % capacity
size -= num
}
- // Returns the remaning number of free slots in the ringbuffer
+ // returns the remaning number of free slots in the ringbuffer
def remaining : Int =
capacity - size

0 comments on commit 2cc45d3

Please sign in to comment.