Skip to content

Commit

Permalink
Merge pull request #293 from bruto1/bug/discard-issue-292
Browse files Browse the repository at this point in the history
issue #292 - discard support
  • Loading branch information
rmichela committed Apr 11, 2023
2 parents 0934199 + 6c9298b commit 5f3e1be
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void request(long n) {
private volatile boolean done;
private Throwable error;

private volatile Subscriber<? super T> downstream;
protected volatile Subscriber<? super T> downstream;

private volatile boolean cancelled;

Expand Down Expand Up @@ -226,7 +226,7 @@ private void drainFused(final Subscriber<? super T> subscriber) {

for (;;) {
if (cancelled) {
queue.clear();
discardQueue(queue);
downstream = null;
return;
}
Expand Down Expand Up @@ -283,7 +283,7 @@ private void drain() {

private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> subscriber, Queue<T> q) {
if (cancelled) {
q.clear();
discardQueue(q);
downstream = null;
return true;
}
Expand All @@ -305,6 +305,7 @@ private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T>
@Override
public void onNext(T t) {
if (done || cancelled) {
discardElement(t);
return;
}

Expand Down Expand Up @@ -419,7 +420,7 @@ public void cancel() {

if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
queue.clear();
discardQueue(queue);
downstream = null;
}
}
Expand Down Expand Up @@ -456,4 +457,10 @@ public boolean isEmpty() {
public void clear() {
queue.clear();
}

protected void discardQueue(Queue<T> q) {
q.clear();
}

protected void discardElement(T t) { }
}
13 changes: 13 additions & 0 deletions reactor/reactor-grpc-stub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
import com.salesforce.reactivegrpc.common.AbstractClientStreamObserverAndPublisher;
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

import java.util.Queue;

/**
* TODO: Explain what this class does.
* @param <T> T
Expand Down Expand Up @@ -46,4 +50,20 @@ public int requestFusion(int requestedMode) {
}
return Fuseable.NONE;
}

@Override
protected void discardQueue(Queue<T> q) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
} else {
q.clear();
}
}

@Override
protected void discardElement(T t) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

import java.util.Queue;

/**
* TODO: Explain what this class does.
* @param <T> T
Expand All @@ -38,4 +42,20 @@ public int requestFusion(int requestedMode) {
}
return Fuseable.NONE;
}

@Override
protected void discardQueue(Queue<T> q) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
} else {
q.clear();
}
}

@Override
protected void discardElement(T t) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,31 @@

package com.salesforce.reactorgrpc.stub;

import java.util.concurrent.ForkJoinPool;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;


public class ReactorClientStreamObserverAndPublisherTest {
private static final Logger log = LoggerFactory.getLogger(ReactorClientStreamObserverAndPublisherTest.class.getName());

private static final int DEFAULT_CHUNK_SIZE = 512;
private static final int PART_OF_CHUNK = DEFAULT_CHUNK_SIZE * 2 / 3;

@Test
public void multiThreadedProducerTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 100000;
TestCallStreamObserverProducer observer =
new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
Expand All @@ -34,21 +40,76 @@ public void multiThreadedProducerTest() {
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1,
(countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void producerFusedTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 100000;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(),
processor, countPerThread);
processor.beforeStart(observer);
StepVerifier.create(Flux.from(processor))
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1,
(countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void discardQueueTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 5;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(),
processor, countPerThread);
processor.beforeStart(observer);

ConcurrentLinkedQueue<Integer> discardedByObserverAndPublisher = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Integer> discardedByPublishOn = new ConcurrentLinkedQueue<>();

AtomicBoolean firstHandled = new AtomicBoolean();
Flux<Integer> consumer =
Flux.from(processor)
.doOnDiscard(Integer.class, i -> {
log.info("Processor: discarding {}", i);
discardedByObserverAndPublisher.add(i);
})
.log("processor")
.limitRate(1)
.publishOn(Schedulers.parallel())
.limitRate(1)
.doOnDiscard(Integer.class, i -> {
log.info("publishOn: discarding {}", i);
discardedByPublishOn.add(i);
})
.<Integer>handle((i, sink) -> {
if (firstHandled.compareAndSet(false, true)) {
try {
Thread.sleep(100);
} catch (Exception e) {
// noop
}
sink.next(i);
} else {
sink.complete();
}
})
.log("handled");

StepVerifier.create(consumer)
.expectNext(0)
.verifyComplete();

// 1 is dropped in handle without invoking the discard hook,
assertThat(discardedByObserverAndPublisher).containsExactly(3, 4);
// impl details: processor is able to schedule 2 before it's cancelled
// also, discard hooks are cumulative, so not using containsExactly
assertThat(discardedByPublishOn).contains(2);
}
}

0 comments on commit 5f3e1be

Please sign in to comment.