diff --git a/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala b/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
index 077a473a68..b3b0844ee6 100644
--- a/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
+++ b/src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
@@ -44,7 +44,8 @@ import scala.util.control.NonFatal
// TODO ignore list support
// TODO fix face conditions on simultaneous posting comment, subscription and missing processing
class RealtimeEventHub extends Actor with ActorLogging with Timers {
- private val data: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
+ private val topicSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
+ private val userSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
private val sessions = new mutable.HashMap[String, ActorRef]
private var maxDataSize: Int = 0
@@ -53,11 +54,16 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
override def receive: Receive = {
- case SessionStarted(session) if !sessions.contains(session.getId) =>
+ case SessionStarted(session, user) if !sessions.contains(session.getId) =>
val actor = context.actorOf(RealtimeSessionActor.props(session))
context.watch(actor)
+
sessions += (session.getId -> actor)
+ user.foreach { user =>
+ userSubscriptions += (user -> actor)
+ }
+
val dataSize = context.children.size
if (dataSize > maxDataSize) {
@@ -68,18 +74,22 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
case SubscribeTopic(session, topic) if sessions.contains(session.getId) =>
val actor = sessions(session.getId)
- data += (topic -> actor)
+ topicSubscriptions += (topic -> actor)
sender() ! Done
case Terminated(actorRef) =>
log.debug(s"RealtimeSessionActor $actorRef terminated")
- data.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) =>
- log.debug(s"Removed $actorRef")
- data -= (msgid -> actorRef)
+ topicSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) =>
+ topicSubscriptions -= (msgid -> actorRef)
+ }
+
+ userSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (user, _) =>
+ userSubscriptions -= (user -> actorRef)
}
sessions.find(_._2 == actorRef).foreach { f =>
+ log.debug(s"Removed $actorRef")
sessions.remove(f._1)
}
case SessionTerminated(id) =>
@@ -91,7 +101,7 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
case msg@NewComment(msgid, _) =>
log.debug(s"New comment in topic $msgid")
- data.sets.getOrElse(msgid, Set.empty).foreach {
+ topicSubscriptions.sets.getOrElse(msgid, Set.empty).foreach {
_ ! msg
}
case Tick =>
@@ -101,29 +111,28 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
}
object RealtimeEventHub {
- case class GetEmmiterForTopic(msgid: Int, missedComments: Vector[Int])
case class NewComment(msgid: Int, cid: Int)
case object Tick
- case class SessionStarted(session: WebSocketSession)
+ case class SessionStarted(session: WebSocketSession, user: Option[Int])
case class SubscribeTopic(session: WebSocketSession, topic: Int)
case class SessionTerminated(session: String)
def props: Props = Props(new RealtimeEventHub())
+
+ def notifyComment(session: WebSocketSession, comment: Int): Unit = {
+ session.sendMessage(new TextMessage(s"comment $comment"))
+ }
}
class RealtimeSessionActor(session: WebSocketSession) extends Actor with ActorLogging with Timers {
private implicit val ec: ExecutionContext = context.dispatcher
timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute)
- private def notifyComment(comment: Int): Unit = {
- session.sendMessage(new TextMessage(comment.toString))
- }
-
override def receive: Receive = {
case NewComment(_, cid) =>
try {
- notifyComment(cid)
+ notifyComment(session, cid)
} catch handleExceptions
case Tick =>
@@ -158,9 +167,14 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
override def afterConnectionEstablished(session: WebSocketSession): Unit = {
try {
- logger.debug(s"Connected!")
+ val currentUser =
+ Option(session.getPrincipal)
+ .collect { case token: RememberMeAuthenticationToken if token.isAuthenticated => token.getPrincipal }
+ .collect { case user: UserDetailsImpl => user.getUser }
+
+ logger.debug(s"Connected! currentUser=${currentUser.map(_.getNick)}")
- val result = hub ? SessionStarted(session)
+ val result = hub ? SessionStarted(session, currentUser.map(_.getId))
Await.result(result, 10.seconds)
} catch {
@@ -174,12 +188,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
try {
val request = message.getPayload
- val currentUser =
- Option(session.getPrincipal)
- .collect { case token: RememberMeAuthenticationToken if token.isAuthenticated => token.getPrincipal }
- .collect { case user: UserDetailsImpl => user.getUser }
-
- logger.debug(s"Got request: $request currentUser=${currentUser.map(_.getNick)}")
+ logger.debug(s"Got request: $request")
val (topicId, maybeComment) = request.split(" ", 2) match {
case Array(t) =>
@@ -202,7 +211,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
missed.foreach { cid =>
logger.debug(s"Sending missed comment $cid")
- session.sendMessage(new TextMessage(cid.toString))
+ notifyComment(session, cid)
}
val result = hub ? SubscribeTopic(session, topic.getId)
diff --git a/src/main/webapp/js/realtime.js b/src/main/webapp/js/realtime.js
index 833ef15a5a..03155d454f 100644
--- a/src/main/webapp/js/realtime.js
+++ b/src/main/webapp/js/realtime.js
@@ -29,21 +29,23 @@ var RealtimeContext = {
var supportsWebSockets = 'WebSocket' in window || 'MozWebSocket' in window;
if (supportsWebSockets) {
- var canceled = false;
var ws = new WebSocket(wsUrl + "ws");
ws.onmessage = function (event) {
- if (!$('#commentForm').find(".spinner").length) {
- $("#realtime")
- .text("Был добавлен новый комментарий. ")
- .append($("").attr("href", RealtimeContext.link + "?cid=" + event.data + "&skipdeleted=true").text("Обновить."))
- .show();
+ if (event.data.startsWith("comment ")) {
+ var comment = event.data.substring("comment ".length)
- canceled = true;
- ws.close()
- } else {
- // retry in 5 seconds
- ws.close()
+ if (!$('#commentForm').find(".spinner").length) {
+ if ($("#realtime").is(":hidden")) {
+ $("#realtime")
+ .text("Был добавлен новый комментарий. ")
+ .append($("").attr("href", RealtimeContext.link + "?cid=" + comment + "&skipdeleted=true").text("Обновить."))
+ .show();
+ }
+ } else {
+ // retry in 5 seconds
+ ws.close()
+ }
}
};
@@ -58,11 +60,9 @@ var RealtimeContext = {
}
ws.onclose = function () {
- if (!canceled) {
- setTimeout(function () {
- RealtimeContext.start(wsUrl)
- }, 5000);
- }
+ setTimeout(function () {
+ RealtimeContext.start(wsUrl)
+ }, 5000);
};
}
});