Skip to content

Commit

Permalink
Fixed typo in Reactor's Publisher are not supported by this operator (
Browse files Browse the repository at this point in the history
  • Loading branch information
hexmind authored and RobWin committed Sep 18, 2019
1 parent e94cb88 commit 1a26679
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.github.resilience4j.reactor;

import org.reactivestreams.Publisher;

public class IllegalPublisherException extends IllegalStateException {

public IllegalPublisherException(Publisher publisher) {
super("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> is not supported by this operator");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import org.reactivestreams.Publisher;
import io.github.resilience4j.reactor.IllegalPublisherException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.UnaryOperator;

import org.reactivestreams.Publisher;

/**
* A Bulkhead operator which checks if a subscriber/observer can acquire a permission to subscribe to an upstream Publisher.
* Otherwise emits a {@link BulkheadFullException}, if the Bulkhead is full.
Expand Down Expand Up @@ -53,9 +55,8 @@ public Publisher<T> apply(Publisher<T> publisher) {
return new MonoBulkhead<>((Mono<? extends T>) publisher, bulkhead);
} else if (publisher instanceof Flux) {
return new FluxBulkhead<>((Flux<? extends T>) publisher, bulkhead);
} else {
throw new IllegalPublisherException(publisher);
}

throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> are not supported by this operator");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import org.reactivestreams.Publisher;
import io.github.resilience4j.reactor.IllegalPublisherException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.UnaryOperator;

import org.reactivestreams.Publisher;

/**
* A CircuitBreaker operator which checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher.
* Otherwise emits a {@link CallNotPermittedException} if the CircuitBreaker is OPEN.
Expand Down Expand Up @@ -54,9 +56,8 @@ public Publisher<T> apply(Publisher<T> publisher) {
return new MonoCircuitBreaker<>((Mono<? extends T>) publisher, circuitBreaker);
} else if (publisher instanceof Flux) {
return new FluxCircuitBreaker<>((Flux<? extends T>) publisher, circuitBreaker);
} else {
throw new IllegalPublisherException(publisher);
}

throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> are not supported by this operator");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import org.reactivestreams.Publisher;
import io.github.resilience4j.reactor.IllegalPublisherException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.UnaryOperator;

import org.reactivestreams.Publisher;


/**
* A RateLimiter operator which checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher.
Expand Down Expand Up @@ -54,9 +56,8 @@ public Publisher<T> apply(Publisher<T> publisher) {
return new MonoRateLimiter<>((Mono<? extends T>) publisher, rateLimiter);
} else if (publisher instanceof Flux) {
return new FluxRateLimiter<>((Flux<? extends T>) publisher, rateLimiter);
} else {
throw new IllegalPublisherException(publisher);
}

throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> are not supported by this operator");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
*/
package io.github.resilience4j.reactor.retry;

import io.github.resilience4j.reactor.IllegalPublisherException;
import io.github.resilience4j.retry.Retry;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import org.reactivestreams.Publisher;

/**
* A Reactor Retry operator which wraps a reactive type in a Retry.
* @param <T> the value type of the upstream and downstream
Expand Down Expand Up @@ -60,9 +62,9 @@ public Publisher<T> apply(Publisher<T> publisher) {
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(throwingConsumerWrapper(context::onError)))
.doOnComplete(context::onComplete);
}
throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> are not supported by this operator");
} else {
throw new IllegalPublisherException(publisher);
}
}


Expand Down

0 comments on commit 1a26679

Please sign in to comment.