Skip to content

Commit

Permalink
Revert "chore(*): Remove queue traffic shaping codebase"
Browse files Browse the repository at this point in the history
This reverts commit 077b9fe.
  • Loading branch information
robfletcher committed Mar 29, 2018
1 parent 077b9fe commit 7aa934b
Show file tree
Hide file tree
Showing 20 changed files with 1,418 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {

@Bean fun orcaToKeikoSerializationMigrator(objectMapper: ObjectMapper) = OrcaToKeikoSerializationMigrator(objectMapper)

@Bean override fun queue(
@Bean(name = arrayOf("queueImpl")) override fun queue(
@Qualifier("queueRedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RedisQueueShovelConfiguration {
return RedisConfiguration.createPool(redisPoolConfig, previousConnection, timeout, registry, "previousQueueJedisPool")
}

@Bean(name = ["previous"])
@Bean(name = ["previousQueueImpl"])
@ConditionalOnBean(name = ["previousQueueJedisPool"]) fun previousRedisQueue(
@Qualifier("previousQueueJedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
Expand All @@ -82,8 +82,8 @@ class RedisQueueShovelConfiguration {

@Bean
@ConditionalOnBean(name = arrayOf("previousQueueJedisPool")) fun redisQueueShovel(
queueImpl: RedisQueue,
@Qualifier("previous") previousQueueImpl: RedisQueue,
@Qualifier("queueImpl") queueImpl: RedisQueue,
@Qualifier("previousQueueImpl") previousQueueImpl: RedisQueue,
registry: Registry,
activator: Activator
) =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config

import com.netflix.spinnaker.orca.q.redis.RedisPriorityCapacityRepository
import com.netflix.spinnaker.orca.q.redis.RedisRateLimitBackend
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.Jedis
import redis.clients.util.Pool
import java.time.Clock

@Configuration
@ConditionalOnExpression("\${queue.redis.enabled:true} && \${queue.trafficShaping.enabled:false}")
class RedisTrafficShapingConfiguration {
@Bean fun redisRateLimitBackend(
@Qualifier("jedisPool") redisPool: Pool<Jedis>,
clock: Clock
) =
RedisRateLimitBackend(redisPool, clock)

@Bean fun redisPriorityCapacityRepository(
@Qualifier("jedisPool") redisPool: Pool<Jedis>,
properties: TrafficShapingProperties.PriorityCapacityProperties
) =
RedisPriorityCapacityRepository(redisPool, properties)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q.redis

import com.netflix.spinnaker.config.TrafficShapingProperties
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.GlobalCapacity
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.Priority
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.PriorityCapacityRepository
import redis.clients.jedis.Jedis
import redis.clients.util.Pool

class RedisPriorityCapacityRepository(
private val pool: Pool<Jedis>,
private val properties: TrafficShapingProperties.PriorityCapacityProperties
) : PriorityCapacityRepository {

private val key = "queue:trafficShaping:priorityCapacity"

override fun incrementExecutions(executionId: String, priority: Priority) {
pool.resource.use { redis ->
redis.sadd(getPriorityKey(priority), executionId)
}
}

override fun decrementExecutions(executionId: String, priority: Priority) {
pool.resource.use { redis ->
redis.srem(getPriorityKey(priority), executionId)
}
}

override fun getGlobalCapacity(): GlobalCapacity {
pool.resource.use { redis ->
return GlobalCapacity(
ceiling = (redis.get("$key:ceiling") ?: properties.capacity).toString().toInt(),
criticalUsage = (redis.scard(getPriorityKey(Priority.CRITICAL)) ?: 0).toString().toInt(),
highUsage = (redis.scard(getPriorityKey(Priority.HIGH)) ?: 0).toString().toInt(),
mediumUsage = (redis.scard(getPriorityKey(Priority.MEDIUM)) ?: 0).toString().toInt(),
lowUsage = (redis.scard(getPriorityKey(Priority.LOW)) ?: 0).toString().toInt(),
learning = if (redis.exists("$key:learning")) redis.get("$key:learning").toBoolean() else null
)
}
}

private fun getPriorityKey(priority: Priority) = "$key:${priority.name.toLowerCase()}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q.redis

import com.netflix.spinnaker.orca.q.trafficshaping.ratelimit.RateLimit
import com.netflix.spinnaker.orca.q.trafficshaping.ratelimit.RateLimitBackend
import com.netflix.spinnaker.orca.q.trafficshaping.ratelimit.RateLimitContext
import redis.clients.jedis.Jedis
import redis.clients.util.Pool
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit

/**
* Keys used, all namespaced by the interceptor's name, e.g. "appRateLimit":
*
* - queue:trafficShaping:{ns}:learning - interceptor flag to enable/disable
* - queue:trafficShaping:{ns}:capacity - number of actions per second
* - queue:trafficShaping:{ns}:ignoring - a list of ignored subjects
* - queue:trafficShaping:{ns}:enforcing - a list of enforced subjects
* - queue:trafficShaping:{ns}:{subject}:learning - subject-specific learning flag
* - queue:trafficShaping:{ns}:{subject}:capacity - subject-specific capacity
* - queue:trafficShaping:{ns}:duration - rate limit duration
*/
class RedisRateLimitBackend(
private val pool: Pool<Jedis>,
private val clock: Clock
) : RateLimitBackend {

override fun incrementAndGet(subject: String, context: RateLimitContext): RateLimit {
pool.resource.use { redis ->
val key = "queue:trafficShaping:${context.namespace}:$subject"
val count = redis.get(key)
val ttl = getTTL(redis, key, count == null)

val newCount = redis.incr(key)
if (newCount == 1L) {
redis.pexpire(key, ttl.minusMillis(clock.instant().toEpochMilli()).toEpochMilli())
}

val capacity = getCapacity(redis, context.namespace, subject, context.capacity)

val limiting = Math.max(capacity - newCount, 0) == 0L
if (isLearning(redis, context.namespace, subject, !context.enforcing)) {
return RateLimit(limiting, Duration.ZERO, false)
}

return RateLimit(limiting, Duration.of(getDuration(redis, context.namespace, context.duration), ChronoUnit.MILLIS), true)
}
}

private fun getTTL(redis: Jedis, key: String, missingKey: Boolean): Instant
= if (missingKey) clock.instant().plusMillis(1000) else clock.instant().plusMillis(redis.pttl(key))

private fun isLearning(redis: Jedis, ns: String, subject: String, default: Boolean): Boolean {
val ignoring = redis.smembers("queue:trafficShaping:$ns:ignoring")
if (ignoring.contains(subject)) {
return true
}

val enforcing = redis.smembers("queue:trafficShaping:$ns:enforcing")
if (enforcing.contains(subject)) {
return false
}

val nsLearning: String? = redis.get("queue:trafficShaping:$ns:learning")
val appLearning: String? = redis.get("queue:trafficShaping:$ns:$subject:learning")

return appLearning?.toBoolean() ?: nsLearning?.toBoolean() ?: default
}

private fun getCapacity(redis: Jedis, ns: String, subject: String, default: Int): Int {
val subjectCap: String? = redis.get("queue:trafficShaping:$ns:$subject:capacity")
if (subjectCap != null) {
return subjectCap.toInt()
}

val nsCap: String? = redis.get("queue:trafficShaping:$ns:capacity")
if (nsCap != null) {
return nsCap.toInt()
}

return default
}

private fun getDuration(redis: Jedis, ns: String, default: Long): Long {
val nsDuration: String? = redis.get("queue.trafficShaping:$ns:duration")
if (nsDuration != null) {
return nsDuration.toLong()
}

return default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import java.time.Clock
@Configuration
@ComponentScan(basePackages = [
"com.netflix.spinnaker.orca.q",
"com.netflix.spinnaker.orca.q.handler"
"com.netflix.spinnaker.orca.q.handler",
"com.netflix.spinnaker.orca.q.trafficshaping"
])
@EnableScheduling
class OrcaQueueConfiguration {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.config

import com.netflix.spectator.api.Id
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.q.trafficshaping.NoopTrafficShapingInterceptor
import com.netflix.spinnaker.orca.q.trafficshaping.TrafficShapingInterceptor
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.ConstantPrioritizationStrategy
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.PrioritizationStrategy
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.PriorityCapacityListener
import com.netflix.spinnaker.orca.q.trafficshaping.capacity.PriorityCapacityRepository
import com.netflix.spinnaker.orca.q.trafficshaping.interceptor.ApplicationRateLimitQueueInterceptor
import com.netflix.spinnaker.orca.q.trafficshaping.interceptor.GlobalRateLimitQueueInterceptor
import com.netflix.spinnaker.orca.q.trafficshaping.interceptor.PriorityCapacityQueueInterceptor
import com.netflix.spinnaker.orca.q.trafficshaping.ratelimit.NoopRateLimitBackend
import com.netflix.spinnaker.orca.q.trafficshaping.ratelimit.RateLimitBackend
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
@ConditionalOnProperty("queue.trafficShaping.enabled")
@EnableConfigurationProperties(
TrafficShapingProperties::class,
TrafficShapingProperties.GlobalRateLimitingProperties::class,
TrafficShapingProperties.ApplicationRateLimitingProperties::class,
TrafficShapingProperties.PriorityCapacityProperties::class
)
class TrafficShapingConfiguration {

@Bean
fun timeShapedId(registry: Registry): Id = registry.createId("queue.trafficShaping.time")

@Bean
@ConditionalOnMissingBean(RateLimitBackend::class)
@ConditionalOnProperty("queue.trafficShaping.applicationRateLimiting.enabled")
fun noopRateLimitBackend(): RateLimitBackend = NoopRateLimitBackend()

@Bean @ConditionalOnProperty("queue.trafficShaping.globalRateLimiting.enabled")
fun globalRateLimitQueueInterceptor(rateLimitBackend: RateLimitBackend,
registry: Registry,
globalRateLimitingProperties: TrafficShapingProperties.GlobalRateLimitingProperties,
timeShapedId: Id
): TrafficShapingInterceptor
= GlobalRateLimitQueueInterceptor(rateLimitBackend, registry, globalRateLimitingProperties, timeShapedId)

@Bean @ConditionalOnProperty("queue.trafficShaping.applicationRateLimiting.enabled")
fun applicationRateLimitQueueInterceptor(rateLimitBackend: RateLimitBackend,
registry: Registry,
applicationRateLimitingProperties: TrafficShapingProperties.ApplicationRateLimitingProperties,
timeShapedId: Id
): TrafficShapingInterceptor
= ApplicationRateLimitQueueInterceptor(rateLimitBackend, registry, applicationRateLimitingProperties, timeShapedId)

@Bean
@ConditionalOnMissingBean(PrioritizationStrategy::class)
@ConditionalOnProperty("queue.trafficShaping.priorityCapacity.enabled")
fun constantPrioritizationStrategy() = ConstantPrioritizationStrategy()

@Bean @ConditionalOnProperty("queue.trafficShaping.priorityCapacity.enabled")
fun priorityCapacityListener(priorityCapacityRepository: PriorityCapacityRepository,
prioritizationStrategy: PrioritizationStrategy)
= PriorityCapacityListener(priorityCapacityRepository, prioritizationStrategy)

@Bean @ConditionalOnProperty("queue.trafficShaping.priorityCapacity.enabled")
fun priorityCapacityQueueInterceptor(priorityCapacityRepository: PriorityCapacityRepository,
prioritizationStrategy: PrioritizationStrategy,
registry: Registry,
properties: TrafficShapingProperties.PriorityCapacityProperties,
timeShapedId: Id): TrafficShapingInterceptor
= PriorityCapacityQueueInterceptor(priorityCapacityRepository, prioritizationStrategy, registry, properties, timeShapedId)

@Bean @ConditionalOnMissingBean(TrafficShapingInterceptor::class)
fun noopTrafficShapingInterceptor() = NoopTrafficShapingInterceptor()
}
Loading

0 comments on commit 7aa934b

Please sign in to comment.