Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory improvements first pass #1293

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 51 additions & 9 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ type consistentHashingAllocator struct {
consistentHasher *consistent.Consistent

// collectors is a map from a Collector's name to a Collector instance
// collectorKey -> collector pointer
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
// targetItem hash -> target item pointer
targetItems map[string]*target.Item

// collectorKey -> job -> target item hash -> true
targetItemsPerJobPerCollector map[string]map[string]map[string]bool

log logr.Logger

filter Filter
Expand All @@ -62,10 +67,11 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Al
}
consistentHasher := consistent.New(nil, config)
chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
log: log,
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool),
log: log,
}
for _, opt := range opts {
opt(chAllocator)
Expand All @@ -79,19 +85,36 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets
// this allows the allocator to respond without any extra allocations to http calls. The caller of this method
// has to acquire a lock.
func (c *consistentHashingAllocator) addCollectorTargetItemMapping(tg *target.Item) {
if c.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
c.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool)
}
if c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil {
c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool)
}
c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: c.collectors must have at least 1 collector set.
// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
// item while it's being encoded by the server JSON handler.
func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[tg.CollectorName]; ok {
previousColName.NumTargets--
delete(c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName], tg.Hash())
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash()))
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String())
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
c.targetItems[targetItem.Hash()] = targetItem
tg.CollectorName = colOwner.String()
c.targetItems[tg.Hash()] = tg
c.addCollectorTargetItemMapping(tg)
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}
Expand All @@ -107,6 +130,7 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Ite
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
delete(c.targetItemsPerJobPerCollector[target.CollectorName][target.JobName], target.Hash())
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}
Expand All @@ -130,6 +154,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
delete(c.targetItemsPerJobPerCollector, k.Name)
c.consistentHasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
Expand All @@ -155,7 +180,7 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
if c.filter != nil {
targets = c.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)
RecordTargetsKept(targets)

c.m.Lock()
defer c.m.Unlock()
Expand All @@ -175,13 +200,12 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) {
log := c.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
c.log.Info("No collector instances present")
return
}

Expand All @@ -195,6 +219,24 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec
}
}

func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
c.m.RLock()
defer c.m.RUnlock()
if _, ok := c.targetItemsPerJobPerCollector[collector]; !ok {
return []*target.Item{}
}
if _, ok := c.targetItemsPerJobPerCollector[collector][job]; !ok {
return []*target.Item{}
}
targetItemsCopy := make([]*target.Item, len(c.targetItemsPerJobPerCollector[collector][job]))
index := 0
for targetHash := range c.targetItemsPerJobPerCollector[collector][job] {
targetItemsCopy[index] = c.targetItems[targetHash]
index++
}
return targetItemsCopy
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item {
c.m.RLock()
Expand Down
1 change: 0 additions & 1 deletion cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestRelativelyEvenDistribution(t *testing.T) {
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets)
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}
Expand Down
55 changes: 9 additions & 46 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,24 @@ import (
"fmt"
"net/url"

"github.com/prometheus/common/model"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type collectorJSON struct {
Link string `json:"_link"`
Jobs []targetGroupJSON `json:"targets"`
}

type targetGroupJSON struct {
Targets []string `json:"targets"`
Labels model.LabelSet `json:"labels"`
Link string `json:"_link"`
Jobs []*target.Item `json:"targets"`
}

func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON {
// GetAllTargetsByJob is a relatively expensive call that is usually only used for debugging purposes.
func GetAllTargetsByJob(allocator Allocator, job string) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []target.Item
targetList = append(targetList, cMap[j.CollectorName+j.JobName]...)

var targetGroupList []targetGroupJSON

for _, t := range targetList {
targetGroupList = append(targetGroupList, targetGroupJSON{
Targets: []string{t.TargetURL},
Labels: t.Label,
})
}

displayData[j.CollectorName] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList}

}
for _, col := range allocator.Collectors() {
items := allocator.GetTargetsForCollectorAndJob(col.Name, job)
displayData[col.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(job), col.Name), Jobs: items}
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}
return displayData
}

func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON {
var tgs []targetGroupJSON
group := make(map[string]target.Item)
labelSet := make(map[string]model.LabelSet)
if _, ok := allocator.Collectors()[collector]; ok {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
if targetItem.CollectorName == collector && targetItem.JobName == job {
group[targetItem.Label.String()] = targetItem
labelSet[targetItem.Hash()] = targetItem.Label
}
}
}
}
for _, v := range group {
tgs = append(tgs, targetGroupJSON{Targets: []string{v.TargetURL}, Labels: labelSet[v.Hash()]})
}

return tgs
func GetAllTargetsByCollectorAndJob(allocator Allocator, collector string, job string) []*target.Item {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a future PR i'm going to remove this file altogether, for now i'm trying to keep changes to a minimum

return allocator.GetTargetsForCollectorAndJob(collector, job)
}