Skip to content

Commit

Permalink
feat: Combined scan and live key readers
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed May 20, 2024
1 parent 304c6ed commit eb1ee2b
Show file tree
Hide file tree
Showing 17 changed files with 335 additions and 203 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.3.2
4.3.3-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ public MemcachedItemReader(Supplier<MemcachedClient> clientSupplier) {
this.clientSupplier = clientSupplier;
}

@Override
protected boolean isFlushing() {
return false;
}

@Override
protected ItemReader<LruMetadumpEntry> reader() {
return new LruMetadumpItemReader(clientSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.redis.spring.batch.item.redis.common.Operation;
import com.redis.spring.batch.item.redis.common.OperationExecutor;
import com.redis.spring.batch.item.redis.reader.KeyNotificationItemReader;
import com.redis.spring.batch.item.redis.reader.KeyScanNotificationItemReader;
import com.redis.spring.batch.item.redis.reader.MemKeyValue;
import com.redis.spring.batch.item.redis.reader.MemKeyValueRead;

Expand All @@ -28,13 +29,19 @@

public class RedisItemReader<K, V, T> extends AbstractAsyncItemReader<K, T> {

public enum ReaderMode {
SCAN, LIVE, LIVE_SCAN
}

public static final int DEFAULT_POOL_SIZE = OperationExecutor.DEFAULT_POOL_SIZE;
public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = KeyNotificationItemReader.DEFAULT_QUEUE_CAPACITY;
public static final int DEFAULT_RETRY_LIMIT = MaxAttemptsRetryPolicy.DEFAULT_MAX_ATTEMPTS;
public static final ReaderMode DEFAULT_MODE = ReaderMode.SCAN;

private final RedisCodec<K, V> codec;
private final Operation<K, V, K, T> operation;

private ReaderMode mode = DEFAULT_MODE;
private int poolSize = DEFAULT_POOL_SIZE;
private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY;
private ReadFrom readFrom;
Expand Down Expand Up @@ -65,20 +72,46 @@ protected FaultTolerantStepBuilder<K, K> faultTolerant(SimpleStepBuilder<K, K> s
return ftStep;
}

@Override
protected boolean isFlushing() {
switch (mode) {
case LIVE:
case LIVE_SCAN:
return true;
default:
return false;
}
}

@Override
protected ItemReader<K> reader() {
if (isFlushing()) {
KeyNotificationItemReader<K, V> notificationReader = new KeyNotificationItemReader<>(client, codec);
notificationReader.setName(getName() + "-key-notification-reader");
notificationReader.setQueueCapacity(notificationQueueCapacity);
notificationReader.setDatabase(database);
notificationReader.setKeyPattern(keyPattern);
notificationReader.setKeyType(keyType);
notificationReader.setPollTimeout(pollTimeout);
return notificationReader;
switch (mode) {
case LIVE:
return notificationReader();
case LIVE_SCAN:
return scanAndNotificationReader();
default:
return scanReader();
}
ScanIterator<K> scanIterator = ScanIterator.scan(connection().sync(), scanArgs());
return new IteratorItemReader<>(scanIterator);
}

private ItemReader<K> scanAndNotificationReader() {
return new KeyScanNotificationItemReader<>(client, codec, scanReader());
}

private IteratorItemReader<K> scanReader() {
return new IteratorItemReader<>(ScanIterator.scan(connection().sync(), scanArgs()));
}

private KeyNotificationItemReader<K, V> notificationReader() {
KeyNotificationItemReader<K, V> reader = new KeyNotificationItemReader<>(client, codec);
reader.setName(getName() + "-key-notification-reader");
reader.setQueueCapacity(notificationQueueCapacity);
reader.setDatabase(database);
reader.setKeyPattern(keyPattern);
reader.setKeyType(keyType);
reader.setPollTimeout(pollTimeout);
return reader;
}

@Override
Expand Down Expand Up @@ -201,4 +234,12 @@ public void setDatabase(int database) {
this.database = database;
}

public ReaderMode getMode() {
return mode;
}

public void setMode(ReaderMode mode) {
this.mode = mode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public KeyComparisonItemReader(RedisItemReader<K, V, MemKeyValue<K, Object>> sou
this.targetReader = targetReader;
}

@Override
protected boolean isFlushing() {
return false;
}

public RedisItemReader<K, V, MemKeyValue<K, Object>> getSourceReader() {
return sourceReader;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.redis.spring.batch.item.redis.reader;

public interface KeyEventListener<K> {

void event(K key, String event, KeyNotificationStatus status);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.redis.spring.batch.item.redis.reader;

public interface KeyNotificationConsumer<K, V> {

void accept(K channel, V message);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.redis.spring.batch.item.redis.reader;

import java.util.function.Function;

import com.redis.spring.batch.item.redis.common.DataType;

public class KeyNotificationDataTypeFunction implements Function<String, DataType> {

@Override
public DataType apply(String event) {
if (event == null) {
return DataType.NONE;
}
String code = event.toLowerCase();
if (code.startsWith("xgroup-")) {
return DataType.STREAM;
}
if (code.startsWith("ts.")) {
return DataType.TIMESERIES;
}
if (code.startsWith("json.")) {
return DataType.JSON;
}
switch (code) {
case "set":
case "setrange":
case "incrby":
case "incrbyfloat":
case "append":
return DataType.STRING;
case "lpush":
case "rpush":
case "rpop":
case "lpop":
case "linsert":
case "lset":
case "lrem":
case "ltrim":
return DataType.LIST;
case "hset":
case "hincrby":
case "hincrbyfloat":
case "hdel":
return DataType.HASH;
case "sadd":
case "spop":
case "sinterstore":
case "sunionstore":
case "sdiffstore":
return DataType.SET;
case "zincr":
case "zadd":
case "zrem":
case "zrembyscore":
case "zrembyrank":
case "zdiffstore":
case "zinterstore":
case "zunionstore":
return DataType.ZSET;
case "xadd":
case "xtrim":
case "xdel":
case "xsetid":
return DataType.STREAM;
default:
return DataType.NONE;
}
}

}

0 comments on commit eb1ee2b

Please sign in to comment.