Skip to content

Commit

Permalink
ISPN-9242 Address refactorings brought up in segmented data container
Browse files Browse the repository at this point in the history
review

* Making all DataContainer instances segmented
* Update all references internally to use SegmentedDataContainer
* Removed SSC#UNKNOWN_SEGMENT
* CommandFactory require segment for commands
* Rename SegmentedDataContainer to InternalDataContainer
  • Loading branch information
wburns authored and danberindei committed Jun 15, 2018
1 parent 3cee685 commit 55b67fa
Show file tree
Hide file tree
Showing 106 changed files with 1,433 additions and 1,010 deletions.
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.infinispan.commons.util;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Spliterator that only returns entries that pass the given predicate. This spliterator will inherit all of the
* characteristics of the underlying spliterator, except that it won't return {@link Spliterator#SIZED} or
* {@link Spliterator#SUBSIZED}.
* <p>
* The {@link #forEachRemaining(Consumer)} method should provide better performance than calling
* {@link #tryAdvance(Consumer)} until it returns false. This is due to having to capture the argument before testing
* it and finally invoking the provided {@link Consumer}.
* @author wburns
* @since 9.3
*/
public class FilterSpliterator<T> implements CloseableSpliterator<T> {
private final Spliterator<T> spliterator;
private final Predicate<? super T> predicate;

// We assume that spliterator is not used concurrently - normally it is split so we can use these variables safely
private final Consumer<? super T> consumer = t -> current = t;

private T current;

public FilterSpliterator(Spliterator<T> spliterator, Predicate<? super T> predicate) {
this.spliterator = spliterator;
this.predicate = predicate;
}

@Override
public void close() {
if (spliterator instanceof CloseableSpliterator) {
((CloseableSpliterator) spliterator).close();
}
}

@Override
public boolean tryAdvance(Consumer<? super T> action) {
while (spliterator.tryAdvance(consumer)) {
T objectToUse = current;
// If object passes then accept it and return
if (predicate.test(objectToUse)) {
action.accept(objectToUse);
return true;
}
}

return false;
}

@Override
public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(e -> {
if (predicate.test(e)) {
action.accept(e);
}
});
}

@Override
public Spliterator<T> trySplit() {
Spliterator<T> split = spliterator.trySplit();
if (split != null) {
return new FilterSpliterator<>(split, predicate);
}
return null;
}

@Override
public long estimateSize() {
return spliterator.estimateSize();
}

@Override
public int characteristics() {
// Unset the SIZED and SUBSIZED as we don't have an exact amount
return spliterator.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
}
}
49 changes: 28 additions & 21 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.infinispan.atomic.impl.ApplyDelta; import org.infinispan.atomic.impl.ApplyDelta;
import org.infinispan.batch.BatchContainer; import org.infinispan.batch.BatchContainer;
import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.SegmentSpecificCommand;
import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.control.LockControlCommand; import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand; import org.infinispan.commands.functional.ReadWriteKeyCommand;
Expand Down Expand Up @@ -81,13 +80,15 @@
import org.infinispan.configuration.global.GlobalConfiguration; import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.container.DataContainer; import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer; import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.InvocationContextFactory; import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.FlagBitSets; import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.encoding.DataConversion; import org.infinispan.encoding.DataConversion;
import org.infinispan.eviction.EvictionManager; import org.infinispan.eviction.EvictionManager;
import org.infinispan.eviction.PassivationManager; import org.infinispan.eviction.PassivationManager;
Expand Down Expand Up @@ -160,9 +161,10 @@ public class CacheImpl<K, V> implements AdvancedCache<K, V> {
@Inject protected TransactionManager transactionManager; @Inject protected TransactionManager transactionManager;
@Inject protected RpcManager rpcManager; @Inject protected RpcManager rpcManager;
@Inject protected StreamingMarshaller marshaller; @Inject protected StreamingMarshaller marshaller;
@Inject protected KeyPartitioner keyPartitioner;
@Inject private EvictionManager evictionManager; @Inject private EvictionManager evictionManager;
@Inject private InternalExpirationManager<K, V> expirationManager; @Inject private InternalExpirationManager<K, V> expirationManager;
@Inject private DataContainer dataContainer; @Inject private InternalDataContainer dataContainer;
@Inject private EmbeddedCacheManager cacheManager; @Inject private EmbeddedCacheManager cacheManager;
@Inject private LockManager lockManager; @Inject private LockManager lockManager;
@Inject private DistributionManager distributionManager; @Inject private DistributionManager distributionManager;
Expand Down Expand Up @@ -298,7 +300,8 @@ V computeInternal(K key, BiFunction<? super K, ? super V, ? extends V> remapping
Metadata metadata, long flags, ContextBuilder contextBuilder) { Metadata metadata, long flags, ContextBuilder contextBuilder) {
assertKeyNotNull(key); assertKeyNotNull(key);
assertFunctionNotNull(remappingFunction); assertFunctionNotNull(remappingFunction);
ComputeCommand command = commandsFactory.buildComputeCommand(key, remappingFunction, computeIfPresent, metadata, flags); ComputeCommand command = commandsFactory.buildComputeCommand(key, remappingFunction, computeIfPresent,
keyPartitioner.getSegment(key), metadata, flags);
return executeCommandAndCommitIfNeeded(contextBuilder, command, 1); return executeCommandAndCommitIfNeeded(contextBuilder, command, 1);
} }


Expand All @@ -317,7 +320,8 @@ V computeIfAbsentInternal(K key, Function<? super K, ? extends V> mappingFunctio
ContextBuilder contextBuilder) { ContextBuilder contextBuilder) {
assertKeyNotNull(key); assertKeyNotNull(key);
assertFunctionNotNull(mappingFunction); assertFunctionNotNull(mappingFunction);
ComputeIfAbsentCommand command = commandsFactory.buildComputeIfAbsentCommand(key, mappingFunction, metadata, flags); ComputeIfAbsentCommand command = commandsFactory.buildComputeIfAbsentCommand(key, mappingFunction,
keyPartitioner.getSegment(key), metadata, flags);
return executeCommandAndCommitIfNeeded(contextBuilder, command, 1); return executeCommandAndCommitIfNeeded(contextBuilder, command, 1);
} }


Expand Down Expand Up @@ -357,8 +361,8 @@ V mergeInternal(K key, V value, BiFunction<? super V, ? super V, ? extends V> re
assertValueNotNull(value); assertValueNotNull(value);
assertFunctionNotNull(remappingFunction); assertFunctionNotNull(remappingFunction);
ReadWriteKeyCommand<K, V, V> command = commandsFactory.buildReadWriteKeyCommand(key, ReadWriteKeyCommand<K, V, V> command = commandsFactory.buildReadWriteKeyCommand(key,
new MergeFunction<>(value, remappingFunction, metadata), Params.fromFlagsBitSet(flags), new MergeFunction<>(value, remappingFunction, metadata), keyPartitioner.getSegment(key),
getKeyDataConversion(), getValueDataConversion()); Params.fromFlagsBitSet(flags), getKeyDataConversion(), getValueDataConversion());
return executeCommandAndCommitIfNeeded(contextBuilder, command, 1); return executeCommandAndCommitIfNeeded(contextBuilder, command, 1);
} }


