Permalink
Browse files

[#1259] Add optimized queue for SCMP pattern and use it in NIO and na…

…tive transport

This queue also produces less GC then CLQ when make use of OneTimeTask
  • Loading branch information...
1 parent ccd135d commit b8e5eb77a07ceff220ea68f7b52137393e7fa50d @normanmaurer normanmaurer committed Feb 26, 2014
View
9 NOTICE.txt
@@ -119,3 +119,12 @@ by Google Inc, which can be obtained at:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/snappy/
+
+This product contains a modified version of Roland Kuhn's ASL2
+AbstractNodeQueue , which is based on Dmitriy Vyukov's non-intrusive MPSC queue.
+It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.abstractnodequeue.txt (Public Domain)
+ * HOMEPAGE:
+ * https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java
View
240 common/src/main/java/io/netty/util/internal/ConcurrentSCMPQueue.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.util.internal;
+
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern.
+ * <strong>It's important is is only used for this as otherwise it is not thread-safe.</strong>
+ *
+ * This implementation is based on:
+ * - https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/
+ * AbstractNodeQueue.java
+ *
+ * - http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+ */
+@SuppressWarnings("serial")
+final class ConcurrentSCMPQueue extends AtomicReference<OneTimeTask> implements Queue<Runnable> {
+ private static final long tailOffset;
+
+ static {
+ try {
+ tailOffset = PlatformDependent0.UNSAFE.objectFieldOffset(
+ ConcurrentSCMPQueue.class.getDeclaredField("tail"));
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ }
+
+ // Extends AtomicReference for the "head" slot (which is the one that is appended to)
+ // since Unsafe does not expose XCHG operation intrinsically
+ @SuppressWarnings("unused")
+ private volatile OneTimeTask tail;
+
+ ConcurrentSCMPQueue() {
+ final OneTimeTask task = new WrappingIoTask(null);
+ tail = task;
+ set(task);
+ }
+
+ @Override
+ public boolean add(Runnable runnable) {
+ if (runnable instanceof OneTimeTask) {
+ OneTimeTask node = (OneTimeTask) runnable;
+ node.setNext(null);
+ getAndSet(node).setNext(node);
+ } else {
+ final OneTimeTask n = new WrappingIoTask(runnable);
+ getAndSet(n).setNext(n);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean offer(Runnable runnable) {
+ return add(runnable);
+ }
+
+ @Override
+ public Runnable remove() {
+ Runnable task = poll();
+ if (task == null) {
+ throw new NoSuchElementException();
+ }
+ return task;
+ }
+
+ @Override
+ public Runnable poll() {
+ final OneTimeTask next = peakTask();
+ if (next == null) {
+ return null;
+ }
+ final OneTimeTask ret = next;
+ PlatformDependent0.UNSAFE.putOrderedObject(this, tailOffset, next);
+ return unwrapIfNeeded(ret);
+ }
+
+ @Override
+ public Runnable element() {
+ final OneTimeTask next = peakTask();
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ return unwrapIfNeeded(next);
+ }
+
+ @Override
+ public Runnable peek() {
+ final OneTimeTask next = peakTask();
+ if (next == null) {
+ return null;
+ }
+ return unwrapIfNeeded(next);
+ }
+
+ @Override
+ public int size() {
+ int count = 0;
+ OneTimeTask n = peakTask();
+ for (;;) {
+ if (n == null) {
+ break;
+ }
+ count++;
+ n = n.next();
+ }
+ return count;
+ }
+
+ @SuppressWarnings("unchecked")
+ private OneTimeTask peakTask() {
+ for (;;) {
+ final OneTimeTask tail = (OneTimeTask) PlatformDependent0.UNSAFE.getObjectVolatile(this, tailOffset);
+ final OneTimeTask next = tail.next();
+ if (next != null || get() == tail) {
+ return next;
+ }
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return peek() == null;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ OneTimeTask n = peakTask();
+ for (;;) {
+ if (n == null) {
+ break;
+ }
+ if (unwrapIfNeeded(n) == o) {
+ return true;
+ }
+ n = n.next();
+ }
+ return false;
+ }
+
+ @Override
+ public Iterator<Runnable> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for (Object o: c) {
+ if (!contains(o)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends Runnable> c) {
+ for (Runnable r: c) {
+ add(r);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ return false;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ for (;;) {
+ if (poll() == null) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Unwrap {@link OneTimeTask} if needed and so return the proper queued task.
+ */
+ private static Runnable unwrapIfNeeded(OneTimeTask task) {
+ if (task instanceof WrappingIoTask) {
+ return ((WrappingIoTask) task).task;
+ }
+ return task;
+ }
+
+ private static final class WrappingIoTask extends OneTimeTask {
+ private final Runnable task;
+
+ WrappingIoTask(Runnable task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ task.run();
+ }
+ }
+}
View
55 common/src/main/java/io/netty/util/internal/OneTimeTask.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.util.internal;
+
+import io.netty.util.concurrent.EventExecutor;
+
+/**
+ * {@link java.lang.Runnable} which represent and IO-Task and should be executed by an {@link EventExecutor}.
+ *
+ * <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
+ */
+public abstract class OneTimeTask implements Runnable {
+
+ private static final long nextOffset;
+
+ static {
+ if (PlatformDependent0.hasUnsafe()) {
+ try {
+ nextOffset = PlatformDependent0.UNSAFE.objectFieldOffset(
+ OneTimeTask.class.getDeclaredField("tail"));
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ } else {
+ nextOffset = -1;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private volatile OneTimeTask tail;
+
+ // Only use from ConcurrentSCMPQueue and so we are sure Unsafe is present
+ @SuppressWarnings("unchecked")
+ final OneTimeTask next() {
+ return (OneTimeTask) PlatformDependent0.UNSAFE.getObjectVolatile(this, nextOffset);
+ }
+
+ // Only use from ConcurrentSCMPQueue and so we are sure Unsafe is present
+ final void setNext(final OneTimeTask newNext) {
+ PlatformDependent0.UNSAFE.putOrderedObject(this, nextOffset, newNext);
+ }
+}
View
10 common/src/main/java/io/netty/util/internal/PlatformDependent.java
@@ -32,8 +32,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -369,6 +371,14 @@ public static void copyMemory(long srcAddr, byte[] dst, int dstIndex, long lengt
return null;
}
+ public static Queue<Runnable> newNonBlockingEventLoopQueue() {
+ if (hasUnsafe()) {
+ return new ConcurrentSCMPQueue();
+ } else {
+ return new ConcurrentLinkedQueue<Runnable>();
+ }
+ }
+
private static boolean isAndroid0() {
boolean android;
try {
View
2 common/src/main/java/io/netty/util/internal/PlatformDependent0.java
@@ -36,7 +36,7 @@
final class PlatformDependent0 {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PlatformDependent0.class);
- private static final Unsafe UNSAFE;
+ static final Unsafe UNSAFE;
private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
private static final long ADDRESS_FIELD_OFFSET;
View
15 license/LICENSE.abstractnodequeue.txt
@@ -0,0 +1,15 @@
+This software is licensed under the Apache 2 license, quoted below.
+
+Copyright 2009-2013 Typesafe Inc. [http://www.typesafe.com]
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at
+
+ [http://www.apache.org/licenses/LICENSE-2.0]
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
View
2 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java
@@ -156,7 +156,7 @@ void remove(AbstractEpollChannel ch) {
@Override
protected Queue<Runnable> newTaskQueue() {
// This event loop never calls takeTask()
- return new ConcurrentLinkedQueue<Runnable>();
+ return PlatformDependent.newNonBlockingEventLoopQueue();
}
/**
View
15 transport/src/main/java/io/netty/channel/AbstractChannel.java
@@ -19,6 +19,7 @@
import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
+import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
import io.netty.util.internal.logging.InternalLogger;
@@ -412,7 +413,7 @@ public final void register(EventLoop eventLoop, final ChannelPromise promise) {
register0(promise);
} else {
try {
- eventLoop.execute(new Runnable() {
+ eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
@@ -480,7 +481,7 @@ public final void bind(final SocketAddress localAddress, final ChannelPromise pr
}
if (!wasActive && isActive()) {
- invokeLater(new Runnable() {
+ invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
@@ -507,7 +508,7 @@ public final void disconnect(final ChannelPromise promise) {
}
if (wasActive && !isActive()) {
- invokeLater(new Runnable() {
+ invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
@@ -526,7 +527,7 @@ public final void close(final ChannelPromise promise) {
}
if (inFlush0) {
- invokeLater(new Runnable() {
+ invokeLater(new OneTimeTask() {
@Override
public void run() {
close(promise);
@@ -561,7 +562,7 @@ public void run() {
} finally {
if (wasActive && !isActive()) {
- invokeLater(new Runnable() {
+ invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
@@ -600,7 +601,7 @@ public final void deregister(final ChannelPromise promise) {
} finally {
if (registered) {
registered = false;
- invokeLater(new Runnable() {
+ invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
@@ -625,7 +626,7 @@ public void beginRead() {
try {
doBeginRead();
} catch (final Exception e) {
- invokeLater(new Runnable() {
+ invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
View
27 transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
@@ -22,6 +22,7 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
@@ -150,7 +151,7 @@ public ChannelHandlerContext fireChannelRegistered() {
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
@@ -175,7 +176,7 @@ public ChannelHandlerContext fireChannelUnregistered() {
if (executor.inEventLoop()) {
next.invokeChannelUnregistered();
} else {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelUnregistered();
@@ -200,7 +201,7 @@ public ChannelHandlerContext fireChannelActive() {
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelActive();
@@ -225,7 +226,7 @@ public ChannelHandlerContext fireChannelInactive() {
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelInactive();
@@ -256,7 +257,7 @@ public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
next.invokeExceptionCaught(cause);
} else {
try {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
@@ -296,7 +297,7 @@ public ChannelHandlerContext fireUserEventTriggered(final Object event) {
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeUserEventTriggered(event);
@@ -325,7 +326,7 @@ public ChannelHandlerContext fireChannelRead(final Object msg) {
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
- executor.execute(new Runnable() {
+ executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
@@ -443,7 +444,7 @@ public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
- safeExecute(executor, new Runnable() {
+ safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
@@ -481,7 +482,7 @@ public ChannelFuture connect(
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
- safeExecute(executor, new Runnable() {
+ safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
@@ -515,7 +516,7 @@ public ChannelFuture disconnect(final ChannelPromise promise) {
next.invokeDisconnect(promise);
}
} else {
- safeExecute(executor, new Runnable() {
+ safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
if (!channel().metadata().hasDisconnect()) {
@@ -547,7 +548,7 @@ public ChannelFuture close(final ChannelPromise promise) {
if (executor.inEventLoop()) {
next.invokeClose(promise);
} else {
- safeExecute(executor, new Runnable() {
+ safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeClose(promise);
@@ -575,7 +576,7 @@ public ChannelFuture deregister(final ChannelPromise promise) {
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
} else {
- safeExecute(executor, new Runnable() {
+ safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeDeregister(promise);
@@ -870,7 +871,7 @@ private static void safeExecute(EventExecutor executor, Runnable runnable, Chann
}
}
- abstract static class AbstractWriteTask implements Runnable {
+ abstract static class AbstractWriteTask extends OneTimeTask {
private final Recycler.Handle handle;
private DefaultChannelHandlerContext ctx;
View
3 transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
@@ -23,6 +23,7 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
+import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@@ -199,7 +200,7 @@ public void connect(
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
- connectTimeoutFuture = eventLoop().schedule(new Runnable() {
+ connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
View
4 transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
@@ -21,6 +21,7 @@
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
+import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@@ -38,7 +39,6 @@
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -165,7 +165,7 @@ private Selector openSelector() {
@Override
protected Queue<Runnable> newTaskQueue() {
// This event loop never calls takeTask()
- return new ConcurrentLinkedQueue<Runnable>();
+ return PlatformDependent.newNonBlockingEventLoopQueue();
}
/**
View
3 transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java
@@ -28,6 +28,7 @@
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
+import io.netty.util.internal.OneTimeTask;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -140,7 +141,7 @@ public ChannelFuture shutdownOutput(final ChannelPromise promise) {
promise.setFailure(t);
}
} else {
- loop.execute(new Runnable() {
+ loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput(promise);

0 comments on commit b8e5eb7

Please sign in to comment.