Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
acunniffe committed Mar 31, 2018
0 parents commit a41bfb8
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
project/target
target
.idea
10 changes: 10 additions & 0 deletions build.sbt
@@ -0,0 +1,10 @@
name := "akka-faddish-mailbox"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.4"
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.5.4" % "test"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.5"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version = 1.1.2
8 changes: 8 additions & 0 deletions src/main/resources/application.conf
@@ -0,0 +1,8 @@
my-mailbox {
mailbox-type = "com.opticdev.scala.akka.FaddishUnboundedMailbox"
filter = "com.opticdev.scala.akka.TestFilter"
}

other-mailbox {
mailbox-type = "com.opticdev.scala.akka.FaddishUnboundedMailbox"
}
77 changes: 77 additions & 0 deletions src/main/scala/com/opticdev/scala/akka/FaddishMailbox.scala
@@ -0,0 +1,77 @@
package com.opticdev.scala.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.dispatch.ProducesMessageQueue
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.function.Consumer

import scala.util.Try

trait FaddishUnboundedMessageQueueSemantics

abstract class FaddishMailboxFilter {
def filterOut(target: Envelope) : PartialFunction[Any, Boolean] = {
PartialFunction[Any, Boolean] {
case _ => false
}
}
}

object FaddishUnboundedMailbox {

class FaddishMessageQueue(filter: FaddishMailboxFilter) extends MessageQueue
with FaddishUnboundedMessageQueueSemantics {

private final val queue = new ConcurrentLinkedQueue[Envelope]()

def enqueue(receiver: ActorRef, handle: Envelope): Unit = {

synchronized {
val toRemove = scala.collection.mutable.Set[Envelope]()
queue.iterator().forEachRemaining(new Consumer[Envelope] {
override def accept(t: Envelope): Unit = {
if (filter.filterOut(handle)(t.message)) {
toRemove += t
}
}
})

toRemove.foreach(i=> queue.remove(i))
}

queue.offer(handle)
}
def dequeue(): Envelope = queue.poll()
def numberOfMessages: Int = queue.size
def hasMessages: Boolean = !queue.isEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
}
}

// This is the Mailbox implementation
class FaddishUnboundedMailbox(filter: FaddishMailboxFilter) extends MailboxType
with ProducesMessageQueue[FaddishUnboundedMailbox.FaddishMessageQueue] {

import FaddishUnboundedMailbox._

// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = {
// put your initialization code here
this(Try { Class.forName(config.getString("filter")).newInstance().asInstanceOf[FaddishMailboxFilter]}.getOrElse(new FaddishMailboxFilter {}))
}

// The create method is called to create the MessageQueue
final override def create(
owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue =
new FaddishMessageQueue(filter)
}
6 changes: 6 additions & 0 deletions src/main/scala/com/opticdev/scala/akka/package.scala
@@ -0,0 +1,6 @@
package com.opticdev.scala

package object akka {
case class Ping(ballColor: String, speed: Int)
case class Pong(speed: Int)
}
38 changes: 38 additions & 0 deletions src/test/scala/com/opticdev/scala/akka/FaddishMailboxSpec.scala
@@ -0,0 +1,38 @@
package com.opticdev.scala.akka

import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, FunSpecLike, Matchers, WordSpecLike}

class FaddishMailboxSpec extends TestKit(ActorSystem("FaddishMailboxSpec")) with ImplicitSender with FunSpecLike with BeforeAndAfterAll {

override def afterAll {
TestKit.shutdownActorSystem(system)
}

it("will filter out messages") {
//uses TestActor and TestFaddishFilter
val echo = system.actorOf(Props[TestActor].withDispatcher("my-mailbox"))
echo ! Ping("RED", 20)
(2 to 40).foreach(i=> echo ! Ping("RED", 20 * i)) //invalidated, never processed
echo ! Ping("RED", 0)

expectMsg(Pong(20))
expectMsg(Pong(0))
}

it("if no filter is specified none will be applied ") {
//uses TestActor and TestFaddishFilter
val echo = system.actorOf(Props[TestActor].withDispatcher("other-mailbox"))
echo ! Ping("RED", 20)
echo ! Ping("RED", 40)
echo ! Ping("RED", 60)
echo ! Ping("RED", 0)

expectMsg(Pong(20))
expectMsg(Pong(40))
expectMsg(Pong(60))
expectMsg(Pong(0))
}

}
13 changes: 13 additions & 0 deletions src/test/scala/com/opticdev/scala/akka/TestActor.scala
@@ -0,0 +1,13 @@
package com.opticdev.scala.akka

import akka.actor.Actor
import akka.dispatch.RequiresMessageQueue

class TestActor extends Actor with RequiresMessageQueue[FaddishUnboundedMessageQueueSemantics] {
override def receive: Receive = {
case Ping(ballColor, rating) => {
Thread.sleep(1000)
sender() ! Pong(rating)
}
}
}
18 changes: 18 additions & 0 deletions src/test/scala/com/opticdev/scala/akka/TestFilter.scala
@@ -0,0 +1,18 @@
package com.opticdev.scala.akka

import akka.dispatch.Envelope

class TestFilter extends FaddishMailboxFilter {
override def filterOut(target: Envelope) : PartialFunction[Any, Boolean] = {
target.message match {
case ping: Ping =>
val newBallColor = ping.ballColor
PartialFunction[Any, Boolean] {
case ping: Ping => ping.ballColor == newBallColor
case _ => false
}
case _ =>
super.filterOut(target)
}
}
}

0 comments on commit a41bfb8

Please sign in to comment.