Skip to content
Browse files

Merge branch 'master' of github.com:wajam/nrv

  • Loading branch information...
2 parents fdc5e39 + 8715fe1 commit c93ece0acc36ce32aabcb1b731ab7feb98e1f0ab felix committed Apr 9, 2012
View
16 nrv-core/src/main/scala/com/wajam/nrv/cluster/Router.scala
@@ -2,8 +2,8 @@ package com.wajam.nrv.cluster
import actors.Actor
import collection.mutable.Map
-import com.wajam.nrv.data.{InRequest, OutRequest}
import com.wajam.nrv.Logging
+import com.wajam.nrv.data.{Message, InRequest, OutRequest}
/**
* Handle incoming requests to different actions
@@ -30,11 +30,15 @@ class Router(cluster: Cluster) extends Actor with Logging {
case inRequest: InRequest =>
// check for rendez-vous
var optReq:Option[OutRequest] = None
- if (inRequest.rendezvous > 0) {
- optReq = this.requests.remove(inRequest.rendezvous)
- if (optReq == None) {
- warn("Received a incoming request with a rendez-vous, but with no matching outgoing request: {}", inRequest)
- }
+ inRequest.function match {
+ case Message.FUNCTION_RESPONSE =>
+ optReq = this.requests.remove(inRequest.rendezvous)
+ if (optReq == None) {
+ warn("Received a incoming request with a rendez-vous, but with no matching outgoing request: {}", inRequest)
+ }
+
+ case Message.FUNCTION_CALL =>
+ // TODO: call specific processing here
}
val action = cluster.getAction(inRequest.actionURL)
View
13 nrv-core/src/main/scala/com/wajam/nrv/data/InRequest.scala
@@ -4,5 +4,16 @@ package com.wajam.nrv.data
* Received request
*/
class InRequest extends Message {
- def reply() = null
+ var replyCallback:(OutRequest => Unit) = null
+
+ def reply(data:(String,Any)*) {
+ this.reply(new OutRequest(data))
+ }
+
+ def reply(request:OutRequest) {
+ if (replyCallback == null)
+ throw new Exception("Called reply on a request with no reply callback")
+
+ this.replyCallback(request)
+ }
}
View
24 nrv-core/src/main/scala/com/wajam/nrv/data/Message.scala
@@ -2,16 +2,28 @@ package com.wajam.nrv.data
import scala.collection.mutable.HashMap
import com.wajam.nrv.service.{ActionURL, Endpoints}
+import com.wajam.nrv.cluster.Node
/**
* Base message used for outbound and inbound requests.
*/
abstract class Message(data: Iterable[(String, Any)]) extends HashMap[String, Any] with Serializable {
+
+ import Message._
+
var protocolName = ""
var serviceName = ""
var path = "/"
var rendezvous = 0
- var destination = Endpoints.empty
+
+ /*
+ * Messages that are passed between nodes are not just RPC calls, but can also
+ * be response or any control message.
+ */
+ var function = FUNCTION_CALL
+
+ var source: Node = null
+ var destination = Endpoints.empty // TODO: see @Action, should it be service members??
loadData(data)
@@ -28,6 +40,14 @@ abstract class Message(data: Iterable[(String, Any)]) extends HashMap[String, An
other.protocolName = this.protocolName
other.serviceName = this.serviceName
other.path = this.path
- other.destination = this.destination
+ other.function = this.function
+ other.rendezvous = this.rendezvous
+ other.source = this.source
+ other.destination = this.destination // TODO: should be cloned
}
}
+
+object Message {
+ val FUNCTION_CALL = 0
+ val FUNCTION_RESPONSE = 1
+}
View
2 nrv-core/src/main/scala/com/wajam/nrv/protocol/DummyProtocol.scala
@@ -12,7 +12,7 @@ class DummyProtocol(cluster: Cluster, name: String) extends Protocol(name, clust
def handleOutgoing(action: Action, message: Message) {
val newRequest = new InRequest()
message.copyTo(newRequest)
- action.handleIncomingRequest(newRequest)
+ cluster.route(newRequest)
}
def start() = null
View
35 nrv-core/src/main/scala/com/wajam/nrv/service/Action.scala
@@ -1,7 +1,7 @@
package com.wajam.nrv.service
-import com.wajam.nrv.data.{OutRequest, InRequest}
import com.wajam.nrv.UnavailableException
+import com.wajam.nrv.data.{Message, OutRequest, InRequest}
/**
* Action that binds a path to a callback. This is analogous to a RPC endpoint function,
@@ -10,11 +10,20 @@ import com.wajam.nrv.UnavailableException
class Action(var path: ActionPath, onReceive: ((InRequest) => Unit)) extends ActionSupport {
def matches(path: ActionPath) = this.path.matchesPath(path)._1
+ private def initOutRequest(request:OutRequest) {
+ request.source = this.cluster.localNode
+ request.serviceName = this.service.name
+ request.path = this.path.buildPath(request)
+ }
+
def call(request: OutRequest) {
this.checkSupported()
+ // initialize request
+ this.initOutRequest(request)
+ request.function = Message.FUNCTION_CALL
+
// resolve endpoints
- request.path = this.path.buildPath(request)
this.resolver.handleOutgoing(this, request)
if (request.destination.size == 0)
throw new UnavailableException
@@ -31,9 +40,23 @@ class Action(var path: ActionPath, onReceive: ((InRequest) => Unit)) extends Act
}
def handleIncomingRequest(inRequest: InRequest, outRequest: Option[OutRequest] = None) {
- if (outRequest != None)
- outRequest.get.handleReply(inRequest)
- else
- this.onReceive(inRequest)
+ outRequest match {
+ case None =>
+ inRequest.replyCallback = (respRequest => {
+ this.initOutRequest(respRequest)
+ respRequest.function = Message.FUNCTION_RESPONSE
+
+ respRequest.rendezvous = inRequest.rendezvous
+
+ // TODO: shouldn't be like that. Source may not be a member...
+ respRequest.destination = new Endpoints(Seq(new ServiceMember(0, inRequest.source)))
+
+ this.protocol.handleOutgoing(this, respRequest)
+ })
+ this.onReceive(inRequest)
+
+ case Some(originalRequest) =>
+ originalRequest.handleReply(inRequest)
+ }
}
}
View
6 nrv-core/src/main/scala/com/wajam/nrv/service/Endpoints.scala
@@ -5,16 +5,12 @@ package com.wajam.nrv.service
* but all backup nodes are considered endpoints too in order to handle the
* high availability. Consensus manager handles replication to these endpoints.
*/
-class Endpoints(members: List[ServiceMember]) extends Serializable {
+class Endpoints(members: Seq[ServiceMember]) extends Serializable {
def size = members.size
def apply(pos: Int) = members(pos)
}
object Endpoints {
val empty = new Endpoints(List())
-
- def list(members: ServiceMember*): Endpoints = {
- new Endpoints(List[ServiceMember]() ++ members)
- }
}
View
4 nrv-core/src/test/scala/com/wajam/nrv/cluster/TestRouter.scala
@@ -5,9 +5,9 @@ import org.scalatest.mock.MockitoSugar
import org.mockito.Matchers._
import org.mockito.Mockito._
import com.wajam.nrv.service.Action
-import com.wajam.nrv.data.{InRequest, OutRequest}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
+import com.wajam.nrv.data.{Message, InRequest, OutRequest}
@RunWith(classOf[JUnitRunner])
class TestRouter extends FunSuite with MockitoSugar {
@@ -26,11 +26,11 @@ class TestRouter extends FunSuite with MockitoSugar {
router !? outReq
val inReq = new InRequest()
+ inReq.function = Message.FUNCTION_RESPONSE
inReq.rendezvous = outReq.rendezvous
router !? inReq
verify(mockAction).handleIncomingRequest(inReq, Some(outReq))
-
}
}
View
2 nrv-core/src/test/scala/com/wajam/nrv/protocol/TestNrvProtocol.scala
@@ -34,7 +34,7 @@ class TestNrvProtocol extends FunSuite with BeforeAndAfter {
cluster.start()
val req = new OutRequest(Map("test" -> "someval"))
- req.destination = Endpoints.list(new ServiceMember(0, cluster.localNode))
+ req.destination = new Endpoints(Seq(new ServiceMember(0, cluster.localNode)))
protocol.handleOutgoing(null, req)
notifier.synchronized {
View
18 nrv-core/src/test/scala/com/wajam/nrv/service/TestAction.scala
@@ -29,17 +29,25 @@ class TestAction extends FunSuite {
}
+ req.reply("some_key" -> "some_value")
+ }))
+
+
+ var responded = false
+ action.call("test" -> "myvalue")(resp => {
+ responded = true
+
notifier.synchronized {
notifier.notify()
}
- }))
-
- action.call("test" -> "myvalue")()
+ })
notifier.synchronized {
- notifier.wait(1)
- assert(called, "didn't received called action")
+ notifier.wait(2000)
+ assert(called, "didn't receive called action")
assert(testValue == "myvalue", "expected 'test', got '" + testValue + "'")
+
+ assert(responded, "didn't receive response")
}
}
}
View
2 nrv-core/src/test/scala/com/wajam/nrv/transport/codec/TestJavaSerializeCodec.scala
@@ -14,7 +14,7 @@ class TestJavaSerializeCodec extends FunSuite {
val codec = new JavaSerializeCodec()
val req = new OutRequest(Map("test" -> "someval"))
- req.destination = Endpoints.list(new ServiceMember(0, new Node("127.0.0.1", Map("nrv" -> 12345))))
+ req.destination = new Endpoints(Seq(new ServiceMember(0, new Node("127.0.0.1", Map("nrv" -> 12345)))))
val bytes = codec.encode(req)
assert(bytes.length > 0)

0 comments on commit c93ece0

Please sign in to comment.
Something went wrong with that request. Please try again.