Skip to content

Commit

Permalink
ISPN-1815 Move new pool to AbstractJBossMarshaller, replacing pool lo…
Browse files Browse the repository at this point in the history
…gic in GenericJBossMarshaller as well
  • Loading branch information
Sanne authored and danberindei committed Jan 31, 2012
1 parent 7eb2e29 commit 75b61ea
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 159 deletions.
Expand Up @@ -3,6 +3,7 @@
import org.infinispan.io.ByteBuffer;
import org.infinispan.io.ExposedByteArrayOutputStream;
import org.infinispan.marshall.AbstractMarshaller;
import org.infinispan.marshall.StreamingMarshaller;
import org.infinispan.util.ConcurrentWeakKeyHashMap;
import org.infinispan.util.logging.BasicLogFactory;
import org.jboss.logging.BasicLogger;
Expand Down Expand Up @@ -32,18 +33,35 @@
* Common parent for both embedded and standalone JBoss Marshalling-based marshallers.
*
* @author Galder Zamarreño
* @author Sanne Grinovero
* @since 5.0
*/
public abstract class AbstractJBossMarshaller extends AbstractMarshaller {
public abstract class AbstractJBossMarshaller extends AbstractMarshaller implements StreamingMarshaller {

protected static final BasicLogger log = BasicLogFactory.getLog(AbstractJBossMarshaller.class);
protected static final boolean trace = log.isTraceEnabled();
protected static final JBossMarshallerFactory factory = new JBossMarshallerFactory();
protected static final int DEF_INSTANCE_COUNT = 16;
protected static final int DEF_CLASS_COUNT = 8;
private static final int PER_THREAD_REUSABLE_INSTANCES = 6;

protected final MarshallingConfiguration baseCfg;

/**
* Marshaller thread local. In non-internal marshaller usages, such as Java
* Hot Rod client, this is a singleton shared by all so no urgent need for
* static here. JBMAR clears pretty much any state during finish(), so no
* urgent need to clear the thread local since it shouldn't be leaking.
* It might take a long time to warmup and pre-initialize all needed instances!
*/
private final ThreadLocal<PerThreadInstanceHolder> marshallerTL = new ThreadLocal<PerThreadInstanceHolder>() {
@Override
protected PerThreadInstanceHolder initialValue() {
MarshallingConfiguration cfg = baseCfg.clone();
return new PerThreadInstanceHolder(cfg);
}
};

/**
* Cache of classes that are considered to be marshallable. Since checking
* whether a type is marshallable requires attempting to marshalling them,
Expand Down Expand Up @@ -89,8 +107,15 @@ final public ObjectOutput startObjectOutput(final OutputStream os, final boolean
return startObjectOutput(os, isReentrant, 512);
}

private final Marshaller getMarshaller(boolean isReentrant, final int estimatedSize) throws IOException {
PerThreadInstanceHolder instanceHolder = marshallerTL.get();
return instanceHolder.getMarshaller(estimatedSize);
}

protected abstract Marshaller getMarshaller(boolean isReentrant, final int estimatedSize) throws IOException;
private final Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException {
PerThreadInstanceHolder instanceHolder = marshallerTL.get();
return instanceHolder.getUnmarshaller();
}

final public void finishObjectOutput(final ObjectOutput oo) {
try {
Expand Down Expand Up @@ -126,8 +151,6 @@ final public ObjectInput startObjectInput(final InputStream is, final boolean is
return unmarshaller;
}

protected abstract Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException;

final public Object objectFromObjectStream(final ObjectInput in) throws IOException, ClassNotFoundException {
return in.readObject();
}
Expand Down Expand Up @@ -231,4 +254,72 @@ private static URL[] getClassLoaderURLs(final ClassLoader cl) {

}

private static final class PerThreadInstanceHolder implements RiverCloseListener {

final MarshallingConfiguration configuration;
final ExtendedRiverMarshaller[] reusableMarshaller = new ExtendedRiverMarshaller[PER_THREAD_REUSABLE_INSTANCES];
int availableMarshallerIndex = 0;
final ExtendedRiverUnmarshaller[] reusableUnMarshaller = new ExtendedRiverUnmarshaller[PER_THREAD_REUSABLE_INSTANCES];
int availableUnMarshallerIndex = 0;

PerThreadInstanceHolder(final MarshallingConfiguration threadDedicatedConfiguration) {
this.configuration = threadDedicatedConfiguration;
}

Unmarshaller getUnmarshaller() throws IOException {
if (availableUnMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) {
//we're above the pool threshold: make a throw-away-after usage Marshaller
configuration.setBufferSize(512);//reset to default
return factory.createUnmarshaller(configuration);
}
else {
ExtendedRiverUnmarshaller unMarshaller = reusableUnMarshaller[availableUnMarshallerIndex];
if (unMarshaller != null) {
availableUnMarshallerIndex++;
return unMarshaller;
}
else {
configuration.setBufferSize(512);//reset to default
unMarshaller = factory.createUnmarshaller(configuration);
unMarshaller.setCloseListener(this);
reusableUnMarshaller[availableUnMarshallerIndex] = unMarshaller;
availableUnMarshallerIndex++;
return unMarshaller;
}
}
}

ExtendedRiverMarshaller getMarshaller(int estimatedSize) throws IOException {
if (availableMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) {
//we're above the pool threshold: make a throw-away-after usage Marshaller
configuration.setBufferSize(estimatedSize);
return factory.createMarshaller(configuration);
}
else {
ExtendedRiverMarshaller marshaller = reusableMarshaller[availableMarshallerIndex];
if (marshaller != null) {
availableMarshallerIndex++;
return marshaller;
}
else {
marshaller = factory.createMarshaller(configuration);
marshaller.setCloseListener(this);
reusableMarshaller[availableMarshallerIndex] = marshaller;
availableMarshallerIndex++;
return marshaller;
}
}
}

@Override
public void closeMarshaller() {
availableMarshallerIndex--;
}

@Override
public void closeUnmarshaller() {
availableUnMarshallerIndex--;
}
}

}
Expand Up @@ -22,12 +22,6 @@
*/
package org.infinispan.marshall.jboss;

import org.infinispan.CacheException;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.Unmarshaller;

import java.io.IOException;

/**
* A marshaller that makes use of <a href="http://www.jboss.org/jbossmarshalling">JBoss Marshalling</a>
* to serialize and deserialize objects. This marshaller is oriented at external,
Expand All @@ -39,68 +33,10 @@
*/
public final class GenericJBossMarshaller extends AbstractJBossMarshaller {

/**
* Marshaller thread local. In non-internal marshaller usages, such as Java
* Hot Rod client, this is a singleton shared by all so no urgent need for
* static here. JBMAR clears pretty much any state during finish(), so no
* urgent need to clear the thread local since it shouldn't be leaking.
*/
private final ThreadLocal<org.jboss.marshalling.Marshaller> marshallerTL =
new ThreadLocal<org.jboss.marshalling.Marshaller>() {
@Override
protected org.jboss.marshalling.Marshaller initialValue() {
try {
return factory.createMarshaller(baseCfg);
} catch (IOException e) {
throw new CacheException(e);
}
}
};

/**
* Unmarshaller thread local. In non-internal marshaller usages, such as
* Java Hot Rod client, this is a singleton shared by all so no urgent need
* for static here. JBMAR clears pretty much any state during finish(), so
* no urgent need to clear the thread local since it shouldn't be leaking.
*/
private final ThreadLocal<Unmarshaller> unmarshallerTL = new
ThreadLocal<Unmarshaller>() {
@Override
protected Unmarshaller initialValue() {
try {
return factory.createUnmarshaller(baseCfg);
} catch (IOException e) {
throw new CacheException(e);
}
}
};

public GenericJBossMarshaller() {
super();
baseCfg.setClassResolver(
new DefaultContextClassResolver(this.getClass().getClassLoader()));
}

protected Marshaller getMarshaller(boolean isReentrant, int expectedByteSize) throws IOException {
Marshaller marshaller = isReentrant ?
factory.createMarshaller(baseCfg) : marshallerTL.get();

if (log.isTraceEnabled())
log.tracef("Start marshaller after retrieving marshaller from %s",
isReentrant ? "factory" : "thread local");

return marshaller;
}

protected Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException {
Unmarshaller unmarshaller = isReentrant ?
factory.createUnmarshaller(baseCfg) : unmarshallerTL.get();

if (log.isTraceEnabled())
log.tracef("Start unmarshaller after retrieving marshaller from %s",
isReentrant ? "factory" : "thread local");

return unmarshaller;
}

}
Expand Up @@ -48,20 +48,10 @@
* @author Sanne Grinovero
* @since 4.0
*/
public class JBossMarshaller extends AbstractJBossMarshaller implements StreamingMarshaller {

private static final int PER_THREAD_REUSABLE_INSTANCES = 4;
public final class JBossMarshaller extends AbstractJBossMarshaller implements StreamingMarshaller {

ExternalizerTable externalizerTable;

private final ThreadLocal<PerThreadInstanceHolder> marshallerTL = new ThreadLocal<PerThreadInstanceHolder>() {
@Override
protected PerThreadInstanceHolder initialValue() {
MarshallingConfiguration cfg = baseCfg.clone();
return new PerThreadInstanceHolder(cfg);
}
};

public void inject(ExternalizerTable externalizerTable, ClassLoader cl, InvocationContextContainer icc) {
log.debug("Using JBoss Marshalling");
this.externalizerTable = externalizerTable;
Expand All @@ -71,18 +61,6 @@ public void inject(ExternalizerTable externalizerTable, ClassLoader cl, Invocati
baseCfg.setClassResolver(new EmbeddedContextClassResolver(cl, icc));
}

@Override
protected Marshaller getMarshaller(boolean isReentrant, final int estimatedSize) throws IOException {
PerThreadInstanceHolder instanceHolder = marshallerTL.get();
return instanceHolder.getMarshaller(estimatedSize);
}

@Override
protected Unmarshaller getUnmarshaller(boolean isReentrant) throws IOException {
PerThreadInstanceHolder instanceHolder = marshallerTL.get();
return instanceHolder.getUnmarshaller();
}

@Override
public void stop() {
super.stop();
Expand Down Expand Up @@ -122,72 +100,4 @@ protected ClassLoader getClassLoader() {
}
}

private static final class PerThreadInstanceHolder implements RiverCloseListener {

final MarshallingConfiguration configuration;
final ExtendedRiverMarshaller[] reusableMarshaller = new ExtendedRiverMarshaller[PER_THREAD_REUSABLE_INSTANCES];
int availableMarshallerIndex = 0;
final ExtendedRiverUnmarshaller[] reusableUnMarshaller = new ExtendedRiverUnmarshaller[PER_THREAD_REUSABLE_INSTANCES];
int availableUnMarshallerIndex = 0;

PerThreadInstanceHolder(final MarshallingConfiguration threadDedicatedConfiguration) {
this.configuration = threadDedicatedConfiguration;
}

Unmarshaller getUnmarshaller() throws IOException {
if (availableUnMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) {
//we're above the pool threshold: make a throw-away-after usage Marshaller
configuration.setBufferSize(512);//reset to default
return factory.createUnmarshaller(configuration);
}
else {
ExtendedRiverUnmarshaller unMarshaller = reusableUnMarshaller[availableUnMarshallerIndex];
if (unMarshaller != null) {
availableUnMarshallerIndex++;
return unMarshaller;
}
else {
configuration.setBufferSize(512);//reset to default
unMarshaller = factory.createUnmarshaller(configuration);
unMarshaller.setCloseListener(this);
reusableUnMarshaller[availableUnMarshallerIndex] = unMarshaller;
availableUnMarshallerIndex++;
return unMarshaller;
}
}
}

ExtendedRiverMarshaller getMarshaller(int estimatedSize) throws IOException {
if (availableMarshallerIndex == PER_THREAD_REUSABLE_INSTANCES) {
//we're above the pool threshold: make a throw-away-after usage Marshaller
configuration.setBufferSize(estimatedSize);
return factory.createMarshaller(configuration);
}
else {
ExtendedRiverMarshaller marshaller = reusableMarshaller[availableMarshallerIndex];
if (marshaller != null) {
availableMarshallerIndex++;
return marshaller;
}
else {
marshaller = factory.createMarshaller(configuration);
marshaller.setCloseListener(this);
reusableMarshaller[availableMarshallerIndex] = marshaller;
availableMarshallerIndex++;
return marshaller;
}
}
}

@Override
public void closeMarshaller() {
availableMarshallerIndex--;
}

@Override
public void closeUnmarshaller() {
availableUnMarshallerIndex--;
}
}

}

0 comments on commit 75b61ea

Please sign in to comment.