Skip to content

Commit

Permalink
feat(sql): Support for a SQL StorageService implementation
Browse files Browse the repository at this point in the history
This PR maintains backwards compatibility and does not modify existing
controllers.

The `CompositeStorageService` allows for dual-read/write between two
storage services (ie. SQL and non-SQL).

The `StorageServiceMigrator` can be used to migrate objects between
two storage services (ie. non-SQL to SQL).

Enabling SQL:

```
sql:
  enabled: true
  connectionPools:
    default:
      jdbcUrl: jdbc:mysql://.....
      user: root
      password: null
      default: true
  migration:
    jdbcUrl: jdbc:mysql://.....
    user: root
    password: null
```

Enabling the `CompositeStorageService`:

```
spinnaker:
  migration:
    compositeStorageService:
      enabled: true
```

Enabling the `StorageServiceMigrator`:

```
spinnaker:
  migration:
    enabled: true
    primaryClass: com.netflix.spinnaker.front50.model.SqlStorageService
    previousClass: com.netflix.spinnaker.front50.model.S3StorageService
```
  • Loading branch information
ajordens committed Jul 8, 2019
1 parent 1899598 commit 08e119e
Show file tree
Hide file tree
Showing 23 changed files with 1,830 additions and 0 deletions.
17 changes: 17 additions & 0 deletions front50-sql/front50-sql.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apply from: "$rootDir/gradle/kotlin.gradle"

dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-common:1.2.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-slf4j:1.2.1"

implementation "com.netflix.spinnaker.front50:front50-core:${front50Version}"

implementation "com.netflix.spinnaker.kork:kork-exceptions"
implementation "com.netflix.spinnaker.kork:kork-sql"
implementation "io.github.resilience4j:resilience4j-retry"

runtime "mysql:mysql-connector-java:8.0.12"

testImplementation "com.netflix.spinnaker.kork:kork-sql-test"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.front50.migrations.StorageServiceMigrator
import com.netflix.spinnaker.front50.model.CompositeStorageService
import com.netflix.spinnaker.front50.model.StorageService
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary

@Configuration
@EnableConfigurationProperties(StorageServiceMigratorConfigurationProperties::class)
class CompositeStorageServiceConfiguration() {
@Bean
@Primary
@ConditionalOnProperty("spinnaker.migration.compositeStorageService.enabled")
fun compositeStorageService(properties: StorageServiceMigratorConfigurationProperties,
storageServices: List<StorageService>) =
CompositeStorageService(
storageServices.first { it.javaClass.canonicalName.equals(properties.primaryClass) },
storageServices.first { it.javaClass.canonicalName.equals(properties.previousClass) }
)

@Bean
@ConditionalOnProperty("spinnaker.migration.enabled")
fun storageServiceMigrator(registry: Registry,
properties: StorageServiceMigratorConfigurationProperties,
storageServices: List<StorageService>) =
StorageServiceMigrator(
registry,
storageServices.first { it.javaClass.canonicalName.equals(properties.primaryClass) },
storageServices.first { it.javaClass.canonicalName.equals(properties.previousClass) }
)
}

