Skip to content

Commit

Permalink
Scalang changes to support scala to scala communications
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Jenkins authored and Ray Jenkins committed Oct 5, 2012
1 parent 3ff3e54 commit 41c803b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 16 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/scalang/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val pidSerial = new AtomicInteger(0)
val executor = poolFactory.createActorPool
val factory = new PoolFiberFactory(executor)
val server = new ErlangNodeServer(this,config.typeFactory)
val server = new ErlangNodeServer(this,config.typeFactory, config.typeEncoder)
val localEpmd = Epmd("localhost")
localEpmd.alive(server.port, splitNodename(name)) match {
case Some(c) => creation = c
Expand Down Expand Up @@ -865,7 +865,8 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val hostname = splitHostname(peer).getOrElse(throw new ErlangNodeException("Cannot resolve peer with no hostname: " + peer.name))
val peerName = splitNodename(peer)
val port = Epmd(hostname).lookupPort(peerName).getOrElse(throw new ErlangNodeException("Cannot lookup peer: " + peer.name))
val client = new ErlangNodeClient(this, peer, hostname, port, msg, config.typeFactory, afterHandshake)
val client = new ErlangNodeClient(this, peer, hostname, port, msg, config.typeFactory,
config.typeEncoder, afterHandshake)
client.channel
}

Expand Down
70 changes: 61 additions & 9 deletions src/main/scala/scalang/NodeConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,68 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
package scalang
package scalang.node

import util._
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import org.jboss.{netty => netty}
import netty.buffer.ChannelBuffer
import scalang._
import netty.channel._
import netty.bootstrap._
import netty.handler.codec.frame._
import netty.handler.timeout._
import netty.util.HashedWheelTimer
import socket.nio.NioClientSocketChannelFactory
import com.codahale.logula.Logging

case class NodeConfig(
poolFactory : ThreadPoolFactory = new DefaultThreadPoolFactory,
clusterListener : Option[ClusterListener] = None,
typeFactory : TypeFactory = NoneTypeFactory,
tickTime : Int = 60)

object NoneTypeFactory extends TypeFactory {
def createType(name : Symbol, arity : Int, reader : TermReader) = None
class ErlangNodeClient(
node : ErlangNode,
peer : Symbol,
host : String,
port : Int,
control : Option[Any],
typeFactory : TypeFactory,
typeEncoder : TypeEncoder,
afterHandshake : Channel => Unit) extends Logging
{
val bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
node.poolFactory.createBossPool,
node.poolFactory.createWorkerPool))
bootstrap.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline : ChannelPipeline = {
val pipeline = Channels.pipeline

val handshakeDecoder = new HandshakeDecoder
handshakeDecoder.mode = 'challenge //first message on the client side is challenge, not name

pipeline.addLast("handshakeFramer", new LengthFieldBasedFrameDecoder(Short.MaxValue, 0, 2, 0, 2))
pipeline.addLast("handshakeDecoder", handshakeDecoder)
pipeline.addLast("handshakeEncoder", new HandshakeEncoder)
pipeline.addLast("handshakeHandler", new ClientHandshakeHandler(node.name, node.cookie, node.posthandshake))
pipeline.addLast("erlangFramer", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
pipeline.addLast("encoderFramer", new LengthFieldPrepender(4))
pipeline.addLast("erlangDecoder", new ScalaTermDecoder(peer, typeFactory))
pipeline.addLast("erlangEncoder", new ScalaTermEncoder(peer, typeEncoder))
pipeline.addLast("erlangHandler", new ErlangHandler(node, afterHandshake))

pipeline
}
})

val future = bootstrap.connect(new InetSocketAddress(host, port))
val channel = future.getChannel
future.addListener(new ChannelFutureListener {
def operationComplete(f : ChannelFuture) {
if (f.isSuccess) {
for (c <- control) {
channel.write(c)
}
} else {
node.disconnected(peer, channel)
}
}
})
}
9 changes: 7 additions & 2 deletions src/main/scala/scalang/node/ErlangNodeClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package scalang.node
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import org.jboss.{netty => netty}
import netty.buffer.ChannelBuffer
import scalang._
import netty.channel._
import netty.bootstrap._
import netty.handler.codec.frame._
import netty.handler.timeout._
import netty.util.HashedWheelTimer
import socket.nio.NioClientSocketChannelFactory
import com.codahale.logula.Logging


class ErlangNodeClient(
node : ErlangNode,
Expand All @@ -33,7 +36,9 @@ class ErlangNodeClient(
port : Int,
control : Option[Any],
typeFactory : TypeFactory,
afterHandshake : Channel => Unit) {
typeEncoder : TypeEncoder,
afterHandshake : Channel => Unit) extends Logging
{
val bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
node.poolFactory.createBossPool,
Expand All @@ -52,7 +57,7 @@ class ErlangNodeClient(
pipeline.addLast("erlangFramer", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
pipeline.addLast("encoderFramer", new LengthFieldPrepender(4))
pipeline.addLast("erlangDecoder", new ScalaTermDecoder(peer, typeFactory))
pipeline.addLast("erlangEncoder", new ScalaTermEncoder(peer))
pipeline.addLast("erlangEncoder", new ScalaTermEncoder(peer, typeEncoder))
pipeline.addLast("erlangHandler", new ErlangHandler(node, afterHandshake))

pipeline
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/scalang/node/ErlangNodeServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ package scalang.node
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import org.jboss.{netty => netty}
import netty.buffer.ChannelBuffer
import scalang._
import netty.channel._
import netty.bootstrap._
import netty.handler.codec.frame._
import netty.handler.timeout._
import netty.util.HashedWheelTimer
import socket.nio.NioServerSocketChannelFactory
import com.codahale.logula.Logging

class ErlangNodeServer(node : ErlangNode, typeFactory : TypeFactory) {
class ErlangNodeServer(node : ErlangNode, typeFactory : TypeFactory, typeEncoder: TypeEncoder) extends Logging {
val bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
node.poolFactory.createBossPool,
Expand All @@ -41,7 +43,7 @@ class ErlangNodeServer(node : ErlangNode, typeFactory : TypeFactory) {
pipeline.addLast("erlangFramer", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
pipeline.addLast("encoderFramer", new LengthFieldPrepender(4))
pipeline.addLast("erlangDecoder", new ScalaTermDecoder('server, typeFactory))
pipeline.addLast("erlangEncoder", new ScalaTermEncoder('server))
pipeline.addLast("erlangEncoder", new ScalaTermEncoder('server, typeEncoder))
pipeline.addLast("erlangHandler", new ErlangHandler(node))

pipeline
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/scalang/node/ScalaTermEncoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scalang.util.CamelToUnder._
import com.codahale.logula.Logging
import com.yammer.metrics.scala._

class ScalaTermEncoder(peer: Symbol) extends OneToOneEncoder with Logging with Instrumented {
class ScalaTermEncoder(peer: Symbol, encoder: TypeEncoder) extends OneToOneEncoder with Logging with Instrumented {

val encodeTimer = metrics.timer("encoding", peer.name)

Expand Down Expand Up @@ -72,6 +72,8 @@ class ScalaTermEncoder(peer: Symbol) extends OneToOneEncoder with Logging with I
}

def encodeObject(buffer : ChannelBuffer, obj : Any) : Unit = obj match {
case encoder(_) =>
encoder.encode(obj, buffer)
case i : Int if i >= 0 && i <= 255 =>
writeSmallInteger(buffer, i)
case i : Int =>
Expand Down

0 comments on commit 41c803b

Please sign in to comment.