Skip to content

Commit

Permalink
Added example Worker implementation to 04-worker-c8zio.
Browse files Browse the repository at this point in the history
  • Loading branch information
pme123 committed Dec 30, 2024
1 parent 1c7b7e5 commit c176892
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler:
end filteredOutput

protected lazy val logger: WorkerLogger =
engineContext.getLogger(getClass)
engineContext.getLogger(classOf[C7WorkerHandler])

private[worker] def isErrorHandled(error: CamundalaWorkerError, handledErrors: Seq[String]) =
error.isMock || // if it is mocked, it is handled in the error, as it also could be a successful output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package camundala.worker
// Worker
export org.springframework.context.annotation.Configuration as SpringConfiguration
// WorkerApp
export org.springframework.boot.SpringApplication
export org.springframework.boot.SpringApplication.run as runSpringApp
export org.springframework.boot.autoconfigure.SpringBootApplication
export org.springframework.boot.context.properties.ConfigurationPropertiesScan
export org.springframework.context.annotation.ComponentScan
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package camundala.worker.c8zio

import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder
import zio.{ZIO, ZIOAppDefault}

import java.net.URI
import java.time.Duration

object C8WorkerClient extends ZIOAppDefault:

override def run: ZIO[Any, Any, Any] =
ZIO.acquireReleaseWith(zeebeClient)(_.closeClient()): client =>
for
server <- ZIO.attempt(
client
.newTopologyRequest
.send
.join
).forever.fork
worker <- ZIO.attempt(client
.newWorker()
.jobType("publish-tweet")
.handler(ExampleJobHandler())
.timeout(Duration.ofSeconds(10))
.open()).fork
_ <- worker.join
_ <- server.join
yield ()

private lazy val zeebeClient =
ZIO.attempt:
ZeebeClient.newClientBuilder()
.grpcAddress(URI.create(zeebeGrpc))
.restAddress(URI.create(zeebeRest))
.credentialsProvider(credentialsProvider)
.build

private lazy val zeebeGrpc =
"https://dbd4cad1-5621-4d66-b14e-71c92456939a.bru-2.zeebe.camunda.io:443"
private lazy val zeebeRest =
"https://bru-2.zeebe.camunda.io:443/dbd4cad1-5621-4d66-b14e-71c92456939a/v2"
private lazy val audience = "zeebe.camunda.io"
private lazy val clientId = sys.env("CAMUNDA8_CLOUD_CLIENTID")
private lazy val clientSecret = sys.env("CAMUNDA8_CLOUD_CLIENTSECRET")
private lazy val oAuthAPI = "https://login.cloud.camunda.io/oauth/token"

private lazy val credentialsProvider =
new OAuthCredentialsProviderBuilder()
.authorizationServerUrl(oAuthAPI)
.audience(audience)
.clientId(clientId)
.clientSecret(clientSecret)
.build

extension (client: ZeebeClient)
def closeClient() =
ZIO.succeed(if client != null then client.close() else ())

end C8WorkerClient
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package camundala.worker.c8zio

/** To avoid Annotations (Camunda Version specific), we extend ExternalTaskHandler for required
* parameters.
*/
trait C8WorkerHandler:


end C8WorkerHandler
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package camundala.worker.c8zio

import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.{JobClient, JobHandler}

class ExampleJobHandler extends JobHandler:

def handle(client: JobClient, job: ActivatedJob): Unit =
println(s"Handling Job: ${job}")
client.newCompleteCommand(job.getKey).send().join()


end ExampleJobHandler
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Local Docker Configuration
zeebe.client.broker.gateway-address=127.0.0.1:26500
zeebe.client.security.plaintext=true
#zeebe.client.broker.gateway-address=127.0.0.1:26500
#zeebe.client.security.plaintext=true

# Cloud Configuration
#zeebe.client.cloud.region=bru-2
#zeebe.client.cloud.clusterId=ad5279ee-dfe3-43af-86c3-34f6cf2e9683
#zeebe.client.cloud.clientId=7j31qDoCE4dDmLQqiPQp3dn2ecCFOq.w
#zeebe.client.cloud.clientSecret=X_5T8x0VG._sQ9LACqnkeIRMmA5-iM0yf3O4lMAUXNkK20qex.eBN~cjGJAVSIDA
camunda.client.mode=saas
camunda.client.auth.client-id=L9n_ZQ3ehCtgPjTce8S7nabLGxhiVz3E
camunda.client.auth.client-secret=tu9c98w~Ip~7T_esBjS_8PMGl0.34R898Yt29wwzhyeMCQO78eOx6.4rk6ucsnsE
camunda.client.cluster-id=dbd4cad1-5621-4d66-b14e-71c92456939a
camunda.client.region=bru-2

server.port=8887
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,4 @@
</bpmndi:BPMNShape>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</definitions>
</definitions>
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package camundala.examples.twitter

import camundala.camunda8.CaseClassJsonMapperConfig
import io.camunda.zeebe.spring.client.EnableZeebeClient
import io.camunda.zeebe.spring.client.annotation.ZeebeDeployment
import io.camunda.zeebe.spring.client.annotation.Deployment
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Configuration
Expand All @@ -11,8 +10,7 @@ import org.springframework.context.annotation.Configuration
class AppConfig extends CaseClassJsonMapperConfig

@SpringBootApplication
@EnableZeebeClient
@ZeebeDeployment(resources = Array("classpath*:twitter*.bpmn"))
@Deployment(resources = Array("classpath*:twitter*.bpmn"))
class TwitterExampleApplication

object TwitterExampleApplication:
Expand Down
16 changes: 16 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ lazy val root = project
helper,
// implementations
camunda7Worker,
camunda8Worker,
// experiments
camunda, // not in use
camunda8, // not in use
Expand Down Expand Up @@ -171,6 +172,21 @@ lazy val camunda7Worker = project
)
.dependsOn(worker)

lazy val camunda8Worker = project
.in(file("./04-worker-c8zio"))
.configure(publicationSettings)
.settings(projectSettings("camunda8-worker"))
.settings(unitTestSettings)
.settings(
autoImportSetting,
libraryDependencies ++= Seq(
sttpDependency,
zeebeJavaClientDependency,
zioDependency
)
)
.dependsOn(worker)

// just demo
lazy val camunda = project
.in(file("./04-c7-spring"))
Expand Down
8 changes: 7 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object Dependencies {
val camundaVersion = "7.22.0" // external task client
val jaxbApiVersion = "4.0.2" // needed by the camunda client 7.21?!
val scaffeineV = "5.2.1" // caching
val zioVersion = "2.1.14" // zio
// - sttpClient3

// --- Experiments
Expand Down Expand Up @@ -83,7 +84,12 @@ object Dependencies {
"org.springframework.boot" % "spring-boot-starter-webflux" % springBootVersion,
"io.camunda.spring" % "spring-boot-starter-camunda" % camunda8Version,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % scalaJacksonVersion
).map(_.exclude("org.slf4j", "slf4j-api"))
)//.map(_.exclude("org.slf4j", "slf4j-api"))

val zeebeJavaClientDependency =
"io.camunda" % "zeebe-client-java" % "8.7.0-alpha2" //TODO camunda8Version,
val zioDependency =
"dev.zio" %% "zio" % zioVersion

// examples
val camundaDependencies = Seq(
Expand Down

0 comments on commit c176892

Please sign in to comment.