Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion reactivesocket-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
dependencies {
testCompile 'io.reactivex:rxjava:2.0.0-DP0-20151003.214425-143'
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS;
import static io.reactivesocket.TestUtil.byteToString;
import static io.reactivesocket.TestUtil.utf8EncodedPayload;
import static io.reactivex.Observable.error;
import static io.reactivex.Observable.fromPublisher;
import static io.reactivex.Observable.range;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestFlowControlRequestN {
import static io.reactivesocket.ConnectionSetupPayload.*;
import static io.reactivesocket.TestUtil.*;
import static org.junit.Assert.*;
import static rx.Observable.*;
import static rx.RxReactiveStreams.*;

public class FlowControlRequestNTest {

@Test(timeout=2000)
public void testRequestStream_batches() throws InterruptedException {
Expand Down Expand Up @@ -89,7 +84,7 @@ public void onNext(Payload t) {
// be a "slow" consumer
try {
Thread.sleep(1000);
} catch (InterruptedException e) { }
} catch (InterruptedException ignored) { }
System.out.println("Emitted on server: " + emitted.get()
+ " Received on client: " + received);
}
Expand Down Expand Up @@ -154,9 +149,8 @@ public void testRequestSubscription_batches() throws InterruptedException {
@Test(timeout=2000)
public void testRequestChannel_batches_downstream() throws InterruptedException {
ControlledSubscriber s = new ControlledSubscriber();
socketClient.requestChannel(
range(1, 10).map(i -> utf8EncodedPayload(String.valueOf(i), "1000"))
).subscribe(s);
socketClient.requestChannel(toPublisher(range(1, 10).map(i -> utf8EncodedPayload(String.valueOf(i), "1000"))))
.subscribe(s);

// if flatMap is being used, then each of the 10 streams will emit at least 128 (default)

Expand Down Expand Up @@ -189,15 +183,14 @@ public void testRequestChannel_batches_downstream() throws InterruptedException
public void testRequestChannel_batches_upstream_echo() throws InterruptedException {
ControlledSubscriber s = new ControlledSubscriber();
AtomicInteger emittedClient = new AtomicInteger();
socketClient.requestChannel(
range(1, 10000)
.doOnNext(n -> emittedClient.incrementAndGet())
.doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r))
.map(i -> {
// metadata to route us to the echo behavior (only actually need
// this in the first payload)
return utf8EncodedPayload(String.valueOf(i), "echo");
})).subscribe(s);
socketClient.requestChannel(toPublisher(range(1, 10000)
.doOnNext(n -> emittedClient.incrementAndGet())
.doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r))
.map(i -> {
// metadata to route us to the echo behavior (only actually need
// this in the first payload)
return utf8EncodedPayload(String.valueOf(i), "echo");
}))).subscribe(s);

assertEquals(0, s.received.get());
assertEquals(0, emitted.get());
Expand Down Expand Up @@ -225,15 +218,14 @@ public void testRequestChannel_batches_upstream_echo() throws InterruptedExcepti
public void testRequestChannel_batches_upstream_decoupled() throws InterruptedException {
ControlledSubscriber s = new ControlledSubscriber();
AtomicInteger emittedClient = new AtomicInteger();
socketClient.requestChannel(
range(1, 10000)
socketClient.requestChannel(toPublisher(range(1, 10000)
.doOnNext(n -> emittedClient.incrementAndGet())
.doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r))
.map(i -> {
// metadata to route us to the echo behavior (only actually need this
// in the first payload)
return utf8EncodedPayload(String.valueOf(i), "decoupled");
})).subscribe(s);
}))).subscribe(s);

assertEquals(0, s.received.get());
assertEquals(0, emitted.get());
Expand All @@ -254,7 +246,7 @@ public void testRequestChannel_batches_upstream_decoupled() throws InterruptedEx
+ " requests and received " + s.received.get() + " responses");
}

