Skip to content
Browse files

Remove all trailing whitespace

  • Loading branch information...
1 parent 96b1290 commit f5e087f1676971e9d07a79f83bd6f9a9bc9cb07b Robert Newson committed with moonpolysoft May 4, 2012
Showing with 455 additions and 455 deletions.
  1. +1 −1 .gitignore
  2. +1 −1 LICENSE
  3. +15 −15 readme.textile
  4. +5 −5 src/main/scala/scalang/BigTuple.scala
  5. +1 −1 src/main/scala/scalang/BitString.scala
  6. +3 −3 src/main/scala/scalang/Cluster.scala
  7. +2 −2 src/main/scala/scalang/ETerm.scala
  8. +1 −1 src/main/scala/scalang/ErlangPeer.scala
  9. +1 −1 src/main/scala/scalang/Fun.scala
  10. +9 −9 src/main/scala/scalang/FunProcess.scala
  11. +4 −4 src/main/scala/scalang/ImproperList.scala
  12. +84 −84 src/main/scala/scalang/Node.scala
  13. +2 −2 src/main/scala/scalang/NodeConfig.scala
  14. +2 −2 src/main/scala/scalang/Pid.scala
  15. +1 −1 src/main/scala/scalang/Port.scala
  16. +15 −15 src/main/scala/scalang/Process.scala
  17. +1 −1 src/main/scala/scalang/ProcessContext.scala
  18. +1 −1 src/main/scala/scalang/Reference.scala
  19. +2 −2 src/main/scala/scalang/ReplyRegistry.scala
  20. +8 −8 src/main/scala/scalang/Service.scala
  21. +2 −2 src/main/scala/scalang/ServiceContext.scala
  22. +5 −5 src/main/scala/scalang/TypeFactory.scala
  23. +8 −8 src/main/scala/scalang/epmd/Epmd.scala
  24. +1 −1 src/main/scala/scalang/epmd/EpmdDecoder.scala
  25. +3 −3 src/main/scala/scalang/epmd/EpmdEncoder.scala
  26. +11 −11 src/main/scala/scalang/epmd/EpmdHandler.scala
  27. +1 −1 src/main/scala/scalang/epmd/EpmdMessages.scala
  28. +10 −10 src/main/scala/scalang/node/CaseClassFactory.scala
  29. +9 −9 src/main/scala/scalang/node/ClientHandshakeHandler.scala
  30. +1 −1 src/main/scala/scalang/node/Clock.scala
  31. +8 −8 src/main/scala/scalang/node/ErlangHandler.scala
  32. +6 −6 src/main/scala/scalang/node/ErlangNodeClient.scala
  33. +1 −1 src/main/scala/scalang/node/ErlangNodeServer.scala
  34. +3 −3 src/main/scala/scalang/node/ExitListenable.scala
  35. +1 −1 src/main/scala/scalang/node/ExitListener.scala
  36. +5 −5 src/main/scala/scalang/node/FailureDetectionHandler.scala
  37. +4 −4 src/main/scala/scalang/node/HandshakeDecoder.scala
  38. +3 −3 src/main/scala/scalang/node/HandshakeEncoder.scala
  39. +16 −16 src/main/scala/scalang/node/HandshakeHandler.scala
  40. +2 −2 src/main/scala/scalang/node/HandshakeMessages.scala
  41. +2 −2 src/main/scala/scalang/node/Link.scala
  42. +4 −4 src/main/scala/scalang/node/LinkListenable.scala
  43. +2 −2 src/main/scala/scalang/node/LinkListener.scala
  44. +6 −6 src/main/scala/scalang/node/Mailbox.scala
  45. +2 −2 src/main/scala/scalang/node/NetKernel.scala
  46. +4 −4 src/main/scala/scalang/node/PacketCounter.scala
  47. +13 −13 src/main/scala/scalang/node/ProcessLike.scala
  48. +3 −3 src/main/scala/scalang/node/ReferenceCounter.scala
  49. +8 −8 src/main/scala/scalang/node/ScalaTermDecoder.scala
  50. +17 −17 src/main/scala/scalang/node/ScalaTermEncoder.scala
  51. +6 −6 src/main/scala/scalang/node/SendListenable.scala
  52. +1 −1 src/main/scala/scalang/node/SendListener.scala
  53. +11 −11 src/main/scala/scalang/node/ServerHandshakeHandler.scala
  54. +6 −6 src/main/scala/scalang/util/BatchPoolExecutor.scala
  55. +1 −1 src/main/scala/scalang/util/ByteArray.scala
  56. +1 −1 src/main/scala/scalang/util/CamelToUnder.scala
  57. +8 −8 src/main/scala/scalang/util/StateMachine.scala
  58. +13 −13 src/main/scala/scalang/util/ThreadPoolFactory.scala
  59. +1 −1 src/main/scala/scalang/util/UnderToCamel.scala
  60. +1 −1 src/test/resources/echo.escript
  61. +2 −2 src/test/resources/link_delivery.escript
  62. +1 −1 src/test/resources/receive_connection.escript
  63. +2 −2 src/test/scala/scalang/EchoProcess.scala
  64. +3 −3 src/test/scala/scalang/FailProcess.scala
  65. +3 −3 src/test/scala/scalang/LinkProcess.scala
  66. +13 −13 src/test/scala/scalang/NodeSpec.scala
  67. +13 −13 src/test/scala/scalang/ServiceSpec.scala
  68. +2 −2 src/test/scala/scalang/epmd/EpmdDecoderSpec.scala
  69. +1 −1 src/test/scala/scalang/epmd/EpmdEncoderSpec.scala
  70. +5 −5 src/test/scala/scalang/epmd/EpmdSpec.scala
  71. +2 −2 src/test/scala/scalang/node/ClientHandshakeHandlerSpec.scala
  72. +12 −12 src/test/scala/scalang/node/FailureDetectionHandlerSpec.scala
  73. +7 −7 src/test/scala/scalang/node/HandshakeDecoderSpec.scala
  74. +9 −9 src/test/scala/scalang/node/HandshakeEncoderSpec.scala
  75. +3 −3 src/test/scala/scalang/node/ServerHandshakeHandlerSpec.scala
  76. +4 −4 src/test/scala/scalang/terms/ScalaTermDecoderSpec.scala
  77. +4 −4 src/test/scala/scalang/util/TwoWayCodecEmbedder.scala
