Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-12244 Remove RxJavaInterop duplication from core #8636

Merged
merged 1 commit into from Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,7 +16,7 @@
* @since 10.0
*/
public class RxJavaInterop {
private RxJavaInterop() { }
protected RxJavaInterop() { }

/**
* Provides a {@link Function} that can be used to convert from an instance of {@link java.util.Map.Entry} to
Expand Down
35 changes: 1 addition & 34 deletions core/src/main/java/org/infinispan/reactive/RxJavaInterop.java
@@ -1,15 +1,10 @@
package org.infinispan.reactive;

import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.infinispan.commons.util.Util;
import org.infinispan.util.concurrent.CompletionStages;
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.processors.AsyncProcessor;

/**
Expand All @@ -18,7 +13,7 @@
* @author wburns
* @since 10.0
*/
public class RxJavaInterop {
public class RxJavaInterop extends org.infinispan.commons.reactive.RxJavaInterop {
private RxJavaInterop() { }

public static <R> Flowable<R> voidCompletionStageToFlowable(CompletionStage<Void> stage) {
Expand All @@ -37,32 +32,4 @@ public static <R> Flowable<R> voidCompletionStageToFlowable(CompletionStage<Void

return ap;
}

/**
* Provides a {@link Function} that can be used to convert from an instance of {@link java.util.Map.Entry} to
* the key of the entry. This is useful for the instance passed to a method like {@link Flowable#map(Function)}.
* @param <K> key type
* @param <V> value type
* @return rxjava function to convert from a Map.Entry to its key.
*/
public static <K, V> Function<Map.Entry<K, V>, K> entryToKeyFunction() {
return (Function) entryToKeyFunction;
}

public static <R> Function<? super Throwable, Publisher<R>> cacheExceptionWrapper() {
return (Function) wrapThrowable;
}

public static <R> Function<R, R> identityFunction() {
return (Function) identityFunction;
}

public static <R> Consumer<R> emptyConsumer() {
return (Consumer) emptyConsumer;
}

private static final Function<Object, Object> identityFunction = i -> i;
private static final Consumer<Object> emptyConsumer = ignore -> {};
private static final Function<Map.Entry<Object, Object>, Object> entryToKeyFunction = Map.Entry::getKey;
private static final Function<? super Throwable, Publisher<?>> wrapThrowable = t -> Flowable.error(Util.rewrapAsCacheException(t));
}
Expand Up @@ -23,6 +23,7 @@
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.MarshallingException;
import org.infinispan.commons.marshall.ProtoStreamTypeIds;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ParserRegistry;
Expand All @@ -40,7 +41,6 @@
import org.infinispan.protostream.ImmutableSerializationContext;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.PublisherTransformers;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
Expand Down
Expand Up @@ -14,6 +14,7 @@

import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.ProtoStreamTypeIds;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.CounterType;
Expand Down Expand Up @@ -87,9 +88,7 @@ public CompletionStage<Void> backup() {
return e;
})
.doOnNext(e -> writeMessageStream(e, serCtx, output))
.doOnError(t -> {
throw new CacheException("Unable to create counter backup", t);
}),
.onErrorResumeNext(RxJavaInterop.cacheExceptionWrapper()),
OutputStream::close
), "write-counters");
}
Expand Down