private void waitForAsyncValue(AtomicInteger value, int n) throws InterruptedException {
private static void waitForAsyncValue(AtomicInteger value, int n) throws InterruptedException {
while (value.get() != n && !Thread.interrupted()) {
Thread.sleep(1);
}
Expand All @@ -270,7 +262,7 @@ private static class ControlledSubscriber implements Subscriber<Payload> {

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription = s;
}

@Override
Expand All @@ -297,9 +289,9 @@ public void onComplete() {
private static TestConnection clientConnection;
private static ReactiveSocket socketServer;
private static ReactiveSocket socketClient;
private static AtomicInteger emitted = new AtomicInteger();
private static AtomicInteger numRequests = new AtomicInteger();
private static AtomicLong requested = new AtomicLong();
private static final AtomicInteger emitted = new AtomicInteger();
private static final AtomicInteger numRequests = new AtomicInteger();
private static final AtomicLong requested = new AtomicLong();

@Before
public void init() {
Expand All @@ -321,22 +313,22 @@ public static void setup() throws InterruptedException {
public Publisher<Payload> handleRequestStream(Payload payload) {
String request = byteToString(payload.getData());
System.out.println("responder received requestStream: " + request);
return range(0, Integer.parseInt(request))
return toPublisher(range(0, Integer.parseInt(request))
.doOnRequest(n -> System.out.println("requested in responder: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnNext(i -> emitted.incrementAndGet())
.map(i -> utf8EncodedPayload(String.valueOf(i), null));
.map(i -> utf8EncodedPayload(String.valueOf(i), null)));
}

@Override
public Publisher<Payload> handleSubscription(Payload payload) {
return range(0, Integer.MAX_VALUE)
return toPublisher(range(0, Integer.MAX_VALUE)
.doOnRequest(n -> System.out.println("requested in responder: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnNext(i -> emitted.incrementAndGet())
.map(i -> utf8EncodedPayload(String.valueOf(i), null));
.map(i -> utf8EncodedPayload(String.valueOf(i), null)));
}

/**
Expand All @@ -347,96 +339,101 @@ public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payloa
String requestMetadata = byteToString(initialPayload.getMetadata());
System.out.println("responder received requestChannel: " + requestMetadata);

if(requestMetadata.equals("echo")) {
if("echo".equals(requestMetadata)) {
// TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug
return fromPublisher(payloads).map(payload -> {
return toPublisher(toObservable(payloads).map(payload -> {
String payloadData = byteToString(payload.getData());
return utf8EncodedPayload(String.valueOf(payloadData) + "_echo", null);
return utf8EncodedPayload(payloadData + "_echo", null);
}).doOnRequest(n -> System.out.println(">>> requested in echo responder: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnError(t -> System.out.println("Error in 'echo' handler: " + t.getMessage()))
.doOnNext(i -> emitted.incrementAndGet());
} else if (requestMetadata.equals("decoupled")) {
.doOnNext(i -> emitted.incrementAndGet()));
} else {
if ("decoupled".equals(requestMetadata)) {
/*
* Consume 300 from request and then stop requesting more (but no cancel from responder side)
*/
fromPublisher(payloads).doOnNext(payload -> {
String payloadData = byteToString(payload.getData());
System.out.println("DECOUPLED side-effect of request: " + payloadData);
}).subscribe(new Subscriber<Payload>() {

int count=0;
Subscription s;

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Payload t) {
count++;
if(count == 50) {
s.request(250);
toPublisher(toObservable(payloads).doOnNext(payload -> {
String payloadData = byteToString(payload.getData());
System.out.println("DECOUPLED side-effect of request: " + payloadData);
})).subscribe(new Subscriber<Payload>() {

int count;
Subscription s;

@Override
public void onError(Throwable e) {

}
}

@Override
public void onSubscribe(Subscription s) {
this.s = s;
// start with 50
s.request(50);
}

@Override
public void onComplete() {
// TODO Auto-generated method stub

}


});

return range(1, 1000)
.doOnNext(n -> System.out.println("RESPONDER sending value: " + n))
.map(i -> {
return utf8EncodedPayload(String.valueOf(i) + "_decoupled", null);
})
.doOnRequest(n -> System.out.println(">>> requested in decoupled responder: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnError(t -> System.out.println("Error in 'decoupled' handler: " + t.getMessage()))
.doOnNext(i -> emitted.incrementAndGet());
} else {
// TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug
return fromPublisher(payloads).flatMap(payload -> {
String payloadData = byteToString(payload.getData());
System.out.println("responder handleChannel received payload: " + payloadData);
return range(0, Integer.parseInt(requestMetadata))
.doOnRequest(n -> System.out.println("requested in responder [" + payloadData + "]: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnNext(i -> emitted.incrementAndGet())
.map(i -> utf8EncodedPayload(String.valueOf(i), null));
}).doOnRequest(n -> System.out.println(">>> response stream request(n) in responder: " + n));

@Override
public void onNext(Payload t) {
count++;
if (count == 50) {
s.request(250);
}
}

@Override
public void onSubscribe(Subscription s) {
this.s = s;
// start with 50
s.request(50);
}

@Override
public void onComplete() {
// TODO Auto-generated method stub

}


});

return toPublisher(range(1, 1000)
.doOnNext(n -> System.out.println("RESPONDER sending value: " + n))
.map(i -> {
return utf8EncodedPayload(i + "_decoupled", null);
})
.doOnRequest(n -> System.out
.println(">>> requested in decoupled responder: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnError(t -> System.out
.println("Error in 'decoupled' handler: " + t.getMessage()))
.doOnNext(i -> emitted.incrementAndGet()));
} else {
// TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug
return toPublisher(toObservable(payloads).flatMap(payload -> {
String payloadData = byteToString(payload.getData());
System.out.println("responder handleChannel received payload: " + payloadData);
return range(0, Integer.parseInt(requestMetadata))
.doOnRequest(n -> System.out
.println("requested in responder [" + payloadData + "]: " + n))
.doOnRequest(r -> requested.addAndGet(r))
.doOnRequest(r -> numRequests.incrementAndGet())
.doOnNext(i -> emitted.incrementAndGet())
.map(i -> utf8EncodedPayload(String.valueOf(i), null));
}).doOnRequest(n -> System.out.println(">>> response stream request(n) in responder: " + n)));
}
}
}

@Override
public Publisher<Void> handleFireAndForget(Payload payload) {
return error(new RuntimeException("Not Found"));
return toPublisher(error(new RuntimeException("Not Found")));
}

@Override
public Publisher<Payload> handleRequestResponse(Payload payload) {
return error(new RuntimeException("Not Found"));
return toPublisher(error(new RuntimeException("Not Found")));
}

@Override
public Publisher<Void> handleMetadataPush(Payload payload)
{
return error(new RuntimeException("Not Found"));
return toPublisher(error(new RuntimeException("Not Found")));
}
}, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR, Throwable::printStackTrace);

Expand Down
Loading