diff --git a/core/src/main/java/org/infinispan/marshall/jboss/AbstractJBossMarshaller.java b/core/src/main/java/org/infinispan/marshall/jboss/AbstractJBossMarshaller.java index f7b6ff94e48c..5341d4059677 100644 --- a/core/src/main/java/org/infinispan/marshall/jboss/AbstractJBossMarshaller.java +++ b/core/src/main/java/org/infinispan/marshall/jboss/AbstractJBossMarshaller.java @@ -3,6 +3,7 @@ import org.infinispan.io.ByteBuffer; import org.infinispan.io.ExposedByteArrayOutputStream; import org.infinispan.marshall.AbstractMarshaller; +import org.infinispan.marshall.StreamingMarshaller; import org.infinispan.util.ConcurrentWeakKeyHashMap; import org.infinispan.util.logging.BasicLogFactory; import org.jboss.logging.BasicLogger; @@ -32,18 +33,35 @@ * Common parent for both embedded and standalone JBoss Marshalling-based marshallers. * * @author Galder ZamarreƱo + * @author Sanne Grinovero * @since 5.0 */ -public abstract class AbstractJBossMarshaller extends AbstractMarshaller { +public abstract class AbstractJBossMarshaller extends AbstractMarshaller implements StreamingMarshaller { protected static final BasicLogger log = BasicLogFactory.getLog(AbstractJBossMarshaller.class); protected static final boolean trace = log.isTraceEnabled(); protected static final JBossMarshallerFactory factory = new JBossMarshallerFactory(); protected static final int DEF_INSTANCE_COUNT = 16; protected static final int DEF_CLASS_COUNT = 8; + private static final int PER_THREAD_REUSABLE_INSTANCES = 6; protected final MarshallingConfiguration baseCfg; + /** + * Marshaller thread local. In non-internal marshaller usages, such as Java + * Hot Rod client, this is a singleton shared by all so no urgent need for + * static here. JBMAR clears pretty much any state during finish(), so no + * urgent need to clear the thread local since it shouldn't be leaking. + * It might take a long time to warmup and pre-initialize all needed instances! + */ + private final ThreadLocal marshallerTL = new ThreadLocal() { + @Override + protected PerThreadInstanceHolder initialValue() { + MarshallingConfiguration cfg = baseCfg.clone(); + return new PerThreadInstanceHolder(cfg); + } + }; + /** * Cache of classes that are considered to be marshallable. Since checking * whether a type is marshallable requires attempting to marshalling them, @@ -89,8 +107,15 @@ final public ObjectOutput startObjectOutput(final OutputStream os, final boolean return startObjectOutput(os, isReentrant, 512); } + private final Marshaller getMarshaller(boolean isReentrant, final int estimatedSize) throws IOException { + PerThreadInstanceHolder instanceHolder = marshallerTL.get(); + return instanceHolder.getMarshaller(estimatedSize); + } - protected abstract Marshaller getMarshaller(boolean isReentrant, final int estimatedSize) throws IOException; + private final Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException { + PerThreadInstanceHolder instanceHolder = marshallerTL.get(); + return instanceHolder.getUnmarshaller(); + } final public void finishObjectOutput(final ObjectOutput oo) { try { @@ -126,8 +151,6 @@ final public ObjectInput startObjectInput(final InputStream is, final boolean is return unmarshaller; } - protected abstract Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException; - final public Object objectFromObjectStream(final ObjectInput in) throws IOException, ClassNotFoundException { return in.readObject(); } @@ -231,4 +254,72 @@ private static URL[] getClassLoaderURLs(final ClassLoader cl) { } + private static final class PerThreadInstanceHolder implements RiverCloseListener { + + final MarshallingConfiguration configuration; + final ExtendedRiverMarshaller[] reusableMarshaller = new ExtendedRiverMarshaller[PER_THREAD_REUSABLE_INSTANCES]; + int availableMarshallerIndex = 0; + final ExtendedRiverUnmarshaller[] reusableUnMarshaller = new ExtendedRiverUnmarshaller[PER_THREAD_REUSABLE_INSTANCES]; + int availableUnMarshallerIndex = 0; + + PerThreadInstanceHolder(final MarshallingConfiguration threadDedicatedConfiguration) { + this.configuration = threadDedicatedConfiguration; + } + + Unmarshaller getUnmarshaller() throws IOException { + if (availableUnMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) { + //we're above the pool threshold: make a throw-away-after usage Marshaller + configuration.setBufferSize(512);//reset to default + return factory.createUnmarshaller(configuration); + } + else { + ExtendedRiverUnmarshaller unMarshaller = reusableUnMarshaller[availableUnMarshallerIndex]; + if (unMarshaller != null) { + availableUnMarshallerIndex++; + return unMarshaller; + } + else { + configuration.setBufferSize(512);//reset to default + unMarshaller = factory.createUnmarshaller(configuration); + unMarshaller.setCloseListener(this); + reusableUnMarshaller[availableUnMarshallerIndex] = unMarshaller; + availableUnMarshallerIndex++; + return unMarshaller; + } + } + } + + ExtendedRiverMarshaller getMarshaller(int estimatedSize) throws IOException { + if (availableMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) { + //we're above the pool threshold: make a throw-away-after usage Marshaller + configuration.setBufferSize(estimatedSize); + return factory.createMarshaller(configuration); + } + else { + ExtendedRiverMarshaller marshaller = reusableMarshaller[availableMarshallerIndex]; + if (marshaller != null) { + availableMarshallerIndex++; + return marshaller; + } + else { + marshaller = factory.createMarshaller(configuration); + marshaller.setCloseListener(this); + reusableMarshaller[availableMarshallerIndex] = marshaller; + availableMarshallerIndex++; + return marshaller; + } + } + } + + @Override + public void closeMarshaller() { + availableMarshallerIndex--; + } + + @Override + public void closeUnmarshaller() { + availableUnMarshallerIndex--; + } + } + } diff --git a/core/src/main/java/org/infinispan/marshall/jboss/GenericJBossMarshaller.java b/core/src/main/java/org/infinispan/marshall/jboss/GenericJBossMarshaller.java index 4d67606d3247..8168af6f561c 100644 --- a/core/src/main/java/org/infinispan/marshall/jboss/GenericJBossMarshaller.java +++ b/core/src/main/java/org/infinispan/marshall/jboss/GenericJBossMarshaller.java @@ -22,12 +22,6 @@ */ package org.infinispan.marshall.jboss; -import org.infinispan.CacheException; -import org.jboss.marshalling.Marshaller; -import org.jboss.marshalling.Unmarshaller; - -import java.io.IOException; - /** * A marshaller that makes use of JBoss Marshalling * to serialize and deserialize objects. This marshaller is oriented at external, @@ -39,68 +33,10 @@ */ public final class GenericJBossMarshaller extends AbstractJBossMarshaller { - /** - * Marshaller thread local. In non-internal marshaller usages, such as Java - * Hot Rod client, this is a singleton shared by all so no urgent need for - * static here. JBMAR clears pretty much any state during finish(), so no - * urgent need to clear the thread local since it shouldn't be leaking. - */ - private final ThreadLocal marshallerTL = - new ThreadLocal() { - @Override - protected org.jboss.marshalling.Marshaller initialValue() { - try { - return factory.createMarshaller(baseCfg); - } catch (IOException e) { - throw new CacheException(e); - } - } - }; - - /** - * Unmarshaller thread local. In non-internal marshaller usages, such as - * Java Hot Rod client, this is a singleton shared by all so no urgent need - * for static here. JBMAR clears pretty much any state during finish(), so - * no urgent need to clear the thread local since it shouldn't be leaking. - */ - private final ThreadLocal unmarshallerTL = new - ThreadLocal() { - @Override - protected Unmarshaller initialValue() { - try { - return factory.createUnmarshaller(baseCfg); - } catch (IOException e) { - throw new CacheException(e); - } - } - }; - public GenericJBossMarshaller() { super(); baseCfg.setClassResolver( new DefaultContextClassResolver(this.getClass().getClassLoader())); } - protected Marshaller getMarshaller(boolean isReentrant, int expectedByteSize) throws IOException { - Marshaller marshaller = isReentrant ? - factory.createMarshaller(baseCfg) : marshallerTL.get(); - - if (log.isTraceEnabled()) - log.tracef("Start marshaller after retrieving marshaller from %s", - isReentrant ? "factory" : "thread local"); - - return marshaller; - } - - protected Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException { - Unmarshaller unmarshaller = isReentrant ? - factory.createUnmarshaller(baseCfg) : unmarshallerTL.get(); - - if (log.isTraceEnabled()) - log.tracef("Start unmarshaller after retrieving marshaller from %s", - isReentrant ? "factory" : "thread local"); - - return unmarshaller; - } - } diff --git a/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java b/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java index 69ab17c0ac2d..9e6e69b061cb 100644 --- a/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java +++ b/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java @@ -48,20 +48,10 @@ * @author Sanne Grinovero * @since 4.0 */ -public class JBossMarshaller extends AbstractJBossMarshaller implements StreamingMarshaller { - - private static final int PER_THREAD_REUSABLE_INSTANCES = 4; +public final class JBossMarshaller extends AbstractJBossMarshaller implements StreamingMarshaller { ExternalizerTable externalizerTable; - private final ThreadLocal marshallerTL = new ThreadLocal() { - @Override - protected PerThreadInstanceHolder initialValue() { - MarshallingConfiguration cfg = baseCfg.clone(); - return new PerThreadInstanceHolder(cfg); - } - }; - public void inject(ExternalizerTable externalizerTable, ClassLoader cl, InvocationContextContainer icc) { log.debug("Using JBoss Marshalling"); this.externalizerTable = externalizerTable; @@ -71,18 +61,6 @@ public void inject(ExternalizerTable externalizerTable, ClassLoader cl, Invocati baseCfg.setClassResolver(new EmbeddedContextClassResolver(cl, icc)); } - @Override - protected Marshaller getMarshaller(boolean isReentrant, final int estimatedSize) throws IOException { - PerThreadInstanceHolder instanceHolder = marshallerTL.get(); - return instanceHolder.getMarshaller(estimatedSize); - } - - @Override - protected Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException { - PerThreadInstanceHolder instanceHolder = marshallerTL.get(); - return instanceHolder.getUnmarshaller(); - } - @Override public void stop() { super.stop(); @@ -122,72 +100,4 @@ protected ClassLoader getClassLoader() { } } - private static final class PerThreadInstanceHolder implements RiverCloseListener { - - final MarshallingConfiguration configuration; - final ExtendedRiverMarshaller[] reusableMarshaller = new ExtendedRiverMarshaller[PER_THREAD_REUSABLE_INSTANCES]; - int availableMarshallerIndex = 0; - final ExtendedRiverUnmarshaller[] reusableUnMarshaller = new ExtendedRiverUnmarshaller[PER_THREAD_REUSABLE_INSTANCES]; - int availableUnMarshallerIndex = 0; - - PerThreadInstanceHolder(final MarshallingConfiguration threadDedicatedConfiguration) { - this.configuration = threadDedicatedConfiguration; - } - - Unmarshaller getUnmarshaller() throws IOException { - if (availableUnMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) { - //we're above the pool threshold: make a throw-away-after usage Marshaller - configuration.setBufferSize(512);//reset to default - return factory.createUnmarshaller(configuration); - } - else { - ExtendedRiverUnmarshaller unMarshaller = reusableUnMarshaller[availableUnMarshallerIndex]; - if (unMarshaller != null) { - availableUnMarshallerIndex++; - return unMarshaller; - } - else { - configuration.setBufferSize(512);//reset to default - unMarshaller = factory.createUnmarshaller(configuration); - unMarshaller.setCloseListener(this); - reusableUnMarshaller[availableUnMarshallerIndex] = unMarshaller; - availableUnMarshallerIndex++; - return unMarshaller; - } - } - } - - ExtendedRiverMarshaller getMarshaller(int estimatedSize) throws IOException { - if (availableMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) { - //we're above the pool threshold: make a throw-away-after usage Marshaller - configuration.setBufferSize(estimatedSize); - return factory.createMarshaller(configuration); - } - else { - ExtendedRiverMarshaller marshaller = reusableMarshaller[availableMarshallerIndex]; - if (marshaller != null) { - availableMarshallerIndex++; - return marshaller; - } - else { - marshaller = factory.createMarshaller(configuration); - marshaller.setCloseListener(this); - reusableMarshaller[availableMarshallerIndex] = marshaller; - availableMarshallerIndex++; - return marshaller; - } - } - } - - @Override - public void closeMarshaller() { - availableMarshallerIndex--; - } - - @Override - public void closeUnmarshaller() { - availableUnMarshallerIndex--; - } - } - }