Skip to content

Commit

Permalink
feat(sql): sql NotificationClusterLock implementation (#3141)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Sep 11, 2019
1 parent 154daba commit c830a1d
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.*;
Expand Down Expand Up @@ -67,13 +68,15 @@ public ExecutionRepository redisExecutionRepository(
value = "redis.cluster-enabled",
havingValue = "false",
matchIfMissing = true)
@ConditionalOnMissingBean(NotificationClusterLock.class)
public NotificationClusterLock redisNotificationClusterLock(
RedisClientSelector redisClientSelector) {
return new RedisNotificationClusterLock(redisClientSelector);
}

@Bean
@ConditionalOnProperty(value = "redis.cluster-enabled")
@ConditionalOnMissingBean(NotificationClusterLock.class)
public NotificationClusterLock redisClusterNotificationClusterLock(JedisCluster cluster) {
return new RedisClusterNotificationClusterLock(cluster);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.netflix.spinnaker.config
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock
import com.netflix.spinnaker.orca.notifications.SqlNotificationClusterLock
import com.netflix.spinnaker.orca.sql.JooqSqlCommentAppender
import com.netflix.spinnaker.orca.sql.JooqToSpringExceptionTransformer
import com.netflix.spinnaker.orca.sql.QueryLogger
Expand All @@ -34,19 +36,22 @@ import org.jooq.impl.DataSourceConnectionProvider
import org.jooq.impl.DefaultConfiguration
import org.jooq.impl.DefaultDSLContext
import org.jooq.impl.DefaultExecuteListenerProvider
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary
import org.springframework.jdbc.datasource.DataSourceTransactionManager
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy
import sun.net.InetAddressCachePolicy
import java.lang.reflect.Field
import java.security.Security
import java.sql.Connection
import java.time.Clock
import javax.sql.DataSource

@Configuration
Expand Down Expand Up @@ -126,6 +131,20 @@ class SqlConfiguration {
sqlProperties: SqlProperties
) =
SqlHealthIndicator(sqlHealthcheckActivator, sqlProperties.connectionPool.dialect)

@ConditionalOnProperty("execution-repository.sql.enabled")
@ConditionalOnMissingBean(NotificationClusterLock::class)
@Primary
@Bean
fun sqlNotificationClusterLock(
jooq: DSLContext,
clock: Clock,
properties: SqlProperties
) = SqlNotificationClusterLock(
jooq = jooq,
clock = clock,
retryProperties = properties.transactionRetry
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.notifications

import com.netflix.spinnaker.config.TransactionRetryProperties
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import io.vavr.control.Try
import org.jooq.DSLContext
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Duration

class SqlNotificationClusterLock(
private val jooq: DSLContext,
private val clock: Clock,
private val retryProperties: TransactionRetryProperties
) : NotificationClusterLock {

companion object {
private val lockTable = DSL.table("notification_lock")
private val lockField = DSL.field("lock_name")
private val expiryField = DSL.field("expiry")

private val log = LoggerFactory.getLogger(NotificationClusterLock::class.java)
}

init {
log.info("Configured $javaClass for NotificationClusterLock")
}

override fun tryAcquireLock(notificationType: String, lockTimeoutSeconds: Long): Boolean {
val now = clock.instant()

var changed = withRetry {
jooq.insertInto(lockTable)
.set(lockField, notificationType)
.set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli())
.onDuplicateKeyIgnore()
.execute()
}

if (changed == 0) {
changed = withRetry {
jooq.update(lockTable)
.set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli())
.where(
lockField.eq(notificationType),
expiryField.lt(now.toEpochMilli())
)
.execute()
}
}

return changed == 1
}

private fun <T> withRetry(action: () -> T): T {
val retry = Retry.of(
"sqlWrite",
RetryConfig.custom<T>()
.maxAttempts(retryProperties.maxRetries)
.waitDuration(Duration.ofMillis(retryProperties.backoffMs))
.ignoreExceptions(SQLDialectNotSupportedException::class.java)
.build()
)

return Try.ofSupplier(Retry.decorateSupplier(retry, action)).get()
}
}
3 changes: 3 additions & 0 deletions orca-sql/src/main/resources/db/changelog-master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ databaseChangeLog:
- include:
file: changelog/20190711-create-pending-table.yml
relativeToChangelogFile: true
- include:
file: changelog/20190911-notification-cluster-lock.yml
relativeToChangelogFile: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
databaseChangeLog:
- changeSet:
id: create-notification-lock-table
author: afeldman
changes:
- createTable:
tableName: notification_lock
columns:
- column:
name: lock_name
type: varchar(256)
constraints:
primaryKey: true
nullable: false
- column:
name: expiry
type: bigint(13)
constraints:
nullable: false

0 comments on commit c830a1d

Please sign in to comment.