Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base: bcb50cb93f
...
compare: 6562100181
Checking mergeability… Don't worry, you can still create the pull request.
  • 4 commits
  • 10 files changed
  • 0 commit comments
  • 1 contributor
View
11 src/Backbone.scala
@@ -1,6 +1,7 @@
package com.paulasmuth.fyrehose
import java.util.concurrent._
+import java.util.concurrent.atomic._
import scala.actors.Actor
import scala.actors.Actor._
@@ -11,6 +12,8 @@ class Backbone() extends Actor{
val queries = scala.collection.mutable.Set[Query]()
var sequence = 0
+ var msg_total = new AtomicInteger
+
def act() = {
Actor.loop{ receive{
case query: Query => execute(query)
@@ -20,15 +23,21 @@ class Backbone() extends Actor{
}
private def dispatch(msg: Message) = {
+ msg_total.incrementAndGet
+
if (msg.exists(List("_volatile")) unary_!) {
sequence += 1
msg.sequence = sequence
+
Fyrehose.message_index ! msg
Fyrehose.message_cache ! msg
- Fyrehose.writer ! msg
}
msg.sequence = sequence
+
+ if (Fyrehose.writer != null)
+ Fyrehose.writer ! msg
+
queries.foreach(_ ! msg)
}
View
2  src/Endpoint.scala
@@ -25,6 +25,7 @@ class Endpoint(socket: Socket) extends Runnable with Receivable {
stream.set_safe_mode(safe_mode)
Fyrehose.log("connection opened")
+ Fyrehose.tcp_listener.num_connections.incrementAndGet
val reactor = new Actor {
def qsize = mailboxSize
@@ -128,6 +129,7 @@ class Endpoint(socket: Socket) extends Runnable with Receivable {
private def hangup() : Unit = {
Fyrehose.log("connection closed")
+ Fyrehose.tcp_listener.num_connections.decrementAndGet
socket.close()
}
View
4 src/FQL.scala
@@ -1,5 +1,7 @@
package com.paulasmuth.fyrehose;
+// todo: mkeyops, allcaps, less/greater-than, regex, except, only, \. unescape, smart typecasting
+
trait FQL_VAL {}
trait FQL_META {}
trait FQL_OP {}
@@ -11,6 +13,7 @@ trait FQL_BUFFER {
}
trait FQL_TOKEN extends FQL_BUFFER {
+ def inspect = if (buf == null) "?" else buf
def ready : Boolean
def next : FQL_TOKEN
def buffer(_cur: Char, _buf: String) : String =
@@ -213,6 +216,7 @@ class FQL_KEY(prev: String = "") extends FQL_TOKEN with FQL_VAL {
{ _buf= (_buf + cur).trim; this }
def get = parts
+ override def inspect = ("" /: parts)(_ + _)
}
class FQL_WHERE(_not: Boolean) extends FQL_TOKEN with FQL_STATEMENT {
View
30 src/Fyrehose.scala
@@ -1,15 +1,10 @@
package com.paulasmuth.fyrehose
-import scala.collection.mutable.HashMap;
import java.util.Locale
import java.util.Date
import java.text.DateFormat
+import scala.collection.mutable.HashMap;
-// todo:
-// conn-header: keepalive + safe_mode
-// check if out dir exists on start
-// listen-udp / upstream
-// fql: mkeyops, allcaps, less/greater-than, regex, except, only, \. unescape, smart typecasting
object Fyrehose{
@@ -23,11 +18,13 @@ object Fyrehose{
val BUFFER_SIZE_UDP = 65535
val MESSAGE_CACHE_SIZE = 100000
val FILE_CHUNK_SIZE = 3600 * 6
- val DEFAULT_OUT_DIR = "/tmp/fyrehose"
var backbone : Backbone = null
var writer : Writer = null
+ var tcp_listener : TCPListener = null
+ var udp_listener : UDPListener = null
+
val message_cache = new MessageCache
val message_index = new MessageIndex
@@ -63,9 +60,6 @@ object Fyrehose{
(CONFIG contains 'listen_udp unary_!)
) return usage()
- if (CONFIG contains 'out_dir unary_!)
- CONFIG += (('out_dir, DEFAULT_OUT_DIR))
-
if (CONFIG contains 'upstream)
return println("not yet implemented: -x / --upstream")
@@ -83,8 +77,12 @@ object Fyrehose{
backbone = new Backbone()
backbone.start()
- writer = new Writer()
- writer.start()
+ val status_timer = new StatusTimer
+
+ if (CONFIG contains 'out_dir) {
+ writer = new Writer()
+ writer.start()
+ }
message_cache.start()
message_index.start()
@@ -96,7 +94,7 @@ object Fyrehose{
return println("error: invalid port: tcp/" + CONFIG('listen_tcp))
}
- val tcp_listener = new TCPListener(CONFIG('listen_tcp).toInt)
+ tcp_listener = new TCPListener(CONFIG('listen_tcp).toInt)
tcp_listener.listen
}
@@ -107,10 +105,9 @@ object Fyrehose{
return println("error: invalid port: udp/" + CONFIG('listen_udp))
}
- val udp_listener = new UDPListener(CONFIG('listen_udp).toInt)
+ udp_listener = new UDPListener(CONFIG('listen_udp).toInt)
udp_listener.listen
}
-
}
def safe_boot() = try{
@@ -127,7 +124,7 @@ object Fyrehose{
println("usage: fyrehose [options] ")
println(" -l, --listen-tcp <port> listen for clients on this tcp port ")
println(" -u, --listen-udp <port> listen for clients on this udp port ")
- println(" -p, --path <path> path to store data (default: /tmp/fyrehose/) ")
+ println(" -p, --path <path> write event journal (default: no journal) ")
println(" -t, --timeout <msecs> connection idle timeout (default: 5000ms) ")
// println(" -x, --upstream <addr> pull events from this fyrehosed \n")
}
@@ -147,4 +144,5 @@ object Fyrehose{
error(msg); System.exit(1)
}
+
}
View
4 src/InboundStream.scala
@@ -32,7 +32,7 @@ class InboundStream(recv: Receivable, buffer_size: Int){
def read(buf: Array[Byte], buf_len: Int) : Unit = {
- if ((buf_len + buffer_pos) > buffer.length){
+ if ((buf_len + buffer_pos) >= buffer.length) {
throw new ParseException(
"endoint parser buffer overflow: " +
new String(java.util.Arrays.copyOfRange(buf, 0, buf_len))
@@ -77,6 +77,8 @@ class InboundStream(recv: Receivable, buffer_size: Int){
if (buffer(offset) == 123)
emit_event(java.util.Arrays.copyOfRange(buffer, offset, pos))
+ else if (buffer(offset) == 33)
+ emit_query(java.util.Arrays.copyOfRange(buffer, offset + 1, pos))
else
emit_query(java.util.Arrays.copyOfRange(buffer, offset, pos))
View
4 src/MessageCache.scala
@@ -42,6 +42,10 @@ class MessageCache extends Actor {
println("FIXPAUL foward " + seq_range.toString)
+ def size =
+ messages.size
+
+
def retrieve_async(sig: QueryDiscoverSig) =
retrieve(messages.toArray, sig) // FIXPAUL: in threadpool!
View
4 src/QueryParser.scala
@@ -13,7 +13,7 @@ class QueryParser {
lexer.finish
if (query == null)
- throw new ParseException("query must contain one of stream, info, etc.")
+ throw new ParseException("query must start with one of stream, count, etc.")
query.assert
@@ -31,7 +31,7 @@ class QueryParser {
case t: FQL_TOKEN =>
if (query == null)
- unexpected_token(t, "query to start with stream, count, sum, group, info, etc.")
+ unexpected_token(t, "query to start with one of stream, count, etc. got: " + t.inspect)
else
eval_token(t)
View
38 src/StatusTimer.scala
@@ -0,0 +1,38 @@
+package com.paulasmuth.fyrehose
+
+import java.util.concurrent._
+import java.lang.Runnable
+
+class StatusTimer(){
+
+ var last_time = FyrehoseUtil.now_ms
+ var last_count = 0
+
+ private val callback = new Runnable{
+ def run = print_stats
+ }
+
+ private val scheduler = Executors.newSingleThreadScheduledExecutor()
+ scheduler.scheduleAtFixedRate(callback, 0, 1500, TimeUnit.MILLISECONDS)
+
+ def print_stats() = {
+ val tdiff = FyrehoseUtil.now_ms - last_time
+ val cdiff = Fyrehose.backbone.msg_total.get - last_count
+ val msgps = cdiff / (tdiff / 1000.0)
+
+ val conns = if (Fyrehose.tcp_listener == null) 0
+ else Fyrehose.tcp_listener.num_connections.get
+
+ Fyrehose.log("%.1f msg/s, %d conns, %d qrys, cache: %d/%d (%.1f MB)".format(
+ msgps, conns,
+ Fyrehose.backbone.queries.size,
+ Fyrehose.message_cache.size,
+ Fyrehose.MESSAGE_CACHE_SIZE,
+ FyrehoseUtil.used_mem
+ ))
+
+ last_count = Fyrehose.backbone.msg_total.get
+ last_time = FyrehoseUtil.now_ms
+ }
+
+}
View
3  src/TCPListener.scala
@@ -1,6 +1,7 @@
package com.paulasmuth.fyrehose
import java.util.concurrent._
+import java.util.concurrent.atomic._
import java.io._
import java.net._
@@ -9,6 +10,8 @@ class TCPListener(port: Int) {
val sock = new ServerSocket(port)
val clients = Executors.newCachedThreadPool() // evil ~paul
+ var num_connections = new AtomicInteger
+
def listen = {
Fyrehose.log("listening on tcp/0.0.0.0:" + port.toString)
View
16 src/Util.scala
@@ -1,5 +1,9 @@
package com.paulasmuth.fyrehose
+import java.lang.management.ManagementFactory
+import java.lang.management.MemoryPoolMXBean
+import java.lang.management.MemoryUsage
+
class ParseException(msg: String) extends Exception{
override def toString = msg
}
@@ -7,7 +11,11 @@ class ParseException(msg: String) extends Exception{
object FyrehoseUtil{
def now() : Long =
- (new java.util.Date()).getTime / 1000
+ now_ms / 1000
+
+
+ def now_ms() : Long =
+ (new java.util.Date()).getTime
def get_uuid() : String =
@@ -17,4 +25,10 @@ object FyrehoseUtil{
def pfunc_unit : PartialFunction[Int, Unit] =
{ case _ => () }
+
+ def used_mem =
+ ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() /
+ (1024 * 1024).toDouble
+
}
+

No commit comments for this range

Something went wrong with that request. Please try again.