Skip to content
Permalink
Browse files

Refactoring logging output to optimise for splunk.

  • Loading branch information
atooni committed Nov 23, 2019
1 parent 571dafb commit dcf19ba1c0a024d815e23459c27e8fcbd2954320
@@ -19,7 +19,7 @@ trait Blended {
val akkaHttpProxy = blended("blended.akka.http.proxy")
val akkaHttpRestJms = blended("blended.akka.http.restjms")
val akkaHttpSampleHelloworld = blended("blended.akka.http.sample.helloworld")
val camelUtils = blended("blended.camel.utils")
val akkaLogging = blended("blended.akka.logging")
val containerContextApi = blended("blended.container.context.api")
val containerContextImpl = blended("blended.container.context.impl")
val containerRegistry = blended("blended.container.registry")
@@ -105,7 +105,7 @@ class JmsRetryProcessor(
destination = dest,
deliveryMode = JmsDeliveryMode.Persistent,
priority = settings.priority,
ttl = settings.timeToLive,
ttl = settings.timeToLive
)
}
}
@@ -132,7 +132,7 @@ class JmsRetryProcessor(
log = retryLog,
headerCfg = retryCfg.headerCfg,
connectionFactory = retryCfg.cf,
destinationResolver = s => new RetryDestinationResolver(retryCfg.headerCfg, s, router.validate),
destinationResolver = s => new RetryDestinationResolver(retryCfg.headerCfg, s, router.validate(LogLevel.Trace)),
deliveryMode = JmsDeliveryMode.Persistent,
timeToLive = None,
clearPreviousException = true,
@@ -157,9 +157,9 @@ class JmsRetryProcessor(
log = retryLog
)

Flow.fromGraph(FlowProcessor.fromFunction(id, retryLog)(router.validate))
Flow.fromGraph(FlowProcessor.fromFunction(id, retryLog)(router.validate(LogLevel.Trace)))
.via(FlowProcessor.log(LogLevel.Debug, retryLog, "Creating transaction failed event"))
.via(wiretap.flow())
.via(wiretap.flow(true))
}

protected def sendToOriginal : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = resendMessage
@@ -200,7 +200,7 @@ class JmsRetryProcessor(
// case the envelope is marked with an exception after trying to forward the
// message

val transSplit = b.add(FlowProcessor.partition[FlowEnvelope] { env => env.exception.isEmpty && router.validate(env).isFailure })
val transSplit = b.add(FlowProcessor.partition[FlowEnvelope] { env => env.exception.isEmpty && router.validate(LogLevel.Trace)(env).isFailure })
val transMerge = b.add(Merge[FlowEnvelope](2))

merge.out ~> transSplit.in
@@ -10,6 +10,7 @@ import blended.streams.FlowProcessor
import blended.streams.message.{FlowEnvelope, FlowEnvelopeLogger}
import blended.streams.FlowHeaderConfig
import blended.util.logging.LogLevel
import blended.util.logging.LogLevel.LogLevel

import scala.concurrent.duration._
import scala.util.Try
@@ -49,7 +50,7 @@ class JmsRetryRouter(
.withHeader(headerCfg.headerFirstRetry, firstRetry).get
}

val validate : FlowProcessor.IntegrationStep = env => Try {
val validate : LogLevel => FlowProcessor.IntegrationStep = level => env => Try {

val mandatoryHeader : String => Long = h =>
env.header[Long](h) match {
@@ -63,7 +64,7 @@ class JmsRetryRouter(
val firstRetry : Long = mandatoryHeader(headerCfg.headerFirstRetry)

val remaining : FiniteDuration = (retryTimeout - (System.currentTimeMillis() - firstRetry)).millis
log.logEnv(env, LogLevel.Debug, s"Retrying envelope [${env.id}] : [$retryCount / $maxRetries] [${remaining}] remaining")
log.logEnv(env, level, s"Retrying envelope [${env.id}] : [$retryCount / $maxRetries] [${remaining}] remaining", false)

if (maxRetries > 0 && retryCount > maxRetries) {
throw new RetryCountExceededException(maxRetries)
@@ -82,6 +83,6 @@ class JmsRetryRouter(

val flow : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] =
Flow.fromGraph(FlowProcessor.fromFunction(name + ".header", log)(header))
.via(FlowProcessor.fromFunction(name + ".validate", log)(validate))
.via(FlowProcessor.fromFunction(name + ".validate", log)(validate(LogLevel.Debug)))

}
@@ -10,7 +10,7 @@ import akka.stream.scaladsl.Flow
import blended.activemq.brokerstarter.internal.BrokerActivator
import blended.akka.internal.BlendedAkkaActivator
import blended.container.context.api.ContainerIdentifierService
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination, JmsQueue}
import blended.streams.jms.{JmsProducerSettings, JmsStreamSupport}
import blended.streams.message.{FlowEnvelope, FlowEnvelopeLogger, FlowMessage}
import blended.streams.processor.Collector
@@ -213,6 +213,18 @@ class JmsRetryProcessorRetryTimeoutSpec extends ProcessorSpecSupport("retryTimeo
assert(first + retryCfg.retryTimeout.toMillis <= now)
}
}

val otherFailed : Collector[FlowEnvelope] = receiveMessages(
headerCfg = headerCfg,
cf = amqCf,
dest = JmsQueue(retryCfg.failedDestName),
log = envLogger,
listener = 1,
completeOn = None,
timeout = Some(timeout)
)

Await.result(otherFailed.result, timeout + 500.millis) should be (empty)
}
}

@@ -7,7 +7,7 @@ import blended.streams.FlowHeaderConfig
import blended.streams.message.{FlowEnvelope, FlowEnvelopeLogger}
import blended.testsupport.scalatest.LoggingFreeSpecLike
import blended.util.RichTry._
import blended.util.logging.Logger
import blended.util.logging.{LogLevel, Logger}
import org.apache.activemq.ActiveMQConnectionFactory
import org.scalatest.Matchers

@@ -53,7 +53,7 @@ class JmsRetryRouterSpec extends TestKit(ActorSystem("RetryRouter"))
val env : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryCount, maxRetries).unwrap

