Skip to content

Commit

Permalink
Working Worker C7 example.
Browse files Browse the repository at this point in the history
  • Loading branch information
pme123 committed Jan 20, 2025
1 parent 3af0336 commit 4584d68
Show file tree
Hide file tree
Showing 13 changed files with 429 additions and 28 deletions.
5 changes: 4 additions & 1 deletion 03-worker/src/main/scala/camundala/worker/JobWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import scala.concurrent.duration.*

trait JobWorker:
def topic: String
protected def worker: Worker[?, ?, ?]

def timeout: Duration = 10.seconds

protected def errorHandled(error: CamundalaWorkerError, handledErrors: Seq[String]): Boolean =
Expand All @@ -17,8 +19,9 @@ trait JobWorker:
error: CamundalaWorkerError,
regexHandledErrors: Seq[String]
) =
val errorMsg = error.errorMsg.replace("\n", "")
errorHandled && regexHandledErrors.forall(regex =>
error.errorMsg.matches(s".*$regex.*")
errorMsg.matches(s".*$regex.*")
)

protected def filteredOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ object C7NoAuthClient extends C7Client:
ZIO.attempt:
ExternalTaskClient.create()
.baseUrl("http://localhost:8887/engine-rest")
.asyncResponseTimeout(15000)
// .asyncResponseTimeout(10000)
.customizeHttpClient: httpClientBuilder =>
httpClientBuilder.setDefaultRequestConfig(RequestConfig.custom()
.setResponseTimeout(Timeout.ofSeconds(15))
// .setResponseTimeout(Timeout.ofSeconds(15))
.build())
.build()

Expand Down
145 changes: 129 additions & 16 deletions 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,156 @@ package camundala.worker.c8zio
import camundala.bpmn.GeneralVariables
import camundala.domain.*
import camundala.worker.*
import org.camunda.bpm.client.task
import camundala.worker.CamundalaWorkerError.*
import org.camunda.bpm.client.task as camunda
import zio.*
import zio.ZIO.*
import zio.*

import java.util.Date
import zio.ZIO.*

import scala.concurrent.ExecutionContext.Implicits.global
import java.util.Date
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*

trait C7Worker[In: InOutDecoder, Out: InOutEncoder] extends JobWorker, camunda.ExternalTaskHandler:

protected def c7Context: C7Context

private lazy val runtime = Runtime.default

def client: C7Client = OAuth2Client

def logger: WorkerLogger = Slf4JLogger.logger(getClass.getName)