View
2 .gitignore
@@ -6,4 +6,4 @@ lib_managed/*
*.iml
*.iws
*.ipr
-.idea/*
+.idea/*
View
2 LICENSE
@@ -10,4 +10,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
-limitations under the License.
+limitations under the License.
View
30 readme.textile
@@ -14,7 +14,7 @@ From Maven:
<url>http://maven.boundary.com/artifactory/repo</url>
</repository>
</repositories>
-
+
<dependencies>
<dependency>
<groupId>com.boundary</groupId>
@@ -30,7 +30,7 @@ From SBT:
<pre>
<code>
val boundaryPublic = "Boundary Public Repo" at "http://maven.boundary.com/artifactory/repo"
-
+
val scalang = "com.boundary" %% "scalang" % "0.9"
</code>
</pre>
@@ -62,12 +62,12 @@ Scalang shares Erlang's concept of a process, a lightweight actor that is capabl
log.info("received %s", msg)
}
}
-
+
val pid = node.spawn[MyProcess]("my_process")
-
+
//send to the pid
node.send(pid, "hey there")
-
+
//send to the regname
node.send("my_process", "you wanna party?")
</code>
@@ -92,7 +92,7 @@ The following shows a simple echo server and client that demonstrate message sen
val server = node.spawn[EchoServer]("echo_server")
-
+
val client = node.spawn { mbox =>
mbox.send(server, (client,'derp))
val received = mbox.receive
@@ -126,7 +126,7 @@ Processes that must implement custom behavior may override the trapExit method.
override def onMessage(msg : Any) = msg match {
case _ => log.info("derp %s", msg)
}
-
+
override def trapExit(from : Pid, msg : Any) {
log.warning("got exit notification from %s reason %s", from, msg)
}
@@ -178,12 +178,12 @@ Scalang messages are serialized into "Erlang's external term format":http://www.
h2. Rich Type Mappings
-Sometimes the built-in type mappings in Scalang are not sufficient for an application's message format. Scalang provides the TypeFactory trait for client code to provide custom decoding behavior. A TypeFactory is invoked when Scalang comes across a term that looks like an erlang record: a tuple where the first element is an atom. The createType method is called with the first tuple element as the name and the arity of the tuple.
+Sometimes the built-in type mappings in Scalang are not sufficient for an application's message format. Scalang provides the TypeFactory trait for client code to provide custom decoding behavior. A TypeFactory is invoked when Scalang comes across a term that looks like an erlang record: a tuple where the first element is an atom. The createType method is called with the first tuple element as the name and the arity of the tuple.
<pre>
<code>
object StructFactory extends TypeFactory {
- def createType(name : Symbol, arity : Int, reader : TermReader) : Option[Seq[Any]] = {
+ def createType(name : Symbol, arity : Int, reader : TermReader) : Option[Seq[Any]] = {
try {
reader.mark
(name,arity) match {
@@ -192,13 +192,13 @@ Sometimes the built-in type mappings in Scalang are not sufficient for an applic
}
}
}
-
+
protected def readSeq(reader : TermReader) : Map[Symbol,Any] = {
val proplist = reader.readAs[List[Symbol,Any]]
proplist.toMap
}
}
-
+
val node = Node("test", cookie, NodeConfig(
typeFactory = StructFactory))
</code>
@@ -217,20 +217,20 @@ Most modern Erlang applications are built using the OTP framework, and in partic
case class EchoServiceArgs(name : String)
class EchoService(ctx : ServiceContext[EchoServiceArgs]) extends Service(ctx) {
val EchoServiceArgs(name) = ctx.args
-
+
override def handleCall(from : Pid, request : Any) : Any = {
name + " " + request
}
-
+
override def handleCast(request : Any) {
log.info("Can't echo a cast. %s", request)
}
-
+
override def handleInfo(request : Any) {
log.info("A wild message appeared. %s, request")
}
}
-
+
val node = Node("test", cookie)
val pid = node.spawnService[EchoService]("echo", EchoServiceArgs("test_echo"))
</code>
View
10 src/main/scala/scalang/BigTuple.scala
@@ -16,17 +16,17 @@
package scalang
class BigTuple(val elements : Seq[Any]) extends Product {
-
+
override def productElement(n : Int) = elements(n)
-
+
override def productArity = elements.size
-
+
override def canEqual(other : Any) : Boolean = {
other match {
case o : BigTuple => o.elements == elements
case _ => false
}
}
-
+
override def equals(other : Any) = canEqual(other)
-}
+}
View
2 src/main/scala/scalang/BitString.scala
@@ -17,4 +17,4 @@ package scalang
import java.nio._
-case class BitString(buffer : ByteBuffer, bits : Int)
+case class BitString(buffer : ByteBuffer, bits : Int)
View
6 src/main/scala/scalang/Cluster.scala
@@ -17,7 +17,7 @@ package scalang
class Cluster(ctx : ProcessContext) extends Process(ctx) {
@volatile var nodes = Set[Symbol]()
-
+
def onMessage(msg : Any) = msg match {
case ('cluster, pid : Pid, ref : Reference) =>
pid ! ('cluster, ref, nodes.toList)
@@ -26,5 +26,5 @@ class Cluster(ctx : ProcessContext) extends Process(ctx) {
case ('nodedown, node : Symbol) =>
nodes -= node
}
-
-}
+
+}
View
4 src/main/scala/scalang/ETerm.scala
@@ -16,5 +16,5 @@
package scalang
trait ETerm {
-
-}
+
+}
View
2 src/main/scala/scalang/ErlangPeer.scala
@@ -15,4 +15,4 @@
//
package scalang
-case class ErlangPeer(node : String)
+case class ErlangPeer(node : String)
View
2 src/main/scala/scalang/Fun.scala
@@ -18,5 +18,5 @@ package scalang
case class Fun(pid : Pid, module : Symbol, index : Int, uniq : Int, vars : Seq[Any])
case class NewFun(pid : Pid, module : Symbol, oldIndex : Int, oldUniq : Int, arity : Int, index : Int, uniq : Seq[Byte], vars : Seq[Any])
-
+
case class ExportFun(module : Symbol, function : Symbol, arity : Int)
View
18 src/main/scala/scalang/FunProcess.scala
@@ -22,25 +22,25 @@ class FunProcess(fun : Mailbox => Unit, ctx : ProcessContext) extends Process(ct
val queue = new LinkedBlockingQueue[Any]
val parentPid = self
val parentRef = referenceCounter
-
+
val mbox = new Mailbox {
def self = parentPid
def referenceCounter = parentRef
-
+
override def handleMessage(msg : Any) {
queue.offer(msg)
}
-
+
def receive : Any = {
queue.take
}
-
+
def receive(timeout : Long) : Option[Any] = {
Option(queue.poll(timeout, TimeUnit.MILLISECONDS))
}
}
-
-
+
+
def start {
fiber.execute(new Runnable {
override def run {
@@ -49,9 +49,9 @@ class FunProcess(fun : Mailbox => Unit, ctx : ProcessContext) extends Process(ct
}
})
}
-
+
override def onMessage(msg : Any) {
queue.offer(msg)
}
-
-}
+
+}
View
8 src/main/scala/scalang/ImproperList.scala
@@ -22,11 +22,11 @@ case class ImproperList(under : List[Any], lastTail : Any) extends LinearSeq[Any
override def isEmpty = under.isEmpty
override def head = under.head
override def tail = under.tail
-
+
override def apply(idx : Int) = under(idx)
-
+
override def length = under.length
-
+
override def toString : String = {
val buffer = new StringBuilder
buffer ++= "ImproperList("
@@ -36,4 +36,4 @@ case class ImproperList(under : List[Any], lastTail : Any) extends LinearSeq[Any
buffer ++= ")"
buffer.toString
}
-}
+}
View
168 src/main/scala/scalang/Node.scala
@@ -44,28 +44,28 @@ import java.nio.channels.ClosedChannelException
object Node {
val random = SecureRandom.getInstance("SHA1PRNG")
-
+
def apply(name : String) : Node = apply(Symbol(name))
def apply(name : String, cookie : String) : Node = apply(Symbol(name), cookie)
def apply(name : String, cookie : String, tpf : ThreadPoolFactory) : Node = apply(Symbol(name), cookie, tpf)
def apply(name : String, cookie : String, listener : ClusterListener) : Node = apply(Symbol(name), cookie, listener)
def apply(name : String, cookie : String, nodeConfig : NodeConfig) : Node = apply(Symbol(name), cookie, nodeConfig)
-
+
def apply(name : Symbol) =
new ErlangNode(name, findOrGenerateCookie, NodeConfig(new DefaultThreadPoolFactory, None))
-
+
def apply(name : Symbol, cookie : String) =
new ErlangNode(name, cookie, NodeConfig(new DefaultThreadPoolFactory, None))
-
+
def apply(name : Symbol, cookie : String, tpf : ThreadPoolFactory) =
new ErlangNode(name, cookie, NodeConfig(tpf, None))
-
+
def apply(name : Symbol, cookie : String, listener : ClusterListener) =
new ErlangNode(name, cookie, NodeConfig(new DefaultThreadPoolFactory, Some(listener)))
-
+
def apply(name : Symbol, cookie : String, nodeConfig : NodeConfig) =
new ErlangNode(name, cookie, nodeConfig)
-
+
protected def findOrGenerateCookie : String = {
val homeDir = System.getenv("HOME")
if (homeDir == null) {
@@ -80,21 +80,21 @@ object Node {
cookie
}
}
-
+
protected def randomCookie : String = {
val ary = new Array[Byte](20)
random.nextBytes(ary)
ary.map("%02X" format _).mkString
}
-
+
protected def readFile(file : File) : String = {
val in = new BufferedReader(new FileReader(file))
try
in.readLine
finally
in.close
}
-
+
protected def writeCookie(file : File, cookie : String) {
val out = new FileWriter(file)
try
@@ -111,21 +111,21 @@ trait ClusterListener {
trait ClusterPublisher {
@volatile var listeners : List[ClusterListener] = Nil
-
+
def addListener(listener : ClusterListener) {
listeners = listener :: listeners
}
-
+
def clearListeners {
listeners = Nil
}
-
+
def notifyNodeUp(node : Symbol) {
for (listener <- listeners) {
listener.nodeUp(node)
}
}
-
+
def notifyNodeDown(node : Symbol) {
for (listener <- listeners) {
listener.nodeDown(node)
@@ -178,15 +178,15 @@ trait Node extends ClusterListener with ClusterPublisher {
def timer : HashedWheelTimer
}
-class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) extends Node
- with ExitListener
- with SendListener
- with LinkListener
+class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) extends Node
+ with ExitListener
+ with SendListener
+ with LinkListener
with ReplyRegistry
with Instrumented
with Logging {
InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory)
-
+
val timer = new HashedWheelTimer
val tickTime = config.tickTime
val poolFactory = config.poolFactory
@@ -208,7 +208,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val referenceCounter = new ReferenceCounter(name, creation)
val netKernel = spawn[NetKernel]('net_kernel)
val cluster = spawn[Cluster]('cluster)
-
+
def shutdown {
localEpmd.close
for ((node,channel) <- channels) {
@@ -218,7 +218,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
process.exit('node_shutdown)
}
}
-
+
def spawnMbox : Mailbox = {
val p = createPid
val n = this
@@ -235,24 +235,24 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
processes.put(p, box)
box
}
-
+
def spawnMbox(regName : String) : Mailbox = spawnMbox(Symbol(regName))
def spawnMbox(regName : Symbol) : Mailbox = {
val mbox = spawnMbox
register(regName, mbox.self)
mbox
}
-
+
def createPid : Pid = {
val id = pidCount.getAndIncrement & 0x7fff
-
+
val serial = if (id == 0)
pidSerial.getAndIncrement & 0x1fff
else
pidSerial.get & 0x1fff
Pid(name,id,serial,creation)
}
-
+
//node external interface
def spawn(fun : Mailbox => Unit) : Pid = {
val n = this
@@ -272,11 +272,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
process.start
p
}
-
+
def spawn(name : String, fun : Mailbox => Unit) : Pid = {
spawn(Symbol(name), fun)
}
-
+
def spawn(name : Symbol, fun : Mailbox => Unit) : Pid = {
val n = this
val p = createPid
@@ -297,79 +297,79 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
registeredNames.put(name, p)
p
}
-
+
def spawn[T <: Process](implicit mf : Manifest[T]) : Pid = {
val pid = createPid
createProcess(mf.erasure.asInstanceOf[Class[T]], pid, poolFactory.createBatchExecutor(false))
pid
}
-
+
def spawn[T <: Process](regName : Symbol)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
val process = createProcess(mf.erasure.asInstanceOf[Class[T]], pid, poolFactory.createBatchExecutor(regName.name, false))
registeredNames.put(regName, pid)
pid
}
-
+
def spawn[T <: Process](regName : String)(implicit mf : Manifest[T]) : Pid = {
spawn(Symbol(regName))(mf)
}
-
+
def spawn[T <: Process](reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
createProcess(mf.erasure.asInstanceOf[Class[T]], pid, poolFactory.createBatchExecutor(reentrant))
pid
}
-
+
def spawn[T <: Process](regName : Symbol, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
createProcess(mf.erasure.asInstanceOf[Class[T]], pid, poolFactory.createBatchExecutor(regName.name, reentrant))
registeredNames.put(regName, pid)
pid
}
-
+
def spawn[T <: Process](regName : String, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
spawn[T](Symbol(regName),reentrant)(mf)
}
-
+
def spawnService[T <: Service[A], A <: Product](args : A)(implicit mf : Manifest[T]) : Pid = {
spawnService[T,A](args, false)(mf)
}
-
+
def spawnService[T <: Service[A], A <: Product](args : A, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
val process = createService(mf.erasure.asInstanceOf[Class[T]], pid, args, poolFactory.createBatchExecutor(reentrant))
/* process.init(args)*/
pid
}
-
+
def spawnService[T <: Service[A], A <: Product](regName : String, args : A)(implicit mf : Manifest[T]) : Pid = {
spawnService[T,A](regName, args, false)(mf)
}
-
+
def spawnService[T <: Service[A], A <: Product](regName : String, args : A, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
spawnService[T,A](Symbol(regName), args, reentrant)(mf)
}
-
+
def spawnService[T <: Service[A], A <: Product](regName : Symbol, args : A)(implicit mf : Manifest[T]) : Pid = {
spawnService[T,A](regName, args, false)(mf)
}
-
+
def spawnService[T <: Service[A], A <: Product](regName : Symbol, args : A, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
val process = createService(mf.erasure.asInstanceOf[Class[T]], pid, args, poolFactory.createBatchExecutor(regName.name, reentrant))
registeredNames.put(regName, pid)
pid
}
-
+
override def nodeDown(node : Symbol) {
send('cluster, ('nodedown, node))
}
-
+
override def nodeUp(node : Symbol) {
send('cluster, ('nodeup, node))
}
-
+
protected def createService[A <: Product, T <: Service[A]](clazz : Class[T], p : Pid, a : A, batch : BatchExecutor) : T = {
val constructor = clazz.getConstructor(classOf[ServiceContext[_]])
val n = this
@@ -389,7 +389,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
process.fiber.start
process
}
-
+
protected def createProcess[T <: Process](clazz : Class[T], p : Pid, batch : BatchExecutor) : T = {
val constructor = clazz.getConstructor(classOf[ProcessContext])
val n = this
@@ -408,44 +408,44 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
process.fiber.start
process
}
-
+
def register(regName : String, pid : Pid) {
register(Symbol(regName), pid)
}
-
+
def register(regName : Symbol, pid : Pid) {
registeredNames.put(regName, pid)
}
-
+
def getNames : Set[Symbol] = {
registeredNames.keySet.toSet.asInstanceOf[Set[Symbol]]
}
-
+
def registerConnection(name : Symbol, channel : Channel) {
channels.put(name, channel)
}
-
+
def whereis(regName : Symbol) : Option[Pid] = {
Option(registeredNames.get(regName))
}
-
+
def ping(node : Symbol, timeout : Long) : Boolean = {
val mbox = spawnMbox
val ref = makeRef
mbox.send(('net_kernel, Symbol(node.name)), mbox.self, ((Symbol("$gen_call"), (mbox.self, ref), ('is_auth, node))))
val result = mbox.receive(timeout) match {
case Some((ref, 'yes)) => true
- case m =>
+ case m =>
false
}
mbox.exit('normal)
result
}
-
+
def nodes : Set[Symbol] = {
channels.keySet.toSet.asInstanceOf[Set[Symbol]]
}
-
+
def deliverLink(link : Link) {
val from = link.from
val to = link.to
@@ -454,7 +454,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
log.warn("Trying to link a pid to itself: %s", from)
return
}
-
+
if (isLocal(to)) {
for (p <- process(to)) {
p.linkWithoutNotify(from)
@@ -467,7 +467,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
})
}
}
-
+
//node internal interface
def link(from : Pid, to : Pid) {
log.debug("link %s -> %s", from, to)
@@ -479,11 +479,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
log.warn("Trying to link non-local pids: %s -> %s", from, to)
return
}
-
+
for(p <- process(from)) {
p.link(to)
}
-
+
for(p <- process(to)) {
p.link(from)
}
@@ -517,17 +517,17 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val link = p.linkWithoutNotify(from)
if (!isLocal(to))
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)
-
+
case None =>
if (!isLocal(to))
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to))
}
}
-
+
def send(to : Pid, msg : Any) = handleSend(to, msg)
def send(to : Symbol, msg : Any) = handleSend(to, msg)
def send(to : (Symbol,Symbol), from : Pid, msg : Any) = handleSend(to, from, msg)
-
+
def call(to : Pid, msg : Any) : Any = call(createPid, to, msg)
def call(to : Pid, msg : Any, timeout : Long) : Any = call(createPid, to, msg, timeout)
def call(from : Pid, to : Pid, msg : Any) : Any = call(from, to, msg, Long.MaxValue)
@@ -537,7 +537,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
send(to, c)
waitReply(from, ref, channel, timeout)
}
-
+
def call(to : Symbol, msg : Any) : Any = call(createPid, to, msg)
def call(to : Symbol, msg : Any, timeout : Long) : Any = call(createPid, to, msg, timeout)
def call(from : Pid, to : Symbol, msg : Any) : Any = call(from, to, msg, Long.MaxValue)
@@ -547,7 +547,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
send(to, c)
waitReply(from, ref, channel, timeout)
}
-
+
def call(to : (Symbol,Symbol), msg : Any) = call(createPid, to, msg)
def call(to : (Symbol,Symbol), msg : Any, timeout : Long) = call(createPid, to, msg, timeout)
def call(from : Pid, to : (Symbol,Symbol), msg : Any) = call(from, to, msg, Long.MaxValue)
@@ -557,11 +557,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
send(to, from, c)
waitReply(from, ref, channel, timeout)
}
-
+
def cast(to : Pid, msg : Any) = send(to, (Symbol("$gen_cast"), msg))
def cast(to : Symbol, msg : Any) = send(to, (Symbol("$gen_cast"), msg))
def cast(to : (Symbol,Symbol), msg : Any) = send(to, createPid, (Symbol("$gen_cast"), msg))
-
+
def makeReplyChannel(pid : Pid, ref : Reference) : BlockingQueue[Any] = {
val queue = new LinkedBlockingQueue[Any]
registerReplyQueue(pid, ref, queue)
@@ -576,13 +576,13 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
case response => response
}
}
-
+
def makeCall(from : Pid, msg : Any) : (Reference,Any) = {
val ref = makeRef
val call = (Symbol("$gen_call"), (from, ref), msg)
(ref, call)
}
-
+
def handleSend(to : Pid, msg : Any) {
log.debug("send %s to %s", msg, to)
if (!tryDeliverReply(to,msg)) {
@@ -603,17 +603,17 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
}
}
-
+
def handleSend(to : Symbol, msg : Any) {
for (pid <- whereis(to)) {
handleSend(pid, msg)
}
}
-
+
def makeRef : Reference = {
referenceCounter.makeRef
}
-
+
def handleSend(dest : (Symbol,Symbol), from : Pid, msg : Any) {
val (regName,peer) = dest
try {
@@ -623,7 +623,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
log.warn(e, "trouble sending message to %s", peer)
}
}
-
+
def handleExit(from : Pid, reason : Any) {
Option(processes.get(from)) match {
case Some(pf : Process) =>
@@ -634,7 +634,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
processes.remove(from)
}
-
+
//this only gets called from a remote link breakage()
def remoteBreak(link : Link, reason : Any) {
val from = link.from
@@ -648,7 +648,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
proc.handleExit(to, reason)
}
}
-
+
//this will only get called from a local link breakage (process exit)
def break(link : Link, reason : Any) {
val from = link.from
@@ -662,25 +662,25 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
getOrConnectAndSend(to.node, ExitMessage(from,to,reason))
}
}
-
+
def process(pid : Pid) : Option[ProcessLike] = {
Option(processes.get(pid))
}
-
+
def unlink(from : Pid, to : Pid) {
for (p <- process(from)) {
p.unlink(to)
}
-
+
for (p <- process(to)) {
p.unlink(from)
}
}
-
+
def isLocal(pid : Pid) : Boolean = {
pid.node == name //&& pid.creation == creation
}
-
+
def disconnected(peer : Symbol, channel: Channel) {
if (channels.containsKey(peer)) {
channels.remove(peer)
@@ -693,10 +693,10 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
}
}
-
+
def getOrConnectAndSend(peer : Symbol, msg : Any, afterHandshake : Channel => Unit = { channel => Unit }) {
val channel = channels.getOrElseUpdate(peer, {
- connectAndSend(peer, None)
+ connectAndSend(peer, None)
})
if (channel.isOpen) {
@@ -708,7 +708,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
afterHandshake(channel)
-/*
+/*
Option(channels.get(peer)) match {
case Some(channel) =>
//race during channel shutdown
@@ -719,15 +719,15 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
case None => connectAndSend(peer, Some(msg), afterHandshake)
}
*/ }
-
+
def connectAndSend(peer : Symbol, msg : Option[Any] = None, afterHandshake : Channel => Unit = {_ => Unit }) : Channel = {
val hostname = splitHostname(peer).getOrElse(throw new ErlangNodeException("Cannot resolve peer with no hostname: " + peer.name))
val peerName = splitNodename(peer)
val port = Epmd(hostname).lookupPort(peerName).getOrElse(throw new ErlangNodeException("Cannot lookup peer: " + peer.name))
val client = new ErlangNodeClient(this, peer, hostname, port, msg, config.typeFactory, afterHandshake)
client.channel
}
-
+
def posthandshake : (Symbol,ChannelPipeline) => Unit = {
{ (peer : Symbol, p : ChannelPipeline) =>
p.addFirst("packetCounter", new PacketCounter("stream-" + peer.name))
@@ -738,7 +738,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
p.addAfter("erlangCounter", "failureDetector", new FailureDetectionHandler(name, new SystemClock, tickTime, timer))
}
}
-
+
def splitNodename(peer : Symbol) : String = {
val parts = peer.name.split('@')
if (parts.length < 2) {
@@ -747,7 +747,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
parts(0)
}
}
-
+
def splitHostname(peer : Symbol) : Option[String] = {
val parts = peer.name.split('@')
if (parts.length < 2) {
@@ -756,7 +756,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
Some(parts(1))
}
}
-
+
def isAlive(pid : Pid) : Boolean = {
process(pid) match {
case Some(_) => true
View
4 src/main/scala/scalang/NodeConfig.scala
@@ -22,7 +22,7 @@ case class NodeConfig(
clusterListener : Option[ClusterListener] = None,
typeFactory : TypeFactory = NoneTypeFactory,
tickTime : Int = 60)
-
+
object NoneTypeFactory extends TypeFactory {
def createType(name : Symbol, arity : Int, reader : TermReader) = None
-}
+}
View
4 src/main/scala/scalang/Pid.scala
@@ -16,8 +16,8 @@
package scalang
case class Pid(node : Symbol, id : Int, serial : Int, creation : Int) {
-
+
def toErlangString : String = {
"<" + id + "." + serial + "." + creation + ">"
}
-}
+}
View
2 src/main/scala/scalang/Port.scala
@@ -15,4 +15,4 @@
//
package scalang
-case class Port(node : Symbol, id : Int, creation : Int)
+case class Port(node : Symbol, id : Int, creation : Int)
View
30 src/main/scala/scalang/Process.scala
@@ -33,34 +33,34 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
val node = ctx.node
val messageRate = metrics.meter("messages", "messages", instrumentedName)
val executionTimer = metrics.timer("execution", instrumentedName)
-
+
def instrumentedName = self.toErlangString
-
+
implicit def pid2sendable(pid : Pid) = new PidSend(pid,this)
implicit def sym2sendable(to : Symbol) = new SymSend(to,this)
implicit def dest2sendable(dest : (Symbol,Symbol)) = new DestSend(dest,self,this)
-
+
def sendEvery(pid : Pid, msg : Any, delay : Long) {
val runnable = new Runnable {
def run = send(pid,msg)
}
fiber.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
}
-
+
def sendEvery(name : Symbol, msg : Any, delay : Long) {
val runnable = new Runnable {
def run = send(name,msg)
}
fiber.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
}
-
+
def sendEvery(name : (Symbol,Symbol), msg : Any, delay : Long) {
val runnable = new Runnable {
def run = send(name,self,msg)
}
fiber.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
}
-
+
def sendAfter(pid : Pid, msg : Any, delay : Long) {
val runnable = new Runnable {
def run {
@@ -69,7 +69,7 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
}
fiber.schedule(runnable, delay, TimeUnit.MILLISECONDS)
}
-
+
def sendAfter(name : Symbol, msg : Any, delay : Long) {
val runnable = new Runnable {
def run {
@@ -78,7 +78,7 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
}
fiber.schedule(runnable, delay, TimeUnit.MILLISECONDS)
}
-
+
def sendAfter(dest : (Symbol,Symbol), msg : Any, delay : Long) {
val runnable = new Runnable {
def run {
@@ -87,28 +87,28 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
}
fiber.schedule(runnable, delay, TimeUnit.MILLISECONDS)
}
-
+
/**
* Subclasses should override this method with their own message handlers
*/
def onMessage(msg : Any)
-
+
/**
* Subclasses wishing to trap exits should override this method.
*/
def trapExit(from : Pid, msg : Any) {
exit(msg)
}
-
+
override def handleMessage(msg : Any) {
messageRate.mark
msgChannel.publish(msg)
}
-
+
override def handleExit(from : Pid, msg : Any) {
exitChannel.publish((from,msg))
}
-
+
val p = this
val msgChannel = new MemoryChannel[Any]
msgChannel.subscribe(ctx.fiber, new Callback[Any] {
@@ -124,7 +124,7 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
}
}
})
-
+
val exitChannel = new MemoryChannel[(Pid,Any)]
exitChannel.subscribe(ctx.fiber, new Callback[(Pid,Any)] {
def onMessage(msg : (Pid,Any)) {
@@ -155,4 +155,4 @@ class DestSend(to : (Symbol,Symbol), from : Pid, proc : Process) {
def !(msg : Any) {
proc.notifySend(to, from, msg)
}
-}
+}
View
2 src/main/scala/scalang/ProcessContext.scala
@@ -24,4 +24,4 @@ trait ProcessContext {
def referenceCounter : ReferenceCounter
def fiber : Fiber
def replyRegistry : ReplyRegistry
-}
+}
View
2 src/main/scala/scalang/Reference.scala
@@ -15,4 +15,4 @@
//
package scalang
-case class Reference(node : Symbol, id : Seq[Int], creation : Int)
+case class Reference(node : Symbol, id : Seq[Int], creation : Int)
View
4 src/main/scala/scalang/ReplyRegistry.scala
@@ -20,7 +20,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap
trait ReplyRegistry {
val replyWaiters = new NonBlockingHashMap[(Pid,Reference),BlockingQueue[Any]]
-
+
/**
* Returns true if the reply delivery succeeded. False otherwise.
*/
@@ -44,4 +44,4 @@ trait ReplyRegistry {
def registerReplyQueue(pid : Pid, tag : Reference, queue : BlockingQueue[Any]) {
replyWaiters.put((pid,tag), queue)
}
-}
+}
View
16 src/main/scala/scalang/Service.scala
@@ -33,28 +33,28 @@ abstract class Service[A <: Product](ctx : ServiceContext[A]) extends Process(ct
/* def init(args : Any) {
// noop
}
-*/
+*/
/**
* Handle a call style of message which will expect a response.
*/
def handleCall(tag : (Pid,Reference), request : Any) : Any = {
throw new Exception(getClass + " did not define a call handler.")
}
-
+
/**
* Handle a cast style of message which will receive no response.
*/
def handleCast(request : Any) {
throw new Exception(getClass + " did not define a cast handler.")
}
-
+
/**
* Handle any messages that do not fit the call or cast pattern.
*/
def handleInfo(request : Any) {
throw new Exception(getClass + " did not define an info handler.")
}
-
+
override def onMessage(msg : Any) = msg match {
case ('ping, from : Pid, ref : Reference) =>
from ! ('pong, ref)
@@ -72,16 +72,16 @@ abstract class Service[A <: Product](ctx : ServiceContext[A]) extends Process(ct
case _ =>
handleInfo(msg)
}
-
+
def call(to : Pid, msg : Any) : Any = node.call(self,to,msg)
def call(to : Pid, msg : Any, timeout : Long) : Any = node.call(self,to,msg,timeout)
def call(to : Symbol, msg : Any) : Any = node.call(self,to,msg)
def call(to : Symbol, msg : Any, timeout : Long) : Any = node.call(self,to,msg,timeout)
def call(to : (Symbol,Symbol), msg : Any) : Any = node.call(self,to,msg)
def call(to : (Symbol,Symbol), msg : Any, timeout : Long) : Any = node.call(self,to,msg,timeout)
-
+
def cast(to : Pid, msg : Any) = node.cast(to,msg)
def cast(to : Symbol, msg : Any) = node.cast(to,msg)
def cast(to : (Symbol,Symbol), msg : Any) = node.cast(to,msg)
-
-}
+
+}
View
4 src/main/scala/scalang/ServiceContext.scala
@@ -1,9 +1,9 @@
package scalang
-trait ServiceContext[A <: Product] extends ProcessContext {
+trait ServiceContext[A <: Product] extends ProcessContext {
def args : A
}
case class NoArgs()
-object NoArgs extends NoArgs
+object NoArgs extends NoArgs
View
10 src/main/scala/scalang/TypeFactory.scala
@@ -24,22 +24,22 @@ trait TypeFactory {
class TermReader(val buffer : ChannelBuffer, decoder : ScalaTermDecoder) {
var m : Int = 0
-
+
def mark : TermReader = {
m = buffer.readerIndex
this
}
-
+
def reset : TermReader = {
buffer.readerIndex(m)
this
}
-
+
def readTerm : Any = {
decoder.readTerm(buffer)
}
-
+
def readAs[A] : A = {
readTerm.asInstanceOf[A]
}
-}
+}
View
16 src/main/scala/scalang/epmd/Epmd.scala
@@ -26,12 +26,12 @@ import overlock.threadpool._
object Epmd {
val defaultPort = 4369
-
+
def apply(host : String) : Epmd = {
val port = Option(System.getenv("ERL_EPMD_PORT")).map(_.toInt).getOrElse(defaultPort)
new Epmd(host, port)
}
-
+
def apply(host : String, port : Int) : Epmd = {
new Epmd(host, port)
}
@@ -42,9 +42,9 @@ class Epmd(val host : String, val port : Int) {
new NioClientSocketChannelFactory(
ThreadPool.instrumentedElastic("scalang.epmd", "boss", 1, 20),
ThreadPool.instrumentedElastic("scalang.epmd", "worker", 1, 20)))
-
+
val handler = new EpmdHandler
-
+
bootstrap.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline : ChannelPipeline = {
Channels.pipeline(
@@ -53,19 +53,19 @@ class Epmd(val host : String, val port : Int) {
handler)
}
})
-
+
val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
val channel = connectFuture.awaitUninterruptibly.getChannel
if(!connectFuture.isSuccess) {
bootstrap.releaseExternalResources
throw connectFuture.getCause
}
-
+
def close {
channel.close
bootstrap.releaseExternalResources
}
-
+
def alive(portNo : Int, nodeName : String) : Option[Int] = {
channel.write(AliveReq(portNo,nodeName))
val response = handler.response.call.asInstanceOf[AliveResp]
@@ -76,7 +76,7 @@ class Epmd(val host : String, val port : Int) {
None
}
}
-
+
def lookupPort(nodeName : String) : Option[Int] = {
channel.write(PortPleaseReq(nodeName))
handler.response.call match {
View
2 src/main/scala/scalang/epmd/EpmdDecoder.scala
@@ -54,4 +54,4 @@ class EpmdDecoder extends FrameDecoder {
}
}
}
-}
+}
View
6 src/main/scala/scalang/epmd/EpmdEncoder.scala
@@ -26,12 +26,12 @@ import netty.handler.codec.oneone._
object EpmdConst {
val ntypeR6 = 110
val ntypeR4Erlang = 109
- val ntypeR4Hidden = 104
+ val ntypeR4Hidden = 104
}
class EpmdEncoder extends OneToOneEncoder {
import EpmdConst._
-
+
override def encode(ctx : ChannelHandlerContext, channel : Channel, msg : Object) : Object = {
val bout = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(24, ctx.getChannel.getConfig.getBufferFactory))
bout.writeShort(0) //length placeholder
@@ -54,4 +54,4 @@ class EpmdEncoder extends OneToOneEncoder {
encoded.setShort(0, encoded.writerIndex - 2)
encoded
}
-}
+}
View
22 src/main/scala/scalang/epmd/EpmdHandler.scala
@@ -28,29 +28,29 @@ import java.util.concurrent.TimeUnit
class EpmdHandler extends SimpleChannelUpstreamHandler with Logging {
val queue = new ConcurrentLinkedQueue[EpmdResponse]
-
+
def response : Callable[Any] = {
val call = new EpmdResponse
queue.add(call)
call
}
-
+
override def channelClosed(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
log.debug("Oh snap channel closed.")
}
-
+
override def channelDisconnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
log.debug("Uh oh disconnect.")
}
-
+
override def exceptionCaught(ctx : ChannelHandlerContext, e : ExceptionEvent) {
var rsp = queue.poll
while (rsp != null) {
rsp.setError(e.getCause)
rsp = queue.poll
}
}
-
+
override def messageReceived(ctx : ChannelHandlerContext, e : MessageEvent) {
val response = e.getMessage
var rsp = queue.poll
@@ -59,23 +59,23 @@ class EpmdHandler extends SimpleChannelUpstreamHandler with Logging {
rsp = queue.poll
}
}
-
+
class EpmdResponse extends Callable[Any] {
val response = new AtomicReference[Any]
val error = new AtomicReference[Throwable]
val lock = new CountDownLatch(1)
-
+
def setError(t : Throwable) {
error.set(t)
lock.countDown
-
+
}
-
+
def set(v : Any) {
response.set(v)
lock.countDown
}
-
+
def call : Any = {
if (lock.await(5000, TimeUnit.MILLISECONDS)) {
if (error.get != null) {
@@ -88,4 +88,4 @@ class EpmdHandler extends SimpleChannelUpstreamHandler with Logging {
}
}
}
-}
+}
View
2 src/main/scala/scalang/epmd/EpmdMessages.scala
@@ -23,4 +23,4 @@ case class PortPleaseReq(nodeName : String)
case class PortPleaseError(result : Int)
-case class PortPleaseResp(portNo : Int, nodeName : String)
+case class PortPleaseResp(portNo : Int, nodeName : String)
View
20 src/main/scala/scalang/node/CaseClassFactory.scala
@@ -22,19 +22,19 @@ import scalang.util.UnderToCamel._
class CaseClassFactory(searchPrefixes : Seq[String], typeMappings : Map[String,Class[_]]) extends TypeFactory {
//it's important to cache the negative side as well
val classCache = AtomicMap.atomicNBHM[String,Option[Class[_]]]
-
+
def createType(name : Symbol, arity : Int, reader : TermReader) : Option[Any] = {
classCache.getOrElseUpdate(name.name, lookupClass(name.name)).flatMap { clazz =>
tryCreateInstance(reader, clazz, arity)
}
}
-
+
/**
* Arity is the length of the tuple after the header
*/
protected def tryCreateInstance(reader : TermReader, clazz : Class[_], arity : Int) : Option[Any] = {
val candidates = for (constructor <- clazz.getConstructors if constructor.getParameterTypes.length == arity-1) yield {constructor}
- if (candidates.isEmpty) return None
+ if (candidates.isEmpty) return None
reader.mark
val parameters = for (i <- (1 until arity)) yield { reader.readTerm }
val classes = parameters.map { case param : AnyRef =>
@@ -54,7 +54,7 @@ class CaseClassFactory(searchPrefixes : Seq[String], typeMappings : Map[String,C
None
}
}
-
+
protected def boxEquals(a : List[Class[_]], b : List[Class[_]]) : Boolean = {
def scrubPrimitive(a : Class[_]) : Class[_] = a match {
case java.lang.Byte.TYPE => classOf[java.lang.Byte]
@@ -67,7 +67,7 @@ class CaseClassFactory(searchPrefixes : Seq[String], typeMappings : Map[String,C
case java.lang.Double.TYPE => classOf[java.lang.Double]
case x => x
}
-
+
(a,b) match {
case (classA :: tailA, classB :: tailB) =>
if (! (scrubPrimitive(classA) == scrubPrimitive(classB))) {
@@ -77,10 +77,10 @@ class CaseClassFactory(searchPrefixes : Seq[String], typeMappings : Map[String,C
case (Nil, Nil) => true
case _ => false
}
-
+
}
-
-
+
+
protected def lookupClass(name : String) : Option[Class[_]] = {
typeMappings.get(name) match {
case Some(c) => Some(c)
@@ -89,12 +89,12 @@ class CaseClassFactory(searchPrefixes : Seq[String], typeMappings : Map[String,C
try {
return Some(Class.forName(prefix + "." + name.underToCamel))
} catch {
- case e : Exception =>
+ case e : Exception =>
e.printStackTrace
Unit
}
}
None
}
}
-}
+}
View
18 src/main/scala/scalang/node/ClientHandshakeHandler.scala
@@ -37,7 +37,7 @@ class ClientHandshakeHandler(name : Symbol, cookie : String, posthandshake : (Sy
sendName
'connected
}),
-
+
state('connected, {
case StatusMessage("ok") =>
'status_ok
@@ -49,40 +49,40 @@ class ClientHandshakeHandler(name : Symbol, cookie : String, posthandshake : (Sy
case StatusMessage(status) =>
throw new ErlangAuthException("Bad status message: " + status)
}),
-
+
state('status_ok, {
case ChallengeMessage(version, flags, c, name) =>
peer = Symbol(name)
sendChallengeReply(c)
'reply_sent
}),
-
+
state('reply_sent, {
case ChallengeAckMessage(digest) =>
verifyChallengeAck(digest)
drainQueue
handshakeSucceeded
'verified
}),
-
+
state('verified, {
case _ => 'verified
}))
-
+
protected def sendStatus(st : String) {
val channel = ctx.getChannel
val future = Channels.future(channel)
val msg = StatusMessage(st)
ctx.sendDownstream(new DownstreamMessageEvent(channel,future,msg,null))
}
-
+
protected def sendName {
val channel = ctx.getChannel
val future = Channels.future(channel)
val msg = NameMessage(5, DistributionFlags.default, name.name)
ctx.sendDownstream(new DownstreamMessageEvent(channel,future,msg,null))
}
-
+
protected def sendChallengeReply(c : Int) {
val channel = ctx.getChannel
val future = Channels.future(channel)
@@ -92,11 +92,11 @@ class ClientHandshakeHandler(name : Symbol, cookie : String, posthandshake : (Sy
val msg = ChallengeReplyMessage(challenge, d)
ctx.sendDownstream(new DownstreamMessageEvent(channel,future,msg,null))
}
-
+
protected def verifyChallengeAck(peerDigest : Array[Byte]) {
val ourDigest = digest(challenge, cookie)
if (!digestEquals(ourDigest, peerDigest)) {
throw new ErlangAuthException("Peer authentication error.")
}
}
-}
+}
View
2 src/main/scala/scalang/node/Clock.scala
@@ -6,4 +6,4 @@ trait Clock {
class SystemClock extends Clock {
override def currentTimeMillis = System.currentTimeMillis
-}
+}
View
16 src/main/scala/scalang/node/ErlangHandler.scala
@@ -25,18 +25,18 @@ import scalang._
import com.codahale.logula.Logging
class ErlangHandler(
- node : ErlangNode,
+ node : ErlangNode,
afterHandshake : Channel => Unit = { _ => Unit }) extends SimpleChannelUpstreamHandler with Logging {
-
+
@volatile var peer : Symbol = null
-
+
override def exceptionCaught(ctx : ChannelHandlerContext, e : ExceptionEvent) {
log.error(e.getCause, "error caught in erlang handler %s", peer)
ctx.getChannel.close
}
-
+
override def messageReceived(ctx : ChannelHandlerContext, e : MessageEvent) {
- val msg = e.getMessage
+ val msg = e.getMessage
log.debug("handler message %s", msg)
msg match {
case Tick =>
@@ -63,12 +63,12 @@ class ErlangHandler(
node.handleSend(to, msg)
}
}
-
+
override def channelDisconnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
log.debug("channel disconnected %s %s. peer: %s", ctx, e, peer)
if (peer != null) {
node.disconnected(peer, e.getChannel)
}
}
-
-}
+
+}
View
12 src/main/scala/scalang/node/ErlangNodeClient.scala
@@ -31,8 +31,8 @@ class ErlangNodeClient(
peer : Symbol,
host : String,
port : Int,
- control : Option[Any],
- typeFactory : TypeFactory,
+ control : Option[Any],
+ typeFactory : TypeFactory,
afterHandshake : Channel => Unit) {
val bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
@@ -41,10 +41,10 @@ class ErlangNodeClient(
bootstrap.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline : ChannelPipeline = {
val pipeline = Channels.pipeline
-
+
val handshakeDecoder = new HandshakeDecoder
handshakeDecoder.mode = 'challenge //first message on the client side is challenge, not name
-
+
pipeline.addLast("handshakeFramer", new LengthFieldBasedFrameDecoder(Short.MaxValue, 0, 2, 0, 2))
pipeline.addLast("handshakeDecoder", handshakeDecoder)
pipeline.addLast("handshakeEncoder", new HandshakeEncoder)
@@ -54,11 +54,11 @@ class ErlangNodeClient(
pipeline.addLast("erlangDecoder", new ScalaTermDecoder(peer, typeFactory))
pipeline.addLast("erlangEncoder", new ScalaTermEncoder(peer))
pipeline.addLast("erlangHandler", new ErlangHandler(node, afterHandshake))
-
+
pipeline
}
})
-
+
val future = bootstrap.connect(new InetSocketAddress(host, port))
val channel = future.getChannel
future.addListener(new ChannelFutureListener {
View
2 src/main/scala/scalang/node/ErlangNodeServer.scala
@@ -47,7 +47,7 @@ class ErlangNodeServer(node : ErlangNode, typeFactory : TypeFactory) {
pipeline
}
})
-
+
val channel = bootstrap.bind(new InetSocketAddress(0))
def port = channel.getLocalAddress.asInstanceOf[InetSocketAddress].getPort
}
View
6 src/main/scala/scalang/node/ExitListenable.scala
@@ -19,14 +19,14 @@ import scalang._
trait ExitListenable {
@volatile var exitListeners : List[ExitListener] = Nil
-
+
def addExitListener(listener : ExitListener) {
exitListeners = listener :: exitListeners
}
-
+
def notifyExit(from : Pid, reason : Any) {
for (l <- exitListeners) {
l.handleExit(from, reason)
}
}
-}
+}
View
2 src/main/scala/scalang/node/ExitListener.scala
@@ -24,4 +24,4 @@ import scalang._
trait ExitListener {
def handleExit(from : Pid, reason : Any)
-}
+}
View
10 src/main/scala/scalang/node/FailureDetectionHandler.scala
@@ -12,17 +12,17 @@ class FailureDetectionHandler(node : Symbol, clock : Clock, tickTime : Int, time
@volatile var lastTimeReceived = 0l
@volatile var ctx : ChannelHandlerContext = null
val exception = new ReadTimeoutException
-
+
override def channelOpen(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
this.ctx = ctx
lastTimeReceived = clock.currentTimeMillis
scheduleTick
}
-
+
override def channelClosed(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
if (nextTick != null) nextTick.cancel
}
-
+
override def messageReceived(ctx : ChannelHandlerContext, e : MessageEvent) {
lastTimeReceived = clock.currentTimeMillis
e.getMessage match {
@@ -33,7 +33,7 @@ class FailureDetectionHandler(node : Symbol, clock : Clock, tickTime : Int, time
ctx.sendUpstream(e);
}
}
-
+
object TickTask extends TimerTask {
override def run(timeout : Timeout) {
val last = (clock.currentTimeMillis - lastTimeReceived) / 1000
@@ -45,7 +45,7 @@ class FailureDetectionHandler(node : Symbol, clock : Clock, tickTime : Int, time
scheduleTick
}
}
-
+
def scheduleTick {
nextTick = timer.newTimeout(TickTask, tickTime / 4, TimeUnit.SECONDS)
}
View
8 src/main/scala/scalang/node/HandshakeDecoder.scala
@@ -21,12 +21,12 @@ import netty.channel._
import netty.buffer._
class HandshakeDecoder extends OneToOneDecoder {
-
+
//we need to have a dirty fucking mode context
//because name messages and challenge replies have
//the same identifier
@volatile var mode = 'name
-
+
def decode(ctx : ChannelHandlerContext, channel : Channel, obj : Any) : Object = {
//dispatch on first byte
val buffer = obj.asInstanceOf[ChannelBuffer]
@@ -72,5 +72,5 @@ class HandshakeDecoder extends OneToOneDecoder {
buffer
}
}
-
-}
+
+}
View
6 src/main/scala/scalang/node/HandshakeEncoder.scala
@@ -21,7 +21,7 @@ import netty.channel._
import netty.buffer._
class HandshakeEncoder extends OneToOneEncoder {
-
+
def encode(ctx : ChannelHandlerContext, channel : Channel, obj : Any) : Object = {
obj match {
case NameMessage(version, flags, name) =>
@@ -72,5 +72,5 @@ class HandshakeEncoder extends OneToOneEncoder {
buffer
}
}
-
-}
+
+}
View
32 src/main/scala/scalang/node/HandshakeHandler.scala
@@ -37,12 +37,12 @@ abstract class HandshakeHandler(posthandshake : (Symbol,ChannelPipeline) => Unit
@volatile var peer : Symbol = null
@volatile var challenge : Int = 0
@volatile var peerChallenge : Int = 0
-
+
val messages = new ArrayDeque[MessageEvent]
val random = SecureRandom.getInstance("SHA1PRNG")
-
+
def isVerified = currentState == 'verified
-
+
//handler callbacks
override def messageReceived(ctx : ChannelHandlerContext, e : MessageEvent) {
this.ctx = ctx
@@ -51,29 +51,29 @@ abstract class HandshakeHandler(posthandshake : (Symbol,ChannelPipeline) => Unit
super.messageReceived(ctx, e)
return
}
-
+
event(msg)
}
-
+
override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
this.ctx = ctx
val channel = ctx.getChannel
val future = Channels.future(channel)
event(ConnectedMessage)
}
-
+
override def channelClosed(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
this.ctx = ctx
log.error("Channel closed during handshake")
handshakeFailed
}
-
+
override def exceptionCaught(ctx : ChannelHandlerContext, e : ExceptionEvent) {
this.ctx = ctx
log.error(e.getCause, "Exception caught during erlang handshake: ")
handshakeFailed
}
-
+
override def writeRequested(ctx : ChannelHandlerContext, e : MessageEvent) {
this.ctx = ctx
if (isVerified) {
@@ -82,7 +82,7 @@ abstract class HandshakeHandler(posthandshake : (Symbol,ChannelPipeline) => Unit
messages.offer(e)
}
}
-
+
//utility methods
protected def digest(challenge : Int, cookie : String) : Array[Byte] = {
val masked = mask(challenge)
@@ -91,15 +91,15 @@ abstract class HandshakeHandler(posthandshake : (Symbol,ChannelPipeline) => Unit
md5.update(masked.toString.getBytes)
md5.digest
}
-
+
def mask(challenge : Int) : Long = {
if (challenge < 0) {
(1L << 31) | (challenge & 0x7FFFFFFFL)
} else {
challenge.toLong
}
}
-
+
protected def digestEquals(a : Array[Byte], b : Array[Byte]) : Boolean = {
var equals = true
if (a.length != b.length) {
@@ -111,27 +111,27 @@ abstract class HandshakeHandler(posthandshake : (Symbol,ChannelPipeline) => Unit
}
equals
}
-
+
protected def drainQueue {
val p = ctx.getPipeline
val keys = p.toMap.keySet
for (name <- List("handshakeFramer", "handshakeDecoder", "handshakeEncoder", "handshakeHandler"); if keys.contains(name)) {
p.remove(name)
}
posthandshake(peer,p)
-
+
for (msg <- messages) {
ctx.sendDownstream(msg)
}
messages.clear