Skip to content

Commit

Permalink
feat(queue): shovel support going to/from redis cluster (#3053)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher authored Jul 22, 2019
1 parent 6eb1718 commit aef16bf
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 8 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
fiatVersion=1.1.0
enablePublishing=false
spinnakerGradleVersion=6.5.0
korkVersion=5.8.8
korkVersion=5.9.0
keikoVersion=2.12.0
org.gradle.parallel=true
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCluster
import redis.clients.util.Pool
Expand Down Expand Up @@ -73,6 +74,7 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {

@Bean
@ConditionalOnProperty(value = ["redis.cluster-enabled"], havingValue = "false", matchIfMissing = true)
@Primary
override fun queue(
@Qualifier("queueRedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
Expand All @@ -87,6 +89,7 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {

@Bean
@ConditionalOnProperty(value = ["redis.cluster-enabled"])
@Primary
override fun clusterQueue(
@Qualifier("queueRedisCluster") cluster: JedisCluster,
redisQueueProperties: RedisQueueProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,26 @@ import com.netflix.spinnaker.kork.jedis.JedisDriverProperties
import com.netflix.spinnaker.kork.jedis.JedisPoolFactory
import com.netflix.spinnaker.orca.q.QueueShovel
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.q.redis.AbstractRedisQueue
import com.netflix.spinnaker.q.redis.RedisClusterQueue
import com.netflix.spinnaker.q.redis.RedisQueue
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.springframework.beans.factory.BeanInitializationException
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.Protocol
import redis.clients.util.Pool
import java.net.URI
import java.time.Clock
import java.time.Duration
import java.util.Optional
Expand All @@ -45,7 +50,9 @@ import java.util.Optional
class RedisQueueShovelConfiguration {

@Bean
@ConditionalOnProperty("redis.connection-previous")
@ConditionalOnExpression(
"'\${redis.connection-previous:#{null}}' != null && '\${redis.previous-cluster-enabled:false}' == false"
)
fun previousQueueJedisPool(
@Value("\${redis.connection:redis://localhost:6379}") mainConnection: String,
@Value("\${redis.connection-previous:#{null}}") previousConnection: String?,
Expand All @@ -68,12 +75,42 @@ class RedisQueueShovelConfiguration {
)
}

@Bean
@ConditionalOnExpression(
"'\${redis.connection-previous:#{null}}' != null && '\${redis.previous-cluster-enabled:false}' == true"
)
fun previousJedisCluster(
@Value("\${redis.connection:redis://localhost:6379}") mainConnection: String,
@Value("\${redis.connection-previous}") previousConnection: String,
@Value("\${redis.timeout:2000}") timeout: Int,
@Value("\${redis.maxattempts:4}") maxAttempts: Int,
redisPoolConfig: GenericObjectPoolConfig<*>,
registry: Registry
): JedisCluster {
if (mainConnection == previousConnection) {
throw BeanInitializationException("previous Redis connection must not be the same as current connection")
}
URI.create(previousConnection).let { cx ->
val port = if (cx.port == -1) Protocol.DEFAULT_PORT else cx.port
val password = cx.userInfo?.substringAfter(":")
return JedisCluster(
HostAndPort(cx.host, port),
timeout,
timeout,
maxAttempts,
password,
redisPoolConfig
)
}
}

@Bean(name = ["previousQueue"])
@ConditionalOnBean(name = ["previousQueueJedisPool"]) fun previousRedisQueue(
@ConditionalOnBean(name = ["previousQueueJedisPool"])
fun previousRedisQueue(
@Qualifier("previousQueueJedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
deadMessageHandler: RedisDeadMessageHandler,
deadMessageHandler: DeadMessageCallback,
publisher: EventPublisher,
redisQueueObjectMapper: ObjectMapper,
serializationMigrator: Optional<SerializationMigrator>
Expand All @@ -89,9 +126,32 @@ class RedisQueueShovelConfiguration {
serializationMigrator = serializationMigrator
)

@Bean(name = ["previousClusterQueue"])
@ConditionalOnBean(name = ["previousJedisCluster"])
fun previousRedisClusterQueue(
@Qualifier("previousJedisCluster") cluster: JedisCluster,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
deadMessageHandler: DeadMessageCallback,
publisher: EventPublisher,
redisQueueObjectMapper: ObjectMapper,
serializationMigrator: Optional<SerializationMigrator>
) =
RedisClusterQueue(
queueName = redisQueueProperties.queueName,
jedisCluster = cluster,
clock = clock,
mapper = redisQueueObjectMapper,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong()),
serializationMigrator = serializationMigrator
)

@Bean
@ConditionalOnBean(name = arrayOf("previousQueueJedisPool")) fun redisQueueShovel(
queue: RedisQueue,
@ConditionalOnBean(name = ["previousQueueJedisPool"])
fun redisQueueShovel(
queue: AbstractRedisQueue,
@Qualifier("previousQueue") previousQueueImpl: RedisQueue,
registry: Registry,
discoveryActivator: Activator
Expand All @@ -102,4 +162,19 @@ class RedisQueueShovelConfiguration {
registry = registry,
activator = discoveryActivator
)

@Bean
@ConditionalOnBean(name = ["previousClusterQueue"])
fun priorRedisClusterQueueShovel(
queue: AbstractRedisQueue,
@Qualifier("previousClusterQueue") previousQueueImpl: AbstractRedisQueue,
registry: Registry,
discoveryActivator: Activator
) =
QueueShovel(
queue = queue,
previousQueue = previousQueueImpl,
registry = registry,
activator = discoveryActivator
)
}

0 comments on commit aef16bf

Please sign in to comment.