From 04d58e7714844e73f68e6143e235bb55791c9888 Mon Sep 17 00:00:00 2001 From: Mario Fusco Date: Thu, 11 Oct 2018 14:49:30 +0200 Subject: [PATCH] [DROOLS-3076] KieSession pool (#2099) * [DROOLS-3076] KieSession pool * [DROOLS-3076] KieSession pool * [DROOLS-3076] KieSession pool * [DROOLS-3076] KieSession pool * [DROOLS-3076] KieSession pool --- .../kie/builder/impl/KieContainerImpl.java | 74 ++++++-- .../impl/KieContainerSessionsPoolImpl.java | 86 +++++++++ .../SegmentMemoryPrototypeTest.java | 1 - .../session/SessionsPoolTest.java | 171 ++++++++++++++++++ .../drools/core/RuleBaseConfiguration.java | 21 --- .../core/common/ClassAwareObjectStore.java | 2 +- .../core/common/ConcurrentNodeMemories.java | 16 +- .../org/drools/core/common/DefaultAgenda.java | 5 +- .../drools/core/common/NamedEntryPoint.java | 3 + .../common/PhreakWorkingMemoryFactory.java | 5 - .../core/impl/AbstractKieSessionsPool.java | 65 +++++++ .../core/impl/InternalKnowledgeBase.java | 2 - .../drools/core/impl/KieSessionsPoolImpl.java | 67 +++++++ .../drools/core/impl/KnowledgeBaseImpl.java | 24 +-- .../org/drools/core/impl/SessionsCache.java | 104 ----------- .../impl/StatefulKnowledgeSessionImpl.java | 36 ++-- .../drools/core/impl/StatefulSessionPool.java | 48 +++++ .../impl/StatelessKnowledgeSessionImpl.java | 67 ++++--- .../org/drools/core/time/TimerService.java | 11 +- .../core/time/impl/JDKTimerService.java | 35 ++-- .../core/time/impl/PseudoClockScheduler.java | 11 +- .../org/drools/core/util/ScalablePool.java | 94 ++++++++++ .../persistence/jpa/JpaJDKTimerService.java | 7 +- .../integrationtests/drl/GlobalTest.java | 37 ---- 24 files changed, 713 insertions(+), 279 deletions(-) create mode 100644 drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerSessionsPoolImpl.java create mode 100644 drools-compiler/src/test/java/org/drools/compiler/integrationtests/session/SessionsPoolTest.java create mode 100644 drools-core/src/main/java/org/drools/core/impl/AbstractKieSessionsPool.java create mode 100644 drools-core/src/main/java/org/drools/core/impl/KieSessionsPoolImpl.java delete mode 100644 drools-core/src/main/java/org/drools/core/impl/SessionsCache.java create mode 100644 drools-core/src/main/java/org/drools/core/impl/StatefulSessionPool.java create mode 100644 drools-core/src/main/java/org/drools/core/util/ScalablePool.java diff --git a/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerImpl.java b/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerImpl.java index f980625d3b2..72f23d7c7d8 100644 --- a/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerImpl.java +++ b/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerImpl.java @@ -35,12 +35,15 @@ import org.drools.compiler.management.KieContainerMonitor; import org.drools.compiler.reteoo.compiled.ObjectTypeNodeCompiler; import org.drools.core.InitialFact; +import org.drools.core.SessionConfiguration; import org.drools.core.SessionConfigurationImpl; import org.drools.core.common.ProjectClassLoader; import org.drools.core.impl.InternalKieContainer; import org.drools.core.impl.InternalKnowledgeBase; +import org.drools.core.impl.KnowledgeBaseImpl; import org.drools.core.impl.RuleUnitExecutorSession; import org.drools.core.impl.StatefulKnowledgeSessionImpl; +import org.drools.core.impl.StatefulSessionPool; import org.drools.core.impl.StatelessKnowledgeSessionImpl; import org.drools.core.management.DroolsManagementAgent; import org.drools.core.management.DroolsManagementAgent.CBSKey; @@ -61,6 +64,7 @@ import org.kie.api.io.ResourceType; import org.kie.api.logger.KieLoggers; import org.kie.api.runtime.Environment; +import org.kie.api.runtime.KieContainerSessionsPool; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.KieSessionConfiguration; import org.kie.api.runtime.StatelessKieSession; @@ -98,6 +102,8 @@ public class KieContainerImpl private final String containerId; + private final Map sessionConfsCache = new ConcurrentHashMap<>(); + public KieModule getMainKieModule() { return kr.getKieModule(getReleaseId()); } @@ -442,8 +448,28 @@ public KieSession newKieSession(Environment environment) { } public KieSession newKieSession(Environment environment, KieSessionConfiguration conf) { - KieSessionModel defaultKieSessionModel = findKieSessionModel(false); - return newKieSession(defaultKieSessionModel.getName(), environment, conf); + return newKieSession(null, environment, conf); + } + + public KieContainerSessionsPool newKieSessionsPool( int initialSize) { + return new KieContainerSessionsPoolImpl(this, initialSize); + } + + StatefulSessionPool createKieSessionsPool(String kSessionName, KieSessionConfiguration conf, Environment env, int initialSize, boolean stateless) { + KieSessionModel kSessionModel = kSessionName != null ? getKieSessionModel(kSessionName) : findKieSessionModel(false); + if ( kSessionModel == null ) { + log.error("Unknown KieSession name: " + kSessionName); + return null; + } + KnowledgeBaseImpl kBase = (KnowledgeBaseImpl) getKieBaseFromKieSessionModel( kSessionModel ); + return kBase == null ? null : new StatefulSessionPool(kBase, initialSize, () -> { + SessionConfiguration sessConf = conf != null ? (SessionConfiguration) conf : kBase.getSessionConfiguration(); + StatefulKnowledgeSessionImpl kSession = stateless ? + kBase.internalCreateStatefulKnowledgeSession( env, sessConf ).setStateless( true ) : + (StatefulKnowledgeSessionImpl) kBase.newKieSession( sessConf, env ); + registerNewKieSession( kSessionModel, ( InternalKnowledgeBase ) kBase, kSession ); + return kSession; + }); } private KieSessionModel findKieSessionModel(boolean stateless) { @@ -514,29 +540,40 @@ public KieSession newKieSession(String kSessionName, Environment environment, Ki log.error("Unknown KieSession name: " + kSessionName); return null; } - if (kSessionModel.getType() == KieSessionModel.KieSessionType.STATELESS) { - throw new RuntimeException("Trying to create a stateful KieSession from a stateless KieSessionModel: " + kSessionModel.getName()); - } - KieBase kBase = getKieBase( kSessionModel.getKieBaseModel().getName() ); - if ( kBase == null ) { - log.error("Unknown KieBase name: " + kSessionModel.getKieBaseModel().getName()); - return null; - } + + KieBase kBase = getKieBaseFromKieSessionModel( kSessionModel ); + if ( kBase == null ) return null; KieSession kSession = kBase.newKieSession( conf != null ? conf : getKieSessionConfiguration( kSessionModel ), environment ); + registerNewKieSession( kSessionModel, ( InternalKnowledgeBase ) kBase, kSession ); + return kSession; + } + + private void registerNewKieSession( KieSessionModel kSessionModel, InternalKnowledgeBase kBase, KieSession kSession ) { if (isJndiAvailable()) { wireSessionComponents( kSessionModel, kSession ); } registerLoggers(kSessionModel, kSession); registerCalendars(kSessionModel, kSession); - ((StatefulKnowledgeSessionImpl) kSession).initMBeans(containerId, ((InternalKnowledgeBase) kBase).getId(), kSessionModel.getName()); + ((StatefulKnowledgeSessionImpl ) kSession).initMBeans(containerId, kBase.getId(), kSessionModel.getName()); kSessions.put(kSessionModel.getName(), kSession); - return kSession; } - private void registerLoggers(KieSessionModelImpl kSessionModel, KieRuntimeEventManager kSession) { + private KieBase getKieBaseFromKieSessionModel( KieSessionModel kSessionModel ) { + if (kSessionModel.getType() == KieSessionModel.KieSessionType.STATELESS) { + throw new RuntimeException("Trying to create a stateful KieSession from a stateless KieSessionModel: " + kSessionModel.getName()); + } + KieBase kBase = getKieBase( kSessionModel.getKieBaseModel().getName() ); + if ( kBase == null ) { + log.error("Unknown KieBase name: " + kSessionModel.getKieBaseModel().getName()); + return null; + } + return kBase; + } + + private void registerLoggers(KieSessionModel kSessionModel, KieRuntimeEventManager kSession) { KieLoggers kieLoggers = KieServices.Factory.get().getLoggers(); if (kSessionModel.getConsoleLogger() != null) { kieLoggers.newConsoleLogger(kSession); @@ -616,13 +653,16 @@ public KieSessionConfiguration getKieSessionConfiguration( String kSessionName ) } private KieSessionConfiguration getKieSessionConfiguration( KieSessionModel kSessionModel ) { - KieSessionConfiguration ksConf = new SessionConfigurationImpl( null, kProject.getClassLoader() ); - ksConf.setOption( kSessionModel.getClockType() ); - ksConf.setOption( kSessionModel.getBeliefSystem() ); - return ksConf; + return sessionConfsCache.computeIfAbsent(kSessionModel.getName(), k -> { + KieSessionConfiguration ksConf = new SessionConfigurationImpl( null, kProject.getClassLoader() ); + ksConf.setOption( kSessionModel.getClockType() ); + ksConf.setOption( kSessionModel.getBeliefSystem() ); + return ksConf; + }); } public void dispose() { + sessionConfsCache.clear(); kBases.values().forEach( kb -> ( (InternalKnowledgeBase) kb ).setKieContainer( null ) ); Set cbskeys = new HashSet(); diff --git a/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerSessionsPoolImpl.java b/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerSessionsPoolImpl.java new file mode 100644 index 00000000000..63ed637a46d --- /dev/null +++ b/drools-compiler/src/main/java/org/drools/compiler/kie/builder/impl/KieContainerSessionsPoolImpl.java @@ -0,0 +1,86 @@ +/* + * Copyright 2005 JBoss Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.compiler.kie.builder.impl; + +import org.drools.core.impl.AbstractKieSessionsPool; +import org.drools.core.impl.StatefulSessionPool; +import org.drools.core.impl.StatelessKnowledgeSessionImpl; +import org.kie.api.runtime.KieContainerSessionsPool; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.KieSessionConfiguration; +import org.kie.api.runtime.StatelessKieSession; + +public class KieContainerSessionsPoolImpl extends AbstractKieSessionsPool implements KieContainerSessionsPool { + + private final KieContainerImpl kContainer; + + KieContainerSessionsPoolImpl( KieContainerImpl kContainer, int initialSize ) { + super(initialSize); + this.kContainer = kContainer; + } + + @Override + public KieSession newKieSession() { + return newKieSession( null, null ); + } + + @Override + public KieSession newKieSession( KieSessionConfiguration conf ) { + return newKieSession( null, conf ); + } + + @Override + public KieSession newKieSession( String kSessionName ) { + return newKieSession( kSessionName, null ); + } + + @Override + public KieSession newKieSession( String kSessionName, KieSessionConfiguration conf ) { + return getPool(kSessionName, conf, false).get(); + } + + @Override + public StatelessKieSession newStatelessKieSession() { + return newStatelessKieSession( null, null ); + } + + @Override + public StatelessKieSession newStatelessKieSession( KieSessionConfiguration conf ) { + return newStatelessKieSession( null, conf ); + } + + @Override + public StatelessKieSession newStatelessKieSession( String kSessionName ) { + return newStatelessKieSession( kSessionName, null ); + } + + @Override + public StatelessKieSession newStatelessKieSession( String kSessionName, KieSessionConfiguration conf ) { + return new StatelessKnowledgeSessionImpl( conf, getPool(kSessionName, conf, true) ); + } + + @Override + protected StatefulSessionPool createStatefulSessionPool( String kSessionName, KieSessionConfiguration conf, boolean stateless ) { + return kContainer.createKieSessionsPool(kSessionName, conf, environment, initialSize, stateless); + } + + @Override + protected String getKey(String kSessionName, KieSessionConfiguration conf, boolean stateless) { + String key = kSessionName == null ? (stateless ? "DEFAULT_STATELESS" : "DEFAULT") : kSessionName; + return conf == null ? key : key + "@" + System.identityHashCode( conf ); + } +} diff --git a/drools-compiler/src/test/java/org/drools/compiler/integrationtests/SegmentMemoryPrototypeTest.java b/drools-compiler/src/test/java/org/drools/compiler/integrationtests/SegmentMemoryPrototypeTest.java index 2412c7d35ed..ff984eb6c0d 100644 --- a/drools-compiler/src/test/java/org/drools/compiler/integrationtests/SegmentMemoryPrototypeTest.java +++ b/drools-compiler/src/test/java/org/drools/compiler/integrationtests/SegmentMemoryPrototypeTest.java @@ -135,7 +135,6 @@ public void testSessionCache() { try { checkKieSession(ksession); } finally { - ksession.dispose(); try { ksession.reset(); checkKieSession(ksession); diff --git a/drools-compiler/src/test/java/org/drools/compiler/integrationtests/session/SessionsPoolTest.java b/drools-compiler/src/test/java/org/drools/compiler/integrationtests/session/SessionsPoolTest.java new file mode 100644 index 00000000000..10cd7414fb3 --- /dev/null +++ b/drools-compiler/src/test/java/org/drools/compiler/integrationtests/session/SessionsPoolTest.java @@ -0,0 +1,171 @@ +/* + * Copyright 2018 JBoss Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.compiler.integrationtests.session; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.Test; +import org.kie.api.io.ResourceType; +import org.kie.api.runtime.KieContainer; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.KieContainerSessionsPool; +import org.kie.api.runtime.StatelessKieSession; +import org.kie.internal.utils.KieHelper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class SessionsPoolTest { + + @Test + public void testKieSessionsPool() { + KieContainerSessionsPool pool = getKieContainer().newKieSessionsPool( 1 ); + + KieSession ksession = pool.newKieSession(); + try { + checkKieSession( ksession ); + } finally { + ksession.dispose(); + } + + try { + ksession.insert( "test2" ); + fail("it shouldn't be possible to operate on a disposed session even if created from a pool"); + } catch (Exception e) { } + + KieSession ksession2 = pool.newKieSession(); + + // using a pool with only one session so it should return the same one as before + assertSame( ksession, ksession2 ); + assertNull( ksession2.getGlobal( "list" ) ); + checkKieSession( ksession2 ); + + pool.shutdown(); + + try { + ksession.insert( "test3" ); + fail("after pool shutdown all sessions created from it should be disposed"); + } catch (IllegalStateException e) { } + + try { + pool.newKieSession(); + fail("after pool shutdown it shouldn't be possible to get sessions from it"); + } catch (IllegalStateException e) { } + } + + @Test + public void testKieSessionsPoolInMultithreadEnv() throws InterruptedException, ExecutionException { + KieContainerSessionsPool pool = getKieContainer().newKieSessionsPool( 4 ); + + final int THREAD_NR = 10; + ExecutorService executor = Executors.newFixedThreadPool(THREAD_NR, r -> { + final Thread t = new Thread(r); + t.setDaemon(true); + return t; + }); + + try { + CompletionService ecs = new ExecutorCompletionService<>( executor ); + for (int i = 0; i < THREAD_NR; i++) { + ecs.submit( () -> { + try { + KieSession ksession = pool.newKieSession(); + try { + checkKieSession( ksession ); + } finally { + ksession.dispose(); + } + + try { + ksession.insert( "test2" ); + fail( "it shouldn't be possible to operate on a disposed session even if created from a pool" ); + } catch (IllegalStateException e) { + } + return true; + } catch (final Exception e) { + return false; + } + } ); + } + boolean success = true; + for (int i = 0; i < THREAD_NR; i++) { + success = ecs.take().get() && success; + } + assertTrue( success ); + } finally { + executor.shutdown(); + } + + pool.shutdown(); + try { + pool.newKieSession(); + fail("after pool shutdown it shouldn't be possible to get sessions from it"); + } catch (IllegalStateException e) { } + } + + @Test + public void testStatelessKieSessionsPool() { + String drl = + "global java.util.List list\n" + + "rule R1 when\n" + + " $s: String()\n" + + "then\n" + + " list.add($s);\n" + + "end\n"; + + KieContainerSessionsPool pool = getKieContainer().newKieSessionsPool( 1 ); + StatelessKieSession session = pool.newStatelessKieSession(); + + List list = new ArrayList<>(); + session.setGlobal( "list", list ); + session.execute( "test" ); + assertEquals(1, list.size()); + + list.clear(); + session.execute( "test" ); + assertEquals(1, list.size()); + } + + private KieContainer getKieContainer() { + String drl = + "global java.util.List list\n" + + "rule R1 when\n" + + " $s: String()\n" + + "then\n" + + " list.add($s);\n" + + "end\n"; + + return new KieHelper().addContent( drl, ResourceType.DRL ).getKieContainer(); + } + + private void checkKieSession( KieSession ksession ) { + List list = new ArrayList<>(); + ksession.setGlobal( "list", list ); + ksession.insert( "test" ); + ksession.fireAllRules(); + assertEquals(1, list.size()); + } +} diff --git a/drools-core/src/main/java/org/drools/core/RuleBaseConfiguration.java b/drools-core/src/main/java/org/drools/core/RuleBaseConfiguration.java index e910428c65e..3fee1869106 100755 --- a/drools-core/src/main/java/org/drools/core/RuleBaseConfiguration.java +++ b/drools-core/src/main/java/org/drools/core/RuleBaseConfiguration.java @@ -46,7 +46,6 @@ import org.kie.api.conf.SingleValueKieBaseOption; import org.kie.api.runtime.rule.ConsequenceExceptionHandler; import org.kie.internal.builder.conf.ClassLoaderCacheOption; -import org.kie.internal.builder.conf.SessionCacheOption; import org.kie.internal.conf.AlphaThresholdOption; import org.kie.internal.conf.CompositeKeyDepthOption; import org.kie.internal.conf.ConsequenceExceptionHandlerOption; @@ -151,8 +150,6 @@ public class RuleBaseConfiguration private IndexPrecedenceOption indexPrecedenceOption; - private SessionCacheOption sessionCacheOption; - // if "true", rulebase builder will try to split // the rulebase into multiple partitions that can be evaluated // in parallel by using multiple internal threads @@ -209,7 +206,6 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(phreakEnabled); out.writeBoolean(declarativeAgenda); out.writeObject(componentFactory); - out.writeObject(sessionCacheOption); } public void readExternal(ObjectInput in) throws IOException, @@ -241,7 +237,6 @@ public void readExternal(ObjectInput in) throws IOException, phreakEnabled = in.readBoolean(); declarativeAgenda = in.readBoolean(); componentFactory = (KieComponentFactory) in.readObject(); - sessionCacheOption = (SessionCacheOption) in.readObject(); } /** @@ -333,8 +328,6 @@ public void setProperty(String name, setMBeansEnabled( MBeansOption.isEnabled(value)); } else if ( name.equals( ClassLoaderCacheOption.PROPERTY_NAME ) ) { setClassLoaderCacheEnabled( StringUtils.isEmpty( value ) ? true : Boolean.valueOf(value)); - } else if ( name.equals( SessionCacheOption.PROPERTY_NAME ) ) { - setSessionCacheOption(SessionCacheOption.determineOption(StringUtils.isEmpty(value) ? "none" : value)); } } @@ -476,8 +469,6 @@ private void init(Properties properties) { setClassLoaderCacheEnabled( Boolean.valueOf( this.chainedProperties.getProperty( ClassLoaderCacheOption.PROPERTY_NAME, "true" ) ) ); - setSessionCacheOption(SessionCacheOption.determineOption(this.chainedProperties.getProperty(SessionCacheOption.PROPERTY_NAME, "none"))); - setDeclarativeAgendaEnabled( Boolean.valueOf( this.chainedProperties.getProperty( DeclarativeAgendaOption.PROPERTY_NAME, "false" ) ) ); } @@ -738,16 +729,6 @@ public void setClassLoaderCacheEnabled(final boolean classLoaderCacheEnabled) { this.classLoaderCacheEnabled = classLoaderCacheEnabled; } - public SessionCacheOption getSessionCacheOption() { - return this.sessionCacheOption; - } - - public void setSessionCacheOption(SessionCacheOption sessionCacheOption) { - checkCanChange(); // throws an exception if a change isn't possible; - this.sessionCacheOption = sessionCacheOption; - } - - public boolean isDeclarativeAgenda() { return this.declarativeAgenda; } @@ -1202,8 +1183,6 @@ public void setOption(T option) { setMBeansEnabled( ( (MBeansOption) option ).isEnabled()); } else if (option instanceof ClassLoaderCacheOption) { setClassLoaderCacheEnabled( ( (ClassLoaderCacheOption) option ).isClassLoaderCacheEnabled()); - } else if (option instanceof SessionCacheOption) { - setSessionCacheOption( (SessionCacheOption) option); } else if (option instanceof DeclarativeAgendaOption) { setDeclarativeAgendaEnabled(((DeclarativeAgendaOption) option).isDeclarativeAgendaEnabled()); } diff --git a/drools-core/src/main/java/org/drools/core/common/ClassAwareObjectStore.java b/drools-core/src/main/java/org/drools/core/common/ClassAwareObjectStore.java index c0be0c9cb0d..51530d09230 100644 --- a/drools-core/src/main/java/org/drools/core/common/ClassAwareObjectStore.java +++ b/drools-core/src/main/java/org/drools/core/common/ClassAwareObjectStore.java @@ -96,7 +96,7 @@ public boolean isEmpty() { @Override public void clear() { storesMap.clear(); - concreteStores.clear(); + concreteStores = new CopyOnWriteArrayList(); if (isEqualityBehaviour) { equalityMap.clear(); } diff --git a/drools-core/src/main/java/org/drools/core/common/ConcurrentNodeMemories.java b/drools-core/src/main/java/org/drools/core/common/ConcurrentNodeMemories.java index 48045697af6..020446190c8 100644 --- a/drools-core/src/main/java/org/drools/core/common/ConcurrentNodeMemories.java +++ b/drools-core/src/main/java/org/drools/core/common/ConcurrentNodeMemories.java @@ -16,8 +16,6 @@ package org.drools.core.common; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -55,24 +53,20 @@ public void clear() { public void resetAllMemories(StatefulKnowledgeSession session) { InternalKnowledgeBase kBase = (InternalKnowledgeBase)session.getKieBase(); - Set smems = new HashSet(); for (int i = 0; i < memories.length(); i++) { Memory memory = memories.get(i); if (memory != null) { if (memory.getSegmentMemory() != null) { - smems.add(memory.getSegmentMemory()); + SegmentMemory smem = memory.getSegmentMemory(); + smem.reset(kBase.getSegmentPrototype(smem)); + if ( smem.isSegmentLinked() ) { + smem.notifyRuleLinkSegment((InternalWorkingMemory)session); + } } memory.reset(); } } - - for (SegmentMemory smem : smems) { - smem.reset(kBase.getSegmentPrototype(smem)); - if ( smem.isSegmentLinked() ) { - smem.notifyRuleLinkSegment((InternalWorkingMemory)session); - } - } } /** diff --git a/drools-core/src/main/java/org/drools/core/common/DefaultAgenda.java b/drools-core/src/main/java/org/drools/core/common/DefaultAgenda.java index 4c6963f6858..91ce939e431 100644 --- a/drools-core/src/main/java/org/drools/core/common/DefaultAgenda.java +++ b/drools-core/src/main/java/org/drools/core/common/DefaultAgenda.java @@ -525,9 +525,8 @@ private boolean removeGroup(InternalAgendaGroup group) { } private void clearFocusStack() { - InternalAgendaGroup[] groups = focusStack.toArray( new InternalAgendaGroup[focusStack.size()] ); - for ( InternalAgendaGroup group : groups ) { - group.visited(); + for ( AgendaGroup group : focusStack ) { + (( InternalAgendaGroup ) group).visited(); } this.focusStack.clear(); } diff --git a/drools-core/src/main/java/org/drools/core/common/NamedEntryPoint.java b/drools-core/src/main/java/org/drools/core/common/NamedEntryPoint.java index 7534ee8779e..0d3f0e4edd2 100644 --- a/drools-core/src/main/java/org/drools/core/common/NamedEntryPoint.java +++ b/drools-core/src/main/java/org/drools/core/common/NamedEntryPoint.java @@ -134,6 +134,9 @@ public void unlock() { public void reset() { this.objectStore.clear(); + if (tms != null) { + tms.clear(); + } } public ObjectStore getObjectStore() { diff --git a/drools-core/src/main/java/org/drools/core/common/PhreakWorkingMemoryFactory.java b/drools-core/src/main/java/org/drools/core/common/PhreakWorkingMemoryFactory.java index 2e9b12efa5e..9d561a62396 100644 --- a/drools-core/src/main/java/org/drools/core/common/PhreakWorkingMemoryFactory.java +++ b/drools-core/src/main/java/org/drools/core/common/PhreakWorkingMemoryFactory.java @@ -32,11 +32,6 @@ public static WorkingMemoryFactory getInstance() { } public InternalWorkingMemory createWorkingMemory(long id, InternalKnowledgeBase kBase, SessionConfiguration config, Environment environment) { - InternalWorkingMemory cachedWm = kBase.getCachedSession(config, environment); - if (cachedWm != null) { - return cachedWm; - } - return new StatefulKnowledgeSessionImpl( id, kBase, true, config, environment); } diff --git a/drools-core/src/main/java/org/drools/core/impl/AbstractKieSessionsPool.java b/drools-core/src/main/java/org/drools/core/impl/AbstractKieSessionsPool.java new file mode 100644 index 00000000000..26068a64773 --- /dev/null +++ b/drools-core/src/main/java/org/drools/core/impl/AbstractKieSessionsPool.java @@ -0,0 +1,65 @@ +/* + * Copyright 2005 JBoss Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.core.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.kie.api.runtime.Environment; +import org.kie.api.runtime.KieSessionConfiguration; +import org.kie.api.runtime.KieSessionsPool; + +public abstract class AbstractKieSessionsPool implements KieSessionsPool { + + private volatile boolean alive = true; + + protected final int initialSize; + + private final Map pools = new ConcurrentHashMap<>(); + + protected final Environment environment = EnvironmentFactory.newEnvironment(); + + protected AbstractKieSessionsPool( int initialSize ) { + this.initialSize = initialSize; + } + + @Override + public void shutdown() { + alive = false; + pools.values().forEach( StatefulSessionPool::shutdown ); + pools.clear(); + } + + protected StatefulSessionPool getPool( KieSessionConfiguration conf, boolean stateless) { + return getPool( null, conf, stateless); + } + + protected StatefulSessionPool getPool( String kSessionName, KieSessionConfiguration conf, boolean stateless) { + checkAlive(); + return pools.computeIfAbsent( getKey(kSessionName, conf, stateless), k -> createStatefulSessionPool( kSessionName, conf, stateless ) ); + } + + private void checkAlive() { + if (!alive) { + throw new IllegalStateException( "Illegal method call. This session pool was previously disposed." ); + } + } + + protected abstract StatefulSessionPool createStatefulSessionPool( String kSessionName, KieSessionConfiguration conf, boolean stateless ); + + protected abstract String getKey(String kSessionName, KieSessionConfiguration conf, boolean stateless); +} diff --git a/drools-core/src/main/java/org/drools/core/impl/InternalKnowledgeBase.java b/drools-core/src/main/java/org/drools/core/impl/InternalKnowledgeBase.java index e42df7fd3a6..411edaced24 100644 --- a/drools-core/src/main/java/org/drools/core/impl/InternalKnowledgeBase.java +++ b/drools-core/src/main/java/org/drools/core/impl/InternalKnowledgeBase.java @@ -95,8 +95,6 @@ public interface InternalKnowledgeBase extends KieBase { void disposeStatefulSession(StatefulKnowledgeSessionImpl statefulSession); - StatefulKnowledgeSessionImpl getCachedSession(SessionConfiguration config, Environment environment); - TripleStore getTripleStore(); TraitRegistry getTraitRegistry(); diff --git a/drools-core/src/main/java/org/drools/core/impl/KieSessionsPoolImpl.java b/drools-core/src/main/java/org/drools/core/impl/KieSessionsPoolImpl.java new file mode 100644 index 00000000000..f5d1e8d5208 --- /dev/null +++ b/drools-core/src/main/java/org/drools/core/impl/KieSessionsPoolImpl.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005 JBoss Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.core.impl; + +import org.drools.core.SessionConfiguration; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.KieSessionConfiguration; +import org.kie.api.runtime.StatelessKieSession; + +public class KieSessionsPoolImpl extends AbstractKieSessionsPool { + + private final KnowledgeBaseImpl kBase; + + KieSessionsPoolImpl( KnowledgeBaseImpl kBase, int initialSize ) { + super(initialSize); + this.kBase = kBase; + } + + @Override + public KieSession newKieSession() { + return newKieSession( kBase.getSessionConfiguration() ); + } + + @Override + public KieSession newKieSession( KieSessionConfiguration conf ) { + return getPool(conf, false).get(); + } + + @Override + public StatelessKieSession newStatelessKieSession() { + return newStatelessKieSession( kBase.getSessionConfiguration() ); + } + + @Override + public StatelessKieSession newStatelessKieSession( KieSessionConfiguration conf ) { + return new StatelessKnowledgeSessionImpl( conf, getPool(conf, true) ); + } + + @Override + protected String getKey( String kSessionName, KieSessionConfiguration conf, boolean stateless ) { + String key = stateless ? "DEFAULT_STATELESS" : "DEFAULT"; + return conf == null ? key : key + "@" + System.identityHashCode( conf ); + } + + @Override + protected StatefulSessionPool createStatefulSessionPool( String kSessionName, KieSessionConfiguration conf, boolean stateless ) { + return new StatefulSessionPool(kBase, initialSize, () -> + stateless ? + kBase.internalCreateStatefulKnowledgeSession( environment, ( SessionConfiguration ) conf ).setStateless( true ): + (StatefulKnowledgeSessionImpl ) kBase.newKieSession(conf, environment)); + } + +} diff --git a/drools-core/src/main/java/org/drools/core/impl/KnowledgeBaseImpl.java b/drools-core/src/main/java/org/drools/core/impl/KnowledgeBaseImpl.java index f3e89aa9136..c0615174d5b 100644 --- a/drools-core/src/main/java/org/drools/core/impl/KnowledgeBaseImpl.java +++ b/drools-core/src/main/java/org/drools/core/impl/KnowledgeBaseImpl.java @@ -107,6 +107,7 @@ import org.kie.api.runtime.Environment; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.KieSessionConfiguration; +import org.kie.api.runtime.KieSessionsPool; import org.kie.api.runtime.StatelessKieSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,8 +174,6 @@ public class KnowledgeBaseImpl public final Set kieBaseListeners = Collections.newSetFromMap(new ConcurrentHashMap()); - private transient SessionsCache sessionsCache; - private transient Queue kbaseModificationsQueue = new ConcurrentLinkedQueue(); private transient AtomicInteger sessionDeactivationsCounter = new AtomicInteger(); @@ -218,10 +217,6 @@ public KnowledgeBaseImpl(final String id, setupRete(); - if ( this.config.getSessionCacheOption().isEnabled() ) { - sessionsCache = new SessionsCache(this.config.getSessionCacheOption().isAsync()); - } - sessionConfiguration = new SessionConfigurationImpl( null, this.config.getClassLoader(), this.config.getChainedProperties() ); } @@ -328,7 +323,11 @@ public Query getQuery(String packageName, String queryName) { return getPackage(packageName).getRule( queryName ); } - + + public KieSessionsPool newKieSessionsPool( int initialSize) { + return new KieSessionsPoolImpl(this, initialSize); + } + public KieSession newKieSession() { return newKieSession(null, EnvironmentFactory.newEnvironment()); } @@ -364,7 +363,7 @@ public StatefulKnowledgeSessionImpl createSession(long id, FactHandleFactory han return internalInitSession( config, session ); } - StatefulKnowledgeSessionImpl internalCreateStatefulKnowledgeSession( Environment environment, SessionConfiguration sessionConfig ) { + public StatefulKnowledgeSessionImpl internalCreateStatefulKnowledgeSession( Environment environment, SessionConfiguration sessionConfig ) { StatefulKnowledgeSessionImpl session = ( StatefulKnowledgeSessionImpl ) kieComponentFactory.getWorkingMemoryFactory() .createWorkingMemory( nextWorkingMemoryCounter(), this, sessionConfig, environment ); return internalInitSession( sessionConfig, session ); @@ -623,21 +622,12 @@ public String getId() { } public void disposeStatefulSession(StatefulKnowledgeSessionImpl statefulSession) { - if (sessionsCache != null) { - synchronized (sessionsCache) { - sessionsCache.store(statefulSession); - } - } this.statefulSessions.remove(statefulSession); if (kieContainer != null) { kieContainer.disposeSession( statefulSession ); } } - public StatefulKnowledgeSessionImpl getCachedSession(SessionConfiguration config, Environment environment) { - return sessionsCache != null ? sessionsCache.getCachedSession(config) : null; - } - public FactHandleFactory newFactHandleFactory() { return this.factHandleFactory.newInstance(); } diff --git a/drools-core/src/main/java/org/drools/core/impl/SessionsCache.java b/drools-core/src/main/java/org/drools/core/impl/SessionsCache.java deleted file mode 100644 index a873972bb40..00000000000 --- a/drools-core/src/main/java/org/drools/core/impl/SessionsCache.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2015 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.drools.core.impl; - -import org.drools.core.SessionConfiguration; -import org.kie.internal.concurrent.ExecutorProviderFactory; - -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; - -public class SessionsCache { - - private final Map> cleanSessions = new ConcurrentHashMap>(); - private final boolean isAsync; - - SessionsCache(boolean isAsync) { - this.isAsync = isAsync; - } - - public void store(StatefulKnowledgeSessionImpl session) { - if (isAsync) { - SessionResetterHolder.SESSION_RESETTER.enqueue(this, session); - } else { - session.reset(); - storeResettedSession(session); - } - } - - private void storeResettedSession(StatefulKnowledgeSessionImpl session) { - Queue cache = cleanSessions.get(session.getSessionConfiguration()); - if (cache == null) { - cache = new ConcurrentLinkedQueue(); - cleanSessions.put(session.getSessionConfiguration(), cache); - } - cache.offer(session); - } - - public StatefulKnowledgeSessionImpl getCachedSession(SessionConfiguration config) { - Queue cache = cleanSessions.get(config); - return cache != null ? cache.poll() : null; - } - - private static class SessionResetterHolder { - private static final SessionResetter SESSION_RESETTER = new SessionResetter(); - } - - private static class SessionResetter { - private final Executor executor = ExecutorProviderFactory.getExecutorProvider().newSingleThreadExecutor(); - - private final BlockingQueue dirtySessions = new ArrayBlockingQueue(20); - - private SessionResetter() { - executor.execute(new Runnable() { - @Override - public void run() { - while (true) { - try { - dirtySessions.take().doReset(); - } catch (InterruptedException e) { } - } - } - }); - } - - private void enqueue(SessionsCache sessionsCache, StatefulKnowledgeSessionImpl session) { - if (!dirtySessions.offer(new SessionSlot(sessionsCache, session))) { - throw new IllegalStateException("Cannot insert item into the queue! There is no space left in the queue."); - } - } - } - - private static class SessionSlot { - private final SessionsCache sessionsCache; - private final StatefulKnowledgeSessionImpl session; - - private SessionSlot(SessionsCache sessionsCache, StatefulKnowledgeSessionImpl session) { - this.sessionsCache = sessionsCache; - this.session = session; - } - - private void doReset() { - session.reset(); - sessionsCache.storeResettedSession(session); - } - } -} diff --git a/drools-core/src/main/java/org/drools/core/impl/StatefulKnowledgeSessionImpl.java b/drools-core/src/main/java/org/drools/core/impl/StatefulKnowledgeSessionImpl.java index d30452218fd..d6f4959e0d0 100644 --- a/drools-core/src/main/java/org/drools/core/impl/StatefulKnowledgeSessionImpl.java +++ b/drools-core/src/main/java/org/drools/core/impl/StatefulKnowledgeSessionImpl.java @@ -88,7 +88,6 @@ import org.drools.core.phreak.RuleAgendaItem; import org.drools.core.phreak.SegmentUtilities; import org.drools.core.reteoo.AsyncReceiveNode; -import org.drools.core.reteoo.ClassObjectTypeConf; import org.drools.core.reteoo.EntryPointNode; import org.drools.core.reteoo.InitialFactImpl; import org.drools.core.reteoo.LeftInputAdapterNode; @@ -157,6 +156,7 @@ import static java.util.stream.Collectors.toList; +import static org.drools.core.base.ClassObjectType.InitialFact_ObjectType; import static org.drools.core.common.PhreakPropagationContextFactory.createPropagationContextForFact; import static org.drools.core.reteoo.PropertySpecificUtil.allSetButTraitBitMask; @@ -246,8 +246,6 @@ public class StatefulKnowledgeSessionImpl extends AbstractRuntime private Map runtimeServices; - private boolean alive = true; - private AtomicBoolean mbeanRegistered = new AtomicBoolean(false); private DroolsManagementAgent.CBSKey mbeanRegisteredCBSKey; @@ -257,6 +255,9 @@ public class StatefulKnowledgeSessionImpl extends AbstractRuntime private List receiveNodeMemories; + private transient StatefulSessionPool pool; + private transient boolean alive = true; + // ------------------------------------------------------------ // Constructors // ------------------------------------------------------------ @@ -512,7 +513,19 @@ public KieBase getKieBase() { return this.kBase; } + StatefulKnowledgeSessionImpl fromPool(StatefulSessionPool pool) { + this.pool = pool; + alive = true; + return this; + } + public void dispose() { + alive = false; + if (pool != null) { + pool.release(this); + return; + } + if (!agenda.dispose(this)) { return; } @@ -540,9 +553,8 @@ public void dispose() { if (processRuntime != null) { this.processRuntime.dispose(); } - if (timerService != null) { - this.timerService.shutdown(); - } + + this.timerService.shutdown(); if (this.workItemManager != null) { ((org.drools.core.process.instance.WorkItemManager)this.workItemManager).dispose(); @@ -556,7 +568,7 @@ public void dispose() { } public boolean isAlive() { - return agenda.isAlive(); + return alive && agenda.isAlive(); } public void destroy() { @@ -755,9 +767,7 @@ public InternalFactHandle initInitialFact(InternalKnowledgeBase kBase, InternalW InitialFact initialFact = InitialFactImpl.getInstance(); InternalFactHandle handle = new DefaultFactHandle(0, initialFact, 0, entryPoint ); - ClassObjectTypeConf otc = (ClassObjectTypeConf) entryPoint.getObjectTypeConfigurationRegistry() - .getObjectTypeConf(epId, initialFact); - ObjectTypeNode otn = otc.getConcreteObjectTypeNode(); + ObjectTypeNode otn = entryPoint.getEntryPointNode().getObjectTypeNodes().get( InitialFact_ObjectType ); if (otn != null) { PropagationContextFactory ctxFact = kBase.getConfiguration().getComponentFactory().getPropagationContextFactory(); PropagationContext pctx = ctxFact.createPropagationContext( 0, PropagationContext.Type.INSERTION, null, @@ -1104,16 +1114,14 @@ public void reset() { this.opCounter.set(0); this.lastIdleTimestamp.set( -1 ); - initDefaultEntryPoint(); + this.defaultEntryPoint.reset(); updateEntryPointsCache(); - timerService = TimerServiceFactory.getTimerService(this.config); + timerService.reset(); this.processRuntime = null; this.initialFactHandle = initInitialFact(kBase, null); - - alive = true; } public void reset(int handleId, diff --git a/drools-core/src/main/java/org/drools/core/impl/StatefulSessionPool.java b/drools-core/src/main/java/org/drools/core/impl/StatefulSessionPool.java new file mode 100644 index 00000000000..a53bfec59a1 --- /dev/null +++ b/drools-core/src/main/java/org/drools/core/impl/StatefulSessionPool.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 JBoss Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.core.impl; + +import java.util.function.Supplier; + +import org.drools.core.util.ScalablePool; + +public class StatefulSessionPool { + + private final KnowledgeBaseImpl kbase; + private final ScalablePool pool; + + public StatefulSessionPool( KnowledgeBaseImpl kbase, int initialSize, Supplier supplier ) { + this.kbase = kbase; + this.pool = new ScalablePool<>(initialSize, supplier, s -> s.reset(), s -> s.fromPool(null).dispose()); + } + + public KnowledgeBaseImpl getKieBase() { + return kbase; + } + + public StatefulKnowledgeSessionImpl get() { + return pool.get().fromPool( this ); + } + + public void release(StatefulKnowledgeSessionImpl session) { + pool.release( session ); + } + + public void shutdown() { + pool.clear(); + } +} diff --git a/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java b/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java index 1151216c921..e2159484c77 100644 --- a/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java +++ b/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java @@ -59,56 +59,75 @@ public class StatelessKnowledgeSessionImpl extends AbstractRuntime StatelessKnowledgeSession, StatelessKieSession { - private InternalKnowledgeBase kBase; + private KnowledgeBaseImpl kBase; private MapGlobalResolver sessionGlobals = new MapGlobalResolver(); private Map channels = new HashMap(); private final List listeners = new CopyOnWriteArrayList(); - private KieSessionConfiguration conf; + private SessionConfiguration conf; private Environment environment; private AtomicBoolean mbeanRegistered = new AtomicBoolean(false); private DroolsManagementAgent.CBSKey mbeanRegisteredCBSKey; - private AtomicLong wmCreated = new AtomicLong(0); + private final AtomicLong wmCreated; + + private final StatefulSessionPool pool; public StatelessKnowledgeSessionImpl() { + pool = null; + wmCreated = new AtomicLong(0); } - public StatelessKnowledgeSessionImpl(final InternalKnowledgeBase kBase, - final KieSessionConfiguration conf) { - this.kBase = kBase; - this.conf = (conf != null) ? conf : kBase.getSessionConfiguration(); + public StatelessKnowledgeSessionImpl(InternalKnowledgeBase kBase, + KieSessionConfiguration conf) { + this.kBase = (KnowledgeBaseImpl)kBase; + this.conf = conf != null ? (SessionConfiguration) conf : kBase.getSessionConfiguration(); this.environment = EnvironmentFactory.newEnvironment(); + this.pool = null; + wmCreated = new AtomicLong(0); + } + + public StatelessKnowledgeSessionImpl(KieSessionConfiguration conf, + StatefulSessionPool pool) { + this.kBase = pool.getKieBase(); + this.conf = conf != null ? (SessionConfiguration) conf : kBase.getSessionConfiguration(); + this.environment = null; + this.pool = pool; + wmCreated = new AtomicLong(1); } public InternalKnowledgeBase getKnowledgeBase() { return this.kBase; } + private StatefulKnowledgeSession newWorkingMemory() { + StatefulKnowledgeSessionImpl ksession = pool != null ? pool.get() : createWorkingMemory(); - public StatefulKnowledgeSession newWorkingMemory() { - this.kBase.readLock(); - try { - StatefulKnowledgeSessionImpl ksession = ((KnowledgeBaseImpl)kBase) - .internalCreateStatefulKnowledgeSession( this.environment, (SessionConfiguration) this.conf ) - .setStateless( true ); + ((Globals ) ksession.getGlobalResolver()).setDelegate(this.sessionGlobals); - ((Globals) ksession.getGlobalResolver()).setDelegate(this.sessionGlobals); + registerListeners( ksession ); - registerListeners( ksession ); + for( Map.Entry entry : this.channels.entrySet() ) { + ksession.registerChannel( entry.getKey(), entry.getValue() ); + } - for( Map.Entry entry : this.channels.entrySet() ) { - ksession.registerChannel( entry.getKey(), entry.getValue() ); - } - + return ksession; + } + + private StatefulKnowledgeSessionImpl createWorkingMemory() { + this.kBase.readLock(); + try { + StatefulKnowledgeSessionImpl ksession = kBase + .internalCreateStatefulKnowledgeSession( this.environment, this.conf ) + .setStateless( true ); wmCreated.incrementAndGet(); return ksession; } finally { this.kBase.readUnlock(); } } - + public void initMBeans(String containerId, String kbaseId, String ksessionName) { if (kBase.getConfiguration() != null && kBase.getConfiguration().isMBeansEnabled() && mbeanRegistered.compareAndSet(false, true)) { this.mbeanRegisteredCBSKey = new DroolsManagementAgent.CBSKey(containerId, kbaseId, ksessionName); @@ -295,11 +314,7 @@ public List executeWithResults(Iterable objects, ObjectFilter filter) { return list; } - public Environment getEnvironment() { - return environment; - } - - protected void dispose(StatefulKnowledgeSession ksession) { + private void dispose(StatefulKnowledgeSession ksession) { ksession.dispose(); } @@ -319,7 +334,7 @@ public boolean equals( Object obj ) { if ( this == obj ) { return true; } - if ( obj == null || !(obj instanceof ListnerHolder) ) { + if ( !(obj instanceof ListnerHolder) ) { return false; } diff --git a/drools-core/src/main/java/org/drools/core/time/TimerService.java b/drools-core/src/main/java/org/drools/core/time/TimerService.java index 1ff6a1f2ca3..f64d816df52 100644 --- a/drools-core/src/main/java/org/drools/core/time/TimerService.java +++ b/drools-core/src/main/java/org/drools/core/time/TimerService.java @@ -16,11 +16,11 @@ package org.drools.core.time; +import java.util.Collection; + import org.drools.core.time.impl.TimerJobFactoryManager; import org.drools.core.time.impl.TimerJobInstance; -import java.util.Collection; - /** * An interface for all timer service implementations used in a drools session. */ @@ -33,11 +33,16 @@ public interface TimerService extends SchedulerService { */ long getCurrentTime(); + /** + * Reset this service + */ + void reset(); + /** * Shuts the service down */ void shutdown(); - + /** * Returns the number of time units (usually ms) to * the next scheduled job diff --git a/drools-core/src/main/java/org/drools/core/time/impl/JDKTimerService.java b/drools-core/src/main/java/org/drools/core/time/impl/JDKTimerService.java index 534ff9d92c1..89d07dec356 100644 --- a/drools-core/src/main/java/org/drools/core/time/impl/JDKTimerService.java +++ b/drools-core/src/main/java/org/drools/core/time/impl/JDKTimerService.java @@ -16,14 +16,6 @@ package org.drools.core.time.impl; -import org.drools.core.time.InternalSchedulerService; -import org.drools.core.time.Job; -import org.drools.core.time.JobContext; -import org.drools.core.time.JobHandle; -import org.drools.core.time.TimerService; -import org.drools.core.time.Trigger; -import org.kie.api.time.SessionClock; - import java.util.Collection; import java.util.Date; import java.util.concurrent.Callable; @@ -32,6 +24,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.drools.core.time.InternalSchedulerService; +import org.drools.core.time.Job; +import org.drools.core.time.JobContext; +import org.drools.core.time.JobHandle; +import org.drools.core.time.TimerService; +import org.drools.core.time.Trigger; +import org.kie.api.time.SessionClock; + /** * A default Scheduler implementation that uses the * JDK built-in ScheduledThreadPoolExecutor as the @@ -43,7 +43,9 @@ public class JDKTimerService SessionClock, InternalSchedulerService { - private AtomicLong idCounter = new AtomicLong(); + private final int size; + + private AtomicLong idCounter; protected ScheduledThreadPoolExecutor scheduler; @@ -54,17 +56,15 @@ public JDKTimerService() { } public JDKTimerService(int size) { + this.size = size; this.scheduler = new ScheduledThreadPoolExecutor(size); + this.idCounter = new AtomicLong(0L); } public void setTimerJobFactoryManager(TimerJobFactoryManager timerJobFactoryManager) { this.jobFactoryManager = timerJobFactoryManager; } - public void setCounter(long counter) { - idCounter = new AtomicLong(counter); - } - public TimerJobFactoryManager getTimerJobFactoryManager() { return this.jobFactoryManager; } @@ -76,6 +76,15 @@ public long getCurrentTime() { return System.currentTimeMillis(); } + public void reset() { + if (idCounter.get() != 0L) { + this.scheduler.shutdownNow(); + this.scheduler = new ScheduledThreadPoolExecutor( size ); + this.idCounter.set( 0L ); + } + } + + @Override public void shutdown() { // forcing a shutdownNow instead of a regular shutdown() // to avoid delays on shutdown. This is an irreversible diff --git a/drools-core/src/main/java/org/drools/core/time/impl/PseudoClockScheduler.java b/drools-core/src/main/java/org/drools/core/time/impl/PseudoClockScheduler.java index ba8781f961f..d776c98d0bb 100644 --- a/drools-core/src/main/java/org/drools/core/time/impl/PseudoClockScheduler.java +++ b/drools-core/src/main/java/org/drools/core/time/impl/PseudoClockScheduler.java @@ -178,9 +178,14 @@ public synchronized void setSession(InternalWorkingMemory session) { this.session = session; } - /** - * {@inheritDoc} - */ + @Override + public void reset() { + idCounter.set(0); + timer.set(0); + queue.clear(); + } + + @Override public void shutdown() { // nothing to do } diff --git a/drools-core/src/main/java/org/drools/core/util/ScalablePool.java b/drools-core/src/main/java/org/drools/core/util/ScalablePool.java new file mode 100644 index 00000000000..ae887c9247d --- /dev/null +++ b/drools-core/src/main/java/org/drools/core/util/ScalablePool.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018 JBoss Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.core.util; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScalablePool { + + private static final Logger log = LoggerFactory.getLogger(ScalablePool.class); + + private final T[] fixedSizePool; + private volatile int cursor; + + private java.util.Queue dynamicPool; + + private final Supplier supplier; + private final Consumer resetter; + private final Consumer disposer; + + public ScalablePool( int initialSize, Supplier supplier, Consumer resetter, Consumer disposer ) { + this.supplier = supplier; + this.resetter = resetter; + this.disposer = disposer; + + fixedSizePool = (T[]) new Object[initialSize]; + for (int i = 0; i < initialSize; i++) { + fixedSizePool[i] = this.supplier.get(); + } + cursor = initialSize; + } + + public T get() { + synchronized (fixedSizePool) { + if ( cursor > 0 ) { + return fixedSizePool[--cursor]; + } + } + if (dynamicPool == null) { + dynamicPool = new java.util.LinkedList<>(); + } else { + synchronized (dynamicPool) { + if ( !dynamicPool.isEmpty() ) { + return dynamicPool.poll(); + } + } + } + return supplier.get(); + } + + public void release(T t) { + resetter.accept( t ); + synchronized (fixedSizePool) { + if ( cursor < fixedSizePool.length ) { + fixedSizePool[cursor++] = t; + return; + } + } + synchronized (dynamicPool) { + dynamicPool.offer( t ); + } + } + + public void clear() { + for (int i = 0; i < fixedSizePool.length; i++) { + disposer.accept( fixedSizePool[i] ); + fixedSizePool[i] = null; + } + if (dynamicPool != null) { + for (T t : dynamicPool) { + disposer.accept( t ); + } + dynamicPool.clear(); + } + } + +} diff --git a/drools-persistence/drools-persistence-jpa/src/main/java/org/drools/persistence/jpa/JpaJDKTimerService.java b/drools-persistence/drools-persistence-jpa/src/main/java/org/drools/persistence/jpa/JpaJDKTimerService.java index f39b52c38a0..22ec4e98052 100644 --- a/drools-persistence/drools-persistence-jpa/src/main/java/org/drools/persistence/jpa/JpaJDKTimerService.java +++ b/drools-persistence/drools-persistence-jpa/src/main/java/org/drools/persistence/jpa/JpaJDKTimerService.java @@ -55,7 +55,6 @@ public void setCommandService(ExecutableRunner runner ) { public JpaJDKTimerService() { this( 1 ); - timerInstances = new ConcurrentHashMap(); } public JpaJDKTimerService(int size) { @@ -63,6 +62,12 @@ public JpaJDKTimerService(int size) { timerInstances = new ConcurrentHashMap(); } + @Override + public void reset() { + super.reset(); + timerInstances.clear(); + } + protected Callable createCallableJob(Job job, JobContext ctx, Trigger trigger, diff --git a/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/compiler/integrationtests/drl/GlobalTest.java b/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/compiler/integrationtests/drl/GlobalTest.java index 650c487898c..4cb291fbba7 100644 --- a/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/compiler/integrationtests/drl/GlobalTest.java +++ b/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/compiler/integrationtests/drl/GlobalTest.java @@ -22,7 +22,6 @@ import java.util.Map; import org.drools.core.base.MapGlobalResolver; -import org.drools.core.impl.StatelessKnowledgeSessionImpl; import org.drools.testcoverage.common.model.Cheese; import org.drools.testcoverage.common.util.KieBaseTestConfiguration; import org.drools.testcoverage.common.util.KieBaseUtil; @@ -145,42 +144,6 @@ public void testGlobalAccess() { assertEquals(entries2[0].getValue(), "Testing 2"); assertEquals(1, session2.getGlobals().getGlobalKeys().size()); assertTrue(session2.getGlobals().getGlobalKeys().contains("myGlobal")); - - // Testing 3. - final KieSession session3 = ((StatelessKnowledgeSessionImpl) session2).newWorkingMemory(); - try { - session3.insert(sample); - session3.fireAllRules(); - Map.Entry[] entries3 = ((MapGlobalResolver) session3.getGlobals()).getGlobals(); - assertEquals(1, entries3.length); - assertEquals(entries3[0].getValue(), "Testing 2"); - assertEquals(1, session3.getGlobals().getGlobalKeys().size()); - assertTrue(session3.getGlobals().getGlobalKeys().contains("myGlobal")); - - session3.setGlobal("myGlobal", "Testing 3 Over"); - entries3 = ((MapGlobalResolver) session3.getGlobals()).getGlobals(); - assertEquals(1, entries3.length); - assertEquals(entries3[0].getValue(), "Testing 3 Over"); - assertEquals(1, session3.getGlobals().getGlobalKeys().size()); - assertTrue(session3.getGlobals().getGlobalKeys().contains("myGlobal")); - } finally { - session3.dispose(); - } - - // Testing 4. - final KieSession session4 = ((StatelessKnowledgeSessionImpl) session2).newWorkingMemory(); - try { - session4.setGlobal("myGlobal", "Testing 4"); - session4.insert(sample); - session4.fireAllRules(); - final Map.Entry[] entries4 = ((MapGlobalResolver) session4.getGlobals()).getGlobals(); - assertEquals(1, entries4.length); - assertEquals(entries4[0].getValue(), "Testing 4"); - assertEquals(1, session4.getGlobals().getGlobalKeys().size()); - assertTrue(session4.getGlobals().getGlobalKeys().contains("myGlobal")); - } finally { - session4.dispose(); - } } @Test