Skip to content

Commit

Permalink
back to CorePublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Jan 17, 2019
1 parent 886c225 commit d53b85a
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 196 deletions.
48 changes: 48 additions & 0 deletions reactor-core/src/main/java/reactor/core/CorePublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core;

import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Hooks;
import reactor.util.context.Context;

/**
* A {@link CoreSubscriber} aware publisher.
*
*
* @param <T> the {@link CoreSubscriber} data type
*
* @since 3.2.6
*/
public interface CorePublisher<T> extends Publisher<T> {

/**
* An internal {@link Publisher#subscribe(Subscriber)} that will bypass
* {@link Hooks#onLastOperator(Function)} pointcut.
* <p>
* In addition to behave as expected by {@link Publisher#subscribe(Subscriber)}
* in a controlled manner, it supports direct subscribe-time {@link Context} passing.
*
* @param subscriber the {@link Subscriber} interested into the published sequence
* @see Publisher#subscribe(Subscriber)
*/
void subscribe(CoreSubscriber<? super T> subscriber);

}
15 changes: 9 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -110,7 +111,7 @@
*
* @see Mono
*/
public abstract class Flux<T> implements Publisher<T> {
public abstract class Flux<T> implements CorePublisher<T> {

// ==============================================================================================================
// Static Generators
Expand Down Expand Up @@ -2340,7 +2341,7 @@ public final <P> P as(Function<? super Flux<T>, P> transformer) {
@Nullable
public final T blockFirst() {
BlockingFirstSubscriber<T> subscriber = new BlockingFirstSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}

Expand All @@ -2363,7 +2364,7 @@ public final T blockFirst() {
@Nullable
public final T blockFirst(Duration timeout) {
BlockingFirstSubscriber<T> subscriber = new BlockingFirstSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

Expand All @@ -2385,7 +2386,7 @@ public final T blockFirst(Duration timeout) {
@Nullable
public final T blockLast() {
BlockingLastSubscriber<T> subscriber = new BlockingLastSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}

Expand All @@ -2409,7 +2410,7 @@ public final T blockLast() {
@Nullable
public final T blockLast(Duration timeout) {
BlockingLastSubscriber<T> subscriber = new BlockingLastSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -7740,7 +7741,7 @@ public final Disposable subscribe(

@Override
public final void subscribe(Subscriber<? super T> actual) {
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(actual));
}

/**
Expand Down Expand Up @@ -9147,8 +9148,10 @@ protected static <T> Flux<T> onAssembly(Flux<T> source) {
* @param source the source to apply assembly hooks onto
*
* @return the source, potentially wrapped with assembly time cross-cutting behavior
* @deprecated use {@link Hooks#onLastOperator(CorePublisher)}
*/
@SuppressWarnings("unchecked")
@Deprecated
protected static <T> Flux<T> onLastAssembly(Flux<T> source) {
Function<Publisher, Publisher> hook = Hooks.onLastOperatorHook;
if(hook == null) {
Expand Down
38 changes: 36 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Hooks.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.FluxOnAssembly.AssemblySnapshot;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -490,8 +493,10 @@ public static boolean removeTracker(String key) {

static void installTracing() {
synchronized (log) {
onLastOperator(KEY_TRACKING, new TrackingReactorPublisherTransformer());
Schedulers.addExecutorServiceDecorator(KEY_TRACKING, new TrackingExecutorServiceDecorator());
Collection<Tracker> trackersView = Collections.unmodifiableCollection(trackers.values());

onLastOperator(KEY_TRACKING, source -> new TrackingPublisher(source, trackersView));
Schedulers.addExecutorServiceDecorator(KEY_TRACKING, new TrackingExecutorServiceDecorator(trackersView));
}
}

Expand All @@ -502,6 +507,35 @@ static void uninstallTracing() {
}
}

@SuppressWarnings("unchecked")
static <T> CorePublisher<T> onLastOperator(CorePublisher<T> source) {
Function<Publisher, Publisher> hook = Hooks.onLastOperatorHook;
final Publisher<T> publisher;
if (hook == null) {
publisher = source;
}
else {
publisher = Objects.requireNonNull(hook.apply(source),"LastOperator hook returned null");
}

if (publisher instanceof CorePublisher) {
return (CorePublisher<T>) publisher;
}
else {
return new CorePublisher<T>() {
@Override
public void subscribe(CoreSubscriber<? super T> subscriber) {
publisher.subscribe(subscriber);
}

@Override
public void subscribe(Subscriber<? super T> s) {
publisher.subscribe(s);
}
};
}
}

@Nullable
@SuppressWarnings("unchecked")
static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<? super Publisher<Object>, ? extends Publisher<Object>>> hooks) {
Expand Down
15 changes: 9 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Fuseable;
Expand Down Expand Up @@ -98,7 +99,7 @@
* @author Simon Baslé
* @see Flux
*/
public abstract class Mono<T> implements Publisher<T> {
public abstract class Mono<T> implements CorePublisher<T> {

// ==============================================================================================================
// Static Generators
Expand Down Expand Up @@ -1496,7 +1497,7 @@ public final Mono<Void> and(Publisher<?> other) {
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}

Expand All @@ -1520,7 +1521,7 @@ public T block() {
@Nullable
public T block(Duration timeout) {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

Expand All @@ -1541,7 +1542,7 @@ public T block(Duration timeout) {
*/
public Optional<T> blockOptional() {
BlockingOptionalMonoSubscriber<T> subscriber = new BlockingOptionalMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}

Expand All @@ -1566,7 +1567,7 @@ public Optional<T> blockOptional() {
*/
public Optional<T> blockOptional(Duration timeout) {
BlockingOptionalMonoSubscriber<T> subscriber = new BlockingOptionalMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -3692,7 +3693,7 @@ public final Disposable subscribe(

@Override
public final void subscribe(Subscriber<? super T> actual) {
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual));
Hooks.onLastOperator(this).subscribe(Operators.toCoreSubscriber(actual));
}

/**
Expand Down Expand Up @@ -4299,8 +4300,10 @@ protected static <T> Mono<T> onAssembly(Mono<T> source) {
* @param source the source to apply assembly hooks onto
*
* @return the source, potentially wrapped with assembly time cross-cutting behavior
* @deprecated use {@link Hooks#onLastOperator(CorePublisher)}
*/
@SuppressWarnings("unchecked")
@Deprecated
protected static <T> Mono<T> onLastAssembly(Mono<T> source) {
Function<Publisher, Publisher> hook = Hooks.onLastOperatorHook;
if(hook == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
Expand Down Expand Up @@ -65,7 +66,7 @@
*
* @param <T> the value type
*/
public abstract class ParallelFlux<T> implements Publisher<T> {
public abstract class ParallelFlux<T> implements CorePublisher<T> {

/**
* Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core)
Expand Down Expand Up @@ -1002,6 +1003,17 @@ public final Disposable subscribe(
return subscribe(onNext, onError, onComplete, null);
}

@Override
@SuppressWarnings("unchecked")
public final void subscribe(CoreSubscriber<? super T> subscriber) {
if (subscriber instanceof MultiSubscriber) {
subscribe(((MultiSubscriber<? super T>) subscriber).getSubscribers());
}
else {
subscribe((Subscriber<? super T>) subscriber);
}
}

/**
* Subscribes to this {@link ParallelFlux} by providing an onNext, onError,
* onComplete and onSubscribe callback and triggers the execution chain for all
Expand All @@ -1027,7 +1039,9 @@ public final Disposable subscribe(
new LambdaSubscriber<>(onNext, onError, onComplete, onSubscribe);
}

onLastAssembly(this).subscribe(subscribers);
@SuppressWarnings("unchecked")
MultiSubscriber<T> multiSubscriber = new MultiSubscriber<>((CoreSubscriber<T>[]) subscribers);
Hooks.onLastOperator(this).subscribe(multiSubscriber);
return Disposables.composite(subscribers);
}

Expand All @@ -1040,7 +1054,7 @@ public final Disposable subscribe(
@Override
@SuppressWarnings("unchecked")
public final void subscribe(Subscriber<? super T> s) {
Flux.onLastAssembly(sequential())
Hooks.onLastOperator(sequential())
.subscribe(new FluxHide.SuppressFuseableSubscriber<>(Operators.toCoreSubscriber(s)));
}

Expand Down Expand Up @@ -1196,8 +1210,10 @@ protected static <T> ParallelFlux<T> onAssembly(ParallelFlux<T> source) {
* @param source the source to wrap
*
* @return the potentially wrapped source
* @deprecated use {@link Hooks#onLastOperator(CorePublisher)}
*/
@SuppressWarnings("unchecked")
@Deprecated
protected static <T> ParallelFlux<T> onLastAssembly(ParallelFlux<T> source) {
Function<Publisher, Publisher> hook = Hooks.onLastOperatorHook;
if (hook == null) {
Expand Down Expand Up @@ -1268,4 +1284,37 @@ else if (s2 != null) {
return both;
}

private static class MultiSubscriber<T> implements CoreSubscriber<T> {

final CoreSubscriber<T>[] subscribers;

private MultiSubscriber(CoreSubscriber<T>[] subscribers) {
this.subscribers = subscribers;
}

public CoreSubscriber<T>[] getSubscribers() {
return subscribers;
}

@Override
public void onSubscribe(Subscription s) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onNext(T t) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onError(Throwable t) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onComplete() {
throw new IllegalStateException("Not implemented");
}
}

}
Loading

0 comments on commit d53b85a

Please sign in to comment.