Skip to content

Commit

Permalink
Working Worker C8 example.
Browse files Browse the repository at this point in the history
  • Loading branch information
pme123 committed Jan 19, 2025
1 parent 0ef0d75 commit 3af0336
Show file tree
Hide file tree
Showing 36 changed files with 768 additions and 255 deletions.
26 changes: 25 additions & 1 deletion 02-bpmn/src/main/scala/camundala/bpmn/exports.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,32 @@ case class GeneralVariables(
end GeneralVariables

object GeneralVariables:
given InOutCodec[GeneralVariables] = deriveCodec
given InOutCodec[GeneralVariables] = CirceCodec.from(decoder, deriveInOutEncoder)
given ApiSchema[GeneralVariables] = deriveApiSchema

implicit val decoder: Decoder[GeneralVariables] = new Decoder[GeneralVariables] :
final def apply(c: HCursor): Decoder.Result[GeneralVariables] =
for
servicesMocked <- c.downField("servicesMocked").as[Option[Boolean]].map(_.getOrElse(false))
mockedWorkers <- c.downField("mockedWorkers").as[Option[Seq[String]]].map(_.getOrElse(Seq.empty))
outputMock <- c.downField("outputMock").as[Option[Json]]
outputServiceMock <- c.downField("outputServiceMock").as[Option[Json]]
manualOutMapping <- c.downField("manualOutMapping").as[Option[Boolean]].map(_.getOrElse(false))
outputVariables <- c.downField("outputVariables").as[Option[Seq[String]]].map(_.getOrElse(Seq.empty))
handledErrors <- c.downField("handledErrors").as[Option[Seq[String]]].map(_.getOrElse(Seq.empty))
regexHandledErrors <- c.downField("regexHandledErrors").as[Option[Seq[String]]].map(_.getOrElse(Seq.empty))
impersonateUserId <- c.downField("impersonateUserId").as[Option[String]]
yield GeneralVariables(
servicesMocked,
mockedWorkers,
outputMock,
outputServiceMock,
manualOutMapping,
outputVariables,
handledErrors,
regexHandledErrors,
impersonateUserId
)
end GeneralVariables

lazy val regexHandledErrorsDescr =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package camundala
package worker

import camundala.domain.*
import camundala.bpmn.*
import camundala.worker.CamundalaWorkerError.ServiceError
import camundala.domain.*

import java.time.{LocalDate, LocalDateTime}
import scala.reflect.ClassTag
Expand Down
16 changes: 9 additions & 7 deletions 03-worker/src/main/scala/camundala/worker/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ trait WorkerHandler:
def worker: Worker[?, ?, ?]
def topic: String

def applicationName: String
def registerHandler(): Unit
def projectName: String
def registerHandler( register: => Unit): Unit =
val appPackageName = applicationName.replace("-", ".")
val appPackageName = projectName.replace("-", ".")
val testMode = sys.env.get("WORKER_TEST_MODE").contains("true") // did not work with lazy val
if testMode || getClass.getName.startsWith(appPackageName)
then
Expand Down Expand Up @@ -66,6 +65,10 @@ object ValidationHandler:
funct(in)
end ValidationHandler


type InitProcessFunction =
EngineContext ?=> Either[InitProcessError, Map[String, Any]]

/** handler for Custom Process Initialisation. All the variables in the Result Map will be put on
* the process.
*
Expand Down Expand Up @@ -98,18 +101,17 @@ end ValidationHandler
trait InitProcessHandler[
In <: Product: InOutCodec
]:
def init(input: In): Either[InitProcessError, Map[String, Any]]
def init(input: In): InitProcessFunction
end InitProcessHandler

object InitProcessHandler:
def apply[
In <: Product: InOutCodec
](
funct: In => Either[InitProcessError, Map[String, Any]],
funct: In => InitProcessFunction,
processLabels: ProcessLabels
): InitProcessHandler[In] =
new InitProcessHandler[In]:
override def init(in: In): Either[InitProcessError, Map[String, Any]] =
override def init(in: In): InitProcessFunction =
funct(in)
.map:
_ ++ processLabels.toMap
Expand Down
28 changes: 26 additions & 2 deletions 03-worker/src/main/scala/camundala/worker/JobWorker.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
package camundala.worker


import scala.concurrent.duration.*


trait JobWorker:
def topic: String
def timeout: Duration = 10.seconds

protected def errorHandled(error: CamundalaWorkerError, handledErrors: Seq[String]): Boolean =
error.isMock || // if it is mocked, it is handled in the error, as it also could be a successful output
handledErrors.contains(error.errorCode.toString) || handledErrors.map(
_.toLowerCase
).contains("catchall")

protected def regexMatchesAll(
errorHandled: Boolean,
error: CamundalaWorkerError,
regexHandledErrors: Seq[String]
) =
errorHandled && regexHandledErrors.forall(regex =>
error.errorMsg.matches(s".*$regex.*")
)

protected def filteredOutput(
outputVariables: Seq[String],
allOutputs: Map[String, Any]
): Map[String, Any] =
outputVariables match
case filter if filter.isEmpty => allOutputs
case filter =>
allOutputs
.filter:
case k -> _ => filter.contains(k)

end filteredOutput
end JobWorker
11 changes: 7 additions & 4 deletions 03-worker/src/main/scala/camundala/worker/WorkerApp.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package camundala.worker

import zio.*
import zio.ZIO.*

import scala.compiletime.uninitialized

trait WorkerApp extends ZIOAppDefault:
def workerClients: Seq[WorkerClient[?]]
var theWorkers: Set[JobWorker] = uninitialized
def workerRegistries: Seq[WorkerRegistry[?]]
protected var theWorkers: Set[JobWorker] = uninitialized

override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = ZioLogger.logger

def workers(dWorkers: (JobWorker | Seq[JobWorker])*): Unit =
theWorkers = dWorkers
.flatMap:
Expand All @@ -17,6 +20,6 @@ trait WorkerApp extends ZIOAppDefault:

override def run: ZIO[Any, Any, Any] =
for
_ <- Console.printLine("Starting WorkerApp")
_ <- ZIO.collectAllPar(workerClients.map(_.run(theWorkers)))
_ <- logInfo("Starting WorkerApp")
_ <- collectAllPar(workerRegistries.map(_.register(theWorkers)))
yield ()
10 changes: 0 additions & 10 deletions 03-worker/src/main/scala/camundala/worker/WorkerClient.scala

This file was deleted.

10 changes: 2 additions & 8 deletions 03-worker/src/main/scala/camundala/worker/WorkerDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import scala.reflect.ClassTag
trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]
extends JobWorker:

protected def engineContext: EngineContext

protected def logger: WorkerLogger

// needed that it can be called from CSubscriptionPostProcessor
def worker: Worker[In, Out, ?]
def topic: String = worker.topic
Expand Down Expand Up @@ -171,12 +167,10 @@ private trait InitProcessDsl[
InitIn <: Product: InOutCodec,
InConfig <: Product: InOutCodec
]:
protected def engineContext: EngineContext

protected def customInit(in: In): InitIn

// by default the InConfig is initialized
final def initProcess(in: In): Either[InitProcessError, Map[String, Any]] =
final def initProcess(in: In)(using engineContext: EngineContext): Either[InitProcessError, Map[String, Any]] =
val inConfig = in match
case i: WithConfig[?] =>
initConfig(
Expand All @@ -201,7 +195,7 @@ private trait InitProcessDsl[
private def initConfig(
optConfig: Option[InConfig],
defaultConfig: InConfig
): Map[String, Any] =
)(using engineContext: EngineContext): Map[String, Any] =
val defaultJson = defaultConfig.asJson
val r = optConfig.map {
config =>
Expand Down
53 changes: 28 additions & 25 deletions 03-worker/src/main/scala/camundala/worker/WorkerExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,43 @@ case class WorkerExecutor[

def execute(
processVariables: Seq[Either[BadVariableError, (String, Option[Json])]]
) =
): Either[CamundalaWorkerError, Map[String, Any]] =
for
validatedInput <- InputValidator.validate(processVariables)
initializedOutput <- Initializer.initVariables(validatedInput)
mockedOutput <- OutMocker.mockedOutput(validatedInput)
validatedInput <- InputValidator.validate(processVariables)
initializedOutput <- Initializer.initVariables(validatedInput)(using context.engineContext)
mockedOutput <- OutMocker.mockedOutput(validatedInput)
// only run the work if it is not mocked
output <- if mockedOutput.isEmpty then WorkRunner.run(validatedInput) else Right(mockedOutput.get)
allOutputs = camundaOutputs(validatedInput, initializedOutput, output)
filteredOut = filteredOutput(allOutputs)
output <-
if mockedOutput.isEmpty then WorkRunner.run(validatedInput) else Right(mockedOutput.get)
allOutputs = camundaOutputs(validatedInput, initializedOutput, output)
filteredOut = filteredOutput(allOutputs)
// make MockedOutput as error if mocked
_ <- if mockedOutput.isDefined then Left(MockedOutput(filteredOut)) else Right(())
_ <- if mockedOutput.isDefined then Left(MockedOutput(filteredOut)) else Right(())
yield filteredOut

object InputValidator:
lazy val prototype = worker.in
lazy val prototype = worker.in
lazy val validationHandler = worker.validationHandler

def validate(
inputParamsAsJson: Seq[Either[Any, (String, Option[Json])]]
): Either[ValidatorError, In] =
val jsonResult: Either[ValidatorError, Seq[(String, Option[Json])]] =
val jsonResult: Either[ValidatorError, Seq[(String, Option[Json])]] =
inputParamsAsJson
.partition(_.isRight) match
case (successes, failures) if failures.isEmpty =>
Right(
successes.collect { case Right(value) => value }
)
case (_, failures) =>
case (_, failures) =>
Left(
ValidatorError(
failures
.collect { case Left(value) => value }
.mkString("Validator Error(s):\n - ", " - ", "\n")
)
)
val json: Either[ValidatorError, JsonObject] = jsonResult
val json: Either[ValidatorError, JsonObject] = jsonResult
.map(_.foldLeft(JsonObject()) { case (jsonObj, jsonKey -> jsonValue) =>
if jsonValue.isDefined
then jsonObj.add(jsonKey, jsonValue.get)
Expand All @@ -64,23 +65,25 @@ case class WorkerExecutor[
.map(ex => ValidatorError(errorMsg = ex.errorMsg))
.flatMap(in => validationHandler.map(h => h.validate(in)).getOrElse(Right(in)))
)
val in = toIn(json)

val in = toIn(json)
val result = in.flatMap:
case i: WithConfig[?] =>
val newIn =
for
jsonObj: JsonObject <- json
inputVariables = jsonObj.toMap
jsonObj: JsonObject <- json
inputVariables = jsonObj.toMap
configJson: JsonObject =
inputVariables.get("inConfig").getOrElse(i.defaultConfigAsJson).asObject.get
newJsonConfig = worker.inConfigVariableNames
.foldLeft(configJson):
case (configJson, n) =>
if jsonObj.contains(n)
then configJson.add(n, jsonObj(n).get)
else configJson
newJsonConfig = worker.inConfigVariableNames
.foldLeft(configJson):
case (configJson, n) =>
if jsonObj.contains(n)
then configJson.add(n, jsonObj(n).get)
else configJson
yield jsonObj.add("inConfig", newJsonConfig.asJson)
toIn(newIn)

case x =>
in
result
Expand All @@ -95,7 +98,7 @@ case class WorkerExecutor[

def initVariables(
validatedInput: In
): Either[InitProcessError, Map[String, Any]] =
): InitProcessFunction =
worker.initProcessHandler
.map { vi =>
vi.init(validatedInput).map(_ ++ defaultVariables)
Expand All @@ -115,10 +118,10 @@ case class WorkerExecutor[
case (_, Some(outputMock), _) =>
decodeMock(outputMock)
// if your worker is mocked we use the default mock
case (true, None, None) =>
case (true, None, None) =>
worker.defaultMock(in).map(Some(_))
// otherwise it is not mocked or it is a service mock which is handled in service Worker during running
case (_, None, _) =>
case (_, None, _) =>
Right(None)
end mockedOutput

Expand Down Expand Up @@ -150,7 +153,7 @@ case class WorkerExecutor[
(output match
case o: NoOutput =>
context.toEngineObject(o)
case _ =>
case _ =>
context.toEngineObject(output.asInstanceOf[Out])
)
private def filteredOutput(
Expand Down
12 changes: 12 additions & 0 deletions 03-worker/src/main/scala/camundala/worker/WorkerRegistry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package camundala.worker

import zio.ZIO
import zio.ZIO.*

trait WorkerRegistry[T <: JobWorker]:
def register(workers: Set[JobWorker]): ZIO[Any, Any, Any] =
logInfo(s"Registering Workers for ${getClass.getSimpleName}") *>
registerWorkers(workers.collect { case w: T => w })

protected def registerWorkers(workers: Set[T]): ZIO[Any, Any, Any]
end WorkerRegistry
33 changes: 33 additions & 0 deletions 03-worker/src/main/scala/camundala/worker/ZioLogger.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package camundala.worker

import org.slf4j.{Logger, LoggerFactory}
import zio.*
import zio.logging.*
import zio.logging.backend.SLF4J

object ZioLogger:
val logger = Runtime.removeDefaultLoggers >>> SLF4J.slf4j

end ZioLogger

case class Slf4JLogger(private val delegateLogger: Logger) extends WorkerLogger:

def debug(message: String): Unit =
if delegateLogger.isDebugEnabled then
delegateLogger.debug(message)

def info(message: String): Unit =
if delegateLogger.isInfoEnabled then
delegateLogger.info(message)

def warn(message: String): Unit =
if delegateLogger.isWarnEnabled then
delegateLogger.warn(message)

def error(err: CamundalaWorkerError): Unit =
if delegateLogger.isErrorEnabled then
delegateLogger.error(s"Error ${err.causeMsg}")
end Slf4JLogger
object Slf4JLogger:
def logger(name: String) = Slf4JLogger(LoggerFactory.getLogger(name))
end Slf4JLogger
7 changes: 4 additions & 3 deletions 03-worker/src/main/scala/camundala/worker/ast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ sealed trait Worker[
// no handler for mocking - all done from the InOut Object
def runWorkHandler: Option[RunWorkHandler[In, Out]] = None
// helper
def variableNames: Seq[String] =
lazy val variableNames: Seq[String] =
(in.productElementNames.toSeq ++
otherEnumInExamples
.map:
_.flatMap(_.productElementNames)
.toSeq.flatten).distinct
.toSeq.flatten).distinct ++
inConfigVariableNames

def inConfigVariableNames: Seq[String] =
lazy val inConfigVariableNames: Seq[String] =
in match
case i: WithConfig[?] =>
i.defaultConfig.productElementNames.toSeq
Expand Down
Loading

0 comments on commit 3af0336

Please sign in to comment.