Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBPM-9699] Register WorkItemHandler when kafka extension is enabled #1918

Merged
merged 1 commit into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ public class SendTaskHandler implements WorkItemHandler {

private static final Logger logger = LoggerFactory.getLogger(SendTaskHandler.class);

@Override
public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {
String message = (String) workItem.getParameter("Message");
logger.debug("Sending message: {}", message);
logger.debug("Sending message: {}", workItem.getParameter("Message"));
manager.completeWorkItem(workItem.getId(), null);
}

@Override
public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
// Do nothing, cannot be aborted
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,194 +16,18 @@

package org.jbpm.runtime.manager.impl.deploy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Stack;

import org.kie.internal.runtime.conf.BuilderHandler;
import org.kie.internal.runtime.conf.DeploymentDescriptor;
import org.kie.internal.runtime.conf.DeploymentDescriptorBuilder;
import org.kie.internal.runtime.conf.MergeMode;
import org.kie.internal.runtime.conf.NamedObjectModel;
import org.kie.internal.runtime.conf.ObjectModel;

public class DeploymentDescriptorMerger {

public DeploymentDescriptor merge(List<DeploymentDescriptor> descriptorHierarchy, MergeMode mode) {
if (descriptorHierarchy == null || descriptorHierarchy.isEmpty()) {
throw new IllegalArgumentException("Descriptor hierarchy list cannot be empty");
}

if (descriptorHierarchy.size() == 1) {
return descriptorHierarchy.get(0);
}
Stack<DeploymentDescriptor> stack = new Stack<DeploymentDescriptor>();
stack.addAll(descriptorHierarchy);
if (mode == null) {
mode = MergeMode.MERGE_COLLECTIONS;
}

while (stack.size() > 1) {
DeploymentDescriptor master = stack.pop();
DeploymentDescriptor slave = stack.pop();
DeploymentDescriptor desc = merge(master, slave, mode);
// add merged one to be next iteration slave
stack.push(desc);
}
// last element from the stack is the one that contains all merged descriptors
return stack.pop();
return org.kie.internal.runtime.manager.deploy.DeploymentDescriptorMerger.merge(descriptorHierarchy, mode);
}

public DeploymentDescriptor merge(DeploymentDescriptor master, DeploymentDescriptor slave, MergeMode mode) {
if (master == null || slave == null) {
throw new IllegalArgumentException("Descriptors to merge must be provided");
}

DeploymentDescriptor merged = null;
DeploymentDescriptorBuilder builder = master.getBuilder();
builder.setBuildHandler(new MergeModeBuildHandler(mode));

switch (mode) {
case KEEP_ALL:
// do nothing as master wins
merged = master;
break;
case OVERRIDE_ALL:
// do nothing as slave wins
merged = slave;
break;
case OVERRIDE_EMPTY:
builder.auditMode(slave.getAuditMode());
builder.auditPersistenceUnit(slave.getAuditPersistenceUnit());
builder.persistenceMode(slave.getPersistenceMode());
builder.persistenceUnit(slave.getPersistenceUnit());
builder.runtimeStrategy(slave.getRuntimeStrategy());
builder.setConfiguration(slave.getConfiguration());
builder.setEnvironmentEntries(slave.getEnvironmentEntries());
builder.setEventListeners(slave.getEventListeners());
builder.setGlobals(slave.getGlobals());
builder.setMarshalingStrategies(slave.getMarshallingStrategies());
builder.setTaskEventListeners(slave.getTaskEventListeners());
builder.setWorkItemHandlers(slave.getWorkItemHandlers());
builder.setRequiredRoles(slave.getRequiredRoles());
builder.setClasses(slave.getClasses());
builder.setLimitSerializationClasses(slave.getLimitSerializationClasses());

merged = builder.get();
break;

case MERGE_COLLECTIONS:

builder.auditMode(slave.getAuditMode());
builder.auditPersistenceUnit(slave.getAuditPersistenceUnit());
builder.persistenceMode(slave.getPersistenceMode());
builder.persistenceUnit(slave.getPersistenceUnit());
builder.runtimeStrategy(slave.getRuntimeStrategy());

for (ObjectModel model : slave.getEventListeners()) {
builder.addEventListener(model);
}
for (ObjectModel model : slave.getMarshallingStrategies()) {
builder.addMarshalingStrategy(model);
}

// we need to keep the order of task listeners otherwise they will rise in different order
// so the master must be the latest ones
List<ObjectModel> taskEventListeners = new ArrayList<>(slave.getTaskEventListeners());
for(ObjectModel model : master.getTaskEventListeners()) {
if(!taskEventListeners.contains(model)) {
taskEventListeners.add(model);
}
}
builder.setTaskEventListeners(taskEventListeners);

for (NamedObjectModel model : slave.getConfiguration()) {
builder.addConfiguration(model);
}
for (NamedObjectModel model : slave.getEnvironmentEntries()) {
builder.addEnvironmentEntry(model);
}
for (NamedObjectModel model : slave.getGlobals()) {
builder.addGlobal(model);
}
for (NamedObjectModel model : slave.getWorkItemHandlers()) {
builder.addWorkItemHandler(model);
}
for (String requiredRole : slave.getRequiredRoles()) {
builder.addRequiredRole(requiredRole);
}
for (String clazz : slave.getClasses()) {
builder.addClass(clazz);
}
Boolean slaveLimit = slave.getLimitSerializationClasses();
Boolean masterLimit = master.getLimitSerializationClasses();
if( slaveLimit != null && masterLimit != null &&
(!slaveLimit || !masterLimit) ) {
builder.setLimitSerializationClasses(false);
}

merged = builder.get();
break;

default:
break;
}


return merged;
return org.kie.internal.runtime.manager.deploy.DeploymentDescriptorMerger.merge(master,slave, mode);
}

private class MergeModeBuildHandler implements BuilderHandler {

private MergeMode mode;

MergeModeBuildHandler(MergeMode mode) {
this.mode = mode;
}

@Override
public boolean accepted(Object value) {
boolean accepted = false;
switch (mode) {
case OVERRIDE_ALL:
accepted = true;
break;
case OVERRIDE_EMPTY:
if (!isEmpty(value)) {
accepted = true;
}
break;

case MERGE_COLLECTIONS:
if (!isEmpty(value)) {
accepted = true;
}
break;

default:
break;
}
return accepted;
}

protected boolean isEmpty(Object value) {
if (value == null) {
return true;
}

if (value instanceof String) {
return ((String) value).isEmpty();
}

if (value instanceof Collection<?>) {
return ((Collection<?>) value).isEmpty();
}

return false;
}

}


}