Skip to content

Commit

Permalink
test added
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed May 9, 2024
1 parent a82bd11 commit 05c5b8a
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 2 deletions.
18 changes: 18 additions & 0 deletions redisson-spring-data/redisson-spring-data-32/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,28 @@
<version>1.4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.hazendaz.jmockit</groupId>
<artifactId>jmockit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>

<forkCount>4</forkCount>
<reuseForks>true</reuseForks>
<argLine>${argLine} -javaagent:"${settings.localRepository}"/com/github/hazendaz/jmockit/jmockit/1.52.0/jmockit-1.52.0.jar</argLine>
</configuration>
</plugin>

<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.1</version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.redisson;

import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.NatMapper;
Expand All @@ -9,19 +13,22 @@
import org.redisson.misc.RedisURI;
import org.redisson.spring.data.connection.RedissonClusterConnection;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class BaseTest {

protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events";

private static final GenericContainer<?> REDIS = createRedis();
protected static final GenericContainer<?> REDIS = createRedis();

protected static final Protocol protocol = Protocol.RESP2;

Expand Down Expand Up @@ -130,6 +137,141 @@ public RedisURI map(RedisURI uri) {
redissonCallback.accept(new RedissonClusterConnection(redissonCluster));
}

protected void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();

List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();

GenericContainer<?> master =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("redis")
.withExposedPorts(6379);
master.start();
assert master.getNetwork() == network;
int masterPort = master.getFirstMappedPort();
master.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
cmd.getExposedPorts()[0]));
});
nodes.add(master);

for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave" + i)
.withExposedPorts(6379);
slave.start();
int slavePort = slave.getFirstMappedPort();
slave.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
cmd.getExposedPorts()[0]));
});
nodes.add(slave);
}

GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")

.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
int sentinel1Port = sentinel1.getFirstMappedPort();
sentinel1.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel1);

GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
int sentinel2Port = sentinel2.getFirstMappedPort();
sentinel2.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel2);

GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
int sentinel3Port = sentinel3.getFirstMappedPort();
sentinel3.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel3);

Thread.sleep(5000);

Config config = new Config();
config.setProtocol(protocol);
config.useSentinelServers()
.setPingConnectionInterval(0)
.setNatMapper(new NatMapper() {

@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
}

Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));

Map<String, ContainerNetwork> ss = node.getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();

if (uri.getPort() == 6379
&& !uri.getHost().equals("redis")
&& BaseTest.this.getClass() == BaseTest.class
&& node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}

if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");

callback.accept(nodes, config);

nodes.forEach(n -> n.stop());
network.close();
}

@Before
public void beforeEach() {
redisson.getKeys().flushall();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
package org.redisson.spring.data.connection;

import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.protocol.CommandData;
import org.redisson.connection.ClientConnectionsEntry;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -13,6 +29,105 @@
*/
public class RedissonStreamTest extends BaseConnectionTest {

@Test
public void testReattachment() throws InterruptedException {
withSentinel((nodes, config) -> {
RedissonClient redissonClient = Redisson.create(config);

RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redissonClient);

StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());


Consumer consumer = Consumer.from("group", "consumer1");
StreamOffset<String> streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
String channel = "test";
AtomicInteger counter = new AtomicInteger();
Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
listener(redisConnectionFactory, channel, consumer, counter));

StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
t1.opsForStream().createGroup("test", "group");

listenerContainer.start();

AtomicReference<Boolean> invoked = new AtomicReference<>();

new MockUp<ClientConnectionsEntry>() {

@Mock
void reattachBlockingQueue(Invocation inv, CommandData<?, ?> commandData) {
try {
inv.proceed(commandData);
invoked.compareAndSet(null, true);
} catch (Exception e) {
e.printStackTrace();
invoked.set(false);
throw e;
}
}
};

for (int i = 0; i < 3; i++) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
ObjectRecord<String, String> record = StreamRecords.newRecord()
.ofObject("message")
.withStreamKey(channel);
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
nodes.get(0).stop();
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

assertThat(invoked.get()).isTrue();

Assertions.assertThat(counter.get()).isEqualTo(3);
listenerContainer.stop();
redissonClient.shutdown();
}, 2);
}

private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> getOptions() {
return StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(String.class)
.build();
}

private StreamMessageListenerContainer.StreamReadRequest<String> getReadRequest(Consumer consumer, StreamOffset<String> streamOffset) {
return StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset)
.consumer(consumer)
.autoAcknowledge(false)
.cancelOnError((err) -> false) // do not stop consuming after error
.build();
}

private <T> StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) {

return message -> {
try {
System.out.println("Acknowledging message: " + message.getId());
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
System.out.println("RECEIVED " + consumer + " " + message);
counter.incrementAndGet();
} catch(Exception e) {
e.printStackTrace();
}
};
}

@Test
public void testPending() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected final void nodeDown(ConnectionsHolder<RedisConnection> connectionsHold
connectionsHolder.getAllConnections().clear();
}

private void reattachBlockingQueue(CommandData<?, ?> commandData) {
void reattachBlockingQueue(CommandData<?, ?> commandData) {
if (commandData == null
|| !commandData.isBlockingCommand()
|| commandData.getPromise().isDone()) {
Expand Down

0 comments on commit 05c5b8a

Please sign in to comment.