Skip to content

Commit

Permalink
Support pooling Deflaters
Browse files Browse the repository at this point in the history
Gzip and Deflate encoding providers accept a DeflaterPool allowing
them to reuse deflaters to cut down on jni overhead. No pooling
implementations have been provided yet.

bugfix:
The default DeflaterPool implementation will call Deflater.end()
when finished, releasing the native ref.
  • Loading branch information
carterkozak authored and stuartwdouglas committed Jun 15, 2017
1 parent 4b88e19 commit 0d2d03a
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 10 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/io/undertow/UndertowMessages.java
Expand Up @@ -525,4 +525,7 @@ public interface UndertowMessages {

@Message(id = 165, value = "Invalid character %s in request-target")
String invalidCharacterInRequestTarget(char next);

@Message(id = 166, value = "Pooled object is closed")
IllegalStateException objectIsClosed();
}
Expand Up @@ -43,7 +43,11 @@
import io.undertow.UndertowLogger;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.ConduitFactory;
import io.undertow.util.NewInstanceObjectPool;
import io.undertow.util.ObjectPool;
import io.undertow.util.Headers;
import io.undertow.util.PooledObject;
import io.undertow.util.SimpleObjectPool;

/**
* Channel that handles deflate compression
Expand All @@ -52,7 +56,10 @@
*/
public class DeflatingStreamSinkConduit implements StreamSinkConduit {

protected final Deflater deflater;
protected volatile Deflater deflater;

protected final PooledObject<Deflater> pooledObject;
private final ObjectPool<Deflater> deflaterPool;
private final ConduitFactory<StreamSinkConduit> conduitFactory;
private final HttpServerExchange exchange;

Expand Down Expand Up @@ -83,13 +90,28 @@ public DeflatingStreamSinkConduit(final ConduitFactory<StreamSinkConduit> condui
}

public DeflatingStreamSinkConduit(final ConduitFactory<StreamSinkConduit> conduitFactory, final HttpServerExchange exchange, int deflateLevel) {
deflater = new Deflater(deflateLevel, true);
this(conduitFactory, exchange, newInstanceDeflaterPool(deflateLevel));
}

public DeflatingStreamSinkConduit(final ConduitFactory<StreamSinkConduit> conduitFactory, final HttpServerExchange exchange, ObjectPool<Deflater> deflaterPool) {
this.deflaterPool = deflaterPool;
this.pooledObject = deflaterPool.allocate();
this.deflater = pooledObject.getObject();
this.currentBuffer = exchange.getConnection().getByteBufferPool().allocate();
this.exchange = exchange;
this.conduitFactory = conduitFactory;
setWriteReadyHandler(new WriteReadyHandler.ChannelListenerHandler<>(Connectors.getConduitSinkChannel(exchange)));
}

public static ObjectPool<Deflater> newInstanceDeflaterPool(int deflateLevel) {
return new NewInstanceObjectPool<Deflater>(() -> new Deflater(deflateLevel, true), Deflater::end);
}

public static ObjectPool<Deflater> simpleDeflaterPool(int poolSize, int deflateLevel) {
return new SimpleObjectPool<Deflater>(poolSize, () -> new Deflater(deflateLevel, true), Deflater::end);
}


@Override
public int write(final ByteBuffer src) throws IOException {
if (anyAreSet(state, SHUTDOWN | CLOSED) || currentBuffer == null) {
Expand Down Expand Up @@ -515,6 +537,8 @@ private void freeBuffer() {
currentBuffer = null;
state = state & ~FLUSHING_BUFFER;
}
deflater.end();
if (deflater != null) {
pooledObject.close();
}
}
}
Expand Up @@ -21,6 +21,7 @@
import io.undertow.server.Connectors;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.ConduitFactory;
import io.undertow.util.ObjectPool;
import org.xnio.conduits.StreamSinkConduit;

import java.util.zip.CRC32;
Expand Down Expand Up @@ -61,7 +62,14 @@ public GzipStreamSinkConduit(
ConduitFactory<StreamSinkConduit> conduitFactory,
HttpServerExchange exchange,
int deflateLevel) {
super(conduitFactory, exchange, deflateLevel);
this(conduitFactory, exchange, newInstanceDeflaterPool(deflateLevel));
}

public GzipStreamSinkConduit(
ConduitFactory<StreamSinkConduit> conduitFactory,
HttpServerExchange exchange,
ObjectPool deflaterPool) {
super(conduitFactory, exchange, deflaterPool);
writeHeader();
Connectors.updateResponseBytesSent(exchange, HEADER.length);
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import io.undertow.server.ConduitWrapper;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.ConduitFactory;
import io.undertow.util.ObjectPool;
import org.xnio.conduits.StreamSinkConduit;

import java.util.zip.Deflater;
Expand All @@ -34,14 +35,19 @@
*/
public class DeflateEncodingProvider implements ContentEncodingProvider {

private final int deflateLevel;
private final ObjectPool<Deflater> deflaterPool;

public DeflateEncodingProvider() {
this(Deflater.DEFLATED);
}

public DeflateEncodingProvider(int deflateLevel) {
this.deflateLevel = deflateLevel;
this(DeflatingStreamSinkConduit.newInstanceDeflaterPool(deflateLevel));
}


public DeflateEncodingProvider(ObjectPool<Deflater> deflaterPool) {
this.deflaterPool = deflaterPool;
}

@Override
Expand All @@ -50,7 +56,7 @@ public ConduitWrapper<StreamSinkConduit> getResponseWrapper() {
@Override
public StreamSinkConduit wrap(final ConduitFactory<StreamSinkConduit> factory, final HttpServerExchange exchange) {
UndertowLogger.REQUEST_LOGGER.tracef("Created DEFLATE response conduit for %s", exchange);
return new DeflatingStreamSinkConduit(factory, exchange, deflateLevel);
return new DeflatingStreamSinkConduit(factory, exchange, deflaterPool);
}
};
}
Expand Down
Expand Up @@ -19,10 +19,12 @@
package io.undertow.server.handlers.encoding;

import io.undertow.UndertowLogger;
import io.undertow.conduits.DeflatingStreamSinkConduit;
import io.undertow.conduits.GzipStreamSinkConduit;
import io.undertow.server.ConduitWrapper;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.ConduitFactory;
import io.undertow.util.ObjectPool;
import org.xnio.conduits.StreamSinkConduit;

import java.util.zip.Deflater;
Expand All @@ -34,14 +36,18 @@
*/
public class GzipEncodingProvider implements ContentEncodingProvider {

private final int deflateLevel;
private final ObjectPool<Deflater> deflaterPool;

public GzipEncodingProvider() {
this(Deflater.DEFAULT_COMPRESSION);
}

public GzipEncodingProvider(int deflateLevel) {
this.deflateLevel = deflateLevel;
this(DeflatingStreamSinkConduit.newInstanceDeflaterPool(deflateLevel));
}

public GzipEncodingProvider(ObjectPool<Deflater> deflaterPool) {
this.deflaterPool = deflaterPool;
}

@Override
Expand All @@ -50,7 +56,7 @@ public ConduitWrapper<StreamSinkConduit> getResponseWrapper() {
@Override
public StreamSinkConduit wrap(final ConduitFactory<StreamSinkConduit> factory, final HttpServerExchange exchange) {
UndertowLogger.REQUEST_LOGGER.tracef("Created GZIP response conduit for %s", exchange);
return new GzipStreamSinkConduit(factory, exchange, deflateLevel);
return new GzipStreamSinkConduit(factory, exchange, deflaterPool);
}
};
}
Expand Down
63 changes: 63 additions & 0 deletions core/src/main/java/io/undertow/util/NewInstanceObjectPool.java
@@ -0,0 +1,63 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2014 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.undertow.util;

import java.util.function.Consumer;
import java.util.function.Supplier;

import io.undertow.UndertowMessages;

/**
* @author ckozak
* @author Stuart Douglas
*/
public class NewInstanceObjectPool<T> implements ObjectPool {

private final Supplier<T> supplier;
private final Consumer<T> consumer;

public NewInstanceObjectPool(Supplier<T> supplier, Consumer<T> consumer) {
this.supplier = supplier;
this.consumer = consumer;
}


@Override
public PooledObject allocate() {
final T obj = supplier.get();
return new PooledObject() {

private volatile boolean closed = false;

@Override
public T getObject() {
if(closed) {
throw UndertowMessages.MESSAGES.objectIsClosed();
}
return obj;
}

@Override
public void close() {
closed = true;
consumer.accept(obj);
}
};
}
}
31 changes: 31 additions & 0 deletions core/src/main/java/io/undertow/util/ObjectPool.java
@@ -0,0 +1,31 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2014 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.undertow.util;

/**
* A pool of objects.
*
* @author ckozak
* @author Stuart Douglas
*/
public interface ObjectPool<T> {

PooledObject<T> allocate();

}
33 changes: 33 additions & 0 deletions core/src/main/java/io/undertow/util/PooledObject.java
@@ -0,0 +1,33 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2014 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.undertow.util;

import java.io.Closeable;

/**
* Represents a generic pooled object
*
* @author Stuart Douglas
*/
public interface PooledObject<T> extends Closeable, AutoCloseable {

T getObject();

void close();
}
75 changes: 75 additions & 0 deletions core/src/main/java/io/undertow/util/SimpleObjectPool.java
@@ -0,0 +1,75 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2014 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.undertow.util;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Supplier;

import io.undertow.UndertowMessages;

/**
* Simple pool that attempts to maintain a specified number of objects in the pool. If more objects are created new ones
* are created on the fly, and then destroyed once the pool is full.
*
* @author ckozak
* @author Stuart Douglas
*/
public class SimpleObjectPool<T> implements ObjectPool {

private final Supplier<T> supplier;
private final Consumer<T> consumer;
private final LinkedBlockingDeque<T> pool;

public SimpleObjectPool(int poolSize, Supplier<T> supplier, Consumer<T> consumer) {
this.supplier = supplier;
this.consumer = consumer;
pool = new LinkedBlockingDeque<T>(poolSize);
}


@Override
public PooledObject allocate() {
T obj = pool.poll();
if(obj == null) {
obj = supplier.get();
}
final T finObj = obj;
return new PooledObject() {

private volatile boolean closed = false;

@Override
public T getObject() {
if (closed) {
throw UndertowMessages.MESSAGES.objectIsClosed();
}
return finObj;
}

@Override
public void close() {
closed = true;
if(!pool.offer(finObj)) {
consumer.accept(finObj);
}
}
};
}
}

0 comments on commit 0d2d03a

Please sign in to comment.