Skip to content

Commit

Permalink
Merge branch 'release-3.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
atooni committed Sep 6, 2019
2 parents 973c837 + b09a53f commit 118776a
Show file tree
Hide file tree
Showing 17 changed files with 653 additions and 424 deletions.
Expand Up @@ -62,8 +62,8 @@ class EnvelopeFileDropper(

private[this] def handleError(env : FlowEnvelope, error : Throwable) : FileDropResult = {
log.error(s"Error dropping envelope [${env.id}] to file : [${error.getMessage()}]")
val cmd = dropCmd(env)(_ => Success(ByteString(""))).unwrap
dropActor ! FileDropAbort(error)
val cmd = dropCmd(env)(e => Success(ByteString(""))).get
dropActor ! FileDropAbort(env.id, error)
FileDropResult(cmd, Some(error))
}

Expand All @@ -77,7 +77,10 @@ class EnvelopeFileDropper(
implicit val eCtxt : ExecutionContext = system.dispatcher

(dropActor ? cmd).mapTo[FileDropResult].onComplete {
case Success(r) => p.complete(Success(r))
case Success(r) => r.error match {
case None => p.complete(Success(r))
case Some(t) => p.complete(Success(handleError(env, t)))
}
case Failure(t) => p.complete(Success(handleError(env, t)))
}

Expand Down
219 changes: 113 additions & 106 deletions blended.file/src/main/scala/blended/file/FileDropActor.scala
Expand Up @@ -10,7 +10,7 @@ import akka.actor.{Actor, ActorRef}
import akka.util.ByteString
import blended.util.logging.Logger

import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

case class FileDropCommand(
Expand All @@ -24,7 +24,7 @@ case class FileDropCommand(
properties: Map[String, Any]
) {

override def equals(obj: scala.Any): Boolean = obj match {
override def equals(obj: Any): Boolean = obj match {
case cmd : FileDropCommand =>
content.equals(cmd.content) &&
directory.equals(cmd.directory) &&
Expand All @@ -36,11 +36,35 @@ case class FileDropCommand(
case _ => false
}

override def hashCode(): Int = toString().hashCode()

override def toString: String = {

val ts = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS").format(new Date(timestamp))
s"FileDropCommand[$id](dir = [$directory], fileName = [$fileName], compressed = $compressed, append = $append, timestamp = [$ts], content-size = ${content.length}), " +
s"properties=${properties.mkString("[", ",", "]")}"
s"FileDropCommand[$id](dir = [$directory], fileName = [$fileName], compressed = $compressed, append = $append, " +
s"timestamp = [$ts], content-size = ${content.length}), properties=${properties.mkString("[", ",", "]")}"
}

// determine the final file name for a file drop
val finalFile : File = {

val file = new File(directory, fileName)

if (!append) {
if (file.exists()) {
// In case we need to generate a new file name
new File(directory, fileName.lastIndexOf('.') match {
case -1 => s"dup_${System.currentTimeMillis()}_${fileName}"
case pos => s"${fileName.substring(0, pos)}.dup_${System.currentTimeMillis()}${fileName.substring(pos)}"
})
} else {
// In case we do not append and we can generate a new file
file
}
} else {
// In case we append the content to a file, we keep the same final file name
file
}
}
}

Expand All @@ -52,12 +76,15 @@ object FileDropResult {

case class FileDropResult(cmd: FileDropCommand, error: Option[Throwable])
case object FileDropChunk
case class FileDropAbort(t:Throwable)
case class FileDropAbort(id: String, t:Throwable)

class FileDropController extends Actor {
override def receive: Receive = Actor.emptyBehavior
}

class FileDropActor extends Actor {

/**
*
* @param requestor The actor which has requested the filedrop and expects a response.
* @param cmd The FileDropCommand to execute.
* @param tmpFile If in append mode, tmpFile will point to a copy of the original file
Expand All @@ -71,49 +98,32 @@ class FileDropActor extends Actor {
tmpFile : Option[File],
outFile : File,
is : Option[InputStream],
os : Option[OutputStream]
os : Option[OutputStream],
error : Option[Throwable]
)

private val log : Logger = Logger[FileDropActor]

override def preStart(): Unit = context.become(idle)
override def preStart(): Unit = context.become(idle(Seq.empty))

private def checkDirectory(dir: File) : Boolean = {
private def checkDirectory(dir: File) : Try[File] = {

if (!dir.exists()) {
log.debug(s"Creating directory [${dir.getAbsolutePath}]")
dir.mkdirs()
}

dir.exists() && dir.isDirectory && dir.canWrite
}

// determine the final file name for a file drop
private def finalFile(cmd: FileDropCommand) : File = {

val file = new File(cmd.directory, cmd.fileName)

if (!cmd.append) {
if (file.exists()) {
// In case we need to generate a new file name
new File(cmd.directory, cmd.fileName.lastIndexOf('.') match {
case -1 => s"dup_${System.currentTimeMillis()}_${cmd.fileName}"
case pos => s"${cmd.fileName.substring(0, pos)}.dup_${System.currentTimeMillis()}${cmd.fileName.substring(pos)}"
})
} else {
// In case we do not append and we can generate a new file
file
}
if (dir.exists() && dir.isDirectory() && dir.canWrite()) {
Success(dir)
} else {
// In case we append the content to a file, we keep the same final file name
file
Failure(new Exception(s"Directory [${dir.getAbsolutePath()}] does not exist or is not writable."))
}
}

// A temp file is only created when we need to append to an existing file.
def tmpFile(cmd: FileDropCommand) : Option[File] = {
if (cmd.append) {
val file = finalFile(cmd)
val file : File = cmd.finalFile
if (file.exists()) {
val tmpName = s"${cmd.fileName}.${cmd.timestamp}.tmp"
val tmpFile = new File(cmd.directory, tmpName)
Expand All @@ -133,7 +143,7 @@ class FileDropActor extends Actor {
new File(cmd.directory, s"${cmd.fileName}.${cmd.timestamp}.out")

// prepare the output stream, if required for append
def prepareOutputStream(cmd: FileDropCommand) : (OutputStream, Option[File], File) = {
def prepareOutputStream(cmd: FileDropCommand) : Try[(OutputStream, Option[File], File)] = Try {

val tf = tmpFile(cmd)

Expand Down Expand Up @@ -169,51 +179,73 @@ class FileDropActor extends Actor {
if (next.isEmpty) None else Some(zis)
}
} else {
log.debug(s"Writing content without compression to ${outFile(cmd)}")
log.debug(s"Writing content for cmd [${cmd.id}] without compression to [${outFile(cmd)}]")
Some(new ByteArrayInputStream(cmd.content.toArray))
}
}

private[this] def respond(
state : FileDropState,
t : Option[Throwable] = None
) : Unit = {

private def cleanUp(state : FileDropState) : Unit = {
Try(state.is.foreach(_.close()))
Try(state.os.foreach(_.close()))

val fdr = FileDropResult.result(state.cmd, t)

if (t.isDefined) {
if (state.error.isDefined) {
// in case an error was encountered, we will restore the original file
// and forget the append
state.tmpFile.foreach { tf => tf.renameTo(finalFile(state.cmd)) }
state.tmpFile.foreach { tf => tf.renameTo(state.cmd.finalFile) }
state.outFile.delete()
} else {
// In case the command was successful, we will delete the tmpfile
// and create the final file
state.tmpFile.foreach(_.delete())
state.outFile.renameTo(finalFile(state.cmd))
state.outFile.renameTo(state.cmd.finalFile)
}
state.requestor ! fdr
}

context.become(idle)
private[this] def respond(
state : FileDropState,
pending : Seq[FileDropState]
) : Unit = {

cleanUp(state)
state.requestor ! FileDropResult.result(state.cmd, state.error)

pending match {
case Seq() =>
log.debug(s"No more pending Filedrops...switching to idle state")
context.become(idle(Seq.empty))
case s =>
val state : FileDropState = s.last
log.debug(s"Scheduling pending file drop [${state.cmd.id}]")
context.become(dropping(state, s.take(s.size - 1)))
self ! FileDropChunk
}
}

override def receive: Receive = Actor.emptyBehavior

// While in dropping state we drop one chunk after the other
private def dropping(state : FileDropState) : Receive = {
private def dropping(state : FileDropState, pending : Seq[FileDropState]) : Receive = {
case cmd : FileDropCommand =>
sender() ! FileDropResult(cmd, Some(new Exception("Filedropper is busy")))
val newState : FileDropState = createDropState(cmd, sender())
log.debug(s"File drop for [${cmd.id}] is pending, [${pending.size + 1}] pending in total")
context.become(dropping(state, pending ++ Seq(newState)))
self ! FileDropChunk

// Abort the file drop
case FileDropAbort(t) =>
respond(state, Some(t))
case FileDropAbort(id, t) =>
if (state.cmd.id == id) {
respond(state.copy(error = Some(t)), pending)
} else {
pending.find(_.cmd.id == id).foreach{ s =>
cleanUp(s)
s.requestor ! FileDropResult.result(s.cmd, Some(t))
context.become(dropping(state, pending.filter(_.cmd.id != id)))
}
}

// Drop the next chunk to the out directory
case FileDropChunk =>
log.debug("Dropping chunk")
log.trace("Dropping chunk")
(state.is, state.os) match {
// Streams are still open, so we can proceed
case (Some(in), Some(out)) =>
Expand All @@ -226,77 +258,52 @@ class FileDropActor extends Actor {
out.write(buffer, 0, cnt)
self ! FileDropChunk
} else {
log.info(s"Successfully executed [${state.cmd}] and created file [${finalFile(state.cmd).getAbsolutePath}]")
respond(state, None)
log.info(s"Successfully executed [${state.cmd}] and created file [${state.cmd.finalFile.getAbsolutePath}]")
respond(state.copy(error = None), pending)
}
} catch {
case NonFatal(e) =>
log.warn(e)(s"Error while dropping file [${e.getMessage()}]")
respond(state, Some(e))
respond(state. copy(error = Some(e)), pending)
}
// The input or output stream is not open
case (_,_) =>
respond(state, Some(new Exception("Illegal stream state in FileDropActor")))
respond(state.copy(error = Some(new Exception("Illegal stream state in FileDropActor"))), pending)
}
}

private def idle : Receive = {

case FileDropChunk =>
case FileDropAbort(_) =>

case cmd : FileDropCommand =>

val requestor = sender()
val outdir = new File(cmd.directory)

var tf : Option[File] = None

var is : Option[InputStream] = None
var os : Option[OutputStream] = None
private def createDropState(cmd : FileDropCommand, requestor: ActorRef) : FileDropState = {
checkDirectory(new File(cmd.directory)) match {
case Success(_) => prepareOutputStream(cmd) match {
case Success((s, tf, of)) =>
FileDropState(requestor = requestor, cmd = cmd, tmpFile = tf,
outFile = of, is = inputStream(cmd), os = Some(s), error = None
)
case Failure(t) =>
FileDropState(requestor = requestor, cmd = cmd, tmpFile = None,
outFile = outFile(cmd), is = None, os = None, error = Some(t)
)
}

if (checkDirectory(outdir)) {
case Failure(t) =>
FileDropState(requestor = requestor, cmd = cmd, tmpFile = None,
outFile = outFile(cmd), is = None, os = None, error = Some(t)
)
}
}

try {
val (s, tf, of) = prepareOutputStream(cmd)
os = Some(s)
is = inputStream(cmd)
private def idle(pending : Seq[FileDropState]) : Receive = {

val state : FileDropState = FileDropState(
requestor = requestor,
cmd = cmd,
tmpFile = tf,
outFile = of,
is = is,
os = os
)
case FileDropChunk =>
case FileDropAbort(_, _) =>

context.become(dropping(state))
self ! FileDropChunk
} catch {
case NonFatal(t) =>
log.warn(s"Error executing $cmd: ${t.getMessage}")

respond(FileDropState(
requestor = requestor,
cmd = cmd,
tmpFile = tf,
outFile = outFile(cmd),
is = is,
os = os
), Some(t))
}
case cmd : FileDropCommand =>
val state : FileDropState = createDropState(cmd, sender())
if(state.error.isEmpty) {
context.become(dropping(state, pending))
self ! FileDropChunk
} else {
val msg = s"The directory [${outdir.getAbsolutePath}] does not exist or is not writable."
log.warn(msg)
respond(FileDropState(
requestor = requestor,
cmd = cmd,
tmpFile = tf,
outFile = outFile(cmd),
is = is,
os = os
), Some(new Exception(msg)))
respond(state, pending)
}
}
}
21 changes: 8 additions & 13 deletions blended.file/src/main/scala/blended/file/FileDropStage.scala
Expand Up @@ -11,19 +11,14 @@ import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

/**
* The Filedrop Stream consumes messages from a given upstream producing
* FlowEnvelope. Each FlowEnvelope will by writen to the designated file
* drop location using an instance of a file drop actor. The file actor
* responds with a FileDropResult, which is passed further downstream.
*
* Users of the Filedrop Stage must implement the logic of handling FileDropResults
* if required.
*
* @param name
* @param config
* @param log
* @param system
*/
* The Filedrop Stream consumes messages from a given upstream producing
* FlowEnvelope. Each FlowEnvelope will by written to the designated file
* drop location using an instance of a file drop actor. The file actor
* responds with a FileDropResult, which is passed further downstream.
*
* Users of the Filedrop Stage must implement the logic of handling FileDropResults
* if required.
*/
class FileDropStage(
name : String,
config : FileDropConfig,
Expand Down

0 comments on commit 118776a

Please sign in to comment.