Skip to content

Commit

Permalink
[JBPM-9706] Create Native SQL command to operate database to execute …
Browse files Browse the repository at this point in the history
…functions / SQL queries and stores in the execution results
  • Loading branch information
elguardian committed May 6, 2021
1 parent 88f93a6 commit 198a105
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 0 deletions.
@@ -0,0 +1,117 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.executor.commands;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.jbpm.process.core.timer.DateTimeUtils;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.shared.services.impl.TransactionalCommandService;
import org.jbpm.shared.services.impl.commands.NativeQueryStringCommand;
import org.jbpm.shared.services.impl.commands.QueryStringCommand;
import org.kie.api.executor.Command;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutionResults;
import org.kie.api.executor.Reoccurring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ExecuteSQLQueryCommand implements Command, Reoccurring {

private static final Logger logger = LoggerFactory.getLogger(ExecuteSQLQueryCommand.class);

private long nextScheduleTimeAdd = 1 * 60 * 60 * 1000; // one hour in milliseconds

@Override
public Date getScheduleTime() {
if (nextScheduleTimeAdd < 0) {
return null;
}

long current = System.currentTimeMillis();

Date nextSchedule = new Date(current + nextScheduleTimeAdd);
logger.debug("Next schedule for job {} is set to {}", this.getClass().getSimpleName(), nextSchedule);

return nextSchedule;
}

@Override
public ExecutionResults execute(CommandContext ctx) throws Exception {

computeNextScheduleTime(ctx);
EntityManagerFactory emf = getEntityManager(ctx);
TransactionalCommandService commandService = new TransactionalCommandService(emf);

String sql = (String) ctx.getData("SQL");
String paramsString = (String) ctx.getData("ParametersList");

Map<String, Object> parameters = new HashMap<>();
if(paramsString != null && !paramsString.isEmpty()) {
String []p = paramsString.split(",");
Arrays.stream(p).forEach(item -> parameters.put(item, ctx.getData(item)));
}

List<Object> data = commandService.execute(new NativeQueryStringCommand(sql, parameters));

ExecutionResults executionResults = new ExecutionResults();
executionResults.setData("size", data.size());

StringBuilder report = new StringBuilder();
for(Object item : data) {
if(item instanceof Object[]) {
for(Object cell : (Object[]) item) {
report.append(cell).append(",");
}
} else {
report.append(item);
}
report.append("\n");
}

executionResults.setData("data", report.toString());
return executionResults;
}

private EntityManagerFactory getEntityManager(CommandContext ctx) {
String emfName = (String) ctx.getData("EmfName");
if (emfName == null) {
emfName = "org.jbpm.domain";
}
return EntityManagerFactoryManager.get().getOrCreate(emfName);
}

private void computeNextScheduleTime(CommandContext ctx) {
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
this.nextScheduleTimeAdd = -1;
}
String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
nextScheduleTimeAdd = DateTimeUtils.parseDateAsDuration(nextRun);
}
}
}
@@ -0,0 +1,129 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jbpm.executor;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.jbpm.executor.impl.ExecutorServiceImpl;
import org.jbpm.executor.test.CountDownAsyncJobListener;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.test.util.ExecutorTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutorService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.executor.STATUS;
import org.kie.api.runtime.query.QueryContext;
import org.kie.test.util.db.PoolingDataSourceWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;