Expand Down Expand Up @@ -425,7 +429,7 @@ public final boolean remove(Object key, Object value) {


final boolean remove(Object key, Object value, long explicitFlags, ContextBuilder contextBuilder) { final boolean remove(Object key, Object value, long explicitFlags, ContextBuilder contextBuilder) {
assertKeyValueNotNull(key, value); assertKeyValueNotNull(key, value);
RemoveCommand command = commandsFactory.buildRemoveCommand(key, value, explicitFlags); RemoveCommand command = commandsFactory.buildRemoveCommand(key, value, keyPartitioner.getSegment(key), explicitFlags);
return executeCommandAndCommitIfNeeded(contextBuilder, command, 1); return executeCommandAndCommitIfNeeded(contextBuilder, command, 1);
} }


Expand Down Expand Up @@ -471,13 +475,13 @@ public final V get(Object key) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final V get(Object key, long explicitFlags, InvocationContext ctx) { final V get(Object key, long explicitFlags, InvocationContext ctx) {
assertKeyNotNull(key); assertKeyNotNull(key);
GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, explicitFlags); GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, keyPartitioner.getSegment(key), explicitFlags);
return (V) invoker.invoke(ctx, command); return (V) invoker.invoke(ctx, command);
} }


final CacheEntry getCacheEntry(Object key, long explicitFlags, InvocationContext ctx) { final CacheEntry getCacheEntry(Object key, long explicitFlags, InvocationContext ctx) {
assertKeyNotNull(key); assertKeyNotNull(key);
GetCacheEntryCommand command = commandsFactory.buildGetCacheEntryCommand(key, SegmentSpecificCommand.UNKNOWN_SEGMENT, GetCacheEntryCommand command = commandsFactory.buildGetCacheEntryCommand(key, keyPartitioner.getSegment(key),
explicitFlags); explicitFlags);
Object ret = invoker.invoke(ctx, command); Object ret = invoker.invoke(ctx, command);
return (CacheEntry) ret; return (CacheEntry) ret;
Expand All @@ -495,7 +499,7 @@ public CompletableFuture<CacheEntry<K, V>> getCacheEntryAsync(Object key) {


final CompletableFuture<CacheEntry<K,V>> getCacheEntryAsync(Object key, long explicitFlags, InvocationContext ctx) { final CompletableFuture<CacheEntry<K,V>> getCacheEntryAsync(Object key, long explicitFlags, InvocationContext ctx) {
assertKeyNotNull(key); assertKeyNotNull(key);
GetCacheEntryCommand command = commandsFactory.buildGetCacheEntryCommand(key, SegmentSpecificCommand.UNKNOWN_SEGMENT, GetCacheEntryCommand command = commandsFactory.buildGetCacheEntryCommand(key, keyPartitioner.getSegment(key),
explicitFlags); explicitFlags);
return invoker.invokeAsync(ctx, command).thenApply(CacheEntry.class::cast); return invoker.invokeAsync(ctx, command).thenApply(CacheEntry.class::cast);
} }
Expand Down Expand Up @@ -630,12 +634,13 @@ final V remove(Object key, long explicitFlags, ContextBuilder contextBuilder) {


private RemoveCommand createRemoveCommand(Object key, long explicitFlags) { private RemoveCommand createRemoveCommand(Object key, long explicitFlags) {
long flags = addUnsafeFlags(explicitFlags); long flags = addUnsafeFlags(explicitFlags);
return commandsFactory.buildRemoveCommand(key, null, flags); return commandsFactory.buildRemoveCommand(key, null, keyPartitioner.getSegment(key), flags);
} }


@Override @Override
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) { public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, lifespan); RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key),
lifespan);
// Remove expired returns a boolean - just ignore it, the caller just needs to know that the expired // Remove expired returns a boolean - just ignore it, the caller just needs to know that the expired
// entry is removed when this completes // entry is removed when this completes
CompletableFuture<Boolean> completableFuture = performRemoveExpiredCommand(command); CompletableFuture<Boolean> completableFuture = performRemoveExpiredCommand(command);
Expand All @@ -644,7 +649,7 @@ public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifesp


@Override @Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) { public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value); RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key));
return performRemoveExpiredCommand(command); return performRemoveExpiredCommand(command);
} }


