Skip to content

Commit

Permalink
refactor: Replaced Item with KeyValue in generator
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 30, 2024
1 parent 952bbb2 commit 208ae9d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public enum DataType {

NONE("none"), HASH("hash"), JSON("ReJSON-RL"), LIST("list"), SET("set"), STREAM("stream"), STRING("string"),
TIMESERIES("TSDB-TYPE"), ZSET("zset");

private static final Map<String, DataType> TYPE_MAP = Stream.of(DataType.values())
.collect(Collectors.toMap(t -> t.getString().toLowerCase(), Function.identity()));

Expand All @@ -39,10 +39,8 @@ public static DataType of(String string) {
private K key;
private String type;
private T value;

/**
* Expiration POSIX time in milliseconds for this key.
*
*/
private long ttl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redis.lettucemod.timeseries.Sample;
import com.redis.spring.batch.gen.Item.Type;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.KeyValue.DataType;

import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;

public class GeneratorItemReader extends AbstractItemCountingItemStreamItemReader<Item> {
public class GeneratorItemReader extends AbstractItemCountingItemStreamItemReader<KeyValue<String, Object>> {

public static final String DEFAULT_KEYSPACE = "gen";
public static final String DEFAULT_KEY_SEPARATOR = ":";
public static final Range DEFAULT_KEY_RANGE = Range.from(1);
protected static final List<Type> DEFAULT_TYPES = Arrays.asList(Type.values());
protected static final List<DataType> DEFAULT_TYPES = Arrays.asList(DataType.HASH, DataType.JSON, DataType.LIST,
DataType.SET, DataType.STREAM, DataType.STRING, DataType.TIMESERIES, DataType.ZSET);

private static final int LEFT_LIMIT = 48; // numeral '0'
private static final int RIGHT_LIMIT = 122; // letter 'z'
Expand All @@ -46,14 +48,14 @@ public class GeneratorItemReader extends AbstractItemCountingItemStreamItemReade
private CollectionOptions setOptions = new CollectionOptions();
private StringOptions stringOptions = new StringOptions();
private ZsetOptions zsetOptions = new ZsetOptions();
private List<Type> types = DEFAULT_TYPES;
private List<DataType> types = DEFAULT_TYPES;
private int maxItemCount;

public GeneratorItemReader() {
setName(ClassUtils.getShortName(getClass()));
}

public static List<Type> defaultTypes() {
public static List<DataType> defaultTypes() {
return DEFAULT_TYPES;
}

Expand Down Expand Up @@ -113,7 +115,7 @@ public String getKeyspace() {
return keyspace;
}

public List<Type> getTypes() {
public List<DataType> getTypes() {
return types;
}

Expand Down Expand Up @@ -157,11 +159,11 @@ public void setKeyspace(String keyspace) {
this.keyspace = keyspace;
}

public void setTypes(Type... types) {
public void setTypes(DataType... types) {
setTypes(Arrays.asList(types));
}

public void setTypes(List<Type> types) {
public void setTypes(List<DataType> types) {
this.types = types;
}

Expand All @@ -178,7 +180,7 @@ public String key(int index) {
return builder.toString();
}

private Object value(Type type) throws JsonProcessingException {
private Object value(DataType type) throws JsonProcessingException {
switch (type) {
case HASH:
return map(hashOptions);
Expand All @@ -203,7 +205,7 @@ private Object value(Type type) throws JsonProcessingException {

private List<Sample> samples() {
List<Sample> samples = new ArrayList<>();
long size = randomLong(timeSeriesOptions.getSampleCount());
int size = randomInt(timeSeriesOptions.getSampleCount());
long startTime = timeSeriesStartTime();
for (int index = 0; index < size; index++) {
long time = startTime + getCurrentItemCount() + index;
Expand All @@ -228,27 +230,27 @@ private ScoredValue<String> scoredValue(String value) {
private Collection<StreamMessage<String, String>> streamMessages() {
String key = key();
Collection<StreamMessage<String, String>> messages = new ArrayList<>();
for (int elementIndex = 0; elementIndex < randomLong(streamOptions.getMessageCount()); elementIndex++) {
for (int elementIndex = 0; elementIndex < randomInt(streamOptions.getMessageCount()); elementIndex++) {
messages.add(new StreamMessage<>(key, null, map(streamOptions.getBodyOptions())));
}
return messages;
}

private Map<String, String> map(MapOptions options) {
Map<String, String> hash = new HashMap<>();
for (int index = 0; index < randomLong(options.getFieldCount()); index++) {
for (int index = 0; index < randomInt(options.getFieldCount()); index++) {
int fieldIndex = index + 1;
hash.put("field" + fieldIndex, string(options.getFieldLength()));
}
return hash;
}

private String string(Range range) {
long length = randomLong(range);
int length = randomInt(range);
return string(length);
}

public static String string(long length) {
public static String string(int length) {
return random.ints(LEFT_LIMIT, RIGHT_LIMIT + 1).filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
.limit(length).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
Expand All @@ -257,18 +259,18 @@ public static String string(long length) {
private List<String> members(CollectionOptions options) {
List<String> members = new ArrayList<>();
int spread = options.getMemberRange().getMax() - options.getMemberRange().getMin() + 1;
for (int index = 0; index < randomLong(options.getMemberCount()); index++) {
for (int index = 0; index < randomInt(options.getMemberCount()); index++) {
int memberId = options.getMemberRange().getMin() + index % spread;
members.add(String.valueOf(memberId));
}
return members;
}

private long randomLong(Range range) {
private int randomInt(Range range) {
if (range.getMin() == range.getMax()) {
return range.getMin();
}
return ThreadLocalRandom.current().nextLong(range.getMin(), range.getMax());
return ThreadLocalRandom.current().nextInt(range.getMin(), range.getMax());
}

private double randomDouble(Range range) {
Expand All @@ -289,11 +291,11 @@ protected void doClose() throws Exception {
}

@Override
protected Item doRead() throws JsonProcessingException {
Item struct = new Item();
protected KeyValue<String, Object> doRead() throws JsonProcessingException {
KeyValue<String, Object> struct = new KeyValue<>();
struct.setKey(key());
Type type = types.get(getCurrentItemCount() % types.size());
struct.setType(type);
DataType type = types.get(getCurrentItemCount() % types.size());
struct.setType(type.getString());
struct.setValue(value(type));
if (expiration != null) {
struct.setTtl(ttl());
Expand All @@ -302,7 +304,7 @@ protected Item doRead() throws JsonProcessingException {
}

private long ttl() {
return System.currentTimeMillis() + randomLong(expiration);
return System.currentTimeMillis() + randomInt(expiration);
}

@Override
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand All @@ -45,15 +44,14 @@
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.KeyValue.DataType;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemReader.ReaderMode;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.FlushingStepBuilder;
import com.redis.spring.batch.common.JobFactory;
import com.redis.spring.batch.common.PollableItemReader;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.gen.Item;
import com.redis.spring.batch.gen.ItemToKeyValueFunction;
import com.redis.spring.batch.gen.Range;
import com.redis.spring.batch.gen.StreamOptions;
import com.redis.spring.batch.reader.MemKeyValue;
Expand Down Expand Up @@ -81,9 +79,6 @@ public abstract class AbstractTestBase {
public static final Duration DEFAULT_AWAIT_POLL_INTERVAL = Duration.ofMillis(1);
public static final Duration DEFAULT_AWAIT_TIMEOUT = Duration.ofSeconds(3);

protected static final ItemProcessor<Item, KeyValue<String, Object>> genItemProcessor = new FunctionItemProcessor<>(
new ItemToKeyValueFunction<>(KeyValue::new));

private int chunkSize = DEFAULT_CHUNK_SIZE;
private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT;
private Duration pollDelay = DEFAULT_POLL_DELAY;
Expand Down Expand Up @@ -167,7 +162,7 @@ public static void assertDbNotEmpty(RedisModulesCommands<String, String> command
Assertions.assertTrue(commands.dbsize() > 0, "Redis database is empty");
}

protected GeneratorItemReader generator(int count, Item.Type... types) {
protected GeneratorItemReader generator(int count, DataType... types) {
GeneratorItemReader gen = new GeneratorItemReader();
gen.setMaxItemCount(count);
if (!ObjectUtils.isEmpty(types)) {
Expand Down Expand Up @@ -339,11 +334,6 @@ protected void generate(TestInfo info, AbstractRedisClient client, GeneratorItem
run(testInfo, reader, writer);
}

protected void run(TestInfo info, GeneratorItemReader reader, ItemWriter<KeyValue<String, Object>> writer)
throws JobExecutionException, TimeoutException, InterruptedException {
run(info, reader, genItemProcessor, writer);
}

protected <T> JobExecution run(TestInfo info, ItemReader<? extends T> reader, ItemWriter<T> writer)
throws JobExecutionException, TimeoutException, InterruptedException {
return run(info, reader, null, writer);
Expand Down Expand Up @@ -380,7 +370,7 @@ protected <I, S extends I, O> FlushingStepBuilder<S, O> flushingStep(TestInfo in

protected void generateStreams(TestInfo info, int messageCount)
throws JobExecutionException, TimeoutException, InterruptedException {
GeneratorItemReader gen = generator(3, Item.Type.STREAM);
GeneratorItemReader gen = generator(3, DataType.STREAM);
StreamOptions streamOptions = new StreamOptions();
streamOptions.setMessageCount(Range.of(messageCount));
gen.setStreamOptions(streamOptions);
Expand Down
Loading

0 comments on commit 208ae9d

Please sign in to comment.