override def execute(
externalTask: camunda.ExternalTask,
externalTaskService: camunda.ExternalTaskService
): Unit =
Future:
val startDate = new Date()
logger.info(
s"Worker: ${externalTask.getTopicName} (${externalTask.getId}) started > ${externalTask.getBusinessKey}"
Unsafe.unsafe:
implicit unsafe =>
runtime.unsafe.runToFuture(
run(externalTaskService)(using externalTask)
.provideLayer(ZioLogger.logger)
).future

private def run(externalTaskService: camunda.ExternalTaskService)(using
externalTask: camunda.ExternalTask
): ZIO[Any, Throwable, Unit] =
for
startDate <- succeed(new Date())
_ <-
logInfo(
s"Worker: ${externalTask.getTopicName} (${externalTask.getId}) started > ${externalTask.getBusinessKey}"
)
_ <- executeWorker(externalTaskService)
_ <-
logInfo(
s"Worker: ${externalTask.getTopicName} (${externalTask.getId}) ended ${printTimeOnConsole(startDate)} > ${externalTask.getBusinessKey}"
)
yield ()

private def executeWorker(
externalTaskService: camunda.ExternalTaskService
): HelperContext[ZIO[Any, Throwable, Unit]] =
val tryProcessVariables =
ProcessVariablesExtractor.extract(worker.variableNames)
val tryGeneralVariables = ProcessVariablesExtractor.extractGeneral()
(for
generalVariables <- tryGeneralVariables
context = EngineRunContext(c7Context, generalVariables)
filteredOut <-
ZIO.fromEither(
worker.executor(using context).execute(variablesAsEithers(tryProcessVariables))
)
_ <- ZIO.attempt(externalTaskService.handleSuccess(
filteredOut,
generalVariables.manualOutMapping
))
yield () //
).mapError:
case ex: CamundalaWorkerError => ex
case ex => UnexpectedError(ex.getMessage)
.mapError: ex =>
externalTaskService.handleError(ex, tryGeneralVariables)
ex
end executeWorker

private def variablesAsEithers(tryProcessVariables: Seq[IO[
BadVariableError,
(String, Option[Json])
]]): Seq[Either[BadVariableError, (String, Option[Json])]] =
tryProcessVariables
.map((x: IO[BadVariableError, (String, Option[Json])]) =>
Unsafe.unsafe:
implicit unsafe => // can be removed if everything is ZIO
runtime.unsafe.run(x.either).getOrThrow()
)
// executeWorker(externalTaskService)(using externalTask)
logger.info(
s"Worker: ${externalTask.getTopicName} (${externalTask.getId}) ended ${printTimeOnConsole(startDate)} > ${externalTask.getBusinessKey}"

extension (externalTaskService: camunda.ExternalTaskService)

private def handleSuccess(
filteredOutput: Map[String, Any],
manualOutMapping: Boolean
): HelperContext[Unit] =
externalTaskService.complete(
summon[camunda.ExternalTask],
if manualOutMapping then Map.empty.asJava else filteredOutput.asJava, // Process Variables
if !manualOutMapping then Map.empty.asJava else filteredOutput.asJava // local Variables
)

end execute
private[worker] def handleError(
error: CamundalaWorkerError,
tryGeneralVariables: IO[BadVariableError, GeneralVariables]
): HelperContext[Unit] =
import CamundalaWorkerError.*
val errorMsg = error.errorMsg.replace("\n", "")
(for
generalVariables <- tryGeneralVariables
isErrorHandled = errorHandled(error, generalVariables.handledErrors)
errorRegexHandled =
regexMatchesAll(isErrorHandled, error, generalVariables.regexHandledErrors)
yield (isErrorHandled, errorRegexHandled, generalVariables))
.flatMap {
case (true, true, generalVariables) =>
val mockedOutput = error match
case error: ErrorWithOutput =>
error.output
case _ => Map.empty
val filtered = filteredOutput(generalVariables.outputVariables, mockedOutput)
ZIO.succeed(
if
error.isMock && !generalVariables.handledErrors.contains(
error.errorCode.toString
)
then
handleSuccess(filtered, generalVariables.manualOutMapping)
else
val errorVars = Map(
"errorCode" -> error.errorCode,
"errorMsg" -> error.errorMsg
)
val variables = (filtered ++ errorVars).asJava
logger.info(s"Handled Error: $errorVars")
externalTaskService.handleBpmnError(
summon[camunda.ExternalTask],
s"${error.errorCode}",
error.errorMsg,
variables
)
)
case (true, false, _) =>
ZIO.fail(HandledRegexNotMatchedError(error))
case _ =>
ZIO.fail(error)
}
.mapError: err =>
logger.error(err)
externalTaskService.handleFailure(
summon[camunda.ExternalTask],
err.causeMsg,
s" ${err.causeMsg}\nSee the log of the Worker: ${niceClassName(worker.getClass)}",
summon[camunda.ExternalTask].getRetries - 1,
timeout.toMillis
)

end handleError

end extension

end C7Worker
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@ import zio.ZIO.*
import zio.{Console, *}

class C7WorkerRegistry(client: C7Client)
extends WorkerRegistry[C7Worker[?, ?]]:
extends WorkerRegistry[C7Worker[?, ?]]:

def registerWorkers(workers: Set[C7Worker[?, ?]]): ZIO[Any, Any, Any] =
Console.printLine(s"Starting C7 Worker Client") *>
acquireReleaseWith(client.client)(_.closeClient()): client =>
for
_ <- collectAllPar(workers.map(w => registerWorker(w, client)))
server <- ZIO.succeed(()).forever.fork
_ <- collectAllPar(workers.map(w => registerWorker(w, client)))
_ <- server.join
yield ()

private def registerWorker(worker: C7Worker[?, ?], client: ExternalTaskClient) =
attempt(client
.subscribe(worker.topic)
.handler(worker)
.lockDuration(worker.timeout.toMillis)
//.lockDuration(worker.timeout.toMillis)
.open()) *>
logInfo("Registered C7 Worker: " + worker.topic)

extension (client: ExternalTaskClient)
def closeClient() =
succeed(if client != null then client.stop() else ())
logInfo("Closing C7 Worker Client") *>
succeed(if client != null then client.stop() else ())
end C7WorkerRegistry
Loading

0 comments on commit 4584d68

Please sign in to comment.