public class ExecuteSQLQueryCommandTest{

private static final Logger logger = LoggerFactory.getLogger(ExecuteSQLQueryCommandTest.class);

private ExecutorService executorService;

private EntityManagerFactory emf;
private PoolingDataSourceWrapper pds;

@Before
public void setUp() {
logger.debug("Setting up executor and data pool !");
pds = ExecutorTestUtil.setupPoolingDataSource();
emf = Persistence.createEntityManagerFactory("org.jbpm.executor");
EntityManagerFactoryManager.get().addEntityManagerFactory("org.jbpm.executor", emf);
executorService = ExecutorServiceFactory.newExecutorService(emf);

executorService.setThreadPoolSize(1);
executorService.setInterval(1);

executorService.init();
}

@After
public void tearDown() {
logger.debug("Destroying executor and data pool !");
executorService.clearAllRequests();
executorService.clearAllErrors();

System.clearProperty("org.kie.executor.msg.length");
System.clearProperty("org.kie.executor.stacktrace.length");
executorService.destroy();
emf.close();
pds.close();
}


protected CountDownAsyncJobListener configureListener(int threads) {
CountDownAsyncJobListener countDownListener = new CountDownAsyncJobListener(threads);
((ExecutorServiceImpl) executorService).addAsyncJobListener(countDownListener);
return countDownListener;
}

@Test(timeout=60000)
public void testSQLFunctionCommand() throws InterruptedException, ClassNotFoundException, IOException {
CountDownAsyncJobListener countDownListener = configureListener(1);

String firstBusinessKey = UUID.randomUUID().toString();
CommandContext ctxCMDprint = new CommandContext();
ctxCMDprint.setData("SingleRun", "true");
ctxCMDprint.setData("EmfName", "org.jbpm.executor");
ctxCMDprint.setData("businessKey", firstBusinessKey);
ctxCMDprint.setData("SQL", "SELECT POWER(2,4)");
executorService.scheduleRequest("org.jbpm.executor.commands.ExecuteSQLQueryCommand", ctxCMDprint);

countDownListener.waitTillCompleted();

List<RequestInfo> requests = executorService.getRequestsByBusinessKey(firstBusinessKey, new QueryContext());
assertNotNull(requests);
assertEquals(1, requests.size());
assertEquals(STATUS.DONE, requests.get(0).getStatus());
}

@Test(timeout=60000)
public void testSQLQueryCommand() throws InterruptedException, ClassNotFoundException, IOException {
CountDownAsyncJobListener countDownListener = configureListener(1);

String secondBusinessKey = UUID.randomUUID().toString();
CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("SingleRun", "true");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("businessKey", secondBusinessKey);
ctxCMD.setData("SQL", "SELECT * FROM RequestInfo WHERE businessKey = :paramKey");
ctxCMD.setData("ParametersList", "paramKey");
ctxCMD.setData("paramKey", secondBusinessKey);

executorService.scheduleRequest("org.jbpm.executor.commands.ExecuteSQLQueryCommand", ctxCMD);
countDownListener.waitTillCompleted();

List<RequestInfo> requests = executorService.getRequestsByBusinessKey(secondBusinessKey, new QueryContext());
assertNotNull(requests);
assertEquals(1, requests.size());
assertEquals(STATUS.DONE, requests.get(0).getStatus());
}
}
Expand Up @@ -86,6 +86,13 @@ public <T> T queryStringInTransaction(String queryString, Class<T> clazz) {
}


public <T> T nativeQueryStringWithParametersInTransaction(String queryString, Map<String, Object> params, Class<T> clazz) {
check();
Query query = this.em.createNativeQuery(queryString);
params.forEach((key,value) -> query.setParameter(key,value));
return (T) query.getResultList();
}

public <T> T queryStringWithParametersInTransaction(String queryString,
Map<String, Object> params, Class<T> clazz) {
check();
Expand Down
@@ -0,0 +1,49 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.shared.services.impl.commands;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jbpm.shared.services.impl.JpaPersistenceContext;
import org.kie.api.command.ExecutableCommand;
import org.kie.api.runtime.Context;

public class NativeQueryStringCommand implements ExecutableCommand<List<Object>> {

private static final long serialVersionUID = -4014807273522465028L;

private String queryName;
private Map<String, Object> params;

public NativeQueryStringCommand(String queryName) {
this.queryName = queryName;
}

public NativeQueryStringCommand(String queryName, Map<String, Object> params) {
this.queryName = queryName;
this.params = params;
}

@Override
public List<Object> execute(Context context) {
JpaPersistenceContext ctx = (JpaPersistenceContext) context;
return (List<Object>) ctx.nativeQueryStringWithParametersInTransaction(queryName, params == null ? new HashMap<>() : params, List.class);
}

}

0 comments on commit 198a105

Please sign in to comment.