router.validate(router.header(env).get).unwrap
router.validate(LogLevel.Debug)(router.header(env).get).unwrap
}
}

@@ -70,7 +70,7 @@ class JmsRetryRouterSpec extends TestKit(ActorSystem("RetryRouter"))
val env : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerFirstRetry, System.currentTimeMillis() - timeout).unwrap

router.validate(router.header(env).get).unwrap
router.validate(LogLevel.Debug)(router.header(env).get).unwrap
}
}

@@ -83,7 +83,7 @@ class JmsRetryRouterSpec extends TestKit(ActorSystem("RetryRouter"))

intercept[MissingRetryDestinationException] {
val env : FlowEnvelope = FlowEnvelope()
router.validate(router.header(env).get).unwrap
router.validate(LogLevel.Debug)(router.header(env).get).unwrap
}
}

@@ -97,8 +97,8 @@ class JmsRetryRouterSpec extends TestKit(ActorSystem("RetryRouter"))
val env : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, "myQueue").unwrap

val env1 : FlowEnvelope = router.validate(router.header(env).get).unwrap
val env2 : FlowEnvelope = router.validate(router.header(env1).get).unwrap
val env1 : FlowEnvelope = router.validate(LogLevel.Debug)(router.header(env).get).unwrap
val env2 : FlowEnvelope = router.validate(LogLevel.Debug)(router.header(env1).get).unwrap

env2.header[Int](headerCfg.headerRetryCount) should be(Some(2))
}
@@ -113,8 +113,8 @@ class JmsRetryRouterSpec extends TestKit(ActorSystem("RetryRouter"))
val env : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, "myQueue").unwrap

val env1 : FlowEnvelope = router.validate(router.header(env).get).unwrap
val env2 : FlowEnvelope = router.validate(router.header(env1).get).unwrap
val env1 : FlowEnvelope = router.validate(LogLevel.Debug)(router.header(env).get).unwrap
val env2 : FlowEnvelope = router.validate(LogLevel.Debug)(router.header(env1).get).unwrap

env2.header[Long](headerCfg.headerFirstRetry) should be(defined)
env2.header[Long](headerCfg.headerFirstRetry) should be(env1.header[Long](headerCfg.headerFirstRetry))
@@ -20,7 +20,7 @@ class ConnectionMonitor(vendor : String, provider : String, clientId : String) e
def setState(newState : ConnectionState) : Unit = { state = newState }
def getState() : ConnectionState = state

