Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
kostaskougios committed Dec 12, 2023
1 parent 634ef45 commit 213ca99
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 51 deletions.
12 changes: 10 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ val KafkaClient = "org.apache.kafka" % "kafka-clients"
val EmbeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "3.6.0" % Test

val HelidonVersion = "4.0.1"
val HelidonCommonBuffers = "io.helidon.common" % "helidon-common-buffers" % HelidonVersion
val HelidonServerHttp2 = "io.helidon.webserver" % "helidon-webserver-http2" % HelidonVersion
val HelidonWebClientHttp2 = "io.helidon.webclient" % "helidon-webclient-http2" % HelidonVersion

Expand Down Expand Up @@ -176,19 +177,26 @@ lazy val `helidon-client` = project
)
.dependsOn(`functions-common`)

lazy val `helidon-ws-client-server-common` = project
.settings(
commonSettings,
libraryDependencies ++= Seq(ScalaTest, HelidonCommonBuffers)
)
.dependsOn(`functions-common`, fibers)

lazy val `helidon-ws-client` = project
.settings(
commonSettings,
libraryDependencies ++= Seq(ScalaTest, HelidonWebSocketClient)
)
.dependsOn(`functions-common`, fibers)
.dependsOn(`functions-common`, `helidon-ws-client-server-common`, fibers)

lazy val `helidon-ws-server` = project
.settings(
commonSettings,
libraryDependencies ++= Seq(ScalaTest, HelidonServerWebSocket)
)
.dependsOn(`functions-common`, fibers)
.dependsOn(`functions-common`, `helidon-ws-client-server-common`, fibers)

// ----------------------- end to end test modules --------------------------------
val endToEndTestsSettings = Seq(
Expand Down
2 changes: 2 additions & 0 deletions functions-common/src/main/scala/functions/model/common.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package functions.model

type InvokerMap = Map[Coordinates4, ReceiverInput => Array[Byte]]

import java.util.function.BiFunction

type TransportFunction = TransportInput => Array[Byte]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package functions.helidon.ws

import functions.model.*
import io.helidon.common.buffers.BufferData

import java.io.{ByteArrayOutputStream, PrintWriter}

class InOutMessageProtocol(invokerMap: InvokerMap):
private val im = invokerMap.map:
case (c4, i) =>
(c4.toRawCoordinates, i)

def incoming(buffer: BufferData): BufferData =
val receiverId = buffer.readInt32()
val corId = buffer.readLong()
val coordsLength = buffer.readUnsignedInt32()
val coordsByteData = new Array[Byte](coordsLength.toInt)
buffer.read(coordsByteData)
val coordsStr = new String(coordsByteData, "UTF-8")
val coordinates4 = Coordinates4.unapply(coordsStr)
val dataLen = buffer.readInt32()
val data = new Array[Byte](dataLen)
buffer.read(data)
val argLen = buffer.readInt32()
val arg = new Array[Byte](argLen)
buffer.read(arg)
try
val f = im(coordinates4.toRawCoordinates)
val response = f(ReceiverInput(data, arg))
val buf = BufferData.growing(response.length + 12)
buf.writeInt32(receiverId)
buf.write(0)
buf.write(longToBytes(corId))
buf.write(response)
buf
catch
case t: Throwable =>
val bos = new ByteArrayOutputStream
val w = new PrintWriter(bos)
t.printStackTrace(w)
w.close()
val data = bos.toByteArray
val buf = BufferData.growing(data.length + 12)
buf.writeInt32(receiverId)
buf.write(1)
buf.write(longToBytes(corId))
buf.write(data)
buf

private def longToBytes(x: Long): Array[Byte] =
val buffer = new Array[Byte](8)
for (i <- 0 until 8)
// Shift the long value 8*(7-i) bits to the right and take the lowest 8 bits
buffer(i) = (x >> (8 * (7 - i))).toByte
buffer
Original file line number Diff line number Diff line change
@@ -1,57 +1,13 @@
package functions.helidon.ws

import functions.model.{Coordinates4, ReceiverInput}
import functions.model.InvokerMap
import io.helidon.common.buffers.BufferData
import io.helidon.websocket.{WsListener, WsSession}

import java.io.{ByteArrayOutputStream, PrintWriter}
class ServerWsListener(invokerMap: InvokerMap) extends WsListener:

class ServerWsListener(invokerMap: Map[Coordinates4, ReceiverInput => Array[Byte]]) extends WsListener:

private val im = invokerMap.map:
case (c4, i) =>
(c4.toRawCoordinates, i)
private val protocol = new InOutMessageProtocol(invokerMap)

override def onMessage(session: WsSession, buffer: BufferData, last: Boolean): Unit =
val receiverId = buffer.readInt32()
val corId = buffer.readLong()
val coordsLength = buffer.readUnsignedInt32()
val coordsByteData = new Array[Byte](coordsLength.toInt)
buffer.read(coordsByteData)
val coordsStr = new String(coordsByteData, "UTF-8")
val coordinates4 = Coordinates4.unapply(coordsStr)
val dataLen = buffer.readInt32()
val data = new Array[Byte](dataLen)
buffer.read(data)
val argLen = buffer.readInt32()
val arg = new Array[Byte](argLen)
buffer.read(arg)
try
val f = im(coordinates4.toRawCoordinates)
val response = f(ReceiverInput(data, arg))
val buf = BufferData.growing(response.length + 12)
buf.writeInt32(receiverId)
buf.write(0)
buf.write(longToBytes(corId))
buf.write(response)
session.send(buf, true)
catch
case t: Throwable =>
val bos = new ByteArrayOutputStream
val w = new PrintWriter(bos)
t.printStackTrace(w)
w.close()
val data = bos.toByteArray
val buf = BufferData.growing(data.length + 12)
buf.writeInt32(receiverId)
buf.write(1)
buf.write(longToBytes(corId))
buf.write(data)
session.send(buf, true)

private def longToBytes(x: Long): Array[Byte] =
val buffer = new Array[Byte](8)
for (i <- 0 until 8)
// Shift the long value 8*(7-i) bits to the right and take the lowest 8 bits
buffer(i) = (x >> (8 * (7 - i))).toByte
buffer
val out = protocol.incoming(buffer)
session.send(out, true)

0 comments on commit 213ca99

Please sign in to comment.