Permalink
Browse files

Propagate applied udpate action ids back to mgmt reporter

  • Loading branch information...
lefou committed Nov 27, 2018
1 parent 66d9f41 commit 3f66438d7f563a19294b7702316184a2b067673a
@@ -1,5 +1,6 @@
package blended.mgmt.agent.internal
import scala.collection.immutable.Queue
import scala.concurrent.duration.DurationLong
import scala.util.Try
@@ -57,6 +58,7 @@ trait MgmtReporter extends Actor with PrickleSupport {
private[this] var _ticker: Option[Cancellable] = None
private[this] var _serviceInfos: Map[String, ServiceInfo] = Map()
private[this] var _lastProfileInfo: ProfileInfo = ProfileInfo(0L, Nil)
private[this] var _appliedUpdateActionIds: List[String] = List()
////////////////////
private[this] lazy val log = Logger[MgmtReporter]
@@ -68,6 +70,9 @@ trait MgmtReporter extends Actor with PrickleSupport {
protected def profileInfo: ProfileInfo = _lastProfileInfo
// protected def appliedUpdateActionIds = _appliedUpdateActionIds
// protected def clearAppliedUpdateActions(ids: List[String]) = _appliedUpdateActionIds = _appliedUpdateActionIds.filter(ids.contains)
override def preStart(): Unit = {
super.preStart()
@@ -82,6 +87,7 @@ trait MgmtReporter extends Actor with PrickleSupport {
context.system.eventStream.subscribe(context.self, classOf[ServiceInfo])
context.system.eventStream.subscribe(context.self, classOf[ProfileInfo])
context.system.eventStream.subscribe(context.self, classOf[UpdateActionApplied])
}
override def postStop(): Unit = {
@@ -98,7 +104,10 @@ trait MgmtReporter extends Actor with PrickleSupport {
case Tick =>
config.foreach { config =>
val info = createContainerInfo
// we submit the applied update actions to the mgmt server
val appliedUpdateActions = _appliedUpdateActionIds
val info = createContainerInfo.copy(appliedUpdateActionIds = appliedUpdateActions)
log.debug(s"Performing report [${info}].")
val entity = Marshal(info).to[MessageEntity]
@@ -114,15 +123,19 @@ trait MgmtReporter extends Actor with PrickleSupport {
// TODO think about ssl
val responseFuture = request.flatMap { request =>
Http(context.system).singleRequest(request)
}
}.map(r => r -> appliedUpdateActions)
import akka.pattern.pipe
responseFuture.pipeTo(self)
}
case response @ HttpResponse(status, headers, entity, protocol) =>
case (response @ HttpResponse(status, headers, entity, protocol), appliedUpdateActionIds: List[String]) =>
status match {
case StatusCodes.OK =>
// As the server accepted also the list of applied update action IDs
// we remove those from the list
_appliedUpdateActionIds = _appliedUpdateActionIds.filterNot(appliedUpdateActionIds.contains)
import akka.pattern.pipe
// OK; unmarshal and process
@@ -156,6 +169,10 @@ trait MgmtReporter extends Actor with PrickleSupport {
} else {
log.debug(s"Ingnoring profile info with timestamp [${timestamp.underlying()}] which is older than [${_lastProfileInfo.timeStamp.underlying()}]: ${pi}")
}
case UpdateActionApplied(id, _) =>
_appliedUpdateActionIds ::= id
}
}
@@ -48,3 +48,8 @@ final case class ActivateProfile(
profileVersion: String,
overlays: Set[OverlayRef]
) extends UpdateAction
/**
* Message published to the event stream when an update action with the same `id` was applied.
*/
case class UpdateActionApplied(id: String, error: Option[String] = None)
@@ -9,7 +9,7 @@ import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props}
import akka.event.LoggingReceive
import akka.event.{EventStream, LoggingReceive}
import akka.pattern.ask
import akka.routing.BalancingPool
import akka.util.Timeout
@@ -128,6 +128,12 @@ class Updater(
*/
case object PublishProfileInfo
/**
* Convenience accessor to event stream, also to better see where the event stream is used.
* @return
*/
private[this] def eventStream: EventStream = context.system.eventStream
override def preStart(): Unit = {
log.info("Initiating initial scanning for profiles")
self ! Scan
@@ -159,14 +165,14 @@ class Updater(
log.info("Publishing of service infos and profile infos is disabled")
}
context.system.eventStream.subscribe(context.self, classOf[UpdateAction])
eventStream.subscribe(context.self, classOf[UpdateAction])
super.preStart()
}
override def postStop(): Unit = {
context.system.eventStream.unsubscribe(context.self)
eventStream.unsubscribe(context.self)
tickers.foreach { t =>
log.info(s"Disabling ticker: ${t}")
@@ -176,6 +182,15 @@ class Updater(
super.postStop()
}
private def publishResultEvent[T](id: String, reply: Try[Any]) = {
reply match {
case Success(OperationSucceeded(_)) => eventStream.publish(UpdateActionApplied(id))
case Success(OperationFailed(_, reason)) => eventStream.publish(UpdateActionApplied(id, Option(reason)))
case Failure(e) => eventStream.publish(UpdateActionApplied(id, Option(e.getMessage())))
case x => log.warn(s"Skip publish event of unsupported reply: [${x}]")
}
}
def handleUpdateAction(event: UpdateAction): Unit = event match {
case UAAddRuntimeConfig(id, runtimeConfig) =>
log.debug(s"Received add runtime config request (via event stream) for ${runtimeConfig.name}-${runtimeConfig.version} with ID [${id}]")
@@ -185,6 +200,7 @@ class Updater(
self.ask(AddRuntimeConfig(nextId(), runtimeConfig))(timeout).onComplete { x =>
log.debug(s"Finished add runtime config request (via event stream) for ${runtimeConfig.name}-${runtimeConfig.version} with ID [${id}] with result: ${x}")
publishResultEvent(id, x)
}
case UAAddOverlayConfig(id, overlayConfig) =>
@@ -195,6 +211,7 @@ class Updater(
self.ask(AddOverlayConfig(nextId(), overlayConfig))(timeout).onComplete { x =>
log.debug(s"Finished add overlay config request (via event stream) for ${overlayConfig.name}-${overlayConfig.version} with ID [${id}] with result: ${x}")
publishResultEvent(id, x)
}
case UAStageProfile(id, name, version, overlayRefs) =>
@@ -205,12 +222,15 @@ class Updater(
val request = StageProfile(nextId(), name, version, overlayRefs)
self.ask(request)(timeout).onComplete {
case Success(OperationFailed(_, reason)) =>
case x @ Success(OperationFailed(_, reason)) =>
log.error(s"Could not stage profile: ${reason}")
case Failure(e) =>
publishResultEvent(id, x)
case x @ Failure(e) =>
log.error(e)(s"Could not complete stage profile [${request}]")
publishResultEvent(id, x)
case x =>
log.debug(s"Finished stage profile request (via event stream) for ${name}-${version} and overlays ${overlayRefs} with ID [${id}] with result: ${x}")
publishResultEvent(id, x)
}
case UAActivateProfile(id, name, version, overlayRefs) =>
@@ -221,6 +241,7 @@ class Updater(
self.ask(ActivateProfile(nextId(), name, version, overlayRefs))(timeout).onComplete { x =>
log.debug(s"Finished activation profile request (via event stream) for ${name}-${version} and overlays ${overlayRefs} with ID [${id}] with result: ${x}")
publishResultEvent(id, x)
}
}
@@ -452,7 +473,7 @@ class Updater(
}
val toSend = singleProfiles
log.debug(s"Publishing profile info to event stream: ${toSend}")
context.system.eventStream.publish(ProfileInfo(System.currentTimeMillis(), toSend))
eventStream.publish(ProfileInfo(System.currentTimeMillis(), toSend))
case PublishServiceInfo =>
log.debug("Handling PublishServiceInfo message")
@@ -469,7 +490,7 @@ class Updater(
)
)
log.debug(s"About to publish service info: ${serviceInfo}")
context.system.eventStream.publish(serviceInfo)
eventStream.publish(serviceInfo)
case msg: ArtifactDownloader.Reply => handleArtifactDownloaderReply(msg)

0 comments on commit 3f66438

Please sign in to comment.