override def getStatus() : String = state.status.toString()
override def getStatus() : String = state.status.toString().toLowerCase()

override def getLastConnect() : String = state.lastConnect match {
case None => "n/a"
@@ -497,11 +497,15 @@ class Launcher private (config : LauncherConfig) {
val frameworkWiring = framework.adapt(classOf[FrameworkWiring])
frameworkWiring.resolveBundles(null /* all bundles */ )
val secondAttemptInstalled = osgiBundles.filter(b => b.bundle.getState() == Bundle.INSTALLED && !isFragment(b))
log.warn(s"The following bundles could not be resolved : ${

val msg : String = s"The following bundles could not be resolved : ${
secondAttemptInstalled.map(
b => s"${b.bundle.getSymbolicName}-${b.bundle.getVersion}"
).mkString("\n", "\n", "")
}")
}"

System.err.println(msg)
log.warn(msg)

if (secondAttemptInstalled.nonEmpty && cmdLine.strict) {
// in strict mode, nor resolved bundles fail the whole container
@@ -226,7 +226,7 @@ abstract class AckSourceLogic[T <: AcknowledgeContext](
}
}

log.underlying.debug(s"Performing poll for [$id]")
log.underlying.trace(s"Performing poll for [$id]")
doPerformPoll(id, ackHandler) match {
case Success(None) =>

@@ -14,7 +14,7 @@ object BlendedStreamsConfig {
}

def create(idSvc : ContainerIdentifierService, cfg : Config) : BlendedStreamsConfig = BlendedStreamsConfig(
transactionShard = cfg.getStringOption("transactionShard"),
transactionShard = cfg.getStringOption("transactionShard").map(s => idSvc.resolvePropertyString(s).get.toString()),
minDelay = cfg.getDuration("minDelay", 5.seconds),
maxDelay = cfg.getDuration("maxDelay", 1.minute),
exponential = cfg.getBoolean("exponential", true),
@@ -54,7 +54,7 @@ object FlowProcessor {
s

case Failure(t) =>
log.logEnv(env.withException(t), LogLevel.Warn, s"Exception in FlowProcessor [${env.id}]:[$name] for message [${env.flowMessage}] : [${t.getClass().getSimpleName()} - ${t.getMessage()}]")
log.logEnv(env.withException(t), LogLevel.Warn, s"Exception in FlowProcessor [${env.id}]:[$name] for message [${env.flowMessage}] : [${t.getClass().getSimpleName()} - ${t.getMessage()}]", false)
env.withException(t)
}

@@ -67,7 +67,7 @@ object FlowProcessor {
}

def log(level : LogLevel.LogLevel, logger : FlowEnvelopeLogger, text : String = "") : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = Flow.fromFunction[FlowEnvelope, FlowEnvelope] { env =>
logger.logEnv(env, level, s"$text : $env")
logger.logEnv(env, level, s"$text : $env", false)
env
}

@@ -60,7 +60,7 @@ class FileAckSource(
) extends DefaultAcknowledgeContext(inflightId, env, System.currentTimeMillis()) {

override def acknowledge() : Unit = {
logger.logEnv(env, LogLevel.Info, s"Successfully processed envelope [${envelope.id}]")
logger.logEnv(env, LogLevel.Debug, s"Successfully processed envelope [${envelope.id}]")
pollCfg.backup match {
case None =>
if (fileToProcess.delete()) {
@@ -9,8 +9,7 @@ import blended.jms.utils._
import blended.streams.message.{AcknowledgeHandler, FlowEnvelope, FlowEnvelopeLogger, FlowMessage}
import blended.streams.{AckSourceLogic, DefaultAcknowledgeContext, FlowHeaderConfig}
import blended.util.RichTry._
import blended.util.logging.LogLevel.LogLevel
import blended.util.logging.Logger
import blended.util.logging.LogLevel
import javax.jms.{Message, MessageConsumer}

import scala.collection.mutable
@@ -64,14 +63,14 @@ final class JmsConsumerStage(
override def deny(): Unit = {
sessionClose(session)
consumerSettings.log.logEnv(
env, consumerSettings.logLevel(env), s"Message [${envelope.id}] has been denied. Closing receiving session."
env, LogLevel.Debug, s"Message [${envelope.id}] has been denied. Closing receiving session."
)
}

override def acknowledge(): Unit = {
jmsMessageAck(jmsMessage)
consumerSettings.log.logEnv(
env, consumerSettings.logLevel(env), s"Acknowledged envelope [${envelope.id}] for session [${session.sessionId}]"
env, LogLevel.Debug, s"Acknowledged envelope [${envelope.id}] for session [${session.sessionId}]"
)
}
}
@@ -117,9 +117,9 @@ final class JmsProducerStage(
)

val logDest = s"${producerSettings.connectionFactory.vendor}:${producerSettings.connectionFactory.provider}:$dest"
producerSettings.log.logEnv(env, LogLevel.Debug,
s"Successfully sent message to [$logDest] with headers [${env.flowMessage.header.mkString(",")}] " +
s"with parameters [${sendParams.deliveryMode}, ${sendParams.priority}, ${sendParams.ttl}]"
producerSettings.log.logEnv(env, producerSettings.logLevel(env),
s"Successfully sent message [${env.id}] to [$logDest] with headers [${env.flowMessage.header.mkString(",")}] " +
s"with parameters [${sendParams.deliveryMode}, ${sendParams.priority}, ${sendParams.ttl}]", false
)
}

@@ -29,13 +29,15 @@ class FlowEnvelopeLogger(

private val mdc : FlowEnvelope => Map[String, String] = env => FlowEnvelopeLogger.mdcMap(prefix, env.flowMessage.header)

def logEnv(env : FlowEnvelope, level : LogLevel, msg : => String) : Unit = logEnv(env, _ => level, _ => msg)
def logEnv(env : FlowEnvelope, level : LogLevel, msg : => String, withStacktrace : Boolean = true) : Unit =
logEnv(env, _ => level, _ => msg, withStacktrace )

def logEnv(env : FlowEnvelope, level : FlowEnvelope => LogLevel, msg: FlowEnvelope => String) : Unit = {
def logEnv(env : FlowEnvelope, level : FlowEnvelope => LogLevel, msg: FlowEnvelope => String, withStacktrace : Boolean) : Unit = {

env.exception match {
case None => underlying.logMdc(mdc(env))(level(env), msg(env))
case Some(e) => underlying.logMdc(e)(mdc(env))(level(env), msg(env))
if (env.exception.isDefined && withStacktrace) {
underlying.logMdc(env.exception.get)(mdc(env))(level(env), msg(env))
} else {
underlying.logMdc(mdc(env))(level(env), msg(env))
}
}
}
@@ -132,12 +132,11 @@ class MultiResultCollector(

case MultiResultTimeout(t) =>
val e : Throwable = new MultiResultTimeoutException(env.id, t)
log.logEnv(env, LogLevel.Warn, e.getMessage())
respond(env.withException(e), None)
}

private def respond(env : FlowEnvelope, timer : Option[Cancellable]): Unit = {
log.logEnv(env, LogLevel.Debug, s"Multiresult processor result is [$env]")
log.logEnv(env, LogLevel.Debug, s"Multiresult processor result is [$env]", false)
timer.foreach(_.cancel())
respondTo ! env
context.stop(self)
@@ -70,7 +70,7 @@ class TransactionWiretap(
updateTransaction(env)
}

log.logEnv(env, LogLevel.Debug, s"Generated bridge transaction event [$event]")
log.logEnv(env, LogLevel.Debug, s"Generated bridge transaction event [$event]", false)
FlowTransactionEvent.event2envelope(headerCfg)(event)
.withHeader(headerCfg.headerTrackSource, trackSource).unwrap
}
@@ -111,7 +111,7 @@ class TransactionWiretap(
Flow.fromGraph(g)
}

def flow() : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {
def flow(clearException : Boolean = false) : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {
val g = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

@@ -123,9 +123,10 @@ class TransactionWiretap(
val select = b.add(
Flow.fromFunction[(FlowEnvelope, FlowEnvelope), FlowEnvelope] { pair =>

pair._1.exception match {
case None => pair._2.clearException()
case Some(e) => pair._2.withException(e)
if (clearException) {
pair._2.clearException()
} else {
pair._2
}
}
)

0 comments on commit dcf19ba

Please sign in to comment.
You can’t perform that action at this time.