Skip to content

Commit

Permalink
Merge pull request #8 from vaslabs/feature/websocketSingleConnection
Browse files Browse the repository at this point in the history
Feature/websocket single connection
  • Loading branch information
vaslabs committed Aug 23, 2020
2 parents e9716ca + 4799996 commit aeb8665
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 56 deletions.
35 changes: 26 additions & 9 deletions processor/src/main/scala/cardgame/processor/ActiveGames.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,22 @@ object ActiveGames {
)
Behaviors.same

case (ctx, DoGameAction(gameId, action, remoteClock, replyTo)) =>
gameProcessor(ctx, gameId).map(
_ ! GameProcessor.RunCommand(replyTo, action, remoteClock)
).getOrElse(replyTo ! Left(()))
case (ctx, DoGameAction(gameId, action, remoteClock)) =>
gameProcessor(ctx, gameId).foreach(
_ ! GameProcessor.FireAndForgetCommand(action, remoteClock)
)
Behaviors.same
case (_, Ignore) =>
Behaviors.same
case (ctx, StreamEventsFor(playerId, streamingActor)) =>
val eventStreamerName = s"event-reader-${playerId.value}"
val streamer = ctx.child(eventStreamerName).map(_.unsafeUpcast[PlayerEventsReader.Protocol]).getOrElse(
ctx.spawn(PlayerEventsReader.behavior(playerId, streamingActor), eventStreamerName)
)

streamer ! PlayerEventsReader.UpdateStreamer(streamingActor)


Behaviors.same
}

Expand All @@ -72,6 +84,9 @@ object ActiveGames {
def authToken: String
}


object Ignore extends Protocol

