-
Notifications
You must be signed in to change notification settings - Fork 846
/
HttpEventStreamHandleActor.scala
90 lines (74 loc) · 2.75 KB
/
HttpEventStreamHandleActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package mesosphere.marathon.event.http
import java.io.EOFException
import akka.actor.{ Actor, ActorLogging, Status }
import akka.event.EventStream
import akka.pattern.pipe
import mesosphere.marathon.api.v2.json.Formats._
import mesosphere.marathon.event.http.HttpEventStreamHandleActor.WorkDone
import mesosphere.marathon.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent }
import mesosphere.util.ThreadPoolContext
import play.api.libs.json.Json
import scala.concurrent.Future
import scala.util.Try
/**
  * Actor that forwards every [[MarathonEvent]] published on the given event stream to one
  * [[HttpEventStreamHandle]].
  *
  * The actor subscribes itself to the stream on start and unsubscribes (and closes the handle)
  * on stop. Events are written to the handle in batches inside a `Future`; while a batch is in
  * flight, newly arriving events are buffered until `maxOutStanding` is reached and are dropped
  * (with a warning) beyond that. Once a send fails with an `EOFException` the actor stops
  * reacting to any further message.
  *
  * @param handle         the handle events are written to; closed in `postStop`
  * @param stream         the event stream this actor subscribes to and publishes
  *                       attach/detach notifications on
  * @param maxOutStanding number of buffered events at which further events are dropped while
  *                       a batch is being sent
  */
class HttpEventStreamHandleActor(
    handle: HttpEventStreamHandle,
    stream: EventStream,
    maxOutStanding: Int) extends Actor with ActorLogging {

  /** Events received but not yet handed to the handle, newest first. */
  private[http] var outstanding = List.empty[MarathonEvent]

  override def preStart(): Unit = {
    stream.subscribe(self, classOf[MarathonEvent])
    stream.publish(EventStreamAttached(handle.remoteAddress))
  }

  override def postStop(): Unit = {
    log.info("Stop actor {}", handle)
    stream.unsubscribe(self)
    stream.publish(EventStreamDetached(handle.remoteAddress))
    Try(handle.close()) //ignore, if this fails
  }

  override def receive: Receive = waitForEvent

  /** Idle state: no batch is in flight, so an incoming event is sent right away. */
  def waitForEvent: Receive = {
    case event: MarathonEvent =>
      outstanding = event :: outstanding
      sendAllMessages()
  }

  /** Busy state: a batch is in flight, so events are buffered or, past the limit, dropped. */
  def stashEvents: Receive = handleWorkDone orElse {
    // lengthCompare stops walking the list as soon as the limit is reached,
    // unlike List.size which always traverses the whole buffer.
    case event: MarathonEvent if outstanding.lengthCompare(maxOutStanding) >= 0 => dropEvent(event)
    case event: MarathonEvent => outstanding = event :: outstanding
  }

  /** Handles the piped result of the batch started by `sendAllMessages`. */
  def handleWorkDone: Receive = {
    case WorkDone => sendAllMessages()
    case Status.Failure(ex) =>
      // Only resume sending if the failure left the connection usable. Calling
      // sendAllMessages() unconditionally would replace the empty behavior installed
      // on EOF and start writing to the dead handle again.
      if (handleException(ex)) sendAllMessages()
  }

  /**
    * Sends all buffered events in arrival order in a `Future` and switches to `stashEvents`
    * until the result is piped back; with an empty buffer it switches to `waitForEvent`.
    */
  private[this] def sendAllMessages(): Unit = {
    if (outstanding.nonEmpty) {
      implicit val ec = ThreadPoolContext.context
      // The buffer is built by prepending, so reverse it to restore arrival order.
      val toSend = outstanding.reverse
      outstanding = List.empty[MarathonEvent]
      context.become(stashEvents)
      Future {
        toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event))))
        WorkDone
      } pipeTo self
    }
    else {
      context.become(waitForEvent)
    }
  }

  /**
    * Reacts to a failed send.
    *
    * @param ex the failure reported by the send `Future`
    * @return `true` if the actor should keep sending events, `false` if the connection
    *         is known to be dead and the actor has switched to ignoring all messages
    */
  private[this] def handleException(ex: Throwable): Boolean = ex match {
    case _: EOFException =>
      log.info("Received EOF from stream handle {}. Ignore subsequent events.", handle)
      //We know the connection is dead, but it is not finalized from the container.
      //Do not act any longer on any event.
      context.become(Actor.emptyBehavior)
      false
    case _ =>
      log.warning("Could not send message to {} reason: {}", handle, ex)
      true
  }

  /** Logs that an event is discarded because the buffer limit was reached. */
  private[this] def dropEvent(event: MarathonEvent): Unit = {
    log.warning("Ignore event {} for handle {} (slow consumer)", event, handle)
  }
}
/** Companion holding the internal messages of [[HttpEventStreamHandleActor]]. */
object HttpEventStreamHandleActor {
  /**
    * Message the actor pipes to itself when a batch of events has been written to the handle.
    * Declared as a `case object` so it has a readable `toString` in logs and is serializable.
    */
  case object WorkDone
}