Skip to content

Commit

Permalink
refactor!: Added DataType enum
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 24, 2024
1 parent 7e3b425 commit caff3a7
Show file tree
Hide file tree
Showing 15 changed files with 300 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,41 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.util.StringUtils;

public class KeyValue<K, T> {

public enum Type {
public enum DataType {

HASH("hash"), JSON("ReJSON-RL"), LIST("list"), SET("set"), STREAM("stream"), STRING("string"),
NONE("none"), HASH("hash"), JSON("ReJSON-RL"), LIST("list"), SET("set"), STREAM("stream"), STRING("string"),
TIMESERIES("TSDB-TYPE"), ZSET("zset");

private static final Function<Type, String> DATATYPE_STRING = Type::getCode;
private static final UnaryOperator<String> TO_LOWER_CASE = String::toLowerCase;
private static final Map<String, Type> TYPE_MAP = Stream.of(Type.values())
.collect(Collectors.toMap(DATATYPE_STRING.andThen(TO_LOWER_CASE), Function.identity()));
private static final Map<String, DataType> TYPE_MAP = Stream.of(DataType.values())
.collect(Collectors.toMap(t -> t.getString().toLowerCase(), Function.identity()));

private final String code;
private final String string;

private Type(String string) {
this.code = string;
private DataType(String string) {
this.string = string;
}

public String getCode() {
return code;
public String getString() {
return string;
}

public static Type of(String string) {
return TYPE_MAP.get(TO_LOWER_CASE.apply(string));
public static DataType of(String string) {
return TYPE_MAP.get(string.toLowerCase());
}

}

public static final long TTL_NO_KEY = -2;

private K key;
private Type type;
private String type;
private T value;

/**
Expand Down Expand Up @@ -73,11 +72,11 @@ public void setKey(K key) {
this.key = key;
}

public Type getType() {
public String getType() {
return type;
}

public void setType(Type type) {
public void setType(String type) {
this.type = type;
}

Expand Down Expand Up @@ -105,15 +104,12 @@ public void setMem(long bytes) {
this.mem = bytes;
}

public boolean exists() {
return type != null && ttl != KeyValue.TTL_NO_KEY;
}

@Override
public int hashCode() {
return Objects.hash(key, mem, ttl, type, value);
}

@SuppressWarnings("rawtypes")
@Override
public boolean equals(Object obj) {
if (this == obj)
Expand All @@ -122,15 +118,32 @@ public boolean equals(Object obj) {
return false;
if (getClass() != obj.getClass())
return false;
KeyValue<?, ?> other = (KeyValue<?, ?>) obj;
return Objects.equals(key, other.key) && mem == other.mem && ttl == other.ttl && type == other.type
&& Objects.equals(value, other.value);
KeyValue other = (KeyValue) obj;
return Objects.equals(key, other.key) && mem == other.mem && ttl == other.ttl
&& Objects.equals(type, other.type) && Objects.equals(value, other.value);
}

@Override
public String toString() {
return "KeyValue [key=" + key + ", type=" + type + ", value=" + value + ", ttl=" + ttl + ", memoryUsage=" + mem
+ "]";
public static boolean exists(KeyValue<?, ?> kv) {
return kv.getTtl() != TTL_NO_KEY && type(kv) != DataType.NONE;
}

public static boolean hasTtl(KeyValue<?, ?> kv) {
return kv.getTtl() > 0;
}

public static boolean hasValue(KeyValue<?, ?> keyValue) {
return keyValue.getValue() != null;
}

public static boolean hasType(KeyValue<?, ?> keyValue) {
return StringUtils.hasLength(keyValue.getType());
}

public static DataType type(KeyValue<?, ?> keyValue) {
if (hasType(keyValue)) {
return DataType.of(keyValue.getType());
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public static KeyComparisonItemReader<String, String> compare() {
public static <K, V> KeyComparisonItemReader<K, V> compare(RedisCodec<K, V> codec) {
return new KeyComparisonItemReader<>(codec, KeyValueRead.struct(codec), KeyValueRead.struct(codec));
}
public static KeyComparisonItemReader<String,String> compareQuick() {

public static KeyComparisonItemReader<String, String> compareQuick() {
return compareQuick(StringCodec.UTF8);
}

Expand Down Expand Up @@ -136,7 +136,7 @@ public static <K, V> RedisItemReader<K, V, KeyValue<K, Object>> struct(RedisCode

@Override
protected synchronized void doOpen() throws Exception {
Assert.notNull(client, "Redis client not set");
Assert.notNull(client, getName() + ": Redis client not set");
if (jobFactory == null) {
jobFactory = new JobFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.spring.batch.operation.KeyValueRestore;
import com.redis.spring.batch.operation.KeyValueWrite;
Expand All @@ -19,7 +21,7 @@
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;

public class RedisItemWriter<K, V, T> implements ItemStreamWriter<T> {
public class RedisItemWriter<K, V, T> extends AbstractItemStreamItemWriter<T> {

public static final int DEFAULT_POOL_SIZE = OperationExecutor.DEFAULT_POOL_SIZE;
public static final Duration DEFAULT_WAIT_TIMEOUT = Duration.ofSeconds(1);
Expand All @@ -36,6 +38,7 @@ public class RedisItemWriter<K, V, T> implements ItemStreamWriter<T> {
private OperationExecutor<K, V, T, Object> executor;

public RedisItemWriter(RedisCodec<K, V> codec, Operation<K, V, T, Object> operation) {
setName(ClassUtils.getShortName(getClass()));
this.codec = codec;
this.operation = operation;
}
Expand All @@ -46,6 +49,7 @@ public Operation<K, V, T, Object> getOperation() {

@Override
public synchronized void open(ExecutionContext executionContext) {
Assert.notNull(client, "Redis client not set");
if (executor == null) {
executor = new OperationExecutor<>(codec, operation());
executor.setClient(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import java.util.function.Function;

import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.KeyValue.Type;
import com.redis.spring.batch.KeyValue.DataType;
import com.redis.spring.batch.gen.Item.Type;

public class ItemToKeyValueFunction implements Function<Item, KeyValue<String, Object>> {

Expand All @@ -12,13 +13,32 @@ public KeyValue<String, Object> apply(Item item) {
KeyValue<String, Object> kv = new KeyValue<>();
kv.setKey(item.getKey());
kv.setTtl(item.getTtl());
kv.setType(dataType(item));
kv.setType(toRedisTypeString(item.getType()));
kv.setValue(item.getValue());
return kv;
}

private Type dataType(Item item) {
return Type.valueOf(item.getType().name());
private String toRedisTypeString(Type type) {
switch (type) {
case HASH:
return DataType.HASH.getString();
case JSON:
return DataType.JSON.getString();
case LIST:
return DataType.LIST.getString();
case SET:
return DataType.SET.getString();
case STREAM:
return DataType.STREAM.getString();
case STRING:
return DataType.STRING.getString();
case TIMESERIES:
return DataType.TIMESERIES.getString();
case ZSET:
return DataType.ZSET.getString();
default:
return type.name().toLowerCase();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ public ExpireAt(Function<T, K> keyFunction) {
super(keyFunction);
}

public void setEpoch(long epoch) {
this.epochFunction = t -> epoch;
public ExpireAt<K, V, T> epoch(long epoch) {
return epoch(t -> epoch);
}

public void setEpochFunction(ToLongFunction<T> epochFunction) {
this.epochFunction = epochFunction;
public ExpireAt<K, V, T> epoch(ToLongFunction<T> function) {
this.epochFunction = function;
return this;
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -33,4 +34,10 @@ protected void execute(BaseRedisAsyncCommands<K, V> commands, T item, K key, Lis
}
}

public static <K, V, T> ExpireAt<K, V, T> of(Function<T, K> key, ToLongFunction<T> epoch) {
ExpireAt<K, V, T> operation = new ExpireAt<>(key);
operation.epoch(epoch);
return operation;
}

}
Loading

0 comments on commit caff3a7

Please sign in to comment.