Skip to content

Commit

Permalink
[JBPM-9706] Create Native SQL command to operate database to execute …
Browse files Browse the repository at this point in the history
…functions / SQL queries and stores in the execution results
  • Loading branch information
elguardian committed Apr 22, 2021
1 parent 5a59702 commit acea4cf
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 0 deletions.
@@ -0,0 +1,117 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.executor.commands;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.jbpm.process.core.timer.DateTimeUtils;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.shared.services.impl.TransactionalCommandService;
import org.jbpm.shared.services.impl.commands.NativeQueryStringCommand;
import org.jbpm.shared.services.impl.commands.QueryStringCommand;
import org.kie.api.executor.Command;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutionResults;
import org.kie.api.executor.Reoccurring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ExecuteSQLQueryCommand implements Command, Reoccurring {

private static final Logger logger = LoggerFactory.getLogger(ExecuteSQLQueryCommand.class);

private long nextScheduleTimeAdd = 1 * 60 * 60 * 1000; // one hour in milliseconds

@Override
public Date getScheduleTime() {
if (nextScheduleTimeAdd < 0) {
return null;
}

long current = System.currentTimeMillis();

Date nextSchedule = new Date(current + nextScheduleTimeAdd);
logger.debug("Next schedule for job {} is set to {}", this.getClass().getSimpleName(), nextSchedule);

return nextSchedule;
}

@Override
public ExecutionResults execute(CommandContext ctx) throws Exception {

computeNextScheduleTime(ctx);
EntityManagerFactory emf = getEntityManager(ctx);
TransactionalCommandService commandService = new TransactionalCommandService(emf);

String sql = (String) ctx.getData("SQL");
String paramsString = (String) ctx.getData("ParametersList");

Map<String, Object> parameters = new HashMap<>();
if(paramsString != null && !paramsString.isEmpty()) {
String []p = paramsString.split(",");
Arrays.stream(p).forEach(item -> parameters.put(item, ctx.getData(item)));
}

List<Object> data = commandService.execute(new NativeQueryStringCommand(sql, parameters));

ExecutionResults executionResults = new ExecutionResults();
executionResults.setData("size", data.size());

StringBuilder report = new StringBuilder();
for(Object item : data) {
if(item instanceof Object[]) {
for(Object cell : (Object[]) item) {
report.append(cell).append(",");
}
} else {
report.append(item);
}
report.append("\n");
}

executionResults.setData("data", report.toString());
return executionResults;
}

private EntityManagerFactory getEntityManager(CommandContext ctx) {
String emfName = (String) ctx.getData("EmfName");
if (emfName == null) {
emfName = "org.jbpm.domain";
}
return EntityManagerFactoryManager.get().getOrCreate(emfName);
}

private void computeNextScheduleTime(CommandContext ctx) {
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
this.nextScheduleTimeAdd = -1;
}
String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
nextScheduleTimeAdd = DateTimeUtils.parseDateAsDuration(nextRun);
}
}
}
Expand Up @@ -16,8 +16,14 @@
package org.jbpm.executor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,6 +43,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutionResults;
import org.kie.api.executor.ExecutorService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.executor.STATUS;
Expand Down Expand Up @@ -135,7 +142,44 @@ public void reoccurringExecutionTest() throws Exception {
assertEquals(0, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(4, executedRequests.size());
}

@Test(timeout=60000)
public void testSQLCommand() throws InterruptedException, ClassNotFoundException, IOException {
String businessKey = UUID.randomUUID().toString();

CountDownAsyncJobListener countDownListener = configureListener(1);
CommandContext ctxCMDprint = new CommandContext();
ctxCMDprint.setData("SingleRun", "true");
ctxCMDprint.setData("EmfName", "org.jbpm.executor");
ctxCMDprint.setData("businessKey", businessKey);
ctxCMDprint.setData("SQL", "SELECT POWER(2,4)");
executorService.scheduleRequest("org.jbpm.executor.commands.ExecuteSQLQueryCommand", ctxCMDprint);
countDownListener.waitTillCompleted();

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("SingleRun", "true");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("businessKey", businessKey);
ctxCMD.setData("SQL", "SELECT * FROM RequestInfo WHERE businessKey = :paramKey");
ctxCMD.setData("ParametersList", "paramKey");
ctxCMD.setData("paramKey", businessKey);

Date futureDate = new Date(System.currentTimeMillis());

countDownListener = configureListener(1);
executorService.scheduleRequest("org.jbpm.executor.commands.ExecuteSQLQueryCommand", futureDate, ctxCMD);
countDownListener.waitTillCompleted();

List<RequestInfo> requests = executorService.getRequestsByBusinessKey(businessKey, new QueryContext());
assertNotNull(requests);
assertEquals(2, requests.size());
assertEquals(STATUS.DONE, requests.get(0).getStatus());
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(requests.get(1).getResponseData()))) {
ExecutionResults results = (ExecutionResults) ois.readObject();
String items = (String) results.getData().get("data");
assertFalse(items.isEmpty());
}
}

}
Expand Up @@ -86,6 +86,12 @@ public <T> T queryStringInTransaction(String queryString, Class<T> clazz) {
}


public <T> T nativeQueryStringWithParametersInTransaction(String queryString, Map<String, Object> params, Class<T> clazz) {
check();
Query query = this.em.createNativeQuery(queryString);
return queryStringWithParameters(params, false, null, clazz, query);
}

public <T> T queryStringWithParametersInTransaction(String queryString,
Map<String, Object> params, Class<T> clazz) {
check();
Expand Down
@@ -0,0 +1,49 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.shared.services.impl.commands;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jbpm.shared.services.impl.JpaPersistenceContext;
import org.kie.api.command.ExecutableCommand;
import org.kie.api.runtime.Context;

public class NativeQueryStringCommand implements ExecutableCommand<List<Object>> {

private static final long serialVersionUID = -4014807273522465028L;

private String queryName;
private Map<String, Object> params;

public NativeQueryStringCommand(String queryName) {
this.queryName = queryName;
}

public NativeQueryStringCommand(String queryName, Map<String, Object> params) {
this.queryName = queryName;
this.params = params;
}

@Override
public List<Object> execute(Context context) {
JpaPersistenceContext ctx = (JpaPersistenceContext) context;
return (List<Object>) ctx.nativeQueryStringWithParametersInTransaction(queryName, params == null ? new HashMap<>() : params, List.class);
}

}

0 comments on commit acea4cf

Please sign in to comment.