Skip to content

Commit

Permalink
logs
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka committed Apr 4, 2023
1 parent ca4387e commit 032bb58
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -247,7 +248,7 @@ default void requestChannel1() {
.requestChannel(Mono.just(createTestPayload(0)))
.doOnNext(Payload::release)
.as(StepVerifier::create)
.expectNextCount(1)
.thenConsumeWhile(new PayloadPredicate(1))
.expectComplete()
.verify(getTimeout());
}
Expand All @@ -262,7 +263,7 @@ default void requestChannel200_000() {
.doOnNext(Payload::release)
.limitRate(8)
.as(StepVerifier::create)
.expectNextCount(200_000)
.thenConsumeWhile(new PayloadPredicate(200_000))
.expectComplete()
.verify(getTimeout());
}
Expand All @@ -276,7 +277,7 @@ default void largePayloadRequestChannel50() {
.requestChannel(payloads)
.doOnNext(Payload::release)
.as(StepVerifier::create)
.expectNextCount(50)
.thenConsumeWhile(new PayloadPredicate(50))
.expectComplete()
.verify(getTimeout());
}
Expand All @@ -291,7 +292,7 @@ default void requestChannel20_000() {
.doOnNext(this::assertChannelPayload)
.doOnNext(Payload::release)
.as(StepVerifier::create)
.expectNextCount(20_000)
.thenConsumeWhile(new PayloadPredicate(20_000))
.expectComplete()
.verify(getTimeout());
}
Expand All @@ -306,7 +307,7 @@ default void requestChannel2_000_000() {
.doOnNext(Payload::release)
.limitRate(8)
.as(StepVerifier::create)
.expectNextCount(2_000_000)
.thenConsumeWhile(new PayloadPredicate(2_000_000))
.expectComplete()
.verify(getTimeout());
}
Expand All @@ -322,7 +323,7 @@ default void requestChannel3() {
.requestChannel(payloads)
.doOnNext(Payload::release)
.as(publisher -> StepVerifier.create(publisher, 3))
.expectNextCount(3)
.thenConsumeWhile(new PayloadPredicate(3))
.expectComplete()
.verify(getTimeout());

Expand Down Expand Up @@ -358,7 +359,7 @@ default void check(Flux<Payload> payloads) {
.doOnNext(ReferenceCounted::release)
.limitRate(8)
.as(StepVerifier::create)
.expectNextCount(256)
.thenConsumeWhile(new PayloadPredicate(256))
.as("expected 256 items")
.expectComplete()
.verify(getTimeout());
Expand Down Expand Up @@ -958,4 +959,27 @@ public String toString() {
}
}
}

class PayloadPredicate implements Predicate<Payload> {
final int expectedCnt;
int cnt;

public PayloadPredicate(int expectedCnt) {
this.expectedCnt = expectedCnt;
}

@Override
public boolean test(Payload p) {
boolean shouldConsume = cnt++ < expectedCnt;
if (!shouldConsume) {
logger.info(
"Metadata: \n\r{}\n\rData:{}",
p.hasMetadata()
? new ByteBufRepresentation().fallbackToStringOf(p.sliceMetadata())
: "Empty",
new ByteBufRepresentation().fallbackToStringOf(p.sliceData()));
}
return shouldConsume;
}
}
}

0 comments on commit 032bb58

Please sign in to comment.