Skip to content

Commit

Permalink
[JBPM-9413] AMQ Streams (Kafka) integration Notifier / producer
Browse files Browse the repository at this point in the history
Defining kafka event emitter
  • Loading branch information
fjtirado committed Oct 8, 2020
1 parent 352d7df commit bbc6426
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 0 deletions.
73 changes: 73 additions & 0 deletions jbpm-event-emitters/jbpm-event-emitters-kafka/pom.xml
@@ -0,0 +1,73 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-event-emitters</artifactId>
<version>7.45.0-SNAPSHOT</version>
</parent>
<artifactId>jbpm-event-emitters-kafka</artifactId>
<name>jBPM :: Event Emitters :: Kafka</name>
<description>jBPM Event Emitters for Kafka</description>

<dependencies>

<dependency>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-persistence-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>2.0.0-milestone3</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-test-util</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>false</filtering>
</testResource>
<testResource>
<directory>src/test/filtered-resources</directory>
<filtering>true</filtering>
</testResource>
</testResources>
</build>
</project>
@@ -0,0 +1,139 @@
/*
* Copyright 2018 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.event.emitters.kafka;

import java.net.URI;
import java.text.SimpleDateFormat;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.jbpm.persistence.api.integration.EventCollection;
import org.jbpm.persistence.api.integration.EventEmitter;
import org.jbpm.persistence.api.integration.InstanceView;
import org.jbpm.persistence.api.integration.base.BaseEventCollection;
import org.jbpm.persistence.api.integration.model.CaseInstanceView;
import org.jbpm.persistence.api.integration.model.ProcessInstanceView;
import org.jbpm.persistence.api.integration.model.TaskInstanceView;

/**
* Kafka implementation of EventEmitter that simply pushes out data to Kafka topic.
*
* This event emitter expects following parameters to configure itself - via system properties
* <ul>
* <li>org.jbpm.event.emitters.kafka.date_format - date and time format to be sent to ElasticSearch - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
* <li>org.jbpm.event.emitters.kafka.boopstrap.servers - kafka server ip, default is localhost:9092</li>
* <li>org.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
* </ul>
*/
public class KafkaEventEmitter implements EventEmitter {
private static final String SOURCE_FORMATTER = "/process/%s/%s";
private Producer<String, byte[]> producer;
private ObjectMapper mapper;

public KafkaEventEmitter() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty(
"org.jbpm.event.emitters.kafka.boopstrap.servers", "localhost:9092"));
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producer = new KafkaProducer<>(configs);
mapper = new ObjectMapper()
.setDateFormat(new SimpleDateFormat(System.getProperty(
"org.jbpm.event.emitters.kafka.date_format", System.getProperty(
"org.kie.server.json.date_format",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))))
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true);
}

public void deliver(Collection<InstanceView<?>> data) {
// no-op
}

public void apply(Collection<InstanceView<?>> data) {
if (data == null || data.isEmpty()) {
return;
}

for (InstanceView<?> view : data) {
String processId;
long processInstanceId;
String type;
String topic;
if (view instanceof ProcessInstanceView) {
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
topic = "processes";
type = "process";
processInstanceId = processInstanceView.getId();
processId = processInstanceView.getProcessId();
} else if (view instanceof TaskInstanceView) {
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
type = "tasks";
topic = "task";
processInstanceId = taskInstanceView.getProcessInstanceId();
processId = taskInstanceView.getProcessId();
} else if (view instanceof CaseInstanceView) {
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
type = "case";
topic = "cases";
processInstanceId = caseInstanceView.getId();
processId = caseInstanceView.getCaseId();
} else {
throw new UnsupportedOperationException("view " + view.getClass());
}
try {
producer.send(new ProducerRecord<>(getTopic(topic), mapper.writeValueAsBytes(CloudEventBuilder.v03()
.withSource(URI.create(String.format(SOURCE_FORMATTER, processId, processInstanceId))).withType(
type).withData(mapper.writeValueAsBytes(view)).withId(UUID.randomUUID().toString())
.withTime(OffsetDateTime.now()).build())));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("cannot convert " + view + " to byte[]", e);
}
}
}

private static String getTopic(String eventType) {
return System.getProperty("org.jbpm.event.emitters.kafka.topic." + eventType, "jbpm-" + eventType + "-events");
}

public void drop(Collection<InstanceView<?>> data) {
// no-op
}

@Override
public void close() {
producer.close();
}

@Override
public EventCollection newCollection() {
return new BaseEventCollection();
}

}
@@ -0,0 +1 @@
org.jbpm.event.emitters.kafka.KafkaEventEmitter
1 change: 1 addition & 0 deletions jbpm-event-emitters/pom.xml
Expand Up @@ -11,5 +11,6 @@
<description>jBPM Event Emitters</description>
<modules>
<module>jbpm-event-emitters-elasticsearch</module>
<module>jbpm-event-emitters-kafka</module>
</modules>
</project>

0 comments on commit bbc6426

Please sign in to comment.