QE review changes for AMQ Streams integration (#137) (apache#3246)
* QE review changes for AMQ Streams integration

* Further work on custom serializer classes

* further QE correction on custom serializers

* peer review fixes
mramendi committed Feb 2, 2021
1 parent 9c3f8cf commit 6cc660a
Showing 8 changed files with 69 additions and 14 deletions.
1 change: 1 addition & 0 deletions assemblies/assembly-integrating-amq-streams.adoc
@@ -11,6 +11,7 @@
 include::{jbpm-dir}/Kafka/integration-kafka-con.adoc[leveloffset=+1]
 include::{jbpm-dir}/Kafka/message-receive-event-proc.adoc[leveloffset=+2]
 include::{jbpm-dir}/Kafka/message-send-event-proc.adoc[leveloffset=+2]
 include::{jbpm-dir}/Kafka/message-customtask-proc.adoc[leveloffset=+2]
+include::{jbpm-dir}/Kafka/custom-serializer-proc.adoc[leveloffset=+3]
 include::{jbpm-dir}/Kafka/kieserver-kafka-proc.adoc[leveloffset=+1]
 include::{jbpm-dir}/Kafka/kieserver-kafka-emit-proc.adoc[leveloffset=+1]

@@ -0,0 +1,29 @@
+[id='custom-serializer-proc_{context}']
+= Adding a custom serializer class for the `KafkaPublishMessages` custom task
+
+If you want to use a custom serializer class for the `KafkaPublishMessages` custom task, you must upload the source code and configure the class.
+
+.Procedure
+
+. Prepare a Java source file with the custom serializer class, for example, `MyCustomSerializer`. Use the package name for your space and project, for example, `com.myspace.test`. Also prepare the source files for any other required custom classes.
+. In {CENTRAL}, enter your project and select the *Settings* -> *Dependencies* tab.
+. Add any dependencies that your custom classes require, for example, `org.apache.kafka.kafka-clients`.
+. Select the *Assets* tab.
+. For each of the class source files, complete the following steps:
+.. Click *Import Asset*.
+.. In the *Please select a file to upload* field, select the location of the Java source file for the custom serializer class.
+.. Click *Ok* to upload the file.
+. Select the *Settings* -> *Deployments* -> *Work Item Handlers* tab.
+. In the `KafkaPublishMessages` line, modify the `Value` field to add the `classLoader` parameter. For example, the initial value of this field can be the following string:
++
+--
+----
+new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer")
+----
+
+In this example, change the value to the following string:
+
+----
+new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer", classLoader)
+----
+--
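Step 1 of the new procedure assumes the serializer source file already exists. The following is a minimal sketch of what the serialization logic of such a class might look like; the class and package names are the examples used in the procedure, and the encoding strategy is purely illustrative. In the real source file the class would declare `package com.myspace.test;` and implement `org.apache.kafka.common.serialization.Serializer<T>` from the `kafka-clients` dependency (its `configure()` and `close()` methods have default implementations, so only `serialize()` is required); the snippet below shows just that method, kept self-contained:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the serialization logic for com.myspace.test.MyCustomSerializer.
// The real class implements org.apache.kafka.common.serialization.Serializer<Object>
// from the kafka-clients dependency added in Settings -> Dependencies.
class MyCustomSerializer {

    // Kafka calls serialize(topic, data) for each outgoing record and uses the
    // returned bytes as the record key or value.
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            // Returning null for a null value matches the behavior of the
            // built-in StringSerializer.
            return null;
        }
        // Example strategy: encode the object's string form as UTF-8 bytes.
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Whatever strategy the class uses, the consumer of the topic must be able to decode it, which is the usual reason for writing a custom serializer instead of using `StringSerializer`.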
@@ -7,15 +7,15 @@
 endif::JBPM,DROOLS,OP[]
 ifdef::PAM,DM[]
 {KAFKA_PRODUCT}, based on Apache Kafka,
 endif::PAM,DM[]
-is a streaming platform. It passes messages, sorted into topics, between applications in a software environment.
+is a streaming platform. It acts as a message broker, passing messages, which are sorted into topics, between applications in a software environment.
 
 You can create business processes using {PRODUCT} that send and receive Kafka messages in the following ways:
 
-* Create a start event or intermediate catch event of the type _message_ or _signal_. The {KIE_SERVER} automatically subsribes to the Kafka topic that is defined in the message or signal. A message triggers the event. The event can pass the content of the message to the subsequent node in the process.
+* Create a start event, intermediate catch event, or boundary event (attached to a human task) of the type _message_ or _signal_. The {KIE_SERVER} automatically subscribes to the Kafka topic that is defined in the message or signal. A message triggers the event. The event node acts as the consumer of the message and can pass the content of the message to the subsequent node in the process.
 
-* Create an end event or intermediate throw event of the type _message_ or _signal_. When the process triggers the event, the {KIE_SERVER} sends a Kafka message in the topic that is defined in the message or signal. The message contains the data that is configured in the event.
+* Create an end event or intermediate throw event of the type _message_ or _signal_. When the process triggers the event, the {KIE_SERVER} sends a Kafka message in the topic that is defined in the message or signal. The message contains the data that is configured in the event. The event node acts as the producer of the message.
 
-* Add the `KafkaPublishMessages` custom task to the process. This task does not require the {KIE_SERVER} Kafka capability but is significantly more complicated to configure than signal or message events.
+* Add the `KafkaPublishMessages` custom task to the process. This task does not require the {KIE_SERVER} Kafka capability but can be more complicated to configure than signal or message events.
 
 * Configure your service and the {KIE_SERVER} to emit Kafka messages about every completed process, case, and task when transactions are committed.

@@ -5,7 +5,10 @@ To run a process that sends or receives Kafka messages (except when using the cu
 
 .Procedure
 
-. To enable integration with {KAFKA_PRODUCT}, set the `org.kie.kafka.server.ext.disabled` system property of the {KIE_SERVER} to `false`.
+. To enable integration with {KAFKA_PRODUCT}, set the following system properties according to your environment:
+** If you are using the {KIE_SERVER} on {EAP}, set the `org.kie.kafka.server.ext.disabled` system property of the {KIE_SERVER} to `false`.
+** If you are using Spring Boot, set the `kieserver.kafka.enabled` system property to `true`.
++
 . To configure the connection to the Kafka broker, set the `org.kie.server.jbpm-kafka.ext.bootstrap.servers` system property to the host and port of the broker. The default value is `localhost:9092`. You can use a comma-separated list of multiple host:port pairs.
 . Optional: Set any of the following system properties to configure sending and receiving Kafka messages:
 ** `org.kie.server.jbpm-kafka.ext.client.id`: An identifier string to pass to the broker when making requests. {KAFKA_PRODUCT} uses this string for logging.
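The EAP-side settings added in this hunk can be illustrated with a start command. This is a sketch only: `standalone.sh` is EAP's standard launcher, and the broker host names and ports are placeholders.

```shell
# Hypothetical example: enable the KIE Server Kafka extension and point it
# at two brokers (replace broker1/broker2 with your own host:port pairs).
./standalone.sh \
  -Dorg.kie.kafka.server.ext.disabled=false \
  -Dorg.kie.server.jbpm-kafka.ext.bootstrap.servers=broker1:9092,broker2:9092
```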
@@ -1,7 +1,7 @@
 [id='message-customtask-proc_{context}']
 = Adding a custom task that sends Kafka messages
 
-You can add a `KafkaPublishMessages` custom task to your process. This task sends Kafka messages. It does not use the {KIE_SERVER} Kafka capability, so you can use this task in processes that do not run on a {KIE_SERVER}. However, this task is more complicated to configure than other {KAFKA_PRODUCT} integration options.
+You can add a `KafkaPublishMessages` custom task to your process. This task sends Kafka messages. It does not use the {KIE_SERVER} Kafka capability, so you can use this task in processes that do not run on a {KIE_SERVER}. However, this task can be more complicated to configure than other {KAFKA_PRODUCT} integration options.
 
 .Procedure
 
@@ -13,10 +13,17 @@ You can add a `KafkaPublishMessages` custom task to your process. This task send
 . Enter the following information:
 ** *Bootstrap Servers*: The host and port of the Kafka broker, for example, `localhost:9092`. You can use a comma-separated list of multiple host:port pairs.
 ** *Client ID*: An identifier string to pass to the broker when making requests. {KAFKA_PRODUCT} uses this string for logging.
-** *Key Serializer class*: The class that provides the key serializer. Enter the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`.
-** *Value Serializer class*: The class that provides the value serializer. Enter the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`.
-. Click the *Assets* tab.
+** *Key Serializer class*: The class that provides the key serializer. You can use the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`. Alternatively you can use your own custom serializer class.
+** *Value Serializer class*: The class that provides the value serializer. You can use the standard serializer class name: `org.apache.kafka.common.serialization.StringSerializer`. Alternatively you can use your own custom serializer class.
++
+In any of these fields, you can enter an `env[_property_]` value. In this case, the {PROCESS_ENGINE} reads the setting from a system property at runtime. For example, you can set *Client ID* to `env[application.client.id]` and then, before running the process service, set the client ID value in the `application.client.id` system property.
++
+. Select the *Assets* tab.
 . Select the business process and open the business process designer.
 . Add the `KafkaPublishMessages` custom task, available under *Custom Tasks* in the BPMN modeler palette.
 . In the properties of the custom task, open the data assignments.
 . Assign the *Key*, *Topic*, and *Value* inputs to define the message.
+
+.Next steps
+
+If you configured a custom serializer class, you must upload the source code and configure the class, as described in xref:custom-serializer-proc_{context}[].
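The `env[_property_]` mechanism added in this hunk can be sketched as follows. The property name `application.client.id` is the document's own example; the client ID value and the service JAR name are hypothetical.

```shell
# Hypothetical example: the custom task's Client ID field is set to
# env[application.client.id], so the actual value is supplied at runtime
# through this system property instead of being hard-coded in the process.
java -Dapplication.client.id=order-service-producer -jar my-process-service.jar
```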
@@ -16,8 +16,15 @@ ifdef::JBPM,DROOLS,OP[]
 xref:jBPMBPMN2[].
 endif::JBPM,DROOLS,OP[]
 . Select the business process and open the business process designer.
-. Add a start event or an intermediate catch event of the type _message_ or _signal_.
+. Add a start event, an intermediate catch event, or a boundary event (attached to a human task) of the type _message_ or _signal_.
 . Open the properties of the event.
-. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics.*` system property of the {KIE_SERVER}.
+. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics._broker-topic-name_` system property of the {KIE_SERVER}.
++
+For instructions about using `org.kie.server.jbpm-kafka.ext.topics.*` system properties to define topic names, see xref:kieserver-kafka-proc_{context}[].
++
 . Add an output data item. Select the data object that you created as its type.
-. Save the business process.
+. Save the business process.
+
+.Next steps
+
+To enable {KAFKA_PRODUCT} integration when running the process, you must configure the {KIE_SERVER} according to instructions in xref:kieserver-kafka-proc_{context}[].
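One plausible shape of the `org.kie.server.jbpm-kafka.ext.topics.*` mapping referenced in this hunk is sketched below. The message name `IncomingOrder` and the topic `orders-incoming` are hypothetical, and the exact key format should be checked against the KIE Server configuration instructions.

```shell
# Hypothetical example: an event whose message name is "IncomingOrder"
# receives Kafka messages from the broker topic "orders-incoming".
-Dorg.kie.server.jbpm-kafka.ext.topics.IncomingOrder=orders-incoming
```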
@@ -18,6 +18,13 @@ endif::JBPM,DROOLS,OP[]
 . Select the business process and open the business process designer.
 . Add an intermediate throw event or an end event of the type _message_ or _signal_.
 . Open the properties of the event.
-. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics.*` system property of the {KIE_SERVER}.
+. In the *Message* or *Signal* field, select *New* and then enter the name of the message or signal. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else must be defined in an `org.kie.server.jbpm-kafka.ext.topics._broker-topic-name_` system property of the {KIE_SERVER}.
++
+For instructions about using `org.kie.server.jbpm-kafka.ext.topics.*` system properties to define topic names, see xref:kieserver-kafka-proc_{context}[].
++
 . Add an input data item. Select the data object that you created as its type.
-. Save the business process.
+. Save the business process.
+
+.Next steps
+
+To enable {KAFKA_PRODUCT} integration when running the process, you must configure the {KIE_SERVER} according to instructions in xref:kieserver-kafka-proc_{context}[].
@@ -5,5 +5,6 @@ include::Kafka/integration-kafka-con.adoc[leveloffset=+1]
 include::Kafka/message-receive-event-proc.adoc[leveloffset=+1]
 include::Kafka/message-send-event-proc.adoc[leveloffset=+1]
 include::Kafka/message-customtask-proc.adoc[leveloffset=+1]
+include::Kafka/custom-serializer-proc.adoc[leveloffset=+2]
 include::Kafka/kieserver-kafka-proc.adoc[leveloffset=+1]
 include::Kafka/kieserver-kafka-emit-proc.adoc[leveloffset=+1]
