Skip to content

Commit

Permalink
Worker C8 state of work.
Browse files Browse the repository at this point in the history
  • Loading branch information
pme123 committed Jan 17, 2025
1 parent 751432c commit 0ef0d75
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 56 deletions.
2 changes: 2 additions & 0 deletions 03-api/src/main/scala/camundala/api/ApiProjectConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ case class VersionConfig(major: Int, minor: Int, patch: Int, isSnapshot: Boolean
major * 100000 + minor * 1000 + patch

override def toString: String = s"$minorVersion.$patch${if isSnapshot then "-SNAPSHOT" else ""}"
def isHigherThan(config: VersionConfig): Boolean =
versionAsInt > config.versionAsInt
end VersionConfig

object VersionConfig:
Expand Down
48 changes: 38 additions & 10 deletions 03-worker/src/main/scala/camundala/worker/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@ import sttp.model.Uri.QuerySegment
import sttp.model.{Method, Uri}
import scala.reflect.ClassTag

trait WorkerHandler:
def worker: Worker[?, ?, ?]
def topic: String

def applicationName: String
def registerHandler(): Unit
def registerHandler( register: => Unit): Unit =
val appPackageName = applicationName.replace("-", ".")
val testMode = sys.env.get("WORKER_TEST_MODE").contains("true") // did not work with lazy val
if testMode || getClass.getName.startsWith(appPackageName)
then
register
logger.info(s"Worker registered: $topic -> ${worker.getClass.getSimpleName}")
logger.debug(prettyString(worker))
else
logger.info(
s"Worker NOT registered: $topic -> ${worker.getClass.getSimpleName} (class starts not with $appPackageName)"
)
end if
end registerHandler

protected lazy val logger: WorkerLogger
end WorkerHandler

/** handler for Custom Validation (next to the automatic Validation of the In Object.
*
* For example if one of two optional variables must exist.
Expand Down Expand Up @@ -125,11 +149,11 @@ case class ServiceHandler[
val rRequest = runnableRequest(inputObject)
for
optWithServiceMock <- withServiceMock(rRequest, inputObject)
output <- handleMocking(optWithServiceMock, rRequest).getOrElse(
summon[EngineRunContext]
.sendRequest[ServiceIn, ServiceOut](rRequest)
.flatMap(out => outputMapper(out, inputObject))
)
output <- handleMocking(optWithServiceMock, rRequest).getOrElse(
summon[EngineRunContext]
.sendRequest[ServiceIn, ServiceOut](rRequest)
.flatMap(out => outputMapper(out, inputObject))
)
yield output
end for
end runWork
Expand Down Expand Up @@ -157,13 +181,17 @@ case class ServiceHandler[
case (_, Some(json)) =>
(for
mockedResponse <- decodeMock[MockedServiceResponse[ServiceOut]](json)
out <- handleServiceMock(mockedResponse, runnableRequest, in)
out <- handleServiceMock(mockedResponse, runnableRequest, in)
yield out)
.map(Some.apply)
case (true, _) =>
handleServiceMock(dynamicServiceOutMock.map(_(in)).getOrElse(defaultServiceOutMock), runnableRequest, in)
case (true, _) =>
handleServiceMock(
dynamicServiceOutMock.map(_(in)).getOrElse(defaultServiceOutMock),
runnableRequest,
in
)
.map(Some.apply)
case _ =>
case _ =>
Right(None)

end withServiceMock
Expand Down Expand Up @@ -200,7 +228,7 @@ case class ServiceHandler[
mockedResponse match
case MockedServiceResponse(_, Right(body), headers) =>
mapBodyOutput(body, headers, in)
case MockedServiceResponse(status, Left(body), _) =>
case MockedServiceResponse(status, Left(body), _) =>
Left(
ServiceRequestError(
status,
Expand Down
1 change: 0 additions & 1 deletion 03-worker/src/main/scala/camundala/worker/WorkerDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]

// needed that it can be called from CSubscriptionPostProcessor
def worker: Worker[In, Out, ?]

def topic: String = worker.topic

def runWorkFromWorker(in: In)(using EngineRunContext): Option[Either[RunWorkError, Out]] =
Expand Down
10 changes: 10 additions & 0 deletions 03-worker/src/main/scala/camundala/worker/exports.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import camundala.domain.*
import camundala.worker.CamundalaWorkerError.*
import io.circe.*

import java.util.Date

export sttp.model.Uri.UriContext
export sttp.model.Method
export sttp.model.Uri
Expand Down Expand Up @@ -168,3 +170,11 @@ end CamundalaWorkerError

def niceClassName(clazz: Class[?]) =
clazz.getName.split("""\$""").head

def printTimeOnConsole(start: Date) =
val time = new Date().getTime - start.getTime
val color = if time > 1000 then Console.YELLOW_B
else if time > 250 then Console.MAGENTA
else Console.BLACK
s"($color$time ms${Console.RESET})"
end printTimeOnConsole
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.util.{Failure, Success}
/** To avoid Annotations (Camunda Version specific), we extend ExternalTaskHandler for required
* parameters.
*/
trait C7WorkerHandler extends camunda.ExternalTaskHandler:
trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler:

@Value("${spring.application.name}")
var applicationName: String = scala.compiletime.uninitialized
Expand All @@ -29,9 +29,6 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler:
@Autowired()
protected var externalTaskClient: ExternalTaskClient = scala.compiletime.uninitialized

def worker: Worker[?, ?, ?]
def topic: String

override def execute(
externalTask: camunda.ExternalTask,
externalTaskService: camunda.ExternalTaskService
Expand All @@ -43,27 +40,17 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler:
)
executeWorker(externalTaskService)(using externalTask)
logger.info(
s"Worker: ${externalTask.getTopicName} (${externalTask.getId}) ended ${printTime(startDate)} > ${externalTask.getBusinessKey}"
s"Worker: ${externalTask.getTopicName} (${externalTask.getId}) ended ${printTimeOnConsole(startDate)} > ${externalTask.getBusinessKey}"
)
end execute

@PostConstruct
def registerHandler(): Unit =
val appPackageName = applicationName.replace("-", ".")
val testMode = sys.env.get("WORKER_TEST_MODE").contains("true") // did not work with lazy val
if testMode || getClass.getName.startsWith(appPackageName)
then
registerHandler:
externalTaskClient
.subscribe(topic)
.handler(this)
.open()
logger.info(s"Worker registered: $topic -> ${worker.getClass.getSimpleName}")
logger.debug(prettyString(worker))
else
logger.info(
s"Worker NOT registered: $topic -> ${worker.getClass.getSimpleName} (class starts not with $appPackageName)"
)
end if
end registerHandler

private def executeWorker(
Expand Down Expand Up @@ -94,14 +81,6 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler:
end try
end executeWorker

private def printTime(start: Date) =
val time = new Date().getTime - start.getTime
val color = if time > 1000 then Console.YELLOW_B
else if time > 250 then Console.MAGENTA
else Console.BLACK
s"($color$time ms${Console.RESET})"
end printTime

extension (externalTaskService: camunda.ExternalTaskService)

private def handleSuccess(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package camundala
package camunda7.worker

import camundala.worker.*
import org.camunda.bpm.client.task.ExternalTask
import sttp.client3.{HttpClientSyncBackend, Identity, SttpBackend}

Expand Down
106 changes: 101 additions & 5 deletions 04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Worker.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,108 @@
package camundala.worker.c8zio

import camundala.worker.JobWorker
import camundala.domain.*
import camundala.worker.{CamundalaWorkerError, JobWorker, Worker, printTimeOnConsole}
import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.{JobClient, JobHandler}
import zio.*

trait C8Worker extends JobWorker, JobHandler:
def handle(client: JobClient, job: ActivatedJob): Unit =
println(s"Handling Job: ${job}")
client.newCompleteCommand(job.getKey).send().join()
import java.util.Date

trait C8Worker[In: InOutDecoder, Out: InOutEncoder] extends JobWorker, JobHandler:
def topic: String

lazy val runtime = Runtime.default

import cats.data.ValidatedNel
import cats.implicits.*
def handle(client: JobClient, job: ActivatedJob): Unit =
Unsafe.unsafe:
implicit unsafe =>
runtime.unsafe.run(
(for
startDate <- ZIO.succeed(new Date())
json <- ZIO.fromEither(parser.parse(job.getVariables))
in <- ZIO.fromEither(
customDecodeAccumulating[In](json.hcursor)
)
businessKey <-
ZIO.fromEither(json.as[BusinessKey].map(_.businessKey.getOrElse("no businessKey")))
_ <- Console.printLine(
s"Worker: ${job.getType} (${job.getWorker}) started > $businessKey"
)
_ <- Console.printLine(s"IN: $in")
_ <- ZIO.attempt(client.newCompleteCommand(job.getKey).send().join())
_ <-
Console.printLine(
s"Worker: ${job.getType} (${job.getWorker}) ended ${printTimeOnConsole(startDate)} > $businessKey"
)
yield ())
).getOrThrow()

def handle2(client: JobClient, job: ActivatedJob): Unit =
Unsafe.unsafe { implicit unsafe =>
runtime.unsafe.run(
for
businessKey <-
ZIO.succeed(Option(job.getVariable("businessKey")).getOrElse("no businessKey"))
_ <- Console.printLine(
s"Worker: ${job.getType} (${job.getWorker}) started > $businessKey"
)
json <- ZIO.fromEither(parser.parse(job.getVariables))

_ <- Console.printLine(
s"Worker: ${job.getType} (${job.getWorker}) started > $businessKey"
)
startDate <- ZIO.succeed(new Date())
json <- ZIO.fromEither(parser.parse(job.getVariables))
_ <- Console.printLine(
s"Worker: ${job.getType} (${job.getWorker}) started > $businessKey"
)
_ <- handleJob(client, job)
_ <-
Console.printLine(
s"Worker: ${job.getType} (${job.getWorker}) ended ${printTimeOnConsole(startDate)} > $businessKey"
)
_ <- ZIO.attempt(client.newCompleteCommand(job.getKey).send().join())
yield ()
).getOrThrowFiberFailure()
}

private def handleJob(
client: JobClient,
job: ActivatedJob
): ZIO[Any, Any, Any] = Console.printLine("handleJob")
/* val variablesExtractor = ProcessVariablesExtractor(job)
for
tryProcessVariables <- variablesExtractor.extract(worker.variableNames ++ worker.inConfigVariableNames)
tryGeneralVariables <- processExtractor.extractGeneral())
filteredOut <- worker.executor(using EngineRunContext(engineContext, tryGeneralVariables)).execute(tryProcessVariables)
val tryProcessVariables =
ProcessVariablesExtractor(job).extract(worker.variableNames ++ worker.inConfigVariableNames)
val tryGeneralVariables = ProcessVariablesExtractor.extractGeneral()
try
(for
generalVariables <- tryGeneralVariables
context = EngineRunContext(engineContext, generalVariables)
filteredOut <-
worker.executor(using context).execute(tryProcessVariables)
yield externalTaskService.handleSuccess(filteredOut, generalVariables.manualOutMapping) //
).left.map { ex =>
externalTaskService.handleError(ex, tryGeneralVariables)
}
catch // safety net
case ex: Throwable =>
ex.printStackTrace()
externalTaskService.handleError(
UnexpectedError(errorMsg =
s"We caught an UnhandledException: ${ex.getMessage}\n - check the Workers Log."
),
tryGeneralVariables
)
end try
end handleJob*/
case class BusinessKey(businessKey: Option[String])
object BusinessKey:
given InOutCodec[BusinessKey] = deriveInOutCodec

end C8Worker
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import zio.{Console, ZIO}

import java.net.URI

object C8WorkerClient extends WorkerClient[C8Worker]:
object C8WorkerClient extends WorkerClient[C8Worker[?,?]]:

def runWorkers(workers: Set[C8Worker]): ZIO[Any, Any, Any] =
def runWorkers(workers: Set[C8Worker[?,?]]): ZIO[Any, Any, Any] =
Console.printLine(s"Starting Zeebe Worker Client: ${workers}") *>
ZIO.acquireReleaseWith(zeebeClient)(_.closeClient()): client =>
for
Expand All @@ -23,7 +23,7 @@ object C8WorkerClient extends WorkerClient[C8Worker]:
_ <- server.join
yield ()

private def registerWorker(worker: C8Worker, client: ZeebeClient) =
private def registerWorker(worker: C8Worker[?,?], client: ZeebeClient) =
Console.printLine("Registering Worker: " + worker.topic) *>
ZIO.attempt(client
.newWorker()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package camundala.worker.c8zio
package camundala.worker
package c8zio

/** To avoid Annotations (Camunda Version specific), we extend ExternalTaskHandler for required
* parameters.
*/
trait C8WorkerHandler:
trait C8WorkerHandler extends WorkerHandler:


end C8WorkerHandler
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,31 @@ import camundala.worker.c8zio.C8Worker

import scala.reflect.ClassTag

trait CompanyWorkerHandler extends C7WorkerHandler, C8Worker
trait CompanyWorkerHandler[
In <: Product: InOutCodec,
Out <: Product: InOutCodec
] extends C7WorkerHandler, C8Worker[In, Out]

trait CompanyValidationWorkerDsl[
In <: Product: InOutCodec
] extends CompanyWorkerHandler, ValidationWorkerDsl[In]
] extends CompanyWorkerHandler[In, NoOutput], ValidationWorkerDsl[In]

trait CompanyInitWorkerDsl[
In <: Product: InOutCodec,
Out <: Product: InOutCodec,
InitIn <: Product: InOutCodec,
InConfig <: Product: InOutCodec
] extends CompanyWorkerHandler, InitWorkerDsl[In, Out, InitIn, InConfig]
] extends CompanyWorkerHandler[In, Out], InitWorkerDsl[In, Out, InitIn, InConfig]

trait CompanyCustomWorkerDsl[
In <: Product: InOutCodec,
Out <: Product: InOutCodec
] extends CompanyWorkerHandler, CustomWorkerDsl[In, Out]
] extends CompanyWorkerHandler[In, Out], CustomWorkerDsl[In, Out]


trait CompanyServiceWorkerDsl[
In <: Product: InOutCodec,
Out <: Product: InOutCodec,
ServiceIn: InOutEncoder,
ServiceOut: InOutDecoder: ClassTag
] extends CompanyWorkerHandler, ServiceWorkerDsl[In, Out, ServiceIn, ServiceOut]
] extends CompanyWorkerHandler[In, Out], ServiceWorkerDsl[In, Out, ServiceIn, ServiceOut]
Loading

0 comments on commit 0ef0d75

Please sign in to comment.