Skip to content

Commit

Permalink
Kafka Extension (#446)
Browse files Browse the repository at this point in the history
Signed-off-by: desmax74 <mdessi@redhat.com>
  • Loading branch information
desmax74 committed Mar 26, 2021
1 parent 49fffbc commit f3c114a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 2 deletions.
42 changes: 42 additions & 0 deletions jboss-kie-kieserver/added/launch/jboss-kie-kieserver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ function prepareEnv() {
unset KIE_SERVER_ID
unset KIE_SERVER_JBPM_CLUSTER
unset KIE_SERVER_JBPM_CLUSTER_TRANSPORT_LOCK_TIMEOUT
unset KIE_SERVER_KAFKA_EXT_ACKS
unset KIE_SERVER_KAFKA_EXT_AUTOCREATE_TOPICS
unset KIE_SERVER_KAFKA_EXT_BOOTSTRAP_SERVERS
unset KIE_SERVER_KAFKA_EXT_CLIENT_ID
unset KIE_SERVER_KAFKA_EXT_ENABLED
unset KIE_SERVER_KAFKA_EXT_GROUP_ID
unset KIE_SERVER_KAFKA_EXT_MAX_BLOCK_MS
unset KIE_SERVER_KAFKA_EXT_TOPICS
unset KIE_SERVER_LOCATION
unset KIE_SERVER_MGMT_DISABLED
unset KIE_SERVER_MODE
Expand Down Expand Up @@ -72,6 +80,7 @@ function configure() {
configure_drools
configure_jbpm
configure_jbpm_cluster
configure_kafka
configure_kie_server_mgmt
configure_mode
configure_metaspace
Expand Down Expand Up @@ -743,3 +752,36 @@ function configure_jbpm_cache() {
</replicated-cache>\
</cache-container>\n<cache-container name="server" aliases="singleton cluster" default-cache="default" module="org.wildfly.clustering.server">#g' ${CONFIG_FILE}
}

function configure_kafka(){
if [ "${KIE_SERVER_KAFKA_EXT_ENABLED^^}" = "TRUE" ]; then
if [ -n "${KIE_SERVER_KAFKA_EXT_BOOTSTRAP_SERVERS}" ];then
log_info "Kafka Extension enabled"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.disabled=false"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.bootstrap.servers=${KIE_SERVER_KAFKA_EXT_BOOTSTRAP_SERVERS}"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.client.id=${KIE_SERVER_KAFKA_EXT_CLIENT_ID}"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.group.id=${KIE_SERVER_KAFKA_EXT_GROUP_ID}"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.acks=${KIE_SERVER_KAFKA_EXT_ACKS}"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.max.block.ms=${KIE_SERVER_KAFKA_EXT_MAX_BLOCK_MS}"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.allow.auto.create.topics=${KIE_SERVER_KAFKA_EXT_AUTOCREATE_TOPICS}"

IFS=',' read -a ks_topics <<< $KIE_SERVER_KAFKA_EXT_TOPICS
for topic in "${ks_topics[@]}"; do
IFS='=' read -a mapping <<< $topic
signal=${mapping[0]}
topic_name=${mapping[1]}
if [[ -n "$signal" && -n "$topic_name" ]]; then
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.topics.${topic}"
else
log_warning "mapping not configured, msg or topic name is empty. Value set [${signal}=${topic_name}]"
fi
done
else
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.disabled=true"
log_warning "Bootstrap servers not configured, kafka extension disabled"
fi
else
log_info "Kafka Extension disabled"
JBOSS_KIE_ARGS="${JBOSS_KIE_ARGS} -Dorg.kie.server.jbpm-kafka.ext.disabled=true"
fi
}
49 changes: 49 additions & 0 deletions jboss-kie-kieserver/tests/bats/jboss-kie-kieserver.bats
Original file line number Diff line number Diff line change
Expand Up @@ -644,3 +644,52 @@ EOF
echo "Result: ${result}"
[ "${result}" = "${expected}" ]
}

@test "Verify the Kafka extension" {
export KIE_SERVER_KAFKA_EXT_ENABLED="true"
export KIE_SERVER_KAFKA_EXT_BOOTSTRAP_SERVERS="localhost:9092"
export KIE_SERVER_KAFKA_EXT_CLIENT_ID="app"
export KIE_SERVER_KAFKA_EXT_GROUP_ID="jbpm-consumer"
export KIE_SERVER_KAFKA_EXT_ACKS="2"
export KIE_SERVER_KAFKA_EXT_MAX_BLOCK_MS="2000"
export KIE_SERVER_KAFKA_EXT_AUTOCREATE_TOPICS="true"
export KIE_SERVER_KAFKA_EXT_TOPICS="person=human,dog=animal,ant="

configure_kafka

local expected=" -Dorg.kie.server.jbpm-kafka.ext.disabled=false -Dorg.kie.server.jbpm-kafka.ext.bootstrap.servers=localhost:9092 -Dorg.kie.server.jbpm-kafka.ext.client.id=app -Dorg.kie.server.jbpm-kafka.ext.group.id=jbpm-consumer -Dorg.kie.server.jbpm-kafka.ext.acks=2 -Dorg.kie.server.jbpm-kafka.ext.max.block.ms=2000 -Dorg.kie.server.jbpm-kafka.ext.allow.auto.create.topics=true -Dorg.kie.server.jbpm-kafka.ext.topics.person=human -Dorg.kie.server.jbpm-kafka.ext.topics.dog=animal"
echo " Result: ${JBOSS_KIE_ARGS}"

echo "Expected: ${expected}"
[[ "${JBOSS_KIE_ARGS}" == "${expected}" ]]
}

@test "Verify the Kafka extension disabled" {
export KIE_SERVER_KAFKA_EXT_ENABLED="false"

configure_kafka

local expected=" -Dorg.kie.server.jbpm-kafka.ext.disabled=true"
echo " Result: ${JBOSS_KIE_ARGS}"

echo "Expected: ${expected}"
[[ "${JBOSS_KIE_ARGS}" == "${expected}" ]]
}

@test "Verify the Kafka extension without boootstrapservers" {
export KIE_SERVER_KAFKA_EXT_ENABLED="true"
export KIE_SERVER_KAFKA_EXT_CLIENT_ID="app"
export KIE_SERVER_KAFKA_EXT_GROUP_ID="jbpm-consumer"
export KIE_SERVER_KAFKA_EXT_ACKS="2"
export KIE_SERVER_KAFKA_EXT_MAX_BLOCK_MS="2000"
export KIE_SERVER_KAFKA_EXT_AUTOCREATE_TOPICS="true"
export KIE_SERVER_KAFKA_EXT_TOPICS="person=human,dog=animal,ant="

configure_kafka

local expected=" -Dorg.kie.server.jbpm-kafka.ext.disabled=true"
echo " Result: ${JBOSS_KIE_ARGS}"

echo "Expected: ${expected}"
[[ "${JBOSS_KIE_ARGS}" == "${expected}" ]]
}
47 changes: 45 additions & 2 deletions tests/features/common/kie-kieserver-common.feature
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Feature: Kie Server common features
| KIE_SERVER_CONTAINER_DEPLOYMENT | test=org.package:mypackage:1.0 |
Then container log should contain Using standard EnvVar KIE_SERVER_CONTAINER_DEPLOYMENT: test=org.package:mypackage:1.0
And container log should contain INFO Attempting to pull dependencies for kjar 0 with

Scenario: Check KIE_SERVER_JBPM_CLUSTER flag enabled
When container is started with env
| variable | value |
Expand Down Expand Up @@ -554,4 +554,47 @@ Feature: Kie Server common features
| KIE_SERVER_JBPM_CLUSTER | true |
| KIE_SERVER_JBPM_CLUSTER_TRANSPORT_LOCK_TIMEOUT | 120000 |
Then container log should contain KIE Server's cluster for Jbpm failover is enabled.
And XML file /opt/eap/standalone/configuration/standalone-openshift.xml should contain value 120000 on XPath //*[local-name()='cache-container'][@name='jbpm']/*[local-name()='transport']/@lock-timeout
And XML file /opt/eap/standalone/configuration/standalone-openshift.xml should contain value 120000 on XPath //*[local-name()='cache-container'][@name='jbpm']/*[local-name()='transport']/@lock-timeout

Scenario: Check if the Kafka integration is disabled
When container is started with env
| variable | value |
| KIE_SERVER_KAFKA_EXT_ENABLED | false |
Then container log should contain -Dorg.kie.server.jbpm-kafka.ext.disabled=true

Scenario: Check if the Kafka integration is enabled
When container is started with env
| variable | value |
| KIE_SERVER_KAFKA_EXT_ENABLED | true |
| KIE_SERVER_KAFKA_EXT_BOOTSTRAP_SERVERS | localhost:9092 |
| KIE_SERVER_KAFKA_EXT_CLIENT_ID | app |
| KIE_SERVER_KAFKA_EXT_GROUP_ID | jbpm-consumer |
| KIE_SERVER_KAFKA_EXT_ACKS | 2 |
| KIE_SERVER_KAFKA_EXT_MAX_BLOCK_MS | 2000 |
| KIE_SERVER_KAFKA_EXT_AUTOCREATE_TOPICS | true |
| KIE_SERVER_KAFKA_EXT_TOPICS | person=human,dog=animal,ant= |
| SCRIPT_DEBUG | true |
Then container log should contain -Dorg.kie.server.jbpm-kafka.ext.disabled=false
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.bootstrap.servers=localhost:9092
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.client.id=app
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.group.id=jbpm-consumer
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.acks=2
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.max.block.ms=2000
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.allow.auto.create.topics=true
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.topics.person=human
And container log should contain -Dorg.kie.server.jbpm-kafka.ext.topics.dog=animal
And container log should contain mapping not configured, msg or topic name is empty. Value set [ant=]

Scenario: Check if the Kafka integration is enabled without bootstrapservers
When container is started with env
| variable | value |
| KIE_SERVER_KAFKA_EXT_ENABLED | true |
| KIE_SERVER_KAFKA_EXT_CLIENT_ID | app |
| KIE_SERVER_KAFKA_EXT_GROUP_ID | jbpm-consumer |
| KIE_SERVER_KAFKA_EXT_ACKS | 2 |
| KIE_SERVER_KAFKA_EXT_MAX_BLOCK_MS | 2000 |
| KIE_SERVER_KAFKA_EXT_AUTOCREATE_TOPICS | true |
| KIE_SERVER_KAFKA_EXT_TOPICS | person=human,dog=animal,ant= |
| SCRIPT_DEBUG | true |
Then container log should contain -Dorg.kie.server.jbpm-kafka.ext.disabled=true
And container log should contain Bootstrap servers not configured, kafka extension disabled

0 comments on commit f3c114a

Please sign in to comment.