feat(sql): conditionally compress large pipeline execution bodies #4620

Merged
merged 9 commits on Jan 31, 2024
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.config.ExecutionCompressionProperties
import com.netflix.spinnaker.config.ObjectMapperSubtypeProperties
import com.netflix.spinnaker.config.OrcaSqlProperties
import com.netflix.spinnaker.config.SpringObjectMapperConfigurer
@@ -119,15 +120,17 @@ class SqlTestConfig {
mapper: ObjectMapper,
registry: Registry,
properties: SqlProperties,
orcaSqlProperties: OrcaSqlProperties
orcaSqlProperties: OrcaSqlProperties,
compressionProperties: ExecutionCompressionProperties
) = SqlExecutionRepository(
orcaSqlProperties.partitionName,
dsl,
mapper,
properties.retries.transactions,
orcaSqlProperties.batchReadSize,
orcaSqlProperties.stageReadSize,
interlink = null
interlink = null,
compressionProperties = compressionProperties
)

@Bean
@@ -167,6 +170,7 @@ class SqlTestConfig {
classes = [
SqlTestConfig::class,
SqlProperties::class,
ExecutionCompressionProperties::class,
TestConfig::class,
DynamicConfigService.NoopDynamicConfig::class,
EmbeddedRedisConfiguration::class,
2 changes: 2 additions & 0 deletions orca-sql/orca-sql.gradle
@@ -45,6 +45,8 @@ dependencies {
testImplementation("io.strikt:strikt-core")
testImplementation("org.assertj:assertj-core")
testImplementation("org.junit.jupiter:junit-jupiter-api")
testImplementation("org.junit.jupiter:junit-jupiter-params")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("dev.minutest:minutest")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("org.testcontainers:mysql")
@@ -0,0 +1,85 @@
/*
* Copyright 2021 Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.config

import java.io.InputStream
import java.io.OutputStream
import java.util.zip.DeflaterOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.util.zip.InflaterInputStream
import org.springframework.boot.context.properties.ConfigurationProperties

/**
* Defines properties to be used when compressing large execution bodies.
* These properties apply when execution bodies are upserted for:
* 1. Orchestration bodies
* 2. Orchestration stage bodies
* 3. Pipeline bodies
* 4. Pipeline stage bodies
*/
@ConfigurationProperties("execution-repository.sql.compression")
class ExecutionCompressionProperties {

/**
* Enables execution body compression for large stage and pipeline execution bodies
*/
var enabled: Boolean = false

/**
* Determines whether compressed bodies are both written and read, or only read.
* Only relevant when [enabled] is true.
*/
var compressionMode: CompressionMode = CompressionMode.READ_WRITE

/**
* Defines the body size threshold, in bytes, above which the body will be compressed before
* upsertion
*/
var bodyCompressionThreshold: Int = 1024

/**
* Controls the library to be used when compressing bodies
*/
var compressionType: CompressionType = CompressionType.ZLIB

fun isWriteEnabled() = enabled && (compressionMode == CompressionMode.READ_WRITE)
}

/**
* Enum defining the supported compression types
*/
enum class CompressionType(val type: String) {
GZIP("GZIP"),
ZLIB("ZLIB");

fun getDeflator(outStream: OutputStream) =
when (this) {
GZIP -> GZIPOutputStream(outStream)
ZLIB -> DeflaterOutputStream(outStream)
}

fun getInflator(inStream: InputStream) =
when (this) {
GZIP -> GZIPInputStream(inStream)
ZLIB -> InflaterInputStream(inStream)
}
}

enum class CompressionMode {
READ_WRITE,
READ_ONLY;
}
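
As context for reviewers (a sketch, not part of the diff): the deflator/inflator helpers above pair up for a lossless round trip, using only the JDK streams they wrap. The sample JSON body below is made up.

import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream

fun main() {
  val body = """{"id":"01HEXAMPLE","stages":[]}""" // hypothetical execution body
  val type = CompressionType.ZLIB

  // Compress: stream the UTF-8 bytes through the deflating stream;
  // closing it (via use) flushes and finishes the deflater.
  val buffer = ByteArrayOutputStream()
  type.getDeflator(buffer).use { it.write(body.toByteArray(Charsets.UTF_8)) }
  val compressed = buffer.toByteArray()

  // Decompress: read back through the matching inflating stream.
  val restored = type.getInflator(ByteArrayInputStream(compressed))
    .bufferedReader(Charsets.UTF_8)
    .use { it.readText() }

  check(restored == body)
}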
@@ -52,7 +52,7 @@ import javax.sql.DataSource

@Configuration
@ConditionalOnProperty("sql.enabled")
@EnableConfigurationProperties(OrcaSqlProperties::class)
@EnableConfigurationProperties(OrcaSqlProperties::class, ExecutionCompressionProperties::class)
@Import(DefaultSqlConfiguration::class)
@ComponentScan("com.netflix.spinnaker.orca.sql")

@@ -72,7 +72,8 @@ class SqlConfiguration {
properties: SqlProperties,
orcaSqlProperties: OrcaSqlProperties,
interlink: Optional<Interlink>,
executionRepositoryListeners: Collection<ExecutionRepositoryListener>
executionRepositoryListeners: Collection<ExecutionRepositoryListener>,
compressionProperties: ExecutionCompressionProperties
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
@@ -82,7 +83,8 @@
orcaSqlProperties.batchReadSize,
orcaSqlProperties.stageReadSize,
interlink = interlink.orElse(null),
executionRepositoryListeners = executionRepositoryListeners
executionRepositoryListeners = executionRepositoryListeners,
compressionProperties = compressionProperties
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "primary"))) as ExecutionRepository
}
@@ -95,7 +97,8 @@ class SqlConfiguration {
registry: Registry,
properties: SqlProperties,
orcaSqlProperties: OrcaSqlProperties,
@Value("\${execution-repository.sql.secondary.pool-name}") poolName: String
@Value("\${execution-repository.sql.secondary.pool-name}") poolName: String,
compressionProperties: ExecutionCompressionProperties
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
@@ -104,7 +107,8 @@
properties.retries.transactions,
orcaSqlProperties.batchReadSize,
orcaSqlProperties.stageReadSize,
poolName
poolName,
compressionProperties = compressionProperties
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "secondary"))) as ExecutionRepository
}
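
With Spring's relaxed binding, the properties class registered here maps keys such as execution-repository.sql.compression.enabled, .compression-type, .body-compression-threshold, and .compression-mode. A sketch (not part of the diff) of the resulting write gate:

// READ_ONLY keeps decompression working on reads while suppressing compressed writes.
val props = ExecutionCompressionProperties().apply {
  enabled = true
  compressionMode = CompressionMode.READ_ONLY
}
check(props.enabled)           // compressed rows remain readable
check(!props.isWriteEnabled()) // new bodies are written uncompressed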
@@ -17,11 +17,15 @@ package com.netflix.spinnaker.orca.sql.pipeline.persistence

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.google.common.annotations.VisibleForTesting
import com.netflix.spinnaker.config.CompressionType
import com.netflix.spinnaker.config.ExecutionCompressionProperties
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import java.sql.ResultSet
import org.jooq.DSLContext
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets

/**
* Converts a SQL [ResultSet] into an Execution.
@@ -31,31 +35,60 @@ import org.slf4j.LoggerFactory
*/
class ExecutionMapper(
private val mapper: ObjectMapper,
private val stageBatchSize: Int
private val stageBatchSize: Int,
private val compressionProperties: ExecutionCompressionProperties
) {

private val log = LoggerFactory.getLogger(javaClass)

/**
* Conditionally decompresses a compressed execution body, if present, and provides the
* execution body content as a string.
*
* @param rs [ResultSet] to pull the body from
*
* @return the decompressed execution body content
*/
@VisibleForTesting
fun getDecompressedBody(rs: ResultSet): String {
val body: String? = rs.getString("body")

// If compression is disabled, rs doesn't contain compression-related
// fields, so don't try to access them.
return if (compressionProperties.enabled && body.isNullOrEmpty()) {
val compressionType = CompressionType.valueOf(rs.getString("compression_type"))
val compressedBody = rs.getBytes("compressed_body")
compressionType.getInflator(compressedBody.inputStream())
.bufferedReader(StandardCharsets.UTF_8)
.use { it.readText() }
} else {
body ?: ""
}
}

fun map(rs: ResultSet, context: DSLContext): Collection<PipelineExecution> {
val results = mutableListOf<PipelineExecution>()
val executionMap = mutableMapOf<String, PipelineExecution>()
val legacyMap = mutableMapOf<String, String>()

while (rs.next()) {
mapper.readValue<PipelineExecution>(rs.getString("body"))
.also {
execution ->
results.add(execution)
execution.partition = rs.getString("partition")
val body = getDecompressedBody(rs)
if (body.isNotEmpty()) {
mapper.readValue<PipelineExecution>(body)
.also {
execution ->
results.add(execution)
execution.partition = rs.getString("partition")

if (rs.getString("id") != execution.id) {
// Map legacyId executions to their current ULID
legacyMap[execution.id] = rs.getString("id")
executionMap[rs.getString("id")] = execution
} else {
executionMap[execution.id] = execution
if (rs.getString("id") != execution.id) {
// Map legacyId executions to their current ULID
legacyMap[execution.id] = rs.getString("id")
executionMap[rs.getString("id")] = execution
} else {
executionMap[execution.id] = execution
}
}
}
}
}

if (results.isNotEmpty()) {
@@ -70,7 +103,7 @@
}
}

context.selectExecutionStages(type, executionIds).let { stageResultSet ->
context.selectExecutionStages(type, executionIds, compressionProperties).let { stageResultSet ->
while (stageResultSet.next()) {
mapStage(stageResultSet, executionMap)
}
@@ -90,7 +123,7 @@
executions.getValue(executionId)
.stages
.add(
mapper.readValue<StageExecution>(rs.getString("body"))
mapper.readValue<StageExecution>(getDecompressedBody(rs))
.apply {
execution = executions.getValue(executionId)
}
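
To see the fallback in action, a test-style sketch (assuming mockito-kotlin from the Gradle change above, with the com.nhaarman.mockito_kotlin package of its 1.x line; the column names follow this PR's queries): when the plain-text body column is empty, getDecompressedBody reads and inflates the compressed columns instead.

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.config.CompressionType
import com.netflix.spinnaker.config.ExecutionCompressionProperties
import com.netflix.spinnaker.orca.sql.pipeline.persistence.ExecutionMapper
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import java.io.ByteArrayOutputStream
import java.sql.ResultSet

fun zlibCompress(text: String): ByteArray {
  val out = ByteArrayOutputStream()
  CompressionType.ZLIB.getDeflator(out).use { it.write(text.toByteArray(Charsets.UTF_8)) }
  return out.toByteArray()
}

fun main() {
  val props = ExecutionCompressionProperties().apply { enabled = true }
  val executionMapper = ExecutionMapper(ObjectMapper(), stageBatchSize = 200, compressionProperties = props)

  // Row with a NULL body and populated compression columns, as written by a compressed upsert.
  val rs = mock<ResultSet> {
    on { getString("body") } doReturn null
    on { getString("compression_type") } doReturn "ZLIB"
    on { getBytes("compressed_body") } doReturn zlibCompress("""{"id":"x"}""")
  }

  check(executionMapper.getDecompressedBody(rs) == """{"id":"x"}""")
}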