Skip to content

Commit

Permalink
JBPM-4531 - Upgrade Quartz to Version 2.x (currently to latest 2.2.3) (
Browse files Browse the repository at this point in the history
  • Loading branch information
mswiderski authored and Tihomir Surdilovic committed Jan 5, 2018
1 parent 9eb5b1b commit 46f6888
Show file tree
Hide file tree
Showing 27 changed files with 2,421 additions and 836 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
*/ */
package org.jbpm.process.core.timer.impl; package org.jbpm.process.core.timer.impl;


import static org.quartz.JobBuilder.newJob;
import static org.quartz.JobKey.jobKey;
import static org.quartz.TriggerBuilder.newTrigger;

import java.io.NotSerializableException; import java.io.NotSerializableException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -38,6 +44,7 @@
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext; import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
import org.jbpm.process.instance.timer.TimerManager.StartProcessJobContext; import org.jbpm.process.instance.timer.TimerManager.StartProcessJobContext;
import org.kie.api.runtime.EnvironmentName; import org.kie.api.runtime.EnvironmentName;
import org.quartz.JobDataMap;
import org.quartz.JobDetail; import org.quartz.JobDetail;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
Expand All @@ -46,11 +53,9 @@
import org.quartz.Scheduler; import org.quartz.Scheduler;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
import org.quartz.SchedulerMetaData; import org.quartz.SchedulerMetaData;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreCMT; import org.quartz.impl.jdbcjobstore.JobStoreCMT;
import org.quartz.impl.jdbcjobstore.JobStoreSupport; import org.quartz.impl.jdbcjobstore.JobStoreSupport;
import org.quartz.spi.JobStore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -112,7 +117,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
logger.debug("Scheduling timer with name " + jobname); logger.debug("Scheduling timer with name " + jobname);
// check if this scheduler already has such job registered if so there is no need to schedule it again // check if this scheduler already has such job registered if so there is no need to schedule it again
try { try {
JobDetail jobDetail = scheduler.getJobDetail(jobname, groupName); JobDetail jobDetail = scheduler.getJobDetail(jobKey(jobname, groupName));


if (jobDetail != null) { if (jobDetail != null) {
TimerJobInstance timerJobInstance = (TimerJobInstance) jobDetail.getJobDataMap().get("timerJobInstance"); TimerJobInstance timerJobInstance = (TimerJobInstance) jobDetail.getJobDataMap().get("timerJobInstance");
Expand All @@ -134,14 +139,13 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
return quartzJobHandle; return quartzJobHandle;
} }


@SuppressWarnings("unchecked")
@Override @Override
public boolean removeJob(JobHandle jobHandle) { public boolean removeJob(JobHandle jobHandle) {
GlobalQuartzJobHandle quartzJobHandle = (GlobalQuartzJobHandle) jobHandle; GlobalQuartzJobHandle quartzJobHandle = (GlobalQuartzJobHandle) jobHandle;


try { try {


boolean removed = scheduler.deleteJob(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup()); boolean removed = scheduler.deleteJob(jobKey(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup()));
return removed; return removed;
} catch (SchedulerException e) { } catch (SchedulerException e) {


Expand All @@ -165,9 +169,14 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {


GlobalQuartzJobHandle quartzJobHandle = (GlobalQuartzJobHandle) timerJobInstance.getJobHandle(); GlobalQuartzJobHandle quartzJobHandle = (GlobalQuartzJobHandle) timerJobInstance.getJobHandle();
// Define job instance // Define job instance
JobDetail jobq = new JobDetail(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup(), QuartzJob.class); Map<String, Object> dataMap = new HashMap<>();
jobq.setRequestsRecovery(true); dataMap.put("timerJobInstance", timerJobInstance);
jobq.getJobDataMap().put("timerJobInstance", timerJobInstance);
JobDetail jobq = newJob(QuartzJob.class)
.withIdentity(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup())
.requestRecovery()
.usingJobData(new JobDataMap(dataMap))
.build();


// Amend nextFireTime not to schedule older than now + RESCHEDULE_DELAY // Amend nextFireTime not to schedule older than now + RESCHEDULE_DELAY
Date nextFireTime = timerJobInstance.getTrigger().hasNextFireTime(); Date nextFireTime = timerJobInstance.getTrigger().hasNextFireTime();
Expand All @@ -178,24 +187,26 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {
} }


// Define a Trigger that will fire "now" // Define a Trigger that will fire "now"
org.quartz.Trigger triggerq = new SimpleTrigger(quartzJobHandle.getJobName()+"_trigger", quartzJobHandle.getJobGroup(), nextFireTime); org.quartz.Trigger triggerq = newTrigger()
logger.debug("triggerq.name = {}, triggerq.startTime = {}", triggerq.getName(), triggerq.getStartTime()); // nextFireTime is mapped to startTime .withIdentity(quartzJobHandle.getJobName()+"_trigger", quartzJobHandle.getJobGroup())
.startAt(nextFireTime)
.build();

logger.debug("triggerq.name = {}, triggerq.startTime = {}", triggerq.getKey().getName(), triggerq.getStartTime()); // nextFireTime is mapped to startTime


// Schedule the job with the trigger // Schedule the job with the trigger
try { try {
if (scheduler.isShutdown()) { if (scheduler.isShutdown()) {
return; return;
} }
globalTimerService.getTimerJobFactoryManager().addTimerJobInstance( timerJobInstance ); globalTimerService.getTimerJobFactoryManager().addTimerJobInstance( timerJobInstance );
JobDetail jobDetail = scheduler.getJobDetail(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup()); JobDetail jobDetail = scheduler.getJobDetail(jobKey(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup()));
if (jobDetail == null) { if (jobDetail == null) {
scheduler.scheduleJob(jobq, triggerq); scheduler.scheduleJob(jobq, triggerq);
} else { } else {
// need to add the job again to replace existing especially important if jobs are persisted in db // need to add the job again to replace existing especially important if jobs are persisted in db
scheduler.addJob(jobq, true); scheduler.addJob(jobq, true, true);
triggerq.setJobName(quartzJobHandle.getJobName()); scheduler.rescheduleJob(triggerq.getKey(), triggerq);
triggerq.setJobGroup(quartzJobHandle.getJobGroup());
scheduler.rescheduleJob(quartzJobHandle.getJobName()+"_trigger", quartzJobHandle.getJobGroup(), triggerq);
} }


} catch (ObjectAlreadyExistsException e) { } catch (ObjectAlreadyExistsException e) {
Expand Down Expand Up @@ -353,7 +364,7 @@ public void execute(JobExecutionContext quartzContext) throws JobExecutionExcept
failedCount++; failedCount++;
quartzContext.getJobDetail().getJobDataMap().put("failedCount", failedCount); quartzContext.getJobDetail().getJobDataMap().put("failedCount", failedCount);
if (failedCount > FAILED_JOB_RETRIES) { if (failedCount > FAILED_JOB_RETRIES) {
logger.error("Timer execution failed {} times in a roll, unscheduling ({})", FAILED_JOB_RETRIES, quartzContext.getJobDetail().getFullName()); logger.error("Timer execution failed {} times in a roll, unscheduling ({})", FAILED_JOB_RETRIES, quartzContext.getJobDetail().getKey());
reschedule = false; reschedule = false;
} }
// let's give it a bit of time before failing/retrying // let's give it a bit of time before failing/retrying
Expand Down Expand Up @@ -432,11 +443,10 @@ public JobHandle buildJobHandleForContext(NamedJobContext ctx) {
return new GlobalQuartzJobHandle(-1, ctx.getJobName(), "jbpm"); return new GlobalQuartzJobHandle(-1, ctx.getJobName(), "jbpm");
} }


@SuppressWarnings("unchecked")
@Override @Override
public boolean isTransactional() { public boolean isTransactional() {
try { try {
Class<JobStore> jobStoreClass = scheduler.getMetaData().getJobStoreClass(); Class<?> jobStoreClass = scheduler.getMetaData().getJobStoreClass();
if (JobStoreSupport.class.isAssignableFrom(jobStoreClass)) { if (JobStoreSupport.class.isAssignableFrom(jobStoreClass)) {
return true; return true;
} }
Expand Down Expand Up @@ -464,7 +474,7 @@ public boolean isValid(GlobalJobHandle jobHandle) {
} }
JobDetail jobDetail = null; JobDetail jobDetail = null;
try { try {
jobDetail = scheduler.getJobDetail(((GlobalQuartzJobHandle)jobHandle).getJobName(), ((GlobalQuartzJobHandle)jobHandle).getJobGroup()); jobDetail = scheduler.getJobDetail(jobKey(((GlobalQuartzJobHandle)jobHandle).getJobName(), ((GlobalQuartzJobHandle)jobHandle).getJobGroup()));
} catch (SchedulerException e) { } catch (SchedulerException e) {
logger.warn("Cannot fetch job detail for job handle {}", jobHandle); logger.warn("Cannot fetch job detail for job handle {}", jobHandle);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@


package org.jbpm.process.core.timer.impl.quartz; package org.jbpm.process.core.timer.impl.quartz;


import static org.quartz.TriggerKey.triggerKey;

import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
Expand All @@ -24,50 +26,46 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;


import org.quartz.TriggerKey;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate; import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
import org.quartz.utils.Key;
import org.slf4j.Logger;


public class DeploymentsAwarePostgreSQLDelegate extends PostgreSQLDelegate { public class DeploymentsAwarePostgreSQLDelegate extends PostgreSQLDelegate {


private QuartzUtils quartzUtils = new QuartzUtils(); private QuartzUtils quartzUtils = new QuartzUtils();


public DeploymentsAwarePostgreSQLDelegate(Logger log, String tablePrefix, String instanceId, Boolean useProperties) {
super(log, tablePrefix, instanceId, useProperties);
}

public DeploymentsAwarePostgreSQLDelegate(Logger log, String tablePrefix, String instanceId) {
super(log, tablePrefix, instanceId);
}

@Override @Override
public List selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan) throws SQLException { public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount) throws SQLException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
List nextTriggers = new LinkedList(); List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
try { try {
List<String> deploymentIds = quartzUtils.getDeployments(); List<String> deploymentIds = quartzUtils.getDeployments();
ps = conn.prepareStatement(rtp(quartzUtils.nextTriggerQuery(deploymentIds))); ps = conn.prepareStatement(rtp(quartzUtils.nextTriggerQuery(deploymentIds)));


// Try to give jdbc driver a hint to hopefully not pull over // Set max rows to retrieve
// more than the few rows we actually need. if (maxCount < 1)
ps.setFetchSize(5); maxCount = 1; // we want at least one trigger back.
ps.setMaxRows(5); ps.setMaxRows(maxCount);

// Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
// Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
ps.setFetchSize(maxCount);


ps.setString(1, STATE_WAITING); ps.setString(1, STATE_WAITING);
ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan))); ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan))); ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));

int index = 4; int index = 4;
for (String deployment : deploymentIds) { for (String deployment : deploymentIds) {
ps.setString(index++, deployment); ps.setString(index++, deployment);
} }


rs = ps.executeQuery(); rs = ps.executeQuery();

while (rs.next() && nextTriggers.size() < 5) { while (rs.next() && nextTriggers.size() <= maxCount) {
nextTriggers.add(new Key( nextTriggers.add(triggerKey(
rs.getString(COL_TRIGGER_NAME), rs.getString(COL_TRIGGER_NAME),
rs.getString(COL_TRIGGER_GROUP))); rs.getString(COL_TRIGGER_GROUP)));
} }


return nextTriggers; return nextTriggers;
Expand All @@ -76,23 +74,23 @@ public List selectTriggerToAcquire(Connection conn, long noLaterThan, long noEar
closeStatement(ps); closeStatement(ps);
} }
} }

@Override @Override
public int countMisfiredTriggersInStates(Connection conn, String state1, String state2, long ts) throws SQLException { public int countMisfiredTriggersInState(Connection conn, String state1, long ts) throws SQLException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;


try { try {
List<String> deploymentIds = quartzUtils.getDeployments(); List<String> deploymentIds = quartzUtils.getDeployments();

ps = conn.prepareStatement(rtp(quartzUtils.countMisfiredTriggersQuery(deploymentIds))); ps = conn.prepareStatement(rtp(quartzUtils.countMisfiredTriggersQuery(deploymentIds)));
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts))); ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1); ps.setString(2, state1);
ps.setString(3, state2); int index = 3;
int index = 4;
for (String deployment : deploymentIds) { for (String deployment : deploymentIds) {
ps.setString(index++, deployment); ps.setString(index++, deployment);
} }

rs = ps.executeQuery(); rs = ps.executeQuery();


if (rs.next()) { if (rs.next()) {
Expand All @@ -105,14 +103,11 @@ public int countMisfiredTriggersInStates(Connection conn, String state1, String
closeStatement(ps); closeStatement(ps);
} }
} }

@Override @Override
public boolean selectMisfiredTriggersInStates(Connection conn, public List<TriggerKey> selectMisfiredTriggersInState(Connection conn,
String state1, String state,
String state2, long ts) throws SQLException {
long ts,
int count,
List resultList) throws SQLException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;


Expand All @@ -121,26 +116,20 @@ public boolean selectMisfiredTriggersInStates(Connection conn,


ps = conn.prepareStatement(rtp(quartzUtils.misfiredTriggersQuery(deploymentIds))); ps = conn.prepareStatement(rtp(quartzUtils.misfiredTriggersQuery(deploymentIds)));
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts))); ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1); ps.setString(2, state);
ps.setString(3, state2); int index = 3;
int index = 4;
for (String deployment : deploymentIds) { for (String deployment : deploymentIds) {
ps.setString(index++, deployment); ps.setString(index++, deployment);
} }
rs = ps.executeQuery(); rs = ps.executeQuery();


boolean hasReachedLimit = false; LinkedList<TriggerKey> list = new LinkedList<TriggerKey>();
while (rs.next() && (hasReachedLimit == false)) { while (rs.next()) {
if (resultList.size() == count) { String triggerName = rs.getString(COL_TRIGGER_NAME);
hasReachedLimit = true; String groupName = rs.getString(COL_TRIGGER_GROUP);
} else { list.add(triggerKey(triggerName, groupName));
String triggerName = rs.getString(COL_TRIGGER_NAME);
String groupName = rs.getString(COL_TRIGGER_GROUP);
resultList.add(new Key(triggerName, groupName));
}
} }

return list;
return hasReachedLimit;
} finally { } finally {
closeResultSet(rs); closeResultSet(rs);
closeStatement(ps); closeStatement(ps);
Expand Down
Loading

0 comments on commit 46f6888

Please sign in to comment.