@ConfigurationProperties("spinnaker.migration")
data class StorageServiceMigratorConfigurationProperties(
var primaryClass: String? = null,
var previousClass: String? = null
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.front50.config.CommonStorageServiceDAOConfig
import com.netflix.spinnaker.front50.model.SqlStorageService
import com.netflix.spinnaker.kork.sql.config.DefaultSqlConfiguration
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import org.jooq.DSLContext
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import java.time.Clock

@Configuration
@ConditionalOnProperty("sql.enabled")
@Import(DefaultSqlConfiguration::class)
class SqlConfiguration : CommonStorageServiceDAOConfig() {

@Bean
fun sqlStorageService(objectMapper: ObjectMapper,
registry: Registry,
jooq: DSLContext,
sqlProperties: SqlProperties): SqlStorageService =
SqlStorageService(objectMapper, registry, jooq, Clock.systemDefaultZone(), sqlProperties.retries)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.front50.migrations

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.front50.model.ObjectType
import com.netflix.spinnaker.front50.model.StorageService
import com.netflix.spinnaker.front50.model.Timestamped
import com.netflix.spinnaker.security.AuthenticatedRequest
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import org.springframework.scheduling.annotation.Scheduled
import kotlin.system.measureTimeMillis

class StorageServiceMigrator(
private val registry: Registry,
private val target: StorageService,
private val source: StorageService
) {

companion object {
private val log = LoggerFactory.getLogger(StorageServiceMigrator::class.java)
}

var migratorObjectsId = registry.createId("storageServiceMigrator.objects")

fun migrate(objectType: ObjectType) {
log.info("Migrating {}", objectType)

val sourceObjectKeys = source.listObjectKeys(objectType)
val targetObjectKeys = target.listObjectKeys(objectType)

val migratableObjectKeys = sourceObjectKeys.filter { e ->
/*
* A migratable object is one that:
* - does not exist in 'target'
* or
* - has been more recently modified in 'source'
*/
!targetObjectKeys.containsKey(e.key) || targetObjectKeys[e.key]!! < e.value
}

if (migratableObjectKeys.isEmpty()) {
log.info(
"No objects to migrate (objectType: {}, sourceObjectCount: {}, targetObjectCount: {})",
objectType,
sourceObjectKeys.size,
targetObjectKeys.size
)

return
}

val deferred = migratableObjectKeys.keys.map { key ->
GlobalScope.async {
try {
val maxObjectVersions = if (objectType == ObjectType.ENTITY_TAGS) {
// current thinking is that ENTITY_TAGS will be separately migrated due to their volume (10-100k+)
1
} else {
// the history api defaults to returning 20 records so its arguably unnecessary to migrate much more than that
30
}

val objectVersions = mutableListOf<Timestamped>()

try {
objectVersions.addAll(source.listObjectVersions<Timestamped>(objectType, key, maxObjectVersions))
} catch (e: Exception) {
log.warn(
"Unable to list object versions (objectType: {}, objectKey: {}), reason: {}",
objectType,
key,
e.message
)

// we have a number of objects in our production bucket with broken permissions that prevent version lookups
// but can be fetched directly w/o versions
objectVersions.add(source.loadObject(objectType, key))
}

objectVersions.reversed().forEach { obj ->
try {
MDC.put(AuthenticatedRequest.Header.USER.header, obj.lastModifiedBy)
target.storeObject(objectType, key, obj)
registry.counter(
migratorObjectsId.withTag("objectType", objectType.name).withTag("success", true)
).increment()
} catch (e: Exception) {
registry.counter(
migratorObjectsId.withTag("objectType", objectType.name).withTag("success", false)
).increment()

throw e
} finally {
MDC.remove(AuthenticatedRequest.Header.USER.header)
}
}
} catch (e: Exception) {
log.error("Unable to migrate (objectType: {}, objectKey: {})", objectType, key, e)
}
}
}

val migrationDurationMs = measureTimeMillis {
runBlocking {
deferred.awaitAll()
}
}

log.info(
"Migration of {} took {}ms (objectCount: {})",
objectType,
migrationDurationMs,
migratableObjectKeys.size
)
}

@Scheduled(fixedDelay = 60000)
fun migrate() {
val migrationDurationMs = measureTimeMillis {
ObjectType.values().forEach {
if (it != ObjectType.ENTITY_TAGS) {
// need an alternative/more performant strategy for entity tags
migrate(it)
}
}
}

log.info("Migration complete in {}ms", migrationDurationMs)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.front50.model

import org.slf4j.LoggerFactory

class CompositeStorageService(
private val primary: StorageService,
private val previous: StorageService
) : StorageService {

companion object {
private val log = LoggerFactory.getLogger(CompositeStorageService::class.java)
}

override fun ensureBucketExists() {
primary.ensureBucketExists()
previous.ensureBucketExists()
}

override fun supportsVersioning(): Boolean {
return primary.supportsVersioning()
}

override fun <T : Timestamped?> loadObject(objectType: ObjectType?, objectKey: String?): T {
if (objectType == ObjectType.ENTITY_TAGS) {
return previous.loadObject<T>(objectType, objectKey)
}

try {
return primary.loadObject<T>(objectType, objectKey)
} catch (e: Exception) {
log.error("{}.loadObject({}, {}) failed (primary)", primary.javaClass.simpleName, objectType, objectKey)
return previous.loadObject<T>(objectType, objectKey)
}
}

override fun deleteObject(objectType: ObjectType?, objectKey: String?) {
primary.deleteObject(objectType, objectKey)
previous.deleteObject(objectType, objectKey)
}

override fun <T : Timestamped?> storeObject(objectType: ObjectType?, objectKey: String?, item: T) {
var exception: Exception? = null

try {
primary.storeObject(objectType, objectKey, item)
} catch (e: Exception) {
exception = e
log.error(
"{}.storeObject({}, {}) failed",
primary.javaClass.simpleName,
objectType,
objectKey,
e
)
}

try {
previous.storeObject(objectType, objectKey, item)
} catch (e: Exception) {
exception = e
log.error(
"{}.storeObject({}, {}) failed",
previous.javaClass.simpleName,
objectType,
objectKey,
e
)
}

if (exception != null) {
throw exception
}
}

override fun listObjectKeys(objectType: ObjectType?): Map<String, Long> {
val primaryObjectKeys = primary.listObjectKeys(objectType)
val previousObjectKeys = previous.listObjectKeys(objectType)

return previousObjectKeys + primaryObjectKeys
}

override fun <T : Timestamped?> listObjectVersions(objectType: ObjectType?,
objectKey: String?,
maxResults: Int): MutableCollection<T> {
try {
return primary.listObjectVersions(objectType, objectKey, maxResults)
} catch (e: Exception) {
log.error(
"{}.listObjectVersions({}, {}, {}) failed (primary)",
primary.javaClass.simpleName,
objectType,
objectKey,
maxResults
)
return previous.listObjectVersions(objectType, objectKey, maxResults)
}
}

override fun getLastModified(objectType: ObjectType?): Long {
try {
return primary.getLastModified(objectType)
} catch (e: Exception) {
log.error("{}.getLastModified({}) failed (primary)", primary.javaClass.simpleName, objectType)
return previous.getLastModified(objectType)
}
}
}
Loading

0 comments on commit 08e119e

Please sign in to comment.