Skip to content

Commit

Permalink
feat(redis): redis cluster support for queue and locking (#3052)
Browse files Browse the repository at this point in the history
- Enables use of Redis Cluster for Orca's queue and distributed lock
service, when SQL is used for the execution repository. Running Orca
entirely on Redis Cluster remains unsupported.
  • Loading branch information
asher authored Jul 19, 2019
1 parent 20411ac commit 2ede1ac
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 200 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
fiatVersion=1.1.0
enablePublishing=false
spinnakerGradleVersion=6.5.0
korkVersion=5.8.5
keikoVersion=2.10.1
korkVersion=5.8.8
keikoVersion=2.12.0
org.gradle.parallel=true
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.netflix.spinnaker.orca.q.redis.migration.TaskTypeDeserializer
import com.netflix.spinnaker.orca.q.redis.pending.RedisPendingExecutionService
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.redis.RedisClusterDeadMessageHandler
import com.netflix.spinnaker.q.redis.RedisClusterQueue
import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.q.redis.RedisQueue
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -36,6 +38,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCluster
import redis.clients.util.Pool
import java.time.Clock
import java.util.Optional
Expand Down Expand Up @@ -68,7 +71,9 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {

@Bean fun orcaToKeikoSerializationMigrator(objectMapper: ObjectMapper) = OrcaToKeikoSerializationMigrator(objectMapper)

@Bean override fun queue(
@Bean
@ConditionalOnProperty(value = ["redis.cluster-enabled"], havingValue = "false", matchIfMissing = true)
override fun queue(
@Qualifier("queueRedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
Expand All @@ -80,6 +85,28 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {
return super.queue(redisPool, redisQueueProperties, clock, deadMessageHandler, publisher, mapper, serializationMigrator)
}

@Bean
@ConditionalOnProperty(value = ["redis.cluster-enabled"])
override fun clusterQueue(
@Qualifier("queueRedisCluster") cluster: JedisCluster,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
@Qualifier("queueEventPublisher") deadMessageHandler: RedisClusterDeadMessageHandler,
publisher: EventPublisher,
redisQueueObjectMapper: ObjectMapper,
serializationMigrator: Optional<SerializationMigrator>
): RedisClusterQueue {
return super.clusterQueue(
cluster,
redisQueueProperties,
clock,
deadMessageHandler,
publisher,
redisQueueObjectMapper,
serializationMigrator
)
}

@Bean
@ConditionalOnProperty(value = ["queue.pending-execution-service.redis.enabled"], matchIfMissing = true)
fun pendingExecutionService(
Expand Down
4 changes: 3 additions & 1 deletion orca-redis/orca-redis.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ apply from: "$rootDir/gradle/kotlin.gradle"
apply from: "$rootDir/gradle/spock.gradle"

dependencies {
api("redis.clients:jedis")
api("redis.clients:jedis:2.10.2") {
force = true
}
api("com.netflix.spinnaker.kork:kork-jedis")

implementation(project(":orca-core"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.netflix.spinnaker.kork.jedis.JedisClientConfiguration;
import com.netflix.spinnaker.kork.jedis.RedisClientSelector;
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock;
import com.netflix.spinnaker.orca.notifications.RedisClusterNotificationClusterLock;
import com.netflix.spinnaker.orca.notifications.RedisNotificationClusterLock;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import com.netflix.spinnaker.orca.pipeline.persistence.jedis.RedisExecutionRepository;
Expand All @@ -29,6 +30,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.*;
import redis.clients.jedis.JedisCluster;
import rx.Scheduler;

@Configuration
Expand Down Expand Up @@ -61,11 +63,21 @@ public ExecutionRepository redisExecutionRepository(
}

@Bean
@ConditionalOnProperty(
value = "redis.cluster-enabled",
havingValue = "false",
matchIfMissing = true)
public NotificationClusterLock redisNotificationClusterLock(
RedisClientSelector redisClientSelector) {
return new RedisNotificationClusterLock(redisClientSelector);
}

@Bean
@ConditionalOnProperty(value = "redis.cluster-enabled")
public NotificationClusterLock redisClusterNotificationClusterLock(JedisCluster cluster) {
return new RedisClusterNotificationClusterLock(cluster);
}

@Bean
@ConfigurationProperties("redis")
public GenericObjectPoolConfig redisPoolConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package com.netflix.spinnaker.orca.locks;

import static java.util.Arrays.asList;
import static net.logstash.logback.argument.StructuredArguments.kv;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRedisLockManager implements LockManager {

private static final int LOCK_VALUE_APPLICATION_IDX = 0;
private static final int LOCK_VALUE_TYPE_IDX = 1;
private static final int LOCK_VALUE_IDX = 2;

private final LockingConfigurationProperties lockingConfigurationProperties;
private final Logger log = LoggerFactory.getLogger(getClass());

AbstractRedisLockManager(LockingConfigurationProperties lockingConfigurationProperties) {
this.lockingConfigurationProperties = lockingConfigurationProperties;
}

public abstract void acquireLock(
String lockName, LockValue lockValue, String lockHolder, int ttlSeconds)
throws LockFailureException;

public abstract void extendLock(String lockName, LockValue lockValue, int ttlSeconds)
throws LockFailureException;

public abstract void releaseLock(String lockName, LockValue lockValue, String lockHolder);

static class LockOperation {
static LockOperation acquire(
String lockName, LockValue lockValue, String lockHolder, int ttlSeconds) {
return new LockOperation("acquireLock", lockName, lockValue, lockHolder, ttlSeconds);
}

static LockOperation extend(String lockName, LockValue lockValue, int ttlSeconds) {
return new LockOperation("extendLock", lockName, lockValue, null, ttlSeconds);
}

static LockOperation release(String lockName, LockValue lockValue, String lockHolder) {
return new LockOperation("releaseLock", lockName, lockValue, lockHolder, -1);
}

final String operationName;
final String lockName;
final LockValue lockValue;
final String lockHolder;
final int ttlSeconds;

LockOperation(
String operationName,
String lockName,
LockValue lockValue,
String lockHolder,
int ttlSeconds) {
this.operationName = Objects.requireNonNull(operationName);
this.lockName = Objects.requireNonNull(lockName);
this.lockValue = Objects.requireNonNull(lockValue);
this.lockHolder = lockHolder;
this.ttlSeconds = ttlSeconds;
}

List<String> key() {
return asList(getLockKey(lockName));
}

List<String> acquireArgs() {
return asList(
lockValue.getApplication(),
lockValue.getType(),
lockValue.getId(),
lockHolder,
Integer.toString(ttlSeconds));
}

List<String> extendArgs() {
return asList(
lockValue.getApplication(),
lockValue.getType(),
lockValue.getId(),
Integer.toString(ttlSeconds));
}

List<String> releaseArgs() {
return asList(lockValue.getApplication(), lockValue.getType(), lockValue.getId(), lockHolder);
}
}

void checkResult(LockOperation op, List<String> result) {
final LockValue currentLockValue = buildResultLockValue(result);
if (!(op.lockValue.equals(currentLockValue))) {
throw new LockFailureException(op.lockName, currentLockValue);
}
}

LockValue buildResultLockValue(List<String> result) {
if (result == null || result.size() < 3) {
throw new IllegalStateException("Unexpected result from redis: " + result);
}
if (result.stream().allMatch(Objects::isNull)) {
return null;
}
return new LockValue(
result.get(LOCK_VALUE_APPLICATION_IDX),
result.get(LOCK_VALUE_TYPE_IDX),
result.get(LOCK_VALUE_IDX));
}

void withLockingConfiguration(
LockOperation lockOperation, Consumer<LockOperation> lockManagementOperation)
throws LockFailureException {
if (!lockingConfigurationProperties.isEnabled()) {
return;
}
try {
lockManagementOperation.accept(lockOperation);
} catch (Throwable t) {
if (t instanceof LockFailureException) {
LockFailureException lfe = (LockFailureException) t;
Optional<LockValue> currentLockValue = Optional.ofNullable(lfe.getCurrentLockValue());
log.debug(
"LockFailureException during {} for lock {} currently held by {} {} {} requested by {} {} {} {}",
kv("operationName", lockOperation.operationName),
kv("lockName", lockOperation.lockName),
kv(
"currentLockValue.application",
currentLockValue.map(LockValue::getApplication).orElse(null)),
kv("currentLockValue.type", currentLockValue.map(LockValue::getType).orElse(null)),
kv("currentLockValue.id", currentLockValue.map(LockValue::getId).orElse(null)),
kv("requestLockValue.application", lockOperation.lockValue.getApplication()),
kv("requestLockValue.type", lockOperation.lockValue.getType()),
kv("requestLockValue.id", lockOperation.lockValue.getId()),
kv(
"requestLockHolder",
Optional.ofNullable(lockOperation.lockHolder).orElse("UNSPECIFIED")),
lfe);
if (lockingConfigurationProperties.isLearningMode()) {
return;
}
throw lfe;
} else {
log.debug(
"Exception during {} for lock {} requested by {} {} {} {}",
kv("operationName", lockOperation.operationName),
kv("operationName", lockOperation.lockName),
kv("requestLockValue.application", lockOperation.lockValue.getApplication()),
kv("requestLockValue.type", lockOperation.lockValue.getType()),
kv("requestLockValue.id", lockOperation.lockValue.getId()),
kv(
"requestLockHolder",
Optional.ofNullable(lockOperation.lockHolder).orElse("UNSPECIFIED")),
t);
if (lockingConfigurationProperties.isLearningMode()) {
return;
}

if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
throw new RuntimeException("Exception in RedisLockManager", t);
}
}
}

static String getLockKey(String lockName) {
return "{namedlock}:" + lockName;
}

static final String ACQUIRE_LOCK =
""
+ "local lockKey, lockValueApplication, lockValueType, lockValue, holderHashKey, ttlSeconds = "
+ " KEYS[1], ARGV[1], ARGV[2], ARGV[3], 'lockHolder.' .. ARGV[4], tonumber(ARGV[5]);"
+ "if redis.call('exists', lockKey) == 1 then"
+ " if not (redis.call('hget', lockKey, 'lockValueApplication') == lockValueApplication and "
+ " redis.call('hget', lockKey, 'lockValueType') == lockValueType and"
+ " redis.call('hget', lockKey, 'lockValue') == lockValue) then"
+ " return redis.call('hmget', lockKey, 'lockValueApplication', 'lockValueType', 'lockValue');"
+ " end;"
+ "end;"
+ "redis.call('hmset', lockKey, 'lockValueApplication', lockValueApplication, "
+ " 'lockValueType', lockValueType, 'lockValue', lockValue, holderHashKey, 'true');"
+ "redis.call('expire', lockKey, ttlSeconds);"
+ "return {lockValueApplication, lockValueType, lockValue};";

static final String EXTEND_LOCK =
""
+ "local lockKey, lockValueApplication, lockValueType, lockValue, ttlSeconds = "
+ " KEYS[1], ARGV[1], ARGV[2], ARGV[3], tonumber(ARGV[4]);"
+ "if not (redis.call('hget', lockKey, 'lockValueApplication') == lockValueApplication and "
+ " redis.call('hget', lockKey, 'lockValueType') == lockValueType and"
+ " redis.call('hget', lockKey, 'lockValue') == lockValue) then"
+ " return redis.call('hmget', lockKey, 'lockValueApplication', 'lockValueType', 'lockValue');"
+ "end;"
+ "redis.call('expire', lockKey, ttlSeconds);"
+ "return {lockValueApplication, lockValueType, lockValue};";

static final String RELEASE_LOCK =
""
+ "local lockKey, lockValueApplication, lockValueType, lockValue, holderHashKey = "
+ " KEYS[1], ARGV[1], ARGV[2], ARGV[3], 'lockHolder.' .. ARGV[4];"
+ "if (redis.call('hget', lockKey, 'lockValueApplication') == lockValueApplication and "
+ " redis.call('hget', lockKey, 'lockValueType') == lockValueType and"
+ " redis.call('hget', lockKey, 'lockValue') == lockValue) then"
+ " redis.call('hdel', lockKey, holderHashKey);"
+ " if (redis.call('hlen', lockKey) == 3) then"
+ " redis.call('del', lockKey);"
+ " end;"
+ "end;"
+ "return 1;";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.netflix.spinnaker.orca.locks;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;

@Component
@ConditionalOnProperty(value = "redis.cluster-enabled")
public class RedisClusterLockManager extends AbstractRedisLockManager {

private final JedisCluster cluster;

@Autowired
public RedisClusterLockManager(JedisCluster cluster, LockingConfigurationProperties properties) {
super(properties);
this.cluster = cluster;
}

@Override
@SuppressWarnings("unchecked")
public void acquireLock(String lockName, LockValue lockValue, String lockHolder, int ttlSeconds)
throws LockFailureException {
withLockingConfiguration(
LockOperation.acquire(lockName, lockValue, lockHolder, ttlSeconds),
(op) -> {
final List<String> result =
(List<String>) cluster.eval(ACQUIRE_LOCK, op.key(), op.acquireArgs());
checkResult(op, result);
});
}

@Override
@SuppressWarnings("unchecked")
public void extendLock(String lockName, LockValue lockValue, int ttlSeconds)
throws LockFailureException {
withLockingConfiguration(
LockOperation.extend(lockName, lockValue, ttlSeconds),
(op) -> {
final List<String> result =
(List<String>) cluster.eval(EXTEND_LOCK, op.key(), op.extendArgs());
checkResult(op, result);
});
}

@Override
public void releaseLock(String lockName, LockValue lockValue, String lockHolder) {
withLockingConfiguration(
LockOperation.release(lockName, lockValue, lockHolder),
(op) -> cluster.eval(RELEASE_LOCK, op.key(), op.releaseArgs()));
}
}
Loading

0 comments on commit 2ede1ac

Please sign in to comment.