Skip to content

Commit

Permalink
deps: Upgraded spring batch redis
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 24, 2024
1 parent 601e00d commit 5535c50
Show file tree
Hide file tree
Showing 69 changed files with 2,019 additions and 1,237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.jdbc.core.ColumnMapRowMapper;

import com.redis.riot.core.AbstractImport;
import com.redis.riot.core.AbstractMapImport;

public class DatabaseImport extends AbstractImport {
public class DatabaseImport extends AbstractMapImport {

public static final int DEFAULT_FETCH_SIZE = AbstractCursorItemReader.VALUE_NOT_SET;
public static final int DEFAULT_MAX_RESULT_SET_ROWS = AbstractCursorItemReader.VALUE_NOT_SET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
import org.springframework.expression.Expression;
import org.springframework.util.Assert;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RediSearchCommands;
import com.redis.lettucemod.search.Field;
import com.redis.lettucemod.search.IndexInfo;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractImport;
import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.RiotUtils;
import com.redis.spring.batch.gen.Range;

public class FakerImport extends AbstractImport {
public class FakerImport extends AbstractMapImport {

public static final int DEFAULT_COUNT = 1000;
public static final Locale DEFAULT_LOCALE = Locale.ENGLISH;
Expand Down Expand Up @@ -84,13 +82,9 @@ private Map<String, Expression> fields() {

private Map<String, Expression> searchIndexFields() {
Map<String, Expression> searchFields = new LinkedHashMap<>();
try (StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils
.connection(getRedisClient())) {
RediSearchCommands<String, String> commands = connection.sync();
IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex));
for (Field<String> field : info.getFields()) {
searchFields.put(field.getName(), RiotUtils.parse(expression(field)));
}
IndexInfo info = RedisModulesUtils.indexInfo(redisCommands.ftInfo(searchIndex));
for (Field<String> field : info.getFields()) {
searchFields.put(field.getName(), RiotUtils.parse(expression(field)));
}
return searchFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ private JsonObjectMarshaller<KeyValue<String, Object>> xmlMarshaller() {
@Override
protected Job job() {
RedisItemReader<String, String, KeyValue<String, Object>> reader = RedisItemReader.struct();
reader.setClient(getRedisClient());
configureReader(reader);
configure(reader);
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processor(StringCodec.UTF8);
return jobBuilder().start(step(getName(), reader, writer()).processor(processor).build()).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import org.springframework.batch.item.ItemReader;
import org.springframework.core.io.Resource;

import com.redis.riot.core.AbstractStructImport;
import com.redis.riot.core.AbstractImport;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemWriter;

public class FileDumpImport extends AbstractStructImport {
public class FileDumpImport extends AbstractImport {

private List<String> files;

Expand Down Expand Up @@ -46,7 +47,9 @@ protected Job job() {
}
List<TaskletStep> steps = new ArrayList<>();
for (Resource resource : resources) {
steps.add(step(resource.getFilename(), reader(resource), writer()).build());
RedisItemWriter<String, String, KeyValue<String, Object>> writer = RedisItemWriter.struct();
configure(writer);
steps.add(step(resource.getFilename(), reader(resource), writer).build());
}
Iterator<TaskletStep> iterator = steps.iterator();
SimpleJobBuilder job = jobBuilder().start(iterator.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import com.redis.riot.core.AbstractImport;
import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.function.RegexNamedGroupFunction;

public class FileImport extends AbstractImport {
public class FileImport extends AbstractMapImport {

public static final String DEFAULT_CONTINUATION_STRING = "\\";
public static final Character DEFAULT_QUOTE_CHARACTER = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import java.util.Map;
import java.util.Set;

import org.springframework.util.StringUtils;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -17,7 +15,7 @@
import com.fasterxml.jackson.databind.node.LongNode;
import com.redis.lettucemod.timeseries.Sample;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.KeyValue.Type;
import com.redis.spring.batch.KeyValue.DataType;

import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;
Expand Down Expand Up @@ -55,10 +53,7 @@ public KeyValue<String, Object> deserialize(JsonParser p, DeserializationContext
}
JsonNode typeNode = node.get(TYPE);
if (typeNode != null) {
String typeString = typeNode.asText();
if (StringUtils.hasLength(typeString)) {
keyValue.setType(Type.valueOf(typeString.toUpperCase()));
}
keyValue.setType(typeNode.asText());
}
LongNode ttlNode = (LongNode) node.get(TTL);
if (ttlNode != null) {
Expand All @@ -68,11 +63,17 @@ public KeyValue<String, Object> deserialize(JsonParser p, DeserializationContext
if (memUsageNode != null) {
keyValue.setMem(memUsageNode.asLong());
}
keyValue.setValue(value(keyValue.getType(), node.get(VALUE), ctxt));
JsonNode valueNode = node.get(VALUE);
if (valueNode != null) {
DataType type = KeyValue.type(keyValue);
if (type != null) {
keyValue.setValue(value(type, valueNode, ctxt));
}
}
return keyValue;
}

private Object value(Type type, JsonNode node, DeserializationContext ctxt) throws IOException {
private Object value(DataType type, JsonNode node, DeserializationContext ctxt) throws IOException {
switch (type) {
case STREAM:
return streamMessages((ArrayNode) node, ctxt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ abstract class AbstractFileTests extends AbstractTestBase {
@SuppressWarnings("unchecked")
@Test
void fileImportJSON(TestInfo info) throws Exception {
FileImport executable = new FileImport();
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles(BEERS_JSON_URL);
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.setName(name(info));
executable.execute();
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles(BEERS_JSON_URL);
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.setName(name(info));
executable.afterPropertiesSet();
executable.call();
}
List<String> keys = redisCommands.keys("*");
assertEquals(216, keys.size());
for (String key : keys) {
Expand All @@ -60,16 +62,18 @@ private RedisClientOptions redisClientOptions() {
@SuppressWarnings("unchecked")
@Test
void fileApiImportCSV(TestInfo info) throws Exception {
FileImport executable = new FileImport();
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles("https://storage.googleapis.com/jrx/beers.csv");
executable.setHeader(true);
executable.setName(name(info));
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.execute();
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles("https://storage.googleapis.com/jrx/beers.csv");
executable.setHeader(true);
executable.setName(name(info));
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.afterPropertiesSet();
executable.call();
}
List<String> keys = redisCommands.keys("*");
assertEquals(2410, keys.size());
for (String key : keys) {
Expand All @@ -87,16 +91,18 @@ void fileApiFileExpansion(TestInfo info) throws Exception {
IOUtils.copy(getClass().getClassLoader().getResourceAsStream("beers1.csv"), new FileOutputStream(file1));
File file2 = temp.resolve("beers2.csv").toFile();
IOUtils.copy(getClass().getClassLoader().getResourceAsStream("beers2.csv"), new FileOutputStream(file2));
FileImport executable = new FileImport();
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles(temp.resolve("*.csv").toFile().getPath());
executable.setHeader(true);
executable.setName(name(info));
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.execute();
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles(temp.resolve("*.csv").toFile().getPath());
executable.setHeader(true);
executable.setName(name(info));
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.afterPropertiesSet();
executable.call();
}
List<String> keys = redisCommands.keys("*");
assertEquals(2410, keys.size());
for (String key : keys) {
Expand All @@ -109,17 +115,19 @@ void fileApiFileExpansion(TestInfo info) throws Exception {
@SuppressWarnings("unchecked")
@Test
void fileImportCSVMultiThreaded(TestInfo info) throws Exception {
FileImport executable = new FileImport();
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles("https://storage.googleapis.com/jrx/beers.csv");
executable.setHeader(true);
executable.setThreads(3);
executable.setName(name(info));
HsetBuilder hset = new HsetBuilder();
hset.setKeyspace(KEYSPACE);
hset.setKeyFields(ID);
executable.setOperations(hset.build());
executable.execute();
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles("https://storage.googleapis.com/jrx/beers.csv");
executable.setHeader(true);
executable.setThreads(3);
executable.setName(name(info));
HsetBuilder hset = new HsetBuilder();
hset.setKeyspace(KEYSPACE);
hset.setKeyFields(ID);
executable.setOperations(hset.build());
executable.afterPropertiesSet();
executable.call();
}
List<String> keys = redisCommands.keys("*");
assertEquals(2410, keys.size());
for (String key : keys) {
Expand All @@ -132,15 +140,17 @@ void fileImportCSVMultiThreaded(TestInfo info) throws Exception {
@SuppressWarnings("unchecked")
@Test
void fileImportJSONL(TestInfo info) throws Exception {
FileImport executable = new FileImport();
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles(BEERS_JSONL_URL);
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.setName(name(info));
executable.execute();
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.setFiles(BEERS_JSONL_URL);
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
hsetBuilder.setKeyFields(ID);
executable.setOperations(hsetBuilder.build());
executable.setName(name(info));
executable.afterPropertiesSet();
executable.call();
}
List<String> keys = redisCommands.keys("*");
assertEquals(6, keys.size());
for (String key : keys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.redis.lettucemod.timeseries.Sample;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.KeyValue.Type;
import com.redis.spring.batch.KeyValue.DataType;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.gen.ItemToKeyValueFunction;
import com.redis.spring.batch.test.AbstractTestBase;
Expand Down Expand Up @@ -59,7 +59,7 @@ void serialize() throws JsonProcessingException {
ts.setKey(key);
ts.setMem(memoryUsage);
ts.setTtl(ttl);
ts.setType(Type.TIMESERIES);
ts.setType(DataType.TIMESERIES.getString());
Sample sample1 = Sample.of(Instant.now().toEpochMilli(), 123.456);
Sample sample2 = Sample.of(Instant.now().toEpochMilli() + 1000, 456.123);
ts.setValue(Arrays.asList(sample1, sample2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.redis.riot.file.resource.XmlResourceItemWriter;
import com.redis.riot.file.resource.XmlResourceItemWriterBuilder;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.KeyValue.Type;
import com.redis.spring.batch.KeyValue.DataType;

class XmlItemWriterTests {

Expand All @@ -37,13 +37,13 @@ void test() throws Exception {
KeyValue<String, Object> item1 = new KeyValue<>();
item1.setKey("key1");
item1.setTtl(123l);
item1.setType(Type.HASH);
item1.setType(DataType.HASH.getString());
Map<String, String> hash1 = Map.of("field1", "value1", "field2", "value2");
item1.setValue(hash1);
KeyValue<String, Object> item2 = new KeyValue<>();
item2.setKey("key2");
item2.setTtl(456l);
item2.setType(Type.STREAM);
item2.setType(DataType.STREAM.getString());
Map<String, String> hash2 = Map.of("field1", "value1", "field2", "value2");
item2.setValue(hash2);
writer.write(Chunk.of(item1, item2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.function.FunctionItemProcessor;

import com.redis.riot.core.AbstractStructImport;
import com.redis.riot.core.AbstractImport;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.gen.CollectionOptions;
import com.redis.spring.batch.gen.GeneratorItemReader;
import com.redis.spring.batch.gen.Item;
Expand All @@ -20,13 +21,10 @@
import com.redis.spring.batch.gen.TimeSeriesOptions;
import com.redis.spring.batch.gen.ZsetOptions;

public class GeneratorImport extends AbstractStructImport {
public class GeneratorImport extends AbstractImport {

public static final int DEFAULT_COUNT = 1000;

private static final ItemProcessor<Item, KeyValue<String, Object>> PROCESSOR = new FunctionItemProcessor<>(
new ItemToKeyValueFunction());

private int count = DEFAULT_COUNT;
private String keyspace = GeneratorItemReader.DEFAULT_KEYSPACE;
private Range keyRange = GeneratorItemReader.DEFAULT_KEY_RANGE;
Expand All @@ -43,7 +41,13 @@ public class GeneratorImport extends AbstractStructImport {

@Override
protected Job job() {
return jobBuilder().start(step(getName(), reader(), writer()).processor(PROCESSOR).build()).build();
RedisItemWriter<String, String, KeyValue<String, Object>> writer = RedisItemWriter.struct();
configure(writer);
return jobBuilder().start(step(getName(), reader(), writer).processor(processor()).build()).build();
}

private ItemProcessor<? super Item, ? extends KeyValue<String, Object>> processor() {
return new FunctionItemProcessor<>(new ItemToKeyValueFunction());
}

private GeneratorItemReader reader() {
Expand Down

0 comments on commit 5535c50

Please sign in to comment.