Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Fixing some codestyle issues
Browse files Browse the repository at this point in the history
  • Loading branch information
atooni committed Jun 28, 2019
1 parent 2ac2e95 commit cd13d10
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object FlowProcessor {
log.warn(t)(s"Failed to create [${clazz.runtimeClass.getName()}] in [${env.id}]:[$name]")
Left(env.withException(t))
}
case Some(t) =>
case Some(_) =>
log.debug(s"Not executing function [${env.id}]:[$name] as envelope has exception [${env.exception.map(_.getMessage()).getOrElse("")}].")
Left(env)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object JmsConnector {
def nextSessionId : String = {

if (sessionIdCounter.get() == Long.MaxValue) {
sessionIdCounter.set(0l)
sessionIdCounter.set(0L)
}

s"${sessionIdCounter.incrementAndGet()}"
Expand Down Expand Up @@ -184,13 +184,14 @@ trait JmsConnector[S <: JmsSession] { this : TimerGraphStageLogic =>
"Please see ConnectionRetrySettings.connectTimeout"
)
)
} else
} else {
connectionRef.get match {
case Some(connection) =>
Future.successful(connection)
case None =>
Future.failed(new IllegalStateException("BUG: Connection reference not set when connected"))
}
}
}

Future.firstCompletedOf(Iterator(connectionFuture, timeoutFuture))(ec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ final class JmsAckSourceStage(
Success(new JmsAckSession(
connection = connection,
session = session,
sessionId = nextSessionId,
sessionId = nextSessionId(),
jmsDestination = d
))
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ trait JmsDestinationResolver { this : JmsEnvelopeHeader =>

val msg = flowMsg match {
case t :
TextFlowMessage => session.createTextMessage(Option(t.body()).map(_.toString).getOrElse(null))
// scalastyle:off null
TextFlowMessage => session.createTextMessage(Option(t.body()).map(_.toString).orNull)
// scalastyle:on null
case b :
BinaryFlowMessage =>
val r = session.createBytesMessage()
Expand All @@ -35,13 +37,15 @@ trait JmsDestinationResolver { this : JmsEnvelopeHeader =>
}

flowMsg.header.filter {
case (k, v) => !k.startsWith("JMS")
case (k, _) => !k.startsWith("JMS")
}.foreach {
case (k, v) =>
val propName = k.replaceAll("\\" + dot, dot_repl).replaceAll(hyphen, hyphen_repl)
v match {
case u : UnitMsgProperty => msg.setObjectProperty(propName, null)
// scalastyle:off null
case _ : UnitMsgProperty => msg.setObjectProperty(propName, null)
case o => msg.setObjectProperty(propName, o.value)
// scalastyle:on null
}
}

Expand Down Expand Up @@ -92,7 +96,7 @@ trait FlowHeaderConfigAware extends JmsDestinationResolver {
case None =>
log.trace(s"Trying to resolve destination for [$id] from settings.")
settings.jmsDestination match {
case Some(d) => d
case Some(dest) => dest
case None =>
throw new JMSException(s"Could not resolve JMS destination for [$flowMsg]")
}
Expand All @@ -112,7 +116,7 @@ trait FlowHeaderConfigAware extends JmsDestinationResolver {

val timeToLive : FlowMessage => Option[FiniteDuration] = { flowMsg =>
flowMsg.header[Long](expireHeader(headerConfig.prefix)) match {
case Some(l) => Some((Math.max(1L, l - System.currentTimeMillis())).millis)
case Some(l) => Some(Math.max(1L, l - System.currentTimeMillis()).millis)
case None => settings.timeToLive
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ abstract class JmsStageLogic[S <: JmsSession, T <: JmsSettings](
private[jms] val stopping = new AtomicBoolean(false)

// Is the source stopped ?
private[jms] var stopped = new AtomicBoolean(false)
private[jms] val stopped = new AtomicBoolean(false)

private[jms] def doMarkStopped = stopped.set(true)
private[jms] def doMarkStopped(): Unit = stopped.set(true)

// Mark the source as stopped and try to finish handling all in flight messages
private[jms] val markStopped = getAsyncCallback[Done.type] { _ => doMarkStopped }
private[jms] val markStopped = getAsyncCallback[Done.type] { _ => doMarkStopped() }

// Mark the source as failed and abort all message processing
private[jms] val markAborted = getAsyncCallback[Throwable] { ex =>
Expand Down Expand Up @@ -70,7 +70,7 @@ abstract class JmsStageLogic[S <: JmsSession, T <: JmsSettings](
Future
.sequence(closeSessionFutures)
.onComplete { _ =>
Option(jmsConnection).map { jc =>
Option(jmsConnection).foreach { jc =>
jc.onComplete {
case Success(connection) =>
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ case class FlowTransaction private[transaction] (
copy(creationProps = started.properties)
this

case completed : FlowTransactionCompleted => copy(
case _ : FlowTransactionCompleted => copy(
state = FlowTransactionState.Completed,
worklist = Map.empty
)

case failed : FlowTransactionFailed => copy(
case _ : FlowTransactionFailed => copy(
state = FlowTransactionState.Failed,
worklist = Map.empty
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object FlowTransactionEvent {
(envelope.header[String](cfg.headerTransId), envelope.header[String](cfg.headerState)) match {
case (Some(id), Some(state)) => FlowTransactionState.withName(state) match {
case FlowTransactionState.Started =>
val header = envelope.flowMessage.header.filter { case (k, v) => !k.startsWith("JMS") }
val header = envelope.flowMessage.header.filter { case (k, _) => !k.startsWith("JMS") }
FlowTransactionStarted(id, header)

case FlowTransactionState.Completed =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ object EventSeverity extends Enumeration {
type EventSeverity = Value
val Unknown, Information, Harmless, Warning, Minor, Critical, Fatal = Value

// scalastyle:off magic.number
implicit def severityToInt(severity : EventSeverity) : Int = severity match {
case Unknown => 0
case Information => 10
Expand All @@ -20,4 +21,5 @@ object EventSeverity extends Enumeration {
case Critical => 50
case Fatal => 60
}
// scalastyle:off magic.number
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class FileSourceSpec extends SimplePojoContainerSpec
val src : Source[FlowEnvelope, NotUsed] =
Source.fromGraph(new FileAckSource(pollCfg)).via(new AckProcessor("simplePoll.ack").flow)

val collector : Collector[FlowEnvelope] = StreamFactories.runSourceWithTimeLimit("simplePoll", src, timeout) { env => }
val collector : Collector[FlowEnvelope] = StreamFactories.runSourceWithTimeLimit("simplePoll", src, timeout) { _ => }

val result : List[FlowEnvelope] = Await.result(collector.result, timeout + 100.millis)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class FlowProcessorSpec extends TestKit(ActorSystem("FlowProcessorSpec"))

val faulty = new FlowProcessor {
override val name : String = "faulty"
override val f : IntegrationStep = env => Failure(new Exception("Boom"))
override val f : IntegrationStep = _ => Failure(new Exception("Boom"))
}

"The FlowProcessor should" - {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import blended.testsupport.scalatest.LoggingFreeSpecLike
import blended.util.logging.Logger
import org.osgi.framework.BundleActivator
import org.scalatest.Matchers
import org.springframework.expression.spel.standard.SpelExpressionParser
import org.springframework.expression.spel.support.StandardEvaluationContext

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
Expand Down Expand Up @@ -55,12 +53,8 @@ class HeaderProcessorSpec extends SimplePojoContainerSpec

"set plain headers correctly" in {

val parser = new SpelExpressionParser()
val exp = parser.parseExpression("foo")
val ctxt = new StandardEvaluationContext()

val r = result(List(
HeaderProcessorConfig("foo", Some("bar"), true)
HeaderProcessorConfig("foo", Some("bar"), overwrite = true)
), None)

r should have size 1
Expand All @@ -69,22 +63,24 @@ class HeaderProcessorSpec extends SimplePojoContainerSpec

"perform the normal resolution of container context properties" in {

implicit val timeout = 3.seconds
implicit val timeout : FiniteDuration = 3.seconds
val idSvc = mandatoryService[ContainerIdentifierService](registry)(None)

idSvc.resolvePropertyString("$[[Country]]").get should be("cc")

val r = result(List(
HeaderProcessorConfig("foo", Some("""$[[Country]]"""), true),
HeaderProcessorConfig("foo2", Some("""${{#foo}}"""), true),
HeaderProcessorConfig("test", Some("${{42}}"), true)
HeaderProcessorConfig("foo", Some("""$[[Country]]"""), overwrite = true),
HeaderProcessorConfig("foo2", Some("""${{#foo}}"""), overwrite = true),
HeaderProcessorConfig("test", Some("${{42}}"), overwrite = true)
), Some(idSvc))

// scalastyle:off magic.number
log.info(r.toString())
r.head.flowMessage.header should have size (3)
r.head.flowMessage.header should have size 3
r.head.header[String]("foo") should be(Some("cc"))
r.head.header[String]("foo2") should be(Some("cc"))
r.head.header[Int]("test") should be(Some(42))
// scalastyle:on magic.number
}
}

Expand Down

0 comments on commit cd13d10

Please sign in to comment.