Expand Down Expand Up @@ -875,7 +880,7 @@ final void evict(K key, long explicitFlags) {
log.evictionDisabled(name); log.evictionDisabled(name);
} }
InvocationContext ctx = createSingleKeyNonTxInvocationContext(); InvocationContext ctx = createSingleKeyNonTxInvocationContext();
EvictCommand command = commandsFactory.buildEvictCommand(key, explicitFlags); EvictCommand command = commandsFactory.buildEvictCommand(key, keyPartitioner.getSegment(key), explicitFlags);
invoker.invoke(ctx, command); invoker.invoke(ctx, command);
} }


Expand Down Expand Up @@ -1020,7 +1025,8 @@ public void applyDelta(K deltaAwareValueKey, Delta delta, Object... locksToAcqui


private ReadWriteKeyValueCommand<K, Object, Object, Object> createApplyDelta(K deltaAwareValueKey, Delta delta, long explicitFlags) { private ReadWriteKeyValueCommand<K, Object, Object, Object> createApplyDelta(K deltaAwareValueKey, Delta delta, long explicitFlags) {
ReadWriteKeyValueCommand<K, Object, Object, Object> command = commandsFactory.buildReadWriteKeyValueCommand( ReadWriteKeyValueCommand<K, Object, Object, Object> command = commandsFactory.buildReadWriteKeyValueCommand(
deltaAwareValueKey, delta, new ApplyDelta<>(marshaller), Params.create(), getKeyDataConversion(), getValueDataConversion()); deltaAwareValueKey, delta, new ApplyDelta<>(marshaller), keyPartitioner.getSegment(deltaAwareValueKey),
Params.create(), getKeyDataConversion(), getValueDataConversion());
command.setFlagsBitSet(explicitFlags); command.setFlagsBitSet(explicitFlags);
return command; return command;
} }
Expand Down Expand Up @@ -1357,7 +1363,7 @@ final V put(K key, V value, Metadata metadata, long explicitFlags, ContextBuilde
private PutKeyValueCommand createPutCommand(K key, V value, Metadata metadata, long explicitFlags) { private PutKeyValueCommand createPutCommand(K key, V value, Metadata metadata, long explicitFlags) {
long flags = addUnsafeFlags(explicitFlags); long flags = addUnsafeFlags(explicitFlags);
Metadata merged = applyDefaultMetadata(metadata); Metadata merged = applyDefaultMetadata(metadata);
return commandsFactory.buildPutKeyValueCommand(key, value, merged, flags); return commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key), merged, flags);
} }


