From 2ba0d1056fada89ae498433ca570c31411d0db65 Mon Sep 17 00:00:00 2001 From: Maxim Valyanskiy Date: Thu, 31 Mar 2022 22:06:45 +0300 Subject: [PATCH] =?UTF-8?q?=D0=BE=D1=82=D1=81=D0=BB=D0=B5=D0=B6=D0=B8?= =?UTF-8?q?=D0=B2=D0=B0=D0=BD=D0=B8=D0=B5=20=D0=BF=D0=BE=D0=BB=D1=8C=D0=B7?= =?UTF-8?q?=D0=BE=D0=B2=D0=B0=D1=82=D0=B5=D0=BB=D1=8F=20+=20=D1=83=D0=BB?= =?UTF-8?q?=D1=83=D1=87=D1=88=D0=B5=D0=BD=D0=B8=D0=B5=20websocket=20=D0=BF?= =?UTF-8?q?=D1=80=D0=BE=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/linux/realtime/RealtimeEventHub.scala | 55 +++++++++++-------- src/main/webapp/js/realtime.js | 32 +++++------ 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala b/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala index 077a473a68..b3b0844ee6 100644 --- a/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala +++ b/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala @@ -44,7 +44,8 @@ import scala.util.control.NonFatal // TODO ignore list support // TODO fix face conditions on simultaneous posting comment, subscription and missing processing class RealtimeEventHub extends Actor with ActorLogging with Timers { - private val data: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]() + private val topicSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]() + private val userSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]() private val sessions = new mutable.HashMap[String, ActorRef] private var maxDataSize: Int = 0 @@ -53,11 +54,16 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers { override def supervisorStrategy = SupervisorStrategy.stoppingStrategy override def receive: Receive = { - case SessionStarted(session) if !sessions.contains(session.getId) => + case SessionStarted(session, user) if !sessions.contains(session.getId) => val actor = context.actorOf(RealtimeSessionActor.props(session)) context.watch(actor) + sessions += (session.getId -> actor) + user.foreach { user => + userSubscriptions += (user -> actor) + } + val dataSize = context.children.size if (dataSize > maxDataSize) { @@ -68,18 +74,22 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers { case SubscribeTopic(session, topic) if sessions.contains(session.getId) => val actor = sessions(session.getId) - data += (topic -> actor) + topicSubscriptions += (topic -> actor) sender() ! Done case Terminated(actorRef) => log.debug(s"RealtimeSessionActor $actorRef terminated") - data.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) => - log.debug(s"Removed $actorRef") - data -= (msgid -> actorRef) + topicSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) => + topicSubscriptions -= (msgid -> actorRef) + } + + userSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (user, _) => + userSubscriptions -= (user -> actorRef) } sessions.find(_._2 == actorRef).foreach { f => + log.debug(s"Removed $actorRef") sessions.remove(f._1) } case SessionTerminated(id) => @@ -91,7 +101,7 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers { case msg@NewComment(msgid, _) => log.debug(s"New comment in topic $msgid") - data.sets.getOrElse(msgid, Set.empty).foreach { + topicSubscriptions.sets.getOrElse(msgid, Set.empty).foreach { _ ! msg } case Tick => @@ -101,29 +111,28 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers { } object RealtimeEventHub { - case class GetEmmiterForTopic(msgid: Int, missedComments: Vector[Int]) case class NewComment(msgid: Int, cid: Int) case object Tick - case class SessionStarted(session: WebSocketSession) + case class SessionStarted(session: WebSocketSession, user: Option[Int]) case class SubscribeTopic(session: WebSocketSession, topic: Int) case class SessionTerminated(session: String) def props: Props = Props(new RealtimeEventHub()) + + def notifyComment(session: WebSocketSession, comment: Int): Unit = { + session.sendMessage(new TextMessage(s"comment $comment")) + } } class RealtimeSessionActor(session: WebSocketSession) extends Actor with ActorLogging with Timers { private implicit val ec: ExecutionContext = context.dispatcher timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute) - private def notifyComment(comment: Int): Unit = { - session.sendMessage(new TextMessage(comment.toString)) - } - override def receive: Receive = { case NewComment(_, cid) => try { - notifyComment(cid) + notifyComment(session, cid) } catch handleExceptions case Tick => @@ -158,9 +167,14 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef, override def afterConnectionEstablished(session: WebSocketSession): Unit = { try { - logger.debug(s"Connected!") + val currentUser = + Option(session.getPrincipal) + .collect { case token: RememberMeAuthenticationToken if token.isAuthenticated => token.getPrincipal } + .collect { case user: UserDetailsImpl => user.getUser } + + logger.debug(s"Connected! currentUser=${currentUser.map(_.getNick)}") - val result = hub ? SessionStarted(session) + val result = hub ? SessionStarted(session, currentUser.map(_.getId)) Await.result(result, 10.seconds) } catch { @@ -174,12 +188,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef, try { val request = message.getPayload - val currentUser = - Option(session.getPrincipal) - .collect { case token: RememberMeAuthenticationToken if token.isAuthenticated => token.getPrincipal } - .collect { case user: UserDetailsImpl => user.getUser } - - logger.debug(s"Got request: $request currentUser=${currentUser.map(_.getNick)}") + logger.debug(s"Got request: $request") val (topicId, maybeComment) = request.split(" ", 2) match { case Array(t) => @@ -202,7 +211,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef, missed.foreach { cid => logger.debug(s"Sending missed comment $cid") - session.sendMessage(new TextMessage(cid.toString)) + notifyComment(session, cid) } val result = hub ? SubscribeTopic(session, topic.getId) diff --git a/src/main/webapp/js/realtime.js b/src/main/webapp/js/realtime.js index 833ef15a5a..03155d454f 100644 --- a/src/main/webapp/js/realtime.js +++ b/src/main/webapp/js/realtime.js @@ -29,21 +29,23 @@ var RealtimeContext = { var supportsWebSockets = 'WebSocket' in window || 'MozWebSocket' in window; if (supportsWebSockets) { - var canceled = false; var ws = new WebSocket(wsUrl + "ws"); ws.onmessage = function (event) { - if (!$('#commentForm').find(".spinner").length) { - $("#realtime") - .text("Был добавлен новый комментарий. ") - .append($("").attr("href", RealtimeContext.link + "?cid=" + event.data + "&skipdeleted=true").text("Обновить.")) - .show(); + if (event.data.startsWith("comment ")) { + var comment = event.data.substring("comment ".length) - canceled = true; - ws.close() - } else { - // retry in 5 seconds - ws.close() + if (!$('#commentForm').find(".spinner").length) { + if ($("#realtime").is(":hidden")) { + $("#realtime") + .text("Был добавлен новый комментарий. ") + .append($("").attr("href", RealtimeContext.link + "?cid=" + comment + "&skipdeleted=true").text("Обновить.")) + .show(); + } + } else { + // retry in 5 seconds + ws.close() + } } }; @@ -58,11 +60,9 @@ var RealtimeContext = { } ws.onclose = function () { - if (!canceled) { - setTimeout(function () { - RealtimeContext.start(wsUrl) - }, 5000); - } + setTimeout(function () { + RealtimeContext.start(wsUrl) + }, 5000); }; } });