Skip to content

Commit

Permalink
feat(cats/redis): account name based sharding logic (#5382)
Browse files Browse the repository at this point in the history
* feat(cats/redis): account name based sharding logic

* changed all the sharding related properties to a new root cache-sharding

Co-authored-by: kiran-godishala <kirangodishala@opsmx.io>
  • Loading branch information
kirangodishala and kiran-godishala committed Jun 18, 2021
1 parent b431f10 commit 205a83a
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2021 OpsMx.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.cats.redis.cluster;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.spinnaker.cats.agent.Agent;
import com.netflix.spinnaker.cats.cluster.NodeIdentity;
import com.netflix.spinnaker.cats.cluster.ShardingFilter;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachingPodsObserver implements ShardingFilter, Runnable {

private static final Logger logger = LoggerFactory.getLogger(CachingPodsObserver.class);
private static final String REPLICA_SSET_KEY = "clouddriver:caching:replicas";
private static final String CORE_PROVIDER =
"com.netflix.spinnaker.clouddriver.core.provider.CoreProvider";
private final RedisClientDelegate redisClientDelegate;
private final NodeIdentity nodeIdentity;
private final long replicaKeyTtl;
private int podCount = 0;
private int podIndex = -1;
// this script adds or updates a unique id as a member of a sorted set with score equal to current
// time plus sharding.replica-key-ttl-seconds, deletes the members having scores less than current
// time(ms) and finally fetches list of all members of the sorted set which represent the live
// caching pods
private static final String HEARTBEAT_REFRESH_SCRIPT =
"redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])"
+ " redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[3])"
+ " return redis.call('zrange', KEYS[1], '0', '-1')";

public CachingPodsObserver(
RedisClientDelegate redisClientDelegate,
NodeIdentity nodeIdentity,
DynamicConfigService dynamicConfigService) {
this.redisClientDelegate = redisClientDelegate;
this.nodeIdentity = nodeIdentity;
long observerIntervalSeconds =
dynamicConfigService.getConfig(
Integer.class, "cache-sharding.heartbeat-interval-seconds", 30);
replicaKeyTtl =
dynamicConfigService.getConfig(Integer.class, "cache-sharding.replica-ttl-seconds", 60);
ScheduledExecutorService podsObserverExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat(CachingPodsObserver.class.getSimpleName() + "-%d")
.build());
podsObserverExecutorService.scheduleAtFixedRate(
this, 0, observerIntervalSeconds, TimeUnit.SECONDS);
refreshHeartbeat();
logger.info("Account based sharding is enabled for all caching pods.");
}

@Override
public void run() {
try {
refreshHeartbeat();
} catch (Throwable t) {
logger.error("Failed to manage replicas heartbeat", t);
}
}

private void refreshHeartbeat() {
String now = String.valueOf(System.currentTimeMillis());
String expiry =
String.valueOf(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(replicaKeyTtl));
Object evalResponse =
redisClientDelegate.withScriptingClient(
client -> {
return client.eval(
HEARTBEAT_REFRESH_SCRIPT,
Collections.singletonList(REPLICA_SSET_KEY),
Arrays.asList(expiry, nodeIdentity.getNodeIdentity(), now));
});
if (evalResponse instanceof List) {
List<String> replicaList = (List) evalResponse;
podCount = replicaList.size();
podIndex =
replicaList.stream()
.sorted()
.collect(Collectors.toList())
.indexOf(nodeIdentity.getNodeIdentity());
logger.debug("caching pods = {} and this pod's index = {}", podCount, podIndex);
} else {
logger.error("Something is wrong, please check if the eval script and params are valid");
}

if (podCount == 0 || podIndex == -1) {
logger.error(
"No caching pod heartbeat records detected. Sharding logic can't be applied!!!!");
}
}

@Override
public boolean filter(Agent agent) {
if (agent.getProviderName().equals(CORE_PROVIDER)) {
return true;
}
return podCount == 1
|| Math.abs(getAccountName(agent.getAgentType()).hashCode() % podCount) == podIndex;
}

private String getAccountName(String agentType) {
if (agentType.contains("/")) {
return agentType.substring(0, agentType.indexOf('/'));
}
return agentType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.spinnaker.cats.cluster.AgentIntervalProvider;
import com.netflix.spinnaker.cats.cluster.NodeIdentity;
import com.netflix.spinnaker.cats.cluster.NodeStatusProvider;
import com.netflix.spinnaker.cats.cluster.ShardingFilter;
import com.netflix.spinnaker.cats.module.CatsModuleAware;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
Expand Down Expand Up @@ -62,6 +63,7 @@ private static enum Status {
private final Map<String, NextAttempt> activeAgents = new ConcurrentHashMap<>();
private final NodeStatusProvider nodeStatusProvider;
private final DynamicConfigService dynamicConfigService;
private final ShardingFilter shardingFilter;

public ClusteredAgentScheduler(
RedisClientDelegate redisClientDelegate,
Expand All @@ -70,7 +72,8 @@ public ClusteredAgentScheduler(
NodeStatusProvider nodeStatusProvider,
String enabledAgentPattern,
Integer agentLockAcquisitionIntervalSeconds,
DynamicConfigService dynamicConfigService) {
DynamicConfigService dynamicConfigService,
ShardingFilter shardingFilter) {
this(
redisClientDelegate,
nodeIdentity,
Expand All @@ -86,7 +89,8 @@ public ClusteredAgentScheduler(
.build()),
enabledAgentPattern,
agentLockAcquisitionIntervalSeconds,
dynamicConfigService);
dynamicConfigService,
shardingFilter);
}

public ClusteredAgentScheduler(
Expand All @@ -98,14 +102,16 @@ public ClusteredAgentScheduler(
ExecutorService agentExecutionPool,
String enabledAgentPattern,
Integer agentLockAcquisitionIntervalSeconds,
DynamicConfigService dynamicConfigService) {
DynamicConfigService dynamicConfigService,
ShardingFilter shardingFilter) {
this.redisClientDelegate = redisClientDelegate;
this.nodeIdentity = nodeIdentity;
this.intervalProvider = intervalProvider;
this.nodeStatusProvider = nodeStatusProvider;
this.agentExecutionPool = agentExecutionPool;
this.enabledAgentPattern = Pattern.compile(enabledAgentPattern);
this.dynamicConfigService = dynamicConfigService;
this.shardingFilter = shardingFilter;
Integer lockInterval =
agentLockAcquisitionIntervalSeconds == null ? 1 : agentLockAcquisitionIntervalSeconds;

Expand All @@ -131,7 +137,7 @@ private Map<String, NextAttempt> acquire() {
new ArrayList<>(agents.entrySet());
Collections.shuffle(agentsEntrySet);
for (Map.Entry<String, AgentExecutionAction> agent : agentsEntrySet) {
if (!skip.contains(agent.getKey())) {
if (shardingFilter.filter(agent.getValue().getAgent()) && !skip.contains(agent.getKey())) {
final String agentType = agent.getKey();
AgentIntervalProvider.Interval interval =
intervalProvider.getInterval(agent.getValue().getAgent());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config;

import com.netflix.spinnaker.cats.cluster.DefaultNodeIdentity;
import com.netflix.spinnaker.cats.cluster.ShardingFilter;
import com.netflix.spinnaker.cats.redis.cluster.CachingPodsObserver;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(value = "caching.write-enabled", matchIfMissing = true)
public class RedisShardingFilterConfiguration {

@Bean
@ConditionalOnExpression(
"${redis.enabled:true} && ${redis.scheduler.enabled:true} && ${cache-sharding.enabled:false}")
ShardingFilter shardingFilter(
RedisClientDelegate redisClientDelegate, DynamicConfigService dynamicConfigService) {
return new CachingPodsObserver(
redisClientDelegate, new DefaultNodeIdentity(), dynamicConfigService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.netflix.spinnaker.cats.agent.ExecutionInstrumentation
import com.netflix.spinnaker.cats.cluster.DefaultAgentIntervalProvider
import com.netflix.spinnaker.cats.cluster.DefaultNodeIdentity
import com.netflix.spinnaker.cats.cluster.DefaultNodeStatusProvider
import com.netflix.spinnaker.cats.cluster.NoopShardingFilter
import com.netflix.spinnaker.cats.test.ManualRunnableScheduler
import com.netflix.spinnaker.cats.test.TestAgent
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
Expand Down Expand Up @@ -65,7 +66,8 @@ class ClusteredAgentSchedulerSpec extends Specification {
agentExecutionScheduler,
".*",
null,
dcs
dcs,
new NoopShardingFilter()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,4 @@ class SqlCacheConfiguration {
fun nodeStatusProvider(discoveryStatusListener: DiscoveryStatusListener): NodeStatusProvider {
return DiscoveryStatusNodeStatusProvider(discoveryStatusListener)
}

@Bean
@ConditionalOnMissingBean(ShardingFilter::class)
fun shardingFilter() : ShardingFilter {
return NoopShardingFilter()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.netflix.spinnaker.cats.cluster.AgentIntervalProvider;
import com.netflix.spinnaker.cats.cluster.DefaultNodeIdentity;
import com.netflix.spinnaker.cats.cluster.NodeStatusProvider;
import com.netflix.spinnaker.cats.cluster.ShardingFilter;
import com.netflix.spinnaker.cats.redis.cluster.ClusteredAgentScheduler;
import com.netflix.spinnaker.cats.redis.cluster.ClusteredSortAgentScheduler;
import com.netflix.spinnaker.clouddriver.core.RedisConfigurationProperties;
Expand All @@ -43,7 +44,8 @@ AgentScheduler redisAgentScheduler(
JedisPool jedisPool,
AgentIntervalProvider agentIntervalProvider,
NodeStatusProvider nodeStatusProvider,
DynamicConfigService dynamicConfigService) {
DynamicConfigService dynamicConfigService,
ShardingFilter shardingFilter) {
if (redisConfigurationProperties.getScheduler().equalsIgnoreCase("default")) {
URI redisUri = URI.create(redisConfigurationProperties.getConnection());
String redisHost = redisUri.getHost();
Expand All @@ -58,7 +60,8 @@ AgentScheduler redisAgentScheduler(
nodeStatusProvider,
redisConfigurationProperties.getAgent().getEnabledPattern(),
redisConfigurationProperties.getAgent().getAgentLockAcquisitionIntervalSeconds(),
dynamicConfigService);
dynamicConfigService,
shardingFilter);
} else if (redisConfigurationProperties.getScheduler().equalsIgnoreCase("sort")) {
return new ClusteredSortAgentScheduler(
jedisPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021 OpsMx.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.cache;

import com.netflix.spinnaker.cats.cluster.NoopShardingFilter;
import com.netflix.spinnaker.cats.cluster.ShardingFilter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(value = "caching.write-enabled", matchIfMissing = true)
public class NoopShardingFilterConfig {

@Bean
@ConditionalOnMissingBean(ShardingFilter.class)
ShardingFilter shardingFilter() {
return new NoopShardingFilter();
}
}

0 comments on commit 205a83a

Please sign in to comment.