Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transparent contextCapture in block operators #3420

Merged
merged 6 commits into from Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package reactor.core.publisher;

import reactor.util.context.Context;

/**
* Blocks until the upstream signals its first value or completes.
*
Expand All @@ -24,6 +26,10 @@
*/
final class BlockingFirstSubscriber<T> extends BlockingSingleSubscriber<T> {

public BlockingFirstSubscriber(Context context) {
super(context);
}

@Override
public void onNext(T t) {
if (value == null) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,9 +54,13 @@ final class BlockingIterable<T> implements Iterable<T>, Scannable {

final Supplier<Queue<T>> queueSupplier;

final Supplier<Context> contextSupplier;

BlockingIterable(CorePublisher<? extends T> source,
int batchSize,
Supplier<Queue<T>> queueSupplier) {
Supplier<Queue<T>> queueSupplier,
Supplier<Context> contextSupplier) {
this.contextSupplier = contextSupplier;
if (batchSize <= 0) {
throw new IllegalArgumentException("batchSize > 0 required but it was " + batchSize);
}
Expand Down Expand Up @@ -114,7 +118,7 @@ SubscriberIterator<T> createIterator() {
throw Exceptions.propagate(e);
}

return new SubscriberIterator<>(q, batchSize);
return new SubscriberIterator<>(q, contextSupplier.get(), batchSize);
}

static final class SubscriberIterator<T>
Expand All @@ -130,6 +134,8 @@ static final class SubscriberIterator<T>

final Condition condition;

final Context context;

long produced;

volatile Subscription s;
Expand All @@ -142,17 +148,18 @@ static final class SubscriberIterator<T>
volatile boolean done;
Throwable error;

SubscriberIterator(Queue<T> queue, int batchSize) {
SubscriberIterator(Queue<T> queue, Context context, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
this.limit = Operators.unboundedOrLimit(batchSize);
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
this.context = context;
}

@Override
public Context currentContext() {
return Context.empty();
return this.context;
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package reactor.core.publisher;

import reactor.util.context.Context;

/**
* Blocks until the upstream signals its last value or completes.
*
Expand All @@ -24,6 +26,10 @@
*/
final class BlockingLastSubscriber<T> extends BlockingSingleSubscriber<T> {

public BlockingLastSubscriber(Context context) {
super(context);
}

@Override
public void onNext(T t) {
value = t;
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package reactor.core.publisher;

import reactor.util.context.Context;

/**
* Blocks assuming the upstream is a Mono, until it signals its value or completes.
* Compared to {@link BlockingFirstSubscriber}, this variant doesn't cancel the upstream
Expand All @@ -26,6 +28,10 @@
*/
final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> {

public BlockingMonoSubscriber(Context context) {
super(context);
}

@Override
public void onNext(T t) {
if (value == null) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,8 +45,11 @@ final class BlockingOptionalMonoSubscriber<T> extends CountDownLatch

volatile boolean cancelled;

BlockingOptionalMonoSubscriber() {
final Context context;

BlockingOptionalMonoSubscriber(Context context) {
super(1);
this.context = context;
}

@Override
Expand Down Expand Up @@ -80,7 +83,7 @@ public final void onComplete() {

@Override
public Context currentContext() {
return Context.empty();
return this.context;
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,10 +37,13 @@ abstract class BlockingSingleSubscriber<T> extends CountDownLatch

Subscription s;

final Context context;

volatile boolean cancelled;

BlockingSingleSubscriber() {
BlockingSingleSubscriber(Context context) {
super(1);
this.context = context;
}

@Override
Expand All @@ -58,7 +61,7 @@ public final void onComplete() {

@Override
public Context currentContext() {
return Context.empty();
return this.context;
}

@Override
Expand Down
Expand Up @@ -75,6 +75,10 @@ static Function<Context, Context> contextCapture() {
return WITH_GLOBAL_REGISTRY_NO_PREDICATE;
}

static Context contextCaptureToEmpty() {
return contextCapture().apply(Context.empty());
}

/**
* Create a support function that takes a snapshot of thread locals and merges them with the
* provided {@link Context}, resulting in a new {@link Context} which includes entries
Expand Down
26 changes: 20 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Expand Up @@ -2696,7 +2696,9 @@ public final <P> P as(Function<? super Flux<T>, P> transformer) {
*/
@Nullable
public final T blockFirst() {
BlockingFirstSubscriber<T> subscriber = new BlockingFirstSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingFirstSubscriber<T> subscriber = new BlockingFirstSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet();
}
Expand All @@ -2719,7 +2721,9 @@ public final T blockFirst() {
*/
@Nullable
public final T blockFirst(Duration timeout) {
BlockingFirstSubscriber<T> subscriber = new BlockingFirstSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingFirstSubscriber<T> subscriber = new BlockingFirstSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
Expand All @@ -2741,7 +2745,9 @@ public final T blockFirst(Duration timeout) {
*/
@Nullable
public final T blockLast() {
BlockingLastSubscriber<T> subscriber = new BlockingLastSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingLastSubscriber<T> subscriber = new BlockingLastSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet();
}
Expand All @@ -2765,7 +2771,9 @@ public final T blockLast() {
*/
@Nullable
public final T blockLast(Duration timeout) {
BlockingLastSubscriber<T> subscriber = new BlockingLastSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingLastSubscriber<T> subscriber = new BlockingLastSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
Expand Down Expand Up @@ -9652,7 +9660,10 @@ public final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>>
else{
provider = () -> Hooks.wrapQueue(queueProvider.get());
}
return new BlockingIterable<>(this, batchSize, provider);
Supplier<Context> contextSupplier =
ContextPropagationSupport.shouldPropagateContextToThreadLocals() ?
ContextPropagation::contextCaptureToEmpty : Context::empty;
return new BlockingIterable<>(this, batchSize, provider, contextSupplier);
}

/**
Expand Down Expand Up @@ -9690,7 +9701,10 @@ public final Stream<T> toStream() {
public final Stream<T> toStream(int batchSize) {
final Supplier<Queue<T>> provider;
provider = Queues.get(batchSize);
return new BlockingIterable<>(this, batchSize, provider).stream();
Supplier<Context> contextSupplier =
ContextPropagationSupport.shouldPropagateContextToThreadLocals() ?
ContextPropagation::contextCaptureToEmpty : Context::empty;
return new BlockingIterable<>(this, batchSize, provider, contextSupplier).stream();
}

/**
Expand Down
16 changes: 9 additions & 7 deletions reactor-core/src/main/java/reactor/core/publisher/Hooks.java
Expand Up @@ -533,16 +533,18 @@ public static void disableContextLossTracking() {
* {@code contextWrite(...)} call and the unmodified (downstream) {@link Context} is
* used when signals are delivered downstream, making the {@code contextWrite(...)}
* a logical boundary for the context propagation mechanism.
* <p>
* This mechanism automatically performs {@link Flux#contextCapture()}
* and {@link Mono#contextCapture()} in {@link Flux#blockFirst()},
* {@link Flux#blockLast()}, {@link Flux#toIterable()}, and {@link Mono#block()} (and
* their overloads).
* @since 3.5.3
*/
public static void enableAutomaticContextPropagation() {
if (ContextPropagationSupport.isContextPropagationOnClasspath) {
Hooks.addQueueWrapper(
CONTEXT_IN_THREAD_LOCALS_KEY, ContextPropagation.ContextQueue::new
);
Schedulers.onScheduleHook(
CONTEXT_IN_THREAD_LOCALS_KEY,
ContextPropagation.scopePassingOnScheduleHook()
);
Hooks.addQueueWrapper(CONTEXT_IN_THREAD_LOCALS_KEY, ContextPropagation.ContextQueue::new);
Schedulers.onScheduleHook(CONTEXT_IN_THREAD_LOCALS_KEY,
ContextPropagation.scopePassingOnScheduleHook());
ContextPropagationSupport.propagateContextToThreadLocals = true;
}
}
Expand Down
18 changes: 14 additions & 4 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Expand Up @@ -1705,7 +1705,9 @@ public final Mono<Void> and(Publisher<?> other) {
*/
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet();
}
Expand All @@ -1729,7 +1731,9 @@ public T block() {
*/
@Nullable
public T block(Duration timeout) {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
Expand All @@ -1750,7 +1754,10 @@ public T block(Duration timeout) {
* @return T the result
*/
public Optional<T> blockOptional() {
BlockingOptionalMonoSubscriber<T> subscriber = new BlockingOptionalMonoSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingOptionalMonoSubscriber<T> subscriber =
new BlockingOptionalMonoSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet();
}
Expand All @@ -1775,7 +1782,10 @@ public Optional<T> blockOptional() {
* @return T the result
*/
public Optional<T> blockOptional(Duration timeout) {
BlockingOptionalMonoSubscriber<T> subscriber = new BlockingOptionalMonoSubscriber<>();
Context context = ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? ContextPropagation.contextCaptureToEmpty() : Context.empty();
BlockingOptionalMonoSubscriber<T> subscriber =
new BlockingOptionalMonoSubscriber<>(context);
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
Expand Down