Skip to content

Commit

Permalink
[ BUG ] Fix a race condition and a mutable state closure on a future
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanadrien committed Dec 12, 2017
1 parent 6d4124a commit 9849cc2
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
2 changes: 1 addition & 1 deletion scalastyle-config.xml
Expand Up @@ -85,7 +85,7 @@
</check>
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
<parameters>
<parameter name="maximum"><![CDATA[10]]></parameter>
<parameter name="maximum"><![CDATA[15]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
Expand Down
Expand Up @@ -19,11 +19,12 @@ abstract class MqttClient(val gatlingMqttId : String) extends Actor with LazyLog

import context._

// FIXME: Timeout and feebackListener polution
private var feedbackListener : Map[String, List[(FeedbackFunction, ActorRef)]] = Map()

private var waitForMessagesReceivedListeners : List[ActorRef] = Nil

private var delayIncomingMessages : Boolean = false

protected def connect(replyTo : ActorRef) : Unit

protected def subscribe(topics : List[(String, MqttQoS)], replyTo : ActorRef) : Unit
Expand Down Expand Up @@ -51,6 +52,9 @@ abstract class MqttClient(val gatlingMqttId : String) extends Actor with LazyLog
fn(payload)
}
// fire matching listeners
if (matching.nonEmpty) {
logger.debug(s"Client ${gatlingMqttId} : message on topic $topic matches ${matching.length} awaiting listener(s)")
}
matching.foreach { case (_, replyTo) =>
replyTo ! FeedbackReceived
}
Expand All @@ -74,15 +78,20 @@ abstract class MqttClient(val gatlingMqttId : String) extends Actor with LazyLog
replyTo : ActorRef
) = {
implicit val timeout = Timeout(1 minute)
delayIncomingMessages = true
self ? MqttCommands.Publish(
topic = topic,
payload = payload,
mqttQoS = qos,
retain = retain
) andThen {
case Success(_) =>
addFeedbackListener(topic, (payloadFeedback, replyTo))
// Register listener
self ! PublishAckRegisterFeedback(
topic, payloadFeedback, replyTo
)
case Failure(th) =>
delayIncomingMessages = false
replyTo ! akka.actor.Status.Failure(th)
}
}
Expand All @@ -100,6 +109,11 @@ abstract class MqttClient(val gatlingMqttId : String) extends Actor with LazyLog
waitForMessagesReceivedListeners = Nil
}

private def publishAckRegisterFeedback(topic : String, payloadFeedback : FeedbackFunction, listener : ActorRef): Unit = {
delayIncomingMessages = false
addFeedbackListener(topic, (payloadFeedback, listener))
}

override def postStop() = {
super.postStop()
close()
Expand All @@ -114,10 +128,16 @@ abstract class MqttClient(val gatlingMqttId : String) extends Actor with LazyLog
publish(topic, payload, mqttQoS, retain, sender())
case PublishAndWait(topic, payload, payloadFeedback, mqttQoS, retain) =>
publishAndWait(topic, payload, payloadFeedback, mqttQoS, retain, sender())
case OnPublish(topic, payload) =>
onPublish(topic, payload)
if (feedbackListener.isEmpty) {
fireAllWaitForMessageListeners
case PublishAckRegisterFeedback(topic, payloadFeedback, listener) =>
publishAckRegisterFeedback(topic, payloadFeedback, listener)
case msg @ OnPublish(topic, payload) =>
if (delayIncomingMessages) {
system.scheduler.scheduleOnce(1 milliseconds, self, msg)
} else {
onPublish(topic, payload)
if (feedbackListener.isEmpty) {
fireAllWaitForMessageListeners
}
}
case WaitForMessages =>
waitForMessages(sender())
Expand Down
@@ -1,5 +1,6 @@
package com.github.jeanadrien.gatling.mqtt.client

import akka.actor.ActorRef
import com.github.jeanadrien.gatling.mqtt.client.MqttClient.FeedbackFunction
import com.github.jeanadrien.gatling.mqtt.client.MqttQoS.MqttQoS

Expand Down Expand Up @@ -28,6 +29,10 @@ object MqttCommands {
topic : String, payload : Array[Byte], payloadFeedback : FeedbackFunction, qos : MqttQoS, retain : Boolean
) extends MqttCommands

case class PublishAckRegisterFeedback(
topic : String, payloadFeedback : FeedbackFunction, listener : ActorRef
) extends MqttCommands

case object FeedbackReceived extends MqttCommands

case object WaitForMessages extends MqttCommands
Expand Down

0 comments on commit 9849cc2

Please sign in to comment.