Skip to content

Commit

Permalink
ISPN-12244 Remove RxJavaInterop duplication from core
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanemerson authored and wburns committed Aug 20, 2020
1 parent 5d72601 commit 4ac5c92
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* @since 10.0
*/
public class RxJavaInterop {
private RxJavaInterop() { }
protected RxJavaInterop() { }

/**
* Provides a {@link Function} that can be used to convert from an instance of {@link java.util.Map.Entry} to
Expand Down
35 changes: 1 addition & 34 deletions core/src/main/java/org/infinispan/reactive/RxJavaInterop.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package org.infinispan.reactive;

import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.infinispan.commons.util.Util;
import org.infinispan.util.concurrent.CompletionStages;
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.processors.AsyncProcessor;

/**
Expand All @@ -18,7 +13,7 @@
* @author wburns
* @since 10.0
*/
public class RxJavaInterop {
public class RxJavaInterop extends org.infinispan.commons.reactive.RxJavaInterop {
private RxJavaInterop() { }

public static <R> Flowable<R> voidCompletionStageToFlowable(CompletionStage<Void> stage) {
Expand All @@ -37,32 +32,4 @@ public static <R> Flowable<R> voidCompletionStageToFlowable(CompletionStage<Void

return ap;
}

/**
* Provides a {@link Function} that can be used to convert from an instance of {@link java.util.Map.Entry} to
* the key of the entry. This is useful for the instance passed to a method like {@link Flowable#map(Function)}.
* @param <K> key type
* @param <V> value type
* @return rxjava function to convert from a Map.Entry to its key.
*/
public static <K, V> Function<Map.Entry<K, V>, K> entryToKeyFunction() {
return (Function) entryToKeyFunction;
}

public static <R> Function<? super Throwable, Publisher<R>> cacheExceptionWrapper() {
return (Function) wrapThrowable;
}

public static <R> Function<R, R> identityFunction() {
return (Function) identityFunction;
}

public static <R> Consumer<R> emptyConsumer() {
return (Consumer) emptyConsumer;
}

private static final Function<Object, Object> identityFunction = i -> i;
private static final Consumer<Object> emptyConsumer = ignore -> {};
private static final Function<Map.Entry<Object, Object>, Object> entryToKeyFunction = Map.Entry::getKey;
private static final Function<? super Throwable, Publisher<?>> wrapThrowable = t -> Flowable.error(Util.rewrapAsCacheException(t));
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.MarshallingException;
import org.infinispan.commons.marshall.ProtoStreamTypeIds;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ParserRegistry;
Expand All @@ -40,7 +41,6 @@
import org.infinispan.protostream.ImmutableSerializationContext;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.PublisherTransformers;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.ProtoStreamTypeIds;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.CounterType;
Expand Down Expand Up @@ -87,9 +88,7 @@ public CompletionStage<Void> backup() {
return e;
})
.doOnNext(e -> writeMessageStream(e, serCtx, output))
.doOnError(t -> {
throw new CacheException("Unable to create counter backup", t);
}),
.onErrorResumeNext(RxJavaInterop.cacheExceptionWrapper()),
OutputStream::close
), "write-counters");
}
Expand Down

0 comments on commit 4ac5c92

Please sign in to comment.