Skip to content

Commit

Permalink
feat(peering): add ability to author custom peering agents (#3701)
Browse files Browse the repository at this point in the history
* feat(peering): add ability to author custom peering agents

This change adds an extension point (in the form of beans injected into the `PeeringAgent`) to allow peering of custom data in the `orca` [SQL] DB.
Currently, the custom peering agents are called without much context as to what the main peering agent is doing (they are simply called at the end of every agent tick, after all main peering agent logic completes).
This can be changed later if needed but for now I wanted to keep the two as independent and simple as possible.

* fix(custom peering): add init method

* fixup! fix(custom peering): add init method

* fixup! fixup! fix(custom peering): add init method

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
marchello2000 and mergify[bot] committed Jun 3, 2020
1 parent 9aaa8e8 commit 859082e
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock
import com.netflix.spinnaker.orca.peering.CustomPeerer
import com.netflix.spinnaker.orca.peering.ExecutionCopier
import com.netflix.spinnaker.orca.peering.MySqlRawAccess
import com.netflix.spinnaker.orca.peering.PeeringAgent
Expand Down Expand Up @@ -47,7 +48,8 @@ class PeeringAgentConfiguration {
clusterLock: NotificationClusterLock,
dynamicConfigService: DynamicConfigService,
registry: Registry,
properties: PeeringAgentConfigurationProperties
properties: PeeringAgentConfigurationProperties,
customPeerer: CustomPeerer?
): PeeringAgent {
if (properties.peerId == null || properties.poolName == null) {
throw ConfigurationException("pollers.peering.id and pollers.peering.poolName must be specified for peering")
Expand Down Expand Up @@ -81,6 +83,7 @@ class PeeringAgentConfiguration {
dynamicConfigService,
metrics,
copier,
customPeerer,
clusterLock)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.peering

/**
 * Interface that custom peerers should implement (and expose as a bean).
 *
 * A custom peerer is handed to the [PeeringAgent], which calls [init] once from its own initializer
 * and then calls [doPeer] from every agent tick (when peering is enabled).
 */
interface CustomPeerer {
/**
 * Let the custom peerer initialize itself.
 *
 * If this method throws, the [PeeringAgent] logs the failure, increments the custom peerer error
 * metric, and never calls [doPeer] on this peerer.
 *
 * @param srcDb source/foreign (read-only) database, use srcDb.runQuery to get the jooq context to perform queries on
 * @param destDb destination/local database
 * @param peerId the id of the peer we are peering
 */
fun init(srcDb: SqlRawAccess, destDb: SqlRawAccess, peerId: String)

/**
 * doPeer function will be called AFTER all default peering actions are completed for this agent cycle.
 *
 * An exception thrown from here is caught by the [PeeringAgent], logged, and counted in the custom
 * peerer error metric; a `false` return value is only logged as an error.
 *
 * @return true if peering completed successfully, false otherwise
 */
fun doPeer(): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import org.jooq.impl.DSL.table

/**
* Provides raw access to various tables in the orca SQL
* TODO(mvulfson): Need an extension point to deal with cases such as OCA
*/
open class MySqlRawAccess(
private val jooq: DSLContext,
private val poolName: String,
jooq: DSLContext,
poolName: String,
chunkSize: Int
) : SqlRawAccess(chunkSize) {
) : SqlRawAccess(jooq, poolName, chunkSize) {

private var maxPacketSize: Long = 0

Expand Down Expand Up @@ -180,7 +179,7 @@ open class MySqlRawAccess(
}

// Dump it to the DB
batchQuery
persisted += batchQuery
.onDuplicateKeyUpdate()
.set(updateSet)
.execute()
Expand All @@ -195,13 +194,12 @@ open class MySqlRawAccess(
.values(values)
}

persisted++
cumulativeSize += totalRecordSize
}

if (cumulativeSize > 0) {
// Dump the last bit to the DB
batchQuery
persisted += batchQuery
.onDuplicateKeyUpdate()
.set(updateSet)
.execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,42 @@ class PeeringAgent(

private val executionCopier: ExecutionCopier,

customPeerer: CustomPeerer?,

clusterLock: NotificationClusterLock
) : AbstractPollingNotificationAgent(clusterLock) {

private val log = LoggerFactory.getLogger(javaClass)
private val customPeerer: CustomPeerer?

private var completedPipelinesMostRecentUpdatedTime = 0L
private var completedOrchestrationsMostRecentUpdatedTime = 0L
private var deletedExecutionCursor = 0

init {
  // Keep the custom peerer only if its init() completes; a peerer whose init() throws is
  // reported (metric + error log) and dropped, so it is never invoked from tick().
  this.customPeerer = customPeerer?.let { candidate ->
    try {
      candidate.init(srcDB, destDB, peeredId)
      candidate
    } catch (e: Exception) {
      peeringMetrics.incrementCustomPeererError(candidate.javaClass.simpleName, e)
      log.error("Failed to initialize custom peerer '${candidate.javaClass.simpleName}' - this peerer will not be called", e)
      null
    }
  }
}

/**
 * Runs one peering cycle: pipelines, orchestrations, deleted executions and finally the custom
 * peerer, all measured by the overall lag metric.
 *
 * Does nothing unless both the global `pollers.peering` flag and the per-peer
 * `pollers.peering.<peeredId>` flag are enabled (both default to enabled).
 */
override fun tick() {
  val peeringEnabled = dynamicConfigService.isEnabled("pollers.peering", true) &&
    dynamicConfigService.isEnabled("pollers.peering.$peeredId", true)

  if (!peeringEnabled) {
    return
  }

  peeringMetrics.recordOverallLag {
    peerExecutions(ExecutionType.PIPELINE)
    peerExecutions(ExecutionType.ORCHESTRATION)
    peerDeletedExecutions()
    // Custom peering always runs last, after all built-in peering for this cycle
    invokeCustomPeerer()
  }
}
Expand Down Expand Up @@ -189,6 +210,28 @@ class PeeringAgent(
}
}

/**
 * Calls [CustomPeerer.doPeer] on the custom peerer, if one is present.
 *
 * Exceptions thrown by the peerer never propagate out of this function: they are counted in the
 * custom peerer error metric and logged. A `false` result from the peerer is logged as an error.
 */
private fun invokeCustomPeerer() {
  // No custom peerer (none supplied, or it was dropped in the initializer) - nothing to do
  val peerer = customPeerer ?: return
  val peererName = peerer.javaClass.simpleName

  try {
    log.info("Starting peering with custom peerer '$peererName'")
    if (peerer.doPeer()) {
      log.info("Completed peering with custom peerer '$peererName'")
    } else {
      log.error("Completed peering with custom peerer '$peererName' with errors")
    }
  } catch (e: Exception) {
    peeringMetrics.incrementCustomPeererError(peererName, e)
    log.error("Custom peerer '$peererName' failed", e)
  }
}

private fun doMigrate(executionType: ExecutionType, updatedAfter: Long): Long {
// Compute diff
val completedPipelineKeys = srcDB.getCompletedExecutionIds(executionType, peeredId, updatedAfter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ open class PeeringMetrics(
private val peeringNumDeletedId = registry.createId("pollers.peering.numDeleted").withTag("peerId", peeredId)
private val peeringNumStagesDeletedId = registry.createId("pollers.peering.numStagesDeleted").withTag("peerId", peeredId)
private val peeringNumErrorsId = registry.createId("pollers.peering.numErrors").withTag("peerId", peeredId)
private val peeringCustomPeererNumErrorsId = registry.createId("pollers.peering.customPeerer.numErrors").withTag("peerId", peeredId)

open fun recordOverallLag(block: () -> Unit) {
registry
Expand Down Expand Up @@ -69,6 +70,12 @@ open class PeeringMetrics(
.counter(peeringNumStagesDeletedId.tag(executionType))
.increment(count.toLong())
}

/**
 * Increments the custom peerer error counter by one.
 *
 * @param peererName value recorded in the counter's `peerer` tag
 * @param exception the failure; its simple class name is recorded in the `exception` tag
 */
open fun incrementCustomPeererError(peererName: String, exception: Exception) {
  val exceptionName = exception.javaClass.simpleName
  val counterId = peeringCustomPeererNumErrorsId.withTags("peerer", peererName, "exception", exceptionName)

  registry.counter(counterId).increment()
}
}

internal fun Id.tag(executionType: ExecutionType): Id {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package com.netflix.spinnaker.orca.peering

import com.netflix.spinnaker.kork.sql.routing.withPool
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.Result
import org.slf4j.Logger
import org.slf4j.LoggerFactory

abstract class SqlRawAccess(
val jooq: DSLContext,
val poolName: String,
val chunkSize: Int
) {
val log: Logger = LoggerFactory.getLogger(this.javaClass)
Expand Down Expand Up @@ -75,6 +79,16 @@ abstract class SqlRawAccess(
*/
abstract fun loadRecords(tableName: String, records: Result<Record>): Int

/**
 * Runs [callback] with this instance's jOOQ [DSLContext] and returns whatever the callback returns.
 * The callback is executed inside `withPool(poolName)`.
 *
 * This is the accessor that custom peering extensions use to issue their own queries.
 *
 * @param callback work to perform against the DSL context
 * @return the callback's result
 */
public fun <T> runQuery(callback: (DSLContext) -> T): T =
  withPool(poolName) { callback(jooq) }

data class ExecutionDiffKey(
val id: String,
val updated_at: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class PeeringAgentSpec extends Specification {
ExecutionCopier copier = Mock(ExecutionCopier)
def clockDrift = 10

PeeringAgent constructPeeringAgent(DynamicConfigService dynamicConfigService = DynamicConfigService.NOOP) {
PeeringAgent constructPeeringAgent(DynamicConfigService dynamicConfigService = DynamicConfigService.NOOP, CustomPeerer customPeerer = null) {
return new PeeringAgent(
"peeredId",
1000,
Expand All @@ -41,6 +41,7 @@ class PeeringAgentSpec extends Specification {
dynamicConfigService,
metrics,
copier,
customPeerer,
Mock(NotificationClusterLock)
)
}
Expand Down Expand Up @@ -327,6 +328,58 @@ class PeeringAgentSpec extends Specification {
ORCHESTRATION | [key("ID3", 30)] | [key("IDx", 10), key("ID3", 10)] || ["IDx"] | ["ID3"]
}

// With no CustomPeerer supplied (constructPeeringAgent defaults it to null),
// invokeCustomPeerer() must complete without throwing.
def "does nothing when no custom peering agent"() {
def peeringAgent = constructPeeringAgent(DynamicConfigService.NOOP)

when:
peeringAgent.invokeCustomPeerer()

then:
noExceptionThrown()
}

// A custom peerer passed to the agent receives exactly one doPeer() call
// for each invokeCustomPeerer() call.
def "calls custom peering agent"() {
def customPeerer = Mock(CustomPeerer)
def peeringAgent = constructPeeringAgent(DynamicConfigService.NOOP, customPeerer)

when:
peeringAgent.invokeCustomPeerer()

then:
1 * customPeerer.doPeer()
}

// ThrowingPeerer.doPeer() always throws: the agent must swallow the exception and record it
// in the custom peerer error metric. The same test also verifies that the agent passed its own
// peeredId/srcDB/destDB to the peerer's init().
def "errors in custom peerer don't leak out"() {
def customPeerer = new ThrowingPeerer()
def peeringAgent = constructPeeringAgent(DynamicConfigService.NOOP, customPeerer)

when:
peeringAgent.invokeCustomPeerer()

then:
noExceptionThrown()
// values captured by ThrowingPeerer.init() must be the agent's own
customPeerer.peerId == peeringAgent.peeredId
customPeerer.srcDb == peeringAgent.srcDB
customPeerer.destDb == peeringAgent.destDB
1 * metrics.incrementCustomPeererError(customPeerer.class.simpleName, _)
}

// If the custom peerer's init() throws while the agent is being constructed, construction must
// still succeed and the failed peerer must never be asked to doPeer() afterwards.
def "errors in custom peerer init don't leak out"() {
def customPeerer = Mock(CustomPeerer)

when:
def peeringAgent = constructPeeringAgent(DynamicConfigService.NOOP, customPeerer)

then:
// init() is stubbed to fail; it is expected to be called exactly once during construction
1 * customPeerer.init(_, _, _) >> { throw new Exception("Init failed") }

when:
peeringAgent.invokeCustomPeerer()

then: 'failing peerer is not invoked'
0 * customPeerer.doPeer()
}

// Test helper: builds the ExecutionDiffKey for the given execution id and updated_at value.
private static def key(String id, long updated_at) {
  new SqlRawAccess.ExecutionDiffKey(id, updated_at)
}
Expand All @@ -338,4 +391,22 @@ class PeeringAgentSpec extends Specification {
// Test helper: builds a DeletedExecution record for an ORCHESTRATION execution.
private static def dOKey(int id, String execution_id) {
  new SqlRawAccess.DeletedExecution(id, execution_id, ORCHESTRATION.toString())
}

// Test fixture: a CustomPeerer whose init() succeeds and records its arguments (so tests can
// assert on them) but whose doPeer() always throws.
private class ThrowingPeerer implements CustomPeerer {
// Arguments captured from the most recent init() call
SqlRawAccess srcDb
SqlRawAccess destDb
String peerId

@Override
void init(SqlRawAccess srcDb, SqlRawAccess destDb, String peerId) {
this.srcDb = srcDb
this.destDb = destDb
this.peerId = peerId
}

@Override
boolean doPeer() {
// Always fails, to exercise the agent's error handling
throw new Exception("Custom peering failed")
}
}
}

0 comments on commit 859082e

Please sign in to comment.