Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(core): back out debug logging and single thread context #1032

Merged
merged 1 commit into from
Apr 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ import com.netflix.spinnaker.kork.exceptions.SpinnakerException
import com.netflix.spinnaker.kork.exceptions.SystemException
import com.netflix.spinnaker.kork.exceptions.UserException
import java.time.Clock
import java.util.UUID
import kotlinx.coroutines.async
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.supervisorScope
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
Expand All @@ -62,25 +60,24 @@ class ResourceActuator(

suspend fun <T : ResourceSpec> checkResource(resource: Resource<T>) {
withTracingContext(resource) {
val coid = UUID.randomUUID().toString() // coroutine id for log messages to help debug #951
val id = resource.id
log.debug("checkResource $id [$coid]")
log.debug("checkResource $id")
val plugin = handlers.supporting(resource.kind)

if (actuationPauser.isPaused(resource)) {
log.debug("Actuation for resource {} is paused, skipping checks [$coid]", id)
log.debug("Actuation for resource {} is paused, skipping checks", id)
publisher.publishEvent(ResourceCheckSkipped(resource.kind, id, "ActuationPaused"))
return@withTracingContext
}

if (plugin.actuationInProgress(resource)) {
log.debug("Actuation for resource {} is already running, skipping checks [$coid]", id)
log.debug("Actuation for resource {} is already running, skipping checks", id)
publisher.publishEvent(ResourceCheckSkipped(resource.kind, id, "ActuationInProgress"))
return@withTracingContext
}

try {
val (desired, current) = plugin.resolve(resource, coid)
val (desired, current) = plugin.resolve(resource)
val diff = DefaultResourceDiff(desired, current)
if (diff.hasChanges()) {
diffFingerprintRepository.store(id, diff)
Expand Down Expand Up @@ -127,21 +124,21 @@ class ResourceActuator(
}
}
} catch (e: Exception) {
log.warn("Failed to veto presumed bad artifact version for ${resource.id} [$coid]", e)
log.warn("Failed to veto presumed bad artifact version for ${resource.id}", e)
// TODO: emit metric
}
}
log.debug("Skipping actuation for resource {} because it was vetoed: {} [$coid]", id, response.message)
log.debug("Skipping actuation for resource {} because it was vetoed: {}", id, response.message)
publisher.publishEvent(ResourceCheckSkipped(resource.kind, id, response.vetoName))
publishVetoedEvent(response, resource)
return@withTracingContext
}

log.debug("Checking resource {} [$coid]", id)
log.debug("Checking resource {}", id)

when {
current == null -> {
log.warn("Resource {} is missing [$coid]", id)
log.warn("Resource {} is missing", id)
publisher.publishEvent(ResourceMissing(resource, clock))

plugin.create(resource, diff)
Expand All @@ -150,8 +147,8 @@ class ResourceActuator(
}
}
diff.hasChanges() -> {
log.warn("Resource {} is invalid [$coid]", id)
log.info("Resource {} delta: {} [$coid]", id, diff.toDebug())
log.warn("Resource {} is invalid", id)
log.info("Resource {} delta: {}", id, diff.toDebug())
publisher.publishEvent(ResourceDeltaDetected(resource, diff.toDeltaJson(), clock))

plugin.update(resource, diff)
Expand All @@ -160,7 +157,7 @@ class ResourceActuator(
}
}
else -> {
log.info("Resource {} is valid [$coid]", id)
log.info("Resource {} is valid", id)
val lastEvent = resourceRepository.lastEvent(id)
when (lastEvent) {
is ResourceActuationLaunched -> log.debug("waiting for actuating task to be completed") // do nothing and wait
Expand All @@ -174,10 +171,10 @@ class ResourceActuator(
}
}
} catch (e: ResourceCurrentlyUnresolvable) {
log.warn("Resource check for {} failed (hopefully temporarily) due to {} [$coid]", id, e.message)
log.warn("Resource check for {} failed (hopefully temporarily) due to {}", id, e.message)
publisher.publishEvent(ResourceCheckUnresolvable(resource, e, clock))
} catch (e: Exception) {
log.error("Resource check for $id failed [$coid]", e)
log.error("Resource check for $id failed", e)
publisher.publishEvent(ResourceCheckError(resource, e.toSpinnakerException(), clock))
}
}
Expand All @@ -193,28 +190,21 @@ class ResourceActuator(
else -> SystemException(this)
} as SpinnakerException

private suspend fun <T : Any> ResourceHandler<*, T>.resolve(resource: Resource<out ResourceSpec>, coid: String): Pair<T, T?> =
private suspend fun <T : Any> ResourceHandler<*, T>.resolve(resource: Resource<out ResourceSpec>): Pair<T, T?> =
supervisorScope {
val desired = async {
try {
log.debug("before desired: ${resource.id}: [$coid]")
val result = desired(resource)
log.debug("after desired: ${resource.id}: [$coid]")
result
desired(resource)
} catch (e: ResourceCurrentlyUnresolvable) {
throw e
} catch (e: Throwable) {
throw CannotResolveDesiredState(resource.id, e)
}
}

// Trying single thread context to see if it works around https://github.com/spinnaker/keel/issues/951
val current = async(newSingleThreadContext("actuation.resolve.current")) {
val current = async {
try {
log.debug("before current: ${resource.id}: [$coid]")
val result = current(resource)
log.debug("after current: ${resource.id}: [$coid]")
result
current(resource)
} catch (e: Throwable) {
throw CannotResolveCurrentState(resource.id, e)
}
Expand Down