Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ncharles committed May 22, 2019
1 parent da11982 commit f1f3a0e
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 7 deletions.
Expand Up @@ -37,7 +37,7 @@

package com.normation.inventory.services.core

import com.normation.inventory.domain.{NodeId, SoftwareUuid, Software, InventoryStatus}
import com.normation.inventory.domain.{InventoryStatus, NodeId, Software, SoftwareUuid}
import net.liftweb.common.Box

trait ReadOnlySoftwareDAO {
Expand All @@ -48,4 +48,22 @@ trait ReadOnlySoftwareDAO {
* as possible
*/
def getSoftwareByNode(nodeIds: Set[NodeId], status: InventoryStatus): Box[Map[NodeId, Seq[Software]]]

/**
* Returns all software ids in ou=Software,ou=Inventories
*/
def getAllSoftwareIds() : Box[Set[SoftwareUuid]]

/**
* Returns all software ids pointed by at least a node (in any of the 3 DIT)
*/
def getSoftwaresForAllNodes() : Box[Set[SoftwareUuid]]
}

trait WriteOnlySoftwareDAO {

/**
* Delete softwares in ou=Software,ou=Inventories
*/
def deleteSoftwares(softwares: Seq[SoftwareUuid]): Box[Seq[String]]
}
Expand Up @@ -38,19 +38,21 @@
package com.normation.inventory.ldap.core

import com.normation.ldap.sdk._
import BuildFilter.{EQ,OR}
import BuildFilter.{EQ, IS, OR}
import com.normation.inventory.domain._
import com.normation.inventory.services.core.ReadOnlySoftwareDAO
import com.normation.inventory.services.core._
import LDAPConstants._
import net.liftweb.common._
import Box._
import com.normation.utils.Control.sequence
import com.normation.utils.Control.{bestEffort, sequence}
import com.unboundid.ldap.sdk.DN


class ReadOnlySoftwareDAOImpl(
inventoryDitService:InventoryDitService,
ldap:LDAPConnectionProvider[RoLDAPConnection],
mapper:InventoryMapper
) extends ReadOnlySoftwareDAO {
) extends ReadOnlySoftwareDAO with Loggable {

private[this] def search(con: RoLDAPConnection, ids: Seq[SoftwareUuid]) = {
sequence(con.searchOne(inventoryDitService.getSoftwareBaseDN, OR(ids map {x:SoftwareUuid => EQ(A_SOFTWARE_UUID,x.value) }:_*))) { entry =>
Expand Down Expand Up @@ -90,4 +92,68 @@ class ReadOnlySoftwareDAOImpl(
softwareByNode.mapValues { ids => software.filter(s => ids.contains(s.id)) }
}
}

def getAllSoftwareIds() : Box[Set[SoftwareUuid]] = {
for {
con <- ldap
softwareEntry = con.searchOne(inventoryDitService.getSoftwareBaseDN, IS(OC_SOFTWARE), A_SOFTWARE_UUID)
ids <- sequence(softwareEntry) { entry =>
entry(A_SOFTWARE_UUID) match {
case Some(value) => Full(value)
case _ => Failure(s"Missing attribute ${A_SOFTWARE_UUID} for entry ${entry.dn} ${entry.toString()}")
}
}
softIds = ids.map(id => SoftwareUuid(id)).toSet
} yield {
softIds
}
}

def getSoftwaresForAllNodes() : Box[Set[SoftwareUuid]] = {
// fetch all softwares, for all nodes, in all 3 dits
val acceptedDit = inventoryDitService.getDit(AcceptedInventory)
val pendingDit = inventoryDitService.getDit(PendingInventory)
val removedDit = inventoryDitService.getDit(RemovedInventory)
val dits = Set[InventoryDit](acceptedDit, pendingDit, removedDit)

val t1 = System.currentTimeMillis
for {
con <- ldap
// TODO: This needs pagination, with 1000 nodes, it uses about 1,5 GB
softwareEntry = dits.flatMap { dit => con.searchOne(dit.NODES.dn, IS(OC_NODE), A_SOFTWARE_DN) } // it's really a dn that is stored in software attribute

This comment has been minimized.

Copy link
@ncharles

ncharles May 22, 2019

Author Owner

this returns a seq of LDAPEntries, each containing a Seq of Softwares DN
Is there a way to stream the result, and consume into a set? If not, can we paginate such a search?

t2 = System.currentTimeMillis()
_ = logger.debug(s"All Software DNs from all nodes ${softwareEntry.size} fetched in ${t2-t1}ms")

// This could be more efficient, as it takes 3 secodes to process
ids = softwareEntry.flatMap( entry => entry.valuesFor(A_SOFTWARE_DN).toSet )
t3 = System.currentTimeMillis()
_ = logger.debug(s"All software DNs deduplicated, resulting in ${ids.size} software entries, in ${t3-t2}ms")

softIds <- sequence(ids.toSeq) { id => acceptedDit.SOFTWARE.SOFT.idFromDN(new DN(id)) }
t4 = System.currentTimeMillis()
_ = logger.trace(s"All software DNs to software ids in ${t4-t3}ms")
} yield {
softIds.toSet
}
}

}

class WriteOnlySoftwareDAOImpl(
inventoryDit :InventoryDit
, ldap :LDAPConnectionProvider[RwLDAPConnection]
) extends WriteOnlySoftwareDAO {


def deleteSoftwares(softwareIds: Seq[SoftwareUuid]): Box[Seq[String]] = {
for {
con <- ldap
dns = softwareIds.map(inventoryDit.SOFTWARE.SOFT.dn(_))
res <- bestEffort(dns) { dn =>
con.delete(dn)
}
} yield {
res.flatten.map( entry => entry.getDN)
}
}
}
@@ -0,0 +1,46 @@
package com.normation.inventory.ldap.core


import com.normation.inventory.services.core.{ReadOnlySoftwareDAO, WriteOnlySoftwareDAO}
import net.liftweb.common.Loggable
import net.liftweb.common._


trait SoftwareService {
def deleteUnreferencedSoftware() : Box[Seq[String]]

}
class SoftwareServiceImpl(
readOnlySoftware: ReadOnlySoftwareDAO
, writeOnlySoftwareDAO: WriteOnlySoftwareDAO)
extends SoftwareService with Loggable {

/** Delete all unreferenced softwares
* First search in software, and then in nodes, so that if a node arrives in between (new inventory)
* its software wont be deleted
*/
def deleteUnreferencedSoftware() : Box[Seq[String]] = {
val t1 = System.currentTimeMillis
for {
allSoftwares <- readOnlySoftware.getAllSoftwareIds()
t2 = System.currentTimeMillis()
_ = logger.debug(s"All softwares id in ou=software fetched: ${allSoftwares.size} softwares id in ${t2-t1}ms")

allNodesSoftwares <- readOnlySoftware.getSoftwaresForAllNodes()
t3 = System.currentTimeMillis()
_ = logger.debug(s"All softwares id in nodes fetched: ${allNodesSoftwares.size} softwares id in ${t3-t2}ms")


extraSoftware = allSoftwares -- allNodesSoftwares
_ = logger.debug(s"Found ${extraSoftware.size} unreferenced software in ou=software, going to delete them")

deletedSoftware <- writeOnlySoftwareDAO.deleteSoftwares(extraSoftware.toSeq)
t4 = System.currentTimeMillis()
_ = logger.debug(s"Deleted ${deletedSoftware.size} software in ${t4-t3}ms")


} yield {
deletedSoftware.map(x => x.toString).toSeq
}
}
}
@@ -0,0 +1,80 @@
/*
*************************************************************************************
* Copyright 2017 Normation SAS
*************************************************************************************
*
* This file is part of Rudder.
*
* Rudder is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In accordance with the terms of section 7 (7. Additional Terms.) of
* the GNU General Public License version 3, the copyright holders add
* the following Additional permissions:
* Notwithstanding to the terms of section 5 (5. Conveying Modified Source
* Versions) and 6 (6. Conveying Non-Source Forms.) of the GNU General
* Public License version 3, when you create a Related Module, this
* Related Module is not considered as a part of the work and may be
* distributed under the license agreement of your choice.
* A "Related Module" means a set of sources files including their
* documentation that, without modification of the Source Code, enables
* supplementary functions or services in addition to those offered by
* the Software.
*
* Rudder is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rudder. If not, see <http://www.gnu.org/licenses/>.
*
*************************************************************************************
*/

package com.normation.rudder.batch


import com.normation.rudder.domain.logger.ScheduledJobLogger
import com.normation.inventory.ldap.core.SoftwareService
import monix.execution.Scheduler.{global => scheduler}
import net.liftweb.common._

import scala.concurrent.duration._


/**
* A naive scheduler which checks every updateInterval if software needs to be deleted
*/
class PurgeUnreferencedSoftwares(
softwareService : SoftwareService
, updateInterval : FiniteDuration
) {

val logger = ScheduledJobLogger


if (updateInterval < 1.hour) {
logger.info(s"Disable automatic purge of unreferenced softwares (update interval cannot be less than 1 hour)")
} else {
logger.debug(s"***** starting batch that purge unreferenced softwares, every ${updateInterval.toString()} *****")
scheduler.scheduleWithFixedDelay(updateInterval, updateInterval) {
softwareService.deleteUnreferencedSoftware() match {
case Full(softwares) =>
logger.info(s"Purged ${softwares.length} unreferenced softwares")
if (logger.isDebugEnabled && softwares.length > 0)
logger.debug(s"Purged following software: ${softwares.mkString(",")}")
case e: EmptyBox =>
val error = (e ?~! s"Error when deleting unreferenced softwares")
logger.error(error.messageChain)
error.rootExceptionCause.foreach(ex =>
logger.error("Exception was:", ex)
)
}
}
}
}

Expand Up @@ -146,7 +146,7 @@ along with Rudder. If not, see <http://www.gnu.org/licenses/>.
are doing.
-->

<root level="info">
<root level="debug">
<appender-ref ref="STDOUT" />
</root>

Expand Down Expand Up @@ -360,7 +360,7 @@ along with Rudder. If not, see <http://www.gnu.org/licenses/>.
================
This logger is in charge of all scheduled jobs and batches
-->
<logger name="scheduledJob" level="info" additivity="false">
<logger name="scheduledJob" level="debug" additivity="false">
<appender-ref ref="OPSLOG" />
<appender-ref ref="STDOUT" />
</logger>
Expand Down
Expand Up @@ -253,6 +253,17 @@ object RudderConfig extends Loggable {

val RUDDER_DEBUG_NODE_CONFIGURATION_PATH = config.getString("rudder.debug.nodeconfiguration.path")


val RUDDER_BATCH_DELETE_SOFTWARE_INTERVAL = {
try {
config.getInt("rudder.batch.delete.software.interval")
} catch {
case ex: ConfigException =>
ApplicationLogger.info("Property 'rudder.batch.delete.software.interval' is missing or empty in rudder.configFile. Default to 24 hours.")
24
}
}

// Roles definitions
val RUDDER_SERVER_ROLES = Seq(
//each time, it's (role name, key in the config file)
Expand Down Expand Up @@ -358,6 +369,9 @@ object RudderConfig extends Loggable {
//val updateDynamicGroupsService : DynGroupUpdaterService = dynGroupUpdaterService
val updateDynamicGroups: UpdateDynamicGroups = dyngroupUpdaterBatch
val checkInventoryUpdate = new CheckInventoryUpdate(nodeInfoServiceImpl, asyncDeploymentAgent, stringUuidGenerator, 15.seconds)

val purgeUnreferencedSoftwares = new PurgeUnreferencedSoftwares(softwareService, RUDDER_BATCH_DELETE_SOFTWARE_INTERVAL.hours)

val databaseManager: DatabaseManager = databaseManagerImpl
val automaticReportsCleaning: AutomaticReportsCleaning = dbCleaner
val checkTechniqueLibrary: CheckTechniqueLibrary = techniqueLibraryUpdater
Expand Down Expand Up @@ -1064,6 +1078,10 @@ object RudderConfig extends Loggable {
private[this] lazy val fileManagerImpl = new FileManager(UPLOAD_ROOT_DIRECTORY)
private[this] lazy val databaseManagerImpl = new DatabaseManagerImpl(reportsRepositoryImpl, updateExpectedRepo)
private[this] lazy val softwareInventoryDAO: ReadOnlySoftwareDAO = new ReadOnlySoftwareDAOImpl(inventoryDitService, roLdap, inventoryMapper)
private[this] lazy val softwareInventoryRWDAO: WriteOnlySoftwareDAO = new WriteOnlySoftwareDAOImpl(acceptedNodesDitImpl, rwLdap)

private[this] lazy val softwareService: SoftwareService = new SoftwareServiceImpl(softwareInventoryDAO, softwareInventoryRWDAO)

private[this] lazy val nodeSummaryServiceImpl = new NodeSummaryServiceImpl(inventoryDitService, inventoryMapper, roLdap)
private[this] lazy val diffRepos: InventoryHistoryLogRepository =
new InventoryHistoryLogRepository(HISTORY_INVENTORIES_ROOTDIR, new FullInventoryFileMarshalling(fullInventoryFromLdapEntries, inventoryMapper))
Expand Down

0 comments on commit f1f3a0e

Please sign in to comment.