Skip to content

Commit

Permalink
JBPM-6590 - Isolation of Quartz scheduler / triggers per kjar (#1062)
Browse files Browse the repository at this point in the history
  • Loading branch information
mswiderski committed Nov 9, 2017
1 parent 3bafb87 commit 069af1e
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public interface NamedJobContext extends JobContext {
String getJobName();

Long getProcessInstanceId();

default String getDeploymentId() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
*/
package org.jbpm.process.core.timer.impl;

import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job;
import org.drools.core.time.JobContext;
Expand All @@ -29,6 +37,7 @@
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
import org.jbpm.process.instance.timer.TimerManager.StartProcessJobContext;
import org.kie.api.runtime.EnvironmentName;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand All @@ -45,14 +54,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Quartz based <code>GlobalSchedulerService</code> that is configured according
* to Quartz rules and allows to store jobs in data base. With that it survives
Expand Down Expand Up @@ -86,23 +87,32 @@ public QuartzSchedulerService() {
public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
Long id = idCounter.getAndIncrement();
String jobname = null;
String groupName = "jbpm";

if (ctx instanceof ProcessJobContext) {
ProcessJobContext processCtx = (ProcessJobContext) ctx;
jobname = processCtx.getSessionId() + "-" + processCtx.getProcessInstanceId() + "-" + processCtx.getTimer().getId();
if (processCtx instanceof StartProcessJobContext) {
jobname = "StartProcess-"+((StartProcessJobContext) processCtx).getProcessId()+ "-" + processCtx.getTimer().getId();
}
String deploymentId = (String)processCtx.getKnowledgeRuntime().getEnvironment().get(EnvironmentName.DEPLOYMENT_ID);
if (deploymentId != null) {
groupName = deploymentId;
}
} else if (ctx instanceof NamedJobContext) {
jobname = ((NamedJobContext) ctx).getJobName();
String deploymentId = ((NamedJobContext) ctx).getDeploymentId();
if (deploymentId != null) {
groupName = deploymentId;
}
} else {
jobname = "Timer-"+ctx.getClass().getSimpleName()+ "-" + id;

}
logger.debug("Scheduling timer with name " + jobname);
// check if this scheduler already has such job registered if so there is no need to schedule it again
try {
JobDetail jobDetail = scheduler.getJobDetail(jobname, "jbpm");
JobDetail jobDetail = scheduler.getJobDetail(jobname, groupName);

if (jobDetail != null) {
TimerJobInstance timerJobInstance = (TimerJobInstance) jobDetail.getJobDataMap().get("timerJobInstance");
Expand All @@ -111,7 +121,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
} catch (SchedulerException e) {

}
GlobalQuartzJobHandle quartzJobHandle = new GlobalQuartzJobHandle(id, jobname, "jbpm");
GlobalQuartzJobHandle quartzJobHandle = new GlobalQuartzJobHandle(id, jobname, groupName);
TimerJobInstance jobInstance = globalTimerService.
getTimerJobFactoryManager().createTimerJobInstance( job,
ctx,
Expand Down Expand Up @@ -215,7 +225,7 @@ public synchronized void initScheduler(TimerService timerService) {

if (scheduler == null) {
try {
scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.startDelayed(START_DELAY);
} catch (SchedulerException e) {
throw new RuntimeException("Exception when initializing QuartzSchedulerService", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.process.core.timer.impl.quartz;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;

import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
import org.quartz.utils.Key;
import org.slf4j.Logger;

public class DeploymentsAwarePostgreSQLDelegate extends PostgreSQLDelegate {

private QuartzUtils quartzUtils = new QuartzUtils();

public DeploymentsAwarePostgreSQLDelegate(Logger log, String tablePrefix, String instanceId, Boolean useProperties) {
super(log, tablePrefix, instanceId, useProperties);
}

public DeploymentsAwarePostgreSQLDelegate(Logger log, String tablePrefix, String instanceId) {
super(log, tablePrefix, instanceId);
}

@Override
public List selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
List nextTriggers = new LinkedList();
try {
List<String> deploymentIds = quartzUtils.getDeployments();
ps = conn.prepareStatement(rtp(quartzUtils.nextTriggerQuery(deploymentIds)));

// Try to give jdbc driver a hint to hopefully not pull over
// more than the few rows we actually need.
ps.setFetchSize(5);
ps.setMaxRows(5);

ps.setString(1, STATE_WAITING);
ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
int index = 4;
for (String deployment : deploymentIds) {
ps.setString(index++, deployment);
}

rs = ps.executeQuery();

while (rs.next() && nextTriggers.size() < 5) {
nextTriggers.add(new Key(
rs.getString(COL_TRIGGER_NAME),
rs.getString(COL_TRIGGER_GROUP)));
}

return nextTriggers;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}

@Override
public int countMisfiredTriggersInStates(Connection conn, String state1, String state2, long ts) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;

try {
List<String> deploymentIds = quartzUtils.getDeployments();

ps = conn.prepareStatement(rtp(quartzUtils.countMisfiredTriggersQuery(deploymentIds)));
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1);
ps.setString(3, state2);
int index = 4;
for (String deployment : deploymentIds) {
ps.setString(index++, deployment);
}
rs = ps.executeQuery();

if (rs.next()) {
return rs.getInt(1);
}

throw new SQLException("No misfired trigger count returned.");
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}

@Override
public boolean selectMisfiredTriggersInStates(Connection conn,
String state1,
String state2,
long ts,
int count,
List resultList) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;

try {
List<String> deploymentIds = quartzUtils.getDeployments();

ps = conn.prepareStatement(rtp(quartzUtils.misfiredTriggersQuery(deploymentIds)));
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1);
ps.setString(3, state2);
int index = 4;
for (String deployment : deploymentIds) {
ps.setString(index++, deployment);
}
rs = ps.executeQuery();

boolean hasReachedLimit = false;
while (rs.next() && (hasReachedLimit == false)) {
if (resultList.size() == count) {
hasReachedLimit = true;
} else {
String triggerName = rs.getString(COL_TRIGGER_NAME);
String groupName = rs.getString(COL_TRIGGER_GROUP);
resultList.add(new Key(triggerName, groupName));
}
}

return hasReachedLimit;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.process.core.timer.impl.quartz;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;

import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
import org.quartz.utils.Key;
import org.slf4j.Logger;


public class DeploymentsAwareStdJDBCDelegate extends StdJDBCDelegate {

private QuartzUtils quartzUtils = new QuartzUtils();

public DeploymentsAwareStdJDBCDelegate(Logger logger, String tablePrefix, String instanceId, Boolean useProperties) {
super(logger, tablePrefix, instanceId, useProperties);
}

public DeploymentsAwareStdJDBCDelegate(Logger logger, String tablePrefix, String instanceId) {
super(logger, tablePrefix, instanceId);
}

@Override
public List selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan)
throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
List nextTriggers = new LinkedList();
try {
List<String> deploymentIds = quartzUtils.getDeployments();
ps = conn.prepareStatement(rtp(quartzUtils.nextTriggerQuery(deploymentIds)));

// Try to give jdbc driver a hint to hopefully not pull over
// more than the few rows we actually need.
ps.setFetchSize(5);
ps.setMaxRows(5);

ps.setString(1, STATE_WAITING);
ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
int index = 4;
for (String deployment : deploymentIds) {
ps.setString(index++, deployment);
}

rs = ps.executeQuery();

while (rs.next() && nextTriggers.size() < 5) {
nextTriggers.add(new Key(
rs.getString(COL_TRIGGER_NAME),
rs.getString(COL_TRIGGER_GROUP)));
}

return nextTriggers;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}

@Override
public int countMisfiredTriggersInStates(Connection conn, String state1, String state2, long ts) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;

try {
List<String> deploymentIds = quartzUtils.getDeployments();

ps = conn.prepareStatement(rtp(quartzUtils.countMisfiredTriggersQuery(deploymentIds)));
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1);
ps.setString(3, state2);
int index = 4;
for (String deployment : deploymentIds) {
ps.setString(index++, deployment);
}
rs = ps.executeQuery();

if (rs.next()) {
return rs.getInt(1);
}

throw new SQLException("No misfired trigger count returned.");
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}

@Override
public boolean selectMisfiredTriggersInStates(Connection conn, String state1, String state2,
long ts, int count, List resultList) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;

try {
List<String> deploymentIds = quartzUtils.getDeployments();

ps = conn.prepareStatement(rtp(quartzUtils.misfiredTriggersQuery(deploymentIds)));
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1);
ps.setString(3, state2);
int index = 4;
for (String deployment : deploymentIds) {
ps.setString(index++, deployment);
}
rs = ps.executeQuery();

boolean hasReachedLimit = false;
while (rs.next() && (hasReachedLimit == false)) {
if (resultList.size() == count) {
hasReachedLimit = true;
} else {
String triggerName = rs.getString(COL_TRIGGER_NAME);
String groupName = rs.getString(COL_TRIGGER_GROUP);
resultList.add(new Key(triggerName, groupName));
}
}

return hasReachedLimit;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
}
Loading

0 comments on commit 069af1e

Please sign in to comment.