private long addIgnoreReturnValuesFlag(long flagBitSet) { private long addIgnoreReturnValuesFlag(long flagBitSet) {
Expand Down Expand Up @@ -1397,7 +1403,8 @@ final V putIfAbsent(K key, V value, Metadata metadata, long explicitFlags, Conte
private PutKeyValueCommand createPutIfAbsentCommand(K key, V value, Metadata metadata, long explicitFlags) { private PutKeyValueCommand createPutIfAbsentCommand(K key, V value, Metadata metadata, long explicitFlags) {
long flags = addUnsafeFlags(explicitFlags); long flags = addUnsafeFlags(explicitFlags);
Metadata merged = applyDefaultMetadata(metadata); Metadata merged = applyDefaultMetadata(metadata);
PutKeyValueCommand command = commandsFactory.buildPutKeyValueCommand(key, value, merged, flags); PutKeyValueCommand command = commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
merged, flags);
command.setPutIfAbsent(true); command.setPutIfAbsent(true);
command.setValueMatcher(ValueMatcher.MATCH_EXPECTED); command.setValueMatcher(ValueMatcher.MATCH_EXPECTED);
return command; return command;
Expand Down Expand Up @@ -1453,7 +1460,7 @@ final V replace(K key, V value, Metadata metadata, long explicitFlags, ContextBu
private ReplaceCommand createReplaceCommand(K key, V value, Metadata metadata, long explicitFlags) { private ReplaceCommand createReplaceCommand(K key, V value, Metadata metadata, long explicitFlags) {
long flags = addUnsafeFlags(explicitFlags); long flags = addUnsafeFlags(explicitFlags);
Metadata merged = applyDefaultMetadata(metadata); Metadata merged = applyDefaultMetadata(metadata);
return commandsFactory.buildReplaceCommand(key, null, value, merged, flags); return commandsFactory.buildReplaceCommand(key, null, value, keyPartitioner.getSegment(key), merged, flags);
} }


@Override @Override
Expand All @@ -1473,7 +1480,7 @@ final boolean replace(K key, V oldValue, V value, Metadata metadata, long explic


private ReplaceCommand createReplaceConditionalCommand(K key, V oldValue, V value, Metadata metadata, long explicitFlags) { private ReplaceCommand createReplaceConditionalCommand(K key, V oldValue, V value, Metadata metadata, long explicitFlags) {
Metadata merged = applyDefaultMetadata(metadata); Metadata merged = applyDefaultMetadata(metadata);
return commandsFactory.buildReplaceCommand(key, oldValue, value, merged, explicitFlags); return commandsFactory.buildReplaceCommand(key, oldValue, value, keyPartitioner.getSegment(key), merged, explicitFlags);
} }


@Override @Override
Expand Down Expand Up @@ -1560,7 +1567,7 @@ public final CompletableFuture<Boolean> removeAsync(Object key, Object value) {
final CompletableFuture<Boolean> removeAsync(final Object key, final Object value, final long explicitFlags, final CompletableFuture<Boolean> removeAsync(final Object key, final Object value, final long explicitFlags,
ContextBuilder contextBuilder) { ContextBuilder contextBuilder) {
assertKeyValueNotNull(key, value); assertKeyValueNotNull(key, value);
RemoveCommand command = commandsFactory.buildRemoveCommand(key, value, explicitFlags); RemoveCommand command = commandsFactory.buildRemoveCommand(key, value, keyPartitioner.getSegment(key), explicitFlags);
return executeCommandAndCommitIfNeededAsync(contextBuilder, command, 1); return executeCommandAndCommitIfNeededAsync(contextBuilder, command, 1);
} }


Expand Down Expand Up @@ -1613,7 +1620,7 @@ public CompletableFuture<V> getAsync(K key) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
CompletableFuture<V> getAsync(final K key, final long explicitFlags, InvocationContext ctx) { CompletableFuture<V> getAsync(final K key, final long explicitFlags, InvocationContext ctx) {
assertKeyNotNull(key); assertKeyNotNull(key);
GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, explicitFlags); GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, keyPartitioner.getSegment(key), explicitFlags);
return (CompletableFuture<V>) invoker.invokeAsync(ctx, command); return (CompletableFuture<V>) invoker.invokeAsync(ctx, command);
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.format.PropertyFormatter; import org.infinispan.configuration.format.PropertyFormatter;
import org.infinispan.container.DataContainer; import org.infinispan.container.DataContainer;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContextContainer; import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.impl.ImmutableContext; import org.infinispan.context.impl.ImmutableContext;
Expand Down Expand Up @@ -124,7 +125,7 @@ public class SimpleCacheImpl<K, V> implements AdvancedCache<K, V> {
@Inject private ComponentRegistry componentRegistry; @Inject private ComponentRegistry componentRegistry;
@Inject private Configuration configuration; @Inject private Configuration configuration;
@Inject private EmbeddedCacheManager cacheManager; @Inject private EmbeddedCacheManager cacheManager;
@Inject private DataContainer<K, V> dataContainer; @Inject private InternalDataContainer<K, V> dataContainer;
@Inject private CacheNotifier<K, V> cacheNotifier; @Inject private CacheNotifier<K, V> cacheNotifier;
@Inject private TimeService timeService; @Inject private TimeService timeService;


Expand Down Expand Up @@ -548,7 +549,7 @@ public void clear() {
ArrayList<InternalCacheEntry<K, V>> copyEntries; ArrayList<InternalCacheEntry<K, V>> copyEntries;
if (hasListeners) { if (hasListeners) {
copyEntries = new ArrayList<>(dataContainer.sizeIncludingExpired()); copyEntries = new ArrayList<>(dataContainer.sizeIncludingExpired());
dataContainer.iterator().forEachRemaining(entry -> { dataContainer.forEach(entry -> {
copyEntries.add(entry); copyEntries.add(entry);
cacheNotifier.notifyCacheEntryRemoved(entry.getKey(), entry.getValue(), entry.getMetadata(), true, ImmutableContext.INSTANCE, null); cacheNotifier.notifyCacheEntryRemoved(entry.getKey(), entry.getValue(), entry.getMetadata(), true, ImmutableContext.INSTANCE, null);
}); });
Expand Down
Loading

0 comments on commit 55b67fa

Please sign in to comment.