case class CreateGame(
replyTo: ActorRef[Either[Unit, GameId]],
authToken: String,
Expand Down Expand Up @@ -101,11 +116,12 @@ object ActiveGames {
case class DoGameAction(
id: GameId,
action: PlayingGameAction,
remoteClock: RemoteClock,
replyTo: ActorRef[Either[Unit, ClockedResponse]]
remoteClock: RemoteClock
) extends Protocol


case class StreamEventsFor(playerId: PlayerId, streamingActor: ActorRef[ClockedResponse]) extends Protocol

object api {

implicit final class ActiveGamesOps(actorRef: ActorRef[Protocol]) {
Expand All @@ -129,11 +145,12 @@ object ActiveGames {
timeout: Timeout, scheduler: Scheduler): Future[Either[Unit, Unit]] =
actorRef ? (LoadGame(token, gameId, deckId, server, _))

def action(gameId: GameId, action: PlayingGameAction, remoteClock: RemoteClock)
(implicit timeout: Timeout, scheduler: Scheduler): Future[ActionRes] = {
actorRef ? (DoGameAction(gameId, action, remoteClock, _))
def action(gameId: GameId, action: PlayingGameAction, remoteClock: RemoteClock): Unit = {
actorRef ! (DoGameAction(gameId, action, remoteClock))
}
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import cardgame.model._
import cardgame.engine.GameOps._
import cardgame.processor.PlayerEventsReader.UserResponse
import cats.Monoid
import cats.effect.IO
import cats.implicits.catsKernelStdMonoidForMap
Expand All @@ -27,7 +28,7 @@ object GameProcessor {

val (gameAffected, event) = game.action(c.action, randomizer, checkIdempotency)(remoteClock, c.remoteClock)
ctx.system.eventStream !
EventStream.Publish(ClockedResponse(event, newRemoteClock, updateLocalClock))
EventStream.Publish(UserResponse(ClockedResponse(event, newRemoteClock, updateLocalClock)))
c match {
case rc: ReplyCommand =>
rc.replyTo ! Right(ClockedResponse(event, newRemoteClock, updateLocalClock))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import cardgame.model.{BorrowedCard, CardRecovered, ClockedResponse, GotCard, Hi

object PlayerEventsReader {

def behavior(playerId: PlayerId, replyTo: ActorRef[ClockedResponse]): Behavior[ClockedResponse] = Behaviors.setup {
def behavior(playerId: PlayerId, replyTo: ActorRef[ClockedResponse]): Behavior[Protocol] = Behaviors.setup {
ctx =>
ctx.system.eventStream ! EventStream.Subscribe(ctx.self)
readingEvents(playerId, replyTo)
}

private def readingEvents(id: PlayerId, replyTo: ActorRef[ClockedResponse]): Behavior[ClockedResponse] = Behaviors.receiveMessage {
case msg =>
private def readingEvents(id: PlayerId, replyTo: ActorRef[ClockedResponse]): Behavior[Protocol] = Behaviors.receiveMessage {
case UserResponse(msg) =>
replyTo ! personalise(msg, id)
Behaviors.same
case UpdateStreamer(streamer) =>
readingEvents(id, streamer)
}

private def personalise(clockedResponse: ClockedResponse, playerId: PlayerId): ClockedResponse = {
Expand All @@ -40,5 +42,9 @@ object PlayerEventsReader {

}

sealed trait Protocol
case class UserResponse(clockedResponse: ClockedResponse) extends Protocol
case class UpdateStreamer(streamer: ActorRef[ClockedResponse]) extends Protocol


}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import java.util.UUID
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.eventstream.EventStream
import cardgame.model._
import cardgame.processor.GameProcessor.RunCommand
import cardgame.processor.GameProcessor.FireAndForgetCommand
import cardgame.processor.PlayerEventsReader.UserResponse
import cats.effect.IO
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.must.Matchers
Expand All @@ -28,46 +29,36 @@ class GameProcessorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll
val playerB = PlayerId("b")
val playerAClock = Map(playerA -> 0L, playerB -> 0L)

val playerTestProbe = actorTestKit.createTestProbe[Either[Unit, ClockedResponse]]("Player")

val streamingPlayer = actorTestKit.createTestProbe[ClockedResponse]("StreamingPlayer")
val streamingPlayer = actorTestKit.createTestProbe[PlayerEventsReader.Protocol]("StreamingPlayer")

actorTestKit.system.eventStream ! EventStream.Subscribe(streamingPlayer.ref)

val gameProcessor = actorTestKit.spawn(GameProcessor.behavior(startedGame, randomizer, 0L, RemoteClock.zero))

"update the local clock and the remote clock on serving a player command" in {
gameProcessor ! RunCommand(playerTestProbe.ref, DrawCard(playerA), RemoteClock(tick(playerAClock, playerA)))
gameProcessor ! FireAndForgetCommand(DrawCard(playerA), RemoteClock(tick(playerAClock, playerA)))

val streamingClockedResponse = streamingPlayer.expectMessageType[ClockedResponse]
val streamingClockedResponse = streamingPlayer.expectMessageType[UserResponse].clockedResponse
streamingClockedResponse.clock mustBe Map(playerA.value -> 1, playerB.value -> 0)
streamingClockedResponse.serverClock mustBe 2
val clockedResponse = playerTestProbe.expectMessageType[Right[Unit, ClockedResponse]]
clockedResponse.value.serverClock mustBe 2
clockedResponse.value.clock mustBe Map(playerA.value -> 1, playerB.value -> 0)

val newPlayerAClock = tick(tick(playerAClock, playerA), playerA)

gameProcessor ! RunCommand(playerTestProbe.ref, EndTurn(playerA), RemoteClock(tick(newPlayerAClock, playerA)))
gameProcessor ! FireAndForgetCommand(EndTurn(playerA), RemoteClock(tick(newPlayerAClock, playerA)))

val endTurnStreamingClockedResponse = streamingPlayer.expectMessageType[ClockedResponse]
val endTurnStreamingClockedResponse = streamingPlayer.expectMessageType[UserResponse].clockedResponse
endTurnStreamingClockedResponse.clock mustBe Map(playerA.value -> 3, playerB.value -> 0)
endTurnStreamingClockedResponse.serverClock mustBe 4

val endTurnResponse = playerTestProbe.expectMessageType[Right[Unit, ClockedResponse]]

endTurnResponse.value.serverClock mustBe 4
endTurnResponse.value.clock mustBe Map(playerA.value -> 3, playerB.value -> 0)

}

"out of sync commands are not processed" in {
val gameStateProbe = actorTestKit.createTestProbe[Either[Unit, Game]]("GameState")
gameProcessor ! GameProcessor.Get(playerA, gameStateProbe.ref)
val gameState = gameStateProbe.expectMessageType[Right[Unit, Game]].value
val outdatedClock = Map(playerA.value -> 3L, playerB.value -> 0L)
gameProcessor ! RunCommand(playerTestProbe.ref, EndTurn(playerA), RemoteClock.of(outdatedClock))
playerTestProbe.expectMessageType[Right[Unit, ClockedResponse]].value.event mustBe OutOfSync(playerA)
gameProcessor ! FireAndForgetCommand(EndTurn(playerA), RemoteClock.of(outdatedClock))
streamingPlayer.expectMessageType[UserResponse].clockedResponse.event mustBe OutOfSync(playerA)
gameProcessor ! GameProcessor.Get(playerA, gameStateProbe.ref)
gameStateProbe.expectMessageType[Right[Unit, Game]].value mustBe gameState
}
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object Dependencies {
object akka {
val cors = "0.4.2"

val main = "2.6.4"
val main = "2.6.8"
}

object cats {
Expand Down
4 changes: 3 additions & 1 deletion service/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ akka-http-cors {
# This value is returned as part of the `Access-Control-Max-Age` preflight response header.
# If `null`, the header is not added to the preflight response.
max-age = 1800 seconds
}
}

akka.http.server.websocket.periodic-keep-alive-max-idle = 1 second
2 changes: 1 addition & 1 deletion service/src/main/scala/cardgame/Bootstrap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object Guardian {
implicit val materializer = Materializer(ctx)
implicit val system = ctx.system.toClassic

val startingGameRoute = new Routes(activeGames)(ctx.system.scheduler, ctx)
val startingGameRoute = new Routes(activeGames)(ctx.system.scheduler)
val bindingFuture = Http().bindAndHandle(
startingGameRoute.main, "0.0.0.0", 8080)

Expand Down
8 changes: 4 additions & 4 deletions service/src/main/scala/cardgame/events/package.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package cardgame

import akka.NotUsed
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.ActorRef
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorSource
import cardgame.json.circe._
import cardgame.model.{ClockedResponse, GameCompleted, PlayerId}
import cardgame.processor.PlayerEventsReader
import cardgame.processor.ActiveGames
import io.circe.syntax._

import scala.concurrent.duration._

package object events {

def eventSource(playerId: PlayerId)(implicit actorContext: ActorContext[_]): Source[ClockedResponse, NotUsed] = {
def eventSource(playerId: PlayerId, activeGame: ActorRef[ActiveGames.Protocol]): Source[ClockedResponse, NotUsed] = {
ActorSource.actorRef[ClockedResponse](
{
case _: GameCompleted =>
Expand All @@ -25,7 +25,7 @@ package object events {
OverflowStrategy.dropBuffer
).mapMaterializedValue {
streamingActor =>
actorContext.spawnAnonymous(PlayerEventsReader.behavior(playerId, streamingActor))
activeGame ! ActiveGames.StreamEventsFor(playerId, streamingActor)
NotUsed
}
}
Expand Down
58 changes: 40 additions & 18 deletions service/src/main/scala/cardgame/routes/routes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,69 @@ package cardgame.routes

import java.io.File

import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.{ActorRef, Scheduler}
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
import akka.http.scaladsl.model.MediaTypes
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.typed.scaladsl.ActorSink
import akka.util.Timeout
import cardgame.endpoints._
import cardgame.events._
import cardgame.model.{PlayerId, RemoteClock}
import cardgame.json.circe._
import cardgame.model.{ClockedAction, GameId, PlayerId, RemoteClock}
import cardgame.processor.ActiveGames
import cardgame.processor.ActiveGames.api._
import cardgame.processor.ActiveGames.{DoGameAction, Ignore}
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import io.circe.parser._
import io.circe.syntax._
import sttp.tapir.server.akkahttp._

import scala.concurrent.duration._

class Routes(activeGames: ActorRef[ActiveGames.Protocol])(implicit scheduler: Scheduler, actorContext: ActorContext[_]) {
class Routes(activeGames: ActorRef[ActiveGames.Protocol])(implicit scheduler: Scheduler) {

implicit val timeout = Timeout(5 seconds)
private val PlayerIdMatcher = RemainingPath.map(_.toString).map(PlayerId)


val websocketRoute = get {
path("live" / "actions" / PathGameId / PlayerIdMatcher) {
(gameId, playerId) =>
handleWebSocketMessages(gameActionsFlow(gameId, playerId))
}
}

val startingGame = JoiningGame.joinPlayer.toRoute {
case (gameId, playerId) => activeGames.joinGame(gameId, playerId, RemoteClock.zero)
} ~ View.gameStatus.toRoute {
case (gameId, playerId) => activeGames.getGame(gameId, playerId)
} ~ Actions.player.toRoute {
case (gameId, action) =>
activeGames.action(gameId, action.action, RemoteClock.of(action.vectorClock))
} ~ path("events" / PlayerIdMatcher) {
(playerId) => get {
complete(toSse(eventSource(playerId)))
}
} ~ get {
path("img" / RemainingPath) {
imageFile =>
getFromFile(
new File(s"decks/${imageFile.toString()}"),
MediaTypes.`image/jpeg`
)
}
path("img" / RemainingPath) {
imageFile =>
getFromFile(
new File(s"decks/${imageFile.toString()}"),
MediaTypes.`image/jpeg`
)
}
} ~ websocketRoute


def gameActionsFlow(gameId: GameId, playerId: PlayerId): Flow[Message, Message, Any] = {
val source = eventSource(playerId, activeGames).map(_.asJson.noSpaces).map(TextMessage.apply)
val sink: Sink[Message, Any] = ActorSink.actorRef[ActiveGames.Protocol](activeGames, Ignore, _ => Ignore).contramap[ClockedAction](
ca => DoGameAction(gameId, ca.action, RemoteClock.of(ca.vectorClock))
).contramap[Message](extractClockedAction)

Flow.fromSinkAndSource(sink, source)
}

private def extractClockedAction(message: Message): ClockedAction =
parse(message.asTextMessage.getStrictText).flatMap(_.as[ClockedAction]).toOption.get


private final val PathGameId = JavaUUID.map(GameId)


val adminRoutes = admin.createGame.toRoute {
Expand Down

0 comments on commit aeb8665

Please sign in to comment.