Browse files

Fixed checks for \r\n delimited lines

  • Loading branch information...
1 parent 33396b8 commit d064fead01c78116d726b0d6b4ad2b3cd3f1274b Mark Chadwick committed Apr 18, 2011
View
2 project/plugins/project/build.properties
@@ -1,3 +1,3 @@
#Project properties
-#Sat Apr 16 12:51:39 EDT 2011
+#Mon Apr 18 11:27:21 EDT 2011
plugin.uptodate=true
View
5 src/main/scala/backend/BackendClient.scala
@@ -52,7 +52,10 @@ class BackendClient(channels: ChannelGroup, backend: Backend, reconnect: Int,
}
}
- def shutdown() = bootstrap.releaseExternalResources()
+ def shutdown() = {
+ timer.stop()
+ bootstrap.releaseExternalResources()
+ }
private def handleUpdate(updates: Traversable[Update], channel: Channel) = {
channel.write(updates)
View
7 src/main/scala/server/RelayPipelineFactory.scala
@@ -28,8 +28,11 @@ class RelayPipelineFactory @Inject() (handler: RelayUpdateHandler)
}
private val framer = {
- val delimiter = ChannelBuffers.copiedBuffer("\n".getBytes)
- new DelimiterBasedFrameDecoder(8192, true, delimiter)
+ val delimiters = Array(
+ ChannelBuffers.copiedBuffer("\r\n".getBytes),
+ ChannelBuffers.copiedBuffer("\n".getBytes))
+
+ new DelimiterBasedFrameDecoder(8192, true, delimiters:_*)
}
private val stringDecoder = new StringDecoder()
View
20 src/test/scala/DummyBackend.scala
@@ -1,7 +1,6 @@
package graphite.relay
-import java.io.BufferedReader
-import java.io.InputStreamReader
+import java.io.DataInputStream
import java.net.ServerSocket
import java.net.SocketTimeoutException
@@ -14,20 +13,27 @@ class DummyBackend extends Runnable {
val port = TestModule.freePort
val socket = new ServerSocket(port)
var running = true
- var lines: List[String] = Nil
+ var messages: Int = 0
socket.setSoTimeout(timeout)
def asBackend = Backend("localhost", port)
def run = while(running) {
try {
val client = socket.accept()
- val in = new BufferedReader(new InputStreamReader(client.getInputStream))
+ val in = new DataInputStream(client.getInputStream)
try {
- Stream.continually(in.readLine)
- .takeWhile(_ != null)
- .foreach(line lines ::= line)
+ var readingMessages = true
+ while(readingMessages) {
+ try {
+ val length = in.readInt()
+ in.skipBytes(length)
+ messages += 1
+ } catch {
+ case _ readingMessages = false
+ }
+ }
} finally {
in.close()
client.close()
View
38 src/test/scala/GraphiteRelaySpec.scala
@@ -13,19 +13,30 @@ class GraphiteRelaySpec extends FlatSpec
with ServerTests {
behavior of "Graphite Relay"
- /*
- it should "bind to a port" in {
- val injector = getInjector()
-
- withServer(injector) { relay ⇒
- val socket = new Socket("localhost", relay.port)
- val out = socket.getOutputStream()
- out.write("metric.one 123 1234567890\n".getBytes)
- out.close()
+ it should "route traffic to a backend" in {
+ withBackends(3) { dummyBackends
+ val backends = Backends(dummyBackends.map(_.asBackend):_*)
+ val injector = getInjector(backends = backends)
+
+ withServer(injector) { relay
+ val socket = new Socket("localhost", relay.port)
+ val out = socket.getOutputStream()
+
+ (0 to 10000).foreach { i
+ val metric = "metric.%s".format(UUID.randomUUID.toString)
+ val update = "%s 123 1234567890\n".format(metric)
+ out.write(update.getBytes)
+ }
+ out.close()
+ }
+ Thread.sleep(500)
+ dummyBackends.foreach { backend
+ backend.messages should be > (500)
+ }
}
}
- it should "route traffic to a backend" in {
+ it should "parse \\r\\n delimited lines" in {
withBackends(3) { dummyBackends
val backends = Backends(dummyBackends.map(_.asBackend):_*)
val injector = getInjector(backends = backends)
@@ -36,16 +47,15 @@ class GraphiteRelaySpec extends FlatSpec
(0 to 10000).foreach { i
val metric = "metric.%s".format(UUID.randomUUID.toString)
- val update = "%s 123 1234567890\n".format(metric)
+ val update = "%s 123 1234567890\r\n".format(metric)
out.write(update.getBytes)
}
out.close()
}
- Thread.sleep(1000)
+ Thread.sleep(500)
dummyBackends.foreach { backend
- println("%s (%s)".format(backend.asBackend, backend.lines.size))
+ backend.messages should be > (500)
}
}
}
- */
}
View
5 src/test/scala/module/TestModule.scala
@@ -5,7 +5,7 @@ import com.google.inject.name.Names
import graphite.relay.backend.Backends
import graphite.relay.backend.strategy.BackendStrategy
-import graphite.relay.backend.strategy.ConsistentHash
+import graphite.relay.backend.strategy.RoundRobin
import graphite.relay.overflow.BitchingOverflowHandler
import graphite.relay.overflow.OverflowHandler
@@ -23,12 +23,11 @@ object TestModule {
class TestModule(backends: Backends) extends AbstractModule {
def configure() {
- bindConstant.annotatedWith(Names.named("hash.replicas")).to(100)
bindConstant.annotatedWith(Names.named("relay.hostbuffer")).to(100)
bindConstant.annotatedWith(Names.named("relay.port")).to(TestModule.freePort)
bindConstant.annotatedWith(Names.named("relay.reconnect")).to(2)
- bind(classOf[BackendStrategy]).to(classOf[ConsistentHash])
+ bind(classOf[BackendStrategy]).to(classOf[RoundRobin])
bind(classOf[Backends]).toInstance(backends)
bind(classOf[OverflowHandler]).to(classOf[BitchingOverflowHandler])
}

0 comments on commit d064fea

Please sign in to comment.