Skip to content

Commit

Permalink
RHBPMS-4854 - GlobalTimerService.timerJobsPerSession grows without bo…
Browse files Browse the repository at this point in the history
…unds (#936)
  • Loading branch information
mswiderski committed Aug 11, 2017
1 parent b5fcf35 commit adc6d53
Show file tree
Hide file tree
Showing 9 changed files with 844 additions and 353 deletions.
Expand Up @@ -59,6 +59,7 @@ public class GlobalTimerService implements TimerService, InternalSchedulerServic
protected GlobalSchedulerService schedulerService;
protected RuntimeManager manager;
protected ConcurrentHashMap<Long, List<GlobalJobHandle>> timerJobsPerSession = new ConcurrentHashMap<Long, List<GlobalJobHandle>>();

private String timerServiceId;

public GlobalTimerService(RuntimeManager manager, GlobalSchedulerService schedulerService) {
Expand Down Expand Up @@ -168,6 +169,15 @@ public Collection<TimerJobInstance> getTimerJobInstances(long id) {
logger.debug("Returning timers {} for session {}", timers, id);
return timers;
}

public void clearTimerJobInstances(long id) {
synchronized (timerJobsPerSession) {
List<GlobalJobHandle> jobs = timerJobsPerSession.remove(id);
logger.debug("Removed {} jobs for session {}", jobs, id);

logger.debug("Size of timer jobs per session is {}", timerJobsPerSession.size());
}
}

@Override
public void internalSchedule(TimerJobInstance timerJobInstance) {
Expand Down Expand Up @@ -254,6 +264,11 @@ private ExecutableRunner<RequestContext> getRunner() {
( (CommandServiceTimerJobFactoryManager) jobFactoryManager ).getRunner() :
null;
}

public ConcurrentHashMap<Long, List<GlobalJobHandle>> getTimerJobsPerSession() {
return timerJobsPerSession;
}


public static class GlobalJobHandle extends DefaultJobHandle
implements
Expand Down
Expand Up @@ -75,6 +75,7 @@ public void initScheduler(TimerService globalTimerService) {

@Override
public void shutdown() {

try {
this.scheduler.shutdown();
if ( !this.scheduler.awaitTermination( 10, TimeUnit.SECONDS ) ) {
Expand All @@ -84,6 +85,7 @@ public void shutdown() {
this.scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
this.activeTimer.clear();
}

@Override
Expand Down
Expand Up @@ -223,15 +223,19 @@ public void close(boolean removeJobs) {
cacheManager.dispose();
environment.close();
registry.remove(identifier);
TimerService timerService = TimerServiceRegistry.getInstance().remove(getIdentifier() + TimerServiceRegistry.TIMER_SERVICE_SUFFIX);
TimerService timerService = TimerServiceRegistry.getInstance().get(getIdentifier() + TimerServiceRegistry.TIMER_SERVICE_SUFFIX);
if (timerService != null) {
if (removeJobs && timerService instanceof GlobalTimerService) {
((GlobalTimerService) timerService).destroy();
}
timerService.shutdown();
GlobalSchedulerService schedulerService = ((SchedulerProvider) environment).getSchedulerService();
if (schedulerService != null) {
schedulerService.shutdown();
try {
if (removeJobs && timerService instanceof GlobalTimerService) {
((GlobalTimerService) timerService).destroy();
}
timerService.shutdown();
GlobalSchedulerService schedulerService = ((SchedulerProvider) environment).getSchedulerService();
if (schedulerService != null) {
schedulerService.shutdown();
}
} finally {
TimerServiceRegistry.getInstance().remove(getIdentifier() + TimerServiceRegistry.TIMER_SERVICE_SUFFIX);
}
}
this.closed = true;
Expand Down
Expand Up @@ -31,9 +31,12 @@
import org.drools.core.command.runtime.BatchExecutionCommandImpl;
import org.drools.core.common.InternalKnowledgeRuntime;
import org.drools.core.event.AbstractEventSupport;
import org.drools.core.time.TimerService;
import org.drools.persistence.api.OrderedTransactionSynchronization;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerHelper;
import org.jbpm.process.core.timer.TimerServiceRegistry;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.runtime.manager.impl.error.ExecutionErrorManagerImpl;
import org.jbpm.runtime.manager.impl.factory.LocalTaskServiceFactory;
import org.jbpm.runtime.manager.impl.mapper.EnvironmentAwareProcessInstanceContext;
Expand Down Expand Up @@ -245,14 +248,24 @@ public void disposeRuntimeEngine(RuntimeEngine runtime) {
if (canDispose(runtime)) {
removeLocalRuntime(runtime);
((ExecutionErrorManagerImpl)executionErrorManager).closeHandler();
releaseAndCleanLock(((RuntimeEngineImpl)runtime).getKieSessionId(), runtime);

Long ksessionId = ((RuntimeEngineImpl)runtime).getKieSessionId();
releaseAndCleanLock(ksessionId, runtime);
if (runtime instanceof Disposable) {
// special handling for in memory to not allow to dispose if there is any context in the mapper
if (mapper instanceof InMemoryMapper && ((InMemoryMapper) mapper).hasContext(runtime.getKieSession().getIdentifier())) {
if (mapper instanceof InMemoryMapper && ((InMemoryMapper) mapper).hasContext(ksessionId)) {
return;
}
((Disposable) runtime).dispose();
}
if (ksessionId != null) {
TimerService timerService = TimerServiceRegistry.getInstance().get(getIdentifier() + TimerServiceRegistry.TIMER_SERVICE_SUFFIX);
if (timerService != null) {
if (timerService instanceof GlobalTimerService) {
((GlobalTimerService) timerService).clearTimerJobInstances(ksessionId);
}
}
}
}
} catch (Exception e) {
releaseAndCleanLock(runtime);
Expand Down
Expand Up @@ -21,9 +21,12 @@
import org.drools.core.command.impl.ExecutableCommand;
import org.drools.core.command.impl.RegistryContext;
import org.drools.core.common.InternalKnowledgeRuntime;
import org.drools.core.time.TimerService;
import org.drools.persistence.api.OrderedTransactionSynchronization;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerHelper;
import org.jbpm.process.core.timer.TimerServiceRegistry;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.runtime.manager.impl.error.ExecutionErrorManagerImpl;
import org.jbpm.runtime.manager.impl.factory.LocalTaskServiceFactory;
import org.jbpm.runtime.manager.impl.mapper.EnvironmentAwareProcessInstanceContext;
Expand Down Expand Up @@ -224,15 +227,25 @@ public void disposeRuntimeEngine(RuntimeEngine runtime) {
if (canDispose(runtime)) {
removeLocalRuntime(runtime);
((ExecutionErrorManagerImpl)executionErrorManager).closeHandler();

Long ksessionId = ((RuntimeEngineImpl)runtime).getKieSessionId();
if (runtime instanceof Disposable) {
// special handling for in memory to not allow to dispose if there is any context in the mapper
if (mapper instanceof InMemoryMapper && ((InMemoryMapper)mapper).hasContext(runtime.getKieSession().getIdentifier())){
if (mapper instanceof InMemoryMapper && ((InMemoryMapper)mapper).hasContext(ksessionId)){
return;
}
((Disposable) runtime).dispose();
}

releaseAndCleanLock(runtime);
if (ksessionId != null) {
TimerService timerService = TimerServiceRegistry.getInstance().get(getIdentifier() + TimerServiceRegistry.TIMER_SERVICE_SUFFIX);
if (timerService != null) {
if (timerService instanceof GlobalTimerService) {
((GlobalTimerService) timerService).clearTimerJobInstances(ksessionId);
}
}
}
}
} catch (Exception e) {
releaseAndCleanLock(runtime);
Expand Down
Expand Up @@ -18,6 +18,9 @@
import java.util.HashMap;
import java.util.Map;

import org.drools.core.time.TimerService;
import org.jbpm.process.core.timer.TimerServiceRegistry;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.runtime.manager.impl.error.ExecutionErrorManagerImpl;
import org.jbpm.runtime.manager.impl.factory.LocalTaskServiceFactory;
import org.jbpm.runtime.manager.impl.tx.DestroySessionTransactionSynchronization;
Expand Down Expand Up @@ -136,20 +139,31 @@ public void disposeRuntimeEngine(RuntimeEngine runtime) {
local.get().remove(identifier);
((ExecutionErrorManagerImpl)executionErrorManager).closeHandler();
try {
factory.onDispose(((RuntimeEngineImpl)runtime).getKieSessionId());
Long ksessionId = ((RuntimeEngineImpl)runtime).getKieSessionId();
factory.onDispose(ksessionId);
if (canDestroy(runtime)) {
runtime.getKieSession().destroy();
} else {
if (runtime instanceof Disposable) {
((Disposable) runtime).dispose();
}
}
if (ksessionId != null) {
TimerService timerService = TimerServiceRegistry.getInstance().get(getIdentifier() + TimerServiceRegistry.TIMER_SERVICE_SUFFIX);
if (timerService != null) {
if (timerService instanceof GlobalTimerService) {
((GlobalTimerService) timerService).clearTimerJobInstances(ksessionId);
}
}
}
} catch (Exception e) {
// do nothing
if (runtime instanceof Disposable) {
((Disposable) runtime).dispose();
}
}


}
} catch (Exception e) {
local.get().remove(identifier);
Expand Down

0 comments on commit adc6d53

Please sign in to comment.