Skip to content

Commit

Permalink
[JBPM-9738] Adding support for headers in WebServiceWorkItemHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Aug 5, 2021
1 parent bb94f5c commit b99a946
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 13 deletions.
@@ -0,0 +1,79 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.process.workitem.core.util;

import java.util.HashMap;
import java.util.Map;

public class WorkItemHeaderInfo {

private final String name;
private final Object content;
private final Map<String, Object> params;

public static class Builder {

private final String name;
private Object content;
private Map<String, Object> parameters;

private Builder(String name) {
this.name = name;
this.parameters = new HashMap<>();
}

public static Builder of(String name) {
return new Builder(name);
}

public Builder withContent(Object content) {
this.content = content;
return this;
}

public Builder withParam(String key, Object value) {
parameters.put(key, value);
return this;
}

public WorkItemHeaderInfo build() {
return new WorkItemHeaderInfo(name, content, parameters);
}
}

private WorkItemHeaderInfo(String name, Object content, Map<String, Object> params) {
this.name = name;
this.content = content;
this.params = params;
}

public String getName() {
return name;
}

public Object getContent() {
return content;
}

public Object getParam(String key) {
return params.get(key);
}

@Override
public String toString() {
return "WorkItemHeaderInfo [name=" + name + ", content=" + content + ", params=" + params + "]";
}
}
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.process.workitem.core.util;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.jbpm.process.workitem.core.util.WorkItemHeaderInfo.Builder;
import org.kie.api.runtime.process.WorkItem;

public class WorkItemHeaderUtils {

private WorkItemHeaderUtils() {}

private static final String PREFIX = "HEADER_";

public static Collection<WorkItemHeaderInfo> getHeaderInfo(WorkItem workItem) {
Map<String, WorkItemHeaderInfo.Builder> map = new HashMap<>();
for (Entry<String, Object> param : workItem.getParameters().entrySet()) {
if (param.getKey().toUpperCase().startsWith(PREFIX)) {
String name = param.getKey().substring(PREFIX.length());
String paramKey = null;
int indexOf = name.lastIndexOf("_");
if (indexOf != -1) {
paramKey = name.substring(indexOf + 1);
name = name.substring(0, indexOf);
}
Builder builder = map.computeIfAbsent(name, Builder::of);
if (paramKey != null) {
builder.withParam(paramKey, param.getValue());
} else {
builder.withContent(param.getValue());
}
}
}
return map.values().stream().map(Builder::build).collect(Collectors.toList());
}
}
@@ -0,0 +1,47 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.process.workitem.core.util;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.kie.api.runtime.process.WorkItem;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class WorkItemHeaderUtilsTest {

@Test
public void testBuildHeaderList() {
WorkItem workItem = mock(WorkItem.class);
Map<String,Object> map = new HashMap<>();
map.put("HEADER_Pepito","fulanito");
map.put("header_Pepito_NS","http://pepito.com");
map.put("mamotreco","power");
when (workItem.getParameters()).thenReturn(map);
Collection<WorkItemHeaderInfo> headers = WorkItemHeaderUtils.getHeaderInfo(workItem);
assertEquals(1, headers.size());
WorkItemHeaderInfo header = headers.iterator().next();
assertEquals("Pepito", header.getName());
assertEquals("fulanito", header.getContent());
assertEquals("http://pepito.com", header.getParam("NS"));
}

}
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.xml.namespace.QName;

Expand All @@ -38,6 +39,7 @@
import org.apache.cxf.endpoint.ClientCallback;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.dynamic.DynamicClientFactory;
import org.apache.cxf.headers.Header;
import org.apache.cxf.jaxws.endpoint.dynamic.JaxWsDynamicClientFactory;
import org.apache.cxf.jaxws.interceptors.HolderInInterceptor;
import org.apache.cxf.jaxws.interceptors.WrapperClassInInterceptor;
Expand All @@ -52,6 +54,8 @@
import org.jbpm.process.workitem.core.util.WidMavenDepends;
import org.jbpm.process.workitem.core.util.WidParameter;
import org.jbpm.process.workitem.core.util.WidResult;
import org.jbpm.process.workitem.core.util.WorkItemHeaderInfo;
import org.jbpm.process.workitem.core.util.WorkItemHeaderUtils;
import org.jbpm.process.workitem.core.util.service.WidAction;
import org.jbpm.process.workitem.core.util.service.WidAuth;
import org.jbpm.process.workitem.core.util.service.WidService;
Expand Down Expand Up @@ -106,7 +110,7 @@ public class WebServiceWorkItemHandler extends AbstractLogOrThrowWorkItemHandler
private final Long defaultJbpmCxfClientConnectionTimeout = Long.parseLong(System.getProperty("org.jbpm.cxf.client.connectionTimeout", "30000"));
private final Long defaultJbpmCxfClientReceiveTimeout = Long.parseLong(System.getProperty("org.jbpm.cxf.client.receiveTimeout", "60000"));

private ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<String, Client>();
private ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<>();
private DynamicClientFactory dcf = null;
private KieSession ksession;
private int asyncTimeout = 10;
Expand Down Expand Up @@ -317,6 +321,7 @@ public WebServiceWorkItemHandler(String handlingProcessId,
this.handlingStrategy = handlingStrategy;
}

@Override
public void executeWorkItem(WorkItem workItem,
final WorkItemManager manager) {

Expand Down Expand Up @@ -397,6 +402,7 @@ public void executeWorkItem(WorkItem workItem,
}
new Thread(new Runnable() {

@Override
public void run() {

try {
Expand Down Expand Up @@ -513,22 +519,28 @@ protected Client getWSClient(WorkItem workItem, String interfaceRef) {
interfaceRef),
getInternalClassLoader(),
null);
setClientTimeout(workItem, client);
clients.put(interfaceRef,
client);
return client;
} catch (Exception e) {
logger.error("Error when creating WS Client",
e);
continue;
}
}
}
}
setClientTimeout(workItem, client);
Collection<WorkItemHeaderInfo> headers = WorkItemHeaderUtils.getHeaderInfo(workItem);
if (!headers.isEmpty()) {
client.getRequestContext().put(Header.HEADER_LIST, headers.stream().map(
this::buildHeader).collect(Collectors.toList()));
}
clients.put(interfaceRef, client);
return client;
} catch (Exception e) {
logger.error("Error when creating WS Client", e);
}
}
}
}
}
return null;
}

private Header buildHeader(WorkItemHeaderInfo header) {
return new Header(new QName(header.getParam("NS").toString(), header.getName()), header.getContent());
}

private void setClientTimeout(WorkItem workItem, Client client) {
HTTPConduit conduit = (HTTPConduit) client.getConduit();
HTTPClientPolicy policy = conduit.getClient();
Expand Down Expand Up @@ -556,6 +568,7 @@ protected synchronized DynamicClientFactory getDynamicClientFactory() {
return this.dcf;
}

@Override
public void abortWorkItem(WorkItem workItem,
WorkItemManager manager) {
// Do nothing, cannot be aborted
Expand Down

0 comments on commit b99a946

Please sign in to comment.