Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ncharles committed Jul 23, 2021
1 parent 8af3979 commit 4204073
Show file tree
Hide file tree
Showing 14 changed files with 426 additions and 26 deletions.
Expand Up @@ -51,13 +51,14 @@ import com.normation.rudder.domain.logger.TimingDebugLogger
import com.normation.utils.Control.sequence
import com.normation.rudder.domain.reports.{ExpectedReportsSerialisation, NodeAndConfigId, NodeConfigId, NodeExpectedReports}
import com.normation.rudder.repository.FindExpectedReportRepository
import com.normation.rudder.services.reports.NodeConfigurationService
import org.joda.time.DateTime
import zio.interop.catz._

final case class RoReportsExecutionRepositoryImpl (
db : Doobie
, writeBackend : WoReportsExecutionRepository
, findConfigs : FindExpectedReportRepository
, nodeConfigService: NodeConfigurationService
, pgInClause : PostgresqlInClause
, jdbcMaxBatchSize: Int
) extends RoReportsExecutionRepository with Loggable {
Expand Down Expand Up @@ -86,7 +87,7 @@ final case class RoReportsExecutionRepositoryImpl (
// by construct, we do have a nodeConfigId
agentsRuns = lastRunByNode.map(x => (x._1, NodeAndConfigId(x._1, x._2.nodeConfigVersion.get)))

expectedReports <- findConfigs.getExpectedReports(agentsRuns.values.toSet).toIO
expectedReports <- nodeConfigService.findNodeExpectedReports(agentsRuns.values.toSet).toIO

runs = agentsRuns.map { case (nodeId, nodeAndConfigId) =>
(nodeId,
Expand Down
Expand Up @@ -106,6 +106,13 @@ trait FindExpectedReportRepository {
*/
def findCurrentNodeIds(rule : RuleId) : Box[Set[NodeId]]

/**
 * Return the node ids associated to the given rule (based on the expected reports
 * that are still pending), restricted to the `nodeIds` given in parameter.
 * Used when the cache is incomplete and only a subset of nodes must be checked.
 */
def findCurrentNodeIdsForRule(ruleId : RuleId, nodeIds: Set[NodeId]) : Box[Set[NodeId]]


/*
* Retrieve the expected reports by config version of the nodes.
*
Expand Down
Expand Up @@ -188,6 +188,21 @@ class FindExpectedReportsJdbcRepository(
""".query[NodeId].to[Set].transact(xa))
}

/**
 * Return the node ids associated to the given rule (based on the expected reports
 * that are still pending, i.e. node configurations whose enddate is null),
 * restricted to the `nodeIds` given in parameter (used when cache is incomplete).
 */
override def findCurrentNodeIdsForRule(ruleId : RuleId, nodeIds: Set[NodeId]) : Box[Set[NodeId]] = {
  if (nodeIds.isEmpty) Full(Set.empty[NodeId])
  else {
    // NOTE(fix): interpolating `nodeIds.map(id => s"'${id}'").mkString(",")` into the
    // doobie `sql` interpolator binds ONE string parameter (and uses NodeId#toString,
    // not `.value`), so `nodeid in (?)` compared nodeid to a single literal like
    // "'NodeId(a)','NodeId(b)'" and could never match. Query by rule only (same
    // pattern as findCurrentNodeIds above) and intersect with the requested ids.
    transactRunBox(xa => sql"""
      select distinct nodeid from nodeconfigurations
      where enddate is null and configuration like ${"%" + ruleId.value + "%"}
    """.query[NodeId].to[Set].transact(xa)).map(_.intersect(nodeIds))
  }
}


/*
* Retrieve the list of node config ids
Expand Down
Expand Up @@ -146,6 +146,12 @@ trait NodeInfoService {
*/
def getAll() : Box[Map[NodeId, NodeInfo]]

/**
 * Get the ids of all known nodes.
 */

def getAllNodeIds(): Box[Set[NodeId]]

/**
* Get all nodes.
* That method try to return the maximum
Expand Down Expand Up @@ -863,6 +869,9 @@ trait NodeInfoServiceCached extends NodeInfoService with NamedZioLogger with Cac
// Return all node info from the cache (refreshed by withUpToDateCache), keyed by node id.
// The cache maps each id to a tuple whose second element is the NodeInfo.
def getAll(): Box[Map[NodeId, NodeInfo]] = withUpToDateCache("all nodes info") { cache =>
cache.view.mapValues(_._2).toMap.succeed
}.toBox
// Return the ids of all nodes currently in the cache (refreshed by withUpToDateCache).
// Cheaper than getAll() when only the key set is needed.
def getAllNodeIds(): Box[Set[NodeId]] = withUpToDateCache("all nodes id") { cache =>
cache.keySet.succeed
}.toBox
// Return ids of "system" nodes only, i.e. entries whose NodeInfo reports isPolicyServer.
def getAllSystemNodeIds(): Box[Seq[NodeId]] = withUpToDateCache("all system nodes") { cache =>
cache.collect { case(k, (_,x)) if(x.isPolicyServer) => k }.toSeq.succeed
}.toBox
Expand Down
Expand Up @@ -895,11 +895,11 @@ class PolicyWriterServiceImpl(
case None => // create a symlink from root to policy-server.pem
IOResult.effect {
// we want to have a symlink with a relative path, not full path
val source = Path.of(rootPem.name)
// val source = Path.of(rootPem.name)
val dest = File(paths.newFolder, filepaths.POLICY_SERVER_CERT)
// we can't overwrite a file with a symlink, so erase existing one
if(dest.exists) { dest.delete() }
Files.createSymbolicLink(dest.path, source, File.Attributes.default:_*)
//if(dest.exists) { dest.delete() }
//Files.createSymbolicLink(dest.path, source, File.Attributes.default:_*)
dest
}
}
Expand Down
Expand Up @@ -226,7 +226,7 @@ object QSLdapBackend {

for {
connection <- ldap
nodeIds <- nodeInfos.getAll().map(_.keySet.map(_.value)).toIO
nodeIds <- nodeInfos.getAllNodeIds.toIO
entries <- connection.search(nodeDit.BASE_DN, Sub, filter, returnedAttributes:_*)
} yield {

Expand All @@ -238,7 +238,7 @@ object QSLdapBackend {
//and we get node always with a hostname
val (nodes, others) = entries.partition { x => x.isA(OC_NODE) || x.isA(OC_RUDDER_NODE) }
// merge node attribute for node entries with same node id
val merged = nodes.groupBy( _.value_!(A_NODE_UUID)).filter(e => nodeIds.contains(e._1)).map { case (_, samenodes) =>
val merged = nodes.groupBy( _.value_!(A_NODE_UUID)).filter(e => nodeIds.map(_.value).contains(e._1)).map { case (_, samenodes) =>
samenodes.reduce[LDAPEntry] { case (n1, n2) =>
n2.attributes.foreach( a => n1 mergeAttribute a)
n1
Expand Down

0 comments on commit 4204073

Please sign in to comment.