Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
kostaskougios committed Dec 12, 2023
1 parent 6368184 commit 3aed65e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers.*

import java.net.URI
import java.util.concurrent.TimeUnit
import scala.util.Using

class EndToEndHelidonWsSuite extends AnyFunSuite:
Expand Down
20 changes: 11 additions & 9 deletions fibers/src/main/scala/functions/fibers/Fiber.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package functions.fibers

import java.util.concurrent.Future
import scala.concurrent.duration.TimeUnit

case class Fiber[A](javaFuture: Future[A]):
def isReady: Boolean = javaFuture.isDone
def interrupt(): Unit = javaFuture.cancel(true)
def get(): A = javaFuture.get()
def await(): Either[A, Throwable] =
def isReady: Boolean = javaFuture.isDone
def interrupt(): Unit = javaFuture.cancel(true)
def get(): A = javaFuture.get()
def get(timeout: Long, timeUnit: TimeUnit): A = javaFuture.get(timeout, timeUnit)
def await(): Either[A, Throwable] =
try Left(get())
catch case t: Throwable => Right(t)
def state: Future.State = javaFuture.state()
def isRunning: Boolean = state == Future.State.RUNNING
def isFailed: Boolean = state == Future.State.FAILED
def isSuccess: Boolean = state == Future.State.SUCCESS
def isCancelled: Boolean = state == Future.State.CANCELLED
def state: Future.State = javaFuture.state()
def isRunning: Boolean = state == Future.State.RUNNING
def isFailed: Boolean = state == Future.State.FAILED
def isSuccess: Boolean = state == Future.State.SUCCESS
def isCancelled: Boolean = state == Future.State.CANCELLED
5 changes: 3 additions & 2 deletions fibers/src/main/scala/functions/fibers/FiberExecutor.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package functions.fibers

import java.util.concurrent.{Callable, CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.{Callable, CompletableFuture, CountDownLatch, ExecutorService, Executors}
import scala.util.Using.Releasable

// https://wiki.openjdk.org/display/loom/Getting+started
// https://openjdk.org/jeps/444
class FiberExecutor private (executorService: ExecutorService):
def submit[R](f: => R): Fiber[R] =
val c: Callable[R] = () => f
Fiber(executorService.submit(c))
val fiber = executorService.submit(c)
Fiber(fiber)

def two[A, B](fiber1: TwoFibers[A, B] => A, fiber2: TwoFibers[A, B] => B): TwoFibers[A, B] =
val l = new CountDownLatch(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ class ClientServerWsListener(protocol: InOutMessageProtocol, fiberExecutor: Fibe
dataMap -= correlationId

override def onMessage(session: WsSession, buffer: BufferData, last: Boolean): Unit =
try
protocol.listenerReceived(buffer) match
case Left(out) =>
// act as a server: do a call
session.send(out, true)
case Right(RfWsResponse(result, corId, data)) =>
// act as a client: respond to a call
latchMap.get(corId) match
case Some(latch) =>
dataMap.put(corId, (result, data))
latch.countDown()
case None =>
println(s"Correlation id missing: $corId , received data ignored.")
catch case t: Throwable => t.printStackTrace()
fiberExecutor.submit:
try
protocol.listenerReceived(buffer) match
case Left(out) =>
// act as a server: do a call
toSend.put(out)
case Right(RfWsResponse(result, corId, data)) =>
// act as a client: respond to a call
latchMap.get(corId) match
case Some(latch) =>
dataMap.put(corId, (result, data))
latch.countDown()
case None =>
println(s"Correlation id missing: $corId , received data ignored.")
catch case t: Throwable => t.printStackTrace()

override def onOpen(session: WsSession): Unit =
fiberExecutor.submit:
Expand Down

0 comments on commit 3aed65e

Please sign in to comment.