Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
kostaskougios committed Dec 6, 2023
1 parent 828665b commit 332b601
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import functions.fibers.FiberExecutor
import functions.helidon.transport.HelidonWsTransport
import functions.helidon.ws.ServerWsListener
import functions.model.Serializer
import io.helidon.common.buffers.BufferData
import io.helidon.webclient.websocket.WsClient
import io.helidon.webserver.WebServer
import io.helidon.webserver.websocket.WsRouting
Expand All @@ -33,7 +34,6 @@ class EndToEndHelidonWsSuite extends AnyFunSuite:
def withTransport[R](serverPort: Int, serializer: Serializer)(f: TestsHelidonFunctions => R): R =
FiberExecutor.withFiberExecutor: executor =>
val transport = new HelidonWsTransport(executor)

val uri = URI.create(s"ws://localhost:$serverPort")
val webClient = WsClient
.builder()
Expand All @@ -57,6 +57,14 @@ class EndToEndHelidonWsSuite extends AnyFunSuite:
runTest(serializer): f =>
f.add(1, 3) should be(4)

test(s"$serializer: addLR"):
runTest(serializer): f =>
f.addLR(2, 3) should be(List(Return1(5)))

test(s"$serializer: noArgs"):
runTest(serializer): f =>
f.noArgs() should be(5)

test(s"$serializer: calling multiple functions sequentially"):
runTest(serializer): f =>
for i <- 1 to 10000 do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package functions.helidon.transport

import functions.fibers.FiberExecutor
import functions.helidon.transport.ClientWsListener.PoisonPill
import functions.helidon.transport.exceptions.RemoteFunctionFailedException
import io.helidon.common.buffers.BufferData
import io.helidon.websocket.{WsListener, WsSession}

Expand All @@ -12,24 +13,26 @@ import scala.util.Using.Releasable
class ClientWsListener(fiberExecutor: FiberExecutor) extends WsListener:
private val toSend = new LinkedBlockingQueue[BufferData](64)
private val latchMap = collection.concurrent.TrieMap.empty[Long, CountDownLatch]
private val dataMap = collection.concurrent.TrieMap.empty[Long, Array[Byte]]
private val dataMap = collection.concurrent.TrieMap.empty[Long, (Int, Array[Byte])]

def send(correlationId: Long, a: BufferData): Array[Byte] =
toSend.put(a)
val latch = new CountDownLatch(1)
val latch = new CountDownLatch(1)
latchMap.put(correlationId, latch)
latch.await()
val receivedData = dataMap.getOrElse(correlationId, throw new IllegalStateException(s"No data found for correlationId=$correlationId"))
val (result, receivedData) = dataMap.getOrElse(correlationId, throw new IllegalStateException(s"No data found for correlationId=$correlationId"))
latchMap -= correlationId
dataMap -= correlationId
receivedData
if result == 0 then receivedData
else throw new RemoteFunctionFailedException(new String(receivedData, "UTF-8"))

override def onMessage(session: WsSession, buffer: BufferData, last: Boolean): Unit =
val corId = buffer.readLong()
val data = buffer.readBytes()
val result = buffer.read()
val corId = buffer.readLong()
val data = buffer.readBytes()
latchMap.get(corId) match
case Some(latch) =>
dataMap.put(corId, data)
dataMap.put(corId, (result, data))
latch.countDown()
case None =>
println(s"Correlation id missing: $corId , received data ignored.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import functions.fibers.FiberExecutor
import functions.model.TransportInput
import io.helidon.common.buffers.BufferData
import io.helidon.websocket.WsListener
import jdk.internal.util.ByteArray

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.concurrent.atomic.AtomicLong
import scala.util.Using.Releasable

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package functions.helidon.transport.exceptions

class RemoteFunctionFailedException(remoteStacktrace: String) extends RuntimeException(s"Remote function failed with this exception: $remoteStacktrace")
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package functions.helidon.ws
import functions.model.{Coordinates4, ReceiverInput}
import io.helidon.common.buffers.BufferData
import io.helidon.websocket.{WsListener, WsSession}
import jdk.internal.util.ByteArray

import java.io.{ByteArrayOutputStream, PrintWriter}

class ServerWsListener(invokerMap: Map[Coordinates4, ReceiverInput => Array[Byte]]) extends WsListener:

Expand All @@ -15,12 +16,26 @@ class ServerWsListener(invokerMap: Map[Coordinates4, ReceiverInput => Array[Byte
val coordsStr = new String(coordsByteData, "UTF-8")
val coordinates4 = Coordinates4.unapply(coordsStr)
val data = buffer.readBytes()
val f = invokerMap(coordinates4)
val response = f(ReceiverInput(data))
val buf = BufferData.growing(response.length + 8)
buf.write(longToBytes(corId))
buf.write(response)
session.send(buf, true)
try
val f = invokerMap(coordinates4)
val response = f(ReceiverInput(data))
val buf = BufferData.growing(response.length + 8)
buf.write(0)
buf.write(longToBytes(corId))
buf.write(response)
session.send(buf, true)
catch
case t: Throwable =>
val bos = new ByteArrayOutputStream
val w = new PrintWriter(bos)
t.printStackTrace(w)
w.close()
val data = bos.toByteArray
val buf = BufferData.growing(data.length + 8)
buf.write(1)
buf.write(longToBytes(corId))
buf.write(data)
session.send(buf, true)

private def longToBytes(x: Long): Array[Byte] =
val buffer = new Array[Byte](8)
Expand Down

0 comments on commit 332b601

Please sign in to comment.