Permalink
Browse files

Major update to communication protocol to avoid corner-cases by alway…

…s just using JSON
  • Loading branch information...
darkfrog26 committed Dec 25, 2018
1 parent 4951d3a commit 88a34d3e5b6fa5200b21579fdac3bab4da71b7c7
@@ -3,7 +3,7 @@ import sbtcrossproject.CrossPlugin.autoImport.crossProject

name := "youi"
organization in ThisBuild := "io.youi"
version in ThisBuild := "0.9.8"
version in ThisBuild := "0.9.9-SNAPSHOT"
scalaVersion in ThisBuild := "2.12.7"
crossScalaVersions in ThisBuild := List("2.12.7", "2.11.12")
resolvers in ThisBuild += Resolver.sonatypeRepo("releases")
@@ -79,6 +79,7 @@ class CommunicationInternal private[communication](communication: Communication)

private[communication] def init(): Unit = {
receive.attach { message =>
scribe.debug(s"Received: ${message.messageType}...")
if (message.messageType == CommunicationMessage.MethodResponse) {
synchronized {
val f = queue.get(message.invocationId)
@@ -88,7 +89,7 @@ class CommunicationInternal private[communication](communication: Communication)
case Some(f) => f(message)
case None => {
// TODO: detect if this is the right Communication instance
scribe.debug(s"No entry found for endPoint: ${message.endPoint}, invocationId: ${message.invocationId}, content: ${message.content}.")
scribe.debug(s"No entry found for endPoint: ${message.endPoint}, invocationId: ${message.invocationId}, content: ${message.content}, queue: ${queue.keySet.mkString(", ")}.")
}
}
} else if (message.messageType == CommunicationMessage.MethodRequest) {
@@ -122,9 +123,16 @@ class CommunicationInternal private[communication](communication: Communication)
// Wait for invocation response
def onInvocation[T](invocationId: Int)(f: CommunicationMessage => T): Future[T] = synchronized {
val promise = Promise[T]
val handler: CommunicationMessage => Unit = (m: CommunicationMessage) => m.error match {
case Some(error) => promise.failure(new CommunicationException(error))
case None => promise.success(f(m))
val handler: CommunicationMessage => Unit = (m: CommunicationMessage) => {
scribe.debug(s"Received communication message for handler! $invocationId")
m.error match {
case Some(error) => promise.failure(new CommunicationException(error))
case None => try {
promise.success(f(m))
} catch {
case t: Throwable => promise.failure(t)
}
}
}
val connectionMonitor = communication.connection.connected.attach { b =>
if (!b && !promise.isCompleted) {
@@ -8,11 +8,8 @@ case class CommunicationMessage(messageType: Int,
content: List[String],
error: Option[String]) {
lazy val parsableString: String = {
val message = error match {
case Some(e) => s"0:$e"
case None => s"1:${JsonUtil.toJsonString(content)}"
}
s"$messageType:[$endPoint]:$invocationId:$message"
val json = JsonUtil.toJsonString(this)
s"|CM|$json"
}
}

@@ -21,19 +18,8 @@ object CommunicationMessage {
val MethodResponse = 2
val SharedVariable = 3

private val MessageRegex = """(\d+):\[(.+)\]:(\d+):(\d{1}):(.*)""".r

def unapply(unparsedMessage: String): Option[CommunicationMessage] = unparsedMessage match {
case MessageRegex(messageType, endPoint, invocationId, success, contentJSON) => {
val successful = success.toInt == 1
val (content, error) = if (successful) {
val list = JsonUtil.fromJsonString[List[String]](contentJSON)
list -> None
} else {
Nil -> Some(contentJSON)
}
Some(CommunicationMessage(messageType.toInt, endPoint, invocationId.toInt, content, error))
}
case m if m.startsWith("|CM|") => Some(JsonUtil.fromJsonString[CommunicationMessage](m.substring(4)))
case _ => None
}
}

0 comments on commit 88a34d3

Please sign in to comment.