Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: jpinner/netty
base: f1ba4f23a6
...
head fork: jpinner/netty
compare: b6264c02d9
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 3 files changed
  • 0 commit comments
  • 2 contributors
Commits on Aug 15, 2012
@normanmaurer normanmaurer Remove synchronized blocks to optimize BufferedWriteHandler. See #519 88124d8
@normanmaurer normanmaurer Set the SO_TIMEOUT on the underlying Socket so we will be able to run…
… submitted tasks in the IO-Thread even if the read operation would block because of nothing to read. See #520
d3d5a93
@normanmaurer normanmaurer Make sure that it continue to try to read from the socket even if the…
… SocketTimeoutException was triggered because of the SO_TIMEOUT. See #520
b6264c0
View
9 src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java
@@ -18,6 +18,7 @@
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -73,9 +74,15 @@ public void run() {
try {
cont = process();
} catch (Throwable t) {
- if (!channel.isSocketClosed()) {
+ boolean readTimeout = t instanceof SocketTimeoutException;
+ if (!readTimeout && !channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
+ if (readTimeout) {
+ // the readTimeout was triggered because of the SO_TIMEOUT,
+ // so just continue with the loop here
+ cont = true;
+ }
} finally {
processEventQueue();
View
10 src/main/java/org/jboss/netty/channel/socket/oio/OioSocketChannel.java
@@ -20,11 +20,13 @@
import java.io.PushbackInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelSink;
+import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig;
@@ -45,6 +47,12 @@
super(parent, factory, pipeline, sink);
this.socket = socket;
+ try {
+ socket.setSoTimeout(1000);
+ } catch (SocketException e) {
+ throw new ChannelException(
+ "Failed to configure the OioSocketChannel socket timeout.", e);
+ }
config = new DefaultSocketChannelConfig(socket);
}
View
30 src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java
@@ -22,6 +22,7 @@
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -165,6 +166,7 @@
private final Queue<MessageEvent> queue;
private final boolean consolidateOnFlush;
private volatile ChannelHandlerContext ctx;
+ private final AtomicBoolean flush = new AtomicBoolean(false);
/**
* Creates a new instance with the default unbounded {@link BlockingQueue}
@@ -244,15 +246,18 @@ public void flush(boolean consolidateOnFlush) {
// No write request was made.
return;
}
+ Channel channel = ctx.getChannel();
+ boolean acquired;
+
+ // use CAS to see if the have flush already running, if so we don't need to take further actions
+ if (acquired = flush.compareAndSet(false, true)) {
+ final Queue<MessageEvent> queue = getQueue();
+ if (consolidateOnFlush) {
+ if (queue.isEmpty()) {
+ return;
+ }
- final Queue<MessageEvent> queue = getQueue();
- if (consolidateOnFlush) {
- if (queue.isEmpty()) {
- return;
- }
-
- List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
- synchronized (this) {
+ List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
for (;;) {
MessageEvent e = queue.poll();
if (e == null) {
@@ -268,9 +273,8 @@ public void flush(boolean consolidateOnFlush) {
}
}
consolidatedWrite(pendingWrites);
- }
- } else {
- synchronized (this) {
+
+ } else {
for (;;) {
MessageEvent e = queue.poll();
if (e == null) {
@@ -280,6 +284,10 @@ public void flush(boolean consolidateOnFlush) {
}
}
}
+
+ if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
+ flush(consolidateOnFlush);
+ }
}
private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {

No commit comments for this range

Something went wrong with that request. Please try again.