Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(cats/sql): made sharding configuration consistent #5512

Merged
merged 2 commits into from Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,6 +24,7 @@ import com.netflix.spinnaker.cats.cluster.ShardingFilter
import com.netflix.spinnaker.cats.sql.SqlUtil
import com.netflix.spinnaker.clouddriver.core.provider.CoreProvider
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.sql.routing.withPool
import org.jooq.DSLContext
import org.jooq.impl.DSL
Expand All @@ -40,14 +41,16 @@ class SqlCachingPodsObserver (
private val jooq: DSLContext,
private val nodeIdentity: NodeIdentity,
private val tableNamespace: String? = null,
private val liveReplicasRecheckIntervalSeconds : Long? = null,
private val dynamicConfigService : DynamicConfigService,
private val liveReplicasScheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder().setNameFormat(SqlCachingPodsObserver::class.java.simpleName + "-%d").build()
)
) : ShardingFilter, Runnable{
private val log = LoggerFactory.getLogger(javaClass)
private var podCount: Int = 0
private var podIndex: Int = -1
private var ttlSeconds = dynamicConfigService.getConfig(Long::class.java, "cache-sharding.replica-ttl-seconds", 60)

companion object {
private val POOL_NAME = ConnectionPools.CACHE_WRITER.value
const val LAST_HEARTBEAT_TIME = "last_heartbeat_time"
Expand All @@ -66,8 +69,9 @@ class SqlCachingPodsObserver (
SqlUtil.createTableLike(jooq, replicasTable, replicasReferenceTable)
}
}
refreshHeartbeat(TimeUnit.SECONDS.toMillis(60))
val recheckInterval = liveReplicasRecheckIntervalSeconds ?: 30L
refreshHeartbeat(TimeUnit.SECONDS.toMillis(ttlSeconds))
val recheckInterval =
dynamicConfigService.getConfig(Long::class.java, "cache-sharding.heartbeat-interval-seconds", 30)
liveReplicasScheduler.scheduleAtFixedRate(this, 0, recheckInterval, TimeUnit.SECONDS)
log.info("Account based sharding across caching pods is enabled.")
}
Expand Down
Expand Up @@ -18,11 +18,10 @@
package com.netflix.spinnaker.config

import com.netflix.spinnaker.cats.cluster.DefaultNodeIdentity
import com.netflix.spinnaker.cats.cluster.NodeIdentity
import com.netflix.spinnaker.cats.sql.cluster.SqlCachingPodsObserver
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import org.jooq.DSLContext
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -36,18 +35,19 @@ class SqlShardingFilterConfiguration {
value = [
"sql.enabled",
"sql.scheduler.enabled",
"caching.sharding-enabled"
"cache-sharding.enabled"
]
)
fun shardingFilter(
jooq: DSLContext,
@Value("\${sql.table-namespace:#{null}}") tableNamespace: String?,
sqlAgentProperties: SqlAgentProperties): SqlCachingPodsObserver {
dynamicConfigService: DynamicConfigService
): SqlCachingPodsObserver {
return SqlCachingPodsObserver(
jooq = jooq,
nodeIdentity = DefaultNodeIdentity(),
tableNamespace = tableNamespace,
liveReplicasRecheckIntervalSeconds = sqlAgentProperties.poll.intervalSeconds
dynamicConfigService = dynamicConfigService
)
}

Expand Down