From e9327c7eb711fc4e416b8a879b14e8a61f1557ac Mon Sep 17 00:00:00 2001 From: Ryland Degnan Date: Wed, 16 May 2018 12:15:12 -0700 Subject: [PATCH] Use ReferenceCountUtil.safeRelease instead of release --- .../main/java/io/rsocket/internal/SwitchTransform.java | 9 +++++---- .../java/io/rsocket/internal/UnboundedProcessor.java | 8 ++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java index 1f0091c4e..0690660cb 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java @@ -25,6 +25,7 @@ import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Operators; +import reactor.core.publisher.UnicastProcessor; public final class SwitchTransform extends Flux { @@ -39,7 +40,7 @@ public SwitchTransform( @Override public void subscribe(CoreSubscriber actual) { - Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer)); + source.subscribe(new SwitchTransformSubscriber<>(actual, transformer)); } static final class SwitchTransformSubscriber @@ -50,7 +51,7 @@ static final class SwitchTransformSubscriber final CoreSubscriber actual; final BiFunction, Publisher> transformer; - final UnboundedProcessor processor = new UnboundedProcessor<>(); + final UnicastProcessor processor = UnicastProcessor.create(); Subscription s; volatile int once; @@ -76,10 +77,10 @@ public void onNext(T t) { Publisher result = Objects.requireNonNull( transformer.apply(t, processor), "The transformer returned a null value"); - Flux.from(result).subscribe(actual); + result.subscribe(actual); } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, actual.currentContext())); - ReferenceCountUtil.release(t); + ReferenceCountUtil.safeRelease(t); return; } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index d8e1997d0..99b86abc0 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -158,7 +158,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber a, Queue while (!q.isEmpty()) { T t = q.poll(); if (t != null) { - ReferenceCountUtil.release(t); + ReferenceCountUtil.safeRelease(t); } } actual = null; @@ -202,7 +202,7 @@ public Context currentContext() { public void onNext(T t) { if (done || cancelled) { Operators.onNextDropped(t, currentContext()); - ReferenceCountUtil.release(t); + ReferenceCountUtil.safeRelease(t); return; } @@ -210,7 +210,7 @@ public void onNext(T t) { Throwable ex = Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()); onError(Operators.onOperatorError(null, ex, t, currentContext())); - ReferenceCountUtil.release(t); + ReferenceCountUtil.safeRelease(t); return; } drain(); @@ -301,7 +301,7 @@ public void clear() { while (!queue.isEmpty()) { T t = queue.poll(); if (t != null) { - ReferenceCountUtil.release(t); + ReferenceCountUtil.safeRelease(t); } } }