Skip to content

Commit

Permalink
chore(*): Wiring up kork redis/dynomite client factories (#2009)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert authored and asher committed Mar 17, 2018
1 parent 5edf551 commit 9034cae
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package com.netflix.spinnaker.orca.clouddriver.tasks.providers.aws

import com.netflix.spinnaker.kork.jedis.JedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.WaitForRequiredInstancesDownTask
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.Jedis
import redis.clients.util.Pool

@Configuration
class NetflixAWSConfiguration {
Expand All @@ -32,4 +36,9 @@ class NetflixAWSConfiguration {
Class<? extends Task> waitForAllInstancesDownOnDisableTaskType() {
return useWaitForAllNetflixAWSInstancesDownTask ? WaitForAllNetflixAWSInstancesDownTask : WaitForRequiredInstancesDownTask
}

@Bean
RedisClientDelegate redisClientDelegate(Pool<Jedis> jedisPool) {
return new JedisClientDelegate("primaryDefault", jedisPool)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.netflix.spinnaker.orca.config;

import java.lang.reflect.Field;
import java.net.URI;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.dynomite.DynomiteClientConfiguration;
import com.netflix.spinnaker.kork.jedis.JedisClientConfiguration;
import com.netflix.spinnaker.kork.jedis.JedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import org.apache.commons.pool2.impl.GenericObjectPool;
Expand All @@ -34,17 +34,29 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Protocol;
import redis.clients.util.Pool;

import java.lang.reflect.Field;
import java.net.URI;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static redis.clients.jedis.Protocol.DEFAULT_DATABASE;

@Configuration
@Import({JedisClientConfiguration.class, DynomiteClientConfiguration.class})
public class RedisConfiguration {

public static class Clients {
public static final String EXECUTION_REPOSITORY = "executionRepository";
public static final String TASK_QUEUE = "taskQueue";
}

@Deprecated // rz - Kept for backwards compat with old connection configs
@Bean
@ConfigurationProperties("redis")
public GenericObjectPoolConfig redisPoolConfig() {
Expand All @@ -55,6 +67,7 @@ public GenericObjectPoolConfig redisPoolConfig() {
return config;
}

@Deprecated // rz - Kept for backwards compat with old connection configs
@Bean(name = "jedisPool")
@Primary
public Pool<Jedis> jedisPool(
Expand All @@ -66,6 +79,7 @@ public Pool<Jedis> jedisPool(
return createPool(redisPoolConfig, connection, timeout, registry, "jedisPool");
}

@Deprecated // rz - Kept for backwards compat with old connection configs
@Bean(name = "jedisPoolPrevious")
@ConditionalOnProperty("redis.connectionPrevious")
@ConditionalOnExpression("${redis.connection} != ${redis.connectionPrevious}")
Expand All @@ -77,22 +91,25 @@ JedisPool jedisPoolPrevious(
return createPool(null, previousConnection, timeout, registry, "jedisPoolPrevious");
}

@Deprecated // rz - Kept for backwards compat with old connection configs
@Bean(name = "redisClientDelegate")
@Primary
RedisClientDelegate redisClientDelegate(
@Qualifier("jedisPool") Pool<Jedis> jedisPool
) {
return new JedisClientDelegate(jedisPool);
return new JedisClientDelegate("primaryDefault", jedisPool);
}

@Deprecated // rz - Kept for backwards compat with old connection configs
@Bean(name = "previousRedisClientDelegate")
@ConditionalOnBean(name = "jedisPoolPrevious")
RedisClientDelegate previousRedisClientDelegate(
@Qualifier("jedisPoolPrevious") JedisPool jedisPoolPrevious
) {
return new JedisClientDelegate(jedisPoolPrevious);
return new JedisClientDelegate("previousDefault", jedisPoolPrevious);
}

@Deprecated // rz - Kept for backwards compat with old connection configs
@Bean
HealthIndicator redisHealth(
@Qualifier("jedisPool") Pool<Jedis> jedisPool
Expand Down Expand Up @@ -130,6 +147,7 @@ HealthIndicator redisHealth(
}
}

@Deprecated // rz - Kept for backwards compat with old connection configs
public static JedisPool createPool(
GenericObjectPoolConfig redisPoolConfig,
String connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.netflix.spinnaker.orca.pipeline.persistence.dynomite;

import com.netflix.dyno.jedis.DynoJedisPipeline;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientSelector;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
Expand Down Expand Up @@ -63,27 +63,21 @@ public class DynomiteExecutionRepository extends AbstractRedisExecutionRepositor
private final Logger log = LoggerFactory.getLogger(getClass());

public DynomiteExecutionRepository(
Registry registry,
@Qualifier("redisClientDelegate") RedisClientDelegate redisClientDelegate,
@Qualifier("previousRedisClientDelegate") Optional<RedisClientDelegate> previousRedisClientDelegate,
RedisClientSelector redisClientSelector,
@Qualifier("queryAllScheduler") Scheduler queryAllScheduler,
@Qualifier("queryByAppScheduler") Scheduler queryByAppScheduler,
@Value("${chunkSize.executionRepository:75}") Integer threadPoolChunkSize
) {
super(registry, redisClientDelegate, previousRedisClientDelegate, queryAllScheduler, queryByAppScheduler, threadPoolChunkSize);
super(redisClientSelector, queryAllScheduler, queryByAppScheduler, threadPoolChunkSize);
}

public DynomiteExecutionRepository(
Registry registry,
RedisClientDelegate redisClientDelegate,
Optional<RedisClientDelegate> previousRedisClientDelegate,
RedisClientSelector redisClientSelector,
Integer threadPoolSize,
Integer threadPoolChunkSize
) {
super(
registry,
redisClientDelegate,
previousRedisClientDelegate,
redisClientSelector,
Schedulers.from(Executors.newFixedThreadPool(10)),
Schedulers.from(Executors.newFixedThreadPool(threadPoolSize)),
threadPoolChunkSize
Expand Down Expand Up @@ -267,5 +261,4 @@ protected String pipelineKey(String id) {
protected String orchestrationKey(String id) {
return format("{%s:%s}", ORCHESTRATION, id);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,11 @@

package com.netflix.spinnaker.orca.pipeline.persistence.jedis;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientSelector;
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper;
import com.netflix.spinnaker.orca.pipeline.model.*;
Expand All @@ -46,15 +38,22 @@
import rx.functions.Func1;
import rx.functions.Func2;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.netflix.spinnaker.orca.config.RedisConfiguration.Clients.EXECUTION_REPOSITORY;
import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.ORCHESTRATION;
import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE;
import static com.netflix.spinnaker.orca.pipeline.model.Execution.NO_TRIGGER;
import static com.netflix.spinnaker.orca.pipeline.model.SyntheticStageOwner.*;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.*;
import static net.logstash.logback.argument.StructuredArguments.value;

public abstract class AbstractRedisExecutionRepository implements ExecutionRepository {
Expand All @@ -71,23 +70,19 @@ public abstract class AbstractRedisExecutionRepository implements ExecutionRepos
private final int chunkSize;
private final Scheduler queryAllScheduler;
private final Scheduler queryByAppScheduler;
private final Registry registry;
private final Logger log = LoggerFactory.getLogger(getClass());

public AbstractRedisExecutionRepository(
Registry registry,
RedisClientDelegate redisClientDelegate,
Optional<RedisClientDelegate> previousRedisClientDelegate,
RedisClientSelector redisClientSelector,
Scheduler queryAllScheduler,
Scheduler queryByAppScheduler,
Integer threadPoolChunkSize
) {
this.redisClientDelegate = redisClientDelegate;
this.previousRedisClientDelegate = previousRedisClientDelegate;
this.redisClientDelegate = redisClientSelector.primary(EXECUTION_REPOSITORY);
this.previousRedisClientDelegate = redisClientSelector.previous(EXECUTION_REPOSITORY);
this.queryAllScheduler = queryAllScheduler;
this.queryByAppScheduler = queryByAppScheduler;
this.chunkSize = threadPoolChunkSize;
this.registry = registry;
}

abstract protected Execution retrieveInternal(RedisClientDelegate redisClientDelegate, ExecutionType type, String id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,8 @@

package com.netflix.spinnaker.orca.pipeline.persistence.jedis;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientSelector;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
Expand All @@ -39,6 +34,11 @@
import rx.Scheduler;
import rx.schedulers.Schedulers;

import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static com.google.common.collect.Maps.filterValues;
import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.ORCHESTRATION;
import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE;
Expand All @@ -55,27 +55,21 @@ public class JedisExecutionRepository extends AbstractRedisExecutionRepository {

@Autowired
public JedisExecutionRepository(
Registry registry,
@Qualifier("redisClientDelegate") RedisClientDelegate redisClientDelegate,
@Qualifier("previousRedisClientDelegate") Optional<RedisClientDelegate> previousRedisClientDelegate,
RedisClientSelector redisClientSelector,
@Qualifier("queryAllScheduler") Scheduler queryAllScheduler,
@Qualifier("queryByAppScheduler") Scheduler queryByAppScheduler,
@Value("${chunkSize.executionRepository:75}") Integer threadPoolChunkSize
) {
super(registry, redisClientDelegate, previousRedisClientDelegate, queryAllScheduler, queryByAppScheduler, threadPoolChunkSize);
super(redisClientSelector, queryAllScheduler, queryByAppScheduler, threadPoolChunkSize);
}

public JedisExecutionRepository(
Registry registry,
RedisClientDelegate redisClientDelegate,
Optional<RedisClientDelegate> previousRedisClientDelegate,
RedisClientSelector redisClientSelector,
Integer threadPoolSize,
Integer threadPoolChunkSize
) {
super(
registry,
redisClientDelegate,
previousRedisClientDelegate,
redisClientSelector,
Schedulers.from(Executors.newFixedThreadPool(10)),
Schedulers.from(Executors.newFixedThreadPool(threadPoolSize)),
threadPoolChunkSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package com.netflix.spinnaker.orca.pipeline.persistence

import com.netflix.dyno.jedis.DynoJedisClient
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.kork.dynomite.DynomiteClientDelegate
import com.netflix.spinnaker.kork.dynomite.LocalRedisDynomiteClient
import com.netflix.spinnaker.kork.jedis.EmbeddedRedis
import com.netflix.spinnaker.kork.jedis.JedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger
import com.netflix.spinnaker.orca.pipeline.model.Execution
Expand Down Expand Up @@ -516,20 +516,21 @@ class JedisExecutionRepositorySpec extends ExecutionRepositoryTck<JedisExecution
Pool<Jedis> jedisPool = embeddedRedis.pool
Pool<Jedis> jedisPoolPrevious = embeddedRedisPrevious.pool

RedisClientDelegate redisClientDelegate = new JedisClientDelegate(jedisPool)
Optional<RedisClientDelegate> previousRedisClientDelegate = Optional.of(new JedisClientDelegate(jedisPoolPrevious))
RedisClientDelegate redisClientDelegate = new JedisClientDelegate("primaryDefault", jedisPool)
RedisClientDelegate previousRedisClientDelegate = new JedisClientDelegate("previousDefault", jedisPoolPrevious)
RedisClientSelector redisClientSelector = new RedisClientSelector([redisClientDelegate, previousRedisClientDelegate])

@AutoCleanup
def jedis = jedisPool.resource

@Override
JedisExecutionRepository createExecutionRepository() {
return new JedisExecutionRepository(new NoopRegistry(), redisClientDelegate, previousRedisClientDelegate, 1, 50)
return new JedisExecutionRepository(redisClientSelector, 1, 50)
}

@Override
JedisExecutionRepository createExecutionRepositoryPrevious() {
return new JedisExecutionRepository(new NoopRegistry(), previousRedisClientDelegate.get(), Optional.empty(), 1, 50)
return new JedisExecutionRepository(new RedisClientSelector([new JedisClientDelegate("primaryDefault", jedisPoolPrevious)]), 1, 50)
}


Expand Down Expand Up @@ -656,7 +657,7 @@ class JedisExecutionRepositorySpec extends ExecutionRepositoryTck<JedisExecution
repository.updateStageContext(stage)

then:
previousRepository.retrieve(ORCHESTRATION, orchestration.id).stages.first().getContext() == [why: 'hello']
repository.retrieve(ORCHESTRATION, orchestration.id).stages.first().getContext() == [why: 'hello']

}

Expand Down Expand Up @@ -1154,16 +1155,16 @@ class DynomiteExecutionRepositorySpec extends JedisExecutionRepositorySpec {
DynoJedisClient embeddedDyno = new LocalRedisDynomiteClient(embeddedRedis.port).getClient()
DynoJedisClient embeddedDynoPrevious = new LocalRedisDynomiteClient(embeddedRedisPrevious.port).getClient()

RedisClientDelegate dynoClientDelegate = new DynomiteClientDelegate(embeddedDyno)
Optional<RedisClientDelegate> previousDynoClientDelegate = Optional.of(new DynomiteClientDelegate(embeddedDynoPrevious))
RedisClientDelegate dynoClientDelegate = new DynomiteClientDelegate("primaryDefault", embeddedDyno)
RedisClientDelegate previousDynoClientDelegate = new DynomiteClientDelegate("previousDefault", embeddedDynoPrevious)

return new DynomiteExecutionRepository(new NoopRegistry(), dynoClientDelegate, previousDynoClientDelegate, 1, 50)
return new DynomiteExecutionRepository(new RedisClientSelector([dynoClientDelegate, previousDynoClientDelegate]), 1, 50)
}

DynomiteExecutionRepository createDynomiteExecutionRepositoryPrevious() {
DynoJedisClient embeddedDynoPrevious = new LocalRedisDynomiteClient(embeddedRedisPrevious.port).getClient()
RedisClientDelegate dynoClientDelegate = new DynomiteClientDelegate(embeddedDynoPrevious)
return new DynomiteExecutionRepository(new NoopRegistry(), dynoClientDelegate, Optional.empty(), 1, 50)
RedisClientDelegate dynoClientDelegate = new DynomiteClientDelegate("primaryDefault", embeddedDynoPrevious)
return new DynomiteExecutionRepository(new RedisClientSelector([dynoClientDelegate]), 1, 50)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.netflix.spinnaker.assertj.assertSoftly
import com.netflix.spinnaker.config.OrcaQueueConfiguration
import com.netflix.spinnaker.config.QueueConfiguration
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.config.OrcaConfiguration
Expand Down Expand Up @@ -848,5 +850,8 @@ class TestConfig {
deadMessageHandler = deadMessageHandler,
publisher = publisher
)

@Bean fun redisClientSelector(redisClientDelegates: List<RedisClientDelegate>) =
RedisClientSelector(redisClientDelegates)
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public class EmbeddedRedisConfiguration {
}

@Bean public RedisClientDelegate redisClientDelegate(Pool<Jedis> jedisPool) {
return new JedisClientDelegate(jedisPool);
return new JedisClientDelegate("primaryDefault", jedisPool);
}
}

0 comments on commit 9034cae

Please sign in to comment.