Skip to content

Commit

Permalink
test: Moved writeStream to stack tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 29, 2024
1 parent 41be07d commit 344bbb4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import com.redis.spring.batch.writer.Sugadd;
import com.redis.spring.batch.writer.TsAdd;
import com.redis.spring.batch.writer.TsAddAll;
import com.redis.spring.batch.writer.Xadd;

import io.lettuce.core.Consumer;
import io.lettuce.core.GeoArgs;
Expand Down Expand Up @@ -808,29 +807,6 @@ void writeWait(TestInfo info) throws Exception {
assertEquals("Insufficient replication level (0/1)", exceptions.get(0).getCause().getCause().getMessage());
}

@Test
void writeStream(TestInfo info) throws Exception {
String stream = "stream:0";
List<Map<String, String>> messages = new ArrayList<>();
for (int index = 0; index < 100; index++) {
Map<String, String> body = new HashMap<>();
body.put("field1", "value1");
body.put("field2", "value2");
messages.add(body);
}
ListItemReader<Map<String, String>> reader = new ListItemReader<>(messages);
Xadd<String, String, Map<String, String>> xadd = new Xadd<>(keyFunction(stream), Function.identity());
RedisItemWriter<String, String, Map<String, String>> writer = writer(xadd);
run(info, reader, writer);
Assertions.assertEquals(messages.size(), redisCommands.xlen(stream));
List<StreamMessage<String, String>> xrange = redisCommands.xrange(stream,
io.lettuce.core.Range.create("-", "+"));
for (int index = 0; index < xrange.size(); index++) {
StreamMessage<String, String> message = xrange.get(index);
Assertions.assertEquals(messages.get(index), message.getBody());
}
}

private <K, V, T> void replicateLive(TestInfo info, RedisItemReader<K, V, ? extends T> reader,
RedisItemWriter<K, V, T> writer, RedisItemReader<K, V, ? extends T> liveReader,
RedisItemWriter<K, V, T> liveWriter) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,29 @@ void writeSet(TestInfo info) throws Exception {
assertEquals(values.size(), redisCommands.scard(key));
}

@Test
void writeStream(TestInfo info) throws Exception {
String stream = "stream:0";
List<Map<String, String>> messages = new ArrayList<>();
for (int index = 0; index < 100; index++) {
Map<String, String> body = new HashMap<>();
body.put("field1", "value1");
body.put("field2", "value2");
messages.add(body);
}
ListItemReader<Map<String, String>> reader = new ListItemReader<>(messages);
Xadd<String, String, Map<String, String>> xadd = new Xadd<>(keyFunction(stream), Function.identity());
RedisItemWriter<String, String, Map<String, String>> writer = writer(xadd);
run(info, reader, writer);
Assertions.assertEquals(messages.size(), redisCommands.xlen(stream));
List<StreamMessage<String, String>> xrange = redisCommands.xrange(stream,
io.lettuce.core.Range.create("-", "+"));
for (int index = 0; index < xrange.size(); index++) {
StreamMessage<String, String> message = xrange.get(index);
Assertions.assertEquals(messages.get(index), message.getBody());
}
}

private MapOptions hashOptions(Range fieldCount) {
MapOptions options = new MapOptions();
options.setFieldCount(fieldCount);
Expand Down Expand Up @@ -550,4 +573,4 @@ void writeStructMerge(TestInfo info) throws Exception {
assertEquals(10, actual.size());
}

}
}

0 comments on commit 344bbb4

Please sign in to comment.