Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ncharles committed Jul 22, 2021
1 parent 8af3979 commit 95f9e6c
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ import com.normation.rudder.domain.logger.TimingDebugLogger
import com.normation.utils.Control.sequence
import com.normation.rudder.domain.reports.{ExpectedReportsSerialisation, NodeAndConfigId, NodeConfigId, NodeExpectedReports}
import com.normation.rudder.repository.FindExpectedReportRepository
import com.normation.rudder.services.reports.NodeConfigurationService
import org.joda.time.DateTime
import zio.interop.catz._

final case class RoReportsExecutionRepositoryImpl (
db : Doobie
, writeBackend : WoReportsExecutionRepository
, findConfigs : FindExpectedReportRepository
, nodeConfigService: NodeConfigurationService
, pgInClause : PostgresqlInClause
, jdbcMaxBatchSize: Int
) extends RoReportsExecutionRepository with Loggable {
Expand Down Expand Up @@ -86,7 +87,7 @@ final case class RoReportsExecutionRepositoryImpl (
// by construct, we do have a nodeConfigId
agentsRuns = lastRunByNode.map(x => (x._1, NodeAndConfigId(x._1, x._2.nodeConfigVersion.get)))

expectedReports <- findConfigs.getExpectedReports(agentsRuns.values.toSet).toIO
expectedReports <- nodeConfigService.findNodeExpectedReports(agentsRuns.values.toSet).toIO

runs = agentsRuns.map { case (nodeId, nodeAndConfigId) =>
(nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ trait NodeInfoService {
*/
def getAll() : Box[Map[NodeId, NodeInfo]]

/**
* Get all nodes id
*/

def getAllNodeIds(): Box[Set[NodeId]]

/**
* Get all nodes.
* That method try to return the maximum
Expand Down Expand Up @@ -863,6 +869,9 @@ trait NodeInfoServiceCached extends NodeInfoService with NamedZioLogger with Cac
def getAll(): Box[Map[NodeId, NodeInfo]] = withUpToDateCache("all nodes info") { cache =>
cache.view.mapValues(_._2).toMap.succeed
}.toBox
def getAllNodeIds(): Box[Set[NodeId]] = withUpToDateCache("all nodes id") { cache =>
cache.keySet.succeed
}.toBox
def getAllSystemNodeIds(): Box[Seq[NodeId]] = withUpToDateCache("all system nodes") { cache =>
cache.collect { case(k, (_,x)) if(x.isPolicyServer) => k }.toSeq.succeed
}.toBox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ object QSLdapBackend {

for {
connection <- ldap
nodeIds <- nodeInfos.getAll().map(_.keySet.map(_.value)).toIO
nodeIds <- nodeInfos.getAllNodeIds.toIO
entries <- connection.search(nodeDit.BASE_DN, Sub, filter, returnedAttributes:_*)
} yield {

Expand All @@ -238,7 +238,7 @@ object QSLdapBackend {
//and we get node always with a hostname
val (nodes, others) = entries.partition { x => x.isA(OC_NODE) || x.isA(OC_RUDDER_NODE) }
// merge node attribute for node entries with same node id
val merged = nodes.groupBy( _.value_!(A_NODE_UUID)).filter(e => nodeIds.contains(e._1)).map { case (_, samenodes) =>
val merged = nodes.groupBy( _.value_!(A_NODE_UUID)).filter(e => nodeIds.map(_.value).contains(e._1)).map { case (_, samenodes) =>
samenodes.reduce[LDAPEntry] { case (n1, n2) =>
n2.attributes.foreach( a => n1 mergeAttribute a)
n1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
/*
*************************************************************************************
* Copyright 2021 Normation SAS
*************************************************************************************
*
* This file is part of Rudder.
*
* Rudder is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In accordance with the terms of section 7 (7. Additional Terms.) of
* the GNU General Public License version 3, the copyright holders add
* the following Additional permissions:
* Notwithstanding to the terms of section 5 (5. Conveying Modified Source
* Versions) and 6 (6. Conveying Non-Source Forms.) of the GNU General
* Public License version 3, when you create a Related Module, this
* Related Module is not considered as a part of the work and may be
* distributed under the license agreement of your choice.
* A "Related Module" means a set of sources files including their
* documentation that, without modification of the Source Code, enables
* supplementary functions or services in addition to those offered by
* the Software.
*
* Rudder is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rudder. If not, see <http://www.gnu.org/licenses/>.
*
*************************************************************************************
*/

package com.normation.rudder.services.reports
import com.normation.box._
import com.normation.errors._
import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.logger.{ReportLogger, ReportLoggerPure}
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports.{NodeAndConfigId, NodeExpectedReports, NodeStatusReport}
import com.normation.rudder.repository.{CachedRepository, FindExpectedReportRepository}
import com.normation.rudder.services.nodes.NodeInfoService
import com.normation.utils.Control.sequence
import com.normation.zio._
import net.liftweb.common._
import org.joda.time._
import zio._
import zio.syntax._

/**
* That service retrieve node configurations (nodeexpectedreports) from the expectedreportsjdbcrepository, unless its already in cache
* cache is driven by reporting serviceimpl
* init add all nodes, withtout anything attached
* initial setting of nodeexpectedreport is less prioritary than update
* deletion removes the entry
* if an entry exists but without anything, then it will query the database
* if an entry exists but wthout the right nodeconfigid, it will query the database (but not update the cache)
*/
trait NodeConfigurationService {
/**
* retrieve expected reports by config version
*/
def findNodeExpectedReports(
nodeConfigIds: Set[NodeAndConfigId]
): Box[Map[NodeAndConfigId, Option[NodeExpectedReports]]]

/**
* get the current expected reports
* fails if request expected reports for a non existent node
*/
def getCurrentExpectedReports(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[NodeExpectedReports]]]

/**
* get the nodes applying the rule
*
*/
def findNodesApplyingRule(ruleId: RuleId): Box[Set[NodeId]]
}

trait CachedNodeConfigurationService extends NodeConfigurationService with CachedRepository {

def confExpectedRepo: FindExpectedReportRepository
def nodeInfoService : NodeInfoService
def batchSize : Int

val semaphore = Semaphore.make(1).runNow

// Init to do
// what's the best method ? init directly from db, fetching all nodeconfigurations
// that are empty
// or initing it with all nodes, and nothing in it, and only then fetching by batch
// batching saves memory, but is slower
// i think it's safer to do the batching part, but i'd like to be sure of that
def init() : IOResult[Unit] = {
for {
// first, get all nodes
nodeIds <- nodeInfoService.getAllNodeIds().toIO
// batch update the cache
// it may be fairly slow - and the sempahore will lock the data
_ <- semaphore.withPermit {
// void the cache the cache
cache = Some(Map.empty[NodeId, Option[NodeExpectedReports]])
// batch fill it
ZIO.foreach(nodeIds.grouped(batchSize).to(Seq)) { nodesToUpdate =>
for {
nodeExpectedReports <- confExpectedRepo.getCurrentExpectedsReports(nodesToUpdate.toSet).toIO
_ <- IOResult.effectNonBlocking {
cache = cache.map(x=> x ++ nodeExpectedReports)
}
_ <- ReportLoggerPure.Cache.debug(s"NodeExpectedReport cache recomputed for nodes: ${nodesToUpdate.map(_.value).mkString(", ")}")
} yield ()

}
}
} yield ()

}

/**
* The cache is managed node by node.
* A missing nodeId mean that the cache wasn't initialized for
* that node, and should fail
*
* This cache is populated by ReportingServiceImpl:
* * init by adding all existing nodes (and NodeExpectedReports if available)
* * update after a policy generation to change the value for a nodeid
* * add a new node when accepting a node
* * deleting a node
*
* Note that a clear cache will None the cache
*
* A query to fetch nodeexpectedreports that is not in the cache will return None
* (if node exists), or fail if node does not exists
*/
private[this] var cache = Option.empty[Map[NodeId, Option[NodeExpectedReports]]]

/**
* The queue of invalidation request.
* The queue size is 1 and new request need to merge with existing request
* It's a List and not a Set, because we want to keep the precedence in
* invalidation request.
* // unsure if its a CacheComplianceQueueAction or another queueaction
*/
private[this] val invalidateNodeConfigurationRequest = Queue.dropping[List[(NodeId, CacheComplianceQueueAction)]](1).runNow

/**
* We need a semaphore to protect queue content merge-update
*/
private[this] val invalidateMergeUpdateSemaphore = Semaphore.make(1).runNow


/**
* Update logic. We take message from queue one at a time, and process.
* we need to keep order
*/
val updateCacheFromRequest: IO[Nothing, Unit] = invalidateNodeConfigurationRequest.take.flatMap(invalidatedIds =>
ZIO.foreach_(invalidatedIds.map(_._2) : List[CacheComplianceQueueAction])(action =>
{
(for {
_ <- performAction(action)
} yield ()).catchAll(err => ReportLoggerPure.Cache.error(s"Error when updating NodeConfiguration cache for node: [${action.nodeId.value}]: ${err.fullMsg}"))
}
)
)
// start updating
updateCacheFromRequest.forever.forkDaemon.runNow
/**
* Clear cache. Try a reload asynchronously, disregarding
* the result
*/
override def clearCache(): Unit = {
init()
ReportLogger.Cache.debug("Node expected reports cache cleared and reinited")
}


/**
* Do something with the action we received
*/
private[this] def performAction(action: CacheComplianceQueueAction): IOResult[Unit] = {
import CacheComplianceQueueAction._
// in a semaphore
semaphore.withPermit(
action match {
case insert: InsertNodeInCache => IOResult.effectNonBlocking { cache = Some(cache.get + (insert.nodeId -> None)) }
case delete: RemoveNodeInCache => IOResult.effectNonBlocking { cache = Some(cache.get.removed(delete.nodeId)) }
case update: UpdateNodeConfiguration => IOResult.effectNonBlocking { cache = Some(cache.get + (update.nodeId -> Some(update.nodeConfiguration))) }
case something =>
Inconsistency(s"NodeConfiguration service cache received unknown command : ${something}").fail
// should not happen
}
)
}

/**
* invalidate with an action to do something
* order is important
*/
def invalidateWithAction(actions: Seq[(NodeId, CacheComplianceQueueAction)]): IOResult[Unit] = {
ZIO.when(actions.nonEmpty) {
ReportLoggerPure.Cache.debug(s"Node Configuration cache: invalidation request for nodes with action: [${actions.map(_._1).map { _.value }.mkString(",")}]") *>
invalidateMergeUpdateSemaphore.withPermit(for {
elements <- invalidateNodeConfigurationRequest.takeAll
allActions = (elements.flatten ++ actions)
_ <- invalidateNodeConfigurationRequest.offer(allActions)
} yield ())
}
}
// ? question ?
// how to properly ensure that cache is synchro ?
// we have the begin date of the nodeexpectedreport that my offer a way to ensure that we don't replace
// a value with something older

/**
* get the current expected reports
* fails if request expected reports for a non existent node
*/
def getCurrentExpectedReports(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[NodeExpectedReports]]] = {
// check if cache is there
if (cache.isEmpty) {
init()
}

// TODO: this is clearly not thread safe
Full(cache.get.filter { case(id, _) => nodeIds.contains(id) } )
}

/**
* get the nodes applying the rule
*
*/
def findNodesApplyingRule(ruleId: RuleId): Box[Set[NodeId]] = {
// check if cache is there
if (cache.isEmpty) {
init()
}

// TODO: this is clearly not thread safe
//look in cache for all NodeExpectedReports having the ruleId
Full(cache.get.filter { case (id, option) => option match {
case None => false
case Some(nodeExpectedReport) =>
nodeExpectedReport.ruleExpectedReports.map(_.ruleId).contains(ruleId)
}
}.keySet)
}


/**
* retrieve expected reports by config version
*/
def findNodeExpectedReports(
nodeConfigIds: Set[NodeAndConfigId]
): Box[Map[NodeAndConfigId, Option[NodeExpectedReports]]] = {
if (cache.isEmpty) {
init()
}

// get them in cache
val inCache = cache.get.map { case (id, expected) => expected match {
case None => None
case Some(nodeExpectedReport) =>
val nodeAndConfigId = NodeAndConfigId(id, nodeExpectedReport.nodeConfigId)
if (nodeConfigIds.contains(nodeAndConfigId)) {
Some(nodeAndConfigId, expected)
} else {
None
}
}}.flatten.toMap

// search for all others in repo
val missingNodeConfigIds = nodeConfigIds -- inCache.keySet
for {
fromDb <- confExpectedRepo.getExpectedReports(missingNodeConfigIds)
} yield {
fromDb ++ inCache
}
}
}



/**
* simple implementation
* simply call the repo, as a passthrough
*/
class NodeConfigurationServiceImpl(
confExpectedRepo: FindExpectedReportRepository
) extends NodeConfigurationService {
/**
* retrieve expected reports by config version
*/
def findNodeExpectedReports(
nodeConfigIds: Set[NodeAndConfigId]
): Box[Map[NodeAndConfigId, Option[NodeExpectedReports]]] = {
confExpectedRepo.getExpectedReports(nodeConfigIds)
}

/**
* get the current expected reports
* fails if request expected reports for a non existent node
*/
def getCurrentExpectedReports(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[NodeExpectedReports]]] = {
confExpectedRepo.getCurrentExpectedsReports(nodeIds)
}

/**
* get the nodes applying the rule
*
*/
def findNodesApplyingRule(ruleId: RuleId): Box[Set[NodeId]] = {
confExpectedRepo.findCurrentNodeIds(ruleId)
}
}

Loading

0 comments on commit 95f9e6c

Please sign in to comment.