Skip to content

Commit

Permalink
feat(titus): Force cache refresh support for server groups
Browse files Browse the repository at this point in the history
This change should eliminate (or greatly reduce!) the chance for stale
reads that would otherwise occur after a server group has been modified.

In particular, it allows for an orchestrated rollback to work correctly.
Previously all `WaitForCapacityMatch` tasks completed immediately as
they were targeting the previous/stale state vs. the new desired state.
  • Loading branch information
ajordens authored and tomaslin committed Mar 26, 2018
1 parent bff2873 commit d350c33
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 11 deletions.
Expand Up @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.cats.agent.CachingAgent
import com.netflix.spinnaker.clouddriver.cache.OnDemandMetricsSupport
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository
import com.netflix.spinnaker.clouddriver.titus.TitusClientProvider
import com.netflix.spinnaker.clouddriver.titus.TitusCloudProvider
Expand All @@ -46,17 +47,28 @@ class TitusCachingProviderConfig {
@Bean
@DependsOn('netflixTitusCredentials')
TitusCachingProvider titusCachingProvider(AccountCredentialsRepository accountCredentialsRepository,
TitusCloudProvider titusCloudProvider,
TitusClientProvider titusClientProvider,
ObjectMapper objectMapper,
Provider<AwsLookupUtil> awsLookupUtilProvider
) {
Registry registry,
Provider<AwsLookupUtil> awsLookupUtilProvider) {
List<CachingAgent> agents = []
def allAccounts = accountCredentialsRepository.all.findAll {
it instanceof NetflixTitusCredentials
} as Collection<NetflixTitusCredentials>
allAccounts.each { NetflixTitusCredentials account ->
account.regions.each { region ->
agents << new TitusClusterCachingAgent(titusClientProvider, account, region.name, objectMapper, awsLookupUtilProvider, pollIntervalMillis, timeOutMilis)
agents << new TitusClusterCachingAgent(
titusCloudProvider,
titusClientProvider,
account,
region.name,
objectMapper,
registry,
awsLookupUtilProvider,
pollIntervalMillis,
timeOutMilis
)
}
}
new TitusCachingProvider(agents)
Expand Down
Expand Up @@ -18,19 +18,26 @@ package com.netflix.spinnaker.clouddriver.titus.caching.agents

import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.protobuf.util.JsonFormat
import com.netflix.frigga.Names
import com.netflix.frigga.autoscaling.AutoScalingGroupNameBuilder
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.cats.agent.AgentDataType
import com.netflix.spinnaker.cats.agent.CacheResult
import com.netflix.spinnaker.cats.agent.CachingAgent
import com.netflix.spinnaker.cats.agent.DefaultCacheResult
import com.netflix.spinnaker.cats.cache.CacheData
import com.netflix.spinnaker.cats.cache.DefaultCacheData
import com.netflix.spinnaker.cats.cache.RelationshipCacheFilter
import com.netflix.spinnaker.cats.provider.ProviderCache
import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent
import com.netflix.spinnaker.clouddriver.cache.OnDemandAgent
import com.netflix.spinnaker.clouddriver.cache.OnDemandMetricsSupport
import com.netflix.spinnaker.clouddriver.model.HealthState
import com.netflix.spinnaker.clouddriver.titus.TitusClientProvider
import com.netflix.spinnaker.clouddriver.titus.TitusCloudProvider
import com.netflix.spinnaker.clouddriver.titus.caching.Keys
import com.netflix.spinnaker.clouddriver.titus.caching.TitusCachingProvider
import com.netflix.spinnaker.clouddriver.titus.caching.utils.AwsLookupUtil
Expand All @@ -54,7 +61,7 @@ import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.INFORMATI
import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.HEALTH
import static com.netflix.spinnaker.clouddriver.titus.caching.Keys.Namespace.*

class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent {
class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent, OnDemandAgent {

private static final Logger log = LoggerFactory.getLogger(TitusClusterCachingAgent)

Expand All @@ -65,26 +72,36 @@ class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent {
AUTHORITATIVE.forType(INSTANCES.ns)
] as Set)

private final TitusCloudProvider titusCloudProvider
private final TitusClient titusClient
private final TitusAutoscalingClient titusAutoscalingClient
private final NetflixTitusCredentials account
private final String region
private final ObjectMapper objectMapper
private final OnDemandMetricsSupport metricsSupport
private final Provider<AwsLookupUtil> awsLookupUtil
private final long pollIntervalMillis
private final long timeoutMillis

TitusClusterCachingAgent(TitusClientProvider titusClientProvider,
TitusClusterCachingAgent(TitusCloudProvider titusCloudProvider,
TitusClientProvider titusClientProvider,
NetflixTitusCredentials account,
String region,
ObjectMapper objectMapper,
Registry registry,
Provider<AwsLookupUtil> awsLookupUtil,
pollIntervalMillis,
timeoutMillis
) {
timeoutMillis) {
this.account = account
this.region = region

this.titusCloudProvider = titusCloudProvider
this.objectMapper = objectMapper
this.metricsSupport = new OnDemandMetricsSupport(
registry,
this,
"${titusCloudProvider.id}:${OnDemandAgent.OnDemandType.ServerGroup}" as String
)
this.titusClient = titusClientProvider.getTitusClient(account, region)
this.titusAutoscalingClient = titusClientProvider.getTitusAutoscalingClient(account, region)
this.awsLookupUtil = awsLookupUtil
Expand All @@ -97,6 +114,88 @@ class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent {
TitusCachingProvider.PROVIDER_NAME
}

@Override
String getOnDemandAgentType() {
  // Identity of the on-demand variant of this agent: the regular agent type
  // plus an "-OnDemand" suffix.
  return getAgentType() + "-OnDemand"
}

@Override
OnDemandMetricsSupport getMetricsSupport() {
  // Accessor for the on-demand metrics helper built in the constructor.
  metricsSupport
}

@Override
boolean handles(OnDemandAgent.OnDemandType type, String cloudProvider) {
  // Only ServerGroup refresh requests for the Titus cloud provider are handled
  // by this agent.
  return (OnDemandAgent.OnDemandType.ServerGroup == type) &&
    (titusCloudProvider.id == cloudProvider)
}

@Override
OnDemandAgent.OnDemandResult handle(ProviderCache providerCache, Map<String, ?> data) {
  // Ignore requests that do not fully identify a server group.
  if (["serverGroupName", "account", "region"].any { !data.containsKey(it) }) {
    return null
  }

  // Only handle requests targeting this agent's account/region.
  if (account.name != data.account) {
    return null
  }

  if (region != data.region) {
    return null
  }

  // Look up the job by name; may be null if the server group no longer exists
  // (the evictions logic below explicitly handles that case).
  Job job = metricsSupport.readData {
    titusClient.findJobByName(data.serverGroupName as String)
  }

  // Fix: the original passed [job] unconditionally — a null job would be
  // handed to buildCacheResult as [null] — and derived the key from job.name,
  // which NPEs when the job is gone. Use the request data for the key instead.
  CacheResult result = metricsSupport.transformData {
    buildCacheResult(job ? [job] : [])
  }
  def cacheResultAsJson = objectMapper.writeValueAsString(result.cacheResults)
  def serverGroupKey = Keys.getServerGroupKey(data.serverGroupName as String, account.name, region)

  if (result.cacheResults.values().flatten().isEmpty()) {
    // Nothing was cached for this server group: drop any stale on-demand entry.
    providerCache.evictDeletedItems(ON_DEMAND.ns, [serverGroupKey])
  } else {
    // Store the freshly built cache result so loadData() can merge it in until
    // a full caching cycle has processed it.
    def cacheData = metricsSupport.onDemandStore {
      new DefaultCacheData(
        serverGroupKey,
        10 * 60, // ttl is 10 minutes
        [
          cacheTime   : new Date(),
          cacheResults: cacheResultAsJson
        ],
        [:]
      )
    }
    providerCache.putCacheData(ON_DEMAND.ns, cacheData)
  }

  // If the job is gone, evict the server group from the authoritative namespace.
  Map<String, Collection<String>> evictions = job ? [:] : [(SERVER_GROUPS.ns): [serverGroupKey]]

  log.info("onDemand cache refresh (data: ${data}, evictions: ${evictions}, cacheResult: ${cacheResultAsJson})")
  new OnDemandAgent.OnDemandResult(
    sourceAgentType: getOnDemandAgentType(),
    cacheResult: result,
    evictions: evictions
  )
}

@Override
Collection<Map> pendingOnDemandRequests(ProviderCache providerCache) {
  // Consistency fix: handle() writes under ON_DEMAND.ns, so read under the
  // same constant rather than the string literal 'onDemand' (they must denote
  // the same namespace for pending entries to ever be found).
  Set<String> keys = providerCache.getIdentifiers(ON_DEMAND.ns).findAll {
    // Only surface entries belonging to this agent's account/region.
    def key = Keys.parse(it)
    return key && key.type == SERVER_GROUPS.ns && key.account == account.name && key.region == region
  }

  return providerCache.getAll(ON_DEMAND.ns, keys, RelationshipCacheFilter.none()).collect {
    [
      id            : it.id,
      details       : Keys.parse(it.id),
      cacheTime     : it.attributes.cacheTime,
      processedCount: it.attributes.processedCount,
      processedTime : it.attributes.processedTime
    ]
  }
}

@Override
String getAgentType() {
"${account.name}/${region}/${TitusClusterCachingAgent.simpleName}"
Expand Down Expand Up @@ -140,11 +239,36 @@ class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent {

@Override
CacheResult loadData(ProviderCache providerCache) {
  // Record the cycle start so on-demand entries created before this run (and
  // already processed at least once) can be evicted below.
  Long start = System.currentTimeMillis()
  List<Job> jobs = titusClient.getAllJobs()
  // Fix: removed the stale early `return buildCacheResult(jobs)` left over
  // from the previous implementation — it made all of the on-demand merge
  // logic below unreachable.

  List<CacheData> evictFromOnDemand = []
  List<CacheData> keepInOnDemand = []

  def serverGroupKeys = jobs.collect { job -> Keys.getServerGroupKey(job.name, account.name, region) }

  providerCache.getAll(ON_DEMAND.ns, serverGroupKeys).each { CacheData onDemandEntry ->
    // Evict entries that pre-date this run and have been processed already;
    // keep newer/unprocessed entries so their fresher data wins this cycle.
    // NOTE(review): handle() stores cacheTime as a Date while this compares it
    // against a Long — verify it deserializes to a numeric timestamp.
    if (onDemandEntry.attributes.cacheTime < start && onDemandEntry.attributes.processedCount > 0) {
      evictFromOnDemand << onDemandEntry
    } else {
      keepInOnDemand << onDemandEntry
    }
  }

  def onDemandMap = keepInOnDemand.collectEntries { CacheData onDemandEntry -> [(onDemandEntry.id): onDemandEntry] }
  CacheResult result = buildCacheResult(jobs, onDemandMap, evictFromOnDemand*.id)

  // Mark retained on-demand entries as processed so they become eligible for
  // eviction on the next caching cycle.
  result.cacheResults[ON_DEMAND.ns].each { CacheData onDemandEntry ->
    onDemandEntry.attributes.processedTime = System.currentTimeMillis()
    onDemandEntry.attributes.processedCount = (onDemandEntry.attributes.processedCount ?: 0) + 1
  }

  return result
}

private CacheResult buildCacheResult(List<Job> jobs) {
private CacheResult buildCacheResult(List<Job> jobs,
Map<String, CacheData> onDemandKeep = [:],
List<String> onDemandEvict = []) {
Map<String, CacheData> applications = createCache()
Map<String, CacheData> clusters = createCache()
Map<String, CacheData> serverGroups = createCache()
Expand Down Expand Up @@ -174,9 +298,10 @@ class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent {
[(APPLICATIONS.ns) : applications.values(),
(CLUSTERS.ns) : clusters.values(),
(SERVER_GROUPS.ns): serverGroups.values(),
(INSTANCES.ns) : instances.values()
(INSTANCES.ns) : instances.values(),
(ON_DEMAND.ns) : onDemandKeep.values()
],
[:]
[(ON_DEMAND.ns): onDemandEvict]
)
}

Expand Down

0 comments on commit d350c33

Please sign in to comment.