Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.prebid.cache.helpers;

import org.apache.commons.lang3.BooleanUtils;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class ValidateRedisPropertyConditional implements Condition {

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
final boolean isSingleNode = context.getEnvironment().containsProperty("spring.redis.single-node.timeout");
final boolean isCluster = context.getEnvironment().containsProperty("spring.redis.cluster.timeout");

return BooleanUtils.xor(new Boolean[]{isSingleNode, isCluster});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,37 @@
@ConditionalOnProperty(prefix = "spring.aerospike", name = {"host"})
@ConfigurationProperties(prefix = "spring.aerospike")
public class AerospikePropertyConfiguration {

@NotNull
private String host;

@NotNull
private Integer port;

@NotNull
private String password;

@NotNull
private Integer cores;

@NotNull
private Long firstBackoff;

@NotNull
private Long maxBackoff;

@NotNull
private int maxRetry;

@NotNull
private String namespace;

@NotNull
private boolean preventUUIDDuplication;

private static final int DEFAULT_PORT = 3000;

public static Host[] extractHosts(@NotNull String hostList) {
public static Host[] extractHosts(String hostList) {
return Arrays.stream(hostList.split(","))
.map(host -> {
String[] params = host.split(":");
Expand All @@ -57,7 +75,7 @@ public static Host[] extractHosts(@NotNull String hostList) {
.toArray(Host[]::new);
}

public static boolean isAerospikeCluster(@NotNull String hostList) {
public static boolean isAerospikeCluster(String hostList) {
return hostList.split(",").length > 1;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.prebid.cache.repository.redis;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
Expand All @@ -18,40 +16,34 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Configuration
@ConditionalOnProperty(prefix = "spring.redis", name = {"timeout"})
@ConfigurationProperties(prefix = "spring.redis")
public class RedisPropertyConfiguration {
@ConditionalOnProperty(prefix = "spring.redis.cluster", name = {"timeout"})
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class RedisClusterPropertyConfiguration {

private String host;
private long timeout;
private String password;
private int port;
private Cluster cluster;
@Singular
@NotNull
private List<String> nodes;

@Data
public static class Cluster {
boolean enableTopologyRefresh;

@Singular
List<String> nodes;
Integer topologyPeriodicRefreshPeriod;

boolean enableTopologyRefresh;
String password;

Integer topologyPeriodicRefreshPeriod;
}
@NotNull
long timeout;

private RedisURI createRedisURI(String host, int port) {
requireNonNull(host);
final RedisURI.Builder builder = RedisURI.Builder.redis(host, port)
.withTimeout(Duration.ofMillis(timeout));
if (password != null) {
Expand All @@ -62,18 +54,17 @@ private RedisURI createRedisURI(String host, int port) {
}

private List<RedisURI> createRedisClusterURIs() {

return cluster.getNodes().stream()
return getNodes().stream()
.map(node -> node.split(":"))
.map(host -> createRedisURI(host[0], Integer.parseInt(host[1])))
.collect(Collectors.toList());
}

private ClusterClientOptions createRedisClusterOptions() {
final ClusterTopologyRefreshOptions topologyRefreshOptions = cluster.isEnableTopologyRefresh()
final ClusterTopologyRefreshOptions topologyRefreshOptions = isEnableTopologyRefresh()
? ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh()
.refreshPeriod(Duration.of(cluster.getTopologyPeriodicRefreshPeriod(), ChronoUnit.SECONDS))
.refreshPeriod(Duration.of(getTopologyPeriodicRefreshPeriod(), ChronoUnit.SECONDS))
.enableAllAdaptiveRefreshTriggers()
.build()
: null;
Expand All @@ -85,19 +76,6 @@ private ClusterClientOptions createRedisClusterOptions() {
}

@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(prefix = "spring.redis", name = "host")
RedisClient client() {
return RedisClient.create(createRedisURI(getHost(), getPort()));
}

@Bean(destroyMethod = "close")
@ConditionalOnProperty(prefix = "spring.redis", name = "host")
StatefulRedisConnection<String, String> connection() {
return client().connect();
}

@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(prefix = "spring.redis", name = "host", matchIfMissing = true, havingValue = "null")
RedisClusterClient clusterClient() {
final RedisClusterClient redisClusterClient = RedisClusterClient.create(createRedisClusterURIs());
redisClusterClient.setOptions(createRedisClusterOptions());
Expand All @@ -106,17 +84,12 @@ RedisClusterClient clusterClient() {
}

@Bean(destroyMethod = "close")
@ConditionalOnProperty(prefix = "spring.redis", name = "host", matchIfMissing = true, havingValue = "null")
StatefulRedisClusterConnection<String, String> clusterConnection() {
return clusterClient().connect();
}

@Bean
RedisStringReactiveCommands<String, String> reactiveCommands() {
if (getHost() == null) {
return clusterConnection().reactive();
} else {
return connection().reactive();
}
return clusterConnection().reactive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
import org.prebid.cache.exceptions.PayloadWrapperPropertyException;
import org.prebid.cache.exceptions.RepositoryException;
import org.prebid.cache.helpers.Json;
import org.prebid.cache.helpers.ValidateRedisPropertyConditional;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.ReactiveRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

@Repository
@Slf4j
@ConditionalOnProperty(prefix = "spring.redis", name = {"timeout"})
@Conditional(ValidateRedisPropertyConditional.class)
@RequiredArgsConstructor
public class RedisRepositoryImpl implements ReactiveRepository<PayloadWrapper, String> {
private final RedisStringReactiveCommands<String, String> reactiveCommands;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.prebid.cache.repository.redis;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotNull;
import java.time.Duration;

import static java.util.Objects.requireNonNull;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Configuration
@Validated
@ConditionalOnProperty(prefix = "spring.redis.single-node", name = {"timeout"})
@ConfigurationProperties(prefix = "spring.redis.single-node")
public class RedisSingleNodePropertyConfiguration {

@NotNull
private int port;

@NotNull
private String host;

@NotNull
private long timeout;

private String password;

private RedisURI createRedisURI(String host, int port) {
requireNonNull(host);
final RedisURI.Builder builder = RedisURI.Builder.redis(host, port)
.withTimeout(Duration.ofMillis(timeout));
if (password != null) {
builder.withPassword((CharSequence) password);
}

return builder.build();
}

@Bean(destroyMethod = "shutdown")
RedisClient client() {
return RedisClient.create(createRedisURI(getHost(), getPort()));
}

@Bean(destroyMethod = "close")
StatefulRedisConnection<String, String> connection() {
return client().connect();
}

@Bean
RedisStringReactiveCommands<String, String> reactiveCommands() {
return connection().reactive();
}
}