diff --git a/build.gradle b/build.gradle index 34acf5e37..bd892435b 100644 --- a/build.gradle +++ b/build.gradle @@ -80,8 +80,8 @@ subprojects { } dependencies { - compile "io.projectreactor:reactor-core:3.1.1.RELEASE" - compile "io.netty:netty-buffer:4.1.16.Final" + compile "io.projectreactor:reactor-core:3.1.2.RELEASE" + compile "io.netty:netty-buffer:4.1.17.Final" compile "org.reactivestreams:reactive-streams:1.0.1" compile "org.slf4j:slf4j-api:1.7.25" compile "com.google.code.findbugs:jsr305:3.0.2" @@ -90,7 +90,7 @@ subprojects { testCompile "org.mockito:mockito-core:2.10.0" testCompile "org.hamcrest:hamcrest-library:1.3" testCompile "org.slf4j:slf4j-log4j12:1.7.25" - testCompile "io.projectreactor:reactor-test:3.1.1.RELEASE" + testCompile "io.projectreactor:reactor-test:3.1.2.RELEASE" } publishing { diff --git a/rsocket-core/src/main/java/io/rsocket/Frame.java b/rsocket-core/src/main/java/io/rsocket/Frame.java index dbe4f05d8..1f5809a22 100644 --- a/rsocket-core/src/main/java/io/rsocket/Frame.java +++ b/rsocket-core/src/main/java/io/rsocket/Frame.java @@ -500,6 +500,7 @@ public static int initialRequestN(final Frame frame) { result = 1; break; case FIRE_AND_FORGET: + case METADATA_PUSH: result = 0; break; default: diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 07b8879fd..1805d3eb1 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -16,15 +16,18 @@ package io.rsocket; -import static io.rsocket.util.ExceptionUtil.noStacktrace; - import io.netty.buffer.Unpooled; -import io.netty.util.collection.IntObjectHashMap; import io.rsocket.exceptions.ConnectionException; import io.rsocket.exceptions.Exceptions; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; -import java.nio.channels.ClosedChannelException; +import 
io.rsocket.util.NonBlockingHashMapLong; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.Disposable; +import reactor.core.publisher.*; + +import javax.annotation.Nullable; import java.time.Duration; import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,25 +35,17 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import javax.annotation.Nullable; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.core.Disposable; -import reactor.core.publisher.*; /** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */ class RSocketClient implements RSocket { - private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = - noStacktrace(new ClosedChannelException()); - private final DuplexConnection connection; private final Function frameDecoder; private final Consumer errorConsumer; private final StreamIdSupplier streamIdSupplier; private final MonoProcessor started; - private final IntObjectHashMap senders; - private final IntObjectHashMap> receivers; + private final NonBlockingHashMapLong senders; + private final NonBlockingHashMapLong> receivers; private final AtomicInteger missedAckCounter; private final UnboundedProcessor sendProcessor; @@ -80,8 +75,8 @@ class RSocketClient implements RSocket { this.errorConsumer = errorConsumer; this.streamIdSupplier = streamIdSupplier; this.started = MonoProcessor.create(); - this.senders = new IntObjectHashMap<>(256, 0.9f); - this.receivers = new IntObjectHashMap<>(256, 0.9f); + this.senders = new NonBlockingHashMapLong<>(256); + this.receivers = new NonBlockingHashMapLong<>(256); this.missedAckCounter = new AtomicInteger(); // DO NOT Change the order here. 
The Send processor must be subscribed to before receiving @@ -127,7 +122,7 @@ class RSocketClient implements RSocket { } private void handleSendProcessorError(Throwable t) { - Collection> values; + Collection> values; Collection values1; synchronized (RSocketClient.this) { values = receivers.values(); @@ -151,7 +146,7 @@ private void handleSendProcessorCancel(SignalType t) { if (SignalType.ON_ERROR == t) { return; } - Collection> values; + Collection> values; Collection values1; synchronized (RSocketClient.this) { values = receivers.values(); @@ -222,10 +217,15 @@ public Flux requestChannel(Publisher payloads) { @Override public Mono metadataPush(Payload payload) { - final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); - payload.release(); - sendProcessor.onNext(requestFrame); - return Mono.empty(); + Mono defer = + Mono.fromRunnable( + () -> { + final Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1); + payload.release(); + sendProcessor.onNext(requestFrame); + }); + + return started.then(defer); } @Override @@ -303,7 +303,7 @@ private Mono handleRequestResponse(final Payload payload) { Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1); payload.release(); - MonoProcessor receiver = MonoProcessor.create(); + UnicastProcessor receiver = UnicastProcessor.create(); synchronized (this) { receivers.put(streamId, receiver); @@ -312,6 +312,7 @@ private Mono handleRequestResponse(final Payload payload) { sendProcessor.onNext(requestFrame); return receiver + .singleOrEmpty() .doOnError(t -> sendProcessor.onNext(Frame.Error.from(streamId, t))) .doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId))) .doFinally( @@ -438,7 +439,7 @@ private boolean contains(int streamId) { protected void cleanup() { try { - Collection> subscribers; + Collection> subscribers; Collection publishers; synchronized (RSocketClient.this) { subscribers = receivers.values(); @@ -468,9 +469,9 @@ private 
synchronized void cleanUpLimitableRequestPublisher( } } - private synchronized void cleanUpSubscriber(Subscriber subscriber) { + private synchronized void cleanUpSubscriber(UnicastProcessor subscriber) { try { - subscriber.onError(CLOSED_CHANNEL_EXCEPTION); + subscriber.cancel(); } catch (Throwable t) { errorConsumer.accept(t); } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index e60afa0b1..6cb777e22 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -16,20 +16,12 @@ package io.rsocket; -import static io.rsocket.Frame.Request.initialRequestN; -import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C; -import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.collection.IntObjectHashMap; import io.rsocket.exceptions.ApplicationException; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; -import java.util.Collection; -import java.util.function.Consumer; -import java.util.function.Function; -import javax.annotation.Nullable; +import io.rsocket.util.NonBlockingHashMapLong; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -39,6 +31,15 @@ import reactor.core.publisher.SignalType; import reactor.core.publisher.UnicastProcessor; +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Function; + +import static io.rsocket.Frame.Request.initialRequestN; +import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C; +import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M; + /** Server side RSocket. 
Receives {@link Frame}s from a {@link RSocketClient} */ class RSocketServer implements RSocket { @@ -47,8 +48,8 @@ class RSocketServer implements RSocket { private final Function frameDecoder; private final Consumer errorConsumer; - private final IntObjectHashMap sendingSubscriptions; - private final IntObjectHashMap> channelProcessors; + private final NonBlockingHashMapLong sendingSubscriptions; + private final NonBlockingHashMapLong> channelProcessors; private final UnboundedProcessor sendProcessor; private Disposable receiveDisposable; @@ -62,8 +63,8 @@ class RSocketServer implements RSocket { this.requestHandler = requestHandler; this.frameDecoder = frameDecoder; this.errorConsumer = errorConsumer; - this.sendingSubscriptions = new IntObjectHashMap<>(); - this.channelProcessors = new IntObjectHashMap<>(); + this.sendingSubscriptions = new NonBlockingHashMapLong<>(); + this.channelProcessors = new NonBlockingHashMapLong<>(); // DO NOT Change the order here. The Send processor must be subscribed to before receiving // connections diff --git a/rsocket-core/src/main/java/io/rsocket/util/ConcurrentAutoTable.java b/rsocket-core/src/main/java/io/rsocket/util/ConcurrentAutoTable.java new file mode 100644 index 000000000..6c284d7bf --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/util/ConcurrentAutoTable.java @@ -0,0 +1,220 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.rsocket.util; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static io.rsocket.util.UnsafeAccess.UNSAFE; + + +/** + * An auto-resizing table of {@code longs}, supporting low-contention CAS + * operations. Updates are done with CAS's to no particular table element. + * The intent is to support highly scalable counters, r/w locks, and other + * structures where the updates are associative, loss-free (no-brainer), and + * otherwise happen at such a high volume that the cache contention for + * CAS'ing a single word is unacceptable. + * + * @since 1.5 + * @author Cliff Click + */ +public class ConcurrentAutoTable implements Serializable { + + // --- public interface --- + + /** + * Add the given value to current counter value. Concurrent updates will + * not be lost, but addAndGet or getAndAdd are not implemented because the + * total counter value (i.e., {@link #get}) is not atomically updated. + * Updates are striped across an array of counters to avoid cache contention + * and has been tested with performance scaling linearly up to 768 CPUs. + */ + public void add( long x ) { add_if( x); } + /** {@link #add} with -1 */ + public void decrement() { add_if(-1L); } + /** {@link #add} with +1 */ + public void increment() { add_if( 1L); } + + /** Atomically set the sum of the striped counters to specified value. + * Rather more expensive than a simple store, in order to remain atomic. + */ + public void set( long x ) { + CAT newcat = new CAT(null,4,x); + // Spin until CAS works + while( !CAS_cat(_cat,newcat) ) {/*empty*/} + } + + /** + * Current value of the counter. Since other threads are updating furiously + * the value is only approximate, but it includes all counts made by the + * current thread. Requires a pass over the internally striped counters. + */ + public long get() { return _cat.sum(); } + /** Same as {@link #get}, included for completeness. 
*/ + public int intValue() { return (int)_cat.sum(); } + /** Same as {@link #get}, included for completeness. */ + public long longValue() { return _cat.sum(); } + + /** + * A cheaper {@link #get}. Updated only once/millisecond, but as fast as a + * simple load instruction when not updating. + */ + public long estimate_get( ) { return _cat.estimate_sum(); } + + /** + * Return the counter's {@code long} value converted to a string. + */ + public String toString() { return _cat.toString(); } + + /** + * A more verbose print than {@link #toString}, showing internal structure. + * Useful for debugging. + */ + public void print() { _cat.print(); } + + /** + * Return the internal counter striping factor. Useful for diagnosing + * performance problems. + */ + public int internal_size() { return _cat._t.length; } + + // Only add 'x' to some slot in table, hinted at by 'hash'. The sum can + // overflow. Value is CAS'd so no counts are lost. The CAS is retried until + // it succeeds. Returned value is the old value. + private long add_if( long x ) { return _cat.add_if(x,hash(),this); } + + // The underlying array of concurrently updated long counters + private volatile CAT _cat = new CAT(null,16/*Start Small, Think Big!*/,0L); + private static AtomicReferenceFieldUpdater _catUpdater = + AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class,CAT.class, "_cat"); + private boolean CAS_cat( CAT oldcat, CAT newcat ) { return _catUpdater.compareAndSet(this,oldcat,newcat); } + + // Hash spreader + private static int hash() { + //int h = (int)Thread.currentThread().getId(); + int h = System.identityHashCode(Thread.currentThread()); + return h<<3; // Pad out cache lines. 
The goal is to avoid cache-line contention + } + + // --- CAT ----------------------------------------------------------------- + private static class CAT implements Serializable { + + // Unsafe crud: get a function which will CAS arrays + private static final int _Lbase = UNSAFE.arrayBaseOffset(long[].class); + private static final int _Lscale = UNSAFE.arrayIndexScale(long[].class); + private static long rawIndex(long[] ary, int i) { + assert i >= 0 && i < ary.length; + return _Lbase + i * _Lscale; + } + private static boolean CAS( long[] A, int idx, long old, long nnn ) { + return UNSAFE.compareAndSwapLong( A, rawIndex(A,idx), old, nnn ); + } + + //volatile long _resizers; // count of threads attempting a resize + //static private final AtomicLongFieldUpdater _resizerUpdater = + // AtomicLongFieldUpdater.newUpdater(CAT.class, "_resizers"); + + private final CAT _next; + private volatile long _fuzzy_sum_cache; + private volatile long _fuzzy_time; + private static final int MAX_SPIN=1; + private final long[] _t; // Power-of-2 array of longs + + CAT( CAT next, int sz, long init ) { + _next = next; + _t = new long[sz]; + _t[0] = init; + } + + // Only add 'x' to some slot in table, hinted at by 'hash'. The sum can + // overflow. Value is CAS'd so no counts are lost. The CAS is attempted + // ONCE. + public long add_if( long x, int hash, ConcurrentAutoTable master ) { + final long[] t = _t; + final int idx = hash & (t.length-1); + // Peel loop; try once fast + long old = t[idx]; + final boolean ok = CAS( t, idx, old, old+x ); + if( ok ) return old; // Got it + // Try harder + int cnt=0; + while( true ) { + old = t[idx]; + if( CAS( t, idx, old, old+x ) ) break; // Got it! 
+ cnt++; + } + if( cnt < MAX_SPIN ) return old; // Allowable spin loop count + if( t.length >= 1024*1024 ) return old; // too big already + + // Too much contention; double array size in an effort to reduce contention + //long r = _resizers; + //final int newbytes = (t.length<<1)<<3/*word to bytes*/; + //while( !_resizerUpdater.compareAndSet(this,r,r+newbytes) ) + // r = _resizers; + //r += newbytes; + if( master._cat != this ) return old; // Already doubled, don't bother + //if( (r>>17) != 0 ) { // Already too much allocation attempts? + // // We could use a wait with timeout, so we'll wakeup as soon as the new + // // table is ready, or after the timeout in any case. Annoyingly, this + // // breaks the non-blocking property - so for now we just briefly sleep. + // //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup + // try { Thread.sleep(r>>17); } catch( InterruptedException e ) { } + // if( master._cat != this ) return old; + //} + + CAT newcat = new CAT(this,t.length*2,0); + // Take 1 stab at updating the CAT with the new larger size. If this + // fails, we assume some other thread already expanded the CAT - so we + // do not need to retry until it succeeds. + while( master._cat == this && !master.CAS_cat(this,newcat) ) {/*empty*/} + return old; + } + + + // Return the current sum of all things in the table. Writers can be + // updating the table furiously, so the sum is only locally accurate. + public long sum( ) { + long sum = _next == null ? 0 : _next.sum(); // Recursively get cached sum + final long[] t = _t; + for( long cnt : t ) sum += cnt; + return sum; + } + + // Fast fuzzy version. Used a cached value until it gets old, then re-up + // the cache. + public long estimate_sum( ) { + // For short tables, just do the work + if( _t.length <= 64 ) return sum(); + // For bigger tables, periodically freshen a cached value + long millis = System.currentTimeMillis(); + if( _fuzzy_time != millis ) { // Time marches on? 
+ _fuzzy_sum_cache = sum(); // Get sum the hard way + _fuzzy_time = millis; // Indicate freshness of cached value + } + return _fuzzy_sum_cache; // Return cached sum + } + + public String toString( ) { return Long.toString(sum()); } + + public void print() { + long[] t = _t; + System.out.print("["+t[0]); + for( int i=1; iHashtable. However, even though all operations are + * thread-safe, operations do not entail locking and there is + * not any support for locking the entire table in a way that + * prevents all access. This class is fully interoperable with + * Hashtable in programs that rely on its thread safety but not on + * its synchronization details. + * + *

<p>Operations (including {@code put}) generally do not block, so they may + * overlap with other update operations (including other puts and + * removes). Retrievals reflect the results of the most recently + * completed update operations as of the moment the retrieval begins. For + * aggregate operations such as {@code putAll}, concurrent retrievals may + * reflect insertion or removal of only some entries. Similarly, Iterators + * and Enumerations return elements reflecting the state of the hash table at + * some point at or since the creation of the iterator/enumeration. They do + * not throw {@link ConcurrentModificationException}. However, + * iterators are designed to be used by only one thread at a time. + * + *

<p>Very full tables, or tables with high re-probe rates, may trigger an + * internal resize operation to move into a larger table. Resizing is not + * terribly expensive, but it is not free either; during resize operations + * table throughput may drop somewhat. All threads that visit the table + * during a resize will 'help' the resizing but will still be allowed to + * complete their operation before the resize is finished (e.g., a simple + * 'get' operation on a million-entry table undergoing resizing will not need + * to block until the entire million entries are copied). + * + *

<p>This class and its views and iterators implement all of the + * optional methods of the {@link Map} and {@link Iterator} + * interfaces. + * + *

Like {@link Hashtable} but unlike {@link HashMap}, this class + * does not allow null to be used as a key or value. + * + * + * @since 1.5 + * @author Cliff Click + * @param the type of keys maintained by this map + * @param the type of mapped values + */ + +public class NonBlockingHashMap + extends AbstractMap + implements ConcurrentMap, Cloneable, Serializable { + + private static final long serialVersionUID = 1234123412341234123L; + + private static final int REPROBE_LIMIT=10; // Too many reprobes then force a table-resize + + // --- Bits to allow Unsafe access to arrays + private static final int _Obase = UNSAFE.arrayBaseOffset(Object[].class); + private static final int _Oscale = UNSAFE.arrayIndexScale(Object[].class); + private static final int _Olog = _Oscale==4?2:(_Oscale==8?3:9999); + private static long rawIndex(final Object[] ary, final int idx) { + assert idx >= 0 && idx < ary.length; + // Note the long-math requirement, to handle arrays of more than 2^31 bytes + // - or 2^28 - or about 268M - 8-byte pointer elements. + return _Obase + ((long)idx << _Olog); + } + + // --- Setup to use Unsafe + private static final long _kvs_offset; + static { // + Field f = null; + try { f = NonBlockingHashMap.class.getDeclaredField("_kvs"); } + catch( NoSuchFieldException e ) { throw new RuntimeException(e); } + _kvs_offset = UNSAFE.objectFieldOffset(f); + } + private final boolean CAS_kvs( final Object[] oldkvs, final Object[] newkvs ) { + return UNSAFE.compareAndSwapObject(this, _kvs_offset, oldkvs, newkvs ); + } + + // --- Adding a 'prime' bit onto Values via wrapping with a junk wrapper class + private static final class Prime { + final Object _V; + Prime( Object V ) { _V = V; } + static Object unbox( Object V ) { return V instanceof Prime ? ((Prime)V)._V : V; } + } + + // --- hash ---------------------------------------------------------------- + // Helper function to spread lousy hashCodes. 
Throws NPE for null Key, on + // purpose - as the first place to conveniently toss the required NPE for a + // null Key. + private static final int hash(final Object key) { + int h = key.hashCode(); // The real hashCode call + h ^= (h>>>20) ^ (h>>>12); + h ^= (h>>> 7) ^ (h>>> 4); + h += h<<7; // smear low bits up high, for hashcodes that only differ by 1 + return h; + } + + + + // --- The Hash Table -------------------- + // Slot 0 is always used for a 'CHM' entry below to hold the interesting + // bits of the hash table. Slot 1 holds full hashes as an array of ints. + // Slots {2,3}, {4,5}, etc hold {Key,Value} pairs. The entire hash table + // can be atomically replaced by CASing the _kvs field. + // + // Why is CHM buried inside the _kvs Object array, instead of the other way + // around? The CHM info is used during resize events and updates, but not + // during standard 'get' operations. I assume 'get' is much more frequent + // than 'put'. 'get' can skip the extra indirection of skipping through the + // CHM to reach the _kvs array. + private transient Object[] _kvs; + private static final CHM chm (Object[] kvs) { return (CHM )kvs[0]; } + private static final int[] hashes(Object[] kvs) { return (int[])kvs[1]; } + // Number of K,V pairs in the table + private static final int len(Object[] kvs) { return (kvs.length-2)>>1; } + + // Time since last resize + private transient long _last_resize_milli; + + // --- Minimum table size ---------------- + // Pick size 8 K/V pairs, which turns into (8*2+2)*4+12 = 84 bytes on a + // standard 32-bit HotSpot, and (8*2+2)*8+12 = 156 bytes on 64-bit Azul. + private static final int MIN_SIZE_LOG=3; // + private static final int MIN_SIZE=(1<>4); + } + + // --- NonBlockingHashMap -------------------------------------------------- + // Constructors + + /** Create a new NonBlockingHashMap with default minimum size (currently set + * to 8 K/V pairs or roughly 84 bytes on a standard 32-bit JVM). 
*/ + public NonBlockingHashMap( ) { this(MIN_SIZE); } + + /** Create a new NonBlockingHashMap with initial room for the given number of + * elements, thus avoiding internal resizing operations to reach an + * appropriate size. Large numbers here when used with a small count of + * elements will sacrifice space for a small amount of time gained. The + * initial size will be rounded up internally to the next larger power of 2. */ + public NonBlockingHashMap(final int initial_sz ) { initialize(initial_sz); } + private final void initialize( int initial_sz ) { + if (initial_sz < 0) { + throw new IllegalArgumentException("initial_sz: " + initial_sz + " (expected: >= 0)"); + } + int i; // Convert to next largest power-of-2 + if( initial_sz > 1024*1024 ) initial_sz = 1024*1024; + for( i=MIN_SIZE_LOG; (1<size() == 0. + * @return size() == 0 */ + @Override + public boolean isEmpty ( ) { return size() == 0; } + + /** Tests if the key in the table using the equals method. + * @return true if the key is in the table using the equals method + * @throws NullPointerException if the specified key is null */ + @Override + public boolean containsKey( Object key ) { return get(key) != null; } + + /** Legacy method testing if some key maps into the specified value in this + * table. This method is identical in functionality to {@link + * #containsValue}, and exists solely to ensure full compatibility with + * class {@link Hashtable}, which supported this method prior to + * introduction of the Java Collections framework. + * @param val a value to search for + * @return true if this map maps one or more keys to the specified value + * @throws NullPointerException if the specified value is null */ + public boolean contains ( Object val ) { return containsValue(val); } + + /** Maps the specified key to the specified value in the table. Neither key + * nor value can be null. + *

The value can be retrieved by calling {@link #get} with a key that is + * equal to the original key. + * @param key key with which the specified value is to be associated + * @param val value to be associated with the specified key + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified key or value is null */ + @Override + public TypeV put ( TypeK key, TypeV val ) { return putIfMatch( key, val, NO_MATCH_OLD); } + + /** Atomically, do a {@link #put} if-and-only-if the key is not mapped. + * Useful to ensure that only a single mapping for the key exists, even if + * many threads are trying to create the mapping in parallel. + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null */ + @Override + public TypeV putIfAbsent( TypeK key, TypeV val ) { return putIfMatch( key, val, TOMBSTONE ); } + + /** Removes the key (and its corresponding value) from this map. + * This method does nothing if the key is not in the map. + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified key is null */ + @Override + public TypeV remove ( Object key ) { return putIfMatch( key,TOMBSTONE, NO_MATCH_OLD); } + + /** Atomically do a {@link #remove(Object)} if-and-only-if the key is mapped + * to a value which is equals to the given value. + * @throws NullPointerException if the specified key or value is null */ + public boolean remove ( Object key,Object val ) { return putIfMatch( key,TOMBSTONE, val ) == val; } + + /** Atomically do a put(key,val) if-and-only-if the key is + * mapped to some value already. 
+ * @throws NullPointerException if the specified key or value is null */ + @Override + public TypeV replace ( TypeK key, TypeV val ) { return putIfMatch( key, val,MATCH_ANY ); } + + /** Atomically do a put(key,newValue) if-and-only-if the key is + * mapped a value which is equals to oldValue. + * @throws NullPointerException if the specified key or value is null */ + @Override + public boolean replace ( TypeK key, TypeV oldValue, TypeV newValue ) { + return putIfMatch( key, newValue, oldValue ) == oldValue; + } + + + // Atomically replace newVal for oldVal, returning the value that existed + // there before. If the oldVal matches the returned value, then newVal was + // inserted, otherwise not. A null oldVal means the key does not exist (only + // insert if missing); a null newVal means to remove the key. + public final TypeV putIfMatchAllowNull( Object key, Object newVal, Object oldVal ) { + if( oldVal == null ) oldVal = TOMBSTONE; + if( newVal == null ) newVal = TOMBSTONE; + final TypeV res = (TypeV)putIfMatch( this, _kvs, key, newVal, oldVal ); + assert !(res instanceof Prime); + //assert res != null; + return res == TOMBSTONE ? null : res; + } + + /** Atomically replace newVal for oldVal, returning the value that existed + * there before. If the oldVal matches the returned value, then newVal was + * inserted, otherwise not. + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the key or either value is null + */ + public final TypeV putIfMatch( Object key, Object newVal, Object oldVal ) { + if (oldVal == null || newVal == null) throw new NullPointerException(); + final Object res = putIfMatch( this, _kvs, key, newVal, oldVal ); + assert !(res instanceof Prime); + assert res != null; + return res == TOMBSTONE ? null : (TypeV)res; + } + + + /** Copies all of the mappings from the specified map to this one, replacing + * any existing mappings. 
+ * @param m mappings to be stored in this map */ + @Override + public void putAll(Map m) { + for (Entry e : m.entrySet()) + put(e.getKey(), e.getValue()); + } + + /** Removes all of the mappings from this map. */ + @Override + public void clear() { // Smack a new empty table down + Object[] newkvs = new NonBlockingHashMap(MIN_SIZE)._kvs; + while( !CAS_kvs(_kvs,newkvs) ) // Spin until the clear works + ; + } + + /** Returns true if this Map maps one or more keys to the specified + * value. Note: This method requires a full internal traversal of the + * hash table and is much slower than {@link #containsKey}. + * @param val value whose presence in this map is to be tested + * @return true if this map maps one or more keys to the specified value + * @throws NullPointerException if the specified value is null */ + @Override + public boolean containsValue( final Object val ) { + if( val == null ) throw new NullPointerException(); + for( TypeV V : values() ) + if( V == val || V.equals(val) ) + return true; + return false; + } + + // This function is supposed to do something for Hashtable, and the JCK + // tests hang until it gets called... by somebody ... for some reason, + // any reason.... + protected void rehash() { + } + + /** + * Creates a shallow copy of this hashtable. All the structure of the + * hashtable itself is copied, but the keys and values are not cloned. + * This is a relatively expensive operation. + * + * @return a clone of the hashtable. + */ + @Override + public Object clone() { + try { + // Must clone, to get the class right; NBHM might have been + // extended so it would be wrong to just make a new NBHM. + NonBlockingHashMap t = (NonBlockingHashMap) super.clone(); + // But I don't have an atomic clone operation - the underlying _kvs + // structure is undergoing rapid change. If I just clone the _kvs + // field, the CHM in _kvs[0] won't be in sync. + // + // Wipe out the cloned array (it was shallow anyways). 
+ t.clear(); + // Now copy sanely + for( TypeK K : keySet() ) { + final TypeV V = get(K); // Do an official 'get' + t.put(K,V); + } + return t; + } catch (CloneNotSupportedException e) { + // this shouldn't happen, since we are Cloneable + throw new InternalError(); + } + } + + /** + * Returns a string representation of this map. The string representation + * consists of a list of key-value mappings in the order returned by the + * map's entrySet view's iterator, enclosed in braces + * ("{}"). Adjacent mappings are separated by the characters + * ", " (comma and space). Each key-value mapping is rendered as + * the key followed by an equals sign ("=") followed by the + * associated value. Keys and values are converted to strings as by + * {@link String#valueOf(Object)}. + * + * @return a string representation of this map + */ + @Override + public String toString() { + Iterator> i = entrySet().iterator(); + if( !i.hasNext()) + return "{}"; + + StringBuilder sb = new StringBuilder(); + sb.append('{'); + for (;;) { + Entry e = i.next(); + TypeK key = e.getKey(); + TypeV value = e.getValue(); + sb.append(key == this ? "(this Map)" : key); + sb.append('='); + sb.append(value == this ? "(this Map)" : value); + if( !i.hasNext()) + return sb.append('}').toString(); + sb.append(", "); + } + } + + // --- keyeq --------------------------------------------------------------- + // Check for key equality. Try direct pointer compare first, then see if + // the hashes are unequal (fast negative test) and finally do the full-on + // 'equals' v-call. + private static boolean keyeq( Object K, Object key, int[] hashes, int hash, int fullhash ) { + return + K==key || // Either keys match exactly OR + // hash exists and matches? hash can be zero during the install of a + // new key/value pair. 
+ ((hashes[hash] == 0 || hashes[hash] == fullhash) && + // Do not call the users' "equals()" call with a Tombstone, as this can + // surprise poorly written "equals()" calls that throw exceptions + // instead of simply returning false. + K != TOMBSTONE && // Do not call users' equals call with a Tombstone + // Do the match the hard way - with the users' key being the loop- + // invariant "this" pointer. I could have flipped the order of + // operands (since equals is commutative), but I'm making mega-morphic + // v-calls in a re-probing loop and nailing down the 'this' argument + // gives both the JIT and the hardware a chance to prefetch the call target. + key.equals(K)); // Finally do the hard match + } + + // --- get ----------------------------------------------------------------- + /** Returns the value to which the specified key is mapped, or {@code null} + * if this map contains no mapping for the key. + *

More formally, if this map contains a mapping from a key {@code k} to + * a value {@code v} such that {@code key.equals(k)}, then this method + * returns {@code v}; otherwise it returns {@code null}. (There can be at + * most one such mapping.) + * @throws NullPointerException if the specified key is null */ + // Never returns a Prime nor a Tombstone. + @Override + public TypeV get( Object key ) { + final Object V = get_impl(this,_kvs,key); + assert !(V instanceof Prime); // Never return a Prime + assert V != TOMBSTONE; + return (TypeV)V; + } + + private static final Object get_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key ) { + final int fullhash= hash (key); // throws NullPointerException if key is null + final int len = len (kvs); // Count of key/value pairs, reads kvs.length + final CHM chm = chm (kvs); // The CHM, for a volatile read below; reads slot 0 of kvs + final int[] hashes=hashes(kvs); // The memoized hashes; reads slot 1 of kvs + + int idx = fullhash & (len-1); // First key hash + + // Main spin/reprobe loop, looking for a Key hit + int reprobe_cnt=0; + while( true ) { + // Probe table. Each read of 'val' probably misses in cache in a big + // table; hopefully the read of 'key' then hits in cache. + final Object K = key(kvs,idx); // Get key before volatile read, could be null + final Object V = val(kvs,idx); // Get value before volatile read, could be null or Tombstone or Prime + if( K == null ) return null; // A clear miss + + // We need a volatile-read here to preserve happens-before semantics on + // newly inserted Keys. If the Key body was written just before inserting + // into the table a Key-compare here might read the uninitialized Key body. + // Annoyingly this means we have to volatile-read before EACH key compare. + // . + // We also need a volatile-read between reading a newly inserted Value + // and returning the Value (so the user might end up reading the stale + // Value contents). 
Same problem as with keys - and the one volatile + // read covers both. + final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare + + // Key-compare + if( keyeq(K,key,hashes,idx,fullhash) ) { + // Key hit! Check for no table-copy-in-progress + if( !(V instanceof Prime) ) // No copy? + return (V == TOMBSTONE) ? null : V; // Return the value + // Key hit - but slot is (possibly partially) copied to the new table. + // Finish the copy & retry in the new table. + return get_impl(topmap,chm.copy_slot_and_check(topmap,kvs,idx,key),key); // Retry in the new table + } + // get and put must have the same key lookup logic! But only 'put' + // needs to force a table-resize for a too-long key-reprobe sequence. + // Check for too-many-reprobes on get - and flip to the new table. + if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes + K == TOMBSTONE ) // found a TOMBSTONE key, means no more keys in this table + return newkvs == null ? null : get_impl(topmap,topmap.help_copy(newkvs),key); // Retry in the new table + + idx = (idx+1)&(len-1); // Reprobe by 1! (could now prefetch) + } + } + + // --- getk ----------------------------------------------------------------- + /** Returns the Key to which the specified key is mapped, or {@code null} + * if this map contains no mapping for the key. + * @throws NullPointerException if the specified key is null */ + // Never returns a Prime nor a Tombstone. 
+ public TypeK getk( TypeK key ) { + return (TypeK)getk_impl(this,_kvs,key); + } + + private static final Object getk_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key ) { + final int fullhash= hash (key); // throws NullPointerException if key is null + final int len = len (kvs); // Count of key/value pairs, reads kvs.length + final CHM chm = chm (kvs); // The CHM, for a volatile read below; reads slot 0 of kvs + final int[] hashes=hashes(kvs); // The memoized hashes; reads slot 1 of kvs + + int idx = fullhash & (len-1); // First key hash + + // Main spin/reprobe loop, looking for a Key hit + int reprobe_cnt=0; + while( true ) { + // Probe table. + final Object K = key(kvs,idx); // Get key before volatile read, could be null + if( K == null ) return null; // A clear miss + + // We need a volatile-read here to preserve happens-before semantics on + // newly inserted Keys. If the Key body was written just before inserting + // into the table a Key-compare here might read the uninitialized Key body. + // Annoyingly this means we have to volatile-read before EACH key compare. + // . + // We also need a volatile-read between reading a newly inserted Value + // and returning the Value (so the user might end up reading the stale + // Value contents). Same problem as with keys - and the one volatile + // read covers both. + final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare + + // Key-compare + if( keyeq(K,key,hashes,idx,fullhash) ) + return K; // Return existing Key! + + // get and put must have the same key lookup logic! But only 'put' + // needs to force a table-resize for a too-long key-reprobe sequence. + // Check for too-many-reprobes on get - and flip to the new table. + if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes + K == TOMBSTONE ) { // found a TOMBSTONE key, means no more keys in this table + return newkvs == null ? 
null : getk_impl(topmap,topmap.help_copy(newkvs),key); // Retry in the new table + } + + idx = (idx+1)&(len-1); // Reprobe by 1! (could now prefetch) + } + } + + // --- putIfMatch --------------------------------------------------------- + // Put, Remove, PutIfAbsent, etc. Return the old value. If the returned + // value is equal to expVal (or expVal is NO_MATCH_OLD) then the put can be + // assumed to work (although might have been immediately overwritten). Only + // the path through copy_slot passes in an expected value of null, and + // putIfMatch only returns a null if passed in an expected null. + static volatile int DUMMY_VOLATILE; + private static final Object putIfMatch(final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final Object putval, final Object expVal ) { + assert putval != null; + assert !(putval instanceof Prime); + assert !(expVal instanceof Prime); + final int fullhash = hash (key); // throws NullPointerException if key null + final int len = len (kvs); // Count of key/value pairs, reads kvs.length + final CHM chm = chm (kvs); // Reads kvs[0] + final int[] hashes = hashes(kvs); // Reads kvs[1], read before kvs[0] + int idx = fullhash & (len-1); + + // --- + // Key-Claim stanza: spin till we can claim a Key (or force a resizing). + int reprobe_cnt=0; + Object K=null, V=null; + Object[] newkvs=null; + while( true ) { // Spin till we get a Key slot + V = val(kvs,idx); // Get old value (before volatile read below!) + K = key(kvs,idx); // Get current key + if( K == null ) { // Slot is free? + // Found an empty Key slot - which means this Key has never been in + // this table. No need to put a Tombstone - the Key is not here! 
+ if( putval == TOMBSTONE ) return putval; // Not-now & never-been in this table + if( expVal == MATCH_ANY ) return null; // Will not match, even after K inserts + // Claim the null key-slot + if( CAS_key(kvs,idx, null, key ) ) { // Claim slot for Key + chm._slots.add(1); // Raise key-slots-used count + hashes[idx] = fullhash; // Memoize fullhash + break; // Got it! + } + // CAS to claim the key-slot failed. + // + // This re-read of the Key points out an annoying short-coming of Java + // CAS. Most hardware CAS's report back the existing value - so that + // if you fail you have a *witness* - the value which caused the CAS to + // fail. The Java API turns this into a boolean destroying the + // witness. Re-reading does not recover the witness because another + // thread can write over the memory after the CAS. Hence we can be in + // the unfortunate situation of having a CAS fail *for cause* but + // having that cause removed by a later store. This turns a + // non-spurious-failure CAS (such as Azul has) into one that can + // apparently spuriously fail - and we avoid apparent spurious failure + // by not allowing Keys to ever change. + + // Volatile read, to force loads of K to retry despite JIT, otherwise + // it is legal to e.g. haul the load of "K = key(kvs,idx);" outside of + // this loop (since failed CAS ops have no memory ordering semantics). + int dummy = DUMMY_VOLATILE; + continue; + } + // Key slot was not null, there exists a Key here + + // We need a volatile-read here to preserve happens-before semantics on + // newly inserted Keys. If the Key body was written just before inserting + // into the table a Key-compare here might read the uninitialized Key body. + // Annoyingly this means we have to volatile-read before EACH key compare. + newkvs = chm._newkvs; // VOLATILE READ before key compare + + if( keyeq(K,key,hashes,idx,fullhash) ) + break; // Got it! + + // get and put must have the same key lookup logic! Lest 'get' give + // up looking too soon. 
+ //topmap._reprobes.add(1); + if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or + K == TOMBSTONE ) { // found a TOMBSTONE key, means no more keys + // We simply must have a new table to do a 'put'. At this point a + // 'get' will also go to the new table (if any). We do not need + // to claim a key slot (indeed, we cannot find a free one to claim!). + newkvs = chm.resize(topmap,kvs); + if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy + return putIfMatch(topmap,newkvs,key,putval,expVal); + } + + idx = (idx+1)&(len-1); // Reprobe! + } // End of spinning till we get a Key slot + + // --- + // Found the proper Key slot, now update the matching Value slot. We + // never put a null, so Value slots monotonically move from null to + // not-null (deleted Values use Tombstone). Thus if 'V' is null we + // fail this fast cutout and fall into the check for table-full. + if( putval == V ) return V; // Fast cutout for no-change + + // See if we want to move to a new table (to avoid high average re-probe + // counts). We only check on the initial set of a Value from null to + // not-null (i.e., once per key-insert). Of course we got a 'free' check + // of newkvs once per key-compare (not really free, but paid-for by the + // time we get here). + if( newkvs == null && // New table-copy already spotted? + // Once per fresh key-insert check the hard way + ((V == null && chm.tableFull(reprobe_cnt,len)) || + // Or we found a Prime, but the JMM allowed reordering such that we + // did not spot the new table (very rare race here: the writing + // thread did a CAS of _newkvs then a store of a Prime. This thread + // reads the Prime, then reads _newkvs - but the read of Prime was so + // delayed (or the read of _newkvs was so accelerated) that they + // swapped and we still read a null _newkvs. The resize call below + // will do a CAS on _newkvs forcing the read. 
+ V instanceof Prime) ) + newkvs = chm.resize(topmap,kvs); // Force the new table copy to start + // See if we are moving to a new table. + // If so, copy our slot and retry in the new table. + if( newkvs != null ) + return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal); + + // --- + // We are finally prepared to update the existing table + assert !(V instanceof Prime); + + // Must match old, and we do not? Then bail out now. Note that either V + // or expVal might be TOMBSTONE. Also V can be null, if we've never + // inserted a value before. expVal can be null if we are called from + // copy_slot. + + if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all? + V != expVal && // No instant match already? + (expVal != MATCH_ANY || V == TOMBSTONE || V == null) && + !(V==null && expVal == TOMBSTONE) && // Match on null/TOMBSTONE combo + (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last + return V; // Do not update! + + // Actually change the Value in the Key,Value pair + if( CAS_val(kvs, idx, V, putval ) ) { + // CAS succeeded - we did the update! + // Both normal put's and table-copy calls putIfMatch, but table-copy + // does not (effectively) increase the number of live k/v pairs. + if( expVal != null ) { + // Adjust sizes - a striped counter + if( (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1); + if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1); + } + } else { // Else CAS failed + V = val(kvs,idx); // Get new value + // If a Prime'd value got installed, we need to re-run the put on the + // new table. Otherwise we lost the CAS to another racing put. + // Simply retry from the start. + if( V instanceof Prime ) + return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal); + } + // Win or lose the CAS, we are done. If we won then we know the update + // happened as expected. 
If we lost, it means "we won but another thread + // immediately stomped our update with no chance of a reader reading". + return (V==null && expVal!=null) ? TOMBSTONE : V; + } + + // --- help_copy --------------------------------------------------------- + // Help along an existing resize operation. This is just a fast cut-out + // wrapper, to encourage inlining for the fast no-copy-in-progress case. We + // always help the top-most table copy, even if there are nested table + // copies in progress. + private final Object[] help_copy( Object[] helper ) { + // Read the top-level KVS only once. We'll try to help this copy along, + // even if it gets promoted out from under us (i.e., the copy completes + // and another KVS becomes the top-level copy). + Object[] topkvs = _kvs; + CHM topchm = chm(topkvs); + if( topchm._newkvs == null ) return helper; // No copy in-progress + topchm.help_copy_impl(this,topkvs,false); + return helper; + } + + + // --- CHM ----------------------------------------------------------------- + // The control structure for the NonBlockingHashMap + private static final class CHM { + // Size in active K,V pairs + private final ConcurrentAutoTable _size; + public int size () { return (int)_size.get(); } + + // --- + // These next 2 fields are used in the resizing heuristics, to judge when + // it is time to resize or copy the table. Slots is a count of used-up + // key slots, and when it nears a large fraction of the table we probably + // end up reprobing too much. Last-resize-milli is the time since the + // last resize; if we are running back-to-back resizes without growing + // (because there are only a few live keys but many slots full of dead + // keys) then we need a larger table to cut down on the churn. + + // Count of used slots, to tell when table is full of dead unusable slots + private final ConcurrentAutoTable _slots; + public int slots() { return (int)_slots.get(); } + + // --- + // New mappings, used during resizing. 
+ // The 'new KVs' array - created during a resize operation. This + // represents the new table being copied from the old one. It's the + // volatile variable that is read as we cross from one table to the next, + // to get the required memory orderings. It monotonically transits from + // null to set (once). + volatile Object[] _newkvs; + private static final AtomicReferenceFieldUpdater _newkvsUpdater = + AtomicReferenceFieldUpdater.newUpdater(CHM.class,Object[].class, "_newkvs"); + // Set the _newkvs field if we can; returns true only if this call installed newkvs. + boolean CAS_newkvs( Object[] newkvs ) { + while( _newkvs == null ) + if( _newkvsUpdater.compareAndSet(this,null,newkvs) ) + return true; + return false; + } + + // Sometimes many threads race to create a new very large table. Only 1 + // wins the race, but the losers all allocate a junk large table with + // hefty allocation costs. Attempt to control the overkill here by + // throttling attempts to create a new table. I cannot really block here + // (lest I lose the non-blocking property) but late-arriving threads can + // give the initial resizing thread a little time to allocate the initial + // new table. The Right Long Term Fix here is to use array-lets and + // incrementally create the new very large array. In C I'd make the array + // with malloc (which would mmap under the hood) which would only eat + // virtual-address and not real memory - and after Somebody wins then we + // could in parallel initialize the array. Java does not allow + // un-initialized array creation (especially of ref arrays!).
+ volatile long _resizers; // count of threads attempting an initial resize + private static final AtomicLongFieldUpdater _resizerUpdater = + AtomicLongFieldUpdater.newUpdater(CHM.class, "_resizers"); + + // --- + // Simple constructor + CHM( ConcurrentAutoTable size ) { + _size = size; + _slots= new ConcurrentAutoTable(); + } + + // --- tableFull --------------------------------------------------------- + // Heuristic to decide if this table is too full, and we should start a + // new table. Note that if a 'get' call has reprobed too many times and + // decided the table must be full, then always the estimate_sum must be + // high and we must report the table is full. If we do not, then we might + // end up deciding that the table is not full and inserting into the + // current table, while a 'get' has decided the same key cannot be in this + // table because of too many reprobes. The invariant is: + // slots.estimate_sum >= max_reprobe_cnt >= reprobe_limit(len) + private final boolean tableFull( int reprobe_cnt, int len ) { + return + // Do the cheap check first: we allow some number of reprobes always + reprobe_cnt >= REPROBE_LIMIT && + (reprobe_cnt >= reprobe_limit(len) || + // More expensive check: see if the table is > 1/2 full. + _slots.estimate_get() >= (len>>1)); + } + + // --- resize ------------------------------------------------------------ + // Resizing after too many probes. "How Big???" heuristics are here. + // Callers (not this routine) will 'help_copy' any in-progress copy. + // Since this routine has a fast cutout for copy-already-started, callers + // MUST 'help_copy' lest we have a path which forever runs through + // 'resize' only to discover a copy-in-progress which never progresses.
+ private final Object[] resize(NonBlockingHashMap topmap, Object[] kvs) { + assert chm(kvs) == this; + + // Check for resize already in progress, probably triggered by another thread + Object[] newkvs = _newkvs; // VOLATILE READ + if( newkvs != null ) // See if resize is already in progress + return newkvs; // Use the new table already + + // No copy in-progress, so start one. First up: compute new table size. + int oldlen = len(kvs); // Old count of K,V pairs allowed + int sz = size(); // Get current table count of active K,V pairs + int newsz = sz; // First size estimate + + // Heuristic to determine new size. We expect plenty of dead-slots-with-keys + // and we need some decent padding to avoid endless reprobing. + if( sz >= (oldlen>>2) ) { // If we are >25% full of keys then... + newsz = oldlen<<1; // Double size, so new table will be between 12.5% and 25% full + // For tables less than 1M entries, if >50% full of keys then... + // For tables more than 1M entries, if >75% full of keys then... + if( 4L*sz >= ((oldlen>>20)!=0?3L:2L)*oldlen ) + newsz = oldlen<<2; // Double double size, so new table will be between 12.5% (18.75%) and 25% (25%) + } + // This heuristic in the next 2 lines leads to a much denser table + // with a higher reprobe rate + //if( sz >= (oldlen>>1) ) // If we are >50% full of keys then... + // newsz = oldlen<<1; // Double size + + // Last (re)size operation was very recent? Then double again + // despite having few live keys; slows down resize operations + // for tables subject to a high key churn rate - but do not + // forever grow the table. If there is a high key churn rate + // the table needs a steady state of rare same-size resize + // operations to clean out the dead keys. + long tm = System.currentTimeMillis(); + if( newsz <= oldlen && // New table would shrink or hold steady? + tm <= topmap._last_resize_milli+10000) // Recent resize (less than 10 sec ago) + newsz = oldlen<<1; // Double the existing size + + // Do not shrink, ever.
If we hit this size once, assume we + // will again. + if( newsz < oldlen ) newsz = oldlen; + + // Convert to power-of-2 + int log2; + for( log2=MIN_SIZE_LOG; (1< ((len >> 2) + (len >> 1))) throw new RuntimeException("Table is full."); + } + + // Now limit the number of threads actually allocating memory to a + // handful - lest we have 750 threads all trying to allocate a giant + // resized array. + long r = _resizers; + while( !_resizerUpdater.compareAndSet(this,r,r+1) ) + r = _resizers; + // Size calculation: 2 words (K+V) per table entry, plus a handful. We + // guess at 64-bit pointers; 32-bit pointers screw up the size calc by + // 2x but does not screw up the heuristic very much. + long megs = ((((1L<>20/*megs*/; + if( r >= 2 && megs > 0 ) { // Already 2 guys trying; wait and see + newkvs = _newkvs; // Between dorking around, another thread did it + if( newkvs != null ) // See if resize is already in progress + return newkvs; // Use the new table already + // TODO - use a wait with timeout, so we'll wakeup as soon as the new table + // is ready, or after the timeout in any case. + //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup + // For now, sleep a tad and see if the 2 guys already trying to make + // the table actually get around to making it happen. + try { Thread.sleep(megs); } catch( Exception e ) { } // NOTE(review): any exception from sleep, including InterruptedException, is silently discarded here + } + // Last check, since the 'new' below is expensive and there is a chance + // that another thread slipped in a new table while we ran the heuristic. + newkvs = _newkvs; + if( newkvs != null ) // See if resize is already in progress + return newkvs; // Use the new table already + + // Double size for K,V pairs, add 1 for CHM + newkvs = new Object[(int)len]; // This can get expensive for big arrays + newkvs[0] = new CHM(_size); // CHM in slot 0 + newkvs[1] = new int[1< _copyIdxUpdater = + AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyIdx"); + + // Work-done reporting.
Used to efficiently signal when we can move to + // the new table. From 0 to len(oldkvs) refers to copying from the old + // table to the new. + volatile long _copyDone= 0; + static private final AtomicLongFieldUpdater _copyDoneUpdater = + AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyDone"); + + // --- help_copy_impl ---------------------------------------------------- + // Help along an existing resize operation. We hope it's the top-level + // copy (it was when we started) but this CHM might have been promoted out + // of the top position. + private final void help_copy_impl(NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all ) { + assert chm(oldkvs) == this; + Object[] newkvs = _newkvs; + assert newkvs != null; // Already checked by caller + int oldlen = len(oldkvs); // Total amount to copy + final int MIN_COPY_WORK = Math.min(oldlen,1024); // Limit per-thread work + + // --- + int panic_start = -1; + int copyidx=-9999; // Fool javac to think it's initialized + while( _copyDone < oldlen ) { // Still needing to copy? + // Carve out a chunk of work. The counter wraps around so every + // thread eventually tries to copy every slot repeatedly. + + // We "panic" if we have tried TWICE to copy every slot - and it still + // has not happened. i.e., twice some thread somewhere claimed they + // would copy 'slot X' (by bumping _copyIdx) but they never claimed to + // have finished (by bumping _copyDone). Our choices become limited: + // we can wait for the work-claimers to finish (and become a blocking + // algorithm) or do the copy work ourselves. Tiny tables with huge + // thread counts trying to copy the table often 'panic'. + if( panic_start == -1 ) { // No panic? + copyidx = (int)_copyIdx; + while( !_copyIdxUpdater.compareAndSet(this,copyidx,copyidx+MIN_COPY_WORK) ) + copyidx = (int)_copyIdx; // Re-read + if( !(copyidx < (oldlen<<1)) ) // Panic! + panic_start = copyidx; // Record where we started to panic-copy + } + + // We now know what to copy.
Try to copy. + int workdone = 0; + for( int i=0; i 0 ) // Report work-done occasionally + copy_check_and_promote( topmap, oldkvs, workdone );// See if we can promote + //for( int i=0; i 0 ) { + while( !_copyDoneUpdater.compareAndSet(this,copyDone,copyDone+workdone) ) { + copyDone = _copyDone; // Reload, retry + assert (copyDone+workdone) <= oldlen; + } + } + + // Check for copy being ALL done, and promote. Note that we might have + // nested in-progress copies and manage to finish a nested copy before + // finishing the top-level copy. We only promote top-level copies. + if( copyDone+workdone == oldlen && // Ready to promote this table? + topmap._kvs == oldkvs && // Looking at the top-level table? + // Attempt to promote + topmap.CAS_kvs(oldkvs,_newkvs) ) { + topmap._last_resize_milli = System.currentTimeMillis(); // Record resize time for next check + } + } + + // --- copy_slot --------------------------------------------------------- + // Copy one K/V pair from oldkvs[i] to newkvs. Returns true if we can + // confirm that we set an old-table slot to TOMBPRIME, and only returns after + // updating the new table. We need an accurate confirmed-copy count so + // that we know when we can promote (if we promote the new table too soon, + // other threads may 'miss' on values not-yet-copied from the old table). + // We don't allow any direct updates on the new table, unless they first + // happened to the old table - so that any transition in the new table from + // null to not-null must have been from a copy_slot (or other old-table + // overwrite) and not from a thread directly writing in the new table. + private boolean copy_slot(NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs ) { + // Blindly set the key slot from null to TOMBSTONE, to eagerly stop + // fresh put's from inserting new values in the old table when the old + // table is mid-resize. 
We don't need to act on the results here, + // because our correctness stems from box'ing the Value field. Slamming + // the Key field is a minor speed optimization. + Object key; + while( (key=key(oldkvs,idx)) == null ) + CAS_key(oldkvs,idx, null, TOMBSTONE); + + // --- + // Prevent new values from appearing in the old table. + // Box what we see in the old table, to prevent further updates. + Object oldval = val(oldkvs,idx); // Read OLD table + while( !(oldval instanceof Prime) ) { + final Prime box = (oldval == null || oldval == TOMBSTONE) ? TOMBPRIME : new Prime(oldval); + if( CAS_val(oldkvs,idx,oldval,box) ) { // CAS down a box'd version of oldval + // If we made the Value slot hold a TOMBPRIME, then we both + // prevented further updates here but also the (absent) + // oldval is vacuously available in the new table. We + // return with true here: any thread looking for a value for + // this key can correctly go straight to the new table and + // skip looking in the old table. + if( box == TOMBPRIME ) + return true; + // Otherwise we boxed something, but it still needs to be + // copied into the new table. + oldval = box; // Record updated oldval + break; // Break loop; oldval is now boxed by us + } + oldval = val(oldkvs,idx); // Else try, try again + } + if( oldval == TOMBPRIME ) return false; // Copy already complete here! + + // --- + // Copy the value into the new table, but only if we overwrite a null. + // If another value is already in the new table, then somebody else + // wrote something there and that write is happens-after any value that + // appears in the old table. + Object old_unboxed = ((Prime)oldval)._V; + assert old_unboxed != TOMBSTONE; + putIfMatch(topmap, newkvs, key, old_unboxed, null); + + // --- + // Finally, now that any old value is exposed in the new table, we can + // forever hide the old-table value by slapping a TOMBPRIME down. 
This + // will stop other threads from uselessly attempting to copy this slot + // (i.e., it's a speed optimization not a correctness issue). + while( oldval != TOMBPRIME && !CAS_val(oldkvs,idx,oldval,TOMBPRIME) ) + oldval = val(oldkvs,idx); + + return oldval != TOMBPRIME; // True if we slammed the TOMBPRIME down + } // end copy_slot + } // End of CHM + + + // --- Snapshot ------------------------------------------------------------ + // The main class for iterating over the NBHM. It "snapshots" a clean + // view of the K/V array. + private class SnapshotV implements Iterator, Enumeration { + final Object[] _sskvs; + public SnapshotV() { + while( true ) { // Verify no table-copy-in-progress + Object[] topkvs = _kvs; + CHM topchm = chm(topkvs); + if( topchm._newkvs == null ) { // No table-copy-in-progress + // The "linearization point" for the iteration. Every key in this + // table will be visited, but keys added later might be skipped or + // even be added to a following table (also not iterated over). + _sskvs = topkvs; + break; + } + // Table copy in-progress - so we cannot get a clean iteration. We + // must help finish the table copy before we can start iterating. + topchm.help_copy_impl(NonBlockingHashMap.this,topkvs,true); + } + // Warm-up the iterator + next(); + } + int length() { return len(_sskvs); } + Object key(int idx) { return NonBlockingHashMap.key(_sskvs,idx); } + private int _idx; // Varies from 0-keys.length + private Object _nextK, _prevK; // Last 2 keys found + private TypeV _nextV, _prevV; // Last 2 values found + public boolean hasNext() { return _nextV != null; } + public TypeV next() { + // 'next' actually knows what the next value will be - it had to + // figure that out last go-around lest 'hasNext' report true and + // some other thread deleted the last value. Instead, 'next' + // spends all its effort finding the key that comes after the + // 'next' key. 
+ if( _idx != 0 && _nextV == null ) throw new NoSuchElementException(); + _prevK = _nextK; // This will become the previous key + _prevV = _nextV; // This will become the previous value + _nextV = null; // We have no more next-key + // Attempt to set <_nextK,_nextV> to the next K,V pair. + // _nextV is the trigger: stop searching when it is != null + while( _idx elements() { return new SnapshotV(); } + + // --- values -------------------------------------------------------------- + /** Returns a {@link Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are reflected + * in the collection, and vice-versa. The collection supports element + * removal, which removes the corresponding mapping from this map, via the + * Iterator.remove, Collection.remove, + * removeAll, retainAll, and clear operations. + * It does not support the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. */ + @Override + public Collection values() { + return new AbstractCollection() { + @Override public void clear ( ) { NonBlockingHashMap.this.clear ( ); } + @Override public int size ( ) { return NonBlockingHashMap.this.size ( ); } + @Override public boolean contains( Object v ) { return NonBlockingHashMap.this.containsValue(v); } + @Override public Iterator iterator() { return new SnapshotV(); } + }; + } + + // --- keySet -------------------------------------------------------------- + private class SnapshotK implements Iterator, Enumeration { + final SnapshotV _ss; + public SnapshotK() { _ss = new SnapshotV(); } + public void remove() { _ss.remove(); } + public TypeK next() { _ss.next(); return (TypeK)_ss._prevK; } + public boolean hasNext() { return _ss.hasNext(); } + public TypeK nextElement() { return next(); } + public boolean hasMoreElements() { return hasNext(); } + } + + /** Returns an enumeration of the keys in this table. + * @return an enumeration of the keys in this table + * @see #keySet() */ + public Enumeration keys() { return new SnapshotK(); } + + /** Returns a {@link Set} view of the keys contained in this map. The set + * is backed by the map, so changes to the map are reflected in the set, + * and vice-versa. The set supports element removal, which removes the + * corresponding mapping from this map, via the Iterator.remove, + * Set.remove, removeAll, retainAll, and + * clear operations. It does not support the add or + * addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. */ + @Override + public Set keySet() { + return new AbstractSet () { + @Override public void clear ( ) { NonBlockingHashMap.this.clear ( ); } + @Override public int size ( ) { return NonBlockingHashMap.this.size ( ); } + @Override public boolean contains( Object k ) { return NonBlockingHashMap.this.containsKey(k); } + @Override public boolean remove ( Object k ) { return NonBlockingHashMap.this.remove (k) != null; } + @Override public Iterator iterator() { return new SnapshotK(); } + // This is an efficient implementation of toArray instead of the standard + // one. In particular it uses a smart iteration over the NBHM. + @Override public T[] toArray(T[] a) { + Object[] kvs = raw_array(); + // Estimate size of array; be prepared to see more or fewer elements + int sz = size(); + T[] r = a.length >= sz ? a : + (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), sz); + // Fast efficient element walk. + int j=0; + for( int i=0; i= r.length ) { + int sz2 = (int)Math.min(Integer.MAX_VALUE-8,((long)j)<<1); + if( sz2<=r.length ) throw new OutOfMemoryError("Required array size too large"); + r = Arrays.copyOf(r,sz2); + } + r[j++] = (T)K; + } + } + if( j <= a.length ) { // Fit in the original array? + if( a!=r ) System.arraycopy(r,0,a,0,j); + if( j> { + final SnapshotV _ss; + public SnapshotE() { _ss = new SnapshotV(); } + public void remove() { _ss.remove(); } + public Entry next() { _ss.next(); return new SimpleImmutableEntry<>((TypeK)_ss._prevK,_ss._prevV); } + public boolean hasNext() { return _ss.hasNext(); } + } + + /** Returns a {@link Set} view of the mappings contained in this map. 
The + * set is backed by the map, so changes to the map are reflected in the + * set, and vice-versa. The set supports element removal, which removes + * the corresponding mapping from the map, via the + * Iterator.remove, Set.remove, removeAll, + * retainAll, and clear operations. It does not support + * the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + * + *

Warning: the iterator associated with this Set + * requires the creation of {@link Entry} objects with each + * iteration. The {@link NonBlockingHashMap} does not normally create or + * use {@link Entry} objects so they will be created solely + * to support this iteration. Iterating using {@link #keySet} or {@link + * #values} will be more efficient. + */ + @Override + public Set> entrySet() { + return new AbstractSet>() { + @Override public void clear ( ) { NonBlockingHashMap.this.clear( ); } + @Override public int size ( ) { return NonBlockingHashMap.this.size ( ); } + @Override public boolean remove( final Object o ) { + if( !(o instanceof Map.Entry)) return false; + final Entry e = (Entry)o; + return NonBlockingHashMap.this.remove(e.getKey(), e.getValue()); + } + @Override public boolean contains(final Object o) { + if( !(o instanceof Map.Entry)) return false; + final Entry e = (Entry)o; + TypeV v = get(e.getKey()); + return v != null && v.equals(e.getValue()); + } + @Override public Iterator> iterator() { return new SnapshotE(); } + }; + } + + // --- writeObject ------------------------------------------------------- + // Write a NBHM to a stream + private void writeObject(java.io.ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); // Nothing to write + for( Object K : keySet() ) { + final Object V = get(K); // Do an official 'get' + s.writeObject(K); // Write the pair + s.writeObject(V); + } + s.writeObject(null); // Sentinel to indicate end-of-data + s.writeObject(null); + } + + // --- readObject -------------------------------------------------------- + // Read a NBHM from a stream + private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { + s.defaultReadObject(); // Read nothing + initialize(MIN_SIZE); + for(;;) { + final TypeK K = (TypeK) s.readObject(); + final TypeV V = (TypeV) s.readObject(); + if( K == null ) break; + put(K,V); // Insert with an official put + } + } + +} // End
NonBlockingHashMap class diff --git a/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMapLong.java b/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMapLong.java new file mode 100644 index 000000000..cb437f533 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/util/NonBlockingHashMapLong.java @@ -0,0 +1,1208 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.util; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static io.rsocket.util.UnsafeAccess.UNSAFE; + +/** + * A lock-free alternate implementation of {@link java.util.concurrent.ConcurrentHashMap} + * with primitive long keys, better scaling properties and + * generally lower costs. The use of {@code long} keys allows for faster + * compares and lower memory costs. The Map provides identical correctness + * properties as ConcurrentHashMap. All operations are non-blocking and + * multi-thread safe, including all update operations. {@link + * NonBlockingHashMapLong} scales substantially better than {@link + * java.util.concurrent.ConcurrentHashMap} for high update rates, even with a large + * concurrency factor.
Scaling is linear up to 768 CPUs on a 768-CPU Azul + * box, even with 100% updates or 100% reads or any fraction in-between. + * Linear scaling up to all cpus has been observed on a 32-way Sun US2 box, + * 32-way Sun Niagara box, 8-way Intel box and a 4-way Power box. + * + *

The main benefit of this class over using plain {@link + * io.rsocket.util.NonBlockingHashMap} with {@link Long} keys is + * that it avoids the auto-boxing and unboxing costs. Since auto-boxing is + * automatic, it is easy to accidentally cause auto-boxing and negate + * the space and speed benefits. + * + *

This class obeys the same functional specification as {@link + * Hashtable}, and includes versions of methods corresponding to + * each method of Hashtable. However, even though all operations are + * thread-safe, operations do not entail locking and there is + * not any support for locking the entire table in a way that + * prevents all access. This class is fully interoperable with + * Hashtable in programs that rely on its thread safety but not on + * its synchronization details. + * + *

Operations (including put) generally do not block, so may + * overlap with other update operations (including other puts and + * removes). Retrievals reflect the results of the most recently + * completed update operations holding upon their onset. For + * aggregate operations such as putAll, concurrent retrievals may + * reflect insertion or removal of only some entries. Similarly, Iterators + * and Enumerations return elements reflecting the state of the hash table at + * some point at or since the creation of the iterator/enumeration. They do + * not throw {@link ConcurrentModificationException}. However, + * iterators are designed to be used by only one thread at a time. + * + *

Very full tables, or tables with high reprobe rates may trigger an + * internal resize operation to move into a larger table. Resizing is not + * terribly expensive, but it is not free either; during resize operations + * table throughput may drop somewhat. All threads that visit the table + * during a resize will 'help' the resizing but will still be allowed to + * complete their operation before the resize is finished (i.e., a simple + * 'get' operation on a million-entry table undergoing resizing will not need + * to block until the entire million entries are copied). + * + *

This class and its views and iterators implement all of the + * optional methods of the {@link Map} and {@link Iterator} + * interfaces. + * + *

Like {@link Hashtable} but unlike {@link HashMap}, this class + * does not allow null to be used as a value. + * + * + * @since 1.5 + * @author Cliff Click + * @param the type of mapped values + */ + +public class NonBlockingHashMapLong + extends AbstractMap + implements ConcurrentMap, Serializable { + + private static final long serialVersionUID = 1234123412341234124L; + + private static final int REPROBE_LIMIT=10; // Too many reprobes then force a table-resize + + // --- Bits to allow Unsafe access to arrays + private static final int _Obase = UNSAFE.arrayBaseOffset(Object[].class); + private static final int _Oscale = UNSAFE.arrayIndexScale(Object[].class); + private static long rawIndex(final Object[] ary, final int idx) { + assert idx >= 0 && idx < ary.length; + // Note the long-math requirement, to handle arrays of more than 2^31 bytes + // - or 2^28 - or about 268M - 8-byte pointer elements. + return _Obase + ((long)idx * _Oscale); + } + private static final int _Lbase = UNSAFE.arrayBaseOffset(long[].class); + private static final int _Lscale = UNSAFE.arrayIndexScale(long[].class); + private static long rawIndex(final long[] ary, final int idx) { + assert idx >= 0 && idx < ary.length; + // Note the long-math requirement, to handle arrays of more than 2^31 bytes + // - or 2^28 - or about 268M - 8-byte pointer elements. 
+ return _Lbase + ((long)idx * _Lscale); + } + + // --- Bits to allow Unsafe CAS'ing of the CHM field + private static final long _chm_offset; + private static final long _val_1_offset; + static { // + Field f; + try { f = NonBlockingHashMapLong.class.getDeclaredField("_chm"); } + catch( NoSuchFieldException e ) { throw new RuntimeException(e); } + _chm_offset = UNSAFE.objectFieldOffset(f); + + try { f = NonBlockingHashMapLong.class.getDeclaredField("_val_1"); } + catch( NoSuchFieldException e ) { throw new RuntimeException(e); } + _val_1_offset = UNSAFE.objectFieldOffset(f); + } + private final boolean CAS( final long offset, final Object old, final Object nnn ) { + return UNSAFE.compareAndSwapObject(this, offset, old, nnn ); + } + + // --- Adding a 'prime' bit onto Values via wrapping with a junk wrapper class + private static final class Prime { + final Object _V; + Prime( Object V ) { _V = V; } + static Object unbox( Object V ) { return V instanceof Prime ? ((Prime)V)._V : V; } + } + + // --- The Hash Table -------------------- + private transient CHM _chm; + // This next field holds the value for Key 0 - the special key value which + // is the initial array value, and also means: no-key-inserted-yet. + private transient Object _val_1; // Value for Key: NO_KEY + + // Time since last resize + private transient long _last_resize_milli; + + // Optimize for space: use a 1/2-sized table and allow more re-probes + private final boolean _opt_for_space; + + // --- Minimum table size ---------------- + // Pick size 16 K/V pairs, which turns into (16*2)*4+12 = 140 bytes on a + // standard 32-bit HotSpot, and (16*2)*8+12 = 268 bytes on 64-bit Azul. 
+ private static final int MIN_SIZE_LOG=4; // + private static final int MIN_SIZE=(1<>4); + } + + // --- NonBlockingHashMapLong ---------------------------------------------- + // Constructors + + /** Create a new NonBlockingHashMapLong with default minimum size (currently set + * to 8 K/V pairs or roughly 84 bytes on a standard 32-bit JVM). */ + public NonBlockingHashMapLong( ) { this(MIN_SIZE,true); } + + /** Create a new NonBlockingHashMapLong with initial room for the given + * number of elements, thus avoiding internal resizing operations to reach + * an appropriate size. Large numbers here when used with a small count of + * elements will sacrifice space for a small amount of time gained. The + * initial size will be rounded up internally to the next larger power of 2. */ + public NonBlockingHashMapLong(final int initial_sz ) { this(initial_sz,true); } + + /** Create a new NonBlockingHashMapLong, setting the space-for-speed + * tradeoff. {@code true} optimizes for space and is the default. {@code + * false} optimizes for speed and doubles space costs for roughly a 10% + * speed improvement. */ + public NonBlockingHashMapLong(final boolean opt_for_space ) { this(1,opt_for_space); } + + /** Create a new NonBlockingHashMapLong, setting both the initial size and + * the space-for-speed tradeoff. {@code true} optimizes for space and is + * the default. {@code false} optimizes for speed and doubles space costs + * for roughly a 10% speed improvement. 
*/ + public NonBlockingHashMapLong(final int initial_sz, final boolean opt_for_space ) { + _opt_for_space = opt_for_space; + initialize(initial_sz); + } + private void initialize( final int initial_sz ) { + if (initial_sz < 0) { + throw new IllegalArgumentException("initial_sz: " + initial_sz + " (expected: >= 0)"); + } + int i; // Convert to next largest power-of-2 + for( i=MIN_SIZE_LOG; (1<true if the key is in the table */ + public boolean containsKey( long key ) { return get(key) != null; } + + /** Legacy method testing if some key maps into the specified value in this + * table. This method is identical in functionality to {@link + * #containsValue}, and exists solely to ensure full compatibility with + * class {@link Hashtable}, which supported this method prior to + * introduction of the Java Collections framework. + * @param val a value to search for + * @return true if this map maps one or more keys to the specified value + * @throws NullPointerException if the specified value is null */ + public boolean contains ( Object val ) { return containsValue(val); } + + /** Maps the specified key to the specified value in the table. The value + * cannot be null.

The value can be retrieved by calling {@link #get} + * with a key that is equal to the original key. + * @param key key with which the specified value is to be associated + * @param val value to be associated with the specified key + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified value is null */ + public TypeV put ( long key, TypeV val ) { return putIfMatch( key, val,NO_MATCH_OLD);} + + /** Atomically, do a {@link #put} if-and-only-if the key is not mapped. + * Useful to ensure that only a single mapping for the key exists, even if + * many threads are trying to create the mapping in parallel. + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the specified value is null */ + public TypeV putIfAbsent( long key, TypeV val ) { return putIfMatch( key, val,TOMBSTONE );} + + /** Removes the key (and its corresponding value) from this map. + * This method does nothing if the key is not in the map. + * @return the previous value associated with key, or + * null if there was no mapping for key*/ + public TypeV remove ( long key ) { return putIfMatch( key,TOMBSTONE,NO_MATCH_OLD);} + + /** Atomically do a {@link #remove(long)} if-and-only-if the key is mapped + * to a value which is equal to the given value. The previous value is + * compared with {@code equals}, not by identity, so removing through an + * equal-but-distinct instance is still reported as a success. + * @return true if the previous value was equal to {@code val} + * @throws NullPointerException if the specified value is null */ + public boolean remove ( long key,Object val ) { + final Object prev = putIfMatch( key,TOMBSTONE,val ); + return prev == val || val.equals(prev); + } + + /** Atomically do a put(key,val) if-and-only-if the key is + * mapped to some value already. + * @throws NullPointerException if the specified value is null */ + public TypeV replace ( long key, TypeV val ) { return putIfMatch( key, val,MATCH_ANY );} + + /** Atomically do a put(key,newValue) if-and-only-if the key is + * mapped to a value which is equal to oldValue.
+ * The previous value is compared with {@code equals}, not by identity, + * so a replacement made through an equal-but-distinct {@code oldValue} + * is still reported as a success. + * @return true if the previous value was equal to {@code oldValue} + * @throws NullPointerException if the specified value is null */ + public boolean replace ( long key, TypeV oldValue, TypeV newValue ) { + final Object prev = putIfMatch( key, newValue, oldValue ); + return prev == oldValue || oldValue.equals(prev); + } + + /** Shared implementation behind put, putIfAbsent, remove and replace. + * Key NO_KEY is stored in the {@code _val_1} field and updated with a + * single CAS attempt; every other key is delegated to the current CHM. + * @param newVal the value to install (TOMBSTONE to remove) + * @param oldVal the expected current value, or one of the NO_MATCH_OLD / + * MATCH_ANY / TOMBSTONE sentinels + * @return the value that was present, with TOMBSTONE reported as null + * @throws NullPointerException if newVal or oldVal is null */ + private TypeV putIfMatch( long key, Object newVal, Object oldVal ) { + if (oldVal == null || newVal == null) throw new NullPointerException(); + if( key == NO_KEY ) { + Object curVal = _val_1; + if( oldVal == NO_MATCH_OLD || // Do we care about expected-Value at all? + curVal == oldVal || // No instant match already? + (oldVal == MATCH_ANY && curVal != TOMBSTONE) || + oldVal.equals(curVal) ) { // Expensive equals check + if( !CAS(_val_1_offset,curVal,newVal) ) // One shot CAS update attempt + curVal = _val_1; // Failed; get failing witness + } + return curVal == TOMBSTONE ? null : (TypeV)curVal; // Return the last value present + } + final Object res = _chm.putIfMatch( key, newVal, oldVal ); + assert !(res instanceof Prime); + assert res != null; + return res == TOMBSTONE ? null : (TypeV)res; + } + + /** Removes all of the mappings from this map. */ + public void clear() { // Smack a new empty table down + CHM newchm = new CHM(this,new ConcurrentAutoTable(),MIN_SIZE_LOG); + while( !CAS(_chm_offset,_chm,newchm) ) { /*Spin until the clear works*/} + CAS(_val_1_offset,_val_1,TOMBSTONE); + } + + /** Returns true if this Map maps one or more keys to the specified + * value. Note: This method requires a full internal traversal of the + * hash table and is much slower than {@link #containsKey}.
+ * @param val value whose presence in this map is to be tested + * @return true if this Map maps one or more keys to the specified value; + * false when {@code val} is null (a null argument is rejected quietly, + * no exception is thrown) */ + public boolean containsValue( Object val ) { + if( val == null ) return false; + if( val == _val_1 ) return true; // Key 0 + for( TypeV V : values() ) + if( V == val || V.equals(val) ) + return true; + return false; + } + + // --- get ----------------------------------------------------------------- + /** Returns the value to which the specified key is mapped, or {@code null} + * if this map contains no mapping for the key. + *

More formally, if this map contains a mapping from a key {@code k} to + * a value {@code v} such that {@code key==k}, then this method + * returns {@code v}; otherwise it returns {@code null}. (There can be at + * most one such mapping.) The key is a primitive {@code long}, so it can + * never be null and no NullPointerException is thrown for it. */ + // Never returns a Prime nor a Tombstone. + public final TypeV get( long key ) { + if( key == NO_KEY ) { + final Object V = _val_1; + return V == TOMBSTONE ? null : (TypeV)V; + } + final Object V = _chm.get_impl(key); + assert !(V instanceof Prime); // Never return a Prime + assert V != TOMBSTONE; + return (TypeV)V; + } + + /** Auto-boxing version of {@link #get(long)}; returns null when the key is not a {@link Long}. */ + public TypeV get ( Object key ) { return (key instanceof Long) ? get (((Long)key).longValue()) : null; } + /** Auto-boxing version of {@link #remove(long)}; returns null when the key is not a {@link Long}. */ + public TypeV remove ( Object key ) { return (key instanceof Long) ? remove (((Long)key).longValue()) : null; } + /** Auto-boxing version of {@link #remove(long,Object)}; returns false when the key is not a {@link Long}. */ + public boolean remove ( Object key, Object Val ) { return (key instanceof Long) && remove(((Long) key).longValue(), Val); } + /** Auto-boxing version of {@link #containsKey(long)}; returns false when the key is not a {@link Long}. */ + public boolean containsKey( Object key ) { return (key instanceof Long) && containsKey(((Long) key).longValue()); } + /** Auto-boxing version of {@link #putIfAbsent}. + * @throws NullPointerException if the key is null (it is unboxed unconditionally) */ + public TypeV putIfAbsent( Long key, TypeV val ) { return putIfAbsent( key.longValue(), val ); } + /** Auto-boxing version of {@link #replace}. + * @throws NullPointerException if the key is null (it is unboxed unconditionally) */ + public TypeV replace( Long key, TypeV Val ) { return replace(key.longValue(), Val); } + /** Auto-boxing version of {@link #put}. + * @throws NullPointerException if the key is null (it is unboxed unconditionally) */ + public TypeV put ( Long key, TypeV val ) { return put(key.longValue(),val); } + /** Auto-boxing version of {@link #replace}.
*/ + public boolean replace( Long key, TypeV oldValue, TypeV newValue ) { + return replace(key.longValue(), oldValue, newValue); + } + + // --- help_copy ----------------------------------------------------------- + // Help along an existing resize operation. This is just a fast cut-out + // wrapper, to encourage inlining for the fast no-copy-in-progress case. We + // always help the top-most table copy, even if there are nested table + // copies in progress. + private void help_copy( ) { + // Read the top-level CHM only once. We'll try to help this copy along, + // even if it gets promoted out from under us (i.e., the copy completes + // and another KVS becomes the top-level copy). + CHM topchm = _chm; + if( topchm._newchm == null ) return; // No copy in-progress + topchm.help_copy_impl(false); + } + + + // --- CHM ----------------------------------------------------------------- + // The control structure for the NonBlockingHashMapLong + private static final class CHM implements Serializable { + // Back-pointer to top-level structure + final NonBlockingHashMapLong _nbhml; + + // Size in active K,V pairs + private final ConcurrentAutoTable _size; + public int size () { return (int)_size.get(); } + + // --- + // These next 2 fields are used in the resizing heuristics, to judge when + // it is time to resize or copy the table. Slots is a count of used-up + // key slots, and when it nears a large fraction of the table we probably + // end up reprobing too much. Last-resize-milli is the time since the + // last resize; if we are running back-to-back resizes without growing + // (because there are only a few live keys but many slots full of dead + // keys) then we need a larger table to cut down on the churn. + + // Count of used slots, to tell when table is full of dead unusable slots + private final ConcurrentAutoTable _slots; + public int slots() { return (int)_slots.get(); } + + // --- + // New mappings, used during resizing. 
+ // The 'next' CHM - created during a resize operation. This represents + // the new table being copied from the old one. It's the volatile + // variable that is read as we cross from one table to the next, to get + // the required memory orderings. It monotonically transits from null to + // set (once). + volatile CHM _newchm; + private static final AtomicReferenceFieldUpdater _newchmUpdater = + AtomicReferenceFieldUpdater.newUpdater(CHM.class,CHM.class, "_newchm"); + // Set the _newchm field if we can. AtomicUpdaters do not fail spuriously. + boolean CAS_newchm( CHM newchm ) { + return _newchmUpdater.compareAndSet(this,null,newchm); + } + // Sometimes many threads race to create a new very large table. Only 1 + // wins the race, but the losers all allocate a junk large table with + // hefty allocation costs. Attempt to control the overkill here by + // throttling attempts to create a new table. I cannot really block here + // (lest I lose the non-blocking property) but late-arriving threads can + // give the initial resizing thread a little time to allocate the initial + // new table. The Right Long Term Fix here is to use array-lets and + // incrementally create the new very large array. In C I'd make the array + // with malloc (which would mmap under the hood) which would only eat + // virtual-address and not real memory - and after Somebody wins then we + // could in parallel initialize the array. Java does not allow + // un-initialized array creation (especially of ref arrays!). 
+ volatile long _resizers; // count of threads attempting an initial resize + private static final AtomicLongFieldUpdater _resizerUpdater = + AtomicLongFieldUpdater.newUpdater(CHM.class, "_resizers"); + + // --- key,val ------------------------------------------------------------- + // Access K,V for a given idx + private boolean CAS_key( int idx, long old, long key ) { + return UNSAFE.compareAndSwapLong ( _keys, rawIndex(_keys, idx), old, key ); + } + private boolean CAS_val( int idx, Object old, Object val ) { + return UNSAFE.compareAndSwapObject( _vals, rawIndex(_vals, idx), old, val ); + } + + final long [] _keys; + final Object [] _vals; + + // Simple constructor + CHM(final NonBlockingHashMapLong nbhml, ConcurrentAutoTable size, final int logsize ) { + _nbhml = nbhml; + _size = size; + _slots= new ConcurrentAutoTable(); + _keys = new long [1<= reprobe_limit(len) ) // too many probes + return _newchm == null // Table copy in progress? + ? null // Nope! A clear miss + : copy_slot_and_check(idx,key).get_impl(key); // Retry in the new table + + idx = (idx+1)&(len-1); // Reprobe by 1! (could now prefetch) + } + } + + // --- putIfMatch --------------------------------------------------------- + // Put, Remove, PutIfAbsent, etc. Return the old value. If the returned + // value is equal to expVal (or expVal is NO_MATCH_OLD) then the put can + // be assumed to work (although might have been immediately overwritten). + // Only the path through copy_slot passes in an expected value of null, + // and putIfMatch only returns a null if passed in an expected null. + private Object putIfMatch( final long key, final Object putval, final Object expVal ) { + assert putval != null; + assert !(putval instanceof Prime); + assert !(expVal instanceof Prime); + final int len = _keys.length; + int idx = (int)(key & (len-1)); // The first key + + // --- + // Key-Claim stanza: spin till we can claim a Key (or force a resizing). 
+ int reprobe_cnt=0; + long K; + Object V; + while( true ) { // Spin till we get a Key slot + V = _vals[idx]; // Get old value + K = _keys[idx]; // Get current key + if( K == NO_KEY ) { // Slot is free? + // Found an empty Key slot - which means this Key has never been in + // this table. No need to put a Tombstone - the Key is not here! + if( putval == TOMBSTONE ) return putval; // Not-now & never-been in this table + if( expVal == MATCH_ANY ) return null; // Will not match, even after K inserts + // Claim the zero key-slot + if( CAS_key(idx, NO_KEY, key) ) { // Claim slot for Key + _slots.add(1); // Raise key-slots-used count + break; // Got it! + } + // CAS to claim the key-slot failed. + // + // This re-read of the Key points out an annoying short-coming of Java + // CAS. Most hardware CAS's report back the existing value - so that + // if you fail you have a *witness* - the value which caused the CAS + // to fail. The Java API turns this into a boolean destroying the + // witness. Re-reading does not recover the witness because another + // thread can write over the memory after the CAS. Hence we can be in + // the unfortunate situation of having a CAS fail *for cause* but + // having that cause removed by a later store. This turns a + // non-spurious-failure CAS (such as Azul has) into one that can + // apparently spuriously fail - and we avoid apparent spurious failure + // by not allowing Keys to ever change. + K = _keys[idx]; // CAS failed, get updated value + assert K != NO_KEY ; // If keys[idx] is NO_KEY, CAS shoulda worked + } + // Key slot was not null, there exists a Key here + if( K == key ) + break; // Got it! + + // get and put must have the same key lookup logic! Lest 'get' give + // up looking too soon. + //topmap._reprobes.add(1); + if( ++reprobe_cnt >= reprobe_limit(len) ) { + // We simply must have a new table to do a 'put'. At this point a + // 'get' will also go to the new table (if any). 
We do not need + // to claim a key slot (indeed, we cannot find a free one to claim!). + final CHM newchm = resize(); + if( expVal != null ) _nbhml.help_copy(); // help along an existing copy + return newchm.putIfMatch(key,putval,expVal); + } + + idx = (idx+1)&(len-1); // Reprobe! + } // End of spinning till we get a Key slot + + // --- + // Found the proper Key slot, now update the matching Value slot. We + // never put a null, so Value slots monotonically move from null to + // not-null (deleted Values use Tombstone). Thus if 'V' is null we + // fail this fast cutout and fall into the check for table-full. + if( putval == V ) return V; // Fast cutout for no-change + + // See if we want to move to a new table (to avoid high average re-probe + // counts). We only check on the initial set of a Value from null to + // not-null (i.e., once per key-insert). + if( (V == null && tableFull(reprobe_cnt,len)) || + // Or we found a Prime: resize is already in progress. The resize + // call below will do a CAS on _newchm forcing the read. + V instanceof Prime) { + resize(); // Force the new table copy to start + return copy_slot_and_check(idx,expVal).putIfMatch(key,putval,expVal); + } + + // --- + // We are finally prepared to update the existing table + //assert !(V instanceof Prime); // always true, so IDE warnings if uncommented + + // Must match old, and we do not? Then bail out now. Note that either V + // or expVal might be TOMBSTONE. Also V can be null, if we've never + // inserted a value before. expVal can be null if we are called from + // copy_slot. + + if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all? + V != expVal && // No instant match already? + (expVal != MATCH_ANY || V == TOMBSTONE || V == null) && + !(V==null && expVal == TOMBSTONE) && // Match on null/TOMBSTONE combo + (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last + return V; // Do not update! 
+ + // Actually change the Value in the Key,Value pair + if( CAS_val(idx, V, putval ) ) { + // CAS succeeded - we did the update! + // Both normal put's and table-copy calls putIfMatch, but table-copy + // does not (effectively) increase the number of live k/v pairs. + if( expVal != null ) { + // Adjust sizes - a striped counter + if( (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) _size.add( 1); + if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) _size.add(-1); + } + } else { // Else CAS failed + V = _vals[idx]; // Get new value + // If a Prime'd value got installed, we need to re-run the put on the + // new table. Otherwise we lost the CAS to another racing put. + // Simply retry from the start. + if( V instanceof Prime ) + return copy_slot_and_check(idx,expVal).putIfMatch(key,putval,expVal); + } + // Win or lose the CAS, we are done. If we won then we know the update + // happened as expected. If we lost, it means "we won but another thread + // immediately stomped our update with no chance of a reader reading". + return (V==null && expVal!=null) ? TOMBSTONE : V; + } + + // --- tableFull --------------------------------------------------------- + // Heuristic to decide if this table is too full, and we should start a + // new table. Note that if a 'get' call has reprobed too many times and + // decided the table must be full, then always the estimate_sum must be + // high and we must report the table is full. If we do not, then we might + // end up deciding that the table is not full and inserting into the + // current table, while a 'get' has decided the same key cannot be in this + // table because of too many reprobes. 
The invariant is: + // slots.estimate_sum >= max_reprobe_cnt >= reprobe_limit(len) + private final boolean tableFull( int reprobe_cnt, int len ) { + return + // Do the cheap check first: we allow some number of reprobes always + reprobe_cnt >= REPROBE_LIMIT && + (reprobe_cnt >= reprobe_limit(len) || + // More expensive check: see if the table is > 1/2 full. + _slots.estimate_get() >= (len>>1)); + } + + // --- resize ------------------------------------------------------------ + // Resizing after too many probes. "How Big???" heuristics are here. + // Callers will (not this routine) will 'help_copy' any in-progress copy. + // Since this routine has a fast cutout for copy-already-started, callers + // MUST 'help_copy' lest we have a path which forever runs through + // 'resize' only to discover a copy-in-progress which never progresses. + private final CHM resize() { + // Check for resize already in progress, probably triggered by another thread + CHM newchm = _newchm; // VOLATILE READ + if( newchm != null ) // See if resize is already in progress + return newchm; // Use the new table already + + // No copy in-progress, so start one. First up: compute new table size. + int oldlen = _keys.length; // Old count of K,V pairs allowed + int sz = size(); // Get current table count of active K,V pairs + int newsz = sz; // First size estimate + + // Heuristic to determine new size. We expect plenty of dead-slots-with-keys + // and we need some decent padding to avoid endless reprobing. + if( _nbhml._opt_for_space ) { + // This heuristic leads to a much denser table with a higher reprobe rate + if( sz >= (oldlen>>1) ) // If we are >50% full of keys then... + newsz = oldlen<<1; // Double size + } else { + if( sz >= (oldlen>>2) ) { // If we are >25% full of keys then... + newsz = oldlen<<1; // Double size + if( sz >= (oldlen>>1) ) // If we are >50% full of keys then... + newsz = oldlen<<2; // Double double size + } + } + + // Last (re)size operation was very recent? 
Then double again + // despite having few live keys; slows down resize operations + // for tables subject to a high key churn rate - but do not + // forever grow the table. If there is a high key churn rate + // the table needs a steady state of rare same-size resize + // operations to clean out the dead keys. + long tm = System.currentTimeMillis(); + if( newsz <= oldlen && // New table would shrink or hold steady? + tm <= _nbhml._last_resize_milli+10000) // Recent resize (less than 10 sec ago) + newsz = oldlen<<1; // Double the existing size + + // Do not shrink, ever. If we hit this size once, assume we + // will again. + if( newsz < oldlen ) newsz = oldlen; + + // Convert to power-of-2 + int log2; + for( log2=MIN_SIZE_LOG; (1< ((len >> 2) + (len >> 1))) throw new RuntimeException("Table is full."); + } + + // Now limit the number of threads actually allocating memory to a + // handful - lest we have 750 threads all trying to allocate a giant + // resized array. + long r = _resizers; + while( !_resizerUpdater.compareAndSet(this,r,r+1) ) + r = _resizers; + // Size calculation: 2 words (K+V) per table entry, plus a handful. We + // guess at 64-bit pointers; 32-bit pointers screws up the size calc by + // 2x but does not screw up the heuristic very much. + long megs = ((((1L<>20/*megs*/; + if( r >= 2 && megs > 0 ) { // Already 2 guys trying; wait and see + newchm = _newchm; // Between dorking around, another thread did it + if( newchm != null ) // See if resize is already in progress + return newchm; // Use the new table already + // We could use a wait with timeout, so we'll wakeup as soon as the new table + // is ready, or after the timeout in any case. + //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup + // For now, sleep a tad and see if the 2 guys already trying to make + // the table actually get around to making it happen. 
+ try { Thread.sleep(megs); } catch( Exception e ) { /*empty*/} + } + // Last check, since the 'new' below is expensive and there is a chance + // that another thread slipped in a new table while we ran the heuristic. + newchm = _newchm; + if( newchm != null ) // See if resize is already in progress + return newchm; // Use the new table already + + // New CHM - actually allocate the big arrays + newchm = new CHM(_nbhml,_size,log2); + + // Another check after the slow allocation + if( _newchm != null ) // See if resize is already in progress + return _newchm; // Use the new table already + + // The new table must be CAS'd in so only 1 winner amongst duplicate + // racing resizing threads. Extra CHM's will be GC'd. + if( CAS_newchm( newchm ) ) { // NOW a resize-is-in-progress! + //notifyAll(); // Wake up any sleepers + //long nano = System.nanoTime(); + //System.out.println(" "+nano+" Resize from "+oldlen+" to "+(1< _copyIdxUpdater = + AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyIdx"); + + // Work-done reporting. Used to efficiently signal when we can move to + // the new table. From 0 to len(oldkvs) refers to copying from the old + // table to the new. + volatile long _copyDone= 0; + static private final AtomicLongFieldUpdater _copyDoneUpdater = + AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyDone"); + + // --- help_copy_impl ---------------------------------------------------- + // Help along an existing resize operation. We hope it's the top-level + // copy (it was when we started) but this CHM might have been promoted out + // of the top position. 
+ private final void help_copy_impl( final boolean copy_all ) { + final CHM newchm = _newchm; + assert newchm != null; // Already checked by caller + int oldlen = _keys.length; // Total amount to copy + final int MIN_COPY_WORK = Math.min(oldlen,1024); // Limit per-thread work + + // --- + int panic_start = -1; + int copyidx=-9999; // Fool javac to think it's initialized + while( _copyDone < oldlen ) { // Still needing to copy? + // Carve out a chunk of work. The counter wraps around so every + // thread eventually tries to copy every slot repeatedly. + + // We "panic" if we have tried TWICE to copy every slot - and it still + // has not happened. i.e., twice some thread somewhere claimed they + // would copy 'slot X' (by bumping _copyIdx) but they never claimed to + // have finished (by bumping _copyDone). Our choices become limited: + // we can wait for the work-claimers to finish (and become a blocking + // algorithm) or do the copy work ourselves. Tiny tables with huge + // thread counts trying to copy the table often 'panic'. + if( panic_start == -1 ) { // No panic? + copyidx = (int)_copyIdx; + while( !_copyIdxUpdater.compareAndSet(this,copyidx,copyidx+MIN_COPY_WORK) ) + copyidx = (int)_copyIdx; // Re-read + if( !(copyidx < (oldlen<<1)) ) // Panic! + panic_start = copyidx; // Record where we started to panic-copy + } + + // We now know what to copy. Try to copy. + int workdone = 0; + for( int i=0; i 0 ) // Report work-done occasionally + copy_check_and_promote( workdone );// See if we can promote + //for( int i=0; i 0 ) { + while( !_copyDoneUpdater.compareAndSet(this,copyDone,copyDone+workdone) ) { + copyDone = _copyDone; // Reload, retry + assert (copyDone+workdone) <= oldlen; + } + } + + // Check for copy being ALL done, and promote. Note that we might have + // nested in-progress copies and manage to finish a nested copy before + // finishing the top-level copy. We only promote top-level copies. 
+ if( copyDone+workdone == oldlen && // Ready to promote this table? + _nbhml._chm == this && // Looking at the top-level table? + // Attempt to promote + _nbhml.CAS(_chm_offset,this,_newchm) ) { + _nbhml._last_resize_milli = System.currentTimeMillis(); // Record resize time for next check + } + } + + // --- copy_slot --------------------------------------------------------- + // Copy one K/V pair from oldkvs[i] to newkvs. Returns true if we can + // confirm that we set an old-table slot to TOMBPRIME, and only returns after + // updating the new table. We need an accurate confirmed-copy count so + // that we know when we can promote (if we promote the new table too soon, + // other threads may 'miss' on values not-yet-copied from the old table). + // We don't allow any direct updates on the new table, unless they first + // happened to the old table - so that any transition in the new table from + // null to not-null must have been from a copy_slot (or other old-table + // overwrite) and not from a thread directly writing in the new table. + private boolean copy_slot( int idx ) { + // Blindly set the key slot from NO_KEY to some key which hashes here, + // to eagerly stop fresh put's from inserting new values in the old + // table when the old table is mid-resize. We don't need to act on the + // results here, because our correctness stems from box'ing the Value + // field. Slamming the Key field is a minor speed optimization. + long key; + while( (key=_keys[idx]) == NO_KEY ) + CAS_key(idx, NO_KEY, (idx+_keys.length)/*a non-zero key which hashes here*/); + + // --- + // Prevent new values from appearing in the old table. + // Box what we see in the old table, to prevent further updates. + Object oldval = _vals[idx]; // Read OLD table + while( !(oldval instanceof Prime) ) { + final Prime box = (oldval == null || oldval == TOMBSTONE) ? 
TOMBPRIME : new Prime(oldval); + if( CAS_val(idx,oldval,box) ) { // CAS down a box'd version of oldval + // If we made the Value slot hold a TOMBPRIME, then we both + // prevented further updates here but also the (absent) oldval is + // vacuously available in the new table. We return with true here: + // any thread looking for a value for this key can correctly go + // straight to the new table and skip looking in the old table. + if( box == TOMBPRIME ) + return true; + // Otherwise we boxed something, but it still needs to be + // copied into the new table. + oldval = box; // Record updated oldval + break; // Break loop; oldval is now boxed by us + } + oldval = _vals[idx]; // Else try, try again + } + if( oldval == TOMBPRIME ) return false; // Copy already complete here! + + // --- + // Copy the value into the new table, but only if we overwrite a null. + // If another value is already in the new table, then somebody else + // wrote something there and that write happens-after any value that + // appears in the old table. + Object old_unboxed = ((Prime)oldval)._V; + assert old_unboxed != TOMBSTONE; + _newchm.putIfMatch(key, old_unboxed, null); + + // --- + // Finally, now that any old value is exposed in the new table, we can + // forever hide the old-table value by slapping a TOMBPRIME down. This + // will stop other threads from uselessly attempting to copy this slot + // (i.e., it's a speed optimization not a correctness issue). + while( oldval != TOMBPRIME && !CAS_val(idx,oldval,TOMBPRIME) ) + oldval = _vals[idx]; + + return oldval != TOMBPRIME; // True if we slammed the TOMBPRIME down + } // end copy_slot + } // End of CHM + + + // --- Snapshot ------------------------------------------------------------ + // The main class for iterating over the NBHM. It "snapshots" a clean + // view of the K/V array. 
+ private class SnapshotV implements Iterator, Enumeration { + final CHM _sschm; + public SnapshotV() { + CHM topchm; + while( true ) { // Verify no table-copy-in-progress + topchm = _chm; + if( topchm._newchm == null ) // No table-copy-in-progress + break; + // Table copy in-progress - so we cannot get a clean iteration. We + // must help finish the table copy before we can start iterating. + topchm.help_copy_impl(true); + } + // The "linearization point" for the iteration. Every key in this table + // will be visited, but keys added later might be skipped or even be + // added to a following table (also not iterated over). + _sschm = topchm; + // Warm-up the iterator + _idx = -1; + next(); + } + int length() { return _sschm._keys.length; } + long key(final int idx) { return _sschm._keys[idx]; } + private int _idx; // -2 for NO_KEY, -1 for CHECK_NEW_TABLE_LONG, 0-keys.length + private long _nextK, _prevK; // Last 2 keys found + private TypeV _nextV, _prevV; // Last 2 values found + public boolean hasNext() { return _nextV != null; } + public TypeV next() { + // 'next' actually knows what the next value will be - it had to + // figure that out last go 'round lest 'hasNext' report true and + // some other thread deleted the last value. Instead, 'next' + // spends all its effort finding the key that comes after the + // 'next' key. + if( _idx != -1 && _nextV == null ) throw new NoSuchElementException(); + _prevK = _nextK; // This will become the previous key + _prevV = _nextV; // This will become the previous value + _nextV = null; // We have no more next-key + // Attempt to set <_nextK,_nextV> to the next K,V pair. 
+ // _nextV is the trigger: stop searching when it is != null + if( _idx == -1 ) { // Check for NO_KEY + _idx = 0; // Setup for next phase of search + _nextK = NO_KEY; + if( (_nextV=get(_nextK)) != null ) return _prevV; + } + while( _idx elements() { return new SnapshotV(); } + + // --- values -------------------------------------------------------------- + /** Returns a {@link Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are reflected + * in the collection, and vice-versa. The collection supports element + * removal, which removes the corresponding mapping from this map, via the + * Iterator.remove, Collection.remove, + * removeAll, retainAll, and clear operations. + * It does not support the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. */ + public Collection values() { + return new AbstractCollection() { + public void clear ( ) { NonBlockingHashMapLong.this.clear ( ); } + public int size ( ) { return NonBlockingHashMapLong.this.size ( ); } + public boolean contains( Object v ) { return NonBlockingHashMapLong.this.containsValue(v); } + public Iterator iterator() { return new SnapshotV(); } + }; + } + + // --- keySet -------------------------------------------------------------- + /** A class which implements the {@link Iterator} and {@link Enumeration} + * interfaces, generified to the {@link Long} class and supporting a + * non-auto-boxing {@link #nextLong} function. */ + public class IteratorLong implements Iterator, Enumeration { + private final SnapshotV _ss; + /** A new IteratorLong */ + public IteratorLong() { _ss = new SnapshotV(); } + /** Remove last key returned by {@link #next} or {@link #nextLong}. */ + public void remove() { _ss.remove(); } + /** Auto-box and return the next key. */ + public Long next () { _ss.next(); return _ss._prevK; } + /** Return the next key as a primitive {@code long}. */ + public long nextLong() { _ss.next(); return _ss._prevK; } + /** True if there are more keys to iterate over. */ + public boolean hasNext() { return _ss.hasNext(); } + /** Auto-box and return the next key. */ + public Long nextElement() { return next(); } + /** True if there are more keys to iterate over. */ + public boolean hasMoreElements() { return hasNext(); } + } + /** Returns an enumeration of the auto-boxed keys in this table. + * Warning: this version will auto-box all returned keys. 
+ * @return an enumeration of the auto-boxed keys in this table + * @see #keySet() */ + public Enumeration keys() { return new IteratorLong(); } + + /** Returns a {@link Set} view of the keys contained in this map; with care + * the keys may be iterated over without auto-boxing. The + * set is backed by the map, so changes to the map are reflected in the + * set, and vice-versa. The set supports element removal, which removes + * the corresponding mapping from this map, via the + * Iterator.remove, Set.remove, removeAll, + * retainAll, and clear operations. It does not support + * the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. */ + public Set keySet() { + return new AbstractSet () { + public void clear ( ) { NonBlockingHashMapLong.this.clear ( ); } + public int size ( ) { return NonBlockingHashMapLong.this.size ( ); } + public boolean contains( Object k ) { return NonBlockingHashMapLong.this.containsKey(k); } + public boolean remove ( Object k ) { return NonBlockingHashMapLong.this.remove (k) != null; } + public IteratorLong iterator() { return new IteratorLong(); } + }; + } + + /** Keys as a long array. Array may be zero-padded if keys are concurrently deleted. */ + public long[] keySetLong() { + long[] dom = new long[size()]; + IteratorLong i=(IteratorLong)keySet().iterator(); + int j=0; + while( j < dom.length && i.hasNext() ) + dom[j++] = i.nextLong(); + return dom; + } + + // --- entrySet ------------------------------------------------------------ + // Warning: Each call to 'next' in this iterator constructs a new Long and a + // new SimpleImmutableEntry. + private class SnapshotE implements Iterator> { + final SnapshotV _ss; + public SnapshotE() { _ss = new SnapshotV(); } + public void remove() { _ss.remove(); } + public Entry next() { _ss.next(); return new SimpleImmutableEntry<>(_ss._prevK,_ss._prevV); } + public boolean hasNext() { return _ss.hasNext(); } + } + + /** Returns a {@link Set} view of the mappings contained in this map. The + * set is backed by the map, so changes to the map are reflected in the + * set, and vice-versa. The set supports element removal, which removes + * the corresponding mapping from the map, via the + * Iterator.remove, Set.remove, removeAll, + * retainAll, and clear operations. It does not support + * the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + * + *

Warning: the iterator associated with this Set + * requires the creation of {@link Entry} objects with each + * iteration. The {@link io.rsocket.util.NonBlockingHashMap} + * does not normally create or use {@link Entry} objects so + * they will be created solely to support this iteration. Iterating using + * {@link #keySet} or {@link #values} will be more efficient. In addition, + * this version requires auto-boxing the keys. + */ + public Set> entrySet() { + return new AbstractSet>() { + public void clear ( ) { NonBlockingHashMapLong.this.clear( ); } + public int size ( ) { return NonBlockingHashMapLong.this.size ( ); } + public boolean remove( final Object o ) { + if (!(o instanceof Map.Entry)) return false; + final Entry e = (Entry)o; + return NonBlockingHashMapLong.this.remove(e.getKey(), e.getValue()); + } + public boolean contains(final Object o) { + if (!(o instanceof Map.Entry)) return false; + final Entry e = (Entry)o; + TypeV v = get(e.getKey()); + return v != null && v.equals(e.getValue()); + } + public Iterator> iterator() { return new SnapshotE(); } + }; + } + + // --- writeObject ------------------------------------------------------- + // Write a NBHML to a stream + private void writeObject(java.io.ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); // Write nothing + for( long K : keySet() ) { + final Object V = get(K); // Do an official 'get' + s.writeLong (K); // Write the pair + s.writeObject(V); + } + s.writeLong(NO_KEY); // Sentinel to indicate end-of-data + s.writeObject(null); + } + + // --- readObject -------------------------------------------------------- + // Read a NBHML from a stream + private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { + s.defaultReadObject(); // Read nothing + initialize(MIN_SIZE); + for (;;) { + final long K = s.readLong(); + final TypeV V = (TypeV) s.readObject(); + if( K == NO_KEY && V == null ) break; + put(K,V); // Insert with an official put + } + } + 
+} // End NonBlockingHashMapLong class diff --git a/rsocket-core/src/main/java/io/rsocket/util/UnsafeAccess.java b/rsocket-core/src/main/java/io/rsocket/util/UnsafeAccess.java new file mode 100644 index 000000000..f227c79c8 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/util/UnsafeAccess.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.util; + +import sun.misc.Unsafe; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Why should we resort to using Unsafe?
+ *

    + *
  1. To construct class fields which allow volatile/ordered/plain access: This requirement is covered by + * {@link AtomicReferenceFieldUpdater} and similar but their performance is arguably worse than the DIY approach + * (depending on JVM version) while Unsafe intrinsification is a far lesser challenge for JIT compilers. + *
  2. To construct flavors of {@link AtomicReferenceArray}. + *
  3. Other use cases exist but are not present in this library yet. + *
+ * + * @author nitsanw + */ +public class UnsafeAccess +{ + public static final boolean SUPPORTS_GET_AND_SET; + public static final Unsafe UNSAFE; + + static + { + Unsafe instance; + try + { + final Field field = Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + instance = (Unsafe) field.get(null); + } + catch (Exception ignored) + { + // Some platforms, notably Android, might not have a sun.misc.Unsafe + // implementation with a private `theUnsafe` static instance. In this + // case we can try and call the default constructor, which proves + // sufficient for Android usage. + try + { + Constructor c = Unsafe.class.getDeclaredConstructor(); + c.setAccessible(true); + instance = c.newInstance(); + } + catch (Exception e) + { + SUPPORTS_GET_AND_SET = false; + throw new RuntimeException(e); + } + } + + boolean getAndSetSupport = false; + try + { + Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE, Object.class); + getAndSetSupport = true; + } + catch (Exception ignored) + { + } + + UNSAFE = instance; + SUPPORTS_GET_AND_SET = getAndSetSupport; + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketTest.java index 1e4b168f8..7d8b739ab 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketTest.java @@ -87,22 +87,6 @@ public void testChannel() throws Exception { latch.await(); } - - @Test(timeout = 2_000L) - public void testCleanup() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - rule.crs - .requestStream(DefaultPayload.create("hi")) - .doOnError(t -> { - Assert.assertTrue(t instanceof ClosedChannelException); - latch.countDown(); - }) - .subscribe(); - - rule.crs.cleanup(); - - latch.await(); - } public static class SocketRule extends ExternalResource { diff --git a/rsocket-test/build.gradle b/rsocket-test/build.gradle index 8c0243533..499308547 100644 --- 
a/rsocket-test/build.gradle +++ b/rsocket-test/build.gradle @@ -20,5 +20,5 @@ dependencies { compile "org.mockito:mockito-core:2.10.0" compile "org.hamcrest:hamcrest-library:1.3" compile "org.hdrhistogram:HdrHistogram:2.1.9" - compile "io.projectreactor:reactor-test:3.1.1.RELEASE" + compile "io.projectreactor:reactor-test:3.1.2.RELEASE" } diff --git a/rsocket-transport-netty/build.gradle b/rsocket-transport-netty/build.gradle index 018cf6e25..b5df60b37 100644 --- a/rsocket-transport-netty/build.gradle +++ b/rsocket-transport-netty/build.gradle @@ -16,11 +16,11 @@ dependencies { compile project(':rsocket-core') - compile "io.projectreactor.ipc:reactor-netty:0.7.1.RELEASE" - compile "io.netty:netty-handler:4.1.16.Final" - compile "io.netty:netty-handler-proxy:4.1.16.Final" - compile "io.netty:netty-codec-http:4.1.16.Final" - compile "io.netty:netty-transport-native-epoll:4.1.16.Final" + compile "io.projectreactor.ipc:reactor-netty:0.7.2.RELEASE" + compile "io.netty:netty-handler:4.1.17.Final" + compile "io.netty:netty-handler-proxy:4.1.17.Final" + compile "io.netty:netty-codec-http:4.1.17.Final" + compile "io.netty:netty-transport-native-epoll:4.1.17.Final" testCompile project(':rsocket-test') }