Skip to content

Commit

Permalink
Fix for LoadbalanceTest concurrency issue
Browse files Browse the repository at this point in the history
Signed-off-by: Rossen Stoyanchev <rstoyanchev@vmware.com>
  • Loading branch information
rstoyanchev committed Feb 19, 2021
1 parent 9baf974 commit 27347dc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 37 deletions.
Expand Up @@ -6,12 +6,9 @@
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.EmptyPayload;
import io.rsocket.util.RSocketProxy;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
Expand All @@ -24,12 +21,26 @@
import reactor.test.util.RaceTestUtils;
import reactor.util.context.Context;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class LoadbalanceTest {

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -76,21 +87,28 @@ public Mono<Void> fireAndForget(Payload payload) {
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
Hooks.onErrorDropped((__) -> {});

public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
.then(im -> Mono.just(new TestRSocket(new WeightedRSocket(counter))));
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);

final LoadbalanceTarget target1 = LoadbalanceTarget.from("1", mockTransport1);
final LoadbalanceTarget target2 = LoadbalanceTarget.from("2", mockTransport2);

final WeightedRSocket weightedRSocket1 = new WeightedRSocket(counter);
final WeightedRSocket weightedRSocket2 = new WeightedRSocket(counter);

final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Mockito.when(rSocketConnectorMock.connect(mockTransport1))
.then(im -> Mono.just(new TestRSocket(weightedRSocket1)));
Mockito.when(rSocketConnectorMock.connect(mockTransport2))
.then(im -> Mono.just(new TestRSocket(weightedRSocket2)));

for (int i = 0; i < 1000; i++) {
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
Expand All @@ -99,42 +117,39 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
rSocketConnectorMock,
source,
WeightedLoadbalanceStrategy.builder()
.weightedStatsResolver(r -> (WeightedStats) r)
.weightedStatsResolver(
rsocket ->
((PooledRSocket) rsocket).target() == target1
? weightedRSocket1
: weightedRSocket2)
.build());

RaceTestUtils.race(
() -> {
for (int j = 0; j < 1000; j++) {
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
.retry()
.subscribe();
.subscribe(aVoid -> {}, Throwable::printStackTrace);
}
},
() -> {
for (int j = 0; j < 100; j++) {
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(
Arrays.asList(
LoadbalanceTarget.from("1", mockTransport),
LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target1));
source.next(Arrays.asList(target1, target2)).next(Collections.singletonList(target1));
source.next(Collections.singletonList(target2));
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target2));
}
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -179,8 +194,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Hooks;
Expand All @@ -19,10 +21,18 @@

public class RoundRobinLoadbalanceStrategyTest {

@Test
public void shouldDeliverValuesProportionally() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverValuesProportionally() {
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -71,8 +81,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void shouldDeliverValuesToNewlyConnectedSockets() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -104,7 +112,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));

Expand All @@ -114,7 +122,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));

Expand All @@ -130,7 +138,7 @@ public Mono<Void> fireAndForget(Payload payload) {
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));

source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));

for (int j = 0; j < 1000; j++) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
Expand Down

0 comments on commit 27347dc

Please sign in to comment.