Skip to content

Commit

Permalink
fix(queue): Use a simpler queue serialization migrator (#1995)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Feb 15, 2018
1 parent a0d46d1 commit 442ee13
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 234 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ junitVersion=1.0.2
jupiterVersion=5.0.2
junitLegacyVersion=4.12.0
spekVersion=1.1.5
keikoVersion=2.1.3
keikoVersion=2.2.0
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType
import com.netflix.spinnaker.orca.q.migration.MinimalClassTypeInfoSerializationMigrator
import com.netflix.spinnaker.orca.q.redis.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.redis.migration.OrcaToKeikoSerializationMigrator
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler
Expand All @@ -34,6 +34,7 @@ import org.springframework.context.annotation.Configuration
import redis.clients.jedis.Jedis
import redis.clients.util.Pool
import java.time.Clock
import java.util.*

@Configuration
@EnableConfigurationProperties(ObjectMapperSubtypeProperties::class)
Expand All @@ -60,15 +61,17 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {
}
}

@Bean fun orcaToKeikoSerializationMigrator(objectMapper: ObjectMapper) = OrcaToKeikoSerializationMigrator(objectMapper)

@Bean(name = arrayOf("queueImpl")) override fun queue(
@Qualifier("queueRedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
deadMessageHandler: RedisDeadMessageHandler,
@Qualifier("queueEventPublisher") publisher: EventPublisher,
mapper: ObjectMapper,
serializationMigrators: List<SerializationMigrator>
serializationMigrator: Optional<SerializationMigrator>
): RedisQueue {
return super.queue(redisPool, redisQueueProperties, clock, deadMessageHandler, publisher, mapper, serializationMigrators)
return super.queue(redisPool, redisQueueProperties, clock, deadMessageHandler, publisher, mapper, serializationMigrator)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import redis.clients.jedis.Jedis
import redis.clients.util.Pool
import java.time.Clock
import java.time.Duration
import java.util.*

@Configuration
@ConditionalOnExpression("\${queue.redis.enabled:true}")
Expand Down Expand Up @@ -65,7 +66,7 @@ class RedisQueueShovelConfiguration {
deadMessageHandler: RedisDeadMessageHandler,
publisher: EventPublisher,
redisQueueObjectMapper: ObjectMapper,
serializationMigrators: List<SerializationMigrator>
serializationMigrator: Optional<SerializationMigrator>
) =
RedisQueue(
queueName = redisQueueProperties.queueName,
Expand All @@ -75,7 +76,7 @@ class RedisQueueShovelConfiguration {
publisher = publisher,
ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong()),
mapper = redisQueueObjectMapper,
serializationMigrators = serializationMigrators
serializationMigrator = serializationMigrator
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q.redis.migration

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.q.migration.SerializationMigrator

internal val orcaToKeikoTypes = mapOf(
".StartTask" to "startTask",
".CompleteTask" to "completeTask",
".PauseTask" to "pauseTask",
".ResumeTask" to "resumeTask",
".RunTask" to "runTask",
".StartStage" to "startStage",
".ContinueParentStage" to "continueParentStage",
".CompleteStage" to "completeStage",
".SkipStage" to "skipStage",
".AbortStage" to "abortStage",
".PauseStage" to "pauseStage",
".RestartStage" to "restartStage",
".ResumeStage" to "resumeStage",
".CancelStage" to "cancelStage",
".StartExecution" to "startExecution",
".RescheduleExecution" to "rescheduleExecution",
".CompleteExecution" to "completeExecution",
".ResumeExecution" to "resumeExecution",
".CancelExecution" to "cancelExecution",
".InvalidExecutionId" to "invalidExecutionId",
".InvalidStageId" to "invalidStageId",
".InvalidTaskId" to "invalidTaskId",
".InvalidTaskType" to "invalidTaskType",
".NoDownstreamTasks" to "noDownstreamTasks",
".TotalThrottleTimeAttribute" to "totalThrottleTime",
".handler.DeadMessageAttribute" to "deadMessage",
".MaxAttemptsAttribute" to "maxAttempts",
".AttemptsAttribute" to "attempts"
)

class OrcaToKeikoSerializationMigrator(
private val mapper: ObjectMapper
) : SerializationMigrator {

override fun migrate(json: String): String {
val m = mapper.readValue<MutableMap<String, Any?>>(json)

val replaceKind = fun (target: MutableMap<String, Any?>) {
if (target.containsKey("@class")) {
target["kind"] = orcaToKeikoTypes[target["@class"]]
target.remove("@class")
}
}

replaceKind(m)
if (m.containsKey("attributes")) {
(m["attributes"] as List<MutableMap<String, Any?>>).forEach(replaceKind)
}

return mapper.writeValueAsString(m)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ import org.springframework.stereotype.Component
}
}

@JsonTypeName("deadMessage") internal object DeadMessageAttribute : Attribute
@JsonTypeName("deadMessage") object DeadMessageAttribute : Attribute

This file was deleted.

This file was deleted.

0 comments on commit 442ee13

Please sign in to comment.