Skip to content

Commit

Permalink
Fixed issue akka#46: Remote Actor should be defined by target class a…
Browse files Browse the repository at this point in the history
…nd UUID
  • Loading branch information
jboner committed Nov 21, 2009
1 parent 805fac6 commit f5b9e98
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 94 deletions.
1 change: 1 addition & 0 deletions akka-actors/src/main/scala/actor/ActiveObject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ private[akka] sealed class ActiveObjectAspect {
.setId(RemoteRequestIdFactory.nextId)
.setMethod(rtti.getMethod.getName)
.setTarget(target.getName)
.setUuid(actor.uuid)
.setTimeout(timeout)
.setIsActor(false)
.setIsOneWay(oneWay_?)
Expand Down
25 changes: 12 additions & 13 deletions akka-actors/src/main/scala/actor/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,14 @@ trait Actor extends Logging with TransactionManagement {
implicit val self: AnyRef = this

// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
val uuid = Uuid.newUuid.toString

private[akka] var _uuid = Uuid.newUuid.toString
def uuid = _uuid

// ====================================
// private fields
// ====================================

@volatile private var _isRunning: Boolean = false
@volatile private var _isRunning: Boolean = false
@volatile private var _isShutDown: Boolean = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private var _config: Option[AnyRef] = None
Expand Down Expand Up @@ -346,6 +347,9 @@ trait Actor extends Logging with TransactionManagement {
"Actor has not been started, you need to invoke 'actor.start' before using it")
}

/**
* Same as the '!' method but does not take an implicit sender as second parameter.
*/
def send(message: AnyRef) = {
if (_isRunning) postMessageToMailbox(message, None)
else throw new IllegalStateException(
Expand Down Expand Up @@ -394,17 +398,10 @@ trait Actor extends Logging with TransactionManagement {
def !![T](message: AnyRef): Option[T] = !![T](message, timeout)

/**
* Sends a message asynchronously, but waits on a future indefinitely. E.g. emulates a synchronous call.
* <p/>
* <b>NOTE:</b>
* Should be used with care (almost never), since very dangerous (will block a thread indefinitely if no reply).
* This method is evil and have been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: AnyRef): T = if (_isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
future.awaitBlocking
getResultOrThrowException(future).get
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
def !?[T](message: AnyRef): T = throw new UnsupportedOperationException(
"'!?' is evil and have been removed. Use '!!' with a timeout instead")

/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
Expand Down Expand Up @@ -596,6 +593,7 @@ trait Actor extends Logging with TransactionManagement {
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
Expand All @@ -615,6 +613,7 @@ trait Actor extends Logging with TransactionManagement {
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
Expand Down
3 changes: 2 additions & 1 deletion akka-actors/src/main/scala/nio/RemoteClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class RemoteClientHandler(val name: String,
extends SimpleChannelUpstreamHandler with Logging {

override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)
}
super.handleUpstream(ctx, event)
Expand Down
40 changes: 26 additions & 14 deletions akka-actors/src/main/scala/nio/RemoteServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ object RemoteServer extends Logging {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory {
class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
Expand All @@ -83,21 +84,26 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) ext
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]

override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)
}
super.handleUpstream(ctx, event)
}

override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
if (message == null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
if (message == null) throw new IllegalStateException(
"Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) {
handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
}
}

override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
Expand All @@ -115,7 +121,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
import Actor._
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getTimeout)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
actor.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) actor ! message
Expand Down Expand Up @@ -158,7 +164,8 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL

//continueTransaction(request)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(request.getMethod, unescapedArgClasses: _*)
val messageReceiver = activeObject.getClass.getDeclaredMethod(
request.getMethod, unescapedArgClasses: _*)
if (request.getIsOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
else {
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
Expand All @@ -174,7 +181,9 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
}
} catch {
case e: InvocationTargetException =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e.getCause)
log.error(
"Could not invoke remote active object [%s :: %s] due to: %s",
request.getMethod, request.getTarget, e.getCause)
e.getCause.printStackTrace
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
Expand All @@ -185,7 +194,9 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: Throwable =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e)
log.error(
"Could not invoke remote active object [%s :: %s] due to: %s",
request.getMethod, request.getTarget, e)
e.printStackTrace
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
Expand Down Expand Up @@ -250,21 +261,22 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
} else activeObjectOrNull
}

private def createActor(name: String, timeout: Long): Actor = {
val actorOrNull = actors.get(name)
private def createActor(name: String, uuid: String, timeout: Long): Actor = {
val actorOrNull = actors.get(uuid)
if (actorOrNull == null) {
try {
log.info("Creating a new remote actor [%s]", name)
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[Actor]
newInstance._uuid = uuid
newInstance.timeout = timeout
newInstance._remoteAddress = None
actors.put(name, newInstance)
actors.put(uuid, newInstance)
newInstance
} catch {
case e =>
log.debug("Could not create remote actor instance due to: %s", e)
log.error(e, "Could not create remote actor instance due to: " + e.getMessage)
e.printStackTrace
throw e
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class EventBasedSingleThreadActorTest extends JUnitSuite {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
val result: String = actor !? "Hello"
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
actor.stop
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class EventBasedThreadPoolActorTest extends JUnitSuite {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
val result: String = actor !? "Hello"
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
actor.stop
}
Expand Down
11 changes: 0 additions & 11 deletions akka-actors/src/test/scala/RemoteActorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,6 @@ class RemoteActorTest extends JUnitSuite {
actor.stop
}

@Test
def shouldSendReplySync = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.start
val result: String = actor !? "Hello"
assert("World" === result)
actor.stop
}

@Test
def shouldSendReplyAsync = {
implicit val timeout = 500000000L
Expand Down
2 changes: 1 addition & 1 deletion akka-actors/src/test/scala/ThreadBasedActorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ThreadBasedActorTest extends JUnitSuite {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
val result: String = actor !? "Hello"
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
actor.stop
}
Expand Down
2 changes: 1 addition & 1 deletion akka-security/src/main/scala/Security.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
override def filter(request: ContainerRequest): ContainerRequest =
rolesAllowed match {
case Some(roles) => {
(authenticator !? Authenticate(request, roles)).asInstanceOf[AnyRef] match {
(authenticator !! (Authenticate(request, roles), 10000)).get.asInstanceOf[AnyRef] match {
case OK => request
case r if r.isInstanceOf[Response] =>
throw new WebApplicationException(r.asInstanceOf[Response])
Expand Down
6 changes: 3 additions & 3 deletions akka-security/src/test/scala/SecuritySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class BasicAuthenticatorSpec extends junit.framework.TestCase
@Test def testChallenge = {
val req = mock[ContainerRequest]

val result: Response = (authenticator !? Authenticate(req, List("foo")))
val result: Response = (authenticator !! (Authenticate(req, List("foo")), 10000)).get

// the actor replies with a challenge for the browser
result.getStatus must equal(Response.Status.UNAUTHORIZED.getStatusCode)
Expand All @@ -41,7 +41,7 @@ class BasicAuthenticatorSpec extends junit.framework.TestCase
// fake a request authorization -> this will authorize the user
when(req.isUserInRole("chef")).thenReturn(true)

val result: AnyRef = (authenticator !? Authenticate(req, List("chef")))
val result: AnyRef = (authenticator !! (Authenticate(req, List("chef")), 10000)).get

result must be(OK)
// the authenticator must have set a security context
Expand All @@ -55,7 +55,7 @@ class BasicAuthenticatorSpec extends junit.framework.TestCase
when(req.getHeaderValue("Authorization")).thenReturn("Basic " + new String(Base64.encode("foo:bar")))
when(req.isUserInRole("chef")).thenReturn(false) // this will deny access

val result: Response = (authenticator !? Authenticate(req, List("chef")))
val result: Response = (authenticator !! (Authenticate(req, List("chef")), 10000)).get

result.getStatus must equal(Response.Status.FORBIDDEN.getStatusCode)

Expand Down

0 comments on commit f5b9e98

Please sign in to comment.