From 1b3232dd5f5e1870161a55ac4da65083b2f8b889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 30 Apr 2024 12:05:10 +0200 Subject: [PATCH] Review feedback - comments added --- .../src/main/java/reactor/core/publisher/ConnectableLift.java | 1 + .../java/reactor/core/publisher/ConnectableLiftFuseable.java | 1 + .../FluxContextWriteRestoringThreadLocalsFuseable.java | 2 +- .../src/main/java/reactor/core/publisher/GroupedLift.java | 1 + .../main/java/reactor/core/publisher/GroupedLiftFuseable.java | 1 + reactor-core/src/main/java/reactor/core/publisher/MonoLift.java | 1 + .../src/main/java/reactor/core/publisher/MonoLiftFuseable.java | 1 + .../src/main/java/reactor/core/publisher/ParallelLift.java | 1 + .../main/java/reactor/core/publisher/ParallelLiftFuseable.java | 1 + 9 files changed, 9 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java index 5037d6cb32..e650a2a252 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java @@ -68,6 +68,7 @@ public String stepName() { @Override public final CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { + // No need to wrap actual for CP, the Operators$LiftFunction handles it. CoreSubscriber input = liftFunction.lifter.apply(source, actual); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java index 75dd3793e7..651b54b596 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java @@ -70,6 +70,7 @@ public String stepName() { @Override public final CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { + // No need to wrap actual for CP, the Operators$LiftFunction handles it. CoreSubscriber input = liftFunction.lifter.apply(source, actual); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxContextWriteRestoringThreadLocalsFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxContextWriteRestoringThreadLocalsFuseable.java index d7cd90fe37..215f558136 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxContextWriteRestoringThreadLocalsFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxContextWriteRestoringThreadLocalsFuseable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java b/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java index e139378e7a..1347c3fa80 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java @@ -79,6 +79,7 @@ public String stepName() { @Override public void subscribe(CoreSubscriber actual) { + // No need to wrap actual for CP, the Operators$LiftFunction handles it. CoreSubscriber input = liftFunction.lifter.apply(source, actual); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java index 225b19a52b..f30fcc3b2d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java @@ -81,6 +81,7 @@ public String stepName() { @Override public void subscribe(CoreSubscriber actual) { + // No need to wrap actual for CP, the Operators$LiftFunction handles it. CoreSubscriber input = liftFunction.lifter.apply(source, actual); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java b/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java index 2baf492fe5..60923edde4 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java @@ -37,6 +37,7 @@ final class MonoLift extends InternalMonoOperator { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { + // No need to wrap actual for CP, the Operators$LiftFunction handles it. CoreSubscriber input = liftFunction.lifter.apply(source, actual); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java index 6b524b50f0..e937acc628 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java @@ -55,6 +55,7 @@ public Object scanUnsafe(Attr key) { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { + // No need to wrap actual for CP, the Operators$LiftFunction handles it. CoreSubscriber input = liftFunction.lifter.apply(source, actual); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java index bc6684f8d2..a47a16f32d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java @@ -84,6 +84,7 @@ public void subscribe(CoreSubscriber[] s) { int i = 0; while (i < subscribers.length) { subscribers[i] = + // No need to wrap actual for CP, the Operators$LiftFunction handles it. Objects.requireNonNull(liftFunction.lifter.apply(source, s[i]), "Lifted subscriber MUST NOT be null"); i++; diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java index 421c24e76e..bf77b2e2ae 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java @@ -88,6 +88,7 @@ public void subscribe(CoreSubscriber[] s) { while (i < subscribers.length) { CoreSubscriber actual = s[i]; CoreSubscriber converted = + // No need to wrap actual for CP, the Operators$LiftFunction handles it. Objects.requireNonNull(liftFunction.lifter.apply(source, actual), "Lifted subscriber MUST NOT be null"); Objects.requireNonNull(converted, "Lifted subscriber MUST NOT be null");