A type-safe Actor class and WiredActor for sending functions as messages
Branch: master
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
project
src
.gitignore
LICENSE
README.md
build.sbt

README.md

Actor

Actor - A small type-safe class that implements most commonly used Actor APIs including ask (?) which returns a typed Future[R].

WiredActor - Invoke functions directly on Actors. Send functions as messages.

Setup

libraryDependencies += "com.github.simerplaha" %% "actor" % "0.3"

Make sure to import ExecutionContext

import scala.concurrent.ExecutionContext.Implicits.global

WiredActor

Functions can be sent, invoked & scheduled as messages to WiredActors similar to messages in an Actor.

WiredActors can be created on any class instance or object.

Create a WiredActor

//your class that contains Actor functions
object MyImpl {
  def hello(name: String): String =
    s"Hello $name"

  def helloFuture(name: String): Future[String] =
    Future(s"Hello $name from the Future!") //some future operation
}

//create WiredActor
val actor = Actor.wire(MyImpl)

ask

//invoke function
val response: Future[String] = actor.ask(_.hello("World"))
response.foreach(println)

askFlatMap

val responseFlatMap: Future[String] = actor.askFlatMap(_.helloFuture("World"))
responseFlatMap.foreach(println)

send

val unitResponse: Unit = actor.send(_.hello("World again!"))

schedule

//schedule a function call on the actor. Returns Future response and TimerTask to cancel.
val scheduleResponse: (Future[String], TimerTask) = actor.scheduleAsk(delay = 1.second)(_.hello("World"))
scheduleResponse._1.foreach(println)

PingPong example using WiredActor

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object WiredPingPongDemo extends App {

  class WiredPingPong(var pingCount: Int, var pongCount: Int) {
    def ping(replyTo: WiredActor[WiredPingPong]): Unit = {
      pingCount += 1
      println(s"pingCount: $pingCount")
      replyTo.send(_.pong(replyTo))
    }

    def pong(replyTo: WiredActor[WiredPingPong]): Unit = {
      pongCount += 1
      println(s"pongCount: $pongCount")
      replyTo.send(_.ping(replyTo))
    }
  }

  Actor
    .wire(new WiredPingPong(0, 0))
    .send {
      (impl, self) =>
        impl.ping(self)
    }

  Thread.sleep(1.seconds.toMillis)
}

See WiredPingPongStateless for a stateless version of the above WiredPingPong WiredActor.

Actor

Stateless Actor

val actor =
  Actor[Int](
    (message, self) =>
      println(message)
  )

actor ! 1

Stateful Actor

case class State(var counter: Int)

val actor =
  Actor[Int, State](State(0))(
    (message, self) =>
      self.state.counter += 1
  )

Timer actor

A timer actor will process messages in batches after the set delays. Similar to above a stateful timer Actor can also be created.

import scala.concurrent.duration._

//stateless timer actor
val actor =
  Actor.timer[Int](delays = 1.second) {
    (message, self) =>
    //do something
  }

Scheduling messages to self

self.schedule returns a java TimerTask which is cancellable.

val actor =
  Actor[Int](
    (message, self) =>
      self.schedule(message = 1, delay = 1.second)  
  )

Terminating an Actor

val actor =
  Actor[Int](
    (message, self) =>
      println(message)
  )

actor.terminate()
//cannot send messages to a terminated actor.
(actor ! 1) shouldBe Left(Result.TerminatedActor)

Ask - Get a Future response

case class CreateUser(name: String)(val replyTo: ActorRef[Boolean])

val actor = Actor[CreateUser] {
  (message: CreateUser, _) =>
    message.replyTo ! true
}

val response: Future[Boolean] = (actor ? CreateUser("Tony Stark")).right.get

Await.result(response, 1.second)

Terminating an Actor on message failure

By default actors are not terminated if there is a failure processing a message. The following actor enables termination if there is a failure processing a message.

val actor =
  Actor[Int](
    (message, self) =>
      throw new Exception("Kaboom!")
  ).terminateOnException() //enable terminate on exception

(actor ! 1) shouldBe Right(Result.Sent) //first message sent is successful
eventually(actor.isTerminated() shouldBe true) //actor is terminated
(actor ! 2) shouldBe Left(Result.TerminatedActor) //cannot sent messages to a terminated actor

Testing

Borrowing ideas from Akka the TestActor implements APIs to test messages in an Actor's mailbox.

val actor = TestActor[Int]()

actor.expectNoMessage(after = 1.second) //expect a message after delay in the Actor's mailbox
val got = actor.getMessage() //fetch the first message in the actor's mailbox
actor.expectMessage[Int]() //expect a message of some type

PingPong example

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import com.github.simerplaha.actor._

case class Pong(replyTo: ActorRef[Ping])
case class Ping(replyTo: ActorRef[Pong])
case class State(var count: Int)

val ping =
  Actor[Ping, State](State(0)) {
    case (message, self) =>
      self.state.count += 1
      println(s"Ping: ${self.state.count}")
      message.replyTo ! Pong(self)
  }

val pong =
  Actor[Pong, State](State(0)) {
    case (message, self) =>
      self.state.count += 1
      println(s"Pong: ${self.state.count}")
      message.replyTo ! Ping(self)
  }

pong ! Pong(ping)

//run this for 1 seconds
Thread.sleep(1.second.toMillis)