/
gathering.go
94 lines (77 loc) · 2.63 KB
/
gathering.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package factsengine
import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/trento-project/agent/internal/factsengine/factscache"
"github.com/trento-project/agent/internal/factsengine/gatherers"
"github.com/trento-project/agent/pkg/factsengine/entities"
"golang.org/x/sync/errgroup"
)
// gatherFacts runs the gatherers needed to satisfy agentFacts and returns the
// facts they produced, tagged with executionID, agentID and groupID.
//
// The requested facts are first grouped by gatherer type; each group is then
// handed to its gatherer in a dedicated goroutine. A single facts cache is
// created per call and injected into every gatherer that implements
// gatherers.FactGathererWithCache.
//
// Failure handling is best-effort:
//   - a gatherer type missing from the registry is logged and skipped;
//   - a *entities.FactGatheringError is logged and turned into a list of
//     errored facts through entities.NewFactsGatheredListWithError;
//   - any other error is logged and contributes no facts.
//
// The returned error is whatever the errgroup reports from Wait. Every
// goroutine started here returns nil, so the group is used only to wait for
// completion.
func gatherFacts(
	executionID,
	agentID string,
	groupID string,
	agentFacts *entities.FactsGatheringRequestedTarget,
	registry gatherers.Registry,
) (entities.FactsGathered, error) {
	gathered := entities.FactsGathered{
		ExecutionID: executionID,
		AgentID:     agentID,
		GroupID:     groupID,
	}

	requestsByGatherer := groupFactsRequestByGatherer(agentFacts).FactRequests

	// One buffer slot per gatherer group: each goroutine sends at most once,
	// so no send blocks even though the channel is drained only after Wait.
	results := make(chan []entities.Fact, len(requestsByGatherer))
	workers := new(errgroup.Group)
	sharedCache := factscache.NewFactsCache()

	log.Infof("Starting facts gathering process")

	for gathererName, requested := range requestsByGatherer {
		// Per-iteration copy so the goroutine below does not share the loop variable.
		requests := requested

		gatherer, err := registry.GetGatherer(gathererName)
		if err != nil {
			log.Errorf("Fact gatherer %s does not exist", gathererName)
			continue
		}

		// Gatherers that support caching receive the cache created for this call.
		if cacheAware, ok := gatherer.(gatherers.FactGathererWithCache); ok {
			cacheAware.SetCache(sharedCache)
		}

		workers.Go(func() error {
			facts, err := gatherer.Gather(requests)
			if err == nil {
				results <- facts
				return nil
			}

			var gatheringErr *entities.FactGatheringError
			if errors.As(err, &gatheringErr) {
				log.Error(gatheringErr)
				results <- entities.NewFactsGatheredListWithError(requests, gatheringErr)
				return nil
			}

			// Errors of any other kind are only logged; nothing is sent.
			log.Error(err)
			return nil
		})
	}

	if err := workers.Wait(); err != nil {
		return gathered, err
	}

	// All senders are done, so the channel can be closed and drained.
	close(results)
	for facts := range results {
		gathered.FactsGathered = append(gathered.FactsGathered, facts...)
	}

	log.Infof("Requested facts gathered")

	return gathered, nil
}
// groupFactsRequestByGatherer buckets the fact requests in factsRequest by
// their Gatherer field, so that all facts served by one gatherer are executed
// at the same moment against the same source of truth.
// Within each bucket the requests keep their original relative order.
func groupFactsRequestByGatherer(
	factsRequest *entities.FactsGatheringRequestedTarget) entities.GroupedByGathererRequestedTarget {
	byGatherer := make(map[string][]entities.FactRequest)

	for _, request := range factsRequest.FactRequests {
		name := request.Gatherer
		// append on a missing key starts from a nil slice, so no explicit init is needed.
		byGatherer[name] = append(byGatherer[name], request)
	}

	return entities.GroupedByGathererRequestedTarget{
		FactRequests: byGatherer,
	}
}