Skip to content

Commit 18b7332

Browse files
committed
First version with Worker for Camunda8.
1 parent ebecd62 commit 18b7332

File tree

15 files changed

+261
-52
lines changed

15 files changed

+261
-52
lines changed

01-domain/src/main/scala/camundala/domain/exports.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import scala.deriving.Mirror
1212
import scala.language.implicitConversions
1313

1414
// circe JsonInOutEncoder/ Decoder
15-
export io.circe.{Codec as CirceCodec}
15+
export io.circe.Codec as CirceCodec
1616
export io.circe.{Decoder, HCursor, Json}
1717

1818
type InOutCodec[T] = io.circe.Codec[T]
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package camundala.worker
2+
3+
4+
import scala.concurrent.duration.*
5+
6+
7+
trait JobWorker:
8+
def topic: String
9+
def timeout: Duration = 10.seconds
10+
11+
12+
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package camundala.worker
2+
3+
import zio.*
4+
5+
import scala.compiletime.uninitialized
6+
7+
trait WorkerApp extends ZIOAppDefault:
8+
def workerClients: Seq[WorkerClient[?]]
9+
var theWorkers: Set[JobWorker] = uninitialized
10+
11+
def workers(dWorkers: (JobWorker | Seq[JobWorker])*): Unit =
12+
theWorkers = dWorkers
13+
.flatMap:
14+
case d: JobWorker => Seq(d)
15+
case s: Seq[?] => s.collect{case d: JobWorker => d}
16+
.toSet
17+
18+
override def run: ZIO[Any, Any, Any] =
19+
for
20+
_ <- Console.printLine("Starting WorkerApp")
21+
_ <- ZIO.collectAllPar(workerClients.map(_.run(theWorkers)))
22+
yield ()
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package camundala.worker
2+
3+
import camundala.worker.JobWorker
4+
import zio.ZIO
5+
6+
trait WorkerClient[T <: JobWorker]:
7+
def run(workers: Set[JobWorker]): ZIO[Any, Any, Any] =
8+
runWorkers(workers.collect { case w: T => w })
9+
protected def runWorkers(workers: Set[T]): ZIO[Any, Any, Any]
10+

03-worker/src/main/scala/camundala/worker/WorkerDsl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import camundala.domain.*
66
import camundala.worker.CamundalaWorkerError.*
77
import scala.reflect.ClassTag
88

9-
trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]:
9+
trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]
10+
extends JobWorker:
1011

1112
protected def engineContext: EngineContext
1213

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package camundala.worker.c8zio
2+
3+
import camundala.worker.JobWorker
4+
import io.camunda.zeebe.client.api.response.ActivatedJob
5+
import io.camunda.zeebe.client.api.worker.{JobClient, JobHandler}
6+
7+
trait C8Worker extends JobWorker, JobHandler:
8+
def handle(client: JobClient, job: ActivatedJob): Unit =
9+
println(s"Handling Job: ${job}")
10+
client.newCompleteCommand(job.getKey).send().join()
11+
12+
end C8Worker

04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8WorkerClient.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package camundala.worker.c8zio
22

3+
import camundala.worker.{JobWorker, WorkerClient}
34
import io.camunda.zeebe.client.ZeebeClient
45
import io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder
5-
import zio.{ZIO, ZIOAppDefault}
6+
import zio.{Console, ZIO}
67

78
import java.net.URI
8-
import java.time.Duration
99

10-
object C8WorkerClient extends ZIOAppDefault:
10+
object C8WorkerClient extends WorkerClient[C8Worker]:
1111

12-
override def run: ZIO[Any, Any, Any] =
12+
def runWorkers(workers: Set[C8Worker]): ZIO[Any, Any, Any] =
13+
Console.printLine(s"Starting Zeebe Worker Client: ${workers}") *>
1314
ZIO.acquireReleaseWith(zeebeClient)(_.closeClient()): client =>
1415
for
1516
server <- ZIO.attempt(
@@ -18,16 +19,19 @@ object C8WorkerClient extends ZIOAppDefault:
1819
.send
1920
.join
2021
).forever.fork
21-
worker <- ZIO.attempt(client
22-
.newWorker()
23-
.jobType("publish-tweet")
24-
.handler(ExampleJobHandler())
25-
.timeout(Duration.ofSeconds(10))
26-
.open()).fork
27-
_ <- worker.join
22+
_ <- ZIO.collectAllPar(workers.map(w => registerWorker(w, client)))
2823
_ <- server.join
2924
yield ()
3025

