-
Notifications
You must be signed in to change notification settings - Fork 313
/
counter_manager.go
115 lines (93 loc) · 2.94 KB
/
counter_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package event_schema
// countersCache holds every in-memory frequency counter, indexed as
// schemaHash -> flattened event key -> FrequencyCounter.
//
// The map is allocated at declaration rather than in an init function:
// package-level variables are initialized before any init function runs, so
// the cache can be written to from any init in the package without risking a
// write to a nil map.
var countersCache = make(map[string]map[string]*FrequencyCounter)
// CounterItem pairs a value with its frequency. getSchemaVersionCounters
// builds these from the entries returned by a counter's ItemsAboveThreshold.
type CounterItem struct {
	// Value is the entry's key.
	Value string
	// Frequency is the entry's frequency; getSchemaVersionCounters never
	// reports a value greater than 1 here.
	Frequency float64
}
// init allocates countersCache when it is still nil, so that later writes to
// it do not panic. A cache that is already non-nil is left untouched.
func init() {
	if countersCache != nil {
		return
	}
	countersCache = map[string]map[string]*FrequencyCounter{}
}
// populateFrequencyCounters stores the given frequency counters (e.g. the ones
// available in the db) in the in-memory cache entry for schemaHash, replacing
// whatever was cached for that hash before.
//
// Only the first bound elements of frequencyCounters are considered; anything
// past that is ignored, and a bound of zero or less caches an empty set. Each
// kept counter is passed through NewPeristedFrequencyCounter and stored under
// its Name.
func populateFrequencyCounters(schemaHash string, frequencyCounters []*FrequencyCounter, bound int) {
	// Clamp the number of counters to take into [0, len(frequencyCounters)].
	limit := len(frequencyCounters)
	if bound < limit {
		limit = bound
	}
	if limit < 0 {
		limit = 0
	}

	byName := make(map[string]*FrequencyCounter)
	for _, persisted := range frequencyCounters[:limit] {
		byName[persisted.Name] = NewPeristedFrequencyCounter(persisted)
	}
	countersCache[schemaHash] = byName
}
// getAllFrequencyCounters returns every frequency counter cached for
// schemaHash, in no particular order. The result is always a non-nil slice; it
// is empty when nothing is cached for the hash.
func getAllFrequencyCounters(schemaHash string) []*FrequencyCounter {
	// Indexing a missing hash yields a nil map, which has length zero and
	// ranges over nothing, so no separate "not found" branch is needed.
	cached := countersCache[schemaHash]

	all := make([]*FrequencyCounter, 0, len(cached))
	for _, counter := range cached {
		all = append(all, counter)
	}
	return all
}
// pruneFrequencyCounters trims the counters cached for schemaHash down to at
// most bound entries. Nothing happens when the cache already fits within
// bound. Which entries are evicted is unspecified, since it follows Go's
// unordered map iteration.
func pruneFrequencyCounters(schemaHash string, bound int) {
	cached := countersCache[schemaHash]

	excess := len(cached) - bound
	if excess <= 0 {
		return
	}

	// Removing entries from a map while ranging over it is safe in Go.
	for key := range cached {
		if excess == 0 {
			break
		}
		delete(cached, key)
		excess--
	}
}
// getFrequencyCounter returns the frequency counter cached for the flattened
// event key under schemaHash.
//
// If schemaHash has no cache entry yet, an empty one is created first. A key
// that is not cached gets a fresh counter from NewFrequencyCounter, unless the
// schema already holds bound or more counters, in which case nil is returned
// and nothing is added. A key that is already cached is returned regardless of
// bound.
func getFrequencyCounter(schemaHash, key string, bound int) *FrequencyCounter {
	counters, known := countersCache[schemaHash]
	if !known {
		counters = map[string]*FrequencyCounter{}
		countersCache[schemaHash] = counters
	}

	if existing, tracked := counters[key]; tracked {
		return existing
	}

	// The schema is full: refuse to track another key.
	if len(counters) >= bound {
		return nil
	}

	created := NewFrequencyCounter(key)
	counters[key] = created
	return created
}
// getSchemaVersionCounters reports, for every key cached under schemaHash, the
// entries returned by that counter's ItemsAboveThreshold, converted to
// CounterItems in the same order.
//
// Frequencies greater than 1 are reported as exactly 1. Keys whose counter
// yields no entries are left out. The returned map is never nil; it is empty
// when nothing is cached for the hash.
func getSchemaVersionCounters(schemaHash string) map[string][]*CounterItem {
	result := make(map[string][]*CounterItem)

	for key, counter := range countersCache[schemaHash] {
		above := counter.ItemsAboveThreshold()
		if len(above) == 0 {
			continue
		}

		items := make([]*CounterItem, 0, len(above))
		for _, entry := range above {
			item := &CounterItem{Value: entry.Key, Frequency: entry.Frequency}
			// Cap the reported frequency at 1.
			if item.Frequency > 1 {
				item.Frequency = 1.0
			}
			items = append(items, item)
		}
		result[key] = items
	}

	return result
}