Permalink
Browse files

[#1259] Add optimized queue for SCMP pattern and use it in NIO and na…

…tive transport

This queue also produces less GC then CLQ when make use of OneTimeTask
  • Loading branch information...
1 parent b6aa032 commit 6efac6179e1e13e6caba2cec6109ce27862efc9a @normanmaurer normanmaurer committed Feb 26, 2014
View
@@ -119,3 +119,12 @@ by Google Inc, which can be obtained at:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/snappy/
+
+This product contains a modified version of Roland Kuhn's ASL2
+AbstractNodeQueue, which is based on Dmitriy Vyukov's non-intrusive MPSC queue.
+It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.abstractnodequeue.txt (Public Domain)
+ * HOMEPAGE:
+ * https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
+ */
+package io.netty.util.internal;
+
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern.
+ * <strong>It's important is is only used for this as otherwise it is not thread-safe.</strong>
+ *
+ * This implementation is based on:
+ * <ul>
+ * <li><a href="https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/
+ * AbstractNodeQueue.java">AbstractNodeQueue</a></li>
+ * <li><a href="http://www.1024cores.net/home/lock-free-algorithms/
+ * queues/non-intrusive-mpsc-node-based-queue">Non intrusive MPSC node based queue</a></li>
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+final class MpscLinkedQueue extends AtomicReference<OneTimeTask> implements Queue<Runnable> {
+ private static final long tailOffset;
+
+ static {
+ try {
+ tailOffset = PlatformDependent.objectFieldOffset(
+ MpscLinkedQueue.class.getDeclaredField("tail"));
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ }
+
+ // Extends AtomicReference for the "head" slot (which is the one that is appended to)
+ // since Unsafe does not expose XCHG operation intrinsically
+ @SuppressWarnings({ "unused", "FieldMayBeFinal" })
+ private volatile OneTimeTask tail;
+
+ MpscLinkedQueue() {
+ final OneTimeTask task = new OneTimeTaskAdapter(null);
+ tail = task;
+ set(task);
+ }
+
+ @Override
+ public boolean add(Runnable runnable) {
+ if (runnable instanceof OneTimeTask) {
+ OneTimeTask node = (OneTimeTask) runnable;
+ node.setNext(null);
+ getAndSet(node).setNext(node);
+ } else {
+ final OneTimeTask n = new OneTimeTaskAdapter(runnable);
+ getAndSet(n).setNext(n);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean offer(Runnable runnable) {
+ return add(runnable);
+ }
+
+ @Override
+ public Runnable remove() {
+ Runnable task = poll();
+ if (task == null) {
+ throw new NoSuchElementException();
+ }
+ return task;
+ }
+
+ @Override
+ public Runnable poll() {
+ final OneTimeTask next = peekTask();
+ if (next == null) {
+ return null;
+ }
+ final OneTimeTask ret = next;
+ PlatformDependent.putOrderedObject(this, tailOffset, next);
+ return unwrapIfNeeded(ret);
+ }
+
+ @Override
+ public Runnable element() {
+ final OneTimeTask next = peekTask();
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ return unwrapIfNeeded(next);
+ }
+
+ @Override
+ public Runnable peek() {
+ final OneTimeTask next = peekTask();
+ if (next == null) {
+ return null;
+ }
+ return unwrapIfNeeded(next);
+ }
+
+ @Override
+ public int size() {
+ int count = 0;
+ OneTimeTask n = peekTask();
+ for (;;) {
+ if (n == null) {
+ break;
+ }
+ count++;
+ n = n.next();
+ }
+ return count;
+ }
+
+ @SuppressWarnings("unchecked")
+ private OneTimeTask peekTask() {
+ for (;;) {
+ final OneTimeTask tail = (OneTimeTask) PlatformDependent.getObjectVolatile(this, tailOffset);
+ final OneTimeTask next = tail.next();
+ if (next != null || get() == tail) {
+ return next;
+ }
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return peek() == null;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ OneTimeTask n = peekTask();
+ for (;;) {
+ if (n == null) {
+ break;
+ }
+ if (unwrapIfNeeded(n) == o) {
+ return true;
+ }
+ n = n.next();
+ }
+ return false;
+ }
+
+ @Override
+ public Iterator<Runnable> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for (Object o: c) {
+ if (!contains(o)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends Runnable> c) {
+ for (Runnable r: c) {
+ add(r);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ return false;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ for (;;) {
+ if (poll() == null) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Unwrap {@link OneTimeTask} if needed and so return the proper queued task.
+ */
+ private static Runnable unwrapIfNeeded(OneTimeTask task) {
+ if (task instanceof OneTimeTaskAdapter) {
+ return ((OneTimeTaskAdapter) task).task;
+ }
+ return task;
+ }
+
+ private static final class OneTimeTaskAdapter extends OneTimeTask {
+ private final Runnable task;
+
+ OneTimeTaskAdapter(Runnable task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ task.run();
+ }
+ }
+}
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.util.internal;
+
+import io.netty.util.concurrent.EventExecutor;
+
+/**
+ * {@link Runnable} which represent a one time task which may allow the {@link EventExecutor} to reduce the amount of
+ * produced garbage when queue it for execution.
+ *
+ * <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
+ */
+public abstract class OneTimeTask implements Runnable {
+
+ private static final long nextOffset;
+
+ static {
+ if (PlatformDependent0.hasUnsafe()) {
+ try {
+ nextOffset = PlatformDependent.objectFieldOffset(
+ OneTimeTask.class.getDeclaredField("tail"));
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ } else {
+ nextOffset = -1;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private volatile OneTimeTask tail;
+
+ // Only use from MpscLinkedQueue and so we are sure Unsafe is present
+ @SuppressWarnings("unchecked")
+ final OneTimeTask next() {
+ return (OneTimeTask) PlatformDependent.getObjectVolatile(this, nextOffset);
+ }
+
+ // Only use from MpscLinkedQueue and so we are sure Unsafe is present
+ final void setNext(final OneTimeTask newNext) {
+ PlatformDependent.putOrderedObject(this, nextOffset, newNext);
+ }
+}
@@ -32,8 +32,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -266,6 +268,10 @@ public static Object getObject(Object object, long fieldOffset) {
return PlatformDependent0.getObject(object, fieldOffset);
}
+ public static Object getObjectVolatile(Object object, long fieldOffset) {
+ return PlatformDependent0.getObjectVolatile(object, fieldOffset);
+ }
+
public static int getInt(Object object, long fieldOffset) {
return PlatformDependent0.getInt(object, fieldOffset);
}
@@ -290,6 +296,10 @@ public static long getLong(long address) {
return PlatformDependent0.getLong(address);
}
+ public static void putOrderedObject(Object object, long address, Object value) {
+ PlatformDependent0.putOrderedObject(object, address, value);
+ }
+
public static void putByte(long address, byte value) {
PlatformDependent0.putByte(address, value);
}
@@ -369,6 +379,18 @@ public static void copyMemory(long srcAddr, byte[] dst, int dstIndex, long lengt
return null;
}
+ /**
+ * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
+ * consumer (one thread!).
+ */
+ public static Queue<Runnable> newMpscQueue() {
+ if (hasUnsafe()) {
+ return new MpscLinkedQueue();
+ } else {
+ return new ConcurrentLinkedQueue<Runnable>();
+ }
+ }
+
private static boolean isAndroid0() {
boolean android;
try {
@@ -185,6 +185,10 @@ static Object getObject(Object object, long fieldOffset) {
return UNSAFE.getObject(object, fieldOffset);
}
+ static Object getObjectVolatile(Object object, long fieldOffset) {
+ return UNSAFE.getObjectVolatile(object, fieldOffset);
+ }
+
static int getInt(Object object, long fieldOffset) {
return UNSAFE.getInt(object, fieldOffset);
}
@@ -251,6 +255,10 @@ static long getLong(long address) {
}
}
+ static void putOrderedObject(Object object, long address, Object value) {
+ UNSAFE.putOrderedObject(object, address, value);
+ }
+
static void putByte(long address, byte value) {
UNSAFE.putByte(address, value);
}
Oops, something went wrong.

0 comments on commit 6efac61

Please sign in to comment.