Skip to content
Permalink
Browse files

Introduce connection info message for open message stream connections…

…, for interrogating information about the connection and connected mesh node.
  • Loading branch information...
Brett Morien
Brett Morien committed Nov 8, 2019
1 parent 6fe8e2d commit 821b6892d44b629aa9a6698e8296d961d9354e0a
@@ -75,6 +75,16 @@ fun Messages.MessageContentProto.toMessageContent(): MessageContent =
)
}

hasInfoRequest() -> {
MessageContent.ConnectionInfoRequest()
}

hasInfoResponse() -> {
MessageContent.ConnectionInfoResponse(
nodeId = infoResponse.nodeId.toNodeId()
)
}

else -> throw Throwable("Unknown message type")
}

@@ -107,5 +117,19 @@ fun MessageContent.toMessageContentProto(): Messages.MessageContentProto =
.build()
)
}

is MessageContent.ConnectionInfoRequest -> {
builder.setInfoRequest(
Messages.ConnectionInfoRequestProto.newBuilder()
)
}

is MessageContent.ConnectionInfoResponse -> {
builder.setInfoResponse(
Messages.ConnectionInfoResponseProto.newBuilder()
.setNodeId(nodeId.toNodeIdProto())
)
}

}
}.build()
@@ -29,12 +29,22 @@ message MessageTargetProto {

message MessageContentProto {
oneof content {
InvocationRequestProto invocation_request = 1;
InvocationResponseProto invocation_response = 2;
ErrorProto error = 3;
ErrorProto error = 1;
ConnectionInfoRequestProto info_request = 2;
ConnectionInfoResponseProto info_response = 3;
InvocationRequestProto invocation_request = 4;
InvocationResponseProto invocation_response = 5;
}
}

message ConnectionInfoRequestProto {

}

message ConnectionInfoResponseProto {
NodeIdProto nodeId = 1;
}

message InvocationRequestProto {
AddressableReferenceProto reference = 1;
string method = 2;
@@ -58,8 +58,8 @@ class ClientConnection(
outgoingChannel.send(message.toMessageProto())
}

fun offerMessage(messsage: Message) {
val queued = outgoingChannel.offer(messsage.toMessageProto())
fun offerMessage(message: Message) {
val queued = outgoingChannel.offer(message.toMessageProto())
if (!queued) throw CapacityExceededException("Could not offer message.")
}

@@ -7,13 +7,15 @@
package orbit.server.pipeline.step

import orbit.server.mesh.AddressableManager
import orbit.server.mesh.LocalNodeInfo
import orbit.server.pipeline.PipelineContext
import orbit.shared.net.Message
import orbit.shared.net.MessageContent
import orbit.shared.net.MessageTarget

class PlacementStep(
private val addressableManager: AddressableManager
private val addressableManager: AddressableManager,
private val localNodeInfo: LocalNodeInfo
) : PipelineStep {
override suspend fun onInbound(context: PipelineContext, msg: Message) {
when (val content = msg.content) {
@@ -26,6 +28,17 @@ class PlacementStep(
}
}
}
is MessageContent.ConnectionInfoRequest ->
msg.source?.also { source ->
msg.copy(
target = MessageTarget.Unicast(source),
content = MessageContent.ConnectionInfoResponse(
nodeId = localNodeInfo.info.id
)
).also {
context.pushNew(it)
}
}
else -> context.next(msg)
}
}
@@ -23,9 +23,11 @@ sealed class MessageTarget {
}

sealed class MessageContent {
data class Error(val description: String?) : MessageContent()
class ConnectionInfoRequest : MessageContent()
data class ConnectionInfoResponse(val nodeId: NodeId) : MessageContent()
data class InvocationRequest(val destination: AddressableReference, val method: String, val arguments: String) :
MessageContent()

data class InvocationResponse(val data: String) : MessageContent()
data class Error(val description: String?) : MessageContent()
}

0 comments on commit 821b689

Please sign in to comment.
You can’t perform that action at this time.