Skip to content

Commit

Permalink
отслеживание пользователя + улучшение websocket протокола
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcom committed Mar 31, 2022
1 parent 752db6a commit 2ba0d10
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 39 deletions.
55 changes: 32 additions & 23 deletions src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ import scala.util.control.NonFatal
// TODO ignore list support
// TODO fix face conditions on simultaneous posting comment, subscription and missing processing
class RealtimeEventHub extends Actor with ActorLogging with Timers {
private val data: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
private val topicSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
private val userSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
private val sessions = new mutable.HashMap[String, ActorRef]
private var maxDataSize: Int = 0

Expand All @@ -53,11 +54,16 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy

override def receive: Receive = {
case SessionStarted(session) if !sessions.contains(session.getId) =>
case SessionStarted(session, user) if !sessions.contains(session.getId) =>
val actor = context.actorOf(RealtimeSessionActor.props(session))
context.watch(actor)

sessions += (session.getId -> actor)

user.foreach { user =>
userSubscriptions += (user -> actor)
}

val dataSize = context.children.size

if (dataSize > maxDataSize) {
Expand All @@ -68,18 +74,22 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
case SubscribeTopic(session, topic) if sessions.contains(session.getId) =>
val actor = sessions(session.getId)

data += (topic -> actor)
topicSubscriptions += (topic -> actor)

sender() ! Done
case Terminated(actorRef) =>
log.debug(s"RealtimeSessionActor $actorRef terminated")

data.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) =>
log.debug(s"Removed $actorRef")
data -= (msgid -> actorRef)
topicSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) =>
topicSubscriptions -= (msgid -> actorRef)
}

userSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (user, _) =>
userSubscriptions -= (user -> actorRef)
}

sessions.find(_._2 == actorRef).foreach { f =>
log.debug(s"Removed $actorRef")
sessions.remove(f._1)
}
case SessionTerminated(id) =>
Expand All @@ -91,7 +101,7 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
case msg@NewComment(msgid, _) =>
log.debug(s"New comment in topic $msgid")

data.sets.getOrElse(msgid, Set.empty).foreach {
topicSubscriptions.sets.getOrElse(msgid, Set.empty).foreach {
_ ! msg
}
case Tick =>
Expand All @@ -101,29 +111,28 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
}

object RealtimeEventHub {
case class GetEmmiterForTopic(msgid: Int, missedComments: Vector[Int])
case class NewComment(msgid: Int, cid: Int)
case object Tick

case class SessionStarted(session: WebSocketSession)
case class SessionStarted(session: WebSocketSession, user: Option[Int])
case class SubscribeTopic(session: WebSocketSession, topic: Int)
case class SessionTerminated(session: String)

def props: Props = Props(new RealtimeEventHub())

def notifyComment(session: WebSocketSession, comment: Int): Unit = {
session.sendMessage(new TextMessage(s"comment $comment"))
}
}

class RealtimeSessionActor(session: WebSocketSession) extends Actor with ActorLogging with Timers {
private implicit val ec: ExecutionContext = context.dispatcher
timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute)

private def notifyComment(comment: Int): Unit = {
session.sendMessage(new TextMessage(comment.toString))
}

override def receive: Receive = {
case NewComment(_, cid) =>
try {
notifyComment(cid)
notifyComment(session, cid)
} catch handleExceptions

case Tick =>
Expand Down Expand Up @@ -158,9 +167,14 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,

override def afterConnectionEstablished(session: WebSocketSession): Unit = {
try {
logger.debug(s"Connected!")
val currentUser =
Option(session.getPrincipal)
.collect { case token: RememberMeAuthenticationToken if token.isAuthenticated => token.getPrincipal }
.collect { case user: UserDetailsImpl => user.getUser }

logger.debug(s"Connected! currentUser=${currentUser.map(_.getNick)}")

val result = hub ? SessionStarted(session)
val result = hub ? SessionStarted(session, currentUser.map(_.getId))

Await.result(result, 10.seconds)
} catch {
Expand All @@ -174,12 +188,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
try {
val request = message.getPayload

val currentUser =
Option(session.getPrincipal)
.collect { case token: RememberMeAuthenticationToken if token.isAuthenticated => token.getPrincipal }
.collect { case user: UserDetailsImpl => user.getUser }

logger.debug(s"Got request: $request currentUser=${currentUser.map(_.getNick)}")
logger.debug(s"Got request: $request")

val (topicId, maybeComment) = request.split(" ", 2) match {
case Array(t) =>
Expand All @@ -202,7 +211,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,

missed.foreach { cid =>
logger.debug(s"Sending missed comment $cid")
session.sendMessage(new TextMessage(cid.toString))
notifyComment(session, cid)
}

val result = hub ? SubscribeTopic(session, topic.getId)
Expand Down
32 changes: 16 additions & 16 deletions src/main/webapp/js/realtime.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,23 @@ var RealtimeContext = {
var supportsWebSockets = 'WebSocket' in window || 'MozWebSocket' in window;

if (supportsWebSockets) {
var canceled = false;
var ws = new WebSocket(wsUrl + "ws");

ws.onmessage = function (event) {
if (!$('#commentForm').find(".spinner").length) {
$("#realtime")
.text("Был добавлен новый комментарий. ")
.append($("<a>").attr("href", RealtimeContext.link + "?cid=" + event.data + "&skipdeleted=true").text("Обновить."))
.show();
if (event.data.startsWith("comment ")) {
var comment = event.data.substring("comment ".length)

canceled = true;
ws.close()
} else {
// retry in 5 seconds
ws.close()
if (!$('#commentForm').find(".spinner").length) {
if ($("#realtime").is(":hidden")) {
$("#realtime")
.text("Был добавлен новый комментарий. ")
.append($("<a>").attr("href", RealtimeContext.link + "?cid=" + comment + "&skipdeleted=true").text("Обновить."))
.show();
}
} else {
// retry in 5 seconds
ws.close()
}
}
};

Expand All @@ -58,11 +60,9 @@ var RealtimeContext = {
}

ws.onclose = function () {
if (!canceled) {
setTimeout(function () {
RealtimeContext.start(wsUrl)
}, 5000);
}
setTimeout(function () {
RealtimeContext.start(wsUrl)
}, 5000);
};
}
});
Expand Down

0 comments on commit 2ba0d10

Please sign in to comment.