Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

implemented fast read mode

  • Loading branch information...
commit f2ef2e259cc90a6bbd2d5ec91163c12008434931 1 parent 04caea4
@paulasmuth authored
View
8 src/Fyrehose.scala
@@ -72,7 +72,7 @@ object Fyrehose{
if (CONFIG contains 'timeout)
CONN_IDLE_TIMEOUT = CONFIG('timeout).toInt
- boot()
+ safe_boot()
}
@@ -113,6 +113,12 @@ object Fyrehose{
}
+ def safe_boot() = try{
+ boot
+ } catch {
+ case e: Exception => Fyrehose.fatal(e.toString)
+ }
+
def usage(head: Boolean = true) = {
if (head)
View
60 src/InboundStream.scala
@@ -5,6 +5,10 @@ import scala.actors.Actor._
class InboundStream(recv: Receivable, buffer_size: Int){
+ // if fast_mode is enabled, events and queries are splitted
+ // by newline, which means you can't have \n in your events
+ var fast_mode = true
+
// if safe_mode is disabled, non-json input is silently
// ignored. safe_mode=false requires strict_mode=true
var safe_mode = true
@@ -20,10 +24,12 @@ class InboundStream(recv: Receivable, buffer_size: Int){
def set_safe_mode(m: Boolean) =
safe_mode = m
-
def set_strict_mode(m: Boolean) =
strict_mode = m
+ def set_fast_mode(m: Boolean) =
+ fast_mode = m
+
def read(buf: Array[Byte], buf_len: Int) : Unit = {
if ((buf_len + buffer_pos) > buffer.length){
@@ -37,7 +43,53 @@ class InboundStream(recv: Receivable, buffer_size: Int){
System.arraycopy(buf, 0, buffer, buffer_pos, buf_len)
buffer_pos += buf_len
- read_chunked()
+ if (fast_mode)
+ read_fast(false)
+ else
+ read_chunked()
+ }
+
+
+ def clear = {
+ if (fast_mode)
+ read_fast(true)
+ else
+ read_chunked()
+
+ buffer_pos = 0
+ }
+
+
+ private def read_fast(last: Boolean) : Unit = {
+ var pos = 0
+ var offset = 0
+
+ while (pos <= buffer_pos) {
+
+ if (pos > 0 && ((buffer(pos) == 10) || ((pos == buffer_pos) && last))) {
+
+ while ((offset < pos) && (
+ (buffer(offset) == 0) ||
+ (buffer(offset) == 9) ||
+ (buffer(offset) == 10) ||
+ (buffer(offset) == 13) ||
+ (buffer(offset) == 32))) offset += 1
+
+ if (buffer(offset) == 123)
+ emit_event(java.util.Arrays.copyOfRange(buffer, offset, pos))
+ else
+ emit_query(java.util.Arrays.copyOfRange(buffer, offset, pos))
+
+ offset = pos
+
+ if (pos + 1 >= buffer_pos)
+ buffer_pos = 0
+
+ }
+
+ pos += 1
+ }
+
}
@@ -114,8 +166,10 @@ class InboundStream(recv: Receivable, buffer_size: Int){
throw new ParseException("something went horribly wrong while parsing")
- private def emit_event(buf: Array[Byte]) =
+ private def emit_event(buf: Array[Byte]) = {
+ println("emit:" + new String(buf))
recv.message(new MessageBody(buf))
+ }
private def emit_query(buf: Array[Byte]) =
View
2  src/QueryLexer.scala
@@ -19,7 +19,7 @@ class QueryLexer(recv: QueryParser) {
var head = stack.head.next(cursor, buffer)
buffer = stack.head.buffer(cursor, buffer)
- debug
+ // debug
if (head != stack.head)
{ head +=: stack; next }
View
7 src/UDPListener.scala
@@ -13,11 +13,12 @@ class UDPListener(port: Int) extends Receivable {
val sock = new DatagramSocket(port)
val stream = new InboundStream(this, Fyrehose.BUFFER_SIZE_UDP)
- stream.set_safe_mode(false)
+ stream.set_safe_mode(true)
while (true) {
sock.receive(next)
- stream.read(next.getData, Fyrehose.BUFFER_SIZE_UDP)
+ stream.read(next.getData, next.getLength)
+ stream.clear
}
}
@@ -31,6 +32,6 @@ class UDPListener(port: Int) extends Receivable {
def query(qry: QueryBody) =
- Fyrehose.error("received query via UDP: illegal")
+ Fyrehose.error("received query via UDP (illegal): " + new String(qry.raw))
}
Please sign in to comment.
Something went wrong with that request. Please try again.