Permalink
Browse files

various fixes, bump to 0.05

  • Loading branch information...
1 parent 758fbfb commit f298e01a16292d7ae0a32c1ba30e461669d21b89 @paulasmuth committed Mar 5, 2013
@@ -4,7 +4,7 @@ name := "FnordMetric Enterprise"
organization := "com.paulasmuth"
-version := "0.0.3"
+version := "0.0.5"
mainClass in (Compile, run) := Some("com.fnordmetric.enterprise.FnordMetric")
@@ -19,7 +19,7 @@ import scala.collection.mutable.HashMap
object FnordMetric {
- val VERSION = "v0.0.3"
+ val VERSION = "v0.0.5"
val CONFIG = HashMap[Symbol,String]()
@@ -33,7 +33,7 @@ class SwapFile(metric_key: MetricKey) {
// adds a new (time, value) tuple to be written to the swap file
// but does not write it yet. this method is not thread safe!
- def put(time: Long, value: Double) : Unit = {
+ def put(time: Long, value: Double) : Unit = this.synchronized {
val bvalue = java.lang.Double.doubleToLongBits(value)
if (buffer.remaining < BLOCK_SIZE)
@@ -46,9 +46,12 @@ class SwapFile(metric_key: MetricKey) {
// fluhes the queued writes from the buffer to disk. this method
// is not thread safe!
- def flush : Unit = {
+ def flush : Unit = this.synchronized {
last_flush = FnordMetric.now
+ if (buffer.position < BLOCK_SIZE)
+ return
+
file.synchronized {
file.seek(write_pos)
file.write(buffer.array, 0, buffer.position)
@@ -75,12 +78,19 @@ class SwapFile(metric_key: MetricKey) {
// we need to seek before every read as calls to load_chunk don't
// have to be synchronized with writes
- file.synchronized {
+ val nxt_read = file.synchronized {
file.seek(position - chunk_size)
- read_pos += file.read(chunk.array, read_pos,
+ file.read(chunk.array, read_pos,
chunk_size - read_pos - 1)
}
+
+ if (nxt_read >= 0)
+ read_pos += nxt_read
+ else
+ // this should never happen
+ FnordMetric.error("end of file reached while reading " + file_name, false)
+
}
read_pos = chunk_size - BLOCK_SIZE
@@ -20,7 +20,7 @@ class TCPHandler extends SimpleChannelUpstreamHandler {
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- FnordMetric.error("[TCP] Exception: " + e.getCause, false)
+ FnordMetric.exception(e.getCause, false)
e.getChannel.close()
}
@@ -20,7 +20,7 @@ class UDPServer(port: Int, threads: Int){
val buffer = new Array[Byte](buffer_size)
val packet = new DatagramPacket(buffer, buffer_size)
- FnordMetric.log("Listening on tcp://0.0.0.0:" + port)
+ FnordMetric.log("Listening on udp://0.0.0.0:" + port)
while (true) {
sock.receive(packet)

0 comments on commit f298e01

Please sign in to comment.