From 01b8fc535a61d89e0ddf98cea65f5ffe9e30889a Mon Sep 17 00:00:00 2001 From: Mario Fusco Date: Fri, 1 Apr 2016 16:33:04 +0200 Subject: [PATCH] [DROOLS-1111] make listeners registration on stateless session thread safe --- .../impl/StatelessKnowledgeSessionImpl.java | 181 +++++++----------- 1 file changed, 71 insertions(+), 110 deletions(-) diff --git a/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java b/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java index 5411239019f..3a6cb73b7a8 100644 --- a/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java +++ b/drools-core/src/main/java/org/drools/core/impl/StatelessKnowledgeSessionImpl.java @@ -26,11 +26,7 @@ import org.drools.core.command.runtime.rule.FireAllRulesCommand; import org.drools.core.common.InternalFactHandle; import org.drools.core.common.WorkingMemoryFactory; -import org.drools.core.event.AgendaEventSupport; -import org.drools.core.event.ProcessEventSupport; -import org.drools.core.event.RuleRuntimeEventSupport; import org.drools.core.runtime.impl.ExecutionResultImpl; -import org.drools.core.runtime.process.InternalProcessRuntime; import org.kie.api.KieBase; import org.kie.api.command.Command; import org.kie.api.event.process.ProcessEventListener; @@ -51,11 +47,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EventListener; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; public class StatelessKnowledgeSessionImpl extends AbstractRuntime implements @@ -67,15 +63,7 @@ public class StatelessKnowledgeSessionImpl extends AbstractRuntime private MapGlobalResolver sessionGlobals = new MapGlobalResolver(); private Map channels = new HashMap(); - /** The event mapping */ - public Set cachedRuleRuntimeListeners; - public Set cachedAgendaListeners; - public Set cachedProcessEventListener; - - /** The event support */ - private AgendaEventSupport agendaEventSupport = new AgendaEventSupport(); - private RuleRuntimeEventSupport ruleRuntimeEventSupport = new RuleRuntimeEventSupport(); - private ProcessEventSupport processEventSupport = new ProcessEventSupport(); + private final List listeners = new CopyOnWriteArrayList(); private KieSessionConfiguration conf; private Environment environment; @@ -120,21 +108,9 @@ public StatefulKnowledgeSession newWorkingMemory() { this.environment); StatefulKnowledgeSessionImpl ksessionImpl = (StatefulKnowledgeSessionImpl) ksession; - // we don't pass the mapped listener wrappers to the session constructor anymore, - // because they would be ignored anyway, since the wm already contains those listeners - ((Globals) ksessionImpl.getGlobalResolver()).setDelegate(this.sessionGlobals); - // copy over the default generated listeners that are used for internal stuff once - registerSystemListeners(ksessionImpl); - registerCustomListeners(); - - ksessionImpl.setAgendaEventSupport( this.agendaEventSupport ); - ksessionImpl.setRuleRuntimeEventSupport(this.ruleRuntimeEventSupport); - InternalProcessRuntime processRuntime = ksessionImpl.internalGetProcessRuntime(); - if ( processRuntime != null ) { - processRuntime.setProcessEventSupport( this.processEventSupport ); - } + registerListeners( ksessionImpl ); for( Map.Entry entry : this.channels.entrySet() ) { ksession.registerChannel( entry.getKey(), entry.getValue() ); @@ -146,109 +122,76 @@ public StatefulKnowledgeSession newWorkingMemory() { } } - private void registerSystemListeners(StatefulKnowledgeSessionImpl wm) { - for (AgendaEventListener listener : wm.getAgendaEventSupport().getEventListeners()) { - this.agendaEventSupport.addEventListener(listener); - } - for (RuleRuntimeEventListener listener : wm.getRuleRuntimeEventSupport().getEventListeners()) { - this.ruleRuntimeEventSupport.addEventListener(listener); + private void registerListeners( StatefulKnowledgeSessionImpl wm ) { + if ( listeners.isEmpty()) { + return; } - InternalProcessRuntime processRuntime = wm.internalGetProcessRuntime(); - if ( processRuntime != null ) { - for ( ProcessEventListener listener : processRuntime.getProcessEventListeners() ) { - this.processEventSupport.addEventListener( listener ); - } - } - } - - private void registerCustomListeners() { - if ( cachedAgendaListeners != null ) { - for (AgendaEventListener agendaListener : cachedAgendaListeners) { - this.agendaEventSupport.addEventListener( agendaListener ); - } - } - if ( cachedRuleRuntimeListeners != null ) { - for (RuleRuntimeEventListener wmListener : cachedRuleRuntimeListeners) { - this.ruleRuntimeEventSupport.addEventListener(wmListener); - } - } - if ( cachedProcessEventListener != null ) { - for (ProcessEventListener processListener : cachedProcessEventListener) { - this.processEventSupport.addEventListener( processListener ); + for (ListnerHolder listnerHolder : listeners ) { + switch (listnerHolder.type) { + case AGENDA: + wm.addEventListener( (AgendaEventListener)listnerHolder.listener ); + break; + case RUNTIME: + wm.addEventListener( (RuleRuntimeEventListener)listnerHolder.listener ); + break; + case PROCESS: + wm.addEventListener( (ProcessEventListener)listnerHolder.listener ); + break; } } } public void addEventListener(AgendaEventListener listener) { - registerAgendaEventListener(listener); - } - - private void registerAgendaEventListener(AgendaEventListener listener) { - if ( this.cachedAgendaListeners == null ) { - this.cachedAgendaListeners = new HashSet(); - } - this.cachedAgendaListeners.add(listener); + listeners.add( new ListnerHolder( ListnerHolder.Type.AGENDA, listener ) ); } public Collection getAgendaEventListeners() { - return cachedAgendaListeners != null ? Collections.unmodifiableCollection( cachedAgendaListeners ) : Collections.emptySet(); + return (Collection) getListeners(ListnerHolder.Type.AGENDA); } public void removeEventListener(AgendaEventListener listener) { - if ( this.cachedAgendaListeners != null ) { - cachedAgendaListeners.remove( listener ); - this.agendaEventSupport.removeEventListener( listener ); - } + listeners.remove( new ListnerHolder( ListnerHolder.Type.AGENDA, listener ) ); } public void addEventListener(RuleRuntimeEventListener listener) { - registerRuleRuntimeEventListener(listener); - } - - private void registerRuleRuntimeEventListener(RuleRuntimeEventListener listener) { - if ( this.cachedRuleRuntimeListeners == null ) { - this.cachedRuleRuntimeListeners = new HashSet(); - } - this.cachedRuleRuntimeListeners.add(listener); + listeners.add( new ListnerHolder( ListnerHolder.Type.RUNTIME, listener ) ); } public void removeEventListener(RuleRuntimeEventListener listener) { - if ( this.cachedRuleRuntimeListeners != null ) { - this.ruleRuntimeEventSupport.removeEventListener(listener); - } + listeners.remove( new ListnerHolder( ListnerHolder.Type.RUNTIME, listener ) ); } public Collection getRuleRuntimeEventListeners() { - if ( this.cachedRuleRuntimeListeners == null ) { - this.cachedRuleRuntimeListeners = new HashSet(); - } - - return Collections.unmodifiableCollection( this.cachedRuleRuntimeListeners ); + return (Collection) getListeners(ListnerHolder.Type.RUNTIME); } public void addEventListener(ProcessEventListener listener) { - if ( this.cachedProcessEventListener == null ) { - this.cachedProcessEventListener = new HashSet(); - } - this.cachedProcessEventListener.add(listener); - this.processEventSupport.addEventListener(listener); + listeners.add( new ListnerHolder( ListnerHolder.Type.PROCESS, listener ) ); } public Collection getProcessEventListeners() { - return Collections.unmodifiableCollection( this.cachedProcessEventListener ); + return (Collection) getListeners(ListnerHolder.Type.PROCESS); } - public void removeEventListener(ProcessEventListener listener) { - if (this.cachedProcessEventListener != null) { - this.cachedProcessEventListener.remove(listener); + private Collection getListeners(ListnerHolder.Type type) { + if ( listeners.isEmpty()) { + return Collections.emptySet(); + } + Collection l = new ArrayList(); + for (ListnerHolder listnerHolder : listeners ) { + if (listnerHolder.type == type) { + l.add( listnerHolder.listener ); + } } - this.processEventSupport.removeEventListener( listener ); + return l; } - public void setGlobal(String identifier, - Object value) { - this.sessionGlobals.setGlobal(identifier, - value); + public void removeEventListener(ProcessEventListener listener) { + listeners.remove( new ListnerHolder( ListnerHolder.Type.RUNTIME, listener ) ); + } + + public void setGlobal(String identifier, Object value) { + this.sessionGlobals.setGlobal(identifier, value); } public Globals getGlobals() { @@ -361,20 +304,38 @@ public Environment getEnvironment() { } protected void dispose(StatefulKnowledgeSession ksession) { - StatefulKnowledgeSessionImpl wm = ((StatefulKnowledgeSessionImpl) ksession); + ksession.dispose(); + } - for ( AgendaEventListener listener : wm.getAgendaEventSupport().getEventListeners() ) { - this.agendaEventSupport.removeEventListener( listener ); - } - for ( RuleRuntimeEventListener listener: wm.getRuleRuntimeEventSupport().getEventListeners() ) { - this.ruleRuntimeEventSupport.removeEventListener(listener); + private static class ListnerHolder { + enum Type { AGENDA, RUNTIME, PROCESS } + + final Type type; + final EventListener listener; + + private ListnerHolder( Type type, EventListener listener ) { + this.type = type; + this.listener = listener; } - InternalProcessRuntime processRuntime = wm.internalGetProcessRuntime(); - if ( processRuntime != null ) { - for ( ProcessEventListener listener: processRuntime.getProcessEventListeners() ) { - this.processEventSupport.removeEventListener( listener ); + + @Override + public boolean equals( Object obj ) { + if ( this == obj ) { + return true; } + if ( obj == null || !(obj instanceof ListnerHolder) ) { + return false; + } + + ListnerHolder that = (ListnerHolder) obj; + return type == that.type && listener == that.listener; + } + + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + listener.hashCode(); + return result; } - ksession.dispose(); } }