26+
private def registerWorker(worker: C8Worker, client: ZeebeClient) =
27+
Console.printLine("Registering Worker: " + worker.topic) *>
28+
ZIO.attempt(client
29+
.newWorker()
30+
.jobType(worker.topic)
31+
.handler(worker)
32+
.timeout(worker.timeout.toMillis)
33+
.open())
34+
3135
private lazy val zeebeClient =
3236
ZIO.attempt:
3337
ZeebeClient.newClientBuilder()

04-worker-c8zio/src/main/scala/camundala/worker/c8zio/ExampleJobHandler.scala

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0m1f84q" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Platform" modeler:executionPlatformVersion="7.21.0">
3+
<bpmn:process id="twitter-auto-c7" name="twitter-auto-c7" isExecutable="true">
4+
<bpmn:startEvent id="StartEvent_1">
5+
<bpmn:outgoing>Flow_1c8rqmn</bpmn:outgoing>
6+
</bpmn:startEvent>
7+
<bpmn:sequenceFlow id="Flow_1c8rqmn" sourceRef="StartEvent_1" targetRef="Activity_1p8rmh1" />
8+
<bpmn:endEvent id="Event_1mv5kky">
9+
<bpmn:incoming>Flow_0yzjas8</bpmn:incoming>
10+
</bpmn:endEvent>
11+
<bpmn:sequenceFlow id="Flow_0yzjas8" sourceRef="Activity_1p8rmh1" targetRef="Event_1mv5kky" />
12+
<bpmn:serviceTask id="Activity_1p8rmh1" name="Publish on Twitter" camunda:type="external" camunda:topic="publish-tweet">
13+
<bpmn:incoming>Flow_1c8rqmn</bpmn:incoming>
14+
<bpmn:outgoing>Flow_0yzjas8</bpmn:outgoing>
15+
</bpmn:serviceTask>
16+
</bpmn:process>
17+
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
18+
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="twitter-auto-c7">
19+
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
20+
<dc:Bounds x="179" y="99" width="36" height="36" />
21+
</bpmndi:BPMNShape>
22+
<bpmndi:BPMNShape id="Event_1mv5kky_di" bpmnElement="Event_1mv5kky">
23+
<dc:Bounds x="432" y="99" width="36" height="36" />
24+
</bpmndi:BPMNShape>
25+
<bpmndi:BPMNShape id="Activity_1iubci5_di" bpmnElement="Activity_1p8rmh1">
26+
<dc:Bounds x="270" y="77" width="100" height="80" />
27+
</bpmndi:BPMNShape>
28+
<bpmndi:BPMNEdge id="Flow_1c8rqmn_di" bpmnElement="Flow_1c8rqmn">
29+
<di:waypoint x="215" y="117" />
30+
<di:waypoint x="270" y="117" />
31+
</bpmndi:BPMNEdge>
32+
<bpmndi:BPMNEdge id="Flow_0yzjas8_di" bpmnElement="Flow_0yzjas8">
33+
<di:waypoint x="370" y="117" />
34+
<di:waypoint x="432" y="117" />
35+
</bpmndi:BPMNEdge>
36+
</bpmndi:BPMNPlane>
37+
</bpmndi:BPMNDiagram>
38+
</bpmn:definitions>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package camundala.examples.demos.newWorker
2+
3+
import camundala.bpmn.*
4+
5+
trait CompanyBpmnDsl extends BpmnDsl:
6+
7+
end CompanyBpmnDsl
8+
9+
trait CompanyBpmnProcessDsl extends BpmnProcessDsl, CompanyBpmnDsl
10+
trait CompanyBpmnServiceTaskDsl extends BpmnServiceTaskDsl, CompanyBpmnDsl
11+
trait CompanyBpmnCustomTaskDsl extends BpmnCustomTaskDsl, CompanyBpmnDsl
12+
trait CompanyBpmnDecisionDsl extends BpmnDecisionDsl, CompanyBpmnDsl
13+
trait CompanyBpmnUserTaskDsl extends BpmnUserTaskDsl, CompanyBpmnDsl
14+
trait CompanyBpmnMessageEventDsl extends BpmnMessageEventDsl, CompanyBpmnDsl
15+
trait CompanyBpmnSignalEventDsl extends BpmnSignalEventDsl, CompanyBpmnDsl
16+
trait CompanyBpmnTimerEventDsl extends BpmnTimerEventDsl, CompanyBpmnDsl

0 commit comments

Comments
 (0)