Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes LoadbalanceTest issues #983

Merged
merged 1 commit into from Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -12,6 +12,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
Expand All @@ -26,10 +28,18 @@

public class LoadbalanceTest {

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
rstoyanchev marked this conversation as resolved.
Show resolved Hide resolved
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -76,21 +86,28 @@ public Mono<Void> fireAndForget(Payload payload) {
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
Hooks.onErrorDropped((__) -> {});

public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
.then(im -> Mono.just(new TestRSocket(new WeightedRSocket(counter))));
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);

final LoadbalanceTarget target1 = LoadbalanceTarget.from("1", mockTransport1);
final LoadbalanceTarget target2 = LoadbalanceTarget.from("2", mockTransport2);

final WeightedRSocket weightedRSocket1 = new WeightedRSocket(counter);
final WeightedRSocket weightedRSocket2 = new WeightedRSocket(counter);

final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Mockito.when(rSocketConnectorMock.connect(mockTransport1))
.then(im -> Mono.just(new TestRSocket(weightedRSocket1)));
Mockito.when(rSocketConnectorMock.connect(mockTransport2))
.then(im -> Mono.just(new TestRSocket(weightedRSocket2)));

for (int i = 0; i < 1000; i++) {
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
Expand All @@ -99,42 +116,39 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
rSocketConnectorMock,
source,
WeightedLoadbalanceStrategy.builder()
.weightedStatsResolver(r -> (WeightedStats) r)
.weightedStatsResolver(
rsocket ->
((PooledRSocket) rsocket).target() == target1
? weightedRSocket1
: weightedRSocket2)
.build());

RaceTestUtils.race(
() -> {
for (int j = 0; j < 1000; j++) {
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
.retry()
.subscribe();
.subscribe(aVoid -> {}, Throwable::printStackTrace);
}
},
() -> {
for (int j = 0; j < 100; j++) {
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(
Arrays.asList(
LoadbalanceTarget.from("1", mockTransport),
LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target1));
source.next(Arrays.asList(target1, target2)).next(Collections.singletonList(target1));
source.next(Collections.singletonList(target2));
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target2));
}
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -179,8 +193,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Hooks;
Expand All @@ -19,10 +21,18 @@

public class RoundRobinLoadbalanceStrategyTest {

@Test
public void shouldDeliverValuesProportionally() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverValuesProportionally() {
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -71,8 +81,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void shouldDeliverValuesToNewlyConnectedSockets() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -104,7 +112,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));

Expand All @@ -114,7 +122,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));

Expand All @@ -130,7 +138,7 @@ public Mono<Void> fireAndForget(Payload payload) {
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));

source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));

for (int j = 0; j < 1000; j++) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
Expand Down