Skip to content

Commit

Permalink
ISPN-12142 Implement conditional methods for RemoteCache compute
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina authored and wburns committed May 19, 2023
1 parent 72fc8f0 commit 279c9c9
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 22 deletions.
Expand Up @@ -64,6 +64,7 @@
import org.infinispan.commons.util.CloseableIteratorSet;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.query.dsl.Query;
import org.reactivestreams.Publisher;

Expand Down Expand Up @@ -362,12 +363,40 @@ public CompletableFuture<V> computeAsync(K key, BiFunction<? super K, ? super V,

@Override
public CompletableFuture<V> computeIfAbsentAsync(K key, Function<? super K, ? extends V> mappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
throw new UnsupportedOperationException();
CompletableFuture<V> cf = getAsync(key);
return cf.thenCompose(oldValue -> {
if (oldValue != null) return CompletableFuture.completedFuture(oldValue);

V newValue = mappingFunction.apply(key);
if (newValue == null) return CompletableFutures.completedNull();

return putIfAbsentAsync(key, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit)
.thenApply(v -> v == null ? newValue : v);
});
}

@Override
public CompletableFuture<V> computeIfPresentAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
throw new UnsupportedOperationException();
CompletableFuture<MetadataValue<V>> cf = getWithMetadataAsync(key);
return cf.thenCompose(metadata -> {
if (metadata == null || metadata.getValue() == null) return CompletableFutures.completedNull();

V newValue = remappingFunction.apply(key, metadata.getValue());
CompletableFuture<Boolean> done;
if (newValue == null) {
done = removeWithVersionAsync(key, metadata.getVersion());
} else {
done = replaceWithVersionAsync(key, newValue, metadata.getVersion(), lifespan, lifespanUnit, maxIdle, maxIdleUnit);
}

return done.thenCompose(success -> {
if (success) {
return CompletableFuture.completedFuture(newValue);
}

return computeIfPresentAsync(key, remappingFunction, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
});
});
}

@Override
Expand Down
Expand Up @@ -4,6 +4,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;

import javax.transaction.SystemException;
Expand Down Expand Up @@ -208,6 +209,16 @@ public CompletableFuture<Boolean> removeAsync(Object key, Object value) {
txContext.compute((K) key, entry -> removeEntryIfEquals(entry, value), remoteGet);
}

@Override
public CompletableFuture<V> computeIfAbsentAsync(K key, Function<? super K, ? extends V> mappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<V> computeIfPresentAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
throw new UnsupportedOperationException();
}

@Override
public TransactionManager getTransactionManager() {
return transactionManager;
Expand Down
Expand Up @@ -11,7 +11,6 @@
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import org.infinispan.client.hotrod.exceptions.TransportException;
Expand Down Expand Up @@ -105,33 +104,45 @@ public void testComputeIfAbsentMethods(Method method) {

final K targetKey = kvGenerator.generateKey(method, 0);

Function<? super K, ? extends V> remappingFunction = key ->
kvGenerator.generateValue(method, 1);
V value = kvGenerator.generateValue(method, 1);
assertNull(cache.computeIfAbsent(targetKey, ignore -> null));

Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfAbsent(targetKey, remappingFunction));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfAbsent(targetKey, remappingFunction, 1, TimeUnit.SECONDS));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfAbsent(targetKey, remappingFunction, 1, TimeUnit.SECONDS, 10, TimeUnit.SECONDS));
// Exception are only thrown when value not exists.
expectException(TransportException.class, RuntimeException.class, "expected exception", () ->
cache.computeIfAbsent(targetKey, ignore -> { throw new RuntimeException("expected exception"); }));

Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfAbsentAsync(targetKey, remappingFunction));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfAbsentAsync(targetKey, remappingFunction, 1, TimeUnit.SECONDS));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfAbsentAsync(targetKey, remappingFunction, 1, TimeUnit.SECONDS, 10, TimeUnit.SECONDS));
kvGenerator.assertValueEquals(value, cache.computeIfAbsent(targetKey, ignore -> value));
kvGenerator.assertValueEquals(value, cache.get(targetKey));
kvGenerator.assertValueEquals(value, cache.computeIfAbsent(targetKey, ignore -> kvGenerator.generateValue(method, 2)));
kvGenerator.assertValueEquals(value, cache.get(targetKey));

K anotherKey = kvGenerator.generateKey(method, 1);
V anotherValue = kvGenerator.generateValue(method, 3);
kvGenerator.assertValueEquals(anotherValue, cache.computeIfAbsent(anotherKey, ignore -> anotherValue, 1, TimeUnit.MINUTES, 3, TimeUnit.MINUTES));
}

public void testComputeIfPresentMethods(Method method) {
RemoteCache<K, V> cache = remoteCache();

final K targetKey = kvGenerator.generateKey(method, 0);

BiFunction<? super K, ? super V, ? extends V> remappingFunction = (key, value) ->
kvGenerator.generateValue(method, 1);

Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfPresent(targetKey, remappingFunction));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfPresent(targetKey, remappingFunction, 1, TimeUnit.SECONDS));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfPresent(targetKey, remappingFunction, 1, TimeUnit.SECONDS, 10, TimeUnit.SECONDS));

Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfPresentAsync(targetKey, remappingFunction));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfPresentAsync(targetKey, remappingFunction, 1, TimeUnit.SECONDS));
Exceptions.expectException(UnsupportedOperationException.class, () -> cache.computeIfPresentAsync(targetKey, remappingFunction, 1, TimeUnit.SECONDS, 10, TimeUnit.SECONDS));
V value = kvGenerator.generateValue(method, 0);
assertNull(cache.computeIfPresent(targetKey, (k, v) -> value));
assertNull(cache.get(targetKey));
assertNull(cache.put(targetKey, value));
kvGenerator.assertValueEquals(value, cache.get(targetKey));

V anotherValue = kvGenerator.generateValue(method, 1);
kvGenerator.assertValueEquals(anotherValue, cache.computeIfPresent(targetKey, (k, v) -> anotherValue));
kvGenerator.assertValueEquals(anotherValue, cache.get(targetKey));

// Exception are only thrown if a value exists.
expectException(TransportException.class, RuntimeException.class, "expected exception", () ->
cache.computeIfPresent(targetKey, (k, v) -> { throw new RuntimeException("expected exception"); }));

int beforeSize = cache.size();
assertNull(cache.computeIfPresent(targetKey, (k, v) -> null));
assertNull(cache.get(targetKey));
assertEquals(beforeSize - 1, cache.size());
}

public void testMergeMethods(Method method) {
Expand Down

0 comments on commit 279c9c9

Please sign in to comment.