Skip to content

Commit

Permalink
rsocket#1077 type safe discard element consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasz wycisk committed Dec 1, 2022
1 parent e31038f commit 8a30105
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
Expand Up @@ -22,6 +22,7 @@
import io.rsocket.frame.FrameType;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand All @@ -46,16 +47,20 @@
*/
class DefaultRSocketClient extends ResolvingOperator<RSocket>
implements CoreSubscriber<RSocket>, CorePublisher<RSocket>, RSocketClient {
static final Consumer<ReferenceCounted> DISCARD_ELEMENTS_CONSUMER =
referenceCounted -> {
if (referenceCounted.refCnt() > 0) {
try {
referenceCounted.release();
} catch (IllegalReferenceCountException e) {
// ignored
}
}
};
static final Consumer<?> DISCARD_ELEMENTS_CONSUMER =
item -> Optional.ofNullable(item)
.filter(ReferenceCounted.class::isInstance)
.map(ReferenceCounted.class::cast)
.ifPresent(referenceCounted ->
{
if (referenceCounted.refCnt() > 0) {
try {
referenceCounted.release();
} catch (IllegalReferenceCountException e) {
// ignored
}
}
});

static final Object ON_DISCARD_KEY;

Expand Down
21 changes: 13 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/core/SendUtils.java
Expand Up @@ -33,20 +33,25 @@
import io.rsocket.frame.RequestFireAndForgetFrameCodec;
import io.rsocket.frame.RequestResponseFrameCodec;
import io.rsocket.frame.RequestStreamFrameCodec;

import java.util.Optional;
import java.util.function.Consumer;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

final class SendUtils {
private static final Consumer<?> DROPPED_ELEMENTS_CONSUMER =
data -> {
try {
ReferenceCounted referenceCounted = (ReferenceCounted) data;
referenceCounted.release();
} catch (Throwable e) {
// ignored
}
};
data -> Optional.ofNullable(data)
.filter(ReferenceCounted.class::isInstance)
.map(ReferenceCounted.class::cast)
.ifPresent(referenceCounted ->
{
try {
referenceCounted.release();
} catch (Throwable e) {
// ignored
}
});

static final Context DISCARD_CONTEXT = Operators.enableOnDiscard(null, DROPPED_ELEMENTS_CONSUMER);

Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
Expand All @@ -62,6 +63,8 @@
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

import static org.mockito.Mockito.*;

public class DefaultRSocketClientTests {

ClientSocketRule rule;
Expand Down Expand Up @@ -637,6 +640,25 @@ public void shouldStartOriginalSourceOnceIfRacing() {
}
}


@Test
void discardElementsConsumerShouldAcceptOtherTypesThanReferenceCounted() {
Consumer discardElementsConsumer = DefaultRSocketClient.DISCARD_ELEMENTS_CONSUMER;
discardElementsConsumer.accept(new Object());
}

@Test
void droppedElementsConsumerReleaseReference() {
ReferenceCounted referenceCounted = mock(ReferenceCounted.class);
when(referenceCounted.release()).thenReturn(true);
when(referenceCounted.refCnt()).thenReturn(1);

Consumer discardElementsConsumer = DefaultRSocketClient.DISCARD_ELEMENTS_CONSUMER;
discardElementsConsumer.accept(referenceCounted);

verify(referenceCounted).release();
}

public static class ClientSocketRule extends AbstractSocketRule<RSocket> {

protected RSocketClient client;
Expand Down
32 changes: 32 additions & 0 deletions rsocket-core/src/test/java/io/rsocket/core/SendUtilsTest.java
@@ -0,0 +1,32 @@
package io.rsocket.core;

import io.netty.util.ReferenceCounted;
import org.junit.jupiter.api.Test;

import java.util.function.Consumer;

import static org.mockito.Mockito.*;

class SendUtilsTest {

@Test
void droppedElementsConsumerShouldAcceptOtherTypesThanReferenceCounted() {
Consumer value = extractDroppedElementConsumer();
value.accept(new Object());
}

@Test
void droppedElementsConsumerReleaseReference() {
ReferenceCounted referenceCounted = mock(ReferenceCounted.class);
when(referenceCounted.release()).thenReturn(true);

Consumer value = extractDroppedElementConsumer();
value.accept(referenceCounted);

verify(referenceCounted).release();
}

private static Consumer<?> extractDroppedElementConsumer() {
return (Consumer<?>) SendUtils.DISCARD_CONTEXT.stream().findAny().get().getValue();
}
}

0 comments on commit 8a30105

Please sign in to comment.