diff --git a/CHANGELOG.md b/CHANGELOG.md
index a19cf23..e181bb4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [1.2.1] - 2021-07-17
+### Changed
+- Modified Confluent archive to follow new standards
+- Stopped using reactive Lettuce
+- Upgraded various dependencies
+
## [1.2.0] - 2021-02-13
### Added
- Handle Redis cluster topology changes on the fly
diff --git a/lombok.config b/lombok.config
index df71bb6..e150afd 100644
--- a/lombok.config
+++ b/lombok.config
@@ -1,2 +1,4 @@
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
+lombok.builder.className = Builder
+lombok.log.fieldname = LOG
diff --git a/pom.xml b/pom.xml
index 5b8affc..0bb92ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,11 +5,11 @@
io.github.jaredpetersen
kafka-connect-redis
- 1.2.0
+ 1.2.1
jar
- kafka-connect-redis
- Kafka Connect source and sink connector for Redis
+ Kafka Redis Connector (Sink and Source)
+ Kafka sink and source connector for Redis
https://github.com/jaredpetersen/kafka-connect-redis
@@ -62,69 +62,57 @@
org.apache.kafka
connect-api
- 2.7.0
+ 2.8.0
provided
io.lettuce
lettuce-core
- 6.0.2.RELEASE
-
-
-
- io.projectreactor
- reactor-core
- 3.4.2
+ 6.1.4.RELEASE
org.projectlombok
lombok
- 1.18.18
+ 1.18.20
provided
org.slf4j
- slf4j-nop
- 1.7.30
+ slf4j-simple
+ 1.7.31
test
org.junit.jupiter
junit-jupiter
- 5.7.1
+ 5.7.2
test
org.mockito
mockito-core
- 3.7.7
+ 3.11.2
test
org.testcontainers
testcontainers
- 1.15.2
+ 1.15.3
test
org.testcontainers
junit-jupiter
- 1.15.2
+ 1.15.3
test
-
- io.projectreactor
- reactor-test
- 3.4.2
- test
-
@@ -218,8 +206,9 @@
kafka-connect
- Kafka Connect Redis
- ${project.url}/blob/main/README.md
+ redis-connector
+ Redis Connector (Sink and Source)
+ ${project.url}
docs/logos/jaredpetersen-logo.png
jaredpetersen
user
@@ -228,6 +217,17 @@
Open Source Community
Support provided through community involvement.
${project.issueManagement.url}
+
+
+ ${project.basedir}
+ doc
+
+ README.md
+ LICENSE.md
+ CHANGELOG.md
+
+
+
true
source
@@ -252,7 +252,7 @@
jar-with-dependencies
- ${project.name}-${project.version}
+ ${project.artifactId}-${project.version}
false
diff --git a/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkTask.java b/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkTask.java
index 52c17e1..9fa48c1 100644
--- a/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkTask.java
+++ b/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkTask.java
@@ -3,29 +3,31 @@
import io.github.jaredpetersen.kafkaconnectredis.sink.config.RedisSinkConfig;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.RecordConverter;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.Writer;
+import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisCommand;
import io.github.jaredpetersen.kafkaconnectredis.util.VersionUtil;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
-import io.lettuce.core.api.reactive.RedisReactiveCommands;
+import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
-import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
+import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import java.util.Collection;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
/**
* Kafka Connect Task for Kafka Connect Redis Sink.
*/
+@Slf4j
public class RedisSinkTask extends SinkTask {
+ private static final RecordConverter RECORD_CONVERTER = new RecordConverter();
+
private RedisClient redisStandaloneClient;
private StatefulRedisConnection redisStandaloneConnection;
@@ -34,10 +36,6 @@ public class RedisSinkTask extends SinkTask {
private Writer writer;
- private static final RecordConverter RECORD_CONVERTER = new RecordConverter();
-
- private static final Logger LOG = LoggerFactory.getLogger(RedisSinkTask.class);
-
@Override
public String version() {
return VersionUtil.getVersion();
@@ -67,14 +65,14 @@ public void start(final Map props) {
this.redisClusterConnection = this.redisClusterClient.connect();
- final RedisClusterReactiveCommands redisClusterCommands = this.redisClusterConnection.reactive();
+ final RedisClusterCommands redisClusterCommands = this.redisClusterConnection.sync();
this.writer = new Writer(redisClusterCommands);
}
else {
this.redisStandaloneClient = RedisClient.create(config.getRedisUri());
this.redisStandaloneConnection = this.redisStandaloneClient.connect();
- final RedisReactiveCommands redisStandaloneCommands = this.redisStandaloneConnection.reactive();
+ final RedisCommands redisStandaloneCommands = this.redisStandaloneConnection.sync();
this.writer = new Writer(redisStandaloneCommands);
}
}
@@ -88,14 +86,27 @@ public void put(final Collection records) {
LOG.info("writing {} record(s) to redis", records.size());
LOG.debug("records: {}", records);
- Flux
- .fromIterable(records)
- .flatMapSequential(RECORD_CONVERTER::convert)
- .onErrorMap(error -> new ConnectException("failed to convert record", error))
- .flatMapSequential(redisCommand -> this.writer.write(redisCommand))
- .onErrorMap(error -> new ConnectException("failed to write record", error))
- .then()
- .block();
+ for (SinkRecord record : records) {
+ put(record);
+ }
+ }
+
+ private void put(SinkRecord record) {
+ final RedisCommand redisCommand;
+
+ try {
+ redisCommand = RECORD_CONVERTER.convert(record);
+ }
+ catch (Exception exception) {
+ throw new ConnectException("failed to convert record", exception);
+ }
+
+ try {
+ writer.write(redisCommand);
+ }
+ catch (Exception exception) {
+ throw new ConnectException("failed to write record", exception);
+ }
}
@Override
diff --git a/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/RecordConverter.java b/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/RecordConverter.java
index ca17e95..8f848cc 100644
--- a/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/RecordConverter.java
+++ b/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/RecordConverter.java
@@ -8,175 +8,162 @@
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisPexpireCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisSaddCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisSetCommand;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+@Slf4j
public class RecordConverter {
- private static final Logger LOG = LoggerFactory.getLogger(RecordConverter.class);
-
/**
* Convert sink record to Redis command.
*
* @param sinkRecord Record to convert.
* @return Redis command.
*/
- public Mono convert(SinkRecord sinkRecord) {
+ public RedisCommand convert(SinkRecord sinkRecord) {
LOG.debug("converting record {}", sinkRecord);
final Struct recordValue = (Struct) sinkRecord.value();
final String recordValueSchemaName = recordValue.schema().name();
- final Mono redisCommandMono;
+ final RedisCommand redisCommand;
switch (recordValueSchemaName) {
case "io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand":
- redisCommandMono = convertSet(recordValue);
+ redisCommand = convertSet(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisExpireCommand":
- redisCommandMono = convertExpire(recordValue);
+ redisCommand = convertExpire(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisExpireatCommand":
- redisCommandMono = convertExpireat(recordValue);
+ redisCommand = convertExpireat(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisPexpireCommand":
- redisCommandMono = convertPexpire(recordValue);
+ redisCommand = convertPexpire(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand":
- redisCommandMono = convertSadd(recordValue);
+ redisCommand = convertSadd(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisGeoaddCommand":
- redisCommandMono = convertGeoadd(recordValue);
+ redisCommand = convertGeoadd(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand":
- redisCommandMono = convertArbitrary(recordValue);
+ redisCommand = convertArbitrary(recordValue);
break;
default:
- redisCommandMono = Mono.error(new ConnectException("unsupported command schema " + recordValueSchemaName));
+ throw new ConnectException("unsupported command schema " + recordValueSchemaName);
}
- return redisCommandMono;
+ return redisCommand;
}
- private Mono convertSet(Struct value) {
- return Mono.fromCallable(() -> {
- final Struct expirationStruct = value.getStruct("expiration");
- final RedisSetCommand.Payload.Expiration expiration = (expirationStruct == null)
- ? null
- : RedisSetCommand.Payload.Expiration.builder()
- .type(RedisSetCommand.Payload.Expiration.Type
- .valueOf(expirationStruct.getString("type")))
- .time(expirationStruct.getInt64("time"))
- .build();
-
- final String conditionString = value.getString("condition");
- final RedisSetCommand.Payload.Condition condition = (conditionString == null)
- ? null
- : RedisSetCommand.Payload.Condition.valueOf(conditionString.toUpperCase());
-
- final RedisSetCommand.Payload payload = RedisSetCommand.Payload.builder()
- .key(value.getString("key"))
- .value(value.getString("value"))
- .expiration(expiration)
- .condition(condition)
- .build();
+ private RedisCommand convertSet(Struct value) {
+ final Struct expirationStruct = value.getStruct("expiration");
+ final RedisSetCommand.Payload.Expiration expiration = (expirationStruct == null)
+ ? null
+ : RedisSetCommand.Payload.Expiration.builder()
+ .type(RedisSetCommand.Payload.Expiration.Type
+ .valueOf(expirationStruct.getString("type")))
+ .time(expirationStruct.getInt64("time"))
+ .build();
+
+ final String conditionString = value.getString("condition");
+ final RedisSetCommand.Payload.Condition condition = (conditionString == null)
+ ? null
+ : RedisSetCommand.Payload.Condition.valueOf(conditionString.toUpperCase());
+
+ final RedisSetCommand.Payload payload = RedisSetCommand.Payload.builder()
+ .key(value.getString("key"))
+ .value(value.getString("value"))
+ .expiration(expiration)
+ .condition(condition)
+ .build();
+
+ return RedisSetCommand.builder()
+ .payload(payload)
+ .build();
+ }
- return RedisSetCommand.builder()
- .payload(payload)
- .build();
- });
+ private RedisCommand convertExpire(Struct value) {
+ final RedisExpireCommand.Payload payload = RedisExpireCommand.Payload.builder()
+ .key(value.getString("key"))
+ .seconds(value.getInt64("seconds"))
+ .build();
+
+ return RedisExpireCommand.builder()
+ .payload(payload)
+ .build();
}
- private Mono convertExpire(Struct value) {
- return Mono.fromCallable(() -> {
- final RedisExpireCommand.Payload payload = RedisExpireCommand.Payload.builder()
- .key(value.getString("key"))
- .seconds(value.getInt64("seconds"))
- .build();
+ private RedisCommand convertExpireat(Struct value) {
+ final RedisExpireatCommand.Payload payload = RedisExpireatCommand.Payload.builder()
+ .key(value.getString("key"))
+ .timestamp(value.getInt64("timestamp"))
+ .build();
- return RedisExpireCommand.builder()
- .payload(payload)
- .build();
- });
+ return RedisExpireatCommand.builder()
+ .payload(payload)
+ .build();
}
- private Mono convertExpireat(Struct value) {
- return Mono.fromCallable(() -> {
- final RedisExpireatCommand.Payload payload = RedisExpireatCommand.Payload.builder()
- .key(value.getString("key"))
- .timestamp(value.getInt64("timestamp"))
- .build();
+ private RedisCommand convertPexpire(Struct value) {
+ final RedisPexpireCommand.Payload payload = RedisPexpireCommand.Payload.builder()
+ .key(value.getString("key"))
+ .milliseconds(value.getInt64("milliseconds"))
+ .build();
- return RedisExpireatCommand.builder()
- .payload(payload)
- .build();
- });
+ return RedisPexpireCommand.builder()
+ .payload(payload)
+ .build();
}
- private Mono convertPexpire(Struct value) {
- return Mono.fromCallable(() -> {
- final RedisPexpireCommand.Payload payload = RedisPexpireCommand.Payload.builder()
- .key(value.getString("key"))
- .milliseconds(value.getInt64("milliseconds"))
- .build();
+ private RedisCommand convertSadd(Struct value) {
+ final RedisSaddCommand.Payload payload = RedisSaddCommand.Payload.builder()
+ .key(value.getString("key"))
+ .values(value.getArray("values"))
+ .build();
- return RedisPexpireCommand.builder()
- .payload(payload)
- .build();
- });
+ return RedisSaddCommand.builder()
+ .payload(payload)
+ .build();
}
- private Mono convertSadd(Struct value) {
- return Mono.fromCallable(() -> {
- final RedisSaddCommand.Payload payload = RedisSaddCommand.Payload.builder()
- .key(value.getString("key"))
- .values(value.getArray("values"))
- .build();
+ private RedisCommand convertGeoadd(Struct value) {
+ final List geoLocations = new ArrayList<>();
+
+ for (Object rawGeoLocation : value.getArray("values")) {
+ final Struct rawGeolocationStruct = (Struct) rawGeoLocation;
- return RedisSaddCommand.builder()
- .payload(payload)
+ final RedisGeoaddCommand.Payload.GeoLocation geoLocation = RedisGeoaddCommand.Payload.GeoLocation.builder()
+ .latitude(rawGeolocationStruct.getFloat64("latitude"))
+ .longitude(rawGeolocationStruct.getFloat64("longitude"))
+ .member(rawGeolocationStruct.getString("member"))
.build();
- });
- }
- private Mono convertGeoadd(Struct value) {
- return Flux
- .fromIterable(value.getArray("values"))
- .flatMap(rawGeolocation -> Mono.fromCallable(() -> {
- final Struct rawGeolocationStruct = (Struct) rawGeolocation;
- return RedisGeoaddCommand.Payload.GeoLocation.builder()
- .latitude(rawGeolocationStruct.getFloat64("latitude"))
- .longitude(rawGeolocationStruct.getFloat64("longitude"))
- .member(rawGeolocationStruct.getString("member"))
- .build();
- }))
- .collectList()
- .flatMap(geolocations -> Mono.fromCallable(() -> {
- final RedisGeoaddCommand.Payload payload = RedisGeoaddCommand.Payload.builder()
- .key(value.getString("key"))
- .values(geolocations)
- .build();
-
- return RedisGeoaddCommand.builder()
- .payload(payload)
- .build();
- }));
+ geoLocations.add(geoLocation);
+ }
+
+ final RedisGeoaddCommand.Payload payload = RedisGeoaddCommand.Payload.builder()
+ .key(value.getString("key"))
+ .values(geoLocations)
+ .build();
+
+ return RedisGeoaddCommand.builder()
+ .payload(payload)
+ .build();
}
- private Mono convertArbitrary(Struct value) {
- return Mono.fromCallable(() -> {
- final RedisArbitraryCommand.Payload payload = RedisArbitraryCommand.Payload.builder()
- .command(value.getString("command"))
- .arguments(value.getArray("arguments"))
- .build();
+ private RedisCommand convertArbitrary(Struct value) {
+ final RedisArbitraryCommand.Payload payload = RedisArbitraryCommand.Payload.builder()
+ .command(value.getString("command"))
+ .arguments(value.getArray("arguments"))
+ .build();
- return RedisArbitraryCommand.builder()
- .payload(payload)
- .build();
- });
+ return RedisArbitraryCommand.builder()
+ .payload(payload)
+ .build();
}
}
diff --git a/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/Writer.java b/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/Writer.java
index 4aa2f9e..f46012b 100644
--- a/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/Writer.java
+++ b/src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/writer/Writer.java
@@ -9,34 +9,31 @@
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisSaddCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisSetCommand;
import io.lettuce.core.SetArgs;
-import io.lettuce.core.api.reactive.RedisReactiveCommands;
-import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.VoidOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.ProtocolKeyword;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.errors.ConnectException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+@Slf4j
public class Writer {
- private final RedisReactiveCommands redisStandaloneCommands;
- private final RedisClusterReactiveCommands redisClusterCommands;
+ private final RedisCommands redisStandaloneCommands;
+ private final RedisClusterCommands redisClusterCommands;
private final boolean clusterEnabled;
- private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
-
/**
* Set up writer to interact with standalone Redis.
*
- * @param redisStandaloneCommands Standalone Redis to write to.
+ * @param redisStandaloneCommands Standalone Redis to write to
*/
- public Writer(RedisReactiveCommands redisStandaloneCommands) {
+ public Writer(RedisCommands redisStandaloneCommands) {
this.redisStandaloneCommands = redisStandaloneCommands;
this.redisClusterCommands = null;
this.clusterEnabled = false;
@@ -45,9 +42,9 @@ public Writer(RedisReactiveCommands redisStandaloneCommands) {
/**
* Set up writer to interact with Redis cluster.
*
- * @param redisClusterCommands Redis cluster to write to.
+ * @param redisClusterCommands Redis cluster to write to
*/
- public Writer(RedisClusterReactiveCommands redisClusterCommands) {
+ public Writer(RedisClusterCommands redisClusterCommands) {
this.redisStandaloneCommands = null;
this.redisClusterCommands = redisClusterCommands;
this.clusterEnabled = true;
@@ -56,145 +53,145 @@ public Writer(RedisClusterReactiveCommands redisClusterCommands)
/**
* Apply write-type command to Redis.
*
- * @param redisCommand Command to apply.
- * @return Mono used to indicate the write has completed.
+ * @param redisCommand Command to apply
*/
- public Mono write(RedisCommand redisCommand) {
+ public void write(RedisCommand redisCommand) {
LOG.debug("writing {}", redisCommand);
- final Mono response;
-
switch (redisCommand.getCommand()) {
case SET:
- response = set((RedisSetCommand) redisCommand);
+ set((RedisSetCommand) redisCommand);
break;
case EXPIRE:
- response = expire((RedisExpireCommand) redisCommand);
+ expire((RedisExpireCommand) redisCommand);
break;
case EXPIREAT:
- response = expireat((RedisExpireatCommand) redisCommand);
+ expireat((RedisExpireatCommand) redisCommand);
break;
case PEXPIRE:
- response = pexpire((RedisPexpireCommand) redisCommand);
+ pexpire((RedisPexpireCommand) redisCommand);
break;
case SADD:
- response = sadd((RedisSaddCommand) redisCommand);
+ sadd((RedisSaddCommand) redisCommand);
break;
case GEOADD:
- response = geoadd((RedisGeoaddCommand) redisCommand);
+ geoadd((RedisGeoaddCommand) redisCommand);
break;
case ARBITRARY:
- response = arbitrary((RedisArbitraryCommand) redisCommand);
+ arbitrary((RedisArbitraryCommand) redisCommand);
break;
default:
- response = Mono.error(new ConnectException("redis command " + redisCommand + " is not supported"));
+ throw new ConnectException("redis command " + redisCommand + " is not supported");
}
-
- return response;
}
- private Mono set(RedisSetCommand setCommand) {
+ private void set(RedisSetCommand setCommand) {
final RedisSetCommand.Payload payload = setCommand.getPayload();
- final Mono setArgsMono = Mono
- .fromCallable(() -> {
- final SetArgs setArgs = new SetArgs();
-
- if (payload.getExpiration() != null) {
- final RedisSetCommand.Payload.Expiration expiration = payload.getExpiration();
-
- if (expiration.getType() == RedisSetCommand.Payload.Expiration.Type.EX) {
- setArgs.ex(expiration.getTime());
- }
- else if (expiration.getType() == RedisSetCommand.Payload.Expiration.Type.PX) {
- setArgs.px(expiration.getTime());
- }
- else if (expiration.getType() == RedisSetCommand.Payload.Expiration.Type.KEEPTTL) {
- setArgs.keepttl();
- }
- }
-
- if (payload.getCondition() != null) {
- if (payload.getCondition() == RedisSetCommand.Payload.Condition.NX) {
- setArgs.nx();
- }
- else if (payload.getCondition() == RedisSetCommand.Payload.Condition.XX) {
- setArgs.xx();
- }
- }
-
- return setArgs;
- });
- final Mono setResult = setArgsMono
- .flatMap(setArgs ->
- (this.clusterEnabled)
- ? this.redisClusterCommands.set(payload.getKey(), payload.getValue(), setArgs)
- : this.redisStandaloneCommands.set(payload.getKey(), payload.getValue(), setArgs));
-
- return setResult.then();
+ final SetArgs setArgs = new SetArgs();
+
+ if (payload.getExpiration() != null) {
+ final RedisSetCommand.Payload.Expiration expiration = payload.getExpiration();
+
+ if (expiration.getType() == RedisSetCommand.Payload.Expiration.Type.EX) {
+ setArgs.ex(expiration.getTime());
+ }
+ else if (expiration.getType() == RedisSetCommand.Payload.Expiration.Type.PX) {
+ setArgs.px(expiration.getTime());
+ }
+ else if (expiration.getType() == RedisSetCommand.Payload.Expiration.Type.KEEPTTL) {
+ setArgs.keepttl();
+ }
+ }
+
+ if (payload.getCondition() != null) {
+ if (payload.getCondition() == RedisSetCommand.Payload.Condition.NX) {
+ setArgs.nx();
+ }
+ else if (payload.getCondition() == RedisSetCommand.Payload.Condition.XX) {
+ setArgs.xx();
+ }
+ }
+
+ if (clusterEnabled) {
+ redisClusterCommands.set(payload.getKey(), payload.getValue(), setArgs);
+ }
+ else {
+ redisStandaloneCommands.set(payload.getKey(), payload.getValue(), setArgs);
+ }
}
- private Mono expire(RedisExpireCommand expireCommand) {
+ private void expire(RedisExpireCommand expireCommand) {
final RedisExpireCommand.Payload payload = expireCommand.getPayload();
- final Mono expirationResult = (this.clusterEnabled)
- ? this.redisClusterCommands.expire(payload.getKey(), payload.getSeconds())
- : this.redisStandaloneCommands.expire(payload.getKey(), payload.getSeconds());
- return expirationResult.then();
+ if (clusterEnabled) {
+ redisClusterCommands.expire(payload.getKey(), payload.getSeconds());
+ }
+ else {
+ redisStandaloneCommands.expire(payload.getKey(), payload.getSeconds());
+ }
}
- private Mono expireat(RedisExpireatCommand expireAtCommand) {
+ private void expireat(RedisExpireatCommand expireAtCommand) {
final RedisExpireatCommand.Payload payload = expireAtCommand.getPayload();
- final Mono expirationResult = (this.clusterEnabled)
- ? this.redisClusterCommands.expireat(payload.getKey(), payload.getTimestamp())
- : this.redisStandaloneCommands.expireat(payload.getKey(), payload.getTimestamp());
- return expirationResult.then();
+ if (clusterEnabled) {
+ redisClusterCommands.expireat(payload.getKey(), payload.getTimestamp());
+ }
+ else {
+ redisStandaloneCommands.expireat(payload.getKey(), payload.getTimestamp());
+ }
}
- private Mono pexpire(RedisPexpireCommand pexpireCommand) {
+ private void pexpire(RedisPexpireCommand pexpireCommand) {
final RedisPexpireCommand.Payload payload = pexpireCommand.getPayload();
- final Mono expirationResult = (this.clusterEnabled)
- ? this.redisClusterCommands.pexpire(payload.getKey(), payload.getMilliseconds())
- : this.redisStandaloneCommands.pexpire(payload.getKey(), payload.getMilliseconds());
- return expirationResult.then();
+ if (clusterEnabled) {
+ redisClusterCommands.pexpire(payload.getKey(), payload.getMilliseconds());
+ }
+ else {
+ redisStandaloneCommands.pexpire(payload.getKey(), payload.getMilliseconds());
+ }
}
- private Mono sadd(RedisSaddCommand saddCommand) {
+ private void sadd(RedisSaddCommand saddCommand) {
final RedisSaddCommand.Payload payload = saddCommand.getPayload();
final String[] members = payload.getValues().toArray(new String[0]);
- final Mono saddResult = (this.clusterEnabled)
- ? this.redisClusterCommands.sadd(payload.getKey(), members)
- : this.redisStandaloneCommands.sadd(payload.getKey(), members);
- return saddResult.then();
+ if (clusterEnabled) {
+ redisClusterCommands.sadd(payload.getKey(), members);
+ }
+ else {
+ redisStandaloneCommands.sadd(payload.getKey(), members);
+ }
}
- private Mono geoadd(RedisGeoaddCommand geoaddCommand) {
+ private void geoadd(RedisGeoaddCommand geoaddCommand) {
final RedisGeoaddCommand.Payload payload = geoaddCommand.getPayload();
- final Flux