@@ -30,7 +30,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
implicit sendManifest: Manifest[S], recvManifest: Manifest[R])
extends AvroChannelManager[S, R] {

protected val log = Logger()
protected val logger = Logger()

private lazy val useTcpNoDelay = Config.config.getBool("scads.comm.tcpNoDelay", true)

@@ -127,7 +127,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
}

private def listen(port: Int) = {
log.info("starting listener on port %d".format(port))
logger.info("starting listener on port %d".format(port))
// open the socket in the current thread
val serverSocket = new ServerSocket(port)
new Listener(serverSocket)
@@ -136,7 +136,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
private def connect(addr: InetSocketAddress) = {
require(addr ne null)

log.info("opening new connection to address: %s".format(addr))
logger.info("opening new connection to address: %s".format(addr))
// open the connection in the current thread
val socket = new Socket
socket.setTcpNoDelay(useTcpNoDelay) // disable Nagle's algorithm
@@ -203,9 +203,9 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
last.future.await(5000)
} catch {
case _: FutureTimeoutException =>
log.error("Could not drain send queue: took more than 5 seconds")
logger.error("Could not drain send queue: took more than 5 seconds")
case ex: FutureException =>
log.error("Could not drain send queue: caught exception", ex)
logger.error("Could not drain send queue: caught exception", ex)
}
}
}
@@ -232,7 +232,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
task.future.finish()
} catch {
case e: IOException =>
log.error("Could not write send task", e)
logger.error("Could not write send task", e)
task.future.finishWithError(e)
stop()
}
@@ -246,7 +246,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
try {
val len = dataInputStream.readInt()
if (len < 0) {
log.error("Read bad input length: %d".format(len))
logger.error("Read bad input length: %d".format(len))
stop()
} else {
val bytes = new Array[Byte](len)
@@ -258,7 +258,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
}
} catch {
case e: IOException =>
log.error("Caught error in reading", e)
logger.error("Caught error in reading", e)
stop()
}
}
@@ -314,7 +314,7 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
super.doAfterStop()
val test = connectionsSet.remove(this)
if (test eq null)
log.error("ephemeral connection was not in connection set: %s".format(this))
logger.error("ephemeral connection was not in connection set: %s".format(this))
}
}

@@ -336,15 +336,15 @@ abstract class BlockingChannelManager[S <: SpecificRecord, R <: SpecificRecord](
val conn = new EphemeralConnection(client)
val prev = nodeToConnections.putIfAbsent(client.remoteInetSocketAddress, conn)
if (prev ne null) {
log.error("Unable to enter new remote connection into connection map: refusing connection")
logger.error("Unable to enter new remote connection into connection map: refusing connection")
conn.stop()
}
connectionsSet.put(conn, V)
conn.initialize() // start send/read threads
}
} catch {
case e: IOException =>
log.error("Caught error in accepting", e)
logger.error("Caught error in accepting", e)
stop()
}
}
@@ -7,13 +7,15 @@ import avro.runtime._
import avro.marker.{AvroRecord, AvroUnion}
import actors.Actor
import com.sun.xml.internal.ws.developer.MemberSubmissionAddressing.Validation
import java.awt.image.PixelInterleavedSampleModel
import java.lang.RuntimeException

sealed trait PerfMessage extends AvroUnion
case class Ping(var x: Int) extends PerfMessage with AvroRecord
case class Pong(var y: Int) extends PerfMessage with AvroRecord

object ActorPerfTest {
object PerfRegistry extends ServiceRegistry[PerfMessage]
implicit object PerfRegistry extends ServiceRegistry[PerfMessage]

class TestActor extends Actor {
implicit val remoteHandle = PerfRegistry.registerActor(this)
@@ -45,22 +47,39 @@ object ActorPerfTest {
actor !? Ping(1)
}

val destFuture = new MessageFuture[PerfMessage](null, null)
def futureMessage: Unit = {
val ping = Ping(1)
val f1 = new MessageFuture[PerfMessage](destFuture.remoteService, ping)
val f2 = f1.remoteService !! ping
f1.get(1000).getOrElse(throw new RuntimeException("TIMEOUT"))
f2.remoteService.!(Pong(1))(destFuture.remoteService)
f2.get(1000).getOrElse(throw new RuntimeException("TIMEOUT"))
}

def main(args: Array[String]): Unit = {
val msgCount = 100000
val msgCount = 1000000
(1 to 10).foreach(i => {
val actStart = System.nanoTime()
(1 to 100000).foreach(j => {
(1 to msgCount).foreach(j => {
actorMessage
})
val actEnd = System.nanoTime()
println("actors: " + (msgCount / ((actEnd - actStart) / 1000000000)))
println("actors: " + 2 * (msgCount / ((actEnd - actStart) / 1000000000.0)))

val distStart = System.nanoTime()
(1 to 100000).foreach(j => {
(1 to msgCount).foreach(j => {
dispatchMessage
})
val distEnd = System.nanoTime()
println("hawt: " + (msgCount / ((distEnd - distStart)/ 1000000000)))
println("hawt: " + 2 * (msgCount / ((distEnd - distStart)/ 1000000000.0)))

val futureStart = System.nanoTime()
(1 to msgCount).foreach(j => {
futureMessage
})
val futureEnd = System.nanoTime()
println("future: " + 2 * (msgCount / ((futureEnd - futureStart)/ 1000000000.0)))
})
}
}