Skip to content

Commit

Permalink
[DROOLS-795] fix timers serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Jul 7, 2015
1 parent 5404f83 commit 57c29a8
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 99 deletions.
Expand Up @@ -15,24 +15,18 @@
*/ */
package org.jbpm.process.core.timer.impl; package org.jbpm.process.core.timer.impl;


import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.drools.core.command.CommandService; import org.drools.core.command.CommandService;
import org.drools.core.command.impl.CommandBasedStatefulKnowledgeSession; import org.drools.core.command.impl.CommandBasedStatefulKnowledgeSession;
import org.drools.core.command.impl.KnowledgeCommandContext; import org.drools.core.command.impl.KnowledgeCommandContext;
import org.drools.core.common.InternalKnowledgeRuntime; import org.drools.core.common.InternalKnowledgeRuntime;
import org.drools.core.time.AcceptsTimerJobFactoryManager;
import org.drools.core.time.InternalSchedulerService; import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job; import org.drools.core.time.Job;
import org.drools.core.time.JobContext; import org.drools.core.time.JobContext;
import org.drools.core.time.JobHandle; import org.drools.core.time.JobHandle;
import org.drools.core.time.SelfRemovalJobContext; import org.drools.core.time.SelfRemovalJobContext;
import org.drools.core.time.TimerService; import org.drools.core.time.TimerService;
import org.drools.core.time.Trigger; import org.drools.core.time.Trigger;
import org.drools.core.time.impl.CommandServiceTimerJobFactoryManager;
import org.drools.core.time.impl.DefaultJobHandle; import org.drools.core.time.impl.DefaultJobHandle;
import org.drools.core.time.impl.TimerJobFactoryManager; import org.drools.core.time.impl.TimerJobFactoryManager;
import org.drools.core.time.impl.TimerJobInstance; import org.drools.core.time.impl.TimerJobInstance;
Expand All @@ -49,8 +43,14 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;



public class GlobalTimerService implements TimerService, InternalSchedulerService, AcceptsTimerJobFactoryManager { public class GlobalTimerService implements TimerService, InternalSchedulerService {


private static final Logger logger = LoggerFactory.getLogger(GlobalTimerService.class); private static final Logger logger = LoggerFactory.getLogger(GlobalTimerService.class);


Expand Down Expand Up @@ -179,8 +179,10 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {


@Override @Override
public void setTimerJobFactoryManager(TimerJobFactoryManager timerJobFactoryManager) { public void setTimerJobFactoryManager(TimerJobFactoryManager timerJobFactoryManager) {
if (this.jobFactoryManager.getCommandService() == null) { if (jobFactoryManager instanceof CommandServiceTimerJobFactoryManager &&
this.jobFactoryManager.setCommandService(timerJobFactoryManager.getCommandService()); timerJobFactoryManager instanceof CommandServiceTimerJobFactoryManager &&
getCommandService() == null) {
( (CommandServiceTimerJobFactoryManager) jobFactoryManager ).setCommandService( ( (CommandServiceTimerJobFactoryManager) timerJobFactoryManager ).getCommandService() );
} }
} }


Expand All @@ -200,7 +202,7 @@ public CommandService getCommandService(JobContext jobContext) {
} else if(ctxorig instanceof NamedJobContext){ } else if(ctxorig instanceof NamedJobContext){
return getCommandService(((NamedJobContext)ctxorig).getProcessInstanceId(), ctx); return getCommandService(((NamedJobContext)ctxorig).getProcessInstanceId(), ctx);
} else { } else {
return jobFactoryManager.getCommandService(); return getCommandService();
} }


return getCommandService(ctx.getProcessInstanceId(), ctx); return getCommandService(ctx.getProcessInstanceId(), ctx);
Expand Down Expand Up @@ -238,9 +240,14 @@ protected CommandService getCommandService(Long processInstanceId, ProcessJobCon
ctx.setKnowledgeRuntime((InternalKnowledgeRuntime) runtime.getKieSession()); ctx.setKnowledgeRuntime((InternalKnowledgeRuntime) runtime.getKieSession());
} }


return new DisposableCommandService(jobFactoryManager.getCommandService(), manager, runtime, schedulerService.retryEnabled()); return new DisposableCommandService(getCommandService(), manager, runtime, schedulerService.retryEnabled());
} }


