Permalink
Browse files

Adding Semaphore Actor to "synchronize" different actors that should …

…not work in parallel
  • Loading branch information...
atooni committed Jan 3, 2019
1 parent 149820b commit 3d9dc308fb02a110669227b158e2f08d4a3af97b
@@ -2,7 +2,6 @@ package blended.akka

import akka.actor.{ ActorRef, Props }
import blended.akka.internal.ActorSystemCapsule
import blended.akka.protocol.BundleActorStarted
import domino.DominoImplicits
import domino.capsule.CapsuleContext
import org.osgi.framework.BundleContext
@@ -31,7 +30,6 @@ trait ActorSystemWatching extends DominoImplicits {
val actorName = bundleContext.getBundle().getSymbolicName()
log.debug(s"About to create bundle actor for bundle: ${actorName}")
val actorRef = system.actorOf(props, actorName)
system.eventStream.publish(BundleActorStarted(actorName))

capsuleContext.addCapsule(new Capsule {
override def start() {
@@ -0,0 +1,76 @@
package blended.akka

import java.util.UUID

import akka.actor.{Actor, ActorRef, Terminated}
import blended.util.logging.Logger

object SemaphoreActor {

case object Acquired
case object Waiting
case class Acquire(actor : ActorRef)
case class Release(actor : ActorRef)
}

class SemaphoreActor extends Actor {
import SemaphoreActor._

private val id : String = UUID.randomUUID().toString()
private val log : Logger = Logger[SemaphoreActor]

// the pending acquire messages
private[akka] var pending : List[Acquire] = List.empty

override def receive: Receive = open

private[akka] def logPending() : Unit = {
log.trace(s"Semaphore has [${pending.size}] actors waiting")
}

private[akka] def acquire(a : Acquire) : Unit = {
pending = pending.filterNot(_.actor == a.actor)
a.actor ! Acquired
logPending()
context.become(locked(a))
}

private[akka] def release(current : Acquire, a : ActorRef) : Unit = {
if (current.actor == a) {
log.trace(s"Releasing actor [$a]")
context.unwatch(a)
pending match {
case Nil => context.become(open)
case l => acquire(l.last)
}
} else {
pending = pending.filter(_.actor != a)
logPending()
}
}

private[akka] def open : Receive = {
case a : Acquire =>
context.watch(a.actor)
acquire(a)
case r : Release => // do nothing

case m =>
log.warn(s"Unexpected message of type [${m.getClass().getName()}] in semaphore [$id]")
}

private[akka] def locked(lockedBy : Acquire) : Receive = {
case a : Acquire =>
context.watch(a.actor)
if (lockedBy.actor == a.actor) {
a.actor ! Acquired
} else {
pending = (a :: pending).distinct
logPending()
a.actor ! Waiting
}

case r : Release => release(lockedBy, r.actor)
case Terminated(a) => release(lockedBy, a)
}
}

This file was deleted.

Oops, something went wrong.
@@ -0,0 +1,103 @@
package blended.akka

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
import blended.testsupport.scalatest.LoggingFreeSpecLike
import org.scalatest.Matchers

class SemaphoreActorSpec extends TestKit(ActorSystem("semaphore"))
with LoggingFreeSpecLike
with Matchers
with ImplicitSender {

"The Semaphore Actor should" - {

"Allow to acquire a lock" in {

val probe : TestProbe = TestProbe()
val sem : ActorRef = system.actorOf(Props[SemaphoreActor])

sem ! SemaphoreActor.Acquire(probe.ref)

probe.expectMsg(SemaphoreActor.Acquired)
}

"Allow to aquire the lock if the lock is already acquired by the same actor" in {
val probe : TestProbe = TestProbe()
val sem : ActorRef = system.actorOf(Props[SemaphoreActor])

sem ! SemaphoreActor.Acquire(probe.ref)
probe.expectMsg(SemaphoreActor.Acquired)

sem ! SemaphoreActor.Acquire(probe.ref)
probe.expectMsg(SemaphoreActor.Acquired)
}

"Deny to acquire a lock as long as another Actor is holding the semaphore" in {
val probe1 : TestProbe = TestProbe()
val probe2 : TestProbe = TestProbe()

val sem : ActorRef = system.actorOf(Props[SemaphoreActor])

sem ! SemaphoreActor.Acquire(probe1.ref)
probe1.expectMsg(SemaphoreActor.Acquired)

sem ! SemaphoreActor.Acquire(probe2.ref)
probe2.expectMsg(SemaphoreActor.Waiting)
}

"Allow to acquire a lock after the semaphore has been released" in {
val probe1 : TestProbe = TestProbe()
val probe2 : TestProbe = TestProbe()

val sem : ActorRef = system.actorOf(Props[SemaphoreActor])

sem ! SemaphoreActor.Acquire(probe1.ref)
probe1.expectMsg(SemaphoreActor.Acquired)

sem ! SemaphoreActor.Acquire(probe2.ref)
probe2.expectMsg(SemaphoreActor.Waiting)

sem ! SemaphoreActor.Release(probe1.ref)
probe2.expectMsg(SemaphoreActor.Acquired)
}

"Release the lock if the locking actor dies" in {
val probe1 : TestProbe = TestProbe()
val probe2 : TestProbe = TestProbe()

val sem : ActorRef = system.actorOf(Props[SemaphoreActor])

sem ! SemaphoreActor.Acquire(probe1.ref)
probe1.expectMsg(SemaphoreActor.Acquired)

sem ! SemaphoreActor.Acquire(probe2.ref)
probe2.expectMsg(SemaphoreActor.Waiting)

system.stop(probe1.ref)
probe2.expectMsg(SemaphoreActor.Acquired)
}

"Remove the actor from the waiting list if the actor dies" in {
val probe1 : TestProbe = TestProbe()
val probe2 : TestProbe = TestProbe()
val probe3 : TestProbe = TestProbe()

val sem = TestActorRef[SemaphoreActor]

sem ! SemaphoreActor.Acquire(probe1.ref)
probe1.expectMsg(SemaphoreActor.Acquired)

sem ! SemaphoreActor.Acquire(probe2.ref)
sem ! SemaphoreActor.Acquire(probe3.ref)
probe2.expectMsg(SemaphoreActor.Waiting)
probe3.expectMsg(SemaphoreActor.Waiting)

system.stop(probe2.ref)
system.stop(probe1.ref)
probe3.expectMsg(SemaphoreActor.Acquired)

sem.underlyingActor.pending should be (empty)
}
}
}
@@ -2,21 +2,32 @@ package blended.file

import java.io.{File, FilenameFilter}

import akka.actor.{Actor, ActorLogging, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import blended.akka.SemaphoreActor.{Acquire, Acquired, Release, Waiting}
import blended.util.logging.Logger

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object FilePollActor {

def props(cfg: FilePollConfig, handler: FilePollHandler) : Props =
Props(new FilePollActor(cfg, handler))
def props(
cfg: FilePollConfig,
handler: FilePollHandler,
sem : Option[ActorRef] = None
) : Props =
Props(new FilePollActor(cfg, handler, sem))
}

class FilePollActor(cfg: FilePollConfig, handler: FilePollHandler) extends Actor with ActorLogging {
class FilePollActor(
cfg: FilePollConfig,
handler: FilePollHandler,
sem : Option[ActorRef]
) extends Actor {

private val log : Logger = Logger[FilePollActor]
case object Tick

private[this] implicit val eCtxt : ExecutionContext = context.system.dispatcher
@@ -35,7 +46,7 @@ class FilePollActor(cfg: FilePollConfig, handler: FilePollHandler) extends Actor
new File(l)

if (f.exists()) {
log.info(s"Directory is locked with file [${f.getAbsolutePath()}]")
log.info(s"Directory for [${cfg.id}] is locked with file [${f.getAbsolutePath()}]")
true
} else {
false
@@ -50,7 +61,7 @@ class FilePollActor(cfg: FilePollConfig, handler: FilePollHandler) extends Actor
}

if (!srcDir.exists() || !srcDir.isDirectory() || !srcDir.canRead()) {
log.info(s"Directory [$srcDir] does not exist or is not readable.")
log.info(s"Directory [$srcDir] for [${cfg.id}]does not exist or is not readable.")
List.empty
} else if (locked()) {
List.empty
@@ -61,15 +72,31 @@ class FilePollActor(cfg: FilePollConfig, handler: FilePollHandler) extends Actor
if (cfg.pattern.isEmpty || cfg.pattern.forall(p => name.matches(p))) {
val f = new File(dir, name)
f.exists() && f.isFile() && f.canRead()
} else false
} else {
false
}
}
}).toList
}
}

override def receive: Receive = {
override def receive: Receive = idle

private[file] def idle : Receive = {
case Tick =>
log.info(s"Executing File Poll for directory [${cfg.sourceDir}]")
if (sem.isDefined) {
sem.foreach { s =>
log.trace(s"Using semaphore actor [$s] to schedule file poll in [${cfg.id}]")
s ! Acquire(self)
}
} else {
self ! Acquired
}

case Waiting => // Do nothing - just wait

case Acquired =>
log.info(s"Executing File Poll in [${cfg.id}] for directory [${cfg.sourceDir}]")

val futures : Iterable[Future[FileProcessed]] = files().map { f =>
context.actorOf(Props[FileProcessActor]).ask(FileProcessCmd(f, cfg, handler)).mapTo[FileProcessed]
@@ -80,13 +107,14 @@ class FilePollActor(cfg: FilePollConfig, handler: FilePollHandler) extends Actor
listFuture.onComplete { c =>
c match {
case Failure(t) =>
log.warning(s"Error processing directory [${cfg.sourceDir}] : [${t.getMessage()}]")
log.warn(s"Error processing directory [${cfg.sourceDir}] in [${cfg.id}] : [${t.getMessage()}]")

case Success(results) =>
val succeeded = results.count(_.success)
log.info(s"Processed [$succeeded] of [${results.size}] files from [${cfg.sourceDir}], ")
log.info(s"Processed [$succeeded] of [${results.size}] files in [${cfg.id}] from [${cfg.sourceDir}], ")
}

sem.foreach(_ ! Release(self))
context.system.scheduler.scheduleOnce(cfg.interval, self, Tick)
}
}
@@ -32,7 +32,6 @@ class FileProcessActor extends Actor with ActorLogging {

cmd.handler.processFile(cmd, tempFile)(context.system) match {
case Success(_) =>
cmd.handler.processFile(cmd, tempFile)(context.system)
requestor ! FileProcessed(cmd, success = true)

val archiveCmd = cmd.cfg.backup match {
@@ -35,6 +35,8 @@ class JMSFilePollHandler(

val env : FlowEnvelope = createEnvelope(cmd, f)

log.trace(s"Handling polled file in JMSHandler : [${env.flowMessage.header.mkString(",")}]")

sendMessages(
producerSettings = settings,
log = log,
@@ -1,5 +1,6 @@
package blended.file
import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem

@@ -13,5 +14,9 @@ class FailingFileHandler extends FilePollHandler {

class SucceedingFileHandler extends FilePollHandler {

override def processFile(cmd: FileProcessCmd, f: File)(implicit system: ActorSystem): Try[Unit] = Try {}
val count : AtomicInteger = new AtomicInteger(0)

override def processFile(cmd: FileProcessCmd, f: File)(implicit system: ActorSystem): Try[Unit] = Try {
count.incrementAndGet()
}
}
Oops, something went wrong.

0 comments on commit 3d9dc30

Please sign in to comment.