diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java index 7835dc850e..cff2db0f0c 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java @@ -27,6 +27,7 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.*; @@ -67,6 +68,7 @@ public ExecutionRepository redisExecutionRepository( value = "redis.cluster-enabled", havingValue = "false", matchIfMissing = true) + @ConditionalOnMissingBean(NotificationClusterLock.class) public NotificationClusterLock redisNotificationClusterLock( RedisClientSelector redisClientSelector) { return new RedisNotificationClusterLock(redisClientSelector); @@ -74,6 +76,7 @@ public NotificationClusterLock redisNotificationClusterLock( @Bean @ConditionalOnProperty(value = "redis.cluster-enabled") + @ConditionalOnMissingBean(NotificationClusterLock.class) public NotificationClusterLock redisClusterNotificationClusterLock(JedisCluster cluster) { return new RedisClusterNotificationClusterLock(cluster); } diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt index 2288cd4fac..2a073d9a13 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt @@ -18,6 +18,8 @@ package com.netflix.spinnaker.config import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spectator.api.Registry import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService +import com.netflix.spinnaker.orca.notifications.NotificationClusterLock +import com.netflix.spinnaker.orca.notifications.SqlNotificationClusterLock import com.netflix.spinnaker.orca.sql.JooqSqlCommentAppender import com.netflix.spinnaker.orca.sql.JooqToSpringExceptionTransformer import com.netflix.spinnaker.orca.sql.QueryLogger @@ -34,6 +36,7 @@ import org.jooq.impl.DataSourceConnectionProvider import org.jooq.impl.DefaultConfiguration import org.jooq.impl.DefaultDSLContext import org.jooq.impl.DefaultExecuteListenerProvider +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration import org.springframework.boot.context.properties.EnableConfigurationProperties @@ -41,12 +44,14 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import +import org.springframework.context.annotation.Primary import org.springframework.jdbc.datasource.DataSourceTransactionManager import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy import sun.net.InetAddressCachePolicy import java.lang.reflect.Field import java.security.Security import java.sql.Connection +import java.time.Clock import javax.sql.DataSource @Configuration @@ -126,6 +131,20 @@ class SqlConfiguration { sqlProperties: SqlProperties ) = SqlHealthIndicator(sqlHealthcheckActivator, sqlProperties.connectionPool.dialect) + + @ConditionalOnProperty("execution-repository.sql.enabled") + @ConditionalOnMissingBean(NotificationClusterLock::class) + @Primary + @Bean + fun sqlNotificationClusterLock( + jooq: DSLContext, + clock: Clock, + properties: SqlProperties + ) = SqlNotificationClusterLock( + jooq = jooq, + clock = clock, + retryProperties = properties.transactionRetry + ) } /** diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt new file mode 100644 index 0000000000..d97e3700af --- /dev/null +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt @@ -0,0 +1,86 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.notifications + +import com.netflix.spinnaker.config.TransactionRetryProperties +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import io.vavr.control.Try +import org.jooq.DSLContext +import org.jooq.exception.SQLDialectNotSupportedException +import org.jooq.impl.DSL +import org.slf4j.LoggerFactory +import java.time.Clock +import java.time.Duration + +class SqlNotificationClusterLock( + private val jooq: DSLContext, + private val clock: Clock, + private val retryProperties: TransactionRetryProperties +) : NotificationClusterLock { + + companion object { + private val lockTable = DSL.table("notification_lock") + private val lockField = DSL.field("lock_name") + private val expiryField = DSL.field("expiry") + + private val log = LoggerFactory.getLogger(NotificationClusterLock::class.java) + } + + init { + log.info("Configured $javaClass for NotificationClusterLock") + } + + override fun tryAcquireLock(notificationType: String, lockTimeoutSeconds: Long): Boolean { + val now = clock.instant() + + var changed = withRetry { + jooq.insertInto(lockTable) + .set(lockField, notificationType) + .set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli()) + .onDuplicateKeyIgnore() + .execute() + } + + if (changed == 0) { + changed = withRetry { + jooq.update(lockTable) + .set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli()) + .where( + lockField.eq(notificationType), + expiryField.lt(now.toEpochMilli()) + ) + .execute() + } + } + + return changed == 1 + } + + private fun withRetry(action: () -> T): T { + val retry = Retry.of( + "sqlWrite", + RetryConfig.custom() + .maxAttempts(retryProperties.maxRetries) + .waitDuration(Duration.ofMillis(retryProperties.backoffMs)) + .ignoreExceptions(SQLDialectNotSupportedException::class.java) + .build() + ) + + return Try.ofSupplier(Retry.decorateSupplier(retry, action)).get() + } +} diff --git a/orca-sql/src/main/resources/db/changelog-master.yml b/orca-sql/src/main/resources/db/changelog-master.yml index 85e5848eda..087892a524 100644 --- a/orca-sql/src/main/resources/db/changelog-master.yml +++ b/orca-sql/src/main/resources/db/changelog-master.yml @@ -38,3 +38,6 @@ databaseChangeLog: - include: file: changelog/20190711-create-pending-table.yml relativeToChangelogFile: true +- include: + file: changelog/20190911-notification-cluster-lock.yml + relativeToChangelogFile: true diff --git a/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml b/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml new file mode 100644 index 0000000000..c88c74e49a --- /dev/null +++ b/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml @@ -0,0 +1,19 @@ +databaseChangeLog: + - changeSet: + id: create-notification-lock-table + author: afeldman + changes: + - createTable: + tableName: notification_lock + columns: + - column: + name: lock_name + type: varchar(256) + constraints: + primaryKey: true + nullable: false + - column: + name: expiry + type: bigint(13) + constraints: + nullable: false