private CommandService getCommandService() {
return jobFactoryManager instanceof CommandServiceTimerJobFactoryManager ?
( (CommandServiceTimerJobFactoryManager) jobFactoryManager ).getCommandService() :
null;
}


public static class GlobalJobHandle extends DefaultJobHandle public static class GlobalJobHandle extends DefaultJobHandle
implements implements
Expand Down
Expand Up @@ -15,14 +15,6 @@
*/ */
package org.jbpm.process.core.timer.impl; package org.jbpm.process.core.timer.impl;


import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.drools.core.time.AcceptsTimerJobFactoryManager;
import org.drools.core.time.InternalSchedulerService; import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job; import org.drools.core.time.Job;
import org.drools.core.time.JobContext; import org.drools.core.time.JobContext;
Expand Down Expand Up @@ -53,6 +45,13 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** /**
* Quartz based <code>GlobalSchedulerService</code> that is configured according * Quartz based <code>GlobalSchedulerService</code> that is configured according
* to Quartz rules and allows to store jobs in data base. With that it survives * to Quartz rules and allows to store jobs in data base. With that it survives
Expand Down Expand Up @@ -108,7 +107,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {


} }
GlobalQuartzJobHandle quartzJobHandle = new GlobalQuartzJobHandle(id, jobname, "jbpm"); GlobalQuartzJobHandle quartzJobHandle = new GlobalQuartzJobHandle(id, jobname, "jbpm");
TimerJobInstance jobInstance = ((AcceptsTimerJobFactoryManager) globalTimerService). TimerJobInstance jobInstance = globalTimerService.
getTimerJobFactoryManager().createTimerJobInstance( job, getTimerJobFactoryManager().createTimerJobInstance( job,
ctx, ctx,
trigger, trigger,
Expand Down Expand Up @@ -163,7 +162,7 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {
if (scheduler.isShutdown()) { if (scheduler.isShutdown()) {
return; return;
} }
((AcceptsTimerJobFactoryManager) globalTimerService).getTimerJobFactoryManager().addTimerJobInstance( timerJobInstance ); globalTimerService.getTimerJobFactoryManager().addTimerJobInstance( timerJobInstance );
JobDetail jobDetail = scheduler.getJobDetail(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup()); JobDetail jobDetail = scheduler.getJobDetail(quartzJobHandle.getJobName(), quartzJobHandle.getJobGroup());
if (jobDetail == null) { if (jobDetail == null) {
scheduler.scheduleJob(jobq, triggerq); scheduler.scheduleJob(jobq, triggerq);
Expand All @@ -186,11 +185,11 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {
// in case job cannot be persisted, like rule timer then make it in memory // in case job cannot be persisted, like rule timer then make it in memory
internalSchedule(new InmemoryTimerJobInstanceDelegate(quartzJobHandle.getJobName(), ((GlobalTimerService) globalTimerService).getTimerServiceId())); internalSchedule(new InmemoryTimerJobInstanceDelegate(quartzJobHandle.getJobName(), ((GlobalTimerService) globalTimerService).getTimerServiceId()));
} else { } else {
((AcceptsTimerJobFactoryManager) globalTimerService).getTimerJobFactoryManager().removeTimerJobInstance(timerJobInstance); globalTimerService.getTimerJobFactoryManager().removeTimerJobInstance(timerJobInstance);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} catch (SchedulerException e) { } catch (SchedulerException e) {
((AcceptsTimerJobFactoryManager) globalTimerService).getTimerJobFactoryManager().removeTimerJobInstance(timerJobInstance); globalTimerService.getTimerJobFactoryManager().removeTimerJobInstance(timerJobInstance);
throw new RuntimeException("Exception while scheduling job", e); throw new RuntimeException("Exception while scheduling job", e);
} }
} }
Expand Down Expand Up @@ -384,7 +383,7 @@ public JobContext getJobContext() {


protected void findDelegate() { protected void findDelegate() {
if (delegate == null) { if (delegate == null) {
Collection<TimerJobInstance> timers = ((AcceptsTimerJobFactoryManager)TimerServiceRegistry.getInstance().get(timerServiceId)) Collection<TimerJobInstance> timers = TimerServiceRegistry.getInstance().get(timerServiceId)
.getTimerJobFactoryManager().getTimerJobInstances(); .getTimerJobFactoryManager().getTimerJobInstances();
for (TimerJobInstance instance : timers) { for (TimerJobInstance instance : timers) {
if (((GlobalQuartzJobHandle)instance.getJobHandle()).getJobName().equals(jobname)) { if (((GlobalQuartzJobHandle)instance.getJobHandle()).getJobName().equals(jobname)) {
Expand Down
Expand Up @@ -15,9 +15,6 @@
*/ */
package org.jbpm.process.core.timer.impl; package org.jbpm.process.core.timer.impl;


import java.util.Collection;

import org.drools.core.time.AcceptsTimerJobFactoryManager;
import org.drools.core.time.InternalSchedulerService; import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job; import org.drools.core.time.Job;
import org.drools.core.time.JobContext; import org.drools.core.time.JobContext;
Expand All @@ -29,6 +26,8 @@
import org.jbpm.process.core.timer.TimerServiceRegistry; import org.jbpm.process.core.timer.TimerServiceRegistry;
import org.kie.api.time.SessionClock; import org.kie.api.time.SessionClock;


import java.util.Collection;

/** /**
* Simple delegate for timer service that fetches the real instance of timer service from * Simple delegate for timer service that fetches the real instance of timer service from
* TimerServiceRegistry under "default" key. * TimerServiceRegistry under "default" key.
Expand All @@ -44,8 +43,7 @@
* </code> * </code>
* *
*/ */
public class RegisteredTimerServiceDelegate implements TimerService, InternalSchedulerService, public class RegisteredTimerServiceDelegate implements TimerService, InternalSchedulerService, SessionClock {
AcceptsTimerJobFactoryManager, SessionClock {


private TimerService timerService; private TimerService timerService;


Expand Down Expand Up @@ -74,12 +72,12 @@ public boolean removeJob(JobHandle jobHandle) {
@Override @Override
public void setTimerJobFactoryManager( public void setTimerJobFactoryManager(
TimerJobFactoryManager timerJobFactoryManager) { TimerJobFactoryManager timerJobFactoryManager) {
((AcceptsTimerJobFactoryManager)timerService).setTimerJobFactoryManager(timerJobFactoryManager); timerService.setTimerJobFactoryManager(timerJobFactoryManager);
} }


@Override @Override
public TimerJobFactoryManager getTimerJobFactoryManager() { public TimerJobFactoryManager getTimerJobFactoryManager() {
return ((AcceptsTimerJobFactoryManager)timerService).getTimerJobFactoryManager(); return timerService.getTimerJobFactoryManager();
} }


@Override @Override
Expand Down
Expand Up @@ -15,16 +15,6 @@
*/ */
package org.jbpm.process.core.timer.impl; package org.jbpm.process.core.timer.impl;


import java.io.Serializable;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.drools.core.time.AcceptsTimerJobFactoryManager;
import org.drools.core.time.InternalSchedulerService; import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job; import org.drools.core.time.Job;
import org.drools.core.time.JobContext; import org.drools.core.time.JobContext;
Expand All @@ -40,6 +30,15 @@
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext; import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
import org.jbpm.process.instance.timer.TimerManager.StartProcessJobContext; import org.jbpm.process.instance.timer.TimerManager.StartProcessJobContext;


import java.io.Serializable;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** /**
* ThreadPool based scheduler service backed by <code>ThreadPoolSchedulerService</code> * ThreadPool based scheduler service backed by <code>ThreadPoolSchedulerService</code>
* *
Expand Down Expand Up @@ -99,7 +98,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
} }
GlobalJDKJobHandle jobHandle = new GlobalJDKJobHandle( idCounter.getAndIncrement() ); GlobalJDKJobHandle jobHandle = new GlobalJDKJobHandle( idCounter.getAndIncrement() );


TimerJobInstance jobInstance = ((AcceptsTimerJobFactoryManager) globalTimerService). TimerJobInstance jobInstance = globalTimerService.
getTimerJobFactoryManager().createTimerJobInstance( job, getTimerJobFactoryManager().createTimerJobInstance( job,
ctx, ctx,
trigger, trigger,
Expand Down Expand Up @@ -137,8 +136,7 @@ public boolean removeJob(JobHandle jobHandle) {
jobname = "StartProcess-"+((StartProcessJobContext) processCtx).getProcessId()+ "-" + processCtx.getTimer().getId(); jobname = "StartProcess-"+((StartProcessJobContext) processCtx).getProcessId()+ "-" + processCtx.getTimer().getId();
} }
activeTimer.remove(jobname); activeTimer.remove(jobname);
((AcceptsTimerJobFactoryManager) globalTimerService).getTimerJobFactoryManager(). globalTimerService.getTimerJobFactoryManager().removeTimerJobInstance( ((GlobalJDKJobHandle) jobHandle).getTimerJobInstance() );
removeTimerJobInstance( ((GlobalJDKJobHandle) jobHandle).getTimerJobInstance() );
} catch (ClassCastException e) { } catch (ClassCastException e) {
// do nothing in case ProcessJobContext was not given // do nothing in case ProcessJobContext was not given
} }
Expand Down Expand Up @@ -169,7 +167,7 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {
} }


jobHandle.setFuture( future ); jobHandle.setFuture( future );
((AcceptsTimerJobFactoryManager) globalTimerService).getTimerJobFactoryManager().addTimerJobInstance( timerJobInstance ); globalTimerService.getTimerJobFactoryManager().addTimerJobInstance( timerJobInstance );
} }


public static class GlobalJDKJobHandle extends GlobalJobHandle implements Serializable { public static class GlobalJDKJobHandle extends GlobalJobHandle implements Serializable {
Expand Down
Expand Up @@ -28,11 +28,11 @@
import org.drools.core.marshalling.impl.MarshallerWriteContext; import org.drools.core.marshalling.impl.MarshallerWriteContext;
import org.drools.core.marshalling.impl.ProtobufMessages.ActionQueue.Action; import org.drools.core.marshalling.impl.ProtobufMessages.ActionQueue.Action;
import org.drools.core.phreak.PropagationEntry; import org.drools.core.phreak.PropagationEntry;
import org.drools.core.time.AcceptsTimerJobFactoryManager;
import org.drools.core.time.TimeUtils; import org.drools.core.time.TimeUtils;
import org.drools.core.time.TimerService;
import org.drools.core.time.impl.CommandServiceTimerJobFactoryManager;
import org.drools.core.time.impl.CronExpression; import org.drools.core.time.impl.CronExpression;
import org.drools.core.time.impl.DefaultTimerJobFactoryManager; import org.drools.core.time.impl.ThreadSafeTrackableTimeJobFactoryManager;
import org.drools.core.time.impl.TrackableTimeJobFactoryManager;
import org.jbpm.process.core.event.EventFilter; import org.jbpm.process.core.event.EventFilter;
import org.jbpm.process.core.event.EventTransformer; import org.jbpm.process.core.event.EventTransformer;
import org.jbpm.process.core.event.EventTypeFilter; import org.jbpm.process.core.event.EventTypeFilter;
Expand Down Expand Up @@ -87,11 +87,11 @@ public class ProcessRuntimeImpl implements InternalProcessRuntime {


public ProcessRuntimeImpl(InternalKnowledgeRuntime kruntime) { public ProcessRuntimeImpl(InternalKnowledgeRuntime kruntime) {
this.kruntime = kruntime; this.kruntime = kruntime;
AcceptsTimerJobFactoryManager jfm = ( AcceptsTimerJobFactoryManager ) kruntime.getTimerService(); TimerService timerService = kruntime.getTimerService();
if ( jfm.getTimerJobFactoryManager() instanceof DefaultTimerJobFactoryManager ) { if ( !(timerService.getTimerJobFactoryManager() instanceof CommandServiceTimerJobFactoryManager) ) {
jfm.setTimerJobFactoryManager( new TrackableTimeJobFactoryManager() ); timerService.setTimerJobFactoryManager( new ThreadSafeTrackableTimeJobFactoryManager() );
} }
((AcceptsTimerJobFactoryManager)kruntime.getTimerService()).setTimerJobFactoryManager( new TrackableTimeJobFactoryManager() );
((CompositeClassLoader) getRootClassLoader()).addClassLoader( getClass().getClassLoader() ); ((CompositeClassLoader) getRootClassLoader()).addClassLoader( getClass().getClassLoader() );
initProcessInstanceManager(); initProcessInstanceManager();
initSignalManager(); initSignalManager();
Expand All @@ -116,10 +116,10 @@ private void initStartTimers() {
} }


public ProcessRuntimeImpl(InternalWorkingMemory workingMemory) { public ProcessRuntimeImpl(InternalWorkingMemory workingMemory) {
AcceptsTimerJobFactoryManager jfm = ( AcceptsTimerJobFactoryManager ) workingMemory.getTimerService(); TimerService timerService = workingMemory.getTimerService();
if ( jfm.getTimerJobFactoryManager() instanceof DefaultTimerJobFactoryManager ) { if ( !(timerService.getTimerJobFactoryManager() instanceof CommandServiceTimerJobFactoryManager) ) {
jfm.setTimerJobFactoryManager( new TrackableTimeJobFactoryManager() ); timerService.setTimerJobFactoryManager( new ThreadSafeTrackableTimeJobFactoryManager() );
} }


this.kruntime = (InternalKnowledgeRuntime) workingMemory.getKnowledgeRuntime(); this.kruntime = (InternalKnowledgeRuntime) workingMemory.getKnowledgeRuntime();
initProcessInstanceManager(); initProcessInstanceManager();
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.jbpm.process.instance.timer; package org.jbpm.process.instance.timer;


import org.drools.core.common.InternalKnowledgeRuntime; import org.drools.core.common.InternalKnowledgeRuntime;
import org.drools.core.common.InternalWorkingMemory;
import org.drools.core.marshalling.impl.MarshallerReaderContext; import org.drools.core.marshalling.impl.MarshallerReaderContext;
import org.drools.core.marshalling.impl.MarshallerWriteContext; import org.drools.core.marshalling.impl.MarshallerWriteContext;
import org.drools.core.marshalling.impl.ProtobufInputMarshaller; import org.drools.core.marshalling.impl.ProtobufInputMarshaller;
Expand Down Expand Up @@ -409,6 +410,10 @@ public void setKnowledgeRuntime(InternalKnowledgeRuntime kruntime) {
this.kruntime = kruntime; this.kruntime = kruntime;
} }


@Override
public InternalWorkingMemory getWorkingMemory() {
return kruntime instanceof InternalWorkingMemory ? (InternalWorkingMemory)kruntime : null;
}
} }


public static class StartProcessJobContext extends ProcessJobContext { public static class StartProcessJobContext extends ProcessJobContext {
Expand Down
Expand Up @@ -15,17 +15,7 @@
*/ */
package org.jbpm.services.task.impl; package org.jbpm.services.task.impl;


import java.io.Serializable; import org.drools.core.common.InternalWorkingMemory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.drools.core.time.Job; import org.drools.core.time.Job;
import org.drools.core.time.JobContext; import org.drools.core.time.JobContext;
import org.drools.core.time.JobHandle; import org.drools.core.time.JobHandle;
Expand All @@ -50,6 +40,17 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TaskDeadlinesServiceImpl implements TaskDeadlinesService { public class TaskDeadlinesServiceImpl implements TaskDeadlinesService {


private static final Logger logger = LoggerFactory.getLogger(TaskDeadlinesServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(TaskDeadlinesServiceImpl.class);
Expand Down Expand Up @@ -349,7 +350,11 @@ public String getJobName() {
public Long getProcessInstanceId() { public Long getProcessInstanceId() {
return processInstanceId; return processInstanceId;
} }


@Override
public InternalWorkingMemory getWorkingMemory() {
return null;
}
} }




Expand Down

0 comments on commit 57c29a8

Please sign in to comment.