Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.4.x] REPL read opt perf #7777

Merged
merged 1 commit into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand All @@ -28,7 +27,6 @@
import org.infinispan.commons.util.FilterSpliterator;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IteratorMapper;
import org.infinispan.commons.util.PeekableMap;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.ImmortalCacheEntry;
Expand Down Expand Up @@ -71,12 +69,12 @@ public abstract class AbstractInternalDataContainer<K, V> implements InternalDat

protected final List<Consumer<Iterable<InternalCacheEntry<K, V>>>> listeners = new CopyOnWriteArrayList<>();

protected abstract ConcurrentMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment);
protected abstract PeekableTouchableMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment);
protected abstract int getSegmentForKey(Object key);

@Override
public InternalCacheEntry<K, V> get(int segment, Object k) {
ConcurrentMap<K, InternalCacheEntry<K, V>> map = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> map = getMapForSegment(segment);
InternalCacheEntry<K, V> e = map != null ? map.get(k) : null;
if (e != null && e.canExpire()) {
long currentTimeMillis = timeService.wallClockTime();
Expand All @@ -97,12 +95,9 @@ public InternalCacheEntry<K, V> get(Object k) {

@Override
public InternalCacheEntry<K, V> peek(int segment, Object k) {
ConcurrentMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
if (entries != null) {
if (entries instanceof PeekableMap) {
return ((PeekableMap<K, InternalCacheEntry<K, V>>) entries).peek(k);
}
return entries.get(k);
return entries.peek(k);
}
return null;
}
Expand All @@ -114,21 +109,16 @@ public InternalCacheEntry<K, V> peek(Object k) {

@Override
public boolean touch(int segment, Object k, long currentTimeMillis) {
ConcurrentMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
if (entries != null) {
// We use get to also update eviction recency access
InternalCacheEntry<K, V> entry = entries.get(k);
if (entry != null) {
entry.touch(currentTimeMillis);
return true;
}
return entries.touchKey(k, currentTimeMillis);
}
return false;
}

@Override
public void put(int segment, K k, V v, Metadata metadata, long createdTimestamp, long lastUseTimestamp) {
ConcurrentMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
if (entries != null) {
boolean l1Entry = false;
if (metadata instanceof L1Metadata) {
Expand Down Expand Up @@ -195,7 +185,7 @@ public boolean containsKey(Object k) {

@Override
public InternalCacheEntry<K, V> remove(int segment, Object k) {
ConcurrentMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
if (entries != null) {
final ByRef<InternalCacheEntry<K, V>> reference = new ByRef<>(null);
entries.compute((K) k, (key, entry) -> {
Expand Down Expand Up @@ -223,7 +213,7 @@ public InternalCacheEntry<K, V> remove(Object k) {

@Override
public void evict(int segment, K key) {
ConcurrentMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
if (entries != null) {
entries.computeIfPresent(key, (o, entry) -> {
passivator.running().passivate(entry);
Expand All @@ -240,7 +230,7 @@ public void evict(K key) {

@Override
public InternalCacheEntry<K, V> compute(int segment, K key, DataContainer.ComputeAction<K, V> action) {
ConcurrentMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries = getMapForSegment(segment);
return entries != null ? entries.compute(key, (k, oldEntry) -> {
InternalCacheEntry<K, V> newEntry = action.compute(k, oldEntry, entryFactory);
if (newEntry == oldEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.infinispan.commons.util.FlattenSpliterator;
import org.infinispan.commons.util.ConcatIterator;
import org.infinispan.commons.util.EntrySizeCalculator;
import org.infinispan.commons.util.FlattenSpliterator;
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntrySizeCalculator;
import org.infinispan.container.entries.InternalCacheEntry;
Expand All @@ -40,10 +39,10 @@
*/
public class BoundedSegmentedDataContainer<K, V> extends DefaultSegmentedDataContainer<K, V> {
protected final Cache<K, InternalCacheEntry<K, V>> evictionCache;
protected final ConcurrentMap<K, InternalCacheEntry<K, V>> entries;
protected final PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries;

public BoundedSegmentedDataContainer(int numSegments, long thresholdSize, EvictionType thresholdPolicy) {
super(ConcurrentHashMap::new, numSegments);
super(PeekableTouchableContainerMap::new, numSegments);

Caffeine<K, InternalCacheEntry<K, V>> caffeine = caffeineBuilder();

Expand All @@ -61,20 +60,20 @@ public BoundedSegmentedDataContainer(int numSegments, long thresholdSize, Evicti
}
DefaultEvictionListener evictionListener = new DefaultEvictionListener();
evictionCache = applyListener(caffeine, evictionListener, new SegmentMapUpdater()).build();
entries = evictionCache.asMap();
entries = new PeekableTouchableContainerMap<>(evictionCache.asMap());
}

public BoundedSegmentedDataContainer(int numSegments, long thresholdSize,
EntrySizeCalculator<? super K, ? super InternalCacheEntry<K, V>> sizeCalculator) {
super(ConcurrentHashMap::new, numSegments);
super(PeekableTouchableContainerMap::new, numSegments);
DefaultEvictionListener evictionListener = new DefaultEvictionListener();

evictionCache = applyListener(Caffeine.newBuilder()
.weigher((K k, InternalCacheEntry<K, V> v) -> (int) sizeCalculator.calculateSize(k, v))
.maximumWeight(thresholdSize), evictionListener, new SegmentMapUpdater())
.build();

entries = evictionCache.asMap();
entries = new PeekableTouchableContainerMap<>(evictionCache.asMap());
}

/**
Expand Down Expand Up @@ -112,7 +111,7 @@ protected void computeEntryRemoved(K key, InternalCacheEntry<K, V> value) {
}

@Override
public ConcurrentMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
public PeekableTouchableMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
// All writes and other ops go directly to the caffeine cache
return entries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

import org.infinispan.commons.logging.Log;
Expand Down Expand Up @@ -46,12 +45,12 @@ public class DefaultDataContainer<K, V> extends AbstractInternalDataContainer<K,

private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

private final ConcurrentMap<K, InternalCacheEntry<K, V>> entries;
private final PeekableTouchableMap<K, InternalCacheEntry<K, V>> entries;
private final Cache<K, InternalCacheEntry<K, V>> evictionCache;

public DefaultDataContainer(int concurrencyLevel) {
// If no comparing implementations passed, could fallback on JDK CHM
entries = CollectionFactory.makeConcurrentParallelMap(128, concurrencyLevel);
entries = new PeekableTouchableContainerMap<>(CollectionFactory.makeConcurrentParallelMap(128, concurrencyLevel));
evictionCache = null;
}

Expand All @@ -72,7 +71,7 @@ protected DefaultDataContainer(int concurrencyLevel, long thresholdSize, Evictio
throw new UnsupportedOperationException("Policy not supported: " + thresholdPolicy);
}
evictionCache = applyListener(caffeine, evictionListener, null).build();
entries = evictionCache.asMap();
entries = new PeekableTouchableContainerMap<>(evictionCache.asMap());
}

/**
Expand Down Expand Up @@ -100,7 +99,7 @@ protected DefaultDataContainer(long thresholdSize,
.maximumWeight(thresholdSize), evictionListener, null)
.build();

entries = evictionCache.asMap();
entries = new PeekableTouchableContainerMap<>(evictionCache.asMap());
}

public static <K, V> DefaultDataContainer<K, V> boundedDataContainer(int concurrencyLevel, long maxEntries,
Expand All @@ -118,7 +117,7 @@ public static <K, V> DefaultDataContainer<K, V> unBoundedDataContainer(int concu
}

@Override
protected ConcurrentMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
protected PeekableTouchableMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public class DefaultSegmentedDataContainer<K, V> extends AbstractInternalDataCon
private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
private static final boolean trace = log.isTraceEnabled();

protected final AtomicReferenceArray<ConcurrentMap<K, InternalCacheEntry<K, V>>> maps;
protected final Supplier<ConcurrentMap<K, InternalCacheEntry<K, V>>> mapSupplier;
protected final AtomicReferenceArray<PeekableTouchableMap<K, InternalCacheEntry<K, V>>> maps;
protected final Supplier<PeekableTouchableMap<K, InternalCacheEntry<K, V>>> mapSupplier;
protected boolean shouldStopSegments;


public DefaultSegmentedDataContainer(Supplier<ConcurrentMap<K, InternalCacheEntry<K, V>>> mapSupplier, int numSegments) {
public DefaultSegmentedDataContainer(Supplier<PeekableTouchableMap<K, InternalCacheEntry<K, V>>> mapSupplier, int numSegments) {
maps = new AtomicReferenceArray<>(numSegments);
this.mapSupplier = Objects.requireNonNull(mapSupplier);
}
Expand Down Expand Up @@ -78,7 +78,7 @@ public int getSegmentForKey(Object key) {
}

@Override
public ConcurrentMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
public PeekableTouchableMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
return maps.get(segment);
}

Expand Down Expand Up @@ -234,11 +234,11 @@ public void removeSegments(IntSet segments) {

private void startNewMap(int segment) {
if (maps.get(segment) == null) {
ConcurrentMap<K, InternalCacheEntry<K, V>> newMap = mapSupplier.get();
PeekableTouchableContainerMap<K, V> newMap = new PeekableTouchableContainerMap<>(mapSupplier.get());
// Just in case of concurrent starts - this shouldn't be possible
if (!maps.compareAndSet(segment, null, newMap) && newMap instanceof AutoCloseable) {
if (!maps.compareAndSet(segment, null, newMap) && newMap.delegate() instanceof AutoCloseable) {
try {
((AutoCloseable) newMap).close();
((AutoCloseable) newMap.delegate()).close();
} catch (Exception e) {
throw new CacheException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
* @since 9.3
*/
public class L1SegmentedDataContainer<K, V> extends DefaultSegmentedDataContainer<K, V> {
private final ConcurrentMap<K, InternalCacheEntry<K, V>> nonOwnedEntries;
private final PeekableTouchableMap<K, InternalCacheEntry<K, V>> nonOwnedEntries;

public L1SegmentedDataContainer(Supplier<ConcurrentMap<K, InternalCacheEntry<K, V>>> mapSupplier, int numSegments) {
public L1SegmentedDataContainer(Supplier<PeekableTouchableMap<K, InternalCacheEntry<K, V>>> mapSupplier, int numSegments) {
super(mapSupplier, numSegments);
this.nonOwnedEntries = mapSupplier.get();
}
Expand All @@ -49,8 +49,8 @@ public void stop() {
}

@Override
public ConcurrentMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
ConcurrentMap<K, InternalCacheEntry<K, V>> map = super.getMapForSegment(segment);
public PeekableTouchableMap<K, InternalCacheEntry<K, V>> getMapForSegment(int segment) {
PeekableTouchableMap<K, InternalCacheEntry<K, V>> map = super.getMapForSegment(segment);
if (map == null) {
map = nonOwnedEntries;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.infinispan.container.impl;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.infinispan.commons.util.AbstractDelegatingConcurrentMap;
import org.infinispan.container.entries.InternalCacheEntry;

public class PeekableTouchableContainerMap<K, V> extends AbstractDelegatingConcurrentMap<K, InternalCacheEntry<K, V>>
implements PeekableTouchableMap<K, InternalCacheEntry<K, V>> {
private final ConcurrentMap<K, InternalCacheEntry<K, V>> map;

public PeekableTouchableContainerMap() {
this(new ConcurrentHashMap<>());
}

public PeekableTouchableContainerMap(ConcurrentMap<K, InternalCacheEntry<K, V>> map) {
this.map = map;
}

@Override
protected ConcurrentMap<K, InternalCacheEntry<K, V>> delegate() {
return map;
}

@Override
public InternalCacheEntry<K, V> peek(Object key) {
return delegate().get(key);
}

@Override
public boolean touchKey(Object key, long currentTimeMillis) {
InternalCacheEntry<K, V> ice = peek(key);
if (ice != null) {
ice.touch(currentTimeMillis);
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.infinispan.container.impl;

import java.util.concurrent.ConcurrentMap;

import org.infinispan.commons.util.PeekableMap;

public interface PeekableTouchableMap<K, V> extends PeekableMap<K, V>, TouchableMap, ConcurrentMap<K, V> {
}
12 changes: 12 additions & 0 deletions core/src/main/java/org/infinispan/container/impl/TouchableMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.infinispan.container.impl;

public interface TouchableMap {
/**
* Touches the entry for the given key in this map. This method will update any recency timestamps for both
* expiration or eviction as needed.
* @param key key to touch
* @param currentTimeMillis the recency timestamp to set
* @return whether the entry was touched or not
*/
boolean touchKey(Object key, long currentTimeMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import org.infinispan.commons.api.Lifecycle;
import org.infinispan.commons.marshall.WrappedBytes;
import org.infinispan.commons.util.PeekableMap;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.Util;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.PeekableTouchableMap;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

Expand All @@ -31,7 +31,7 @@
* @since 9.4
*/
public class OffHeapConcurrentMap implements ConcurrentMap<WrappedBytes, InternalCacheEntry<WrappedBytes, WrappedBytes>>,
PeekableMap<WrappedBytes, InternalCacheEntry<WrappedBytes, WrappedBytes>>, Lifecycle {
PeekableTouchableMap<WrappedBytes, InternalCacheEntry<WrappedBytes, WrappedBytes>>, Lifecycle {
private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
private static final boolean trace = log.isTraceEnabled();

Expand All @@ -55,6 +55,12 @@ public class OffHeapConcurrentMap implements ConcurrentMap<WrappedBytes, Interna
// This variable should always be read first after acquiring either the read or write lock
private boolean dellocated = false;

@Override
public boolean touchKey(Object key, long currentTimeMillis) {
// OFF HEAP does not support max idle in this version - just say it wasn't touched
return false;
}

/**
* Listener interface that is notified when certain operations occur for various memory addresses. Note that when
* this listener is used certain operations are not performed and require the listener to do these instead. Please
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.Iterator;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentMap;

import org.infinispan.commons.marshall.WrappedBytes;
import org.infinispan.commons.util.FilterIterator;
Expand All @@ -13,6 +12,7 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.AbstractInternalDataContainer;
import org.infinispan.container.impl.InternalDataContainerAdapter;
import org.infinispan.container.impl.PeekableTouchableMap;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
Expand Down Expand Up @@ -51,7 +51,7 @@ public void stop() {
}

@Override
protected ConcurrentMap<WrappedBytes, InternalCacheEntry<WrappedBytes, WrappedBytes>> getMapForSegment(int segment) {
protected PeekableTouchableMap<WrappedBytes, InternalCacheEntry<WrappedBytes, WrappedBytes>> getMapForSegment(int segment) {
